xref: /qemu/migration/rdma.c (revision ba4912cb)
1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  * Copyright Red Hat, Inc. 2015-2016
6  *
7  * Authors:
8  *  Michael R. Hines <mrhines@us.ibm.com>
9  *  Jiuxing Liu <jl@us.ibm.com>
10  *  Daniel P. Berrange <berrange@redhat.com>
11  *
12  * This work is licensed under the terms of the GNU GPL, version 2 or
13  * later.  See the COPYING file in the top-level directory.
14  *
15  */
16 #include "qemu/osdep.h"
17 #include "qapi/error.h"
18 #include "qemu-common.h"
19 #include "qemu/cutils.h"
20 #include "rdma.h"
21 #include "migration.h"
22 #include "qemu-file.h"
23 #include "ram.h"
24 #include "qemu-file-channel.h"
25 #include "qemu/error-report.h"
26 #include "qemu/main-loop.h"
27 #include "qemu/sockets.h"
28 #include "qemu/bitmap.h"
29 #include "qemu/coroutine.h"
30 #include <sys/socket.h>
31 #include <netdb.h>
32 #include <arpa/inet.h>
33 #include <rdma/rdma_cma.h>
34 #include "trace.h"
35 
36 /*
37  * Print and error on both the Monitor and the Log file.
38  */
39 #define ERROR(errp, fmt, ...) \
40     do { \
41         fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
42         if (errp && (*(errp) == NULL)) { \
43             error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
44         } \
45     } while (0)
46 
47 #define RDMA_RESOLVE_TIMEOUT_MS 10000
48 
49 /* Do not merge data if larger than this. */
50 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
51 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
52 
53 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
54 
55 /*
56  * This is only for non-live state being migrated.
57  * Instead of RDMA_WRITE messages, we use RDMA_SEND
58  * messages for that state, which requires a different
59  * delivery design than main memory.
60  */
61 #define RDMA_SEND_INCREMENT 32768
62 
63 /*
64  * Maximum size infiniband SEND message
65  */
66 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
67 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
68 
69 #define RDMA_CONTROL_VERSION_CURRENT 1
70 /*
71  * Capabilities for negotiation.
72  */
73 #define RDMA_CAPABILITY_PIN_ALL 0x01
74 
75 /*
76  * Add the other flags above to this list of known capabilities
77  * as they are introduced.
78  */
79 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
80 
81 #define CHECK_ERROR_STATE() \
82     do { \
83         if (rdma->error_state) { \
84             if (!rdma->error_reported) { \
85                 error_report("RDMA is in an error state waiting migration" \
86                                 " to abort!"); \
87                 rdma->error_reported = 1; \
88             } \
89             rcu_read_unlock(); \
90             return rdma->error_state; \
91         } \
92     } while (0)
93 
94 /*
95  * A work request ID is 64-bits and we split up these bits
96  * into 3 parts:
97  *
98  * bits 0-15 : type of control message, 2^16
99  * bits 16-29: ram block index, 2^14
100  * bits 30-63: ram block chunk number, 2^34
101  *
102  * The last two bit ranges are only used for RDMA writes,
103  * in order to track their completion and potentially
104  * also track unregistration status of the message.
105  */
106 #define RDMA_WRID_TYPE_SHIFT  0UL
107 #define RDMA_WRID_BLOCK_SHIFT 16UL
108 #define RDMA_WRID_CHUNK_SHIFT 30UL
109 
110 #define RDMA_WRID_TYPE_MASK \
111     ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
112 
113 #define RDMA_WRID_BLOCK_MASK \
114     (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
115 
116 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
117 
118 /*
119  * RDMA migration protocol:
120  * 1. RDMA Writes (data messages, i.e. RAM)
121  * 2. IB Send/Recv (control channel messages)
122  */
123 enum {
124     RDMA_WRID_NONE = 0,
125     RDMA_WRID_RDMA_WRITE = 1,
126     RDMA_WRID_SEND_CONTROL = 2000,
127     RDMA_WRID_RECV_CONTROL = 4000,
128 };
129 
130 static const char *wrid_desc[] = {
131     [RDMA_WRID_NONE] = "NONE",
132     [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
133     [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
134     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
135 };
136 
137 /*
138  * Work request IDs for IB SEND messages only (not RDMA writes).
139  * This is used by the migration protocol to transmit
140  * control messages (such as device state and registration commands)
141  *
142  * We could use more WRs, but we have enough for now.
143  */
144 enum {
145     RDMA_WRID_READY = 0,
146     RDMA_WRID_DATA,
147     RDMA_WRID_CONTROL,
148     RDMA_WRID_MAX,
149 };
150 
151 /*
152  * SEND/RECV IB Control Messages.
153  */
154 enum {
155     RDMA_CONTROL_NONE = 0,
156     RDMA_CONTROL_ERROR,
157     RDMA_CONTROL_READY,               /* ready to receive */
158     RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
159     RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
160     RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
161     RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
162     RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
163     RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
164     RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
165     RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
166     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
167 };
168 
169 
170 /*
171  * Memory and MR structures used to represent an IB Send/Recv work request.
172  * This is *not* used for RDMA writes, only IB Send/Recv.
173  */
174 typedef struct {
175     uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
176     struct   ibv_mr *control_mr;               /* registration metadata */
177     size_t   control_len;                      /* length of the message */
178     uint8_t *control_curr;                     /* start of unconsumed bytes */
179 } RDMAWorkRequestData;
180 
181 /*
182  * Negotiate RDMA capabilities during connection-setup time.
183  */
184 typedef struct {
185     uint32_t version;
186     uint32_t flags;
187 } RDMACapabilities;
188 
189 static void caps_to_network(RDMACapabilities *cap)
190 {
191     cap->version = htonl(cap->version);
192     cap->flags = htonl(cap->flags);
193 }
194 
195 static void network_to_caps(RDMACapabilities *cap)
196 {
197     cap->version = ntohl(cap->version);
198     cap->flags = ntohl(cap->flags);
199 }
200 
201 /*
202  * Representation of a RAMBlock from an RDMA perspective.
203  * This is not transmitted, only local.
204  * This and subsequent structures cannot be linked lists
205  * because we're using a single IB message to transmit
206  * the information. It's small anyway, so a list is overkill.
207  */
208 typedef struct RDMALocalBlock {
209     char          *block_name;
210     uint8_t       *local_host_addr; /* local virtual address */
211     uint64_t       remote_host_addr; /* remote virtual address */
212     uint64_t       offset;
213     uint64_t       length;
214     struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
215     struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
216     uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
217     uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
218     int            index;           /* which block are we */
219     unsigned int   src_index;       /* (Only used on dest) */
220     bool           is_ram_block;
221     int            nb_chunks;
222     unsigned long *transit_bitmap;
223     unsigned long *unregister_bitmap;
224 } RDMALocalBlock;
225 
226 /*
227  * Also represents a RAMblock, but only on the dest.
228  * This gets transmitted by the dest during connection-time
229  * to the source VM and then is used to populate the
230  * corresponding RDMALocalBlock with
231  * the information needed to perform the actual RDMA.
232  */
233 typedef struct QEMU_PACKED RDMADestBlock {
234     uint64_t remote_host_addr;
235     uint64_t offset;
236     uint64_t length;
237     uint32_t remote_rkey;
238     uint32_t padding;
239 } RDMADestBlock;
240 
241 static const char *control_desc(unsigned int rdma_control)
242 {
243     static const char *strs[] = {
244         [RDMA_CONTROL_NONE] = "NONE",
245         [RDMA_CONTROL_ERROR] = "ERROR",
246         [RDMA_CONTROL_READY] = "READY",
247         [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
248         [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
249         [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
250         [RDMA_CONTROL_COMPRESS] = "COMPRESS",
251         [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
252         [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
253         [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
254         [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
255         [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
256     };
257 
258     if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
259         return "??BAD CONTROL VALUE??";
260     }
261 
262     return strs[rdma_control];
263 }
264 
265 static uint64_t htonll(uint64_t v)
266 {
267     union { uint32_t lv[2]; uint64_t llv; } u;
268     u.lv[0] = htonl(v >> 32);
269     u.lv[1] = htonl(v & 0xFFFFFFFFULL);
270     return u.llv;
271 }
272 
273 static uint64_t ntohll(uint64_t v) {
274     union { uint32_t lv[2]; uint64_t llv; } u;
275     u.llv = v;
276     return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
277 }
278 
279 static void dest_block_to_network(RDMADestBlock *db)
280 {
281     db->remote_host_addr = htonll(db->remote_host_addr);
282     db->offset = htonll(db->offset);
283     db->length = htonll(db->length);
284     db->remote_rkey = htonl(db->remote_rkey);
285 }
286 
287 static void network_to_dest_block(RDMADestBlock *db)
288 {
289     db->remote_host_addr = ntohll(db->remote_host_addr);
290     db->offset = ntohll(db->offset);
291     db->length = ntohll(db->length);
292     db->remote_rkey = ntohl(db->remote_rkey);
293 }
294 
295 /*
296  * Virtual address of the above structures used for transmitting
297  * the RAMBlock descriptions at connection-time.
298  * This structure is *not* transmitted.
299  */
300 typedef struct RDMALocalBlocks {
301     int nb_blocks;
302     bool     init;             /* main memory init complete */
303     RDMALocalBlock *block;
304 } RDMALocalBlocks;
305 
306 /*
307  * Main data structure for RDMA state.
308  * While there is only one copy of this structure being allocated right now,
309  * this is the place where one would start if you wanted to consider
310  * having more than one RDMA connection open at the same time.
311  */
312 typedef struct RDMAContext {
313     char *host;
314     int port;
315 
316     RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
317 
318     /*
319      * This is used by *_exchange_send() to figure out whether or not
320      * the initial "READY" message has already been received or not.
321      * This is because other functions may potentially poll() and detect
322      * the READY message before send() does, in which case we need to
323      * know if it completed.
324      */
325     int control_ready_expected;
326 
327     /* number of outstanding writes */
328     int nb_sent;
329 
330     /* store info about current buffer so that we can
331        merge it with future sends */
332     uint64_t current_addr;
333     uint64_t current_length;
334     /* index of ram block the current buffer belongs to */
335     int current_index;
336     /* index of the chunk in the current ram block */
337     int current_chunk;
338 
339     bool pin_all;
340 
341     /*
342      * infiniband-specific variables for opening the device
343      * and maintaining connection state and so forth.
344      *
345      * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
346      * cm_id->verbs, cm_id->channel, and cm_id->qp.
347      */
348     struct rdma_cm_id *cm_id;               /* connection manager ID */
349     struct rdma_cm_id *listen_id;
350     bool connected;
351 
352     struct ibv_context          *verbs;
353     struct rdma_event_channel   *channel;
354     struct ibv_qp *qp;                      /* queue pair */
355     struct ibv_comp_channel *comp_channel;  /* completion channel */
356     struct ibv_pd *pd;                      /* protection domain */
357     struct ibv_cq *cq;                      /* completion queue */
358 
359     /*
360      * If a previous write failed (perhaps because of a failed
361      * memory registration, then do not attempt any future work
362      * and remember the error state.
363      */
364     int error_state;
365     int error_reported;
366     int received_error;
367 
368     /*
369      * Description of ram blocks used throughout the code.
370      */
371     RDMALocalBlocks local_ram_blocks;
372     RDMADestBlock  *dest_blocks;
373 
374     /* Index of the next RAMBlock received during block registration */
375     unsigned int    next_src_index;
376 
377     /*
378      * Migration on *destination* started.
379      * Then use coroutine yield function.
380      * Source runs in a thread, so we don't care.
381      */
382     int migration_started_on_destination;
383 
384     int total_registrations;
385     int total_writes;
386 
387     int unregister_current, unregister_next;
388     uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
389 
390     GHashTable *blockmap;
391 
392     /* the RDMAContext for return path */
393     struct RDMAContext *return_path;
394     bool is_return_path;
395 } RDMAContext;
396 
397 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
398 #define QIO_CHANNEL_RDMA(obj)                                     \
399     OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
400 
401 typedef struct QIOChannelRDMA QIOChannelRDMA;
402 
403 
404 struct QIOChannelRDMA {
405     QIOChannel parent;
406     RDMAContext *rdmain;
407     RDMAContext *rdmaout;
408     QEMUFile *file;
409     bool blocking; /* XXX we don't actually honour this yet */
410 };
411 
412 /*
413  * Main structure for IB Send/Recv control messages.
414  * This gets prepended at the beginning of every Send/Recv.
415  */
416 typedef struct QEMU_PACKED {
417     uint32_t len;     /* Total length of data portion */
418     uint32_t type;    /* which control command to perform */
419     uint32_t repeat;  /* number of commands in data portion of same type */
420     uint32_t padding;
421 } RDMAControlHeader;
422 
423 static void control_to_network(RDMAControlHeader *control)
424 {
425     control->type = htonl(control->type);
426     control->len = htonl(control->len);
427     control->repeat = htonl(control->repeat);
428 }
429 
430 static void network_to_control(RDMAControlHeader *control)
431 {
432     control->type = ntohl(control->type);
433     control->len = ntohl(control->len);
434     control->repeat = ntohl(control->repeat);
435 }
436 
437 /*
438  * Register a single Chunk.
439  * Information sent by the source VM to inform the dest
440  * to register an single chunk of memory before we can perform
441  * the actual RDMA operation.
442  */
443 typedef struct QEMU_PACKED {
444     union QEMU_PACKED {
445         uint64_t current_addr;  /* offset into the ram_addr_t space */
446         uint64_t chunk;         /* chunk to lookup if unregistering */
447     } key;
448     uint32_t current_index; /* which ramblock the chunk belongs to */
449     uint32_t padding;
450     uint64_t chunks;            /* how many sequential chunks to register */
451 } RDMARegister;
452 
453 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
454 {
455     RDMALocalBlock *local_block;
456     local_block  = &rdma->local_ram_blocks.block[reg->current_index];
457 
458     if (local_block->is_ram_block) {
459         /*
460          * current_addr as passed in is an address in the local ram_addr_t
461          * space, we need to translate this for the destination
462          */
463         reg->key.current_addr -= local_block->offset;
464         reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
465     }
466     reg->key.current_addr = htonll(reg->key.current_addr);
467     reg->current_index = htonl(reg->current_index);
468     reg->chunks = htonll(reg->chunks);
469 }
470 
471 static void network_to_register(RDMARegister *reg)
472 {
473     reg->key.current_addr = ntohll(reg->key.current_addr);
474     reg->current_index = ntohl(reg->current_index);
475     reg->chunks = ntohll(reg->chunks);
476 }
477 
478 typedef struct QEMU_PACKED {
479     uint32_t value;     /* if zero, we will madvise() */
480     uint32_t block_idx; /* which ram block index */
481     uint64_t offset;    /* Address in remote ram_addr_t space */
482     uint64_t length;    /* length of the chunk */
483 } RDMACompress;
484 
485 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
486 {
487     comp->value = htonl(comp->value);
488     /*
489      * comp->offset as passed in is an address in the local ram_addr_t
490      * space, we need to translate this for the destination
491      */
492     comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
493     comp->offset += rdma->dest_blocks[comp->block_idx].offset;
494     comp->block_idx = htonl(comp->block_idx);
495     comp->offset = htonll(comp->offset);
496     comp->length = htonll(comp->length);
497 }
498 
499 static void network_to_compress(RDMACompress *comp)
500 {
501     comp->value = ntohl(comp->value);
502     comp->block_idx = ntohl(comp->block_idx);
503     comp->offset = ntohll(comp->offset);
504     comp->length = ntohll(comp->length);
505 }
506 
507 /*
508  * The result of the dest's memory registration produces an "rkey"
509  * which the source VM must reference in order to perform
510  * the RDMA operation.
511  */
512 typedef struct QEMU_PACKED {
513     uint32_t rkey;
514     uint32_t padding;
515     uint64_t host_addr;
516 } RDMARegisterResult;
517 
518 static void result_to_network(RDMARegisterResult *result)
519 {
520     result->rkey = htonl(result->rkey);
521     result->host_addr = htonll(result->host_addr);
522 };
523 
524 static void network_to_result(RDMARegisterResult *result)
525 {
526     result->rkey = ntohl(result->rkey);
527     result->host_addr = ntohll(result->host_addr);
528 };
529 
530 const char *print_wrid(int wrid);
531 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
532                                    uint8_t *data, RDMAControlHeader *resp,
533                                    int *resp_idx,
534                                    int (*callback)(RDMAContext *rdma));
535 
536 static inline uint64_t ram_chunk_index(const uint8_t *start,
537                                        const uint8_t *host)
538 {
539     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
540 }
541 
542 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
543                                        uint64_t i)
544 {
545     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
546                                   (i << RDMA_REG_CHUNK_SHIFT));
547 }
548 
549 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
550                                      uint64_t i)
551 {
552     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
553                                          (1UL << RDMA_REG_CHUNK_SHIFT);
554 
555     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
556         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
557     }
558 
559     return result;
560 }
561 
562 static int rdma_add_block(RDMAContext *rdma, const char *block_name,
563                          void *host_addr,
564                          ram_addr_t block_offset, uint64_t length)
565 {
566     RDMALocalBlocks *local = &rdma->local_ram_blocks;
567     RDMALocalBlock *block;
568     RDMALocalBlock *old = local->block;
569 
570     local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
571 
572     if (local->nb_blocks) {
573         int x;
574 
575         if (rdma->blockmap) {
576             for (x = 0; x < local->nb_blocks; x++) {
577                 g_hash_table_remove(rdma->blockmap,
578                                     (void *)(uintptr_t)old[x].offset);
579                 g_hash_table_insert(rdma->blockmap,
580                                     (void *)(uintptr_t)old[x].offset,
581                                     &local->block[x]);
582             }
583         }
584         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
585         g_free(old);
586     }
587 
588     block = &local->block[local->nb_blocks];
589 
590     block->block_name = g_strdup(block_name);
591     block->local_host_addr = host_addr;
592     block->offset = block_offset;
593     block->length = length;
594     block->index = local->nb_blocks;
595     block->src_index = ~0U; /* Filled in by the receipt of the block list */
596     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
597     block->transit_bitmap = bitmap_new(block->nb_chunks);
598     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
599     block->unregister_bitmap = bitmap_new(block->nb_chunks);
600     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
601     block->remote_keys = g_new0(uint32_t, block->nb_chunks);
602 
603     block->is_ram_block = local->init ? false : true;
604 
605     if (rdma->blockmap) {
606         g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
607     }
608 
609     trace_rdma_add_block(block_name, local->nb_blocks,
610                          (uintptr_t) block->local_host_addr,
611                          block->offset, block->length,
612                          (uintptr_t) (block->local_host_addr + block->length),
613                          BITS_TO_LONGS(block->nb_chunks) *
614                              sizeof(unsigned long) * 8,
615                          block->nb_chunks);
616 
617     local->nb_blocks++;
618 
619     return 0;
620 }
621 
622 /*
623  * Memory regions need to be registered with the device and queue pairs setup
624  * in advanced before the migration starts. This tells us where the RAM blocks
625  * are so that we can register them individually.
626  */
627 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
628 {
629     const char *block_name = qemu_ram_get_idstr(rb);
630     void *host_addr = qemu_ram_get_host_addr(rb);
631     ram_addr_t block_offset = qemu_ram_get_offset(rb);
632     ram_addr_t length = qemu_ram_get_used_length(rb);
633     return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
634 }
635 
636 /*
637  * Identify the RAMBlocks and their quantity. They will be references to
638  * identify chunk boundaries inside each RAMBlock and also be referenced
639  * during dynamic page registration.
640  */
641 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
642 {
643     RDMALocalBlocks *local = &rdma->local_ram_blocks;
644     int ret;
645 
646     assert(rdma->blockmap == NULL);
647     memset(local, 0, sizeof *local);
648     ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
649     if (ret) {
650         return ret;
651     }
652     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
653     rdma->dest_blocks = g_new0(RDMADestBlock,
654                                rdma->local_ram_blocks.nb_blocks);
655     local->init = true;
656     return 0;
657 }
658 
659 /*
660  * Note: If used outside of cleanup, the caller must ensure that the destination
661  * block structures are also updated
662  */
663 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
664 {
665     RDMALocalBlocks *local = &rdma->local_ram_blocks;
666     RDMALocalBlock *old = local->block;
667     int x;
668 
669     if (rdma->blockmap) {
670         g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
671     }
672     if (block->pmr) {
673         int j;
674 
675         for (j = 0; j < block->nb_chunks; j++) {
676             if (!block->pmr[j]) {
677                 continue;
678             }
679             ibv_dereg_mr(block->pmr[j]);
680             rdma->total_registrations--;
681         }
682         g_free(block->pmr);
683         block->pmr = NULL;
684     }
685 
686     if (block->mr) {
687         ibv_dereg_mr(block->mr);
688         rdma->total_registrations--;
689         block->mr = NULL;
690     }
691 
692     g_free(block->transit_bitmap);
693     block->transit_bitmap = NULL;
694 
695     g_free(block->unregister_bitmap);
696     block->unregister_bitmap = NULL;
697 
698     g_free(block->remote_keys);
699     block->remote_keys = NULL;
700 
701     g_free(block->block_name);
702     block->block_name = NULL;
703 
704     if (rdma->blockmap) {
705         for (x = 0; x < local->nb_blocks; x++) {
706             g_hash_table_remove(rdma->blockmap,
707                                 (void *)(uintptr_t)old[x].offset);
708         }
709     }
710 
711     if (local->nb_blocks > 1) {
712 
713         local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
714 
715         if (block->index) {
716             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
717         }
718 
719         if (block->index < (local->nb_blocks - 1)) {
720             memcpy(local->block + block->index, old + (block->index + 1),
721                 sizeof(RDMALocalBlock) *
722                     (local->nb_blocks - (block->index + 1)));
723             for (x = block->index; x < local->nb_blocks - 1; x++) {
724                 local->block[x].index--;
725             }
726         }
727     } else {
728         assert(block == local->block);
729         local->block = NULL;
730     }
731 
732     trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
733                            block->offset, block->length,
734                             (uintptr_t)(block->local_host_addr + block->length),
735                            BITS_TO_LONGS(block->nb_chunks) *
736                                sizeof(unsigned long) * 8, block->nb_chunks);
737 
738     g_free(old);
739 
740     local->nb_blocks--;
741 
742     if (local->nb_blocks && rdma->blockmap) {
743         for (x = 0; x < local->nb_blocks; x++) {
744             g_hash_table_insert(rdma->blockmap,
745                                 (void *)(uintptr_t)local->block[x].offset,
746                                 &local->block[x]);
747         }
748     }
749 
750     return 0;
751 }
752 
753 /*
754  * Put in the log file which RDMA device was opened and the details
755  * associated with that device.
756  */
757 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
758 {
759     struct ibv_port_attr port;
760 
761     if (ibv_query_port(verbs, 1, &port)) {
762         error_report("Failed to query port information");
763         return;
764     }
765 
766     printf("%s RDMA Device opened: kernel name %s "
767            "uverbs device name %s, "
768            "infiniband_verbs class device path %s, "
769            "infiniband class device path %s, "
770            "transport: (%d) %s\n",
771                 who,
772                 verbs->device->name,
773                 verbs->device->dev_name,
774                 verbs->device->dev_path,
775                 verbs->device->ibdev_path,
776                 port.link_layer,
777                 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
778                  ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
779                     ? "Ethernet" : "Unknown"));
780 }
781 
782 /*
783  * Put in the log file the RDMA gid addressing information,
784  * useful for folks who have trouble understanding the
785  * RDMA device hierarchy in the kernel.
786  */
787 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
788 {
789     char sgid[33];
790     char dgid[33];
791     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
792     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
793     trace_qemu_rdma_dump_gid(who, sgid, dgid);
794 }
795 
796 /*
797  * As of now, IPv6 over RoCE / iWARP is not supported by linux.
798  * We will try the next addrinfo struct, and fail if there are
799  * no other valid addresses to bind against.
800  *
801  * If user is listening on '[::]', then we will not have a opened a device
802  * yet and have no way of verifying if the device is RoCE or not.
803  *
804  * In this case, the source VM will throw an error for ALL types of
805  * connections (both IPv4 and IPv6) if the destination machine does not have
806  * a regular infiniband network available for use.
807  *
808  * The only way to guarantee that an error is thrown for broken kernels is
809  * for the management software to choose a *specific* interface at bind time
810  * and validate what time of hardware it is.
811  *
812  * Unfortunately, this puts the user in a fix:
813  *
814  *  If the source VM connects with an IPv4 address without knowing that the
815  *  destination has bound to '[::]' the migration will unconditionally fail
816  *  unless the management software is explicitly listening on the IPv4
817  *  address while using a RoCE-based device.
818  *
819  *  If the source VM connects with an IPv6 address, then we're OK because we can
820  *  throw an error on the source (and similarly on the destination).
821  *
822  *  But in mixed environments, this will be broken for a while until it is fixed
823  *  inside linux.
824  *
825  * We do provide a *tiny* bit of help in this function: We can list all of the
826  * devices in the system and check to see if all the devices are RoCE or
827  * Infiniband.
828  *
829  * If we detect that we have a *pure* RoCE environment, then we can safely
830  * thrown an error even if the management software has specified '[::]' as the
831  * bind address.
832  *
833  * However, if there is are multiple hetergeneous devices, then we cannot make
834  * this assumption and the user just has to be sure they know what they are
835  * doing.
836  *
837  * Patches are being reviewed on linux-rdma.
838  */
839 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
840 {
841     struct ibv_port_attr port_attr;
842 
843     /* This bug only exists in linux, to our knowledge. */
844 #ifdef CONFIG_LINUX
845 
846     /*
847      * Verbs are only NULL if management has bound to '[::]'.
848      *
849      * Let's iterate through all the devices and see if there any pure IB
850      * devices (non-ethernet).
851      *
852      * If not, then we can safely proceed with the migration.
853      * Otherwise, there are no guarantees until the bug is fixed in linux.
854      */
855     if (!verbs) {
856         int num_devices, x;
857         struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
858         bool roce_found = false;
859         bool ib_found = false;
860 
861         for (x = 0; x < num_devices; x++) {
862             verbs = ibv_open_device(dev_list[x]);
863             if (!verbs) {
864                 if (errno == EPERM) {
865                     continue;
866                 } else {
867                     return -EINVAL;
868                 }
869             }
870 
871             if (ibv_query_port(verbs, 1, &port_attr)) {
872                 ibv_close_device(verbs);
873                 ERROR(errp, "Could not query initial IB port");
874                 return -EINVAL;
875             }
876 
877             if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
878                 ib_found = true;
879             } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
880                 roce_found = true;
881             }
882 
883             ibv_close_device(verbs);
884 
885         }
886 
887         if (roce_found) {
888             if (ib_found) {
889                 fprintf(stderr, "WARN: migrations may fail:"
890                                 " IPv6 over RoCE / iWARP in linux"
891                                 " is broken. But since you appear to have a"
892                                 " mixed RoCE / IB environment, be sure to only"
893                                 " migrate over the IB fabric until the kernel "
894                                 " fixes the bug.\n");
895             } else {
896                 ERROR(errp, "You only have RoCE / iWARP devices in your systems"
897                             " and your management software has specified '[::]'"
898                             ", but IPv6 over RoCE / iWARP is not supported in Linux.");
899                 return -ENONET;
900             }
901         }
902 
903         return 0;
904     }
905 
906     /*
907      * If we have a verbs context, that means that some other than '[::]' was
908      * used by the management software for binding. In which case we can
909      * actually warn the user about a potentially broken kernel.
910      */
911 
912     /* IB ports start with 1, not 0 */
913     if (ibv_query_port(verbs, 1, &port_attr)) {
914         ERROR(errp, "Could not query initial IB port");
915         return -EINVAL;
916     }
917 
918     if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
919         ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
920                     "(but patches on linux-rdma in progress)");
921         return -ENONET;
922     }
923 
924 #endif
925 
926     return 0;
927 }
928 
929 /*
930  * Figure out which RDMA device corresponds to the requested IP hostname
931  * Also create the initial connection manager identifiers for opening
932  * the connection.
933  */
934 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
935 {
936     int ret;
937     struct rdma_addrinfo *res;
938     char port_str[16];
939     struct rdma_cm_event *cm_event;
940     char ip[40] = "unknown";
941     struct rdma_addrinfo *e;
942 
943     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
944         ERROR(errp, "RDMA hostname has not been set");
945         return -EINVAL;
946     }
947 
948     /* create CM channel */
949     rdma->channel = rdma_create_event_channel();
950     if (!rdma->channel) {
951         ERROR(errp, "could not create CM channel");
952         return -EINVAL;
953     }
954 
955     /* create CM id */
956     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
957     if (ret) {
958         ERROR(errp, "could not create channel id");
959         goto err_resolve_create_id;
960     }
961 
962     snprintf(port_str, 16, "%d", rdma->port);
963     port_str[15] = '\0';
964 
965     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
966     if (ret < 0) {
967         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
968         goto err_resolve_get_addr;
969     }
970 
971     for (e = res; e != NULL; e = e->ai_next) {
972         inet_ntop(e->ai_family,
973             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
974         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
975 
976         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
977                 RDMA_RESOLVE_TIMEOUT_MS);
978         if (!ret) {
979             if (e->ai_family == AF_INET6) {
980                 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
981                 if (ret) {
982                     continue;
983                 }
984             }
985             goto route;
986         }
987     }
988 
989     ERROR(errp, "could not resolve address %s", rdma->host);
990     goto err_resolve_get_addr;
991 
992 route:
993     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
994 
995     ret = rdma_get_cm_event(rdma->channel, &cm_event);
996     if (ret) {
997         ERROR(errp, "could not perform event_addr_resolved");
998         goto err_resolve_get_addr;
999     }
1000 
1001     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
1002         ERROR(errp, "result not equal to event_addr_resolved %s",
1003                 rdma_event_str(cm_event->event));
1004         perror("rdma_resolve_addr");
1005         rdma_ack_cm_event(cm_event);
1006         ret = -EINVAL;
1007         goto err_resolve_get_addr;
1008     }
1009     rdma_ack_cm_event(cm_event);
1010 
1011     /* resolve route */
1012     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1013     if (ret) {
1014         ERROR(errp, "could not resolve rdma route");
1015         goto err_resolve_get_addr;
1016     }
1017 
1018     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1019     if (ret) {
1020         ERROR(errp, "could not perform event_route_resolved");
1021         goto err_resolve_get_addr;
1022     }
1023     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1024         ERROR(errp, "result not equal to event_route_resolved: %s",
1025                         rdma_event_str(cm_event->event));
1026         rdma_ack_cm_event(cm_event);
1027         ret = -EINVAL;
1028         goto err_resolve_get_addr;
1029     }
1030     rdma_ack_cm_event(cm_event);
1031     rdma->verbs = rdma->cm_id->verbs;
1032     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1033     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1034     return 0;
1035 
1036 err_resolve_get_addr:
1037     rdma_destroy_id(rdma->cm_id);
1038     rdma->cm_id = NULL;
1039 err_resolve_create_id:
1040     rdma_destroy_event_channel(rdma->channel);
1041     rdma->channel = NULL;
1042     return ret;
1043 }
1044 
1045 /*
1046  * Create protection domain and completion queues
1047  */
1048 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1049 {
1050     /* allocate pd */
1051     rdma->pd = ibv_alloc_pd(rdma->verbs);
1052     if (!rdma->pd) {
1053         error_report("failed to allocate protection domain");
1054         return -1;
1055     }
1056 
1057     /* create completion channel */
1058     rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1059     if (!rdma->comp_channel) {
1060         error_report("failed to allocate completion channel");
1061         goto err_alloc_pd_cq;
1062     }
1063 
1064     /*
1065      * Completion queue can be filled by both read and write work requests,
1066      * so must reflect the sum of both possible queue sizes.
1067      */
1068     rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1069             NULL, rdma->comp_channel, 0);
1070     if (!rdma->cq) {
1071         error_report("failed to allocate completion queue");
1072         goto err_alloc_pd_cq;
1073     }
1074 
1075     return 0;
1076 
1077 err_alloc_pd_cq:
1078     if (rdma->pd) {
1079         ibv_dealloc_pd(rdma->pd);
1080     }
1081     if (rdma->comp_channel) {
1082         ibv_destroy_comp_channel(rdma->comp_channel);
1083     }
1084     rdma->pd = NULL;
1085     rdma->comp_channel = NULL;
1086     return -1;
1087 
1088 }
1089 
1090 /*
1091  * Create queue pairs.
1092  */
1093 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1094 {
1095     struct ibv_qp_init_attr attr = { 0 };
1096     int ret;
1097 
1098     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1099     attr.cap.max_recv_wr = 3;
1100     attr.cap.max_send_sge = 1;
1101     attr.cap.max_recv_sge = 1;
1102     attr.send_cq = rdma->cq;
1103     attr.recv_cq = rdma->cq;
1104     attr.qp_type = IBV_QPT_RC;
1105 
1106     ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1107     if (ret) {
1108         return -1;
1109     }
1110 
1111     rdma->qp = rdma->cm_id->qp;
1112     return 0;
1113 }
1114 
1115 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1116 {
1117     int i;
1118     RDMALocalBlocks *local = &rdma->local_ram_blocks;
1119 
1120     for (i = 0; i < local->nb_blocks; i++) {
1121         local->block[i].mr =
1122             ibv_reg_mr(rdma->pd,
1123                     local->block[i].local_host_addr,
1124                     local->block[i].length,
1125                     IBV_ACCESS_LOCAL_WRITE |
1126                     IBV_ACCESS_REMOTE_WRITE
1127                     );
1128         if (!local->block[i].mr) {
1129             perror("Failed to register local dest ram block!\n");
1130             break;
1131         }
1132         rdma->total_registrations++;
1133     }
1134 
1135     if (i >= local->nb_blocks) {
1136         return 0;
1137     }
1138 
1139     for (i--; i >= 0; i--) {
1140         ibv_dereg_mr(local->block[i].mr);
1141         rdma->total_registrations--;
1142     }
1143 
1144     return -1;
1145 
1146 }
1147 
1148 /*
1149  * Find the ram block that corresponds to the page requested to be
1150  * transmitted by QEMU.
1151  *
1152  * Once the block is found, also identify which 'chunk' within that
1153  * block that the page belongs to.
1154  *
1155  * This search cannot fail or the migration will fail.
1156  */
1157 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1158                                       uintptr_t block_offset,
1159                                       uint64_t offset,
1160                                       uint64_t length,
1161                                       uint64_t *block_index,
1162                                       uint64_t *chunk_index)
1163 {
1164     uint64_t current_addr = block_offset + offset;
1165     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1166                                                 (void *) block_offset);
1167     assert(block);
1168     assert(current_addr >= block->offset);
1169     assert((current_addr + length) <= (block->offset + block->length));
1170 
1171     *block_index = block->index;
1172     *chunk_index = ram_chunk_index(block->local_host_addr,
1173                 block->local_host_addr + (current_addr - block->offset));
1174 
1175     return 0;
1176 }
1177 
1178 /*
1179  * Register a chunk with IB. If the chunk was already registered
1180  * previously, then skip.
1181  *
1182  * Also return the keys associated with the registration needed
1183  * to perform the actual RDMA operation.
1184  */
1185 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1186         RDMALocalBlock *block, uintptr_t host_addr,
1187         uint32_t *lkey, uint32_t *rkey, int chunk,
1188         uint8_t *chunk_start, uint8_t *chunk_end)
1189 {
1190     if (block->mr) {
1191         if (lkey) {
1192             *lkey = block->mr->lkey;
1193         }
1194         if (rkey) {
1195             *rkey = block->mr->rkey;
1196         }
1197         return 0;
1198     }
1199 
1200     /* allocate memory to store chunk MRs */
1201     if (!block->pmr) {
1202         block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1203     }
1204 
1205     /*
1206      * If 'rkey', then we're the destination, so grant access to the source.
1207      *
1208      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1209      */
1210     if (!block->pmr[chunk]) {
1211         uint64_t len = chunk_end - chunk_start;
1212 
1213         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1214 
1215         block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1216                 chunk_start, len,
1217                 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1218                         IBV_ACCESS_REMOTE_WRITE) : 0));
1219 
1220         if (!block->pmr[chunk]) {
1221             perror("Failed to register chunk!");
1222             fprintf(stderr, "Chunk details: block: %d chunk index %d"
1223                             " start %" PRIuPTR " end %" PRIuPTR
1224                             " host %" PRIuPTR
1225                             " local %" PRIuPTR " registrations: %d\n",
1226                             block->index, chunk, (uintptr_t)chunk_start,
1227                             (uintptr_t)chunk_end, host_addr,
1228                             (uintptr_t)block->local_host_addr,
1229                             rdma->total_registrations);
1230             return -1;
1231         }
1232         rdma->total_registrations++;
1233     }
1234 
1235     if (lkey) {
1236         *lkey = block->pmr[chunk]->lkey;
1237     }
1238     if (rkey) {
1239         *rkey = block->pmr[chunk]->rkey;
1240     }
1241     return 0;
1242 }
1243 
1244 /*
1245  * Register (at connection time) the memory used for control
1246  * channel messages.
1247  */
1248 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1249 {
1250     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1251             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1252             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1253     if (rdma->wr_data[idx].control_mr) {
1254         rdma->total_registrations++;
1255         return 0;
1256     }
1257     error_report("qemu_rdma_reg_control failed");
1258     return -1;
1259 }
1260 
1261 const char *print_wrid(int wrid)
1262 {
1263     if (wrid >= RDMA_WRID_RECV_CONTROL) {
1264         return wrid_desc[RDMA_WRID_RECV_CONTROL];
1265     }
1266     return wrid_desc[wrid];
1267 }
1268 
1269 /*
1270  * RDMA requires memory registration (mlock/pinning), but this is not good for
1271  * overcommitment.
1272  *
1273  * In preparation for the future where LRU information or workload-specific
1274  * writable writable working set memory access behavior is available to QEMU
1275  * it would be nice to have in place the ability to UN-register/UN-pin
1276  * particular memory regions from the RDMA hardware when it is determine that
1277  * those regions of memory will likely not be accessed again in the near future.
1278  *
1279  * While we do not yet have such information right now, the following
1280  * compile-time option allows us to perform a non-optimized version of this
1281  * behavior.
1282  *
1283  * By uncommenting this option, you will cause *all* RDMA transfers to be
1284  * unregistered immediately after the transfer completes on both sides of the
1285  * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1286  *
1287  * This will have a terrible impact on migration performance, so until future
1288  * workload information or LRU information is available, do not attempt to use
1289  * this feature except for basic testing.
1290  */
1291 //#define RDMA_UNREGISTRATION_EXAMPLE
1292 
1293 /*
1294  * Perform a non-optimized memory unregistration after every transfer
1295  * for demonstration purposes, only if pin-all is not requested.
1296  *
1297  * Potential optimizations:
1298  * 1. Start a new thread to run this function continuously
1299         - for bit clearing
1300         - and for receipt of unregister messages
1301  * 2. Use an LRU.
1302  * 3. Use workload hints.
1303  */
1304 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1305 {
1306     while (rdma->unregistrations[rdma->unregister_current]) {
1307         int ret;
1308         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1309         uint64_t chunk =
1310             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1311         uint64_t index =
1312             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1313         RDMALocalBlock *block =
1314             &(rdma->local_ram_blocks.block[index]);
1315         RDMARegister reg = { .current_index = index };
1316         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1317                                  };
1318         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1319                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1320                                    .repeat = 1,
1321                                  };
1322 
1323         trace_qemu_rdma_unregister_waiting_proc(chunk,
1324                                                 rdma->unregister_current);
1325 
1326         rdma->unregistrations[rdma->unregister_current] = 0;
1327         rdma->unregister_current++;
1328 
1329         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1330             rdma->unregister_current = 0;
1331         }
1332 
1333 
1334         /*
1335          * Unregistration is speculative (because migration is single-threaded
1336          * and we cannot break the protocol's inifinband message ordering).
1337          * Thus, if the memory is currently being used for transmission,
1338          * then abort the attempt to unregister and try again
1339          * later the next time a completion is received for this memory.
1340          */
1341         clear_bit(chunk, block->unregister_bitmap);
1342 
1343         if (test_bit(chunk, block->transit_bitmap)) {
1344             trace_qemu_rdma_unregister_waiting_inflight(chunk);
1345             continue;
1346         }
1347 
1348         trace_qemu_rdma_unregister_waiting_send(chunk);
1349 
1350         ret = ibv_dereg_mr(block->pmr[chunk]);
1351         block->pmr[chunk] = NULL;
1352         block->remote_keys[chunk] = 0;
1353 
1354         if (ret != 0) {
1355             perror("unregistration chunk failed");
1356             return -ret;
1357         }
1358         rdma->total_registrations--;
1359 
1360         reg.key.chunk = chunk;
1361         register_to_network(rdma, &reg);
1362         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1363                                 &resp, NULL, NULL);
1364         if (ret < 0) {
1365             return ret;
1366         }
1367 
1368         trace_qemu_rdma_unregister_waiting_complete(chunk);
1369     }
1370 
1371     return 0;
1372 }
1373 
1374 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1375                                          uint64_t chunk)
1376 {
1377     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1378 
1379     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1380     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1381 
1382     return result;
1383 }
1384 
1385 /*
1386  * Set bit for unregistration in the next iteration.
1387  * We cannot transmit right here, but will unpin later.
1388  */
1389 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1390                                         uint64_t chunk, uint64_t wr_id)
1391 {
1392     if (rdma->unregistrations[rdma->unregister_next] != 0) {
1393         error_report("rdma migration: queue is full");
1394     } else {
1395         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1396 
1397         if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1398             trace_qemu_rdma_signal_unregister_append(chunk,
1399                                                      rdma->unregister_next);
1400 
1401             rdma->unregistrations[rdma->unregister_next++] =
1402                     qemu_rdma_make_wrid(wr_id, index, chunk);
1403 
1404             if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1405                 rdma->unregister_next = 0;
1406             }
1407         } else {
1408             trace_qemu_rdma_signal_unregister_already(chunk);
1409         }
1410     }
1411 }
1412 
1413 /*
1414  * Consult the connection manager to see a work request
1415  * (of any kind) has completed.
1416  * Return the work request ID that completed.
1417  */
1418 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1419                                uint32_t *byte_len)
1420 {
1421     int ret;
1422     struct ibv_wc wc;
1423     uint64_t wr_id;
1424 
1425     ret = ibv_poll_cq(rdma->cq, 1, &wc);
1426 
1427     if (!ret) {
1428         *wr_id_out = RDMA_WRID_NONE;
1429         return 0;
1430     }
1431 
1432     if (ret < 0) {
1433         error_report("ibv_poll_cq return %d", ret);
1434         return ret;
1435     }
1436 
1437     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1438 
1439     if (wc.status != IBV_WC_SUCCESS) {
1440         fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1441                         wc.status, ibv_wc_status_str(wc.status));
1442         fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1443 
1444         return -1;
1445     }
1446 
1447     if (rdma->control_ready_expected &&
1448         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1449         trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1450                   wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1451         rdma->control_ready_expected = 0;
1452     }
1453 
1454     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1455         uint64_t chunk =
1456             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1457         uint64_t index =
1458             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1459         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1460 
1461         trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1462                                    index, chunk, block->local_host_addr,
1463                                    (void *)(uintptr_t)block->remote_host_addr);
1464 
1465         clear_bit(chunk, block->transit_bitmap);
1466 
1467         if (rdma->nb_sent > 0) {
1468             rdma->nb_sent--;
1469         }
1470 
1471         if (!rdma->pin_all) {
1472             /*
1473              * FYI: If one wanted to signal a specific chunk to be unregistered
1474              * using LRU or workload-specific information, this is the function
1475              * you would call to do so. That chunk would then get asynchronously
1476              * unregistered later.
1477              */
1478 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1479             qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1480 #endif
1481         }
1482     } else {
1483         trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1484     }
1485 
1486     *wr_id_out = wc.wr_id;
1487     if (byte_len) {
1488         *byte_len = wc.byte_len;
1489     }
1490 
1491     return  0;
1492 }
1493 
1494 /* Wait for activity on the completion channel.
1495  * Returns 0 on success, none-0 on error.
1496  */
1497 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
1498 {
1499     struct rdma_cm_event *cm_event;
1500     int ret = -1;
1501 
1502     /*
1503      * Coroutine doesn't start until migration_fd_process_incoming()
1504      * so don't yield unless we know we're running inside of a coroutine.
1505      */
1506     if (rdma->migration_started_on_destination &&
1507         migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1508         yield_until_fd_readable(rdma->comp_channel->fd);
1509     } else {
1510         /* This is the source side, we're in a separate thread
1511          * or destination prior to migration_fd_process_incoming()
1512          * after postcopy, the destination also in a seprate thread.
1513          * we can't yield; so we have to poll the fd.
1514          * But we need to be able to handle 'cancel' or an error
1515          * without hanging forever.
1516          */
1517         while (!rdma->error_state  && !rdma->received_error) {
1518             GPollFD pfds[2];
1519             pfds[0].fd = rdma->comp_channel->fd;
1520             pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1521             pfds[0].revents = 0;
1522 
1523             pfds[1].fd = rdma->channel->fd;
1524             pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1525             pfds[1].revents = 0;
1526 
1527             /* 0.1s timeout, should be fine for a 'cancel' */
1528             switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1529             case 2:
1530             case 1: /* fd active */
1531                 if (pfds[0].revents) {
1532                     return 0;
1533                 }
1534 
1535                 if (pfds[1].revents) {
1536                     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1537                     if (!ret) {
1538                         rdma_ack_cm_event(cm_event);
1539                     }
1540 
1541                     error_report("receive cm event while wait comp channel,"
1542                                  "cm event is %d", cm_event->event);
1543                     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1544                         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1545                         return -EPIPE;
1546                     }
1547                 }
1548                 break;
1549 
1550             case 0: /* Timeout, go around again */
1551                 break;
1552 
1553             default: /* Error of some type -
1554                       * I don't trust errno from qemu_poll_ns
1555                      */
1556                 error_report("%s: poll failed", __func__);
1557                 return -EPIPE;
1558             }
1559 
1560             if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1561                 /* Bail out and let the cancellation happen */
1562                 return -EPIPE;
1563             }
1564         }
1565     }
1566 
1567     if (rdma->received_error) {
1568         return -EPIPE;
1569     }
1570     return rdma->error_state;
1571 }
1572 
1573 /*
1574  * Block until the next work request has completed.
1575  *
1576  * First poll to see if a work request has already completed,
1577  * otherwise block.
1578  *
1579  * If we encounter completed work requests for IDs other than
1580  * the one we're interested in, then that's generally an error.
1581  *
1582  * The only exception is actual RDMA Write completions. These
1583  * completions only need to be recorded, but do not actually
1584  * need further processing.
1585  */
1586 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1587                                     uint32_t *byte_len)
1588 {
1589     int num_cq_events = 0, ret = 0;
1590     struct ibv_cq *cq;
1591     void *cq_ctx;
1592     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1593 
1594     if (ibv_req_notify_cq(rdma->cq, 0)) {
1595         return -1;
1596     }
1597     /* poll cq first */
1598     while (wr_id != wrid_requested) {
1599         ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1600         if (ret < 0) {
1601             return ret;
1602         }
1603 
1604         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1605 
1606         if (wr_id == RDMA_WRID_NONE) {
1607             break;
1608         }
1609         if (wr_id != wrid_requested) {
1610             trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1611                        wrid_requested, print_wrid(wr_id), wr_id);
1612         }
1613     }
1614 
1615     if (wr_id == wrid_requested) {
1616         return 0;
1617     }
1618 
1619     while (1) {
1620         ret = qemu_rdma_wait_comp_channel(rdma);
1621         if (ret) {
1622             goto err_block_for_wrid;
1623         }
1624 
1625         ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
1626         if (ret) {
1627             perror("ibv_get_cq_event");
1628             goto err_block_for_wrid;
1629         }
1630 
1631         num_cq_events++;
1632 
1633         ret = -ibv_req_notify_cq(cq, 0);
1634         if (ret) {
1635             goto err_block_for_wrid;
1636         }
1637 
1638         while (wr_id != wrid_requested) {
1639             ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1640             if (ret < 0) {
1641                 goto err_block_for_wrid;
1642             }
1643 
1644             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1645 
1646             if (wr_id == RDMA_WRID_NONE) {
1647                 break;
1648             }
1649             if (wr_id != wrid_requested) {
1650                 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1651                                    wrid_requested, print_wrid(wr_id), wr_id);
1652             }
1653         }
1654 
1655         if (wr_id == wrid_requested) {
1656             goto success_block_for_wrid;
1657         }
1658     }
1659 
1660 success_block_for_wrid:
1661     if (num_cq_events) {
1662         ibv_ack_cq_events(cq, num_cq_events);
1663     }
1664     return 0;
1665 
1666 err_block_for_wrid:
1667     if (num_cq_events) {
1668         ibv_ack_cq_events(cq, num_cq_events);
1669     }
1670 
1671     rdma->error_state = ret;
1672     return ret;
1673 }
1674 
1675 /*
1676  * Post a SEND message work request for the control channel
1677  * containing some data and block until the post completes.
1678  */
1679 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1680                                        RDMAControlHeader *head)
1681 {
1682     int ret = 0;
1683     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1684     struct ibv_send_wr *bad_wr;
1685     struct ibv_sge sge = {
1686                            .addr = (uintptr_t)(wr->control),
1687                            .length = head->len + sizeof(RDMAControlHeader),
1688                            .lkey = wr->control_mr->lkey,
1689                          };
1690     struct ibv_send_wr send_wr = {
1691                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1692                                    .opcode = IBV_WR_SEND,
1693                                    .send_flags = IBV_SEND_SIGNALED,
1694                                    .sg_list = &sge,
1695                                    .num_sge = 1,
1696                                 };
1697 
1698     trace_qemu_rdma_post_send_control(control_desc(head->type));
1699 
1700     /*
1701      * We don't actually need to do a memcpy() in here if we used
1702      * the "sge" properly, but since we're only sending control messages
1703      * (not RAM in a performance-critical path), then its OK for now.
1704      *
1705      * The copy makes the RDMAControlHeader simpler to manipulate
1706      * for the time being.
1707      */
1708     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1709     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1710     control_to_network((void *) wr->control);
1711 
1712     if (buf) {
1713         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1714     }
1715 
1716 
1717     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1718 
1719     if (ret > 0) {
1720         error_report("Failed to use post IB SEND for control");
1721         return -ret;
1722     }
1723 
1724     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1725     if (ret < 0) {
1726         error_report("rdma migration: send polling control error");
1727     }
1728 
1729     return ret;
1730 }
1731 
1732 /*
1733  * Post a RECV work request in anticipation of some future receipt
1734  * of data on the control channel.
1735  */
1736 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1737 {
1738     struct ibv_recv_wr *bad_wr;
1739     struct ibv_sge sge = {
1740                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
1741                             .length = RDMA_CONTROL_MAX_BUFFER,
1742                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1743                          };
1744 
1745     struct ibv_recv_wr recv_wr = {
1746                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1747                                     .sg_list = &sge,
1748                                     .num_sge = 1,
1749                                  };
1750 
1751 
1752     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1753         return -1;
1754     }
1755 
1756     return 0;
1757 }
1758 
1759 /*
1760  * Block and wait for a RECV control channel message to arrive.
1761  */
1762 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1763                 RDMAControlHeader *head, int expecting, int idx)
1764 {
1765     uint32_t byte_len;
1766     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1767                                        &byte_len);
1768 
1769     if (ret < 0) {
1770         error_report("rdma migration: recv polling control error!");
1771         return ret;
1772     }
1773 
1774     network_to_control((void *) rdma->wr_data[idx].control);
1775     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1776 
1777     trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1778 
1779     if (expecting == RDMA_CONTROL_NONE) {
1780         trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1781                                              head->type);
1782     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1783         error_report("Was expecting a %s (%d) control message"
1784                 ", but got: %s (%d), length: %d",
1785                 control_desc(expecting), expecting,
1786                 control_desc(head->type), head->type, head->len);
1787         if (head->type == RDMA_CONTROL_ERROR) {
1788             rdma->received_error = true;
1789         }
1790         return -EIO;
1791     }
1792     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1793         error_report("too long length: %d", head->len);
1794         return -EINVAL;
1795     }
1796     if (sizeof(*head) + head->len != byte_len) {
1797         error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1798         return -EINVAL;
1799     }
1800 
1801     return 0;
1802 }
1803 
1804 /*
1805  * When a RECV work request has completed, the work request's
1806  * buffer is pointed at the header.
1807  *
1808  * This will advance the pointer to the data portion
1809  * of the control message of the work request's buffer that
1810  * was populated after the work request finished.
1811  */
1812 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1813                                   RDMAControlHeader *head)
1814 {
1815     rdma->wr_data[idx].control_len = head->len;
1816     rdma->wr_data[idx].control_curr =
1817         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1818 }
1819 
1820 /*
1821  * This is an 'atomic' high-level operation to deliver a single, unified
1822  * control-channel message.
1823  *
1824  * Additionally, if the user is expecting some kind of reply to this message,
1825  * they can request a 'resp' response message be filled in by posting an
1826  * additional work request on behalf of the user and waiting for an additional
1827  * completion.
1828  *
1829  * The extra (optional) response is used during registration to us from having
1830  * to perform an *additional* exchange of message just to provide a response by
1831  * instead piggy-backing on the acknowledgement.
1832  */
1833 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1834                                    uint8_t *data, RDMAControlHeader *resp,
1835                                    int *resp_idx,
1836                                    int (*callback)(RDMAContext *rdma))
1837 {
1838     int ret = 0;
1839 
1840     /*
1841      * Wait until the dest is ready before attempting to deliver the message
1842      * by waiting for a READY message.
1843      */
1844     if (rdma->control_ready_expected) {
1845         RDMAControlHeader resp;
1846         ret = qemu_rdma_exchange_get_response(rdma,
1847                                     &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1848         if (ret < 0) {
1849             return ret;
1850         }
1851     }
1852 
1853     /*
1854      * If the user is expecting a response, post a WR in anticipation of it.
1855      */
1856     if (resp) {
1857         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1858         if (ret) {
1859             error_report("rdma migration: error posting"
1860                     " extra control recv for anticipated result!");
1861             return ret;
1862         }
1863     }
1864 
1865     /*
1866      * Post a WR to replace the one we just consumed for the READY message.
1867      */
1868     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1869     if (ret) {
1870         error_report("rdma migration: error posting first control recv!");
1871         return ret;
1872     }
1873 
1874     /*
1875      * Deliver the control message that was requested.
1876      */
1877     ret = qemu_rdma_post_send_control(rdma, data, head);
1878 
1879     if (ret < 0) {
1880         error_report("Failed to send control buffer!");
1881         return ret;
1882     }
1883 
1884     /*
1885      * If we're expecting a response, block and wait for it.
1886      */
1887     if (resp) {
1888         if (callback) {
1889             trace_qemu_rdma_exchange_send_issue_callback();
1890             ret = callback(rdma);
1891             if (ret < 0) {
1892                 return ret;
1893             }
1894         }
1895 
1896         trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1897         ret = qemu_rdma_exchange_get_response(rdma, resp,
1898                                               resp->type, RDMA_WRID_DATA);
1899 
1900         if (ret < 0) {
1901             return ret;
1902         }
1903 
1904         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1905         if (resp_idx) {
1906             *resp_idx = RDMA_WRID_DATA;
1907         }
1908         trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1909     }
1910 
1911     rdma->control_ready_expected = 1;
1912 
1913     return 0;
1914 }
1915 
1916 /*
1917  * This is an 'atomic' high-level operation to receive a single, unified
1918  * control-channel message.
1919  */
1920 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1921                                 int expecting)
1922 {
1923     RDMAControlHeader ready = {
1924                                 .len = 0,
1925                                 .type = RDMA_CONTROL_READY,
1926                                 .repeat = 1,
1927                               };
1928     int ret;
1929 
1930     /*
1931      * Inform the source that we're ready to receive a message.
1932      */
1933     ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1934 
1935     if (ret < 0) {
1936         error_report("Failed to send control buffer!");
1937         return ret;
1938     }
1939 
1940     /*
1941      * Block and wait for the message.
1942      */
1943     ret = qemu_rdma_exchange_get_response(rdma, head,
1944                                           expecting, RDMA_WRID_READY);
1945 
1946     if (ret < 0) {
1947         return ret;
1948     }
1949 
1950     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1951 
1952     /*
1953      * Post a new RECV work request to replace the one we just consumed.
1954      */
1955     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1956     if (ret) {
1957         error_report("rdma migration: error posting second control recv!");
1958         return ret;
1959     }
1960 
1961     return 0;
1962 }
1963 
1964 /*
1965  * Write an actual chunk of memory using RDMA.
1966  *
1967  * If we're using dynamic registration on the dest-side, we have to
1968  * send a registration command first.
1969  */
1970 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1971                                int current_index, uint64_t current_addr,
1972                                uint64_t length)
1973 {
1974     struct ibv_sge sge;
1975     struct ibv_send_wr send_wr = { 0 };
1976     struct ibv_send_wr *bad_wr;
1977     int reg_result_idx, ret, count = 0;
1978     uint64_t chunk, chunks;
1979     uint8_t *chunk_start, *chunk_end;
1980     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1981     RDMARegister reg;
1982     RDMARegisterResult *reg_result;
1983     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1984     RDMAControlHeader head = { .len = sizeof(RDMARegister),
1985                                .type = RDMA_CONTROL_REGISTER_REQUEST,
1986                                .repeat = 1,
1987                              };
1988 
1989 retry:
1990     sge.addr = (uintptr_t)(block->local_host_addr +
1991                             (current_addr - block->offset));
1992     sge.length = length;
1993 
1994     chunk = ram_chunk_index(block->local_host_addr,
1995                             (uint8_t *)(uintptr_t)sge.addr);
1996     chunk_start = ram_chunk_start(block, chunk);
1997 
1998     if (block->is_ram_block) {
1999         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
2000 
2001         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2002             chunks--;
2003         }
2004     } else {
2005         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
2006 
2007         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2008             chunks--;
2009         }
2010     }
2011 
2012     trace_qemu_rdma_write_one_top(chunks + 1,
2013                                   (chunks + 1) *
2014                                   (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2015 
2016     chunk_end = ram_chunk_end(block, chunk + chunks);
2017 
2018     if (!rdma->pin_all) {
2019 #ifdef RDMA_UNREGISTRATION_EXAMPLE
2020         qemu_rdma_unregister_waiting(rdma);
2021 #endif
2022     }
2023 
2024     while (test_bit(chunk, block->transit_bitmap)) {
2025         (void)count;
2026         trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2027                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
2028 
2029         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2030 
2031         if (ret < 0) {
2032             error_report("Failed to Wait for previous write to complete "
2033                     "block %d chunk %" PRIu64
2034                     " current %" PRIu64 " len %" PRIu64 " %d",
2035                     current_index, chunk, sge.addr, length, rdma->nb_sent);
2036             return ret;
2037         }
2038     }
2039 
2040     if (!rdma->pin_all || !block->is_ram_block) {
2041         if (!block->remote_keys[chunk]) {
2042             /*
2043              * This chunk has not yet been registered, so first check to see
2044              * if the entire chunk is zero. If so, tell the other size to
2045              * memset() + madvise() the entire chunk without RDMA.
2046              */
2047 
2048             if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2049                 RDMACompress comp = {
2050                                         .offset = current_addr,
2051                                         .value = 0,
2052                                         .block_idx = current_index,
2053                                         .length = length,
2054                                     };
2055 
2056                 head.len = sizeof(comp);
2057                 head.type = RDMA_CONTROL_COMPRESS;
2058 
2059                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
2060                                                current_index, current_addr);
2061 
2062                 compress_to_network(rdma, &comp);
2063                 ret = qemu_rdma_exchange_send(rdma, &head,
2064                                 (uint8_t *) &comp, NULL, NULL, NULL);
2065 
2066                 if (ret < 0) {
2067                     return -EIO;
2068                 }
2069 
2070                 acct_update_position(f, sge.length, true);
2071 
2072                 return 1;
2073             }
2074 
2075             /*
2076              * Otherwise, tell other side to register.
2077              */
2078             reg.current_index = current_index;
2079             if (block->is_ram_block) {
2080                 reg.key.current_addr = current_addr;
2081             } else {
2082                 reg.key.chunk = chunk;
2083             }
2084             reg.chunks = chunks;
2085 
2086             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2087                                               current_addr);
2088 
2089             register_to_network(rdma, &reg);
2090             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2091                                     &resp, &reg_result_idx, NULL);
2092             if (ret < 0) {
2093                 return ret;
2094             }
2095 
2096             /* try to overlap this single registration with the one we sent. */
2097             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2098                                                 &sge.lkey, NULL, chunk,
2099                                                 chunk_start, chunk_end)) {
2100                 error_report("cannot get lkey");
2101                 return -EINVAL;
2102             }
2103 
2104             reg_result = (RDMARegisterResult *)
2105                     rdma->wr_data[reg_result_idx].control_curr;
2106 
2107             network_to_result(reg_result);
2108 
2109             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2110                                                  reg_result->rkey, chunk);
2111 
2112             block->remote_keys[chunk] = reg_result->rkey;
2113             block->remote_host_addr = reg_result->host_addr;
2114         } else {
2115             /* already registered before */
2116             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2117                                                 &sge.lkey, NULL, chunk,
2118                                                 chunk_start, chunk_end)) {
2119                 error_report("cannot get lkey!");
2120                 return -EINVAL;
2121             }
2122         }
2123 
2124         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2125     } else {
2126         send_wr.wr.rdma.rkey = block->remote_rkey;
2127 
2128         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2129                                                      &sge.lkey, NULL, chunk,
2130                                                      chunk_start, chunk_end)) {
2131             error_report("cannot get lkey!");
2132             return -EINVAL;
2133         }
2134     }
2135 
2136     /*
2137      * Encode the ram block index and chunk within this wrid.
2138      * We will use this information at the time of completion
2139      * to figure out which bitmap to check against and then which
2140      * chunk in the bitmap to look for.
2141      */
2142     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2143                                         current_index, chunk);
2144 
2145     send_wr.opcode = IBV_WR_RDMA_WRITE;
2146     send_wr.send_flags = IBV_SEND_SIGNALED;
2147     send_wr.sg_list = &sge;
2148     send_wr.num_sge = 1;
2149     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2150                                 (current_addr - block->offset);
2151 
2152     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2153                                    sge.length);
2154 
2155     /*
2156      * ibv_post_send() does not return negative error numbers,
2157      * per the specification they are positive - no idea why.
2158      */
2159     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2160 
2161     if (ret == ENOMEM) {
2162         trace_qemu_rdma_write_one_queue_full();
2163         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2164         if (ret < 0) {
2165             error_report("rdma migration: failed to make "
2166                          "room in full send queue! %d", ret);
2167             return ret;
2168         }
2169 
2170         goto retry;
2171 
2172     } else if (ret > 0) {
2173         perror("rdma migration: post rdma write failed");
2174         return -ret;
2175     }
2176 
2177     set_bit(chunk, block->transit_bitmap);
2178     acct_update_position(f, sge.length, false);
2179     rdma->total_writes++;
2180 
2181     return 0;
2182 }
2183 
2184 /*
2185  * Push out any unwritten RDMA operations.
2186  *
2187  * We support sending out multiple chunks at the same time.
2188  * Not all of them need to get signaled in the completion queue.
2189  */
2190 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2191 {
2192     int ret;
2193 
2194     if (!rdma->current_length) {
2195         return 0;
2196     }
2197 
2198     ret = qemu_rdma_write_one(f, rdma,
2199             rdma->current_index, rdma->current_addr, rdma->current_length);
2200 
2201     if (ret < 0) {
2202         return ret;
2203     }
2204 
2205     if (ret == 0) {
2206         rdma->nb_sent++;
2207         trace_qemu_rdma_write_flush(rdma->nb_sent);
2208     }
2209 
2210     rdma->current_length = 0;
2211     rdma->current_addr = 0;
2212 
2213     return 0;
2214 }
2215 
2216 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2217                     uint64_t offset, uint64_t len)
2218 {
2219     RDMALocalBlock *block;
2220     uint8_t *host_addr;
2221     uint8_t *chunk_end;
2222 
2223     if (rdma->current_index < 0) {
2224         return 0;
2225     }
2226 
2227     if (rdma->current_chunk < 0) {
2228         return 0;
2229     }
2230 
2231     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2232     host_addr = block->local_host_addr + (offset - block->offset);
2233     chunk_end = ram_chunk_end(block, rdma->current_chunk);
2234 
2235     if (rdma->current_length == 0) {
2236         return 0;
2237     }
2238 
2239     /*
2240      * Only merge into chunk sequentially.
2241      */
2242     if (offset != (rdma->current_addr + rdma->current_length)) {
2243         return 0;
2244     }
2245 
2246     if (offset < block->offset) {
2247         return 0;
2248     }
2249 
2250     if ((offset + len) > (block->offset + block->length)) {
2251         return 0;
2252     }
2253 
2254     if ((host_addr + len) > chunk_end) {
2255         return 0;
2256     }
2257 
2258     return 1;
2259 }
2260 
2261 /*
2262  * We're not actually writing here, but doing three things:
2263  *
2264  * 1. Identify the chunk the buffer belongs to.
2265  * 2. If the chunk is full or the buffer doesn't belong to the current
2266  *    chunk, then start a new chunk and flush() the old chunk.
2267  * 3. To keep the hardware busy, we also group chunks into batches
2268  *    and only require that a batch gets acknowledged in the completion
2269  *    qeueue instead of each individual chunk.
2270  */
2271 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2272                            uint64_t block_offset, uint64_t offset,
2273                            uint64_t len)
2274 {
2275     uint64_t current_addr = block_offset + offset;
2276     uint64_t index = rdma->current_index;
2277     uint64_t chunk = rdma->current_chunk;
2278     int ret;
2279 
2280     /* If we cannot merge it, we flush the current buffer first. */
2281     if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2282         ret = qemu_rdma_write_flush(f, rdma);
2283         if (ret) {
2284             return ret;
2285         }
2286         rdma->current_length = 0;
2287         rdma->current_addr = current_addr;
2288 
2289         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2290                                          offset, len, &index, &chunk);
2291         if (ret) {
2292             error_report("ram block search failed");
2293             return ret;
2294         }
2295         rdma->current_index = index;
2296         rdma->current_chunk = chunk;
2297     }
2298 
2299     /* merge it */
2300     rdma->current_length += len;
2301 
2302     /* flush it if buffer is too large */
2303     if (rdma->current_length >= RDMA_MERGE_MAX) {
2304         return qemu_rdma_write_flush(f, rdma);
2305     }
2306 
2307     return 0;
2308 }
2309 
2310 static void qemu_rdma_cleanup(RDMAContext *rdma)
2311 {
2312     int idx;
2313 
2314     if (rdma->cm_id && rdma->connected) {
2315         if ((rdma->error_state ||
2316              migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2317             !rdma->received_error) {
2318             RDMAControlHeader head = { .len = 0,
2319                                        .type = RDMA_CONTROL_ERROR,
2320                                        .repeat = 1,
2321                                      };
2322             error_report("Early error. Sending error.");
2323             qemu_rdma_post_send_control(rdma, NULL, &head);
2324         }
2325 
2326         rdma_disconnect(rdma->cm_id);
2327         trace_qemu_rdma_cleanup_disconnect();
2328         rdma->connected = false;
2329     }
2330 
2331     if (rdma->channel) {
2332         qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2333     }
2334     g_free(rdma->dest_blocks);
2335     rdma->dest_blocks = NULL;
2336 
2337     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2338         if (rdma->wr_data[idx].control_mr) {
2339             rdma->total_registrations--;
2340             ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2341         }
2342         rdma->wr_data[idx].control_mr = NULL;
2343     }
2344 
2345     if (rdma->local_ram_blocks.block) {
2346         while (rdma->local_ram_blocks.nb_blocks) {
2347             rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2348         }
2349     }
2350 
2351     if (rdma->qp) {
2352         rdma_destroy_qp(rdma->cm_id);
2353         rdma->qp = NULL;
2354     }
2355     if (rdma->cq) {
2356         ibv_destroy_cq(rdma->cq);
2357         rdma->cq = NULL;
2358     }
2359     if (rdma->comp_channel) {
2360         ibv_destroy_comp_channel(rdma->comp_channel);
2361         rdma->comp_channel = NULL;
2362     }
2363     if (rdma->pd) {
2364         ibv_dealloc_pd(rdma->pd);
2365         rdma->pd = NULL;
2366     }
2367     if (rdma->cm_id) {
2368         rdma_destroy_id(rdma->cm_id);
2369         rdma->cm_id = NULL;
2370     }
2371 
2372     /* the destination side, listen_id and channel is shared */
2373     if (rdma->listen_id) {
2374         if (!rdma->is_return_path) {
2375             rdma_destroy_id(rdma->listen_id);
2376         }
2377         rdma->listen_id = NULL;
2378 
2379         if (rdma->channel) {
2380             if (!rdma->is_return_path) {
2381                 rdma_destroy_event_channel(rdma->channel);
2382             }
2383             rdma->channel = NULL;
2384         }
2385     }
2386 
2387     if (rdma->channel) {
2388         rdma_destroy_event_channel(rdma->channel);
2389         rdma->channel = NULL;
2390     }
2391     g_free(rdma->host);
2392     rdma->host = NULL;
2393 }
2394 
2395 
2396 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2397 {
2398     int ret, idx;
2399     Error *local_err = NULL, **temp = &local_err;
2400 
2401     /*
2402      * Will be validated against destination's actual capabilities
2403      * after the connect() completes.
2404      */
2405     rdma->pin_all = pin_all;
2406 
2407     ret = qemu_rdma_resolve_host(rdma, temp);
2408     if (ret) {
2409         goto err_rdma_source_init;
2410     }
2411 
2412     ret = qemu_rdma_alloc_pd_cq(rdma);
2413     if (ret) {
2414         ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2415                     " limits may be too low. Please check $ ulimit -a # and "
2416                     "search for 'ulimit -l' in the output");
2417         goto err_rdma_source_init;
2418     }
2419 
2420     ret = qemu_rdma_alloc_qp(rdma);
2421     if (ret) {
2422         ERROR(temp, "rdma migration: error allocating qp!");
2423         goto err_rdma_source_init;
2424     }
2425 
2426     ret = qemu_rdma_init_ram_blocks(rdma);
2427     if (ret) {
2428         ERROR(temp, "rdma migration: error initializing ram blocks!");
2429         goto err_rdma_source_init;
2430     }
2431 
2432     /* Build the hash that maps from offset to RAMBlock */
2433     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2434     for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2435         g_hash_table_insert(rdma->blockmap,
2436                 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2437                 &rdma->local_ram_blocks.block[idx]);
2438     }
2439 
2440     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2441         ret = qemu_rdma_reg_control(rdma, idx);
2442         if (ret) {
2443             ERROR(temp, "rdma migration: error registering %d control!",
2444                                                             idx);
2445             goto err_rdma_source_init;
2446         }
2447     }
2448 
2449     return 0;
2450 
2451 err_rdma_source_init:
2452     error_propagate(errp, local_err);
2453     qemu_rdma_cleanup(rdma);
2454     return -1;
2455 }
2456 
2457 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2458 {
2459     RDMACapabilities cap = {
2460                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2461                                 .flags = 0,
2462                            };
2463     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2464                                           .retry_count = 5,
2465                                           .private_data = &cap,
2466                                           .private_data_len = sizeof(cap),
2467                                         };
2468     struct rdma_cm_event *cm_event;
2469     int ret;
2470 
2471     /*
2472      * Only negotiate the capability with destination if the user
2473      * on the source first requested the capability.
2474      */
2475     if (rdma->pin_all) {
2476         trace_qemu_rdma_connect_pin_all_requested();
2477         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2478     }
2479 
2480     caps_to_network(&cap);
2481 
2482     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2483     if (ret) {
2484         ERROR(errp, "posting second control recv");
2485         goto err_rdma_source_connect;
2486     }
2487 
2488     ret = rdma_connect(rdma->cm_id, &conn_param);
2489     if (ret) {
2490         perror("rdma_connect");
2491         ERROR(errp, "connecting to destination!");
2492         goto err_rdma_source_connect;
2493     }
2494 
2495     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2496     if (ret) {
2497         perror("rdma_get_cm_event after rdma_connect");
2498         ERROR(errp, "connecting to destination!");
2499         rdma_ack_cm_event(cm_event);
2500         goto err_rdma_source_connect;
2501     }
2502 
2503     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2504         perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2505         ERROR(errp, "connecting to destination!");
2506         rdma_ack_cm_event(cm_event);
2507         goto err_rdma_source_connect;
2508     }
2509     rdma->connected = true;
2510 
2511     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2512     network_to_caps(&cap);
2513 
2514     /*
2515      * Verify that the *requested* capabilities are supported by the destination
2516      * and disable them otherwise.
2517      */
2518     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2519         ERROR(errp, "Server cannot support pinning all memory. "
2520                         "Will register memory dynamically.");
2521         rdma->pin_all = false;
2522     }
2523 
2524     trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2525 
2526     rdma_ack_cm_event(cm_event);
2527 
2528     rdma->control_ready_expected = 1;
2529     rdma->nb_sent = 0;
2530     return 0;
2531 
2532 err_rdma_source_connect:
2533     qemu_rdma_cleanup(rdma);
2534     return -1;
2535 }
2536 
2537 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2538 {
2539     int ret, idx;
2540     struct rdma_cm_id *listen_id;
2541     char ip[40] = "unknown";
2542     struct rdma_addrinfo *res, *e;
2543     char port_str[16];
2544 
2545     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2546         rdma->wr_data[idx].control_len = 0;
2547         rdma->wr_data[idx].control_curr = NULL;
2548     }
2549 
2550     if (!rdma->host || !rdma->host[0]) {
2551         ERROR(errp, "RDMA host is not set!");
2552         rdma->error_state = -EINVAL;
2553         return -1;
2554     }
2555     /* create CM channel */
2556     rdma->channel = rdma_create_event_channel();
2557     if (!rdma->channel) {
2558         ERROR(errp, "could not create rdma event channel");
2559         rdma->error_state = -EINVAL;
2560         return -1;
2561     }
2562 
2563     /* create CM id */
2564     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2565     if (ret) {
2566         ERROR(errp, "could not create cm_id!");
2567         goto err_dest_init_create_listen_id;
2568     }
2569 
2570     snprintf(port_str, 16, "%d", rdma->port);
2571     port_str[15] = '\0';
2572 
2573     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2574     if (ret < 0) {
2575         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2576         goto err_dest_init_bind_addr;
2577     }
2578 
2579     for (e = res; e != NULL; e = e->ai_next) {
2580         inet_ntop(e->ai_family,
2581             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2582         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2583         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2584         if (ret) {
2585             continue;
2586         }
2587         if (e->ai_family == AF_INET6) {
2588             ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
2589             if (ret) {
2590                 continue;
2591             }
2592         }
2593         break;
2594     }
2595 
2596     if (!e) {
2597         ERROR(errp, "Error: could not rdma_bind_addr!");
2598         goto err_dest_init_bind_addr;
2599     }
2600 
2601     rdma->listen_id = listen_id;
2602     qemu_rdma_dump_gid("dest_init", listen_id);
2603     return 0;
2604 
2605 err_dest_init_bind_addr:
2606     rdma_destroy_id(listen_id);
2607 err_dest_init_create_listen_id:
2608     rdma_destroy_event_channel(rdma->channel);
2609     rdma->channel = NULL;
2610     rdma->error_state = ret;
2611     return ret;
2612 
2613 }
2614 
2615 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2616                                             RDMAContext *rdma)
2617 {
2618     int idx;
2619 
2620     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2621         rdma_return_path->wr_data[idx].control_len = 0;
2622         rdma_return_path->wr_data[idx].control_curr = NULL;
2623     }
2624 
2625     /*the CM channel and CM id is shared*/
2626     rdma_return_path->channel = rdma->channel;
2627     rdma_return_path->listen_id = rdma->listen_id;
2628 
2629     rdma->return_path = rdma_return_path;
2630     rdma_return_path->return_path = rdma;
2631     rdma_return_path->is_return_path = true;
2632 }
2633 
2634 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2635 {
2636     RDMAContext *rdma = NULL;
2637     InetSocketAddress *addr;
2638 
2639     if (host_port) {
2640         rdma = g_new0(RDMAContext, 1);
2641         rdma->current_index = -1;
2642         rdma->current_chunk = -1;
2643 
2644         addr = g_new(InetSocketAddress, 1);
2645         if (!inet_parse(addr, host_port, NULL)) {
2646             rdma->port = atoi(addr->port);
2647             rdma->host = g_strdup(addr->host);
2648         } else {
2649             ERROR(errp, "bad RDMA migration address '%s'", host_port);
2650             g_free(rdma);
2651             rdma = NULL;
2652         }
2653 
2654         qapi_free_InetSocketAddress(addr);
2655     }
2656 
2657     return rdma;
2658 }
2659 
2660 /*
2661  * QEMUFile interface to the control channel.
2662  * SEND messages for control only.
2663  * VM's ram is handled with regular RDMA messages.
2664  */
2665 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2666                                        const struct iovec *iov,
2667                                        size_t niov,
2668                                        int *fds,
2669                                        size_t nfds,
2670                                        Error **errp)
2671 {
2672     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2673     QEMUFile *f = rioc->file;
2674     RDMAContext *rdma;
2675     int ret;
2676     ssize_t done = 0;
2677     size_t i;
2678     size_t len = 0;
2679 
2680     rcu_read_lock();
2681     rdma = atomic_rcu_read(&rioc->rdmaout);
2682 
2683     if (!rdma) {
2684         rcu_read_unlock();
2685         return -EIO;
2686     }
2687 
2688     CHECK_ERROR_STATE();
2689 
2690     /*
2691      * Push out any writes that
2692      * we're queued up for VM's ram.
2693      */
2694     ret = qemu_rdma_write_flush(f, rdma);
2695     if (ret < 0) {
2696         rdma->error_state = ret;
2697         rcu_read_unlock();
2698         return ret;
2699     }
2700 
2701     for (i = 0; i < niov; i++) {
2702         size_t remaining = iov[i].iov_len;
2703         uint8_t * data = (void *)iov[i].iov_base;
2704         while (remaining) {
2705             RDMAControlHeader head;
2706 
2707             len = MIN(remaining, RDMA_SEND_INCREMENT);
2708             remaining -= len;
2709 
2710             head.len = len;
2711             head.type = RDMA_CONTROL_QEMU_FILE;
2712 
2713             ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2714 
2715             if (ret < 0) {
2716                 rdma->error_state = ret;
2717                 rcu_read_unlock();
2718                 return ret;
2719             }
2720 
2721             data += len;
2722             done += len;
2723         }
2724     }
2725 
2726     rcu_read_unlock();
2727     return done;
2728 }
2729 
2730 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2731                              size_t size, int idx)
2732 {
2733     size_t len = 0;
2734 
2735     if (rdma->wr_data[idx].control_len) {
2736         trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2737 
2738         len = MIN(size, rdma->wr_data[idx].control_len);
2739         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2740         rdma->wr_data[idx].control_curr += len;
2741         rdma->wr_data[idx].control_len -= len;
2742     }
2743 
2744     return len;
2745 }
2746 
2747 /*
2748  * QEMUFile interface to the control channel.
2749  * RDMA links don't use bytestreams, so we have to
2750  * return bytes to QEMUFile opportunistically.
2751  */
2752 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2753                                       const struct iovec *iov,
2754                                       size_t niov,
2755                                       int **fds,
2756                                       size_t *nfds,
2757                                       Error **errp)
2758 {
2759     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2760     RDMAContext *rdma;
2761     RDMAControlHeader head;
2762     int ret = 0;
2763     ssize_t i;
2764     size_t done = 0;
2765 
2766     rcu_read_lock();
2767     rdma = atomic_rcu_read(&rioc->rdmain);
2768 
2769     if (!rdma) {
2770         rcu_read_unlock();
2771         return -EIO;
2772     }
2773 
2774     CHECK_ERROR_STATE();
2775 
2776     for (i = 0; i < niov; i++) {
2777         size_t want = iov[i].iov_len;
2778         uint8_t *data = (void *)iov[i].iov_base;
2779 
2780         /*
2781          * First, we hold on to the last SEND message we
2782          * were given and dish out the bytes until we run
2783          * out of bytes.
2784          */
2785         ret = qemu_rdma_fill(rdma, data, want, 0);
2786         done += ret;
2787         want -= ret;
2788         /* Got what we needed, so go to next iovec */
2789         if (want == 0) {
2790             continue;
2791         }
2792 
2793         /* If we got any data so far, then don't wait
2794          * for more, just return what we have */
2795         if (done > 0) {
2796             break;
2797         }
2798 
2799 
2800         /* We've got nothing at all, so lets wait for
2801          * more to arrive
2802          */
2803         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2804 
2805         if (ret < 0) {
2806             rdma->error_state = ret;
2807             rcu_read_unlock();
2808             return ret;
2809         }
2810 
2811         /*
2812          * SEND was received with new bytes, now try again.
2813          */
2814         ret = qemu_rdma_fill(rdma, data, want, 0);
2815         done += ret;
2816         want -= ret;
2817 
2818         /* Still didn't get enough, so lets just return */
2819         if (want) {
2820             if (done == 0) {
2821                 rcu_read_unlock();
2822                 return QIO_CHANNEL_ERR_BLOCK;
2823             } else {
2824                 break;
2825             }
2826         }
2827     }
2828     rcu_read_unlock();
2829     return done;
2830 }
2831 
2832 /*
2833  * Block until all the outstanding chunks have been delivered by the hardware.
2834  */
2835 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2836 {
2837     int ret;
2838 
2839     if (qemu_rdma_write_flush(f, rdma) < 0) {
2840         return -EIO;
2841     }
2842 
2843     while (rdma->nb_sent) {
2844         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2845         if (ret < 0) {
2846             error_report("rdma migration: complete polling error!");
2847             return -EIO;
2848         }
2849     }
2850 
2851     qemu_rdma_unregister_waiting(rdma);
2852 
2853     return 0;
2854 }
2855 
2856 
2857 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2858                                          bool blocking,
2859                                          Error **errp)
2860 {
2861     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2862     /* XXX we should make readv/writev actually honour this :-) */
2863     rioc->blocking = blocking;
2864     return 0;
2865 }
2866 
2867 
2868 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2869 struct QIOChannelRDMASource {
2870     GSource parent;
2871     QIOChannelRDMA *rioc;
2872     GIOCondition condition;
2873 };
2874 
2875 static gboolean
2876 qio_channel_rdma_source_prepare(GSource *source,
2877                                 gint *timeout)
2878 {
2879     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2880     RDMAContext *rdma;
2881     GIOCondition cond = 0;
2882     *timeout = -1;
2883 
2884     rcu_read_lock();
2885     if (rsource->condition == G_IO_IN) {
2886         rdma = atomic_rcu_read(&rsource->rioc->rdmain);
2887     } else {
2888         rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
2889     }
2890 
2891     if (!rdma) {
2892         error_report("RDMAContext is NULL when prepare Gsource");
2893         rcu_read_unlock();
2894         return FALSE;
2895     }
2896 
2897     if (rdma->wr_data[0].control_len) {
2898         cond |= G_IO_IN;
2899     }
2900     cond |= G_IO_OUT;
2901 
2902     rcu_read_unlock();
2903     return cond & rsource->condition;
2904 }
2905 
2906 static gboolean
2907 qio_channel_rdma_source_check(GSource *source)
2908 {
2909     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2910     RDMAContext *rdma;
2911     GIOCondition cond = 0;
2912 
2913     rcu_read_lock();
2914     if (rsource->condition == G_IO_IN) {
2915         rdma = atomic_rcu_read(&rsource->rioc->rdmain);
2916     } else {
2917         rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
2918     }
2919 
2920     if (!rdma) {
2921         error_report("RDMAContext is NULL when check Gsource");
2922         rcu_read_unlock();
2923         return FALSE;
2924     }
2925 
2926     if (rdma->wr_data[0].control_len) {
2927         cond |= G_IO_IN;
2928     }
2929     cond |= G_IO_OUT;
2930 
2931     rcu_read_unlock();
2932     return cond & rsource->condition;
2933 }
2934 
2935 static gboolean
2936 qio_channel_rdma_source_dispatch(GSource *source,
2937                                  GSourceFunc callback,
2938                                  gpointer user_data)
2939 {
2940     QIOChannelFunc func = (QIOChannelFunc)callback;
2941     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2942     RDMAContext *rdma;
2943     GIOCondition cond = 0;
2944 
2945     rcu_read_lock();
2946     if (rsource->condition == G_IO_IN) {
2947         rdma = atomic_rcu_read(&rsource->rioc->rdmain);
2948     } else {
2949         rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
2950     }
2951 
2952     if (!rdma) {
2953         error_report("RDMAContext is NULL when dispatch Gsource");
2954         rcu_read_unlock();
2955         return FALSE;
2956     }
2957 
2958     if (rdma->wr_data[0].control_len) {
2959         cond |= G_IO_IN;
2960     }
2961     cond |= G_IO_OUT;
2962 
2963     rcu_read_unlock();
2964     return (*func)(QIO_CHANNEL(rsource->rioc),
2965                    (cond & rsource->condition),
2966                    user_data);
2967 }
2968 
2969 static void
2970 qio_channel_rdma_source_finalize(GSource *source)
2971 {
2972     QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2973 
2974     object_unref(OBJECT(ssource->rioc));
2975 }
2976 
2977 GSourceFuncs qio_channel_rdma_source_funcs = {
2978     qio_channel_rdma_source_prepare,
2979     qio_channel_rdma_source_check,
2980     qio_channel_rdma_source_dispatch,
2981     qio_channel_rdma_source_finalize
2982 };
2983 
2984 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2985                                               GIOCondition condition)
2986 {
2987     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2988     QIOChannelRDMASource *ssource;
2989     GSource *source;
2990 
2991     source = g_source_new(&qio_channel_rdma_source_funcs,
2992                           sizeof(QIOChannelRDMASource));
2993     ssource = (QIOChannelRDMASource *)source;
2994 
2995     ssource->rioc = rioc;
2996     object_ref(OBJECT(rioc));
2997 
2998     ssource->condition = condition;
2999 
3000     return source;
3001 }
3002 
3003 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
3004                                                   AioContext *ctx,
3005                                                   IOHandler *io_read,
3006                                                   IOHandler *io_write,
3007                                                   void *opaque)
3008 {
3009     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3010     if (io_read) {
3011         aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd,
3012                            false, io_read, io_write, NULL, opaque);
3013     } else {
3014         aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd,
3015                            false, io_read, io_write, NULL, opaque);
3016     }
3017 }
3018 
3019 static int qio_channel_rdma_close(QIOChannel *ioc,
3020                                   Error **errp)
3021 {
3022     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3023     RDMAContext *rdmain, *rdmaout;
3024     trace_qemu_rdma_close();
3025 
3026     rdmain = rioc->rdmain;
3027     if (rdmain) {
3028         atomic_rcu_set(&rioc->rdmain, NULL);
3029     }
3030 
3031     rdmaout = rioc->rdmaout;
3032     if (rdmaout) {
3033         atomic_rcu_set(&rioc->rdmaout, NULL);
3034     }
3035 
3036     synchronize_rcu();
3037 
3038     if (rdmain) {
3039         qemu_rdma_cleanup(rdmain);
3040     }
3041 
3042     if (rdmaout) {
3043         qemu_rdma_cleanup(rdmaout);
3044     }
3045 
3046     g_free(rdmain);
3047     g_free(rdmaout);
3048 
3049     return 0;
3050 }
3051 
3052 static int
3053 qio_channel_rdma_shutdown(QIOChannel *ioc,
3054                             QIOChannelShutdown how,
3055                             Error **errp)
3056 {
3057     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3058     RDMAContext *rdmain, *rdmaout;
3059 
3060     rcu_read_lock();
3061 
3062     rdmain = atomic_rcu_read(&rioc->rdmain);
3063     rdmaout = atomic_rcu_read(&rioc->rdmain);
3064 
3065     switch (how) {
3066     case QIO_CHANNEL_SHUTDOWN_READ:
3067         if (rdmain) {
3068             rdmain->error_state = -1;
3069         }
3070         break;
3071     case QIO_CHANNEL_SHUTDOWN_WRITE:
3072         if (rdmaout) {
3073             rdmaout->error_state = -1;
3074         }
3075         break;
3076     case QIO_CHANNEL_SHUTDOWN_BOTH:
3077     default:
3078         if (rdmain) {
3079             rdmain->error_state = -1;
3080         }
3081         if (rdmaout) {
3082             rdmaout->error_state = -1;
3083         }
3084         break;
3085     }
3086 
3087     rcu_read_unlock();
3088     return 0;
3089 }
3090 
3091 /*
3092  * Parameters:
3093  *    @offset == 0 :
3094  *        This means that 'block_offset' is a full virtual address that does not
3095  *        belong to a RAMBlock of the virtual machine and instead
3096  *        represents a private malloc'd memory area that the caller wishes to
3097  *        transfer.
3098  *
3099  *    @offset != 0 :
3100  *        Offset is an offset to be added to block_offset and used
3101  *        to also lookup the corresponding RAMBlock.
3102  *
3103  *    @size > 0 :
3104  *        Initiate an transfer this size.
3105  *
3106  *    @size == 0 :
3107  *        A 'hint' or 'advice' that means that we wish to speculatively
3108  *        and asynchronously unregister this memory. In this case, there is no
3109  *        guarantee that the unregister will actually happen, for example,
3110  *        if the memory is being actively transmitted. Additionally, the memory
3111  *        may be re-registered at any future time if a write within the same
3112  *        chunk was requested again, even if you attempted to unregister it
3113  *        here.
3114  *
3115  *    @size < 0 : TODO, not yet supported
3116  *        Unregister the memory NOW. This means that the caller does not
3117  *        expect there to be any future RDMA transfers and we just want to clean
3118  *        things up. This is used in case the upper layer owns the memory and
3119  *        cannot wait for qemu_fclose() to occur.
3120  *
3121  *    @bytes_sent : User-specificed pointer to indicate how many bytes were
3122  *                  sent. Usually, this will not be more than a few bytes of
3123  *                  the protocol because most transfers are sent asynchronously.
3124  */
3125 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
3126                                   ram_addr_t block_offset, ram_addr_t offset,
3127                                   size_t size, uint64_t *bytes_sent)
3128 {
3129     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3130     RDMAContext *rdma;
3131     int ret;
3132 
3133     rcu_read_lock();
3134     rdma = atomic_rcu_read(&rioc->rdmaout);
3135 
3136     if (!rdma) {
3137         rcu_read_unlock();
3138         return -EIO;
3139     }
3140 
3141     CHECK_ERROR_STATE();
3142 
3143     if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
3144         rcu_read_unlock();
3145         return RAM_SAVE_CONTROL_NOT_SUPP;
3146     }
3147 
3148     qemu_fflush(f);
3149 
3150     if (size > 0) {
3151         /*
3152          * Add this page to the current 'chunk'. If the chunk
3153          * is full, or the page doen't belong to the current chunk,
3154          * an actual RDMA write will occur and a new chunk will be formed.
3155          */
3156         ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
3157         if (ret < 0) {
3158             error_report("rdma migration: write error! %d", ret);
3159             goto err;
3160         }
3161 
3162         /*
3163          * We always return 1 bytes because the RDMA
3164          * protocol is completely asynchronous. We do not yet know
3165          * whether an  identified chunk is zero or not because we're
3166          * waiting for other pages to potentially be merged with
3167          * the current chunk. So, we have to call qemu_update_position()
3168          * later on when the actual write occurs.
3169          */
3170         if (bytes_sent) {
3171             *bytes_sent = 1;
3172         }
3173     } else {
3174         uint64_t index, chunk;
3175 
3176         /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
3177         if (size < 0) {
3178             ret = qemu_rdma_drain_cq(f, rdma);
3179             if (ret < 0) {
3180                 fprintf(stderr, "rdma: failed to synchronously drain"
3181                                 " completion queue before unregistration.\n");
3182                 goto err;
3183             }
3184         }
3185         */
3186 
3187         ret = qemu_rdma_search_ram_block(rdma, block_offset,
3188                                          offset, size, &index, &chunk);
3189 
3190         if (ret) {
3191             error_report("ram block search failed");
3192             goto err;
3193         }
3194 
3195         qemu_rdma_signal_unregister(rdma, index, chunk, 0);
3196 
3197         /*
3198          * TODO: Synchronous, guaranteed unregistration (should not occur during
3199          * fast-path). Otherwise, unregisters will process on the next call to
3200          * qemu_rdma_drain_cq()
3201         if (size < 0) {
3202             qemu_rdma_unregister_waiting(rdma);
3203         }
3204         */
3205     }
3206 
3207     /*
3208      * Drain the Completion Queue if possible, but do not block,
3209      * just poll.
3210      *
3211      * If nothing to poll, the end of the iteration will do this
3212      * again to make sure we don't overflow the request queue.
3213      */
3214     while (1) {
3215         uint64_t wr_id, wr_id_in;
3216         int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
3217         if (ret < 0) {
3218             error_report("rdma migration: polling error! %d", ret);
3219             goto err;
3220         }
3221 
3222         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3223 
3224         if (wr_id == RDMA_WRID_NONE) {
3225             break;
3226         }
3227     }
3228 
3229     rcu_read_unlock();
3230     return RAM_SAVE_CONTROL_DELAYED;
3231 err:
3232     rdma->error_state = ret;
3233     rcu_read_unlock();
3234     return ret;
3235 }
3236 
3237 static void rdma_accept_incoming_migration(void *opaque);
3238 
3239 static void rdma_cm_poll_handler(void *opaque)
3240 {
3241     RDMAContext *rdma = opaque;
3242     int ret;
3243     struct rdma_cm_event *cm_event;
3244     MigrationIncomingState *mis = migration_incoming_get_current();
3245 
3246     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3247     if (ret) {
3248         error_report("get_cm_event failed %d", errno);
3249         return;
3250     }
3251     rdma_ack_cm_event(cm_event);
3252 
3253     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3254         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3255         error_report("receive cm event, cm event is %d", cm_event->event);
3256         rdma->error_state = -EPIPE;
3257         if (rdma->return_path) {
3258             rdma->return_path->error_state = -EPIPE;
3259         }
3260 
3261         if (mis->migration_incoming_co) {
3262             qemu_coroutine_enter(mis->migration_incoming_co);
3263         }
3264         return;
3265     }
3266 }
3267 
3268 static int qemu_rdma_accept(RDMAContext *rdma)
3269 {
3270     RDMACapabilities cap;
3271     struct rdma_conn_param conn_param = {
3272                                             .responder_resources = 2,
3273                                             .private_data = &cap,
3274                                             .private_data_len = sizeof(cap),
3275                                          };
3276     struct rdma_cm_event *cm_event;
3277     struct ibv_context *verbs;
3278     int ret = -EINVAL;
3279     int idx;
3280 
3281     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3282     if (ret) {
3283         goto err_rdma_dest_wait;
3284     }
3285 
3286     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3287         rdma_ack_cm_event(cm_event);
3288         goto err_rdma_dest_wait;
3289     }
3290 
3291     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3292 
3293     network_to_caps(&cap);
3294 
3295     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3296             error_report("Unknown source RDMA version: %d, bailing...",
3297                             cap.version);
3298             rdma_ack_cm_event(cm_event);
3299             goto err_rdma_dest_wait;
3300     }
3301 
3302     /*
3303      * Respond with only the capabilities this version of QEMU knows about.
3304      */
3305     cap.flags &= known_capabilities;
3306 
3307     /*
3308      * Enable the ones that we do know about.
3309      * Add other checks here as new ones are introduced.
3310      */
3311     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3312         rdma->pin_all = true;
3313     }
3314 
3315     rdma->cm_id = cm_event->id;
3316     verbs = cm_event->id->verbs;
3317 
3318     rdma_ack_cm_event(cm_event);
3319 
3320     trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3321 
3322     caps_to_network(&cap);
3323 
3324     trace_qemu_rdma_accept_pin_verbsc(verbs);
3325 
3326     if (!rdma->verbs) {
3327         rdma->verbs = verbs;
3328     } else if (rdma->verbs != verbs) {
3329             error_report("ibv context not matching %p, %p!", rdma->verbs,
3330                          verbs);
3331             goto err_rdma_dest_wait;
3332     }
3333 
3334     qemu_rdma_dump_id("dest_init", verbs);
3335 
3336     ret = qemu_rdma_alloc_pd_cq(rdma);
3337     if (ret) {
3338         error_report("rdma migration: error allocating pd and cq!");
3339         goto err_rdma_dest_wait;
3340     }
3341 
3342     ret = qemu_rdma_alloc_qp(rdma);
3343     if (ret) {
3344         error_report("rdma migration: error allocating qp!");
3345         goto err_rdma_dest_wait;
3346     }
3347 
3348     ret = qemu_rdma_init_ram_blocks(rdma);
3349     if (ret) {
3350         error_report("rdma migration: error initializing ram blocks!");
3351         goto err_rdma_dest_wait;
3352     }
3353 
3354     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3355         ret = qemu_rdma_reg_control(rdma, idx);
3356         if (ret) {
3357             error_report("rdma: error registering %d control", idx);
3358             goto err_rdma_dest_wait;
3359         }
3360     }
3361 
3362     /* Accept the second connection request for return path */
3363     if (migrate_postcopy() && !rdma->is_return_path) {
3364         qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3365                             NULL,
3366                             (void *)(intptr_t)rdma->return_path);
3367     } else {
3368         qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3369                             NULL, rdma);
3370     }
3371 
3372     ret = rdma_accept(rdma->cm_id, &conn_param);
3373     if (ret) {
3374         error_report("rdma_accept returns %d", ret);
3375         goto err_rdma_dest_wait;
3376     }
3377 
3378     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3379     if (ret) {
3380         error_report("rdma_accept get_cm_event failed %d", ret);
3381         goto err_rdma_dest_wait;
3382     }
3383 
3384     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3385         error_report("rdma_accept not event established");
3386         rdma_ack_cm_event(cm_event);
3387         goto err_rdma_dest_wait;
3388     }
3389 
3390     rdma_ack_cm_event(cm_event);
3391     rdma->connected = true;
3392 
3393     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3394     if (ret) {
3395         error_report("rdma migration: error posting second control recv");
3396         goto err_rdma_dest_wait;
3397     }
3398 
3399     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3400 
3401     return 0;
3402 
3403 err_rdma_dest_wait:
3404     rdma->error_state = ret;
3405     qemu_rdma_cleanup(rdma);
3406     return ret;
3407 }
3408 
3409 static int dest_ram_sort_func(const void *a, const void *b)
3410 {
3411     unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3412     unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3413 
3414     return (a_index < b_index) ? -1 : (a_index != b_index);
3415 }
3416 
3417 /*
3418  * During each iteration of the migration, we listen for instructions
3419  * by the source VM to perform dynamic page registrations before they
3420  * can perform RDMA operations.
3421  *
3422  * We respond with the 'rkey'.
3423  *
3424  * Keep doing this until the source tells us to stop.
3425  */
3426 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3427 {
3428     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3429                                .type = RDMA_CONTROL_REGISTER_RESULT,
3430                                .repeat = 0,
3431                              };
3432     RDMAControlHeader unreg_resp = { .len = 0,
3433                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3434                                .repeat = 0,
3435                              };
3436     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3437                                  .repeat = 1 };
3438     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3439     RDMAContext *rdma;
3440     RDMALocalBlocks *local;
3441     RDMAControlHeader head;
3442     RDMARegister *reg, *registers;
3443     RDMACompress *comp;
3444     RDMARegisterResult *reg_result;
3445     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3446     RDMALocalBlock *block;
3447     void *host_addr;
3448     int ret = 0;
3449     int idx = 0;
3450     int count = 0;
3451     int i = 0;
3452 
3453     rcu_read_lock();
3454     rdma = atomic_rcu_read(&rioc->rdmain);
3455 
3456     if (!rdma) {
3457         rcu_read_unlock();
3458         return -EIO;
3459     }
3460 
3461     CHECK_ERROR_STATE();
3462 
3463     local = &rdma->local_ram_blocks;
3464     do {
3465         trace_qemu_rdma_registration_handle_wait();
3466 
3467         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3468 
3469         if (ret < 0) {
3470             break;
3471         }
3472 
3473         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3474             error_report("rdma: Too many requests in this message (%d)."
3475                             "Bailing.", head.repeat);
3476             ret = -EIO;
3477             break;
3478         }
3479 
3480         switch (head.type) {
3481         case RDMA_CONTROL_COMPRESS:
3482             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3483             network_to_compress(comp);
3484 
3485             trace_qemu_rdma_registration_handle_compress(comp->length,
3486                                                          comp->block_idx,
3487                                                          comp->offset);
3488             if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3489                 error_report("rdma: 'compress' bad block index %u (vs %d)",
3490                              (unsigned int)comp->block_idx,
3491                              rdma->local_ram_blocks.nb_blocks);
3492                 ret = -EIO;
3493                 goto out;
3494             }
3495             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3496 
3497             host_addr = block->local_host_addr +
3498                             (comp->offset - block->offset);
3499 
3500             ram_handle_compressed(host_addr, comp->value, comp->length);
3501             break;
3502 
3503         case RDMA_CONTROL_REGISTER_FINISHED:
3504             trace_qemu_rdma_registration_handle_finished();
3505             goto out;
3506 
3507         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3508             trace_qemu_rdma_registration_handle_ram_blocks();
3509 
3510             /* Sort our local RAM Block list so it's the same as the source,
3511              * we can do this since we've filled in a src_index in the list
3512              * as we received the RAMBlock list earlier.
3513              */
3514             qsort(rdma->local_ram_blocks.block,
3515                   rdma->local_ram_blocks.nb_blocks,
3516                   sizeof(RDMALocalBlock), dest_ram_sort_func);
3517             for (i = 0; i < local->nb_blocks; i++) {
3518                 local->block[i].index = i;
3519             }
3520 
3521             if (rdma->pin_all) {
3522                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3523                 if (ret) {
3524                     error_report("rdma migration: error dest "
3525                                     "registering ram blocks");
3526                     goto out;
3527                 }
3528             }
3529 
3530             /*
3531              * Dest uses this to prepare to transmit the RAMBlock descriptions
3532              * to the source VM after connection setup.
3533              * Both sides use the "remote" structure to communicate and update
3534              * their "local" descriptions with what was sent.
3535              */
3536             for (i = 0; i < local->nb_blocks; i++) {
3537                 rdma->dest_blocks[i].remote_host_addr =
3538                     (uintptr_t)(local->block[i].local_host_addr);
3539 
3540                 if (rdma->pin_all) {
3541                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3542                 }
3543 
3544                 rdma->dest_blocks[i].offset = local->block[i].offset;
3545                 rdma->dest_blocks[i].length = local->block[i].length;
3546 
3547                 dest_block_to_network(&rdma->dest_blocks[i]);
3548                 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3549                     local->block[i].block_name,
3550                     local->block[i].offset,
3551                     local->block[i].length,
3552                     local->block[i].local_host_addr,
3553                     local->block[i].src_index);
3554             }
3555 
3556             blocks.len = rdma->local_ram_blocks.nb_blocks
3557                                                 * sizeof(RDMADestBlock);
3558 
3559 
3560             ret = qemu_rdma_post_send_control(rdma,
3561                                         (uint8_t *) rdma->dest_blocks, &blocks);
3562 
3563             if (ret < 0) {
3564                 error_report("rdma migration: error sending remote info");
3565                 goto out;
3566             }
3567 
3568             break;
3569         case RDMA_CONTROL_REGISTER_REQUEST:
3570             trace_qemu_rdma_registration_handle_register(head.repeat);
3571 
3572             reg_resp.repeat = head.repeat;
3573             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3574 
3575             for (count = 0; count < head.repeat; count++) {
3576                 uint64_t chunk;
3577                 uint8_t *chunk_start, *chunk_end;
3578 
3579                 reg = &registers[count];
3580                 network_to_register(reg);
3581 
3582                 reg_result = &results[count];
3583 
3584                 trace_qemu_rdma_registration_handle_register_loop(count,
3585                          reg->current_index, reg->key.current_addr, reg->chunks);
3586 
3587                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3588                     error_report("rdma: 'register' bad block index %u (vs %d)",
3589                                  (unsigned int)reg->current_index,
3590                                  rdma->local_ram_blocks.nb_blocks);
3591                     ret = -ENOENT;
3592                     goto out;
3593                 }
3594                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3595                 if (block->is_ram_block) {
3596                     if (block->offset > reg->key.current_addr) {
3597                         error_report("rdma: bad register address for block %s"
3598                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3599                             block->block_name, block->offset,
3600                             reg->key.current_addr);
3601                         ret = -ERANGE;
3602                         goto out;
3603                     }
3604                     host_addr = (block->local_host_addr +
3605                                 (reg->key.current_addr - block->offset));
3606                     chunk = ram_chunk_index(block->local_host_addr,
3607                                             (uint8_t *) host_addr);
3608                 } else {
3609                     chunk = reg->key.chunk;
3610                     host_addr = block->local_host_addr +
3611                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3612                     /* Check for particularly bad chunk value */
3613                     if (host_addr < (void *)block->local_host_addr) {
3614                         error_report("rdma: bad chunk for block %s"
3615                             " chunk: %" PRIx64,
3616                             block->block_name, reg->key.chunk);
3617                         ret = -ERANGE;
3618                         goto out;
3619                     }
3620                 }
3621                 chunk_start = ram_chunk_start(block, chunk);
3622                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3623                 /* avoid "-Waddress-of-packed-member" warning */
3624                 uint32_t tmp_rkey = 0;
3625                 if (qemu_rdma_register_and_get_keys(rdma, block,
3626                             (uintptr_t)host_addr, NULL, &tmp_rkey,
3627                             chunk, chunk_start, chunk_end)) {
3628                     error_report("cannot get rkey");
3629                     ret = -EINVAL;
3630                     goto out;
3631                 }
3632                 reg_result->rkey = tmp_rkey;
3633 
3634                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3635 
3636                 trace_qemu_rdma_registration_handle_register_rkey(
3637                                                            reg_result->rkey);
3638 
3639                 result_to_network(reg_result);
3640             }
3641 
3642             ret = qemu_rdma_post_send_control(rdma,
3643                             (uint8_t *) results, &reg_resp);
3644 
3645             if (ret < 0) {
3646                 error_report("Failed to send control buffer");
3647                 goto out;
3648             }
3649             break;
3650         case RDMA_CONTROL_UNREGISTER_REQUEST:
3651             trace_qemu_rdma_registration_handle_unregister(head.repeat);
3652             unreg_resp.repeat = head.repeat;
3653             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3654 
3655             for (count = 0; count < head.repeat; count++) {
3656                 reg = &registers[count];
3657                 network_to_register(reg);
3658 
3659                 trace_qemu_rdma_registration_handle_unregister_loop(count,
3660                            reg->current_index, reg->key.chunk);
3661 
3662                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3663 
3664                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3665                 block->pmr[reg->key.chunk] = NULL;
3666 
3667                 if (ret != 0) {
3668                     perror("rdma unregistration chunk failed");
3669                     ret = -ret;
3670                     goto out;
3671                 }
3672 
3673                 rdma->total_registrations--;
3674 
3675                 trace_qemu_rdma_registration_handle_unregister_success(
3676                                                        reg->key.chunk);
3677             }
3678 
3679             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3680 
3681             if (ret < 0) {
3682                 error_report("Failed to send control buffer");
3683                 goto out;
3684             }
3685             break;
3686         case RDMA_CONTROL_REGISTER_RESULT:
3687             error_report("Invalid RESULT message at dest.");
3688             ret = -EIO;
3689             goto out;
3690         default:
3691             error_report("Unknown control message %s", control_desc(head.type));
3692             ret = -EIO;
3693             goto out;
3694         }
3695     } while (1);
3696 out:
3697     if (ret < 0) {
3698         rdma->error_state = ret;
3699     }
3700     rcu_read_unlock();
3701     return ret;
3702 }
3703 
3704 /* Destination:
3705  * Called via a ram_control_load_hook during the initial RAM load section which
3706  * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3707  * on the source.
3708  * We've already built our local RAMBlock list, but not yet sent the list to
3709  * the source.
3710  */
3711 static int
3712 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3713 {
3714     RDMAContext *rdma;
3715     int curr;
3716     int found = -1;
3717 
3718     rcu_read_lock();
3719     rdma = atomic_rcu_read(&rioc->rdmain);
3720 
3721     if (!rdma) {
3722         rcu_read_unlock();
3723         return -EIO;
3724     }
3725 
3726     /* Find the matching RAMBlock in our local list */
3727     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3728         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3729             found = curr;
3730             break;
3731         }
3732     }
3733 
3734     if (found == -1) {
3735         error_report("RAMBlock '%s' not found on destination", name);
3736         rcu_read_unlock();
3737         return -ENOENT;
3738     }
3739 
3740     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3741     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3742     rdma->next_src_index++;
3743 
3744     rcu_read_unlock();
3745     return 0;
3746 }
3747 
3748 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3749 {
3750     switch (flags) {
3751     case RAM_CONTROL_BLOCK_REG:
3752         return rdma_block_notification_handle(opaque, data);
3753 
3754     case RAM_CONTROL_HOOK:
3755         return qemu_rdma_registration_handle(f, opaque);
3756 
3757     default:
3758         /* Shouldn't be called with any other values */
3759         abort();
3760     }
3761 }
3762 
3763 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3764                                         uint64_t flags, void *data)
3765 {
3766     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3767     RDMAContext *rdma;
3768 
3769     rcu_read_lock();
3770     rdma = atomic_rcu_read(&rioc->rdmaout);
3771     if (!rdma) {
3772         rcu_read_unlock();
3773         return -EIO;
3774     }
3775 
3776     CHECK_ERROR_STATE();
3777 
3778     if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
3779         rcu_read_unlock();
3780         return 0;
3781     }
3782 
3783     trace_qemu_rdma_registration_start(flags);
3784     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3785     qemu_fflush(f);
3786 
3787     rcu_read_unlock();
3788     return 0;
3789 }
3790 
3791 /*
3792  * Inform dest that dynamic registrations are done for now.
3793  * First, flush writes, if any.
3794  */
3795 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3796                                        uint64_t flags, void *data)
3797 {
3798     Error *local_err = NULL, **errp = &local_err;
3799     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3800     RDMAContext *rdma;
3801     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3802     int ret = 0;
3803 
3804     rcu_read_lock();
3805     rdma = atomic_rcu_read(&rioc->rdmaout);
3806     if (!rdma) {
3807         rcu_read_unlock();
3808         return -EIO;
3809     }
3810 
3811     CHECK_ERROR_STATE();
3812 
3813     if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
3814         rcu_read_unlock();
3815         return 0;
3816     }
3817 
3818     qemu_fflush(f);
3819     ret = qemu_rdma_drain_cq(f, rdma);
3820 
3821     if (ret < 0) {
3822         goto err;
3823     }
3824 
3825     if (flags == RAM_CONTROL_SETUP) {
3826         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3827         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3828         int reg_result_idx, i, nb_dest_blocks;
3829 
3830         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3831         trace_qemu_rdma_registration_stop_ram();
3832 
3833         /*
3834          * Make sure that we parallelize the pinning on both sides.
3835          * For very large guests, doing this serially takes a really
3836          * long time, so we have to 'interleave' the pinning locally
3837          * with the control messages by performing the pinning on this
3838          * side before we receive the control response from the other
3839          * side that the pinning has completed.
3840          */
3841         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3842                     &reg_result_idx, rdma->pin_all ?
3843                     qemu_rdma_reg_whole_ram_blocks : NULL);
3844         if (ret < 0) {
3845             ERROR(errp, "receiving remote info!");
3846             rcu_read_unlock();
3847             return ret;
3848         }
3849 
3850         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3851 
3852         /*
3853          * The protocol uses two different sets of rkeys (mutually exclusive):
3854          * 1. One key to represent the virtual address of the entire ram block.
3855          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3856          * 2. One to represent individual chunks within a ram block.
3857          *    (dynamic chunk registration enabled - pin individual chunks.)
3858          *
3859          * Once the capability is successfully negotiated, the destination transmits
3860          * the keys to use (or sends them later) including the virtual addresses
3861          * and then propagates the remote ram block descriptions to his local copy.
3862          */
3863 
3864         if (local->nb_blocks != nb_dest_blocks) {
3865             ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) "
3866                         "Your QEMU command line parameters are probably "
3867                         "not identical on both the source and destination.",
3868                         local->nb_blocks, nb_dest_blocks);
3869             rdma->error_state = -EINVAL;
3870             rcu_read_unlock();
3871             return -EINVAL;
3872         }
3873 
3874         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3875         memcpy(rdma->dest_blocks,
3876             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3877         for (i = 0; i < nb_dest_blocks; i++) {
3878             network_to_dest_block(&rdma->dest_blocks[i]);
3879 
3880             /* We require that the blocks are in the same order */
3881             if (rdma->dest_blocks[i].length != local->block[i].length) {
3882                 ERROR(errp, "Block %s/%d has a different length %" PRIu64
3883                             "vs %" PRIu64, local->block[i].block_name, i,
3884                             local->block[i].length,
3885                             rdma->dest_blocks[i].length);
3886                 rdma->error_state = -EINVAL;
3887                 rcu_read_unlock();
3888                 return -EINVAL;
3889             }
3890             local->block[i].remote_host_addr =
3891                     rdma->dest_blocks[i].remote_host_addr;
3892             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3893         }
3894     }
3895 
3896     trace_qemu_rdma_registration_stop(flags);
3897 
3898     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3899     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3900 
3901     if (ret < 0) {
3902         goto err;
3903     }
3904 
3905     rcu_read_unlock();
3906     return 0;
3907 err:
3908     rdma->error_state = ret;
3909     rcu_read_unlock();
3910     return ret;
3911 }
3912 
3913 static const QEMUFileHooks rdma_read_hooks = {
3914     .hook_ram_load = rdma_load_hook,
3915 };
3916 
3917 static const QEMUFileHooks rdma_write_hooks = {
3918     .before_ram_iterate = qemu_rdma_registration_start,
3919     .after_ram_iterate  = qemu_rdma_registration_stop,
3920     .save_page          = qemu_rdma_save_page,
3921 };
3922 
3923 
3924 static void qio_channel_rdma_finalize(Object *obj)
3925 {
3926     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3927     if (rioc->rdmain) {
3928         qemu_rdma_cleanup(rioc->rdmain);
3929         g_free(rioc->rdmain);
3930         rioc->rdmain = NULL;
3931     }
3932     if (rioc->rdmaout) {
3933         qemu_rdma_cleanup(rioc->rdmaout);
3934         g_free(rioc->rdmaout);
3935         rioc->rdmaout = NULL;
3936     }
3937 }
3938 
3939 static void qio_channel_rdma_class_init(ObjectClass *klass,
3940                                         void *class_data G_GNUC_UNUSED)
3941 {
3942     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3943 
3944     ioc_klass->io_writev = qio_channel_rdma_writev;
3945     ioc_klass->io_readv = qio_channel_rdma_readv;
3946     ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3947     ioc_klass->io_close = qio_channel_rdma_close;
3948     ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3949     ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
3950     ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
3951 }
3952 
3953 static const TypeInfo qio_channel_rdma_info = {
3954     .parent = TYPE_QIO_CHANNEL,
3955     .name = TYPE_QIO_CHANNEL_RDMA,
3956     .instance_size = sizeof(QIOChannelRDMA),
3957     .instance_finalize = qio_channel_rdma_finalize,
3958     .class_init = qio_channel_rdma_class_init,
3959 };
3960 
3961 static void qio_channel_rdma_register_types(void)
3962 {
3963     type_register_static(&qio_channel_rdma_info);
3964 }
3965 
3966 type_init(qio_channel_rdma_register_types);
3967 
3968 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3969 {
3970     QIOChannelRDMA *rioc;
3971 
3972     if (qemu_file_mode_is_not_valid(mode)) {
3973         return NULL;
3974     }
3975 
3976     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3977 
3978     if (mode[0] == 'w') {
3979         rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
3980         rioc->rdmaout = rdma;
3981         rioc->rdmain = rdma->return_path;
3982         qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
3983     } else {
3984         rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
3985         rioc->rdmain = rdma;
3986         rioc->rdmaout = rdma->return_path;
3987         qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
3988     }
3989 
3990     return rioc->file;
3991 }
3992 
3993 static void rdma_accept_incoming_migration(void *opaque)
3994 {
3995     RDMAContext *rdma = opaque;
3996     int ret;
3997     QEMUFile *f;
3998     Error *local_err = NULL, **errp = &local_err;
3999 
4000     trace_qemu_rdma_accept_incoming_migration();
4001     ret = qemu_rdma_accept(rdma);
4002 
4003     if (ret) {
4004         ERROR(errp, "RDMA Migration initialization failed!");
4005         return;
4006     }
4007 
4008     trace_qemu_rdma_accept_incoming_migration_accepted();
4009 
4010     if (rdma->is_return_path) {
4011         return;
4012     }
4013 
4014     f = qemu_fopen_rdma(rdma, "rb");
4015     if (f == NULL) {
4016         ERROR(errp, "could not qemu_fopen_rdma!");
4017         qemu_rdma_cleanup(rdma);
4018         return;
4019     }
4020 
4021     rdma->migration_started_on_destination = 1;
4022     migration_fd_process_incoming(f);
4023 }
4024 
4025 void rdma_start_incoming_migration(const char *host_port, Error **errp)
4026 {
4027     int ret;
4028     RDMAContext *rdma, *rdma_return_path = NULL;
4029     Error *local_err = NULL;
4030 
4031     trace_rdma_start_incoming_migration();
4032     rdma = qemu_rdma_data_init(host_port, &local_err);
4033 
4034     if (rdma == NULL) {
4035         goto err;
4036     }
4037 
4038     ret = qemu_rdma_dest_init(rdma, &local_err);
4039 
4040     if (ret) {
4041         goto err;
4042     }
4043 
4044     trace_rdma_start_incoming_migration_after_dest_init();
4045 
4046     ret = rdma_listen(rdma->listen_id, 5);
4047 
4048     if (ret) {
4049         ERROR(errp, "listening on socket!");
4050         goto err;
4051     }
4052 
4053     trace_rdma_start_incoming_migration_after_rdma_listen();
4054 
4055     /* initialize the RDMAContext for return path */
4056     if (migrate_postcopy()) {
4057         rdma_return_path = qemu_rdma_data_init(host_port, &local_err);
4058 
4059         if (rdma_return_path == NULL) {
4060             goto err;
4061         }
4062 
4063         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
4064     }
4065 
4066     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4067                         NULL, (void *)(intptr_t)rdma);
4068     return;
4069 err:
4070     error_propagate(errp, local_err);
4071     g_free(rdma);
4072     g_free(rdma_return_path);
4073 }
4074 
4075 void rdma_start_outgoing_migration(void *opaque,
4076                             const char *host_port, Error **errp)
4077 {
4078     MigrationState *s = opaque;
4079     RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
4080     RDMAContext *rdma_return_path = NULL;
4081     int ret = 0;
4082 
4083     if (rdma == NULL) {
4084         goto err;
4085     }
4086 
4087     ret = qemu_rdma_source_init(rdma,
4088         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4089 
4090     if (ret) {
4091         goto err;
4092     }
4093 
4094     trace_rdma_start_outgoing_migration_after_rdma_source_init();
4095     ret = qemu_rdma_connect(rdma, errp);
4096 
4097     if (ret) {
4098         goto err;
4099     }
4100 
4101     /* RDMA postcopy need a seprate queue pair for return path */
4102     if (migrate_postcopy()) {
4103         rdma_return_path = qemu_rdma_data_init(host_port, errp);
4104 
4105         if (rdma_return_path == NULL) {
4106             goto err;
4107         }
4108 
4109         ret = qemu_rdma_source_init(rdma_return_path,
4110             s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4111 
4112         if (ret) {
4113             goto err;
4114         }
4115 
4116         ret = qemu_rdma_connect(rdma_return_path, errp);
4117 
4118         if (ret) {
4119             goto err;
4120         }
4121 
4122         rdma->return_path = rdma_return_path;
4123         rdma_return_path->return_path = rdma;
4124         rdma_return_path->is_return_path = true;
4125     }
4126 
4127     trace_rdma_start_outgoing_migration_after_rdma_connect();
4128 
4129     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
4130     migrate_fd_connect(s, NULL);
4131     return;
4132 err:
4133     g_free(rdma);
4134     g_free(rdma_return_path);
4135 }
4136