xref: /qemu/migration/rdma.c (revision 814bb12a)
1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  * Copyright Red Hat, Inc. 2015-2016
6  *
7  * Authors:
8  *  Michael R. Hines <mrhines@us.ibm.com>
9  *  Jiuxing Liu <jl@us.ibm.com>
10  *  Daniel P. Berrange <berrange@redhat.com>
11  *
12  * This work is licensed under the terms of the GNU GPL, version 2 or
13  * later.  See the COPYING file in the top-level directory.
14  *
15  */
16 #include "qemu/osdep.h"
17 #include "qapi/error.h"
18 #include "qemu-common.h"
19 #include "qemu/cutils.h"
20 #include "migration/migration.h"
21 #include "migration/qemu-file.h"
22 #include "exec/cpu-common.h"
23 #include "qemu/error-report.h"
24 #include "qemu/main-loop.h"
25 #include "qemu/sockets.h"
26 #include "qemu/bitmap.h"
27 #include "qemu/coroutine.h"
28 #include <sys/socket.h>
29 #include <netdb.h>
30 #include <arpa/inet.h>
31 #include <rdma/rdma_cma.h>
32 #include "trace.h"
33 
/*
 * Print an error message on stderr and, when the caller supplied an
 * Error ** that does not hold an error yet, store the same message there
 * with error_setg().  Both messages are prefixed with "RDMA ERROR: ".
 *
 * An already-set *errp is left untouched, so the first error wins.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if (errp && (*(errp) == NULL)) { \
            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)
44 
/* Timeout in milliseconds (presumably for CM address/route resolution). */
#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
/* Also sizes the unregistrations[] queue in RDMAContext. */
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

/* log2 of the chunk size used by the ram_chunk_*() helpers. */
#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */

/*
 * This is only for non-live state being migrated.
 * Instead of RDMA_WRITE messages, we use RDMA_SEND
 * messages for that state, which requires a different
 * delivery design than main memory.
 */
#define RDMA_SEND_INCREMENT 32768

/*
 * Maximum size infiniband SEND message
 */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

/* Protocol version placed in RDMACapabilities (TODO confirm at the user). */
#define RDMA_CONTROL_VERSION_CURRENT 1
/*
 * Capabilities for negotiation.
 */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/*
 * Add the other flags above to this list of known capabilities
 * as they are introduced.
 */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
78 
/*
 * Bail out of the *calling function* when the connection is already in an
 * error state: report the condition once (tracked by rdma->error_reported)
 * and return rdma->error_state to the caller's caller.
 *
 * Requires a variable named 'rdma' in scope that points to the RDMAContext,
 * and must be used in a function whose return type can carry error_state.
 *
 * The expansion deliberately does NOT end in a semicolon, so that
 * "CHECK_ERROR_STATE();" is exactly one statement and can be used safely in
 * an unbraced if/else.
 */
#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                error_report("RDMA is in an error state waiting migration" \
                                " to abort!"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0)
90 
91 /*
92  * A work request ID is 64-bits and we split up these bits
93  * into 3 parts:
94  *
95  * bits 0-15 : type of control message, 2^16
96  * bits 16-29: ram block index, 2^14
97  * bits 30-63: ram block chunk number, 2^34
98  *
99  * The last two bit ranges are only used for RDMA writes,
100  * in order to track their completion and potentially
101  * also track unregistration status of the message.
102  */
/* Bit position at which each field of a work request ID starts. */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

/* Low 16 bits: everything below the block field. */
#define RDMA_WRID_TYPE_MASK \
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)

/* Bits 16-29: below the chunk field, excluding the type bits. */
#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))

/*
 * Everything that is neither type nor block.
 * NOTE(review): the masks are built from 'unsigned long'; on hosts where
 * that type is 32 bits wide this mask cannot reach bits 32-63 -- confirm
 * whether such hosts matter here.
 */
#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
114 
115 /*
116  * RDMA migration protocol:
117  * 1. RDMA Writes (data messages, i.e. RAM)
118  * 2. IB Send/Recv (control channel messages)
119  */
/*
 * Work request kinds.  The values are sparse; they are also used as
 * indices into wrid_desc[].
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,       /* data message (RDMA write) */
    RDMA_WRID_SEND_CONTROL = 2000,  /* control channel send */
    RDMA_WRID_RECV_CONTROL = 4000,  /* control channel receive */
};
126 
/*
 * Human-readable names for the work request kinds above, indexed by the
 * enum value.  The table is sparse: only the four designated slots are
 * set, every other element is NULL.
 */
static const char *wrid_desc[] = {
    [RDMA_WRID_NONE] = "NONE",
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};
133 
134 /*
135  * Work request IDs for IB SEND messages only (not RDMA writes).
136  * This is used by the migration protocol to transmit
137  * control messages (such as device state and registration commands)
138  *
139  * We could use more WRs, but we have enough for now.
140  */
/* Slots of RDMAContext.wr_data[]; RDMA_WRID_MAX is the slot count. */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,
};
147 
148 /*
149  * SEND/RECV IB Control Messages.
150  */
/*
 * Control message types; the values double as indices into control_desc[].
 * Presumably carried in RDMAControlHeader.type -- confirm at the users.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};
165 
/*
 * Human-readable name of each control message type, indexed by the
 * RDMA_CONTROL_* value.
 */
static const char *control_desc[] = {
    [RDMA_CONTROL_NONE] = "NONE",
    [RDMA_CONTROL_ERROR] = "ERROR",
    [RDMA_CONTROL_READY] = "READY",
    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
};
180 
181 /*
182  * Memory and MR structures used to represent an IB Send/Recv work request.
183  * This is *not* used for RDMA writes, only IB Send/Recv.
184  */
/*
 * One control-channel buffer together with its registration.  Note that
 * the buffer is embedded, so each instance is RDMA_CONTROL_MAX_BUFFER
 * bytes plus bookkeeping.
 */
typedef struct {
    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    struct   ibv_mr *control_mr;               /* registration metadata */
    size_t   control_len;                      /* length of the message */
    uint8_t *control_curr;                     /* start of unconsumed bytes */
} RDMAWorkRequestData;
191 
192 /*
193  * Negotiate RDMA capabilities during connection-setup time.
194  */
/*
 * Both fields are converted between host and network byte order by
 * caps_to_network() / network_to_caps().
 */
typedef struct {
    uint32_t version; /* protocol version, cf. RDMA_CONTROL_VERSION_CURRENT */
    uint32_t flags;   /* bitmask of RDMA_CAPABILITY_* values */
} RDMACapabilities;
199 
200 static void caps_to_network(RDMACapabilities *cap)
201 {
202     cap->version = htonl(cap->version);
203     cap->flags = htonl(cap->flags);
204 }
205 
206 static void network_to_caps(RDMACapabilities *cap)
207 {
208     cap->version = ntohl(cap->version);
209     cap->flags = ntohl(cap->flags);
210 }
211 
212 /*
213  * Representation of a RAMBlock from an RDMA perspective.
214  * This is not transmitted, only local.
215  * This and subsequent structures cannot be linked lists
216  * because we're using a single IB message to transmit
217  * the information. It's small anyway, so a list is overkill.
218  */
/*
 * Entries live in the RDMALocalBlocks.block array, which is reallocated
 * whenever a block is added or deleted, so pointers to an entry do not
 * survive those operations.
 */
typedef struct RDMALocalBlock {
    char          *block_name;      /* heap copy (g_strdup) of the name */
    uint8_t       *local_host_addr; /* local virtual address */
    uint64_t       remote_host_addr; /* remote virtual address */
    uint64_t       offset;          /* also the key used in the blockmap */
    uint64_t       length;          /* size of the block in bytes */
    struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
    struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
    uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
    uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
    int            index;           /* which block are we */
    unsigned int   src_index;       /* (Only used on dest) */
    bool           is_ram_block;    /* set for blocks added before init done */
    int            nb_chunks;       /* number of bits in the two bitmaps */
    unsigned long *transit_bitmap;
    unsigned long *unregister_bitmap;
} RDMALocalBlock;
236 
237 /*
238  * Also represents a RAMblock, but only on the dest.
239  * This gets transmitted by the dest during connection-time
240  * to the source VM and then is used to populate the
241  * corresponding RDMALocalBlock with
242  * the information needed to perform the actual RDMA.
243  */
/*
 * Packed wire layout.  dest_block_to_network() / network_to_dest_block()
 * byte-swap every field except 'padding'.
 */
typedef struct QEMU_PACKED RDMADestBlock {
    uint64_t remote_host_addr;
    uint64_t offset;
    uint64_t length;
    uint32_t remote_rkey;
    uint32_t padding;           /* not converted by the helpers */
} RDMADestBlock;
251 
/*
 * Convert a 64-bit value from host to network (big-endian) byte order.
 * The most significant 32 bits go into the first word in memory, the least
 * significant 32 bits into the second, each passed through htonl().
 */
static uint64_t htonll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } conv;
    uint32_t upper = (uint32_t)(v >> 32);
    uint32_t lower = (uint32_t)(v & 0xFFFFFFFFULL);

    conv.lv[0] = htonl(upper);
    conv.lv[1] = htonl(lower);

    return conv.llv;
}
259 
/*
 * Convert a 64-bit value from network (big-endian) to host byte order.
 * The first 32-bit word in memory supplies the high half of the result,
 * the second word the low half.
 */
static uint64_t ntohll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } conv;
    uint64_t upper;
    uint64_t lower;

    conv.llv = v;
    upper = ntohl(conv.lv[0]);
    lower = ntohl(conv.lv[1]);

    return (upper << 32) | lower;
}
265 
266 static void dest_block_to_network(RDMADestBlock *db)
267 {
268     db->remote_host_addr = htonll(db->remote_host_addr);
269     db->offset = htonll(db->offset);
270     db->length = htonll(db->length);
271     db->remote_rkey = htonl(db->remote_rkey);
272 }
273 
274 static void network_to_dest_block(RDMADestBlock *db)
275 {
276     db->remote_host_addr = ntohll(db->remote_host_addr);
277     db->offset = ntohll(db->offset);
278     db->length = ntohll(db->length);
279     db->remote_rkey = ntohl(db->remote_rkey);
280 }
281 
282 /*
283  * Virtual address of the above structures used for transmitting
284  * the RAMBlock descriptions at connection-time.
285  * This structure is *not* transmitted.
286  */
typedef struct RDMALocalBlocks {
    int nb_blocks;             /* number of valid entries in 'block' */
    bool     init;             /* main memory init complete */
    RDMALocalBlock *block;     /* heap array, reallocated on add/delete */
} RDMALocalBlocks;
292 
293 /*
294  * Main data structure for RDMA state.
295  * While there is only one copy of this structure being allocated right now,
296  * this is the place where one would start if you wanted to consider
297  * having more than one RDMA connection open at the same time.
298  */
typedef struct RDMAContext {
    char *host;
    int port;

    /* One control buffer per RDMA_WRID_* slot (READY, DATA, CONTROL). */
    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];

    /*
     * This is used by *_exchange_send() to figure out whether or not
     * the initial "READY" message has already been received or not.
     * This is because other functions may potentially poll() and detect
     * the READY message before send() does, in which case we need to
     * know if it completed.
     */
    int control_ready_expected;

    /* number of outstanding writes */
    int nb_sent;

    /* store info about current buffer so that we can
       merge it with future sends */
    uint64_t current_addr;
    uint64_t current_length;
    /* index of ram block the current buffer belongs to */
    int current_index;
    /* index of the chunk in the current ram block */
    int current_chunk;

    bool pin_all;

    /*
     * infiniband-specific variables for opening the device
     * and maintaining connection state and so forth.
     *
     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
     * cm_id->verbs, cm_id->channel, and cm_id->qp.
     */
    struct rdma_cm_id *cm_id;               /* connection manager ID */
    struct rdma_cm_id *listen_id;
    bool connected;

    struct ibv_context          *verbs;
    struct rdma_event_channel   *channel;
    struct ibv_qp *qp;                      /* queue pair */
    struct ibv_comp_channel *comp_channel;  /* completion channel */
    struct ibv_pd *pd;                      /* protection domain */
    struct ibv_cq *cq;                      /* completion queue */

    /*
     * If a previous write failed (perhaps because of a failed
     * memory registration), then do not attempt any future work
     * and remember the error state.
     *
     * error_state is what CHECK_ERROR_STATE() returns to the caller;
     * error_reported makes sure the condition is logged only once.
     */
    int error_state;
    int error_reported;
    int received_error;

    /*
     * Description of ram blocks used throughout the code.
     */
    RDMALocalBlocks local_ram_blocks;
    RDMADestBlock  *dest_blocks;

    /* Index of the next RAMBlock received during block registration */
    unsigned int    next_src_index;

    /*
     * Migration on *destination* started.
     * Then use coroutine yield function.
     * Source runs in a thread, so we don't care.
     */
    int migration_started_on_destination;

    /* Running counters; total_registrations drops on each ibv_dereg_mr(). */
    int total_registrations;
    int total_writes;

    int unregister_current, unregister_next;
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];

    /* Optional map from block offset to its RDMALocalBlock; may be NULL. */
    GHashTable *blockmap;
} RDMAContext;
379 
/* QOM type name and checked cast for the RDMA-backed QIOChannel. */
#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
#define QIO_CHANNEL_RDMA(obj)                                     \
    OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)

typedef struct QIOChannelRDMA QIOChannelRDMA;


/* A QIOChannel that carries its RDMA connection state along. */
struct QIOChannelRDMA {
    QIOChannel parent;
    RDMAContext *rdma;
    QEMUFile *file;
    size_t len;
    bool blocking; /* XXX we don't actually honour this yet */
};
394 
395 /*
396  * Main structure for IB Send/Recv control messages.
397  * This gets prepended at the beginning of every Send/Recv.
398  */
/*
 * Packed wire layout.  control_to_network() / network_to_control()
 * byte-swap len, type and repeat; 'padding' is not converted.
 */
typedef struct QEMU_PACKED {
    uint32_t len;     /* Total length of data portion */
    uint32_t type;    /* which control command to perform */
    uint32_t repeat;  /* number of commands in data portion of same type */
    uint32_t padding;
} RDMAControlHeader;
405 
406 static void control_to_network(RDMAControlHeader *control)
407 {
408     control->type = htonl(control->type);
409     control->len = htonl(control->len);
410     control->repeat = htonl(control->repeat);
411 }
412 
413 static void network_to_control(RDMAControlHeader *control)
414 {
415     control->type = ntohl(control->type);
416     control->len = ntohl(control->len);
417     control->repeat = ntohl(control->repeat);
418 }
419 
/*
 * Register a single Chunk.
 * Information sent by the source VM to inform the dest
 * to register a single chunk of memory before we can perform
 * the actual RDMA operation.
 *
 * Packed wire layout; the two members of 'key' share the same 8 bytes.
 */
typedef struct QEMU_PACKED {
    union QEMU_PACKED {
        uint64_t current_addr;  /* offset into the ram_addr_t space */
        uint64_t chunk;         /* chunk to lookup if unregistering */
    } key;
    uint32_t current_index; /* which ramblock the chunk belongs to */
    uint32_t padding;
    uint64_t chunks;            /* how many sequential chunks to register */
} RDMARegister;
435 
436 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
437 {
438     RDMALocalBlock *local_block;
439     local_block  = &rdma->local_ram_blocks.block[reg->current_index];
440 
441     if (local_block->is_ram_block) {
442         /*
443          * current_addr as passed in is an address in the local ram_addr_t
444          * space, we need to translate this for the destination
445          */
446         reg->key.current_addr -= local_block->offset;
447         reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
448     }
449     reg->key.current_addr = htonll(reg->key.current_addr);
450     reg->current_index = htonl(reg->current_index);
451     reg->chunks = htonll(reg->chunks);
452 }
453 
454 static void network_to_register(RDMARegister *reg)
455 {
456     reg->key.current_addr = ntohll(reg->key.current_addr);
457     reg->current_index = ntohl(reg->current_index);
458     reg->chunks = ntohll(reg->chunks);
459 }
460 
/*
 * Packed wire layout of a "compress" control command.  All four fields are
 * byte-swapped by compress_to_network() / network_to_compress().
 */
typedef struct QEMU_PACKED {
    uint32_t value;     /* if zero, we will madvise() */
    uint32_t block_idx; /* which ram block index */
    uint64_t offset;    /* Address in remote ram_addr_t space */
    uint64_t length;    /* length of the chunk */
} RDMACompress;
467 
468 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
469 {
470     comp->value = htonl(comp->value);
471     /*
472      * comp->offset as passed in is an address in the local ram_addr_t
473      * space, we need to translate this for the destination
474      */
475     comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
476     comp->offset += rdma->dest_blocks[comp->block_idx].offset;
477     comp->block_idx = htonl(comp->block_idx);
478     comp->offset = htonll(comp->offset);
479     comp->length = htonll(comp->length);
480 }
481 
482 static void network_to_compress(RDMACompress *comp)
483 {
484     comp->value = ntohl(comp->value);
485     comp->block_idx = ntohl(comp->block_idx);
486     comp->offset = ntohll(comp->offset);
487     comp->length = ntohll(comp->length);
488 }
489 
490 /*
491  * The result of the dest's memory registration produces an "rkey"
492  * which the source VM must reference in order to perform
493  * the RDMA operation.
494  */
/*
 * Packed wire layout.  result_to_network() / network_to_result()
 * byte-swap rkey and host_addr; 'padding' is not converted.
 */
typedef struct QEMU_PACKED {
    uint32_t rkey;
    uint32_t padding;
    uint64_t host_addr;
} RDMARegisterResult;
500 
501 static void result_to_network(RDMARegisterResult *result)
502 {
503     result->rkey = htonl(result->rkey);
504     result->host_addr = htonll(result->host_addr);
505 };
506 
507 static void network_to_result(RDMARegisterResult *result)
508 {
509     result->rkey = ntohl(result->rkey);
510     result->host_addr = ntohll(result->host_addr);
511 };
512 
/* Forward declarations for functions defined further down in this file. */

/* NOTE(review): declared with external linkage -- confirm that is intended. */
const char *print_wrid(int wrid);
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma));
518 
519 static inline uint64_t ram_chunk_index(const uint8_t *start,
520                                        const uint8_t *host)
521 {
522     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
523 }
524 
525 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
526                                        uint64_t i)
527 {
528     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
529                                   (i << RDMA_REG_CHUNK_SHIFT));
530 }
531 
532 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
533                                      uint64_t i)
534 {
535     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
536                                          (1UL << RDMA_REG_CHUNK_SHIFT);
537 
538     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
539         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
540     }
541 
542     return result;
543 }
544 
545 static int rdma_add_block(RDMAContext *rdma, const char *block_name,
546                          void *host_addr,
547                          ram_addr_t block_offset, uint64_t length)
548 {
549     RDMALocalBlocks *local = &rdma->local_ram_blocks;
550     RDMALocalBlock *block;
551     RDMALocalBlock *old = local->block;
552 
553     local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
554 
555     if (local->nb_blocks) {
556         int x;
557 
558         if (rdma->blockmap) {
559             for (x = 0; x < local->nb_blocks; x++) {
560                 g_hash_table_remove(rdma->blockmap,
561                                     (void *)(uintptr_t)old[x].offset);
562                 g_hash_table_insert(rdma->blockmap,
563                                     (void *)(uintptr_t)old[x].offset,
564                                     &local->block[x]);
565             }
566         }
567         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
568         g_free(old);
569     }
570 
571     block = &local->block[local->nb_blocks];
572 
573     block->block_name = g_strdup(block_name);
574     block->local_host_addr = host_addr;
575     block->offset = block_offset;
576     block->length = length;
577     block->index = local->nb_blocks;
578     block->src_index = ~0U; /* Filled in by the receipt of the block list */
579     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
580     block->transit_bitmap = bitmap_new(block->nb_chunks);
581     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
582     block->unregister_bitmap = bitmap_new(block->nb_chunks);
583     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
584     block->remote_keys = g_new0(uint32_t, block->nb_chunks);
585 
586     block->is_ram_block = local->init ? false : true;
587 
588     if (rdma->blockmap) {
589         g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
590     }
591 
592     trace_rdma_add_block(block_name, local->nb_blocks,
593                          (uintptr_t) block->local_host_addr,
594                          block->offset, block->length,
595                          (uintptr_t) (block->local_host_addr + block->length),
596                          BITS_TO_LONGS(block->nb_chunks) *
597                              sizeof(unsigned long) * 8,
598                          block->nb_chunks);
599 
600     local->nb_blocks++;
601 
602     return 0;
603 }
604 
605 /*
606  * Memory regions need to be registered with the device and queue pairs setup
607  * in advanced before the migration starts. This tells us where the RAM blocks
608  * are so that we can register them individually.
609  */
610 static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
611     ram_addr_t block_offset, ram_addr_t length, void *opaque)
612 {
613     return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
614 }
615 
616 /*
617  * Identify the RAMBlocks and their quantity. They will be references to
618  * identify chunk boundaries inside each RAMBlock and also be referenced
619  * during dynamic page registration.
620  */
621 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
622 {
623     RDMALocalBlocks *local = &rdma->local_ram_blocks;
624 
625     assert(rdma->blockmap == NULL);
626     memset(local, 0, sizeof *local);
627     qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
628     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
629     rdma->dest_blocks = g_new0(RDMADestBlock,
630                                rdma->local_ram_blocks.nb_blocks);
631     local->init = true;
632     return 0;
633 }
634 
635 /*
636  * Note: If used outside of cleanup, the caller must ensure that the destination
637  * block structures are also updated
638  */
639 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
640 {
641     RDMALocalBlocks *local = &rdma->local_ram_blocks;
642     RDMALocalBlock *old = local->block;
643     int x;
644 
645     if (rdma->blockmap) {
646         g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
647     }
648     if (block->pmr) {
649         int j;
650 
651         for (j = 0; j < block->nb_chunks; j++) {
652             if (!block->pmr[j]) {
653                 continue;
654             }
655             ibv_dereg_mr(block->pmr[j]);
656             rdma->total_registrations--;
657         }
658         g_free(block->pmr);
659         block->pmr = NULL;
660     }
661 
662     if (block->mr) {
663         ibv_dereg_mr(block->mr);
664         rdma->total_registrations--;
665         block->mr = NULL;
666     }
667 
668     g_free(block->transit_bitmap);
669     block->transit_bitmap = NULL;
670 
671     g_free(block->unregister_bitmap);
672     block->unregister_bitmap = NULL;
673 
674     g_free(block->remote_keys);
675     block->remote_keys = NULL;
676 
677     g_free(block->block_name);
678     block->block_name = NULL;
679 
680     if (rdma->blockmap) {
681         for (x = 0; x < local->nb_blocks; x++) {
682             g_hash_table_remove(rdma->blockmap,
683                                 (void *)(uintptr_t)old[x].offset);
684         }
685     }
686 
687     if (local->nb_blocks > 1) {
688 
689         local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
690 
691         if (block->index) {
692             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
693         }
694 
695         if (block->index < (local->nb_blocks - 1)) {
696             memcpy(local->block + block->index, old + (block->index + 1),
697                 sizeof(RDMALocalBlock) *
698                     (local->nb_blocks - (block->index + 1)));
699         }
700     } else {
701         assert(block == local->block);
702         local->block = NULL;
703     }
704 
705     trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
706                            block->offset, block->length,
707                             (uintptr_t)(block->local_host_addr + block->length),
708                            BITS_TO_LONGS(block->nb_chunks) *
709                                sizeof(unsigned long) * 8, block->nb_chunks);
710 
711     g_free(old);
712 
713     local->nb_blocks--;
714 
715     if (local->nb_blocks && rdma->blockmap) {
716         for (x = 0; x < local->nb_blocks; x++) {
717             g_hash_table_insert(rdma->blockmap,
718                                 (void *)(uintptr_t)local->block[x].offset,
719                                 &local->block[x]);
720         }
721     }
722 
723     return 0;
724 }
725 
726 /*
727  * Put in the log file which RDMA device was opened and the details
728  * associated with that device.
729  */
730 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
731 {
732     struct ibv_port_attr port;
733 
734     if (ibv_query_port(verbs, 1, &port)) {
735         error_report("Failed to query port information");
736         return;
737     }
738 
739     printf("%s RDMA Device opened: kernel name %s "
740            "uverbs device name %s, "
741            "infiniband_verbs class device path %s, "
742            "infiniband class device path %s, "
743            "transport: (%d) %s\n",
744                 who,
745                 verbs->device->name,
746                 verbs->device->dev_name,
747                 verbs->device->dev_path,
748                 verbs->device->ibdev_path,
749                 port.link_layer,
750                 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
751                  ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
752                     ? "Ethernet" : "Unknown"));
753 }
754 
755 /*
756  * Put in the log file the RDMA gid addressing information,
757  * useful for folks who have trouble understanding the
758  * RDMA device hierarchy in the kernel.
759  */
760 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
761 {
762     char sgid[33];
763     char dgid[33];
764     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
765     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
766     trace_qemu_rdma_dump_gid(who, sgid, dgid);
767 }
768 
769 /*
770  * As of now, IPv6 over RoCE / iWARP is not supported by linux.
771  * We will try the next addrinfo struct, and fail if there are
772  * no other valid addresses to bind against.
773  *
 * If the user is listening on '[::]', then we will not have opened a device
 * yet and have no way of verifying if the device is RoCE or not.
776  *
777  * In this case, the source VM will throw an error for ALL types of
778  * connections (both IPv4 and IPv6) if the destination machine does not have
779  * a regular infiniband network available for use.
780  *
 * The only way to guarantee that an error is thrown for broken kernels is
 * for the management software to choose a *specific* interface at bind time
 * and validate what type of hardware it is.
784  *
785  * Unfortunately, this puts the user in a fix:
786  *
787  *  If the source VM connects with an IPv4 address without knowing that the
788  *  destination has bound to '[::]' the migration will unconditionally fail
789  *  unless the management software is explicitly listening on the IPv4
790  *  address while using a RoCE-based device.
791  *
792  *  If the source VM connects with an IPv6 address, then we're OK because we can
793  *  throw an error on the source (and similarly on the destination).
794  *
795  *  But in mixed environments, this will be broken for a while until it is fixed
796  *  inside linux.
797  *
798  * We do provide a *tiny* bit of help in this function: We can list all of the
799  * devices in the system and check to see if all the devices are RoCE or
800  * Infiniband.
801  *
 * If we detect that we have a *pure* RoCE environment, then we can safely
 * throw an error even if the management software has specified '[::]' as the
 * bind address.
 *
 * However, if there are multiple heterogeneous devices, then we cannot make
 * this assumption and the user just has to be sure they know what they are
 * doing.
809  *
810  * Patches are being reviewed on linux-rdma.
811  */
812 static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
813 {
814     struct ibv_port_attr port_attr;
815 
816     /* This bug only exists in linux, to our knowledge. */
817 #ifdef CONFIG_LINUX
818 
819     /*
820      * Verbs are only NULL if management has bound to '[::]'.
821      *
822      * Let's iterate through all the devices and see if there any pure IB
823      * devices (non-ethernet).
824      *
825      * If not, then we can safely proceed with the migration.
826      * Otherwise, there are no guarantees until the bug is fixed in linux.
827      */
828     if (!verbs) {
829         int num_devices, x;
830         struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
831         bool roce_found = false;
832         bool ib_found = false;
833 
834         for (x = 0; x < num_devices; x++) {
835             verbs = ibv_open_device(dev_list[x]);
836             if (!verbs) {
837                 if (errno == EPERM) {
838                     continue;
839                 } else {
840                     return -EINVAL;
841                 }
842             }
843 
844             if (ibv_query_port(verbs, 1, &port_attr)) {
845                 ibv_close_device(verbs);
846                 ERROR(errp, "Could not query initial IB port");
847                 return -EINVAL;
848             }
849 
850             if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
851                 ib_found = true;
852             } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
853                 roce_found = true;
854             }
855 
856             ibv_close_device(verbs);
857 
858         }
859 
860         if (roce_found) {
861             if (ib_found) {
862                 fprintf(stderr, "WARN: migrations may fail:"
863                                 " IPv6 over RoCE / iWARP in linux"
864                                 " is broken. But since you appear to have a"
865                                 " mixed RoCE / IB environment, be sure to only"
866                                 " migrate over the IB fabric until the kernel "
867                                 " fixes the bug.\n");
868             } else {
869                 ERROR(errp, "You only have RoCE / iWARP devices in your systems"
870                             " and your management software has specified '[::]'"
871                             ", but IPv6 over RoCE / iWARP is not supported in Linux.");
872                 return -ENONET;
873             }
874         }
875 
876         return 0;
877     }
878 
879     /*
880      * If we have a verbs context, that means that some other than '[::]' was
881      * used by the management software for binding. In which case we can
882      * actually warn the user about a potentially broken kernel.
883      */
884 
885     /* IB ports start with 1, not 0 */
886     if (ibv_query_port(verbs, 1, &port_attr)) {
887         ERROR(errp, "Could not query initial IB port");
888         return -EINVAL;
889     }
890 
891     if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
892         ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
893                     "(but patches on linux-rdma in progress)");
894         return -ENONET;
895     }
896 
897 #endif
898 
899     return 0;
900 }
901 
902 /*
903  * Figure out which RDMA device corresponds to the requested IP hostname
904  * Also create the initial connection manager identifiers for opening
905  * the connection.
906  */
907 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
908 {
909     int ret;
910     struct rdma_addrinfo *res;
911     char port_str[16];
912     struct rdma_cm_event *cm_event;
913     char ip[40] = "unknown";
914     struct rdma_addrinfo *e;
915 
916     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
917         ERROR(errp, "RDMA hostname has not been set");
918         return -EINVAL;
919     }
920 
921     /* create CM channel */
922     rdma->channel = rdma_create_event_channel();
923     if (!rdma->channel) {
924         ERROR(errp, "could not create CM channel");
925         return -EINVAL;
926     }
927 
928     /* create CM id */
929     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
930     if (ret) {
931         ERROR(errp, "could not create channel id");
932         goto err_resolve_create_id;
933     }
934 
935     snprintf(port_str, 16, "%d", rdma->port);
936     port_str[15] = '\0';
937 
938     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
939     if (ret < 0) {
940         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
941         goto err_resolve_get_addr;
942     }
943 
944     for (e = res; e != NULL; e = e->ai_next) {
945         inet_ntop(e->ai_family,
946             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
947         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
948 
949         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
950                 RDMA_RESOLVE_TIMEOUT_MS);
951         if (!ret) {
952             if (e->ai_family == AF_INET6) {
953                 ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs);
954                 if (ret) {
955                     continue;
956                 }
957             }
958             goto route;
959         }
960     }
961 
962     ERROR(errp, "could not resolve address %s", rdma->host);
963     goto err_resolve_get_addr;
964 
965 route:
966     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
967 
968     ret = rdma_get_cm_event(rdma->channel, &cm_event);
969     if (ret) {
970         ERROR(errp, "could not perform event_addr_resolved");
971         goto err_resolve_get_addr;
972     }
973 
974     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
975         ERROR(errp, "result not equal to event_addr_resolved %s",
976                 rdma_event_str(cm_event->event));
977         perror("rdma_resolve_addr");
978         rdma_ack_cm_event(cm_event);
979         ret = -EINVAL;
980         goto err_resolve_get_addr;
981     }
982     rdma_ack_cm_event(cm_event);
983 
984     /* resolve route */
985     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
986     if (ret) {
987         ERROR(errp, "could not resolve rdma route");
988         goto err_resolve_get_addr;
989     }
990 
991     ret = rdma_get_cm_event(rdma->channel, &cm_event);
992     if (ret) {
993         ERROR(errp, "could not perform event_route_resolved");
994         goto err_resolve_get_addr;
995     }
996     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
997         ERROR(errp, "result not equal to event_route_resolved: %s",
998                         rdma_event_str(cm_event->event));
999         rdma_ack_cm_event(cm_event);
1000         ret = -EINVAL;
1001         goto err_resolve_get_addr;
1002     }
1003     rdma_ack_cm_event(cm_event);
1004     rdma->verbs = rdma->cm_id->verbs;
1005     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1006     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1007     return 0;
1008 
1009 err_resolve_get_addr:
1010     rdma_destroy_id(rdma->cm_id);
1011     rdma->cm_id = NULL;
1012 err_resolve_create_id:
1013     rdma_destroy_event_channel(rdma->channel);
1014     rdma->channel = NULL;
1015     return ret;
1016 }
1017 
1018 /*
1019  * Create protection domain and completion queues
1020  */
1021 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1022 {
1023     /* allocate pd */
1024     rdma->pd = ibv_alloc_pd(rdma->verbs);
1025     if (!rdma->pd) {
1026         error_report("failed to allocate protection domain");
1027         return -1;
1028     }
1029 
1030     /* create completion channel */
1031     rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1032     if (!rdma->comp_channel) {
1033         error_report("failed to allocate completion channel");
1034         goto err_alloc_pd_cq;
1035     }
1036 
1037     /*
1038      * Completion queue can be filled by both read and write work requests,
1039      * so must reflect the sum of both possible queue sizes.
1040      */
1041     rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1042             NULL, rdma->comp_channel, 0);
1043     if (!rdma->cq) {
1044         error_report("failed to allocate completion queue");
1045         goto err_alloc_pd_cq;
1046     }
1047 
1048     return 0;
1049 
1050 err_alloc_pd_cq:
1051     if (rdma->pd) {
1052         ibv_dealloc_pd(rdma->pd);
1053     }
1054     if (rdma->comp_channel) {
1055         ibv_destroy_comp_channel(rdma->comp_channel);
1056     }
1057     rdma->pd = NULL;
1058     rdma->comp_channel = NULL;
1059     return -1;
1060 
1061 }
1062 
1063 /*
1064  * Create queue pairs.
1065  */
1066 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1067 {
1068     struct ibv_qp_init_attr attr = { 0 };
1069     int ret;
1070 
1071     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1072     attr.cap.max_recv_wr = 3;
1073     attr.cap.max_send_sge = 1;
1074     attr.cap.max_recv_sge = 1;
1075     attr.send_cq = rdma->cq;
1076     attr.recv_cq = rdma->cq;
1077     attr.qp_type = IBV_QPT_RC;
1078 
1079     ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1080     if (ret) {
1081         return -1;
1082     }
1083 
1084     rdma->qp = rdma->cm_id->qp;
1085     return 0;
1086 }
1087 
1088 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1089 {
1090     int i;
1091     RDMALocalBlocks *local = &rdma->local_ram_blocks;
1092 
1093     for (i = 0; i < local->nb_blocks; i++) {
1094         local->block[i].mr =
1095             ibv_reg_mr(rdma->pd,
1096                     local->block[i].local_host_addr,
1097                     local->block[i].length,
1098                     IBV_ACCESS_LOCAL_WRITE |
1099                     IBV_ACCESS_REMOTE_WRITE
1100                     );
1101         if (!local->block[i].mr) {
1102             perror("Failed to register local dest ram block!\n");
1103             break;
1104         }
1105         rdma->total_registrations++;
1106     }
1107 
1108     if (i >= local->nb_blocks) {
1109         return 0;
1110     }
1111 
1112     for (i--; i >= 0; i--) {
1113         ibv_dereg_mr(local->block[i].mr);
1114         rdma->total_registrations--;
1115     }
1116 
1117     return -1;
1118 
1119 }
1120 
1121 /*
1122  * Find the ram block that corresponds to the page requested to be
1123  * transmitted by QEMU.
1124  *
1125  * Once the block is found, also identify which 'chunk' within that
1126  * block that the page belongs to.
1127  *
1128  * This search cannot fail or the migration will fail.
1129  */
1130 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1131                                       uintptr_t block_offset,
1132                                       uint64_t offset,
1133                                       uint64_t length,
1134                                       uint64_t *block_index,
1135                                       uint64_t *chunk_index)
1136 {
1137     uint64_t current_addr = block_offset + offset;
1138     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1139                                                 (void *) block_offset);
1140     assert(block);
1141     assert(current_addr >= block->offset);
1142     assert((current_addr + length) <= (block->offset + block->length));
1143 
1144     *block_index = block->index;
1145     *chunk_index = ram_chunk_index(block->local_host_addr,
1146                 block->local_host_addr + (current_addr - block->offset));
1147 
1148     return 0;
1149 }
1150 
1151 /*
1152  * Register a chunk with IB. If the chunk was already registered
1153  * previously, then skip.
1154  *
1155  * Also return the keys associated with the registration needed
1156  * to perform the actual RDMA operation.
1157  */
1158 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1159         RDMALocalBlock *block, uintptr_t host_addr,
1160         uint32_t *lkey, uint32_t *rkey, int chunk,
1161         uint8_t *chunk_start, uint8_t *chunk_end)
1162 {
1163     if (block->mr) {
1164         if (lkey) {
1165             *lkey = block->mr->lkey;
1166         }
1167         if (rkey) {
1168             *rkey = block->mr->rkey;
1169         }
1170         return 0;
1171     }
1172 
1173     /* allocate memory to store chunk MRs */
1174     if (!block->pmr) {
1175         block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1176     }
1177 
1178     /*
1179      * If 'rkey', then we're the destination, so grant access to the source.
1180      *
1181      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1182      */
1183     if (!block->pmr[chunk]) {
1184         uint64_t len = chunk_end - chunk_start;
1185 
1186         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1187 
1188         block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1189                 chunk_start, len,
1190                 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1191                         IBV_ACCESS_REMOTE_WRITE) : 0));
1192 
1193         if (!block->pmr[chunk]) {
1194             perror("Failed to register chunk!");
1195             fprintf(stderr, "Chunk details: block: %d chunk index %d"
1196                             " start %" PRIuPTR " end %" PRIuPTR
1197                             " host %" PRIuPTR
1198                             " local %" PRIuPTR " registrations: %d\n",
1199                             block->index, chunk, (uintptr_t)chunk_start,
1200                             (uintptr_t)chunk_end, host_addr,
1201                             (uintptr_t)block->local_host_addr,
1202                             rdma->total_registrations);
1203             return -1;
1204         }
1205         rdma->total_registrations++;
1206     }
1207 
1208     if (lkey) {
1209         *lkey = block->pmr[chunk]->lkey;
1210     }
1211     if (rkey) {
1212         *rkey = block->pmr[chunk]->rkey;
1213     }
1214     return 0;
1215 }
1216 
1217 /*
1218  * Register (at connection time) the memory used for control
1219  * channel messages.
1220  */
1221 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1222 {
1223     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1224             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1225             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1226     if (rdma->wr_data[idx].control_mr) {
1227         rdma->total_registrations++;
1228         return 0;
1229     }
1230     error_report("qemu_rdma_reg_control failed");
1231     return -1;
1232 }
1233 
1234 const char *print_wrid(int wrid)
1235 {
1236     if (wrid >= RDMA_WRID_RECV_CONTROL) {
1237         return wrid_desc[RDMA_WRID_RECV_CONTROL];
1238     }
1239     return wrid_desc[wrid];
1240 }
1241 
1242 /*
1243  * RDMA requires memory registration (mlock/pinning), but this is not good for
1244  * overcommitment.
1245  *
 * In preparation for the future where LRU information or workload-specific
 * writable working set memory access behavior is available to QEMU
 * it would be nice to have in place the ability to UN-register/UN-pin
 * particular memory regions from the RDMA hardware when it is determined that
 * those regions of memory will likely not be accessed again in the near future.
1251  *
1252  * While we do not yet have such information right now, the following
1253  * compile-time option allows us to perform a non-optimized version of this
1254  * behavior.
1255  *
1256  * By uncommenting this option, you will cause *all* RDMA transfers to be
1257  * unregistered immediately after the transfer completes on both sides of the
1258  * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1259  *
1260  * This will have a terrible impact on migration performance, so until future
1261  * workload information or LRU information is available, do not attempt to use
1262  * this feature except for basic testing.
1263  */
1264 //#define RDMA_UNREGISTRATION_EXAMPLE
1265 
/*
 * Perform a non-optimized memory unregistration after every transfer
 * for demonstration purposes, only if pin-all is not requested.
 *
 * Drains the rdma->unregistrations ring starting at
 * rdma->unregister_current: for each queued work request id the chunk's
 * memory region is deregistered locally and an UNREGISTER_REQUEST control
 * message is exchanged with the peer.  Chunks that are still marked
 * in-transit are dropped from the ring without being unregistered.
 *
 * Returns 0 once an empty ring slot is reached, or a negative value if
 * deregistration or the control exchange fails.
 *
 * Potential optimizations:
 * 1. Start a new thread to run this function continuously
        - for bit clearing
        - and for receipt of unregister messages
 * 2. Use an LRU.
 * 3. Use workload hints.
 */
static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
{
    /* A zero entry marks an empty slot, i.e. the end of the pending work. */
    while (rdma->unregistrations[rdma->unregister_current]) {
        int ret;
        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
        /* The queued id packs both the chunk and the block index. */
        uint64_t chunk =
            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
        uint64_t index =
            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
        RDMALocalBlock *block =
            &(rdma->local_ram_blocks.block[index]);
        RDMARegister reg = { .current_index = index };
        /* Reply type we expect back from the peer. */
        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
                                 };
        RDMAControlHeader head = { .len = sizeof(RDMARegister),
                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
                                   .repeat = 1,
                                 };

        trace_qemu_rdma_unregister_waiting_proc(chunk,
                                                rdma->unregister_current);

        /* Consume the slot and advance, wrapping at the ring size. */
        rdma->unregistrations[rdma->unregister_current] = 0;
        rdma->unregister_current++;

        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
            rdma->unregister_current = 0;
        }


        /*
         * Unregistration is speculative (because migration is single-threaded
         * and we cannot break the protocol's infiniband message ordering).
         * Thus, if the memory is currently being used for transmission,
         * then abort the attempt to unregister and try again
         * later the next time a completion is received for this memory.
         */
        clear_bit(chunk, block->unregister_bitmap);

        if (test_bit(chunk, block->transit_bitmap)) {
            trace_qemu_rdma_unregister_waiting_inflight(chunk);
            continue;
        }

        trace_qemu_rdma_unregister_waiting_send(chunk);

        /* Forget the local MR and the cached remote key regardless of ret. */
        ret = ibv_dereg_mr(block->pmr[chunk]);
        block->pmr[chunk] = NULL;
        block->remote_keys[chunk] = 0;

        if (ret != 0) {
            perror("unregistration chunk failed");
            return -ret;
        }
        rdma->total_registrations--;

        /* Tell the peer which chunk to unregister and wait for its reply. */
        reg.key.chunk = chunk;
        register_to_network(rdma, &reg);
        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                &resp, NULL, NULL);
        if (ret < 0) {
            return ret;
        }

        trace_qemu_rdma_unregister_waiting_complete(chunk);
    }

    return 0;
}
1346 
1347 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1348                                          uint64_t chunk)
1349 {
1350     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1351 
1352     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1353     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1354 
1355     return result;
1356 }
1357 
1358 /*
1359  * Set bit for unregistration in the next iteration.
1360  * We cannot transmit right here, but will unpin later.
1361  */
1362 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1363                                         uint64_t chunk, uint64_t wr_id)
1364 {
1365     if (rdma->unregistrations[rdma->unregister_next] != 0) {
1366         error_report("rdma migration: queue is full");
1367     } else {
1368         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1369 
1370         if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1371             trace_qemu_rdma_signal_unregister_append(chunk,
1372                                                      rdma->unregister_next);
1373 
1374             rdma->unregistrations[rdma->unregister_next++] =
1375                     qemu_rdma_make_wrid(wr_id, index, chunk);
1376 
1377             if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1378                 rdma->unregister_next = 0;
1379             }
1380         } else {
1381             trace_qemu_rdma_signal_unregister_already(chunk);
1382         }
1383     }
1384 }
1385 
1386 /*
1387  * Consult the connection manager to see a work request
1388  * (of any kind) has completed.
1389  * Return the work request ID that completed.
1390  */
1391 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1392                                uint32_t *byte_len)
1393 {
1394     int ret;
1395     struct ibv_wc wc;
1396     uint64_t wr_id;
1397 
1398     ret = ibv_poll_cq(rdma->cq, 1, &wc);
1399 
1400     if (!ret) {
1401         *wr_id_out = RDMA_WRID_NONE;
1402         return 0;
1403     }
1404 
1405     if (ret < 0) {
1406         error_report("ibv_poll_cq return %d", ret);
1407         return ret;
1408     }
1409 
1410     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1411 
1412     if (wc.status != IBV_WC_SUCCESS) {
1413         fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1414                         wc.status, ibv_wc_status_str(wc.status));
1415         fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1416 
1417         return -1;
1418     }
1419 
1420     if (rdma->control_ready_expected &&
1421         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1422         trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1423                   wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1424         rdma->control_ready_expected = 0;
1425     }
1426 
1427     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1428         uint64_t chunk =
1429             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1430         uint64_t index =
1431             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1432         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1433 
1434         trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1435                                    index, chunk, block->local_host_addr,
1436                                    (void *)(uintptr_t)block->remote_host_addr);
1437 
1438         clear_bit(chunk, block->transit_bitmap);
1439 
1440         if (rdma->nb_sent > 0) {
1441             rdma->nb_sent--;
1442         }
1443 
1444         if (!rdma->pin_all) {
1445             /*
1446              * FYI: If one wanted to signal a specific chunk to be unregistered
1447              * using LRU or workload-specific information, this is the function
1448              * you would call to do so. That chunk would then get asynchronously
1449              * unregistered later.
1450              */
1451 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1452             qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1453 #endif
1454         }
1455     } else {
1456         trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1457     }
1458 
1459     *wr_id_out = wc.wr_id;
1460     if (byte_len) {
1461         *byte_len = wc.byte_len;
1462     }
1463 
1464     return  0;
1465 }
1466 
1467 /*
1468  * Block until the next work request has completed.
1469  *
1470  * First poll to see if a work request has already completed,
1471  * otherwise block.
1472  *
1473  * If we encounter completed work requests for IDs other than
1474  * the one we're interested in, then that's generally an error.
1475  *
1476  * The only exception is actual RDMA Write completions. These
1477  * completions only need to be recorded, but do not actually
1478  * need further processing.
1479  */
1480 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1481                                     uint32_t *byte_len)
1482 {
1483     int num_cq_events = 0, ret = 0;
1484     struct ibv_cq *cq;
1485     void *cq_ctx;
1486     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1487 
1488     if (ibv_req_notify_cq(rdma->cq, 0)) {
1489         return -1;
1490     }
1491     /* poll cq first */
1492     while (wr_id != wrid_requested) {
1493         ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1494         if (ret < 0) {
1495             return ret;
1496         }
1497 
1498         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1499 
1500         if (wr_id == RDMA_WRID_NONE) {
1501             break;
1502         }
1503         if (wr_id != wrid_requested) {
1504             trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1505                        wrid_requested, print_wrid(wr_id), wr_id);
1506         }
1507     }
1508 
1509     if (wr_id == wrid_requested) {
1510         return 0;
1511     }
1512 
1513     while (1) {
1514         /*
1515          * Coroutine doesn't start until migration_fd_process_incoming()
1516          * so don't yield unless we know we're running inside of a coroutine.
1517          */
1518         if (rdma->migration_started_on_destination) {
1519             yield_until_fd_readable(rdma->comp_channel->fd);
1520         }
1521 
1522         if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
1523             perror("ibv_get_cq_event");
1524             goto err_block_for_wrid;
1525         }
1526 
1527         num_cq_events++;
1528 
1529         if (ibv_req_notify_cq(cq, 0)) {
1530             goto err_block_for_wrid;
1531         }
1532 
1533         while (wr_id != wrid_requested) {
1534             ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1535             if (ret < 0) {
1536                 goto err_block_for_wrid;
1537             }
1538 
1539             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1540 
1541             if (wr_id == RDMA_WRID_NONE) {
1542                 break;
1543             }
1544             if (wr_id != wrid_requested) {
1545                 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1546                                    wrid_requested, print_wrid(wr_id), wr_id);
1547             }
1548         }
1549 
1550         if (wr_id == wrid_requested) {
1551             goto success_block_for_wrid;
1552         }
1553     }
1554 
1555 success_block_for_wrid:
1556     if (num_cq_events) {
1557         ibv_ack_cq_events(cq, num_cq_events);
1558     }
1559     return 0;
1560 
1561 err_block_for_wrid:
1562     if (num_cq_events) {
1563         ibv_ack_cq_events(cq, num_cq_events);
1564     }
1565     return ret;
1566 }
1567 
1568 /*
1569  * Post a SEND message work request for the control channel
1570  * containing some data and block until the post completes.
1571  */
1572 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1573                                        RDMAControlHeader *head)
1574 {
1575     int ret = 0;
1576     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1577     struct ibv_send_wr *bad_wr;
1578     struct ibv_sge sge = {
1579                            .addr = (uintptr_t)(wr->control),
1580                            .length = head->len + sizeof(RDMAControlHeader),
1581                            .lkey = wr->control_mr->lkey,
1582                          };
1583     struct ibv_send_wr send_wr = {
1584                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1585                                    .opcode = IBV_WR_SEND,
1586                                    .send_flags = IBV_SEND_SIGNALED,
1587                                    .sg_list = &sge,
1588                                    .num_sge = 1,
1589                                 };
1590 
1591     trace_qemu_rdma_post_send_control(control_desc[head->type]);
1592 
1593     /*
1594      * We don't actually need to do a memcpy() in here if we used
1595      * the "sge" properly, but since we're only sending control messages
1596      * (not RAM in a performance-critical path), then its OK for now.
1597      *
1598      * The copy makes the RDMAControlHeader simpler to manipulate
1599      * for the time being.
1600      */
1601     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1602     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1603     control_to_network((void *) wr->control);
1604 
1605     if (buf) {
1606         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1607     }
1608 
1609 
1610     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1611 
1612     if (ret > 0) {
1613         error_report("Failed to use post IB SEND for control");
1614         return -ret;
1615     }
1616 
1617     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1618     if (ret < 0) {
1619         error_report("rdma migration: send polling control error");
1620     }
1621 
1622     return ret;
1623 }
1624 
1625 /*
1626  * Post a RECV work request in anticipation of some future receipt
1627  * of data on the control channel.
1628  */
1629 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1630 {
1631     struct ibv_recv_wr *bad_wr;
1632     struct ibv_sge sge = {
1633                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
1634                             .length = RDMA_CONTROL_MAX_BUFFER,
1635                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1636                          };
1637 
1638     struct ibv_recv_wr recv_wr = {
1639                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1640                                     .sg_list = &sge,
1641                                     .num_sge = 1,
1642                                  };
1643 
1644 
1645     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1646         return -1;
1647     }
1648 
1649     return 0;
1650 }
1651 
1652 /*
1653  * Block and wait for a RECV control channel message to arrive.
1654  */
1655 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1656                 RDMAControlHeader *head, int expecting, int idx)
1657 {
1658     uint32_t byte_len;
1659     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1660                                        &byte_len);
1661 
1662     if (ret < 0) {
1663         error_report("rdma migration: recv polling control error!");
1664         return ret;
1665     }
1666 
1667     network_to_control((void *) rdma->wr_data[idx].control);
1668     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1669 
1670     trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
1671 
1672     if (expecting == RDMA_CONTROL_NONE) {
1673         trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
1674                                              head->type);
1675     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1676         error_report("Was expecting a %s (%d) control message"
1677                 ", but got: %s (%d), length: %d",
1678                 control_desc[expecting], expecting,
1679                 control_desc[head->type], head->type, head->len);
1680         if (head->type == RDMA_CONTROL_ERROR) {
1681             rdma->received_error = true;
1682         }
1683         return -EIO;
1684     }
1685     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1686         error_report("too long length: %d", head->len);
1687         return -EINVAL;
1688     }
1689     if (sizeof(*head) + head->len != byte_len) {
1690         error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1691         return -EINVAL;
1692     }
1693 
1694     return 0;
1695 }
1696 
1697 /*
1698  * When a RECV work request has completed, the work request's
1699  * buffer is pointed at the header.
1700  *
1701  * This will advance the pointer to the data portion
1702  * of the control message of the work request's buffer that
1703  * was populated after the work request finished.
1704  */
1705 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1706                                   RDMAControlHeader *head)
1707 {
1708     rdma->wr_data[idx].control_len = head->len;
1709     rdma->wr_data[idx].control_curr =
1710         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1711 }
1712 
1713 /*
1714  * This is an 'atomic' high-level operation to deliver a single, unified
1715  * control-channel message.
1716  *
1717  * Additionally, if the user is expecting some kind of reply to this message,
1718  * they can request a 'resp' response message be filled in by posting an
1719  * additional work request on behalf of the user and waiting for an additional
1720  * completion.
1721  *
1722  * The extra (optional) response is used during registration to us from having
1723  * to perform an *additional* exchange of message just to provide a response by
1724  * instead piggy-backing on the acknowledgement.
1725  */
1726 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1727                                    uint8_t *data, RDMAControlHeader *resp,
1728                                    int *resp_idx,
1729                                    int (*callback)(RDMAContext *rdma))
1730 {
1731     int ret = 0;
1732 
1733     /*
1734      * Wait until the dest is ready before attempting to deliver the message
1735      * by waiting for a READY message.
1736      */
1737     if (rdma->control_ready_expected) {
1738         RDMAControlHeader resp;
1739         ret = qemu_rdma_exchange_get_response(rdma,
1740                                     &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1741         if (ret < 0) {
1742             return ret;
1743         }
1744     }
1745 
1746     /*
1747      * If the user is expecting a response, post a WR in anticipation of it.
1748      */
1749     if (resp) {
1750         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1751         if (ret) {
1752             error_report("rdma migration: error posting"
1753                     " extra control recv for anticipated result!");
1754             return ret;
1755         }
1756     }
1757 
1758     /*
1759      * Post a WR to replace the one we just consumed for the READY message.
1760      */
1761     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1762     if (ret) {
1763         error_report("rdma migration: error posting first control recv!");
1764         return ret;
1765     }
1766 
1767     /*
1768      * Deliver the control message that was requested.
1769      */
1770     ret = qemu_rdma_post_send_control(rdma, data, head);
1771 
1772     if (ret < 0) {
1773         error_report("Failed to send control buffer!");
1774         return ret;
1775     }
1776 
1777     /*
1778      * If we're expecting a response, block and wait for it.
1779      */
1780     if (resp) {
1781         if (callback) {
1782             trace_qemu_rdma_exchange_send_issue_callback();
1783             ret = callback(rdma);
1784             if (ret < 0) {
1785                 return ret;
1786             }
1787         }
1788 
1789         trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
1790         ret = qemu_rdma_exchange_get_response(rdma, resp,
1791                                               resp->type, RDMA_WRID_DATA);
1792 
1793         if (ret < 0) {
1794             return ret;
1795         }
1796 
1797         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1798         if (resp_idx) {
1799             *resp_idx = RDMA_WRID_DATA;
1800         }
1801         trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
1802     }
1803 
1804     rdma->control_ready_expected = 1;
1805 
1806     return 0;
1807 }
1808 
1809 /*
1810  * This is an 'atomic' high-level operation to receive a single, unified
1811  * control-channel message.
1812  */
1813 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1814                                 int expecting)
1815 {
1816     RDMAControlHeader ready = {
1817                                 .len = 0,
1818                                 .type = RDMA_CONTROL_READY,
1819                                 .repeat = 1,
1820                               };
1821     int ret;
1822 
1823     /*
1824      * Inform the source that we're ready to receive a message.
1825      */
1826     ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1827 
1828     if (ret < 0) {
1829         error_report("Failed to send control buffer!");
1830         return ret;
1831     }
1832 
1833     /*
1834      * Block and wait for the message.
1835      */
1836     ret = qemu_rdma_exchange_get_response(rdma, head,
1837                                           expecting, RDMA_WRID_READY);
1838 
1839     if (ret < 0) {
1840         return ret;
1841     }
1842 
1843     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1844 
1845     /*
1846      * Post a new RECV work request to replace the one we just consumed.
1847      */
1848     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1849     if (ret) {
1850         error_report("rdma migration: error posting second control recv!");
1851         return ret;
1852     }
1853 
1854     return 0;
1855 }
1856 
1857 /*
1858  * Write an actual chunk of memory using RDMA.
1859  *
1860  * If we're using dynamic registration on the dest-side, we have to
1861  * send a registration command first.
1862  */
1863 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1864                                int current_index, uint64_t current_addr,
1865                                uint64_t length)
1866 {
1867     struct ibv_sge sge;
1868     struct ibv_send_wr send_wr = { 0 };
1869     struct ibv_send_wr *bad_wr;
1870     int reg_result_idx, ret, count = 0;
1871     uint64_t chunk, chunks;
1872     uint8_t *chunk_start, *chunk_end;
1873     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1874     RDMARegister reg;
1875     RDMARegisterResult *reg_result;
1876     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1877     RDMAControlHeader head = { .len = sizeof(RDMARegister),
1878                                .type = RDMA_CONTROL_REGISTER_REQUEST,
1879                                .repeat = 1,
1880                              };
1881 
1882 retry:
1883     sge.addr = (uintptr_t)(block->local_host_addr +
1884                             (current_addr - block->offset));
1885     sge.length = length;
1886 
1887     chunk = ram_chunk_index(block->local_host_addr,
1888                             (uint8_t *)(uintptr_t)sge.addr);
1889     chunk_start = ram_chunk_start(block, chunk);
1890 
1891     if (block->is_ram_block) {
1892         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1893 
1894         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1895             chunks--;
1896         }
1897     } else {
1898         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1899 
1900         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1901             chunks--;
1902         }
1903     }
1904 
1905     trace_qemu_rdma_write_one_top(chunks + 1,
1906                                   (chunks + 1) *
1907                                   (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
1908 
1909     chunk_end = ram_chunk_end(block, chunk + chunks);
1910 
1911     if (!rdma->pin_all) {
1912 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1913         qemu_rdma_unregister_waiting(rdma);
1914 #endif
1915     }
1916 
1917     while (test_bit(chunk, block->transit_bitmap)) {
1918         (void)count;
1919         trace_qemu_rdma_write_one_block(count++, current_index, chunk,
1920                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
1921 
1922         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1923 
1924         if (ret < 0) {
1925             error_report("Failed to Wait for previous write to complete "
1926                     "block %d chunk %" PRIu64
1927                     " current %" PRIu64 " len %" PRIu64 " %d",
1928                     current_index, chunk, sge.addr, length, rdma->nb_sent);
1929             return ret;
1930         }
1931     }
1932 
1933     if (!rdma->pin_all || !block->is_ram_block) {
1934         if (!block->remote_keys[chunk]) {
1935             /*
1936              * This chunk has not yet been registered, so first check to see
1937              * if the entire chunk is zero. If so, tell the other size to
1938              * memset() + madvise() the entire chunk without RDMA.
1939              */
1940 
1941             if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
1942                 RDMACompress comp = {
1943                                         .offset = current_addr,
1944                                         .value = 0,
1945                                         .block_idx = current_index,
1946                                         .length = length,
1947                                     };
1948 
1949                 head.len = sizeof(comp);
1950                 head.type = RDMA_CONTROL_COMPRESS;
1951 
1952                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
1953                                                current_index, current_addr);
1954 
1955                 compress_to_network(rdma, &comp);
1956                 ret = qemu_rdma_exchange_send(rdma, &head,
1957                                 (uint8_t *) &comp, NULL, NULL, NULL);
1958 
1959                 if (ret < 0) {
1960                     return -EIO;
1961                 }
1962 
1963                 acct_update_position(f, sge.length, true);
1964 
1965                 return 1;
1966             }
1967 
1968             /*
1969              * Otherwise, tell other side to register.
1970              */
1971             reg.current_index = current_index;
1972             if (block->is_ram_block) {
1973                 reg.key.current_addr = current_addr;
1974             } else {
1975                 reg.key.chunk = chunk;
1976             }
1977             reg.chunks = chunks;
1978 
1979             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
1980                                               current_addr);
1981 
1982             register_to_network(rdma, &reg);
1983             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1984                                     &resp, &reg_result_idx, NULL);
1985             if (ret < 0) {
1986                 return ret;
1987             }
1988 
1989             /* try to overlap this single registration with the one we sent. */
1990             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1991                                                 &sge.lkey, NULL, chunk,
1992                                                 chunk_start, chunk_end)) {
1993                 error_report("cannot get lkey");
1994                 return -EINVAL;
1995             }
1996 
1997             reg_result = (RDMARegisterResult *)
1998                     rdma->wr_data[reg_result_idx].control_curr;
1999 
2000             network_to_result(reg_result);
2001 
2002             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2003                                                  reg_result->rkey, chunk);
2004 
2005             block->remote_keys[chunk] = reg_result->rkey;
2006             block->remote_host_addr = reg_result->host_addr;
2007         } else {
2008             /* already registered before */
2009             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2010                                                 &sge.lkey, NULL, chunk,
2011                                                 chunk_start, chunk_end)) {
2012                 error_report("cannot get lkey!");
2013                 return -EINVAL;
2014             }
2015         }
2016 
2017         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2018     } else {
2019         send_wr.wr.rdma.rkey = block->remote_rkey;
2020 
2021         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2022                                                      &sge.lkey, NULL, chunk,
2023                                                      chunk_start, chunk_end)) {
2024             error_report("cannot get lkey!");
2025             return -EINVAL;
2026         }
2027     }
2028 
2029     /*
2030      * Encode the ram block index and chunk within this wrid.
2031      * We will use this information at the time of completion
2032      * to figure out which bitmap to check against and then which
2033      * chunk in the bitmap to look for.
2034      */
2035     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2036                                         current_index, chunk);
2037 
2038     send_wr.opcode = IBV_WR_RDMA_WRITE;
2039     send_wr.send_flags = IBV_SEND_SIGNALED;
2040     send_wr.sg_list = &sge;
2041     send_wr.num_sge = 1;
2042     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2043                                 (current_addr - block->offset);
2044 
2045     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2046                                    sge.length);
2047 
2048     /*
2049      * ibv_post_send() does not return negative error numbers,
2050      * per the specification they are positive - no idea why.
2051      */
2052     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2053 
2054     if (ret == ENOMEM) {
2055         trace_qemu_rdma_write_one_queue_full();
2056         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2057         if (ret < 0) {
2058             error_report("rdma migration: failed to make "
2059                          "room in full send queue! %d", ret);
2060             return ret;
2061         }
2062 
2063         goto retry;
2064 
2065     } else if (ret > 0) {
2066         perror("rdma migration: post rdma write failed");
2067         return -ret;
2068     }
2069 
2070     set_bit(chunk, block->transit_bitmap);
2071     acct_update_position(f, sge.length, false);
2072     rdma->total_writes++;
2073 
2074     return 0;
2075 }
2076 
2077 /*
2078  * Push out any unwritten RDMA operations.
2079  *
2080  * We support sending out multiple chunks at the same time.
2081  * Not all of them need to get signaled in the completion queue.
2082  */
2083 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2084 {
2085     int ret;
2086 
2087     if (!rdma->current_length) {
2088         return 0;
2089     }
2090 
2091     ret = qemu_rdma_write_one(f, rdma,
2092             rdma->current_index, rdma->current_addr, rdma->current_length);
2093 
2094     if (ret < 0) {
2095         return ret;
2096     }
2097 
2098     if (ret == 0) {
2099         rdma->nb_sent++;
2100         trace_qemu_rdma_write_flush(rdma->nb_sent);
2101     }
2102 
2103     rdma->current_length = 0;
2104     rdma->current_addr = 0;
2105 
2106     return 0;
2107 }
2108 
2109 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2110                     uint64_t offset, uint64_t len)
2111 {
2112     RDMALocalBlock *block;
2113     uint8_t *host_addr;
2114     uint8_t *chunk_end;
2115 
2116     if (rdma->current_index < 0) {
2117         return 0;
2118     }
2119 
2120     if (rdma->current_chunk < 0) {
2121         return 0;
2122     }
2123 
2124     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2125     host_addr = block->local_host_addr + (offset - block->offset);
2126     chunk_end = ram_chunk_end(block, rdma->current_chunk);
2127 
2128     if (rdma->current_length == 0) {
2129         return 0;
2130     }
2131 
2132     /*
2133      * Only merge into chunk sequentially.
2134      */
2135     if (offset != (rdma->current_addr + rdma->current_length)) {
2136         return 0;
2137     }
2138 
2139     if (offset < block->offset) {
2140         return 0;
2141     }
2142 
2143     if ((offset + len) > (block->offset + block->length)) {
2144         return 0;
2145     }
2146 
2147     if ((host_addr + len) > chunk_end) {
2148         return 0;
2149     }
2150 
2151     return 1;
2152 }
2153 
2154 /*
2155  * We're not actually writing here, but doing three things:
2156  *
2157  * 1. Identify the chunk the buffer belongs to.
2158  * 2. If the chunk is full or the buffer doesn't belong to the current
2159  *    chunk, then start a new chunk and flush() the old chunk.
2160  * 3. To keep the hardware busy, we also group chunks into batches
2161  *    and only require that a batch gets acknowledged in the completion
2162  *    qeueue instead of each individual chunk.
2163  */
2164 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2165                            uint64_t block_offset, uint64_t offset,
2166                            uint64_t len)
2167 {
2168     uint64_t current_addr = block_offset + offset;
2169     uint64_t index = rdma->current_index;
2170     uint64_t chunk = rdma->current_chunk;
2171     int ret;
2172 
2173     /* If we cannot merge it, we flush the current buffer first. */
2174     if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2175         ret = qemu_rdma_write_flush(f, rdma);
2176         if (ret) {
2177             return ret;
2178         }
2179         rdma->current_length = 0;
2180         rdma->current_addr = current_addr;
2181 
2182         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2183                                          offset, len, &index, &chunk);
2184         if (ret) {
2185             error_report("ram block search failed");
2186             return ret;
2187         }
2188         rdma->current_index = index;
2189         rdma->current_chunk = chunk;
2190     }
2191 
2192     /* merge it */
2193     rdma->current_length += len;
2194 
2195     /* flush it if buffer is too large */
2196     if (rdma->current_length >= RDMA_MERGE_MAX) {
2197         return qemu_rdma_write_flush(f, rdma);
2198     }
2199 
2200     return 0;
2201 }
2202 
2203 static void qemu_rdma_cleanup(RDMAContext *rdma)
2204 {
2205     struct rdma_cm_event *cm_event;
2206     int ret, idx;
2207 
2208     if (rdma->cm_id && rdma->connected) {
2209         if (rdma->error_state && !rdma->received_error) {
2210             RDMAControlHeader head = { .len = 0,
2211                                        .type = RDMA_CONTROL_ERROR,
2212                                        .repeat = 1,
2213                                      };
2214             error_report("Early error. Sending error.");
2215             qemu_rdma_post_send_control(rdma, NULL, &head);
2216         }
2217 
2218         ret = rdma_disconnect(rdma->cm_id);
2219         if (!ret) {
2220             trace_qemu_rdma_cleanup_waiting_for_disconnect();
2221             ret = rdma_get_cm_event(rdma->channel, &cm_event);
2222             if (!ret) {
2223                 rdma_ack_cm_event(cm_event);
2224             }
2225         }
2226         trace_qemu_rdma_cleanup_disconnect();
2227         rdma->connected = false;
2228     }
2229 
2230     g_free(rdma->dest_blocks);
2231     rdma->dest_blocks = NULL;
2232 
2233     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2234         if (rdma->wr_data[idx].control_mr) {
2235             rdma->total_registrations--;
2236             ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2237         }
2238         rdma->wr_data[idx].control_mr = NULL;
2239     }
2240 
2241     if (rdma->local_ram_blocks.block) {
2242         while (rdma->local_ram_blocks.nb_blocks) {
2243             rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2244         }
2245     }
2246 
2247     if (rdma->qp) {
2248         rdma_destroy_qp(rdma->cm_id);
2249         rdma->qp = NULL;
2250     }
2251     if (rdma->cq) {
2252         ibv_destroy_cq(rdma->cq);
2253         rdma->cq = NULL;
2254     }
2255     if (rdma->comp_channel) {
2256         ibv_destroy_comp_channel(rdma->comp_channel);
2257         rdma->comp_channel = NULL;
2258     }
2259     if (rdma->pd) {
2260         ibv_dealloc_pd(rdma->pd);
2261         rdma->pd = NULL;
2262     }
2263     if (rdma->cm_id) {
2264         rdma_destroy_id(rdma->cm_id);
2265         rdma->cm_id = NULL;
2266     }
2267     if (rdma->listen_id) {
2268         rdma_destroy_id(rdma->listen_id);
2269         rdma->listen_id = NULL;
2270     }
2271     if (rdma->channel) {
2272         rdma_destroy_event_channel(rdma->channel);
2273         rdma->channel = NULL;
2274     }
2275     g_free(rdma->host);
2276     rdma->host = NULL;
2277 }
2278 
2279 
2280 static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
2281 {
2282     int ret, idx;
2283     Error *local_err = NULL, **temp = &local_err;
2284 
2285     /*
2286      * Will be validated against destination's actual capabilities
2287      * after the connect() completes.
2288      */
2289     rdma->pin_all = pin_all;
2290 
2291     ret = qemu_rdma_resolve_host(rdma, temp);
2292     if (ret) {
2293         goto err_rdma_source_init;
2294     }
2295 
2296     ret = qemu_rdma_alloc_pd_cq(rdma);
2297     if (ret) {
2298         ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2299                     " limits may be too low. Please check $ ulimit -a # and "
2300                     "search for 'ulimit -l' in the output");
2301         goto err_rdma_source_init;
2302     }
2303 
2304     ret = qemu_rdma_alloc_qp(rdma);
2305     if (ret) {
2306         ERROR(temp, "rdma migration: error allocating qp!");
2307         goto err_rdma_source_init;
2308     }
2309 
2310     ret = qemu_rdma_init_ram_blocks(rdma);
2311     if (ret) {
2312         ERROR(temp, "rdma migration: error initializing ram blocks!");
2313         goto err_rdma_source_init;
2314     }
2315 
2316     /* Build the hash that maps from offset to RAMBlock */
2317     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2318     for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2319         g_hash_table_insert(rdma->blockmap,
2320                 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2321                 &rdma->local_ram_blocks.block[idx]);
2322     }
2323 
2324     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2325         ret = qemu_rdma_reg_control(rdma, idx);
2326         if (ret) {
2327             ERROR(temp, "rdma migration: error registering %d control!",
2328                                                             idx);
2329             goto err_rdma_source_init;
2330         }
2331     }
2332 
2333     return 0;
2334 
2335 err_rdma_source_init:
2336     error_propagate(errp, local_err);
2337     qemu_rdma_cleanup(rdma);
2338     return -1;
2339 }
2340 
2341 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2342 {
2343     RDMACapabilities cap = {
2344                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2345                                 .flags = 0,
2346                            };
2347     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2348                                           .retry_count = 5,
2349                                           .private_data = &cap,
2350                                           .private_data_len = sizeof(cap),
2351                                         };
2352     struct rdma_cm_event *cm_event;
2353     int ret;
2354 
2355     /*
2356      * Only negotiate the capability with destination if the user
2357      * on the source first requested the capability.
2358      */
2359     if (rdma->pin_all) {
2360         trace_qemu_rdma_connect_pin_all_requested();
2361         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2362     }
2363 
2364     caps_to_network(&cap);
2365 
2366     ret = rdma_connect(rdma->cm_id, &conn_param);
2367     if (ret) {
2368         perror("rdma_connect");
2369         ERROR(errp, "connecting to destination!");
2370         goto err_rdma_source_connect;
2371     }
2372 
2373     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2374     if (ret) {
2375         perror("rdma_get_cm_event after rdma_connect");
2376         ERROR(errp, "connecting to destination!");
2377         rdma_ack_cm_event(cm_event);
2378         goto err_rdma_source_connect;
2379     }
2380 
2381     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2382         perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2383         ERROR(errp, "connecting to destination!");
2384         rdma_ack_cm_event(cm_event);
2385         goto err_rdma_source_connect;
2386     }
2387     rdma->connected = true;
2388 
2389     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2390     network_to_caps(&cap);
2391 
2392     /*
2393      * Verify that the *requested* capabilities are supported by the destination
2394      * and disable them otherwise.
2395      */
2396     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2397         ERROR(errp, "Server cannot support pinning all memory. "
2398                         "Will register memory dynamically.");
2399         rdma->pin_all = false;
2400     }
2401 
2402     trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2403 
2404     rdma_ack_cm_event(cm_event);
2405 
2406     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2407     if (ret) {
2408         ERROR(errp, "posting second control recv!");
2409         goto err_rdma_source_connect;
2410     }
2411 
2412     rdma->control_ready_expected = 1;
2413     rdma->nb_sent = 0;
2414     return 0;
2415 
2416 err_rdma_source_connect:
2417     qemu_rdma_cleanup(rdma);
2418     return -1;
2419 }
2420 
2421 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2422 {
2423     int ret, idx;
2424     struct rdma_cm_id *listen_id;
2425     char ip[40] = "unknown";
2426     struct rdma_addrinfo *res, *e;
2427     char port_str[16];
2428 
2429     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2430         rdma->wr_data[idx].control_len = 0;
2431         rdma->wr_data[idx].control_curr = NULL;
2432     }
2433 
2434     if (!rdma->host || !rdma->host[0]) {
2435         ERROR(errp, "RDMA host is not set!");
2436         rdma->error_state = -EINVAL;
2437         return -1;
2438     }
2439     /* create CM channel */
2440     rdma->channel = rdma_create_event_channel();
2441     if (!rdma->channel) {
2442         ERROR(errp, "could not create rdma event channel");
2443         rdma->error_state = -EINVAL;
2444         return -1;
2445     }
2446 
2447     /* create CM id */
2448     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2449     if (ret) {
2450         ERROR(errp, "could not create cm_id!");
2451         goto err_dest_init_create_listen_id;
2452     }
2453 
2454     snprintf(port_str, 16, "%d", rdma->port);
2455     port_str[15] = '\0';
2456 
2457     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2458     if (ret < 0) {
2459         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2460         goto err_dest_init_bind_addr;
2461     }
2462 
2463     for (e = res; e != NULL; e = e->ai_next) {
2464         inet_ntop(e->ai_family,
2465             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2466         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2467         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2468         if (ret) {
2469             continue;
2470         }
2471         if (e->ai_family == AF_INET6) {
2472             ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs);
2473             if (ret) {
2474                 continue;
2475             }
2476         }
2477         break;
2478     }
2479 
2480     if (!e) {
2481         ERROR(errp, "Error: could not rdma_bind_addr!");
2482         goto err_dest_init_bind_addr;
2483     }
2484 
2485     rdma->listen_id = listen_id;
2486     qemu_rdma_dump_gid("dest_init", listen_id);
2487     return 0;
2488 
2489 err_dest_init_bind_addr:
2490     rdma_destroy_id(listen_id);
2491 err_dest_init_create_listen_id:
2492     rdma_destroy_event_channel(rdma->channel);
2493     rdma->channel = NULL;
2494     rdma->error_state = ret;
2495     return ret;
2496 
2497 }
2498 
2499 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2500 {
2501     RDMAContext *rdma = NULL;
2502     InetSocketAddress *addr;
2503 
2504     if (host_port) {
2505         rdma = g_new0(RDMAContext, 1);
2506         rdma->current_index = -1;
2507         rdma->current_chunk = -1;
2508 
2509         addr = inet_parse(host_port, NULL);
2510         if (addr != NULL) {
2511             rdma->port = atoi(addr->port);
2512             rdma->host = g_strdup(addr->host);
2513         } else {
2514             ERROR(errp, "bad RDMA migration address '%s'", host_port);
2515             g_free(rdma);
2516             rdma = NULL;
2517         }
2518 
2519         qapi_free_InetSocketAddress(addr);
2520     }
2521 
2522     return rdma;
2523 }
2524 
2525 /*
2526  * QEMUFile interface to the control channel.
2527  * SEND messages for control only.
2528  * VM's ram is handled with regular RDMA messages.
2529  */
2530 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2531                                        const struct iovec *iov,
2532                                        size_t niov,
2533                                        int *fds,
2534                                        size_t nfds,
2535                                        Error **errp)
2536 {
2537     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2538     QEMUFile *f = rioc->file;
2539     RDMAContext *rdma = rioc->rdma;
2540     int ret;
2541     ssize_t done = 0;
2542     size_t i;
2543 
2544     CHECK_ERROR_STATE();
2545 
2546     /*
2547      * Push out any writes that
2548      * we're queued up for VM's ram.
2549      */
2550     ret = qemu_rdma_write_flush(f, rdma);
2551     if (ret < 0) {
2552         rdma->error_state = ret;
2553         return ret;
2554     }
2555 
2556     for (i = 0; i < niov; i++) {
2557         size_t remaining = iov[i].iov_len;
2558         uint8_t * data = (void *)iov[i].iov_base;
2559         while (remaining) {
2560             RDMAControlHeader head;
2561 
2562             rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
2563             remaining -= rioc->len;
2564 
2565             head.len = rioc->len;
2566             head.type = RDMA_CONTROL_QEMU_FILE;
2567 
2568             ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2569 
2570             if (ret < 0) {
2571                 rdma->error_state = ret;
2572                 return ret;
2573             }
2574 
2575             data += rioc->len;
2576             done += rioc->len;
2577         }
2578     }
2579 
2580     return done;
2581 }
2582 
2583 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2584                              size_t size, int idx)
2585 {
2586     size_t len = 0;
2587 
2588     if (rdma->wr_data[idx].control_len) {
2589         trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2590 
2591         len = MIN(size, rdma->wr_data[idx].control_len);
2592         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2593         rdma->wr_data[idx].control_curr += len;
2594         rdma->wr_data[idx].control_len -= len;
2595     }
2596 
2597     return len;
2598 }
2599 
2600 /*
2601  * QEMUFile interface to the control channel.
2602  * RDMA links don't use bytestreams, so we have to
2603  * return bytes to QEMUFile opportunistically.
2604  */
2605 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2606                                       const struct iovec *iov,
2607                                       size_t niov,
2608                                       int **fds,
2609                                       size_t *nfds,
2610                                       Error **errp)
2611 {
2612     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2613     RDMAContext *rdma = rioc->rdma;
2614     RDMAControlHeader head;
2615     int ret = 0;
2616     ssize_t i;
2617     size_t done = 0;
2618 
2619     CHECK_ERROR_STATE();
2620 
2621     for (i = 0; i < niov; i++) {
2622         size_t want = iov[i].iov_len;
2623         uint8_t *data = (void *)iov[i].iov_base;
2624 
2625         /*
2626          * First, we hold on to the last SEND message we
2627          * were given and dish out the bytes until we run
2628          * out of bytes.
2629          */
2630         ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2631         done += ret;
2632         want -= ret;
2633         /* Got what we needed, so go to next iovec */
2634         if (want == 0) {
2635             continue;
2636         }
2637 
2638         /* If we got any data so far, then don't wait
2639          * for more, just return what we have */
2640         if (done > 0) {
2641             break;
2642         }
2643 
2644 
2645         /* We've got nothing at all, so lets wait for
2646          * more to arrive
2647          */
2648         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2649 
2650         if (ret < 0) {
2651             rdma->error_state = ret;
2652             return ret;
2653         }
2654 
2655         /*
2656          * SEND was received with new bytes, now try again.
2657          */
2658         ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2659         done += ret;
2660         want -= ret;
2661 
2662         /* Still didn't get enough, so lets just return */
2663         if (want) {
2664             if (done == 0) {
2665                 return QIO_CHANNEL_ERR_BLOCK;
2666             } else {
2667                 break;
2668             }
2669         }
2670     }
2671     rioc->len = done;
2672     return rioc->len;
2673 }
2674 
2675 /*
2676  * Block until all the outstanding chunks have been delivered by the hardware.
2677  */
2678 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2679 {
2680     int ret;
2681 
2682     if (qemu_rdma_write_flush(f, rdma) < 0) {
2683         return -EIO;
2684     }
2685 
2686     while (rdma->nb_sent) {
2687         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2688         if (ret < 0) {
2689             error_report("rdma migration: complete polling error!");
2690             return -EIO;
2691         }
2692     }
2693 
2694     qemu_rdma_unregister_waiting(rdma);
2695 
2696     return 0;
2697 }
2698 
2699 
2700 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2701                                          bool blocking,
2702                                          Error **errp)
2703 {
2704     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2705     /* XXX we should make readv/writev actually honour this :-) */
2706     rioc->blocking = blocking;
2707     return 0;
2708 }
2709 
2710 
2711 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2712 struct QIOChannelRDMASource {
2713     GSource parent;
2714     QIOChannelRDMA *rioc;
2715     GIOCondition condition;
2716 };
2717 
2718 static gboolean
2719 qio_channel_rdma_source_prepare(GSource *source,
2720                                 gint *timeout)
2721 {
2722     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2723     RDMAContext *rdma = rsource->rioc->rdma;
2724     GIOCondition cond = 0;
2725     *timeout = -1;
2726 
2727     if (rdma->wr_data[0].control_len) {
2728         cond |= G_IO_IN;
2729     }
2730     cond |= G_IO_OUT;
2731 
2732     return cond & rsource->condition;
2733 }
2734 
2735 static gboolean
2736 qio_channel_rdma_source_check(GSource *source)
2737 {
2738     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2739     RDMAContext *rdma = rsource->rioc->rdma;
2740     GIOCondition cond = 0;
2741 
2742     if (rdma->wr_data[0].control_len) {
2743         cond |= G_IO_IN;
2744     }
2745     cond |= G_IO_OUT;
2746 
2747     return cond & rsource->condition;
2748 }
2749 
2750 static gboolean
2751 qio_channel_rdma_source_dispatch(GSource *source,
2752                                  GSourceFunc callback,
2753                                  gpointer user_data)
2754 {
2755     QIOChannelFunc func = (QIOChannelFunc)callback;
2756     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2757     RDMAContext *rdma = rsource->rioc->rdma;
2758     GIOCondition cond = 0;
2759 
2760     if (rdma->wr_data[0].control_len) {
2761         cond |= G_IO_IN;
2762     }
2763     cond |= G_IO_OUT;
2764 
2765     return (*func)(QIO_CHANNEL(rsource->rioc),
2766                    (cond & rsource->condition),
2767                    user_data);
2768 }
2769 
2770 static void
2771 qio_channel_rdma_source_finalize(GSource *source)
2772 {
2773     QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2774 
2775     object_unref(OBJECT(ssource->rioc));
2776 }
2777 
2778 GSourceFuncs qio_channel_rdma_source_funcs = {
2779     qio_channel_rdma_source_prepare,
2780     qio_channel_rdma_source_check,
2781     qio_channel_rdma_source_dispatch,
2782     qio_channel_rdma_source_finalize
2783 };
2784 
2785 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2786                                               GIOCondition condition)
2787 {
2788     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2789     QIOChannelRDMASource *ssource;
2790     GSource *source;
2791 
2792     source = g_source_new(&qio_channel_rdma_source_funcs,
2793                           sizeof(QIOChannelRDMASource));
2794     ssource = (QIOChannelRDMASource *)source;
2795 
2796     ssource->rioc = rioc;
2797     object_ref(OBJECT(rioc));
2798 
2799     ssource->condition = condition;
2800 
2801     return source;
2802 }
2803 
2804 
2805 static int qio_channel_rdma_close(QIOChannel *ioc,
2806                                   Error **errp)
2807 {
2808     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2809     trace_qemu_rdma_close();
2810     if (rioc->rdma) {
2811         if (!rioc->rdma->error_state) {
2812             rioc->rdma->error_state = qemu_file_get_error(rioc->file);
2813         }
2814         qemu_rdma_cleanup(rioc->rdma);
2815         g_free(rioc->rdma);
2816         rioc->rdma = NULL;
2817     }
2818     return 0;
2819 }
2820 
2821 /*
2822  * Parameters:
2823  *    @offset == 0 :
2824  *        This means that 'block_offset' is a full virtual address that does not
2825  *        belong to a RAMBlock of the virtual machine and instead
2826  *        represents a private malloc'd memory area that the caller wishes to
2827  *        transfer.
2828  *
2829  *    @offset != 0 :
2830  *        Offset is an offset to be added to block_offset and used
2831  *        to also lookup the corresponding RAMBlock.
2832  *
2833  *    @size > 0 :
2834  *        Initiate an transfer this size.
2835  *
2836  *    @size == 0 :
2837  *        A 'hint' or 'advice' that means that we wish to speculatively
2838  *        and asynchronously unregister this memory. In this case, there is no
2839  *        guarantee that the unregister will actually happen, for example,
2840  *        if the memory is being actively transmitted. Additionally, the memory
2841  *        may be re-registered at any future time if a write within the same
2842  *        chunk was requested again, even if you attempted to unregister it
2843  *        here.
2844  *
2845  *    @size < 0 : TODO, not yet supported
2846  *        Unregister the memory NOW. This means that the caller does not
2847  *        expect there to be any future RDMA transfers and we just want to clean
2848  *        things up. This is used in case the upper layer owns the memory and
2849  *        cannot wait for qemu_fclose() to occur.
2850  *
2851  *    @bytes_sent : User-specificed pointer to indicate how many bytes were
2852  *                  sent. Usually, this will not be more than a few bytes of
2853  *                  the protocol because most transfers are sent asynchronously.
2854  */
2855 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2856                                   ram_addr_t block_offset, ram_addr_t offset,
2857                                   size_t size, uint64_t *bytes_sent)
2858 {
2859     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
2860     RDMAContext *rdma = rioc->rdma;
2861     int ret;
2862 
2863     CHECK_ERROR_STATE();
2864 
2865     qemu_fflush(f);
2866 
2867     if (size > 0) {
2868         /*
2869          * Add this page to the current 'chunk'. If the chunk
2870          * is full, or the page doen't belong to the current chunk,
2871          * an actual RDMA write will occur and a new chunk will be formed.
2872          */
2873         ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2874         if (ret < 0) {
2875             error_report("rdma migration: write error! %d", ret);
2876             goto err;
2877         }
2878 
2879         /*
2880          * We always return 1 bytes because the RDMA
2881          * protocol is completely asynchronous. We do not yet know
2882          * whether an  identified chunk is zero or not because we're
2883          * waiting for other pages to potentially be merged with
2884          * the current chunk. So, we have to call qemu_update_position()
2885          * later on when the actual write occurs.
2886          */
2887         if (bytes_sent) {
2888             *bytes_sent = 1;
2889         }
2890     } else {
2891         uint64_t index, chunk;
2892 
2893         /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
2894         if (size < 0) {
2895             ret = qemu_rdma_drain_cq(f, rdma);
2896             if (ret < 0) {
2897                 fprintf(stderr, "rdma: failed to synchronously drain"
2898                                 " completion queue before unregistration.\n");
2899                 goto err;
2900             }
2901         }
2902         */
2903 
2904         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2905                                          offset, size, &index, &chunk);
2906 
2907         if (ret) {
2908             error_report("ram block search failed");
2909             goto err;
2910         }
2911 
2912         qemu_rdma_signal_unregister(rdma, index, chunk, 0);
2913 
2914         /*
2915          * TODO: Synchronous, guaranteed unregistration (should not occur during
2916          * fast-path). Otherwise, unregisters will process on the next call to
2917          * qemu_rdma_drain_cq()
2918         if (size < 0) {
2919             qemu_rdma_unregister_waiting(rdma);
2920         }
2921         */
2922     }
2923 
2924     /*
2925      * Drain the Completion Queue if possible, but do not block,
2926      * just poll.
2927      *
2928      * If nothing to poll, the end of the iteration will do this
2929      * again to make sure we don't overflow the request queue.
2930      */
2931     while (1) {
2932         uint64_t wr_id, wr_id_in;
2933         int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
2934         if (ret < 0) {
2935             error_report("rdma migration: polling error! %d", ret);
2936             goto err;
2937         }
2938 
2939         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
2940 
2941         if (wr_id == RDMA_WRID_NONE) {
2942             break;
2943         }
2944     }
2945 
2946     return RAM_SAVE_CONTROL_DELAYED;
2947 err:
2948     rdma->error_state = ret;
2949     return ret;
2950 }
2951 
2952 static int qemu_rdma_accept(RDMAContext *rdma)
2953 {
2954     RDMACapabilities cap;
2955     struct rdma_conn_param conn_param = {
2956                                             .responder_resources = 2,
2957                                             .private_data = &cap,
2958                                             .private_data_len = sizeof(cap),
2959                                          };
2960     struct rdma_cm_event *cm_event;
2961     struct ibv_context *verbs;
2962     int ret = -EINVAL;
2963     int idx;
2964 
2965     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2966     if (ret) {
2967         goto err_rdma_dest_wait;
2968     }
2969 
2970     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
2971         rdma_ack_cm_event(cm_event);
2972         goto err_rdma_dest_wait;
2973     }
2974 
2975     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2976 
2977     network_to_caps(&cap);
2978 
2979     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
2980             error_report("Unknown source RDMA version: %d, bailing...",
2981                             cap.version);
2982             rdma_ack_cm_event(cm_event);
2983             goto err_rdma_dest_wait;
2984     }
2985 
2986     /*
2987      * Respond with only the capabilities this version of QEMU knows about.
2988      */
2989     cap.flags &= known_capabilities;
2990 
2991     /*
2992      * Enable the ones that we do know about.
2993      * Add other checks here as new ones are introduced.
2994      */
2995     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
2996         rdma->pin_all = true;
2997     }
2998 
2999     rdma->cm_id = cm_event->id;
3000     verbs = cm_event->id->verbs;
3001 
3002     rdma_ack_cm_event(cm_event);
3003 
3004     trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3005 
3006     caps_to_network(&cap);
3007 
3008     trace_qemu_rdma_accept_pin_verbsc(verbs);
3009 
3010     if (!rdma->verbs) {
3011         rdma->verbs = verbs;
3012     } else if (rdma->verbs != verbs) {
3013             error_report("ibv context not matching %p, %p!", rdma->verbs,
3014                          verbs);
3015             goto err_rdma_dest_wait;
3016     }
3017 
3018     qemu_rdma_dump_id("dest_init", verbs);
3019 
3020     ret = qemu_rdma_alloc_pd_cq(rdma);
3021     if (ret) {
3022         error_report("rdma migration: error allocating pd and cq!");
3023         goto err_rdma_dest_wait;
3024     }
3025 
3026     ret = qemu_rdma_alloc_qp(rdma);
3027     if (ret) {
3028         error_report("rdma migration: error allocating qp!");
3029         goto err_rdma_dest_wait;
3030     }
3031 
3032     ret = qemu_rdma_init_ram_blocks(rdma);
3033     if (ret) {
3034         error_report("rdma migration: error initializing ram blocks!");
3035         goto err_rdma_dest_wait;
3036     }
3037 
3038     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3039         ret = qemu_rdma_reg_control(rdma, idx);
3040         if (ret) {
3041             error_report("rdma: error registering %d control", idx);
3042             goto err_rdma_dest_wait;
3043         }
3044     }
3045 
3046     qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
3047 
3048     ret = rdma_accept(rdma->cm_id, &conn_param);
3049     if (ret) {
3050         error_report("rdma_accept returns %d", ret);
3051         goto err_rdma_dest_wait;
3052     }
3053 
3054     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3055     if (ret) {
3056         error_report("rdma_accept get_cm_event failed %d", ret);
3057         goto err_rdma_dest_wait;
3058     }
3059 
3060     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3061         error_report("rdma_accept not event established");
3062         rdma_ack_cm_event(cm_event);
3063         goto err_rdma_dest_wait;
3064     }
3065 
3066     rdma_ack_cm_event(cm_event);
3067     rdma->connected = true;
3068 
3069     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3070     if (ret) {
3071         error_report("rdma migration: error posting second control recv");
3072         goto err_rdma_dest_wait;
3073     }
3074 
3075     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3076 
3077     return 0;
3078 
3079 err_rdma_dest_wait:
3080     rdma->error_state = ret;
3081     qemu_rdma_cleanup(rdma);
3082     return ret;
3083 }
3084 
3085 static int dest_ram_sort_func(const void *a, const void *b)
3086 {
3087     unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3088     unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3089 
3090     return (a_index < b_index) ? -1 : (a_index != b_index);
3091 }
3092 
3093 /*
3094  * During each iteration of the migration, we listen for instructions
3095  * by the source VM to perform dynamic page registrations before they
3096  * can perform RDMA operations.
3097  *
3098  * We respond with the 'rkey'.
3099  *
3100  * Keep doing this until the source tells us to stop.
3101  */
3102 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3103 {
3104     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3105                                .type = RDMA_CONTROL_REGISTER_RESULT,
3106                                .repeat = 0,
3107                              };
3108     RDMAControlHeader unreg_resp = { .len = 0,
3109                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3110                                .repeat = 0,
3111                              };
3112     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3113                                  .repeat = 1 };
3114     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3115     RDMAContext *rdma = rioc->rdma;
3116     RDMALocalBlocks *local = &rdma->local_ram_blocks;
3117     RDMAControlHeader head;
3118     RDMARegister *reg, *registers;
3119     RDMACompress *comp;
3120     RDMARegisterResult *reg_result;
3121     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3122     RDMALocalBlock *block;
3123     void *host_addr;
3124     int ret = 0;
3125     int idx = 0;
3126     int count = 0;
3127     int i = 0;
3128 
3129     CHECK_ERROR_STATE();
3130 
3131     do {
3132         trace_qemu_rdma_registration_handle_wait();
3133 
3134         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3135 
3136         if (ret < 0) {
3137             break;
3138         }
3139 
3140         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3141             error_report("rdma: Too many requests in this message (%d)."
3142                             "Bailing.", head.repeat);
3143             ret = -EIO;
3144             break;
3145         }
3146 
3147         switch (head.type) {
3148         case RDMA_CONTROL_COMPRESS:
3149             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3150             network_to_compress(comp);
3151 
3152             trace_qemu_rdma_registration_handle_compress(comp->length,
3153                                                          comp->block_idx,
3154                                                          comp->offset);
3155             if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3156                 error_report("rdma: 'compress' bad block index %u (vs %d)",
3157                              (unsigned int)comp->block_idx,
3158                              rdma->local_ram_blocks.nb_blocks);
3159                 ret = -EIO;
3160                 goto out;
3161             }
3162             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3163 
3164             host_addr = block->local_host_addr +
3165                             (comp->offset - block->offset);
3166 
3167             ram_handle_compressed(host_addr, comp->value, comp->length);
3168             break;
3169 
3170         case RDMA_CONTROL_REGISTER_FINISHED:
3171             trace_qemu_rdma_registration_handle_finished();
3172             goto out;
3173 
3174         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3175             trace_qemu_rdma_registration_handle_ram_blocks();
3176 
3177             /* Sort our local RAM Block list so it's the same as the source,
3178              * we can do this since we've filled in a src_index in the list
3179              * as we received the RAMBlock list earlier.
3180              */
3181             qsort(rdma->local_ram_blocks.block,
3182                   rdma->local_ram_blocks.nb_blocks,
3183                   sizeof(RDMALocalBlock), dest_ram_sort_func);
3184             if (rdma->pin_all) {
3185                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3186                 if (ret) {
3187                     error_report("rdma migration: error dest "
3188                                     "registering ram blocks");
3189                     goto out;
3190                 }
3191             }
3192 
3193             /*
3194              * Dest uses this to prepare to transmit the RAMBlock descriptions
3195              * to the source VM after connection setup.
3196              * Both sides use the "remote" structure to communicate and update
3197              * their "local" descriptions with what was sent.
3198              */
3199             for (i = 0; i < local->nb_blocks; i++) {
3200                 rdma->dest_blocks[i].remote_host_addr =
3201                     (uintptr_t)(local->block[i].local_host_addr);
3202 
3203                 if (rdma->pin_all) {
3204                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3205                 }
3206 
3207                 rdma->dest_blocks[i].offset = local->block[i].offset;
3208                 rdma->dest_blocks[i].length = local->block[i].length;
3209 
3210                 dest_block_to_network(&rdma->dest_blocks[i]);
3211                 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3212                     local->block[i].block_name,
3213                     local->block[i].offset,
3214                     local->block[i].length,
3215                     local->block[i].local_host_addr,
3216                     local->block[i].src_index);
3217             }
3218 
3219             blocks.len = rdma->local_ram_blocks.nb_blocks
3220                                                 * sizeof(RDMADestBlock);
3221 
3222 
3223             ret = qemu_rdma_post_send_control(rdma,
3224                                         (uint8_t *) rdma->dest_blocks, &blocks);
3225 
3226             if (ret < 0) {
3227                 error_report("rdma migration: error sending remote info");
3228                 goto out;
3229             }
3230 
3231             break;
3232         case RDMA_CONTROL_REGISTER_REQUEST:
3233             trace_qemu_rdma_registration_handle_register(head.repeat);
3234 
3235             reg_resp.repeat = head.repeat;
3236             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3237 
3238             for (count = 0; count < head.repeat; count++) {
3239                 uint64_t chunk;
3240                 uint8_t *chunk_start, *chunk_end;
3241 
3242                 reg = &registers[count];
3243                 network_to_register(reg);
3244 
3245                 reg_result = &results[count];
3246 
3247                 trace_qemu_rdma_registration_handle_register_loop(count,
3248                          reg->current_index, reg->key.current_addr, reg->chunks);
3249 
3250                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3251                     error_report("rdma: 'register' bad block index %u (vs %d)",
3252                                  (unsigned int)reg->current_index,
3253                                  rdma->local_ram_blocks.nb_blocks);
3254                     ret = -ENOENT;
3255                     goto out;
3256                 }
3257                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3258                 if (block->is_ram_block) {
3259                     if (block->offset > reg->key.current_addr) {
3260                         error_report("rdma: bad register address for block %s"
3261                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3262                             block->block_name, block->offset,
3263                             reg->key.current_addr);
3264                         ret = -ERANGE;
3265                         goto out;
3266                     }
3267                     host_addr = (block->local_host_addr +
3268                                 (reg->key.current_addr - block->offset));
3269                     chunk = ram_chunk_index(block->local_host_addr,
3270                                             (uint8_t *) host_addr);
3271                 } else {
3272                     chunk = reg->key.chunk;
3273                     host_addr = block->local_host_addr +
3274                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3275                     /* Check for particularly bad chunk value */
3276                     if (host_addr < (void *)block->local_host_addr) {
3277                         error_report("rdma: bad chunk for block %s"
3278                             " chunk: %" PRIx64,
3279                             block->block_name, reg->key.chunk);
3280                         ret = -ERANGE;
3281                         goto out;
3282                     }
3283                 }
3284                 chunk_start = ram_chunk_start(block, chunk);
3285                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3286                 if (qemu_rdma_register_and_get_keys(rdma, block,
3287                             (uintptr_t)host_addr, NULL, &reg_result->rkey,
3288                             chunk, chunk_start, chunk_end)) {
3289                     error_report("cannot get rkey");
3290                     ret = -EINVAL;
3291                     goto out;
3292                 }
3293 
3294                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3295 
3296                 trace_qemu_rdma_registration_handle_register_rkey(
3297                                                            reg_result->rkey);
3298 
3299                 result_to_network(reg_result);
3300             }
3301 
3302             ret = qemu_rdma_post_send_control(rdma,
3303                             (uint8_t *) results, &reg_resp);
3304 
3305             if (ret < 0) {
3306                 error_report("Failed to send control buffer");
3307                 goto out;
3308             }
3309             break;
3310         case RDMA_CONTROL_UNREGISTER_REQUEST:
3311             trace_qemu_rdma_registration_handle_unregister(head.repeat);
3312             unreg_resp.repeat = head.repeat;
3313             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3314 
3315             for (count = 0; count < head.repeat; count++) {
3316                 reg = &registers[count];
3317                 network_to_register(reg);
3318 
3319                 trace_qemu_rdma_registration_handle_unregister_loop(count,
3320                            reg->current_index, reg->key.chunk);
3321 
3322                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3323 
3324                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3325                 block->pmr[reg->key.chunk] = NULL;
3326 
3327                 if (ret != 0) {
3328                     perror("rdma unregistration chunk failed");
3329                     ret = -ret;
3330                     goto out;
3331                 }
3332 
3333                 rdma->total_registrations--;
3334 
3335                 trace_qemu_rdma_registration_handle_unregister_success(
3336                                                        reg->key.chunk);
3337             }
3338 
3339             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3340 
3341             if (ret < 0) {
3342                 error_report("Failed to send control buffer");
3343                 goto out;
3344             }
3345             break;
3346         case RDMA_CONTROL_REGISTER_RESULT:
3347             error_report("Invalid RESULT message at dest.");
3348             ret = -EIO;
3349             goto out;
3350         default:
3351             error_report("Unknown control message %s", control_desc[head.type]);
3352             ret = -EIO;
3353             goto out;
3354         }
3355     } while (1);
3356 out:
3357     if (ret < 0) {
3358         rdma->error_state = ret;
3359     }
3360     return ret;
3361 }
3362 
3363 /* Destination:
3364  * Called via a ram_control_load_hook during the initial RAM load section which
3365  * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3366  * on the source.
3367  * We've already built our local RAMBlock list, but not yet sent the list to
3368  * the source.
3369  */
3370 static int
3371 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3372 {
3373     RDMAContext *rdma = rioc->rdma;
3374     int curr;
3375     int found = -1;
3376 
3377     /* Find the matching RAMBlock in our local list */
3378     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3379         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3380             found = curr;
3381             break;
3382         }
3383     }
3384 
3385     if (found == -1) {
3386         error_report("RAMBlock '%s' not found on destination", name);
3387         return -ENOENT;
3388     }
3389 
3390     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3391     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3392     rdma->next_src_index++;
3393 
3394     return 0;
3395 }
3396 
3397 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3398 {
3399     switch (flags) {
3400     case RAM_CONTROL_BLOCK_REG:
3401         return rdma_block_notification_handle(opaque, data);
3402 
3403     case RAM_CONTROL_HOOK:
3404         return qemu_rdma_registration_handle(f, opaque);
3405 
3406     default:
3407         /* Shouldn't be called with any other values */
3408         abort();
3409     }
3410 }
3411 
3412 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3413                                         uint64_t flags, void *data)
3414 {
3415     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3416     RDMAContext *rdma = rioc->rdma;
3417 
3418     CHECK_ERROR_STATE();
3419 
3420     trace_qemu_rdma_registration_start(flags);
3421     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3422     qemu_fflush(f);
3423 
3424     return 0;
3425 }
3426 
3427 /*
3428  * Inform dest that dynamic registrations are done for now.
3429  * First, flush writes, if any.
3430  */
3431 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3432                                        uint64_t flags, void *data)
3433 {
3434     Error *local_err = NULL, **errp = &local_err;
3435     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3436     RDMAContext *rdma = rioc->rdma;
3437     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3438     int ret = 0;
3439 
3440     CHECK_ERROR_STATE();
3441 
3442     qemu_fflush(f);
3443     ret = qemu_rdma_drain_cq(f, rdma);
3444 
3445     if (ret < 0) {
3446         goto err;
3447     }
3448 
3449     if (flags == RAM_CONTROL_SETUP) {
3450         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3451         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3452         int reg_result_idx, i, nb_dest_blocks;
3453 
3454         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3455         trace_qemu_rdma_registration_stop_ram();
3456 
3457         /*
3458          * Make sure that we parallelize the pinning on both sides.
3459          * For very large guests, doing this serially takes a really
3460          * long time, so we have to 'interleave' the pinning locally
3461          * with the control messages by performing the pinning on this
3462          * side before we receive the control response from the other
3463          * side that the pinning has completed.
3464          */
3465         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3466                     &reg_result_idx, rdma->pin_all ?
3467                     qemu_rdma_reg_whole_ram_blocks : NULL);
3468         if (ret < 0) {
3469             ERROR(errp, "receiving remote info!");
3470             return ret;
3471         }
3472 
3473         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3474 
3475         /*
3476          * The protocol uses two different sets of rkeys (mutually exclusive):
3477          * 1. One key to represent the virtual address of the entire ram block.
3478          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3479          * 2. One to represent individual chunks within a ram block.
3480          *    (dynamic chunk registration enabled - pin individual chunks.)
3481          *
3482          * Once the capability is successfully negotiated, the destination transmits
3483          * the keys to use (or sends them later) including the virtual addresses
3484          * and then propagates the remote ram block descriptions to his local copy.
3485          */
3486 
3487         if (local->nb_blocks != nb_dest_blocks) {
3488             ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) "
3489                         "Your QEMU command line parameters are probably "
3490                         "not identical on both the source and destination.",
3491                         local->nb_blocks, nb_dest_blocks);
3492             rdma->error_state = -EINVAL;
3493             return -EINVAL;
3494         }
3495 
3496         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3497         memcpy(rdma->dest_blocks,
3498             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3499         for (i = 0; i < nb_dest_blocks; i++) {
3500             network_to_dest_block(&rdma->dest_blocks[i]);
3501 
3502             /* We require that the blocks are in the same order */
3503             if (rdma->dest_blocks[i].length != local->block[i].length) {
3504                 ERROR(errp, "Block %s/%d has a different length %" PRIu64
3505                             "vs %" PRIu64, local->block[i].block_name, i,
3506                             local->block[i].length,
3507                             rdma->dest_blocks[i].length);
3508                 rdma->error_state = -EINVAL;
3509                 return -EINVAL;
3510             }
3511             local->block[i].remote_host_addr =
3512                     rdma->dest_blocks[i].remote_host_addr;
3513             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3514         }
3515     }
3516 
3517     trace_qemu_rdma_registration_stop(flags);
3518 
3519     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3520     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3521 
3522     if (ret < 0) {
3523         goto err;
3524     }
3525 
3526     return 0;
3527 err:
3528     rdma->error_state = ret;
3529     return ret;
3530 }
3531 
3532 static const QEMUFileHooks rdma_read_hooks = {
3533     .hook_ram_load = rdma_load_hook,
3534 };
3535 
3536 static const QEMUFileHooks rdma_write_hooks = {
3537     .before_ram_iterate = qemu_rdma_registration_start,
3538     .after_ram_iterate  = qemu_rdma_registration_stop,
3539     .save_page          = qemu_rdma_save_page,
3540 };
3541 
3542 
3543 static void qio_channel_rdma_finalize(Object *obj)
3544 {
3545     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3546     if (rioc->rdma) {
3547         qemu_rdma_cleanup(rioc->rdma);
3548         g_free(rioc->rdma);
3549         rioc->rdma = NULL;
3550     }
3551 }
3552 
3553 static void qio_channel_rdma_class_init(ObjectClass *klass,
3554                                         void *class_data G_GNUC_UNUSED)
3555 {
3556     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3557 
3558     ioc_klass->io_writev = qio_channel_rdma_writev;
3559     ioc_klass->io_readv = qio_channel_rdma_readv;
3560     ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3561     ioc_klass->io_close = qio_channel_rdma_close;
3562     ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3563 }
3564 
3565 static const TypeInfo qio_channel_rdma_info = {
3566     .parent = TYPE_QIO_CHANNEL,
3567     .name = TYPE_QIO_CHANNEL_RDMA,
3568     .instance_size = sizeof(QIOChannelRDMA),
3569     .instance_finalize = qio_channel_rdma_finalize,
3570     .class_init = qio_channel_rdma_class_init,
3571 };
3572 
3573 static void qio_channel_rdma_register_types(void)
3574 {
3575     type_register_static(&qio_channel_rdma_info);
3576 }
3577 
3578 type_init(qio_channel_rdma_register_types);
3579 
3580 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3581 {
3582     QIOChannelRDMA *rioc;
3583 
3584     if (qemu_file_mode_is_not_valid(mode)) {
3585         return NULL;
3586     }
3587 
3588     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3589     rioc->rdma = rdma;
3590 
3591     if (mode[0] == 'w') {
3592         rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
3593         qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
3594     } else {
3595         rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
3596         qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
3597     }
3598 
3599     return rioc->file;
3600 }
3601 
3602 static void rdma_accept_incoming_migration(void *opaque)
3603 {
3604     RDMAContext *rdma = opaque;
3605     int ret;
3606     QEMUFile *f;
3607     Error *local_err = NULL, **errp = &local_err;
3608 
3609     trace_qemu_rdma_accept_incoming_migration();
3610     ret = qemu_rdma_accept(rdma);
3611 
3612     if (ret) {
3613         ERROR(errp, "RDMA Migration initialization failed!");
3614         return;
3615     }
3616 
3617     trace_qemu_rdma_accept_incoming_migration_accepted();
3618 
3619     f = qemu_fopen_rdma(rdma, "rb");
3620     if (f == NULL) {
3621         ERROR(errp, "could not qemu_fopen_rdma!");
3622         qemu_rdma_cleanup(rdma);
3623         return;
3624     }
3625 
3626     rdma->migration_started_on_destination = 1;
3627     migration_fd_process_incoming(f);
3628 }
3629 
3630 void rdma_start_incoming_migration(const char *host_port, Error **errp)
3631 {
3632     int ret;
3633     RDMAContext *rdma;
3634     Error *local_err = NULL;
3635 
3636     trace_rdma_start_incoming_migration();
3637     rdma = qemu_rdma_data_init(host_port, &local_err);
3638 
3639     if (rdma == NULL) {
3640         goto err;
3641     }
3642 
3643     ret = qemu_rdma_dest_init(rdma, &local_err);
3644 
3645     if (ret) {
3646         goto err;
3647     }
3648 
3649     trace_rdma_start_incoming_migration_after_dest_init();
3650 
3651     ret = rdma_listen(rdma->listen_id, 5);
3652 
3653     if (ret) {
3654         ERROR(errp, "listening on socket!");
3655         goto err;
3656     }
3657 
3658     trace_rdma_start_incoming_migration_after_rdma_listen();
3659 
3660     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3661                         NULL, (void *)(intptr_t)rdma);
3662     return;
3663 err:
3664     error_propagate(errp, local_err);
3665     g_free(rdma);
3666 }
3667 
3668 void rdma_start_outgoing_migration(void *opaque,
3669                             const char *host_port, Error **errp)
3670 {
3671     MigrationState *s = opaque;
3672     RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
3673     int ret = 0;
3674 
3675     if (rdma == NULL) {
3676         goto err;
3677     }
3678 
3679     ret = qemu_rdma_source_init(rdma, errp,
3680         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]);
3681 
3682     if (ret) {
3683         goto err;
3684     }
3685 
3686     trace_rdma_start_outgoing_migration_after_rdma_source_init();
3687     ret = qemu_rdma_connect(rdma, errp);
3688 
3689     if (ret) {
3690         goto err;
3691     }
3692 
3693     trace_rdma_start_outgoing_migration_after_rdma_connect();
3694 
3695     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
3696     migrate_fd_connect(s);
3697     return;
3698 err:
3699     g_free(rdma);
3700 }
3701