xref: /qemu/migration/rdma.c (revision aef04fc7)
1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  * Copyright Red Hat, Inc. 2015-2016
6  *
7  * Authors:
8  *  Michael R. Hines <mrhines@us.ibm.com>
9  *  Jiuxing Liu <jl@us.ibm.com>
10  *  Daniel P. Berrange <berrange@redhat.com>
11  *
12  * This work is licensed under the terms of the GNU GPL, version 2 or
13  * later.  See the COPYING file in the top-level directory.
14  *
15  */
16 
17 #include "qemu/osdep.h"
18 #include "qapi/error.h"
19 #include "qemu/cutils.h"
20 #include "rdma.h"
21 #include "migration.h"
22 #include "qemu-file.h"
23 #include "ram.h"
24 #include "qemu/error-report.h"
25 #include "qemu/main-loop.h"
26 #include "qemu/module.h"
27 #include "qemu/rcu.h"
28 #include "qemu/sockets.h"
29 #include "qemu/bitmap.h"
30 #include "qemu/coroutine.h"
31 #include "exec/memory.h"
32 #include <sys/socket.h>
33 #include <netdb.h>
34 #include <arpa/inet.h>
35 #include <rdma/rdma_cma.h>
36 #include "trace.h"
37 #include "qom/object.h"
38 #include "options.h"
39 #include <poll.h>
40 
41 /*
42  * Print and error on both the Monitor and the Log file.
43  */
44 #define ERROR(errp, fmt, ...) \
45     do { \
46         fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
47         if (errp && (*(errp) == NULL)) { \
48             error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
49         } \
50     } while (0)
51 
52 #define RDMA_RESOLVE_TIMEOUT_MS 10000
53 
54 /* Do not merge data if larger than this. */
55 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
56 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
57 
58 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
59 
60 /*
61  * This is only for non-live state being migrated.
62  * Instead of RDMA_WRITE messages, we use RDMA_SEND
63  * messages for that state, which requires a different
64  * delivery design than main memory.
65  */
66 #define RDMA_SEND_INCREMENT 32768
67 
68 /*
69  * Maximum size of an infiniband SEND message
70  */
71 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
72 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
73 
74 #define RDMA_CONTROL_VERSION_CURRENT 1
75 /*
76  * Capabilities for negotiation.
77  */
78 #define RDMA_CAPABILITY_PIN_ALL 0x01
79 
80 /*
81  * Add the other flags above to this list of known capabilities
82  * as they are introduced.
83  */
84 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
85 
86 #define CHECK_ERROR_STATE() \
87     do { \
88         if (rdma->error_state) { \
89             if (!rdma->error_reported) { \
90                 error_report("RDMA is in an error state waiting for" \
91                                 " migration to abort!"); \
92                 rdma->error_reported = 1; \
93             } \
94             return rdma->error_state; \
95         } \
96     } while (0)
97 
98 /*
99  * A work request ID is 64 bits and we split up these bits
100  * into 3 parts:
101  *
102  * bits 0-15 : type of control message, 2^16
103  * bits 16-29: ram block index, 2^14
104  * bits 30-63: ram block chunk number, 2^34
105  *
106  * The last two bit ranges are only used for RDMA writes,
107  * in order to track their completion and potentially
108  * also track unregistration status of the message.
109  */
110 #define RDMA_WRID_TYPE_SHIFT  0UL
111 #define RDMA_WRID_BLOCK_SHIFT 16UL
112 #define RDMA_WRID_CHUNK_SHIFT 30UL
113 
114 #define RDMA_WRID_TYPE_MASK \
115     ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
116 
117 #define RDMA_WRID_BLOCK_MASK \
118     (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
119 
120 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
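
/*
 * Illustrative example (not part of the original code): an RDMA write of
 * chunk 5 in RAM block 2 would carry the work request ID
 *
 *     wr_id = RDMA_WRID_RDMA_WRITE
 *           | (2UL << RDMA_WRID_BLOCK_SHIFT)
 *           | (5UL << RDMA_WRID_CHUNK_SHIFT);
 *
 * and the completion path recovers the pieces with
 *
 *     chunk = (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
 *     index = (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
 *
 * which is what qemu_rdma_make_wrid() and qemu_rdma_poll() below do.
 */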
121 
122 /*
123  * RDMA migration protocol:
124  * 1. RDMA Writes (data messages, i.e. RAM)
125  * 2. IB Send/Recv (control channel messages)
126  */
127 enum {
128     RDMA_WRID_NONE = 0,
129     RDMA_WRID_RDMA_WRITE = 1,
130     RDMA_WRID_SEND_CONTROL = 2000,
131     RDMA_WRID_RECV_CONTROL = 4000,
132 };
133 
134 static const char *wrid_desc[] = {
135     [RDMA_WRID_NONE] = "NONE",
136     [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
137     [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
138     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
139 };
140 
141 /*
142  * Work request IDs for IB SEND messages only (not RDMA writes).
143  * This is used by the migration protocol to transmit
144  * control messages (such as device state and registration commands).
145  *
146  * We could use more WRs, but we have enough for now.
147  */
148 enum {
149     RDMA_WRID_READY = 0,
150     RDMA_WRID_DATA,
151     RDMA_WRID_CONTROL,
152     RDMA_WRID_MAX,
153 };
154 
155 /*
156  * SEND/RECV IB Control Messages.
157  */
158 enum {
159     RDMA_CONTROL_NONE = 0,
160     RDMA_CONTROL_ERROR,
161     RDMA_CONTROL_READY,               /* ready to receive */
162     RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
163     RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
164     RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
165     RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
166     RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
167     RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
168     RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
169     RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
170     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
171 };
172 
173 
174 /*
175  * Memory and MR structures used to represent an IB Send/Recv work request.
176  * This is *not* used for RDMA writes, only IB Send/Recv.
177  */
178 typedef struct {
179     uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
180     struct   ibv_mr *control_mr;               /* registration metadata */
181     size_t   control_len;                      /* length of the message */
182     uint8_t *control_curr;                     /* start of unconsumed bytes */
183 } RDMAWorkRequestData;
184 
185 /*
186  * Negotiate RDMA capabilities during connection-setup time.
187  */
188 typedef struct {
189     uint32_t version;
190     uint32_t flags;
191 } RDMACapabilities;
192 
193 static void caps_to_network(RDMACapabilities *cap)
194 {
195     cap->version = htonl(cap->version);
196     cap->flags = htonl(cap->flags);
197 }
198 
199 static void network_to_caps(RDMACapabilities *cap)
200 {
201     cap->version = ntohl(cap->version);
202     cap->flags = ntohl(cap->flags);
203 }
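
/*
 * A minimal sketch of how these helpers and the RDMA_CAPABILITY_* flags
 * fit together during connection setup (an assumption drawn from the
 * fields above, not code copied from elsewhere): the connecting side
 * fills in the structure, converts it to network byte order and hands it
 * to the connection manager as private data, e.g.
 *
 *     RDMACapabilities cap = { .version = RDMA_CONTROL_VERSION_CURRENT };
 *     if (rdma->pin_all) {
 *         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
 *     }
 *     caps_to_network(&cap);
 *     conn_param.private_data = &cap;
 *     conn_param.private_data_len = sizeof(cap);
 *
 * while the accepting side calls network_to_caps() and masks the flags
 * against known_capabilities before honouring them.
 */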
204 
205 /*
206  * Representation of a RAMBlock from an RDMA perspective.
207  * This is not transmitted, only local.
208  * This and subsequent structures cannot be linked lists
209  * because we're using a single IB message to transmit
210  * the information. It's small anyway, so a list is overkill.
211  */
212 typedef struct RDMALocalBlock {
213     char          *block_name;
214     uint8_t       *local_host_addr; /* local virtual address */
215     uint64_t       remote_host_addr; /* remote virtual address */
216     uint64_t       offset;
217     uint64_t       length;
218     struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
219     struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
220     uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
221     uint32_t       remote_rkey;     /* rkey for non-chunk-level registration */
222     int            index;           /* which block are we */
223     unsigned int   src_index;       /* (Only used on dest) */
224     bool           is_ram_block;
225     int            nb_chunks;
226     unsigned long *transit_bitmap;
227     unsigned long *unregister_bitmap;
228 } RDMALocalBlock;
229 
230 /*
231  * Also represents a RAMBlock, but only on the dest.
232  * This gets transmitted by the dest during connection-time
233  * to the source VM and then is used to populate the
234  * corresponding RDMALocalBlock with
235  * the information needed to perform the actual RDMA.
236  */
237 typedef struct QEMU_PACKED RDMADestBlock {
238     uint64_t remote_host_addr;
239     uint64_t offset;
240     uint64_t length;
241     uint32_t remote_rkey;
242     uint32_t padding;
243 } RDMADestBlock;
244 
245 static const char *control_desc(unsigned int rdma_control)
246 {
247     static const char *strs[] = {
248         [RDMA_CONTROL_NONE] = "NONE",
249         [RDMA_CONTROL_ERROR] = "ERROR",
250         [RDMA_CONTROL_READY] = "READY",
251         [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
252         [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
253         [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
254         [RDMA_CONTROL_COMPRESS] = "COMPRESS",
255         [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
256         [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
257         [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
258         [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
259         [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
260     };
261 
262     if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
263         return "??BAD CONTROL VALUE??";
264     }
265 
266     return strs[rdma_control];
267 }
268 
269 static uint64_t htonll(uint64_t v)
270 {
271     union { uint32_t lv[2]; uint64_t llv; } u;
272     u.lv[0] = htonl(v >> 32);
273     u.lv[1] = htonl(v & 0xFFFFFFFFULL);
274     return u.llv;
275 }
276 
277 static uint64_t ntohll(uint64_t v)
278 {
279     union { uint32_t lv[2]; uint64_t llv; } u;
280     u.llv = v;
281     return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
282 }
283 
284 static void dest_block_to_network(RDMADestBlock *db)
285 {
286     db->remote_host_addr = htonll(db->remote_host_addr);
287     db->offset = htonll(db->offset);
288     db->length = htonll(db->length);
289     db->remote_rkey = htonl(db->remote_rkey);
290 }
291 
292 static void network_to_dest_block(RDMADestBlock *db)
293 {
294     db->remote_host_addr = ntohll(db->remote_host_addr);
295     db->offset = ntohll(db->offset);
296     db->length = ntohll(db->length);
297     db->remote_rkey = ntohl(db->remote_rkey);
298 }
299 
300 /*
301  * Local table of the RDMALocalBlock structures above, used when
302  * transmitting the RAMBlock descriptions at connection-time.
303  * This structure itself is *not* transmitted.
304  */
305 typedef struct RDMALocalBlocks {
306     int nb_blocks;
307     bool     init;             /* main memory init complete */
308     RDMALocalBlock *block;
309 } RDMALocalBlocks;
310 
311 /*
312  * Main data structure for RDMA state.
313  * While only one copy of this structure is allocated right now,
314  * this is the place to start if you wanted to support
315  * having more than one RDMA connection open at the same time.
316  */
317 typedef struct RDMAContext {
318     char *host;
319     int port;
320     char *host_port;
321 
322     RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
323 
324     /*
325      * This is used by *_exchange_send() to figure out whether or not
326      * the initial "READY" message has already been received.
327      * Other functions may potentially poll() and detect
328      * the READY message before send() does, in which case we need to
329      * know that it completed.
330      */
331     int control_ready_expected;
332 
333     /* number of outstanding writes */
334     int nb_sent;
335 
336     /* store info about current buffer so that we can
337        merge it with future sends */
338     uint64_t current_addr;
339     uint64_t current_length;
340     /* index of ram block the current buffer belongs to */
341     int current_index;
342     /* index of the chunk in the current ram block */
343     int current_chunk;
344 
345     bool pin_all;
346 
347     /*
348      * infiniband-specific variables for opening the device
349      * and maintaining connection state and so forth.
350      *
351      * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
352      * cm_id->verbs, cm_id->channel, and cm_id->qp.
353      */
354     struct rdma_cm_id *cm_id;               /* connection manager ID */
355     struct rdma_cm_id *listen_id;
356     bool connected;
357 
358     struct ibv_context          *verbs;
359     struct rdma_event_channel   *channel;
360     struct ibv_qp *qp;                      /* queue pair */
361     struct ibv_comp_channel *recv_comp_channel;  /* recv completion channel */
362     struct ibv_comp_channel *send_comp_channel;  /* send completion channel */
363     struct ibv_pd *pd;                      /* protection domain */
364     struct ibv_cq *recv_cq;                 /* receive completion queue */
365     struct ibv_cq *send_cq;                 /* send completion queue */
366 
367     /*
368      * If a previous write failed (perhaps because of a failed
369      * memory registration), then do not attempt any future work
370      * and remember the error state.
371      */
372     int error_state;
373     int error_reported;
374     int received_error;
375 
376     /*
377      * Description of ram blocks used throughout the code.
378      */
379     RDMALocalBlocks local_ram_blocks;
380     RDMADestBlock  *dest_blocks;
381 
382     /* Index of the next RAMBlock received during block registration */
383     unsigned int    next_src_index;
384 
385     /*
386      * Set once migration has started on the *destination*;
387      * from then on we can use the coroutine yield function.
388      * The source runs in a thread, so we don't care there.
389      */
390     int migration_started_on_destination;
391 
392     int total_registrations;
393     int total_writes;
394 
395     int unregister_current, unregister_next;
396     uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
397 
398     GHashTable *blockmap;
399 
400     /* the RDMAContext for return path */
401     struct RDMAContext *return_path;
402     bool is_return_path;
403 } RDMAContext;
404 
405 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
406 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)
407 
408 
409 
410 struct QIOChannelRDMA {
411     QIOChannel parent;
412     RDMAContext *rdmain;
413     RDMAContext *rdmaout;
414     QEMUFile *file;
415     bool blocking; /* XXX we don't actually honour this yet */
416 };
417 
418 /*
419  * Main structure for IB Send/Recv control messages.
420  * This gets prepended to every Send/Recv.
421  */
422 typedef struct QEMU_PACKED {
423     uint32_t len;     /* Total length of data portion */
424     uint32_t type;    /* which control command to perform */
425     uint32_t repeat;  /* number of commands in data portion of same type */
426     uint32_t padding;
427 } RDMAControlHeader;
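
/*
 * Illustrative framing implied by the fields above (a reading, not a
 * statement from this file): each IB SEND carries one RDMAControlHeader,
 * converted to network byte order by control_to_network(), followed by
 * 'len' bytes of payload.  When 'repeat' > 1 the payload holds that many
 * back-to-back commands of the same 'type', e.g. several RDMARegister
 * requests batched into one message.
 */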
428 
429 static void control_to_network(RDMAControlHeader *control)
430 {
431     control->type = htonl(control->type);
432     control->len = htonl(control->len);
433     control->repeat = htonl(control->repeat);
434 }
435 
436 static void network_to_control(RDMAControlHeader *control)
437 {
438     control->type = ntohl(control->type);
439     control->len = ntohl(control->len);
440     control->repeat = ntohl(control->repeat);
441 }
442 
443 /*
444  * Register a single chunk.
445  * Information sent by the source VM to inform the dest
446  * to register a single chunk of memory before we can perform
447  * the actual RDMA operation.
448  */
449 typedef struct QEMU_PACKED {
450     union QEMU_PACKED {
451         uint64_t current_addr;  /* offset into the ram_addr_t space */
452         uint64_t chunk;         /* chunk to lookup if unregistering */
453     } key;
454     uint32_t current_index; /* which ramblock the chunk belongs to */
455     uint32_t padding;
456     uint64_t chunks;            /* how many sequential chunks to register */
457 } RDMARegister;
458 
459 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
460 {
461     RDMALocalBlock *local_block;
462     local_block  = &rdma->local_ram_blocks.block[reg->current_index];
463 
464     if (local_block->is_ram_block) {
465         /*
466          * current_addr as passed in is an address in the local ram_addr_t
467          * space, we need to translate this for the destination
468          */
469         reg->key.current_addr -= local_block->offset;
470         reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
471     }
472     reg->key.current_addr = htonll(reg->key.current_addr);
473     reg->current_index = htonl(reg->current_index);
474     reg->chunks = htonll(reg->chunks);
475 }
476 
477 static void network_to_register(RDMARegister *reg)
478 {
479     reg->key.current_addr = ntohll(reg->key.current_addr);
480     reg->current_index = ntohl(reg->current_index);
481     reg->chunks = ntohll(reg->chunks);
482 }
483 
484 typedef struct QEMU_PACKED {
485     uint32_t value;     /* if zero, we will madvise() */
486     uint32_t block_idx; /* which ram block index */
487     uint64_t offset;    /* Address in remote ram_addr_t space */
488     uint64_t length;    /* length of the chunk */
489 } RDMACompress;
490 
491 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
492 {
493     comp->value = htonl(comp->value);
494     /*
495      * comp->offset as passed in is an address in the local ram_addr_t
496      * space, we need to translate this for the destination
497      */
498     comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
499     comp->offset += rdma->dest_blocks[comp->block_idx].offset;
500     comp->block_idx = htonl(comp->block_idx);
501     comp->offset = htonll(comp->offset);
502     comp->length = htonll(comp->length);
503 }
504 
505 static void network_to_compress(RDMACompress *comp)
506 {
507     comp->value = ntohl(comp->value);
508     comp->block_idx = ntohl(comp->block_idx);
509     comp->offset = ntohll(comp->offset);
510     comp->length = ntohll(comp->length);
511 }
512 
513 /*
514  * The result of the dest's memory registration produces an "rkey"
515  * which the source VM must reference in order to perform
516  * the RDMA operation.
517  */
518 typedef struct QEMU_PACKED {
519     uint32_t rkey;
520     uint32_t padding;
521     uint64_t host_addr;
522 } RDMARegisterResult;
523 
524 static void result_to_network(RDMARegisterResult *result)
525 {
526     result->rkey = htonl(result->rkey);
527     result->host_addr = htonll(result->host_addr);
528 }
529 
530 static void network_to_result(RDMARegisterResult *result)
531 {
532     result->rkey = ntohl(result->rkey);
533     result->host_addr = ntohll(result->host_addr);
534 }
535 
536 const char *print_wrid(int wrid);
537 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
538                                    uint8_t *data, RDMAControlHeader *resp,
539                                    int *resp_idx,
540                                    int (*callback)(RDMAContext *rdma));
541 
542 static inline uint64_t ram_chunk_index(const uint8_t *start,
543                                        const uint8_t *host)
544 {
545     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
546 }
547 
548 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
549                                        uint64_t i)
550 {
551     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
552                                   (i << RDMA_REG_CHUNK_SHIFT));
553 }
554 
555 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
556                                      uint64_t i)
557 {
558     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
559                                          (1UL << RDMA_REG_CHUNK_SHIFT);
560 
561     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
562         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
563     }
564 
565     return result;
566 }
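
/*
 * Worked example for the chunk helpers above (illustrative values only):
 * with RDMA_REG_CHUNK_SHIFT of 20 each chunk spans 1 MiB, so a host
 * address 5 MiB + 4 KiB into a block has ram_chunk_index() == 5,
 * ram_chunk_start() returns local_host_addr + 5 MiB, and ram_chunk_end()
 * returns local_host_addr + 6 MiB, clamped to the end of the block if the
 * block is shorter than that.
 */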
567 
568 static int rdma_add_block(RDMAContext *rdma, const char *block_name,
569                          void *host_addr,
570                          ram_addr_t block_offset, uint64_t length)
571 {
572     RDMALocalBlocks *local = &rdma->local_ram_blocks;
573     RDMALocalBlock *block;
574     RDMALocalBlock *old = local->block;
575 
576     local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
577 
578     if (local->nb_blocks) {
579         int x;
580 
581         if (rdma->blockmap) {
582             for (x = 0; x < local->nb_blocks; x++) {
583                 g_hash_table_remove(rdma->blockmap,
584                                     (void *)(uintptr_t)old[x].offset);
585                 g_hash_table_insert(rdma->blockmap,
586                                     (void *)(uintptr_t)old[x].offset,
587                                     &local->block[x]);
588             }
589         }
590         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
591         g_free(old);
592     }
593 
594     block = &local->block[local->nb_blocks];
595 
596     block->block_name = g_strdup(block_name);
597     block->local_host_addr = host_addr;
598     block->offset = block_offset;
599     block->length = length;
600     block->index = local->nb_blocks;
601     block->src_index = ~0U; /* Filled in by the receipt of the block list */
602     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
603     block->transit_bitmap = bitmap_new(block->nb_chunks);
604     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
605     block->unregister_bitmap = bitmap_new(block->nb_chunks);
606     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
607     block->remote_keys = g_new0(uint32_t, block->nb_chunks);
608 
609     block->is_ram_block = !local->init;
610 
611     if (rdma->blockmap) {
612         g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
613     }
614 
615     trace_rdma_add_block(block_name, local->nb_blocks,
616                          (uintptr_t) block->local_host_addr,
617                          block->offset, block->length,
618                          (uintptr_t) (block->local_host_addr + block->length),
619                          BITS_TO_LONGS(block->nb_chunks) *
620                              sizeof(unsigned long) * 8,
621                          block->nb_chunks);
622 
623     local->nb_blocks++;
624 
625     return 0;
626 }
627 
628 /*
629  * Memory regions need to be registered with the device and queue pairs set up
630  * in advance before the migration starts. This tells us where the RAM blocks
631  * are so that we can register them individually.
632  */
633 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
634 {
635     const char *block_name = qemu_ram_get_idstr(rb);
636     void *host_addr = qemu_ram_get_host_addr(rb);
637     ram_addr_t block_offset = qemu_ram_get_offset(rb);
638     ram_addr_t length = qemu_ram_get_used_length(rb);
639     return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
640 }
641 
642 /*
643  * Identify the RAMBlocks and their quantity. They will be referenced to
644  * identify chunk boundaries inside each RAMBlock and will also be referenced
645  * during dynamic page registration.
646  */
647 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
648 {
649     RDMALocalBlocks *local = &rdma->local_ram_blocks;
650     int ret;
651 
652     assert(rdma->blockmap == NULL);
653     memset(local, 0, sizeof *local);
654     ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
655     if (ret) {
656         return ret;
657     }
658     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
659     rdma->dest_blocks = g_new0(RDMADestBlock,
660                                rdma->local_ram_blocks.nb_blocks);
661     local->init = true;
662     return 0;
663 }
664 
665 /*
666  * Note: If used outside of cleanup, the caller must ensure that the destination
667  * block structures are also updated
668  */
669 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
670 {
671     RDMALocalBlocks *local = &rdma->local_ram_blocks;
672     RDMALocalBlock *old = local->block;
673     int x;
674 
675     if (rdma->blockmap) {
676         g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
677     }
678     if (block->pmr) {
679         int j;
680 
681         for (j = 0; j < block->nb_chunks; j++) {
682             if (!block->pmr[j]) {
683                 continue;
684             }
685             ibv_dereg_mr(block->pmr[j]);
686             rdma->total_registrations--;
687         }
688         g_free(block->pmr);
689         block->pmr = NULL;
690     }
691 
692     if (block->mr) {
693         ibv_dereg_mr(block->mr);
694         rdma->total_registrations--;
695         block->mr = NULL;
696     }
697 
698     g_free(block->transit_bitmap);
699     block->transit_bitmap = NULL;
700 
701     g_free(block->unregister_bitmap);
702     block->unregister_bitmap = NULL;
703 
704     g_free(block->remote_keys);
705     block->remote_keys = NULL;
706 
707     g_free(block->block_name);
708     block->block_name = NULL;
709 
710     if (rdma->blockmap) {
711         for (x = 0; x < local->nb_blocks; x++) {
712             g_hash_table_remove(rdma->blockmap,
713                                 (void *)(uintptr_t)old[x].offset);
714         }
715     }
716 
717     if (local->nb_blocks > 1) {
718 
719         local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
720 
721         if (block->index) {
722             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
723         }
724 
725         if (block->index < (local->nb_blocks - 1)) {
726             memcpy(local->block + block->index, old + (block->index + 1),
727                 sizeof(RDMALocalBlock) *
728                     (local->nb_blocks - (block->index + 1)));
729             for (x = block->index; x < local->nb_blocks - 1; x++) {
730                 local->block[x].index--;
731             }
732         }
733     } else {
734         assert(block == local->block);
735         local->block = NULL;
736     }
737 
738     trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
739                            block->offset, block->length,
740                             (uintptr_t)(block->local_host_addr + block->length),
741                            BITS_TO_LONGS(block->nb_chunks) *
742                                sizeof(unsigned long) * 8, block->nb_chunks);
743 
744     g_free(old);
745 
746     local->nb_blocks--;
747 
748     if (local->nb_blocks && rdma->blockmap) {
749         for (x = 0; x < local->nb_blocks; x++) {
750             g_hash_table_insert(rdma->blockmap,
751                                 (void *)(uintptr_t)local->block[x].offset,
752                                 &local->block[x]);
753         }
754     }
755 
756     return 0;
757 }
758 
759 /*
760  * Put in the log file which RDMA device was opened and the details
761  * associated with that device.
762  */
763 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
764 {
765     struct ibv_port_attr port;
766 
767     if (ibv_query_port(verbs, 1, &port)) {
768         error_report("Failed to query port information");
769         return;
770     }
771 
772     printf("%s RDMA Device opened: kernel name %s "
773            "uverbs device name %s, "
774            "infiniband_verbs class device path %s, "
775            "infiniband class device path %s, "
776            "transport: (%d) %s\n",
777                 who,
778                 verbs->device->name,
779                 verbs->device->dev_name,
780                 verbs->device->dev_path,
781                 verbs->device->ibdev_path,
782                 port.link_layer,
783                 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
784                  ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
785                     ? "Ethernet" : "Unknown"));
786 }
787 
788 /*
789  * Put in the log file the RDMA gid addressing information,
790  * useful for folks who have trouble understanding the
791  * RDMA device hierarchy in the kernel.
792  */
793 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
794 {
795     char sgid[33];
796     char dgid[33];
797     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
798     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
799     trace_qemu_rdma_dump_gid(who, sgid, dgid);
800 }
801 
802 /*
803  * As of now, IPv6 over RoCE / iWARP is not supported by linux.
804  * We will try the next addrinfo struct, and fail if there are
805  * no other valid addresses to bind against.
806  *
807  * If the user is listening on '[::]', then we will not have opened a device
808  * yet and have no way of verifying if the device is RoCE or not.
809  *
810  * In this case, the source VM will throw an error for ALL types of
811  * connections (both IPv4 and IPv6) if the destination machine does not have
812  * a regular infiniband network available for use.
813  *
814  * The only way to guarantee that an error is thrown for broken kernels is
815  * for the management software to choose a *specific* interface at bind time
816  * and validate what type of hardware it is.
817  *
818  * Unfortunately, this puts the user in a fix:
819  *
820  *  If the source VM connects with an IPv4 address without knowing that the
821  *  destination has bound to '[::]' the migration will unconditionally fail
822  *  unless the management software is explicitly listening on the IPv4
823  *  address while using a RoCE-based device.
824  *
825  *  If the source VM connects with an IPv6 address, then we're OK because we can
826  *  throw an error on the source (and similarly on the destination).
827  *
828  *  But in mixed environments, this will be broken for a while until it is fixed
829  *  inside linux.
830  *
831  * We do provide a *tiny* bit of help in this function: We can list all of the
832  * devices in the system and check to see if all the devices are RoCE or
833  * Infiniband.
834  *
835  * If we detect that we have a *pure* RoCE environment, then we can safely
836  * throw an error even if the management software has specified '[::]' as the
837  * bind address.
838  *
839  * However, if there are multiple heterogeneous devices, then we cannot make
840  * this assumption and the user just has to be sure they know what they are
841  * doing.
842  *
843  * Patches are being reviewed on linux-rdma.
844  */
845 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
846 {
847     /* This bug only exists in linux, to our knowledge. */
848 #ifdef CONFIG_LINUX
849     struct ibv_port_attr port_attr;
850 
851     /*
852      * Verbs are only NULL if management has bound to '[::]'.
853      *
854      * Let's iterate through all the devices and see if there are any pure IB
855      * devices (non-ethernet).
856      *
857      * If not, then we can safely proceed with the migration.
858      * Otherwise, there are no guarantees until the bug is fixed in linux.
859      */
860     if (!verbs) {
861         int num_devices, x;
862         struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
863         bool roce_found = false;
864         bool ib_found = false;
865 
866         for (x = 0; x < num_devices; x++) {
867             verbs = ibv_open_device(dev_list[x]);
868             if (!verbs) {
869                 if (errno == EPERM) {
870                     continue;
871                 } else {
872                     return -EINVAL;
873                 }
874             }
875 
876             if (ibv_query_port(verbs, 1, &port_attr)) {
877                 ibv_close_device(verbs);
878                 ERROR(errp, "Could not query initial IB port");
879                 return -EINVAL;
880             }
881 
882             if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
883                 ib_found = true;
884             } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
885                 roce_found = true;
886             }
887 
888             ibv_close_device(verbs);
889 
890         }
891 
892         if (roce_found) {
893             if (ib_found) {
894                 fprintf(stderr, "WARN: migrations may fail:"
895                                 " IPv6 over RoCE / iWARP in linux"
896                                 " is broken. But since you appear to have a"
897                                 " mixed RoCE / IB environment, be sure to only"
898                                 " migrate over the IB fabric until the kernel"
899                                 " fixes the bug.\n");
900             } else {
901                 ERROR(errp, "You only have RoCE / iWARP devices in your system"
902                             " and your management software has specified '[::]'"
903                             ", but IPv6 over RoCE / iWARP is not supported in Linux.");
904                 return -ENONET;
905             }
906         }
907 
908         return 0;
909     }
910 
911     /*
912      * If we have a verbs context, that means that something other than '[::]'
913      * was used by the management software for binding, in which case we can
914      * actually warn the user about a potentially broken kernel.
915      */
916 
917     /* IB ports start with 1, not 0 */
918     if (ibv_query_port(verbs, 1, &port_attr)) {
919         ERROR(errp, "Could not query initial IB port");
920         return -EINVAL;
921     }
922 
923     if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
924         ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
925                     "(but patches on linux-rdma in progress)");
926         return -ENONET;
927     }
928 
929 #endif
930 
931     return 0;
932 }
933 
934 /*
935  * Figure out which RDMA device corresponds to the requested IP hostname.
936  * Also create the initial connection manager identifiers for opening
937  * the connection.
938  */
939 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
940 {
941     int ret;
942     struct rdma_addrinfo *res;
943     char port_str[16];
944     struct rdma_cm_event *cm_event;
945     char ip[40] = "unknown";
946     struct rdma_addrinfo *e;
947 
948     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
949         ERROR(errp, "RDMA hostname has not been set");
950         return -EINVAL;
951     }
952 
953     /* create CM channel */
954     rdma->channel = rdma_create_event_channel();
955     if (!rdma->channel) {
956         ERROR(errp, "could not create CM channel");
957         return -EINVAL;
958     }
959 
960     /* create CM id */
961     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
962     if (ret) {
963         ERROR(errp, "could not create channel id");
964         goto err_resolve_create_id;
965     }
966 
967     snprintf(port_str, 16, "%d", rdma->port);
968     port_str[15] = '\0';
969 
970     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
971     if (ret < 0) {
972         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
973         goto err_resolve_get_addr;
974     }
975 
976     for (e = res; e != NULL; e = e->ai_next) {
977         inet_ntop(e->ai_family,
978             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
979         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
980 
981         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
982                 RDMA_RESOLVE_TIMEOUT_MS);
983         if (!ret) {
984             if (e->ai_family == AF_INET6) {
985                 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
986                 if (ret) {
987                     continue;
988                 }
989             }
990             goto route;
991         }
992     }
993 
994     rdma_freeaddrinfo(res);
995     ERROR(errp, "could not resolve address %s", rdma->host);
996     goto err_resolve_get_addr;
997 
998 route:
999     rdma_freeaddrinfo(res);
1000     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
1001 
1002     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1003     if (ret) {
1004         ERROR(errp, "could not perform event_addr_resolved");
1005         goto err_resolve_get_addr;
1006     }
1007 
1008     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
1009         ERROR(errp, "result not equal to event_addr_resolved %s",
1010                 rdma_event_str(cm_event->event));
1011         error_report("rdma_resolve_addr");
1012         rdma_ack_cm_event(cm_event);
1013         ret = -EINVAL;
1014         goto err_resolve_get_addr;
1015     }
1016     rdma_ack_cm_event(cm_event);
1017 
1018     /* resolve route */
1019     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1020     if (ret) {
1021         ERROR(errp, "could not resolve rdma route");
1022         goto err_resolve_get_addr;
1023     }
1024 
1025     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1026     if (ret) {
1027         ERROR(errp, "could not perform event_route_resolved");
1028         goto err_resolve_get_addr;
1029     }
1030     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1031         ERROR(errp, "result not equal to event_route_resolved: %s",
1032                         rdma_event_str(cm_event->event));
1033         rdma_ack_cm_event(cm_event);
1034         ret = -EINVAL;
1035         goto err_resolve_get_addr;
1036     }
1037     rdma_ack_cm_event(cm_event);
1038     rdma->verbs = rdma->cm_id->verbs;
1039     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1040     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1041     return 0;
1042 
1043 err_resolve_get_addr:
1044     rdma_destroy_id(rdma->cm_id);
1045     rdma->cm_id = NULL;
1046 err_resolve_create_id:
1047     rdma_destroy_event_channel(rdma->channel);
1048     rdma->channel = NULL;
1049     return ret;
1050 }
1051 
1052 /*
1053  * Create protection domain and completion queues
1054  */
1055 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1056 {
1057     /* allocate pd */
1058     rdma->pd = ibv_alloc_pd(rdma->verbs);
1059     if (!rdma->pd) {
1060         error_report("failed to allocate protection domain");
1061         return -1;
1062     }
1063 
1064     /* create receive completion channel */
1065     rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
1066     if (!rdma->recv_comp_channel) {
1067         error_report("failed to allocate receive completion channel");
1068         goto err_alloc_pd_cq;
1069     }
1070 
1071     /*
1072      * Completion queue can be filled by read work requests.
1073      */
1074     rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1075                                   NULL, rdma->recv_comp_channel, 0);
1076     if (!rdma->recv_cq) {
1077         error_report("failed to allocate receive completion queue");
1078         goto err_alloc_pd_cq;
1079     }
1080 
1081     /* create send completion channel */
1082     rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
1083     if (!rdma->send_comp_channel) {
1084         error_report("failed to allocate send completion channel");
1085         goto err_alloc_pd_cq;
1086     }
1087 
1088     rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1089                                   NULL, rdma->send_comp_channel, 0);
1090     if (!rdma->send_cq) {
1091         error_report("failed to allocate send completion queue");
1092         goto err_alloc_pd_cq;
1093     }
1094 
1095     return 0;
1096 
1097 err_alloc_pd_cq:
1098     if (rdma->pd) {
1099         ibv_dealloc_pd(rdma->pd);
1100     }
1101     if (rdma->recv_comp_channel) {
1102         ibv_destroy_comp_channel(rdma->recv_comp_channel);
1103     }
1104     if (rdma->send_comp_channel) {
1105         ibv_destroy_comp_channel(rdma->send_comp_channel);
1106     }
1107     if (rdma->recv_cq) {
1108         ibv_destroy_cq(rdma->recv_cq);
1109         rdma->recv_cq = NULL;
1110     }
1111     rdma->pd = NULL;
1112     rdma->recv_comp_channel = NULL;
1113     rdma->send_comp_channel = NULL;
1114     return -1;
1115 
1116 }
1117 
1118 /*
1119  * Create queue pairs.
1120  */
1121 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1122 {
1123     struct ibv_qp_init_attr attr = { 0 };
1124     int ret;
1125 
1126     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1127     attr.cap.max_recv_wr = 3;
1128     attr.cap.max_send_sge = 1;
1129     attr.cap.max_recv_sge = 1;
1130     attr.send_cq = rdma->send_cq;
1131     attr.recv_cq = rdma->recv_cq;
1132     attr.qp_type = IBV_QPT_RC;
1133 
1134     ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1135     if (ret) {
1136         return -1;
1137     }
1138 
1139     rdma->qp = rdma->cm_id->qp;
1140     return 0;
1141 }
1142 
1143 /* Check whether On-Demand Paging is supported by the RDMA device */
1144 static bool rdma_support_odp(struct ibv_context *dev)
1145 {
1146     struct ibv_device_attr_ex attr = {0};
1147     int ret = ibv_query_device_ex(dev, NULL, &attr);
1148     if (ret) {
1149         return false;
1150     }
1151 
1152     if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
1153         return true;
1154     }
1155 
1156     return false;
1157 }
1158 
1159 /*
1160  * Use ibv_advise_mr() to avoid RNR NAK errors as far as possible.
1161  * A responder MR registered with ODP will send an RNR NAK back to
1162  * the requester when it hits a page fault.
1163  */
1164 static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
1165                                          uint32_t len,  uint32_t lkey,
1166                                          const char *name, bool wr)
1167 {
1168 #ifdef HAVE_IBV_ADVISE_MR
1169     int ret;
1170     int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE :
1171                  IBV_ADVISE_MR_ADVICE_PREFETCH;
1172     struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len};
1173 
1174     ret = ibv_advise_mr(pd, advice,
1175                         IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1);
1176     /* ignore the error */
1177     if (ret) {
1178         trace_qemu_rdma_advise_mr(name, len, addr, strerror(errno));
1179     } else {
1180         trace_qemu_rdma_advise_mr(name, len, addr, "succeeded");
1181     }
1182 #endif
1183 }
1184 
1185 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1186 {
1187     int i;
1188     RDMALocalBlocks *local = &rdma->local_ram_blocks;
1189 
1190     for (i = 0; i < local->nb_blocks; i++) {
1191         int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
1192 
1193         local->block[i].mr =
1194             ibv_reg_mr(rdma->pd,
1195                     local->block[i].local_host_addr,
1196                     local->block[i].length, access
1197                     );
1198 
1199         if (!local->block[i].mr &&
1200             errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1201                 access |= IBV_ACCESS_ON_DEMAND;
1202                 /* register ODP mr */
1203                 local->block[i].mr =
1204                     ibv_reg_mr(rdma->pd,
1205                                local->block[i].local_host_addr,
1206                                local->block[i].length, access);
1207                 trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
1208 
1209                 if (local->block[i].mr) {
1210                     qemu_rdma_advise_prefetch_mr(rdma->pd,
1211                                     (uintptr_t)local->block[i].local_host_addr,
1212                                     local->block[i].length,
1213                                     local->block[i].mr->lkey,
1214                                     local->block[i].block_name,
1215                                     true);
1216                 }
1217         }
1218 
1219         if (!local->block[i].mr) {
1220             perror("Failed to register local dest ram block!");
1221             break;
1222         }
1223         rdma->total_registrations++;
1224     }
1225 
1226     if (i >= local->nb_blocks) {
1227         return 0;
1228     }
1229 
1230     for (i--; i >= 0; i--) {
1231         ibv_dereg_mr(local->block[i].mr);
1232         local->block[i].mr = NULL;
1233         rdma->total_registrations--;
1234     }
1235 
1236     return -1;
1237 
1238 }
1239 
1240 /*
1241  * Find the ram block that corresponds to the page requested to be
1242  * transmitted by QEMU.
1243  *
1244  * Once the block is found, also identify which 'chunk' within that
1245  * block the page belongs to.
1246  *
1247  * This search cannot fail or the migration will fail.
1248  */
1249 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1250                                       uintptr_t block_offset,
1251                                       uint64_t offset,
1252                                       uint64_t length,
1253                                       uint64_t *block_index,
1254                                       uint64_t *chunk_index)
1255 {
1256     uint64_t current_addr = block_offset + offset;
1257     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1258                                                 (void *) block_offset);
1259     assert(block);
1260     assert(current_addr >= block->offset);
1261     assert((current_addr + length) <= (block->offset + block->length));
1262 
1263     *block_index = block->index;
1264     *chunk_index = ram_chunk_index(block->local_host_addr,
1265                 block->local_host_addr + (current_addr - block->offset));
1266 
1267     return 0;
1268 }
1269 
1270 /*
1271  * Register a chunk with IB. If the chunk was already registered
1272  * previously, then skip.
1273  *
1274  * Also return the keys associated with the registration needed
1275  * to perform the actual RDMA operation.
1276  */
1277 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1278         RDMALocalBlock *block, uintptr_t host_addr,
1279         uint32_t *lkey, uint32_t *rkey, int chunk,
1280         uint8_t *chunk_start, uint8_t *chunk_end)
1281 {
1282     if (block->mr) {
1283         if (lkey) {
1284             *lkey = block->mr->lkey;
1285         }
1286         if (rkey) {
1287             *rkey = block->mr->rkey;
1288         }
1289         return 0;
1290     }
1291 
1292     /* allocate memory to store chunk MRs */
1293     if (!block->pmr) {
1294         block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1295     }
1296 
1297     /*
1298      * If 'rkey', then we're the destination, so grant access to the source.
1299      *
1300      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1301      */
1302     if (!block->pmr[chunk]) {
1303         uint64_t len = chunk_end - chunk_start;
1304         int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
1305                      0;
1306 
1307         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1308 
1309         block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1310         if (!block->pmr[chunk] &&
1311             errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1312             access |= IBV_ACCESS_ON_DEMAND;
1313             /* register ODP mr */
1314             block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1315             trace_qemu_rdma_register_odp_mr(block->block_name);
1316 
1317             if (block->pmr[chunk]) {
1318                 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
1319                                             len, block->pmr[chunk]->lkey,
1320                                             block->block_name, rkey);
1321 
1322             }
1323         }
1324     }
1325     if (!block->pmr[chunk]) {
1326         perror("Failed to register chunk!");
1327         fprintf(stderr, "Chunk details: block: %d chunk index %d"
1328                         " start %" PRIuPTR " end %" PRIuPTR
1329                         " host %" PRIuPTR
1330                         " local %" PRIuPTR " registrations: %d\n",
1331                         block->index, chunk, (uintptr_t)chunk_start,
1332                         (uintptr_t)chunk_end, host_addr,
1333                         (uintptr_t)block->local_host_addr,
1334                         rdma->total_registrations);
1335         return -1;
1336     }
1337     rdma->total_registrations++;
1338 
1339     if (lkey) {
1340         *lkey = block->pmr[chunk]->lkey;
1341     }
1342     if (rkey) {
1343         *rkey = block->pmr[chunk]->rkey;
1344     }
1345     return 0;
1346 }
1347 
1348 /*
1349  * Register (at connection time) the memory used for control
1350  * channel messages.
1351  */
1352 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1353 {
1354     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1355             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1356             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1357     if (rdma->wr_data[idx].control_mr) {
1358         rdma->total_registrations++;
1359         return 0;
1360     }
1361     error_report("qemu_rdma_reg_control failed");
1362     return -1;
1363 }
1364 
1365 const char *print_wrid(int wrid)
1366 {
1367     if (wrid >= RDMA_WRID_RECV_CONTROL) {
1368         return wrid_desc[RDMA_WRID_RECV_CONTROL];
1369     }
1370     return wrid_desc[wrid];
1371 }
1372 
1373 /*
1374  * Perform a non-optimized memory unregistration after every transfer
1375  * for demonstration purposes, only if pin-all is not requested.
1376  *
1377  * Potential optimizations:
1378  * 1. Start a new thread to run this function continuously
1379  *      - for bit clearing
1380  *      - and for receipt of unregister messages
1381  * 2. Use an LRU.
1382  * 3. Use workload hints.
1383  */
1384 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1385 {
1386     while (rdma->unregistrations[rdma->unregister_current]) {
1387         int ret;
1388         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1389         uint64_t chunk =
1390             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1391         uint64_t index =
1392             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1393         RDMALocalBlock *block =
1394             &(rdma->local_ram_blocks.block[index]);
1395         RDMARegister reg = { .current_index = index };
1396         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1397                                  };
1398         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1399                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1400                                    .repeat = 1,
1401                                  };
1402 
1403         trace_qemu_rdma_unregister_waiting_proc(chunk,
1404                                                 rdma->unregister_current);
1405 
1406         rdma->unregistrations[rdma->unregister_current] = 0;
1407         rdma->unregister_current++;
1408 
1409         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1410             rdma->unregister_current = 0;
1411         }
1412 
1413 
1414         /*
1415          * Unregistration is speculative (because migration is single-threaded
1416          * and we cannot break the protocol's infiniband message ordering).
1417          * Thus, if the memory is currently being used for transmission,
1418          * then abort the attempt to unregister and try again
1419          * later the next time a completion is received for this memory.
1420          */
1421         clear_bit(chunk, block->unregister_bitmap);
1422 
1423         if (test_bit(chunk, block->transit_bitmap)) {
1424             trace_qemu_rdma_unregister_waiting_inflight(chunk);
1425             continue;
1426         }
1427 
1428         trace_qemu_rdma_unregister_waiting_send(chunk);
1429 
1430         ret = ibv_dereg_mr(block->pmr[chunk]);
1431         block->pmr[chunk] = NULL;
1432         block->remote_keys[chunk] = 0;
1433 
1434         if (ret != 0) {
1435             perror("chunk unregistration failed");
1436             return -ret;
1437         }
1438         rdma->total_registrations--;
1439 
1440         reg.key.chunk = chunk;
1441         register_to_network(rdma, &reg);
1442         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1443                                 &resp, NULL, NULL);
1444         if (ret < 0) {
1445             return ret;
1446         }
1447 
1448         trace_qemu_rdma_unregister_waiting_complete(chunk);
1449     }
1450 
1451     return 0;
1452 }
1453 
1454 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1455                                          uint64_t chunk)
1456 {
1457     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1458 
1459     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1460     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1461 
1462     return result;
1463 }
1464 
1465 /*
1466  * Poll the completion queue to see if a work request
1467  * (of any kind) has completed.
1468  * Return the work request ID that completed.
1469  */
1470 static uint64_t qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
1471                                uint64_t *wr_id_out, uint32_t *byte_len)
1472 {
1473     int ret;
1474     struct ibv_wc wc;
1475     uint64_t wr_id;
1476 
1477     ret = ibv_poll_cq(cq, 1, &wc);
1478 
1479     if (!ret) {
1480         *wr_id_out = RDMA_WRID_NONE;
1481         return 0;
1482     }
1483 
1484     if (ret < 0) {
1485         error_report("ibv_poll_cq return %d", ret);
1486         return ret;
1487     }
1488 
1489     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1490 
1491     if (wc.status != IBV_WC_SUCCESS) {
1492         fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1493                         wc.status, ibv_wc_status_str(wc.status));
1494         fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1495 
1496         return -1;
1497     }
1498 
1499     if (rdma->control_ready_expected &&
1500         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1501         trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1502                   wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1503         rdma->control_ready_expected = 0;
1504     }
1505 
1506     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1507         uint64_t chunk =
1508             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1509         uint64_t index =
1510             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1511         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1512 
1513         trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1514                                    index, chunk, block->local_host_addr,
1515                                    (void *)(uintptr_t)block->remote_host_addr);
1516 
1517         clear_bit(chunk, block->transit_bitmap);
1518 
1519         if (rdma->nb_sent > 0) {
1520             rdma->nb_sent--;
1521         }
1522     } else {
1523         trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1524     }
1525 
1526     *wr_id_out = wc.wr_id;
1527     if (byte_len) {
1528         *byte_len = wc.byte_len;
1529     }
1530 
1531     return  0;
1532 }
1533 
1534 /* Wait for activity on the completion channel.
1535  * Returns 0 on success, non-0 on error.
1536  */
1537 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
1538                                        struct ibv_comp_channel *comp_channel)
1539 {
1540     struct rdma_cm_event *cm_event;
1541     int ret = -1;
1542 
1543     /*
1544      * Coroutine doesn't start until migration_fd_process_incoming()
1545      * so don't yield unless we know we're running inside of a coroutine.
1546      */
1547     if (rdma->migration_started_on_destination &&
1548         migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1549         yield_until_fd_readable(comp_channel->fd);
1550     } else {
1551         /* This is the source side (we're in a separate thread), or the
1552          * destination prior to migration_fd_process_incoming();
1553          * after postcopy, the destination is also in a separate thread.
1554          * We can't yield, so we have to poll the fd.
1555          * But we need to be able to handle 'cancel' or an error
1556          * without hanging forever.
1557          */
1558         while (!rdma->error_state  && !rdma->received_error) {
1559             GPollFD pfds[2];
1560             pfds[0].fd = comp_channel->fd;
1561             pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1562             pfds[0].revents = 0;
1563 
1564             pfds[1].fd = rdma->channel->fd;
1565             pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1566             pfds[1].revents = 0;
1567 
1568             /* 0.1s timeout, should be fine for a 'cancel' */
1569             switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1570             case 2:
1571             case 1: /* fd active */
1572                 if (pfds[0].revents) {
1573                     return 0;
1574                 }
1575 
1576                 if (pfds[1].revents) {
1577                     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1578                     if (ret) {
1579                         error_report("failed to get cm event while wait "
1580                                      "completion channel");
1581                         return -EPIPE;
1582                     }
1583 
1584                     error_report("receive cm event while wait comp channel,"
1585                                  "cm event is %d", cm_event->event);
1586                     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1587                         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1588                         rdma_ack_cm_event(cm_event);
1589                         return -EPIPE;
1590                     }
1591                     rdma_ack_cm_event(cm_event);
1592                 }
1593                 break;
1594 
1595             case 0: /* Timeout, go around again */
1596                 break;
1597 
1598             default: /* Error of some type -
1599                       * I don't trust errno from qemu_poll_ns
1600                       */
1601                 error_report("%s: poll failed", __func__);
1602                 return -EPIPE;
1603             }
1604 
1605             if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1606                 /* Bail out and let the cancellation happen */
1607                 return -EPIPE;
1608             }
1609         }
1610     }
1611 
1612     if (rdma->received_error) {
1613         return -EPIPE;
1614     }
1615     return rdma->error_state;
1616 }
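
/*
 * A minimal, standalone illustration of the wait pattern above: poll two fds
 * at once with a short timeout so a 'cancel' can still be noticed.  The
 * helper name and fd parameters are hypothetical; only the 100 ms timeout is
 * taken from the loop above.
 */
#if 0   /* illustration only, not compiled */
#include <poll.h>

static int example_wait_two_fds(int comp_fd, int cm_fd)
{
    struct pollfd pfds[2] = {
        { .fd = comp_fd, .events = POLLIN },
        { .fd = cm_fd,   .events = POLLIN },
    };

    for (;;) {
        int ret = poll(pfds, 2, 100 /* ms */);
        if (ret < 0) {
            return -1;                          /* poll error */
        }
        if (ret > 0 && (pfds[0].revents & POLLIN)) {
            return 0;                           /* completion channel readable */
        }
        /* timeout or only the CM fd fired: caller re-checks cancel/error */
    }
}
#endif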
1617 
1618 static struct ibv_comp_channel *to_channel(RDMAContext *rdma, int wrid)
1619 {
1620     return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
1621            rdma->recv_comp_channel;
1622 }
1623 
1624 static struct ibv_cq *to_cq(RDMAContext *rdma, int wrid)
1625 {
1626     return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
1627 }
1628 
1629 /*
1630  * Block until the next work request has completed.
1631  *
1632  * First poll to see if a work request has already completed,
1633  * otherwise block.
1634  *
1635  * If we encounter completed work requests for IDs other than
1636  * the one we're interested in, then that's generally an error.
1637  *
1638  * The only exception is actual RDMA Write completions. These
1639  * completions only need to be recorded, but do not actually
1640  * need further processing.
1641  */
1642 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1643                                     uint32_t *byte_len)
1644 {
1645     int num_cq_events = 0, ret = 0;
1646     struct ibv_cq *cq;
1647     void *cq_ctx;
1648     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1649     struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
1650     struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
1651 
1652     if (ibv_req_notify_cq(poll_cq, 0)) {
1653         return -1;
1654     }
1655     /* poll cq first */
1656     while (wr_id != wrid_requested) {
1657         ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1658         if (ret < 0) {
1659             return ret;
1660         }
1661 
1662         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1663 
1664         if (wr_id == RDMA_WRID_NONE) {
1665             break;
1666         }
1667         if (wr_id != wrid_requested) {
1668             trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1669                        wrid_requested, print_wrid(wr_id), wr_id);
1670         }
1671     }
1672 
1673     if (wr_id == wrid_requested) {
1674         return 0;
1675     }
1676 
1677     while (1) {
1678         ret = qemu_rdma_wait_comp_channel(rdma, ch);
1679         if (ret) {
1680             goto err_block_for_wrid;
1681         }
1682 
1683         ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
1684         if (ret) {
1685             perror("ibv_get_cq_event");
1686             goto err_block_for_wrid;
1687         }
1688 
1689         num_cq_events++;
1690 
1691         ret = -ibv_req_notify_cq(cq, 0);
1692         if (ret) {
1693             goto err_block_for_wrid;
1694         }
1695 
1696         while (wr_id != wrid_requested) {
1697             ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1698             if (ret < 0) {
1699                 goto err_block_for_wrid;
1700             }
1701 
1702             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1703 
1704             if (wr_id == RDMA_WRID_NONE) {
1705                 break;
1706             }
1707             if (wr_id != wrid_requested) {
1708                 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1709                                    wrid_requested, print_wrid(wr_id), wr_id);
1710             }
1711         }
1712 
1713         if (wr_id == wrid_requested) {
1714             goto success_block_for_wrid;
1715         }
1716     }
1717 
1718 success_block_for_wrid:
1719     if (num_cq_events) {
1720         ibv_ack_cq_events(cq, num_cq_events);
1721     }
1722     return 0;
1723 
1724 err_block_for_wrid:
1725     if (num_cq_events) {
1726         ibv_ack_cq_events(cq, num_cq_events);
1727     }
1728 
1729     rdma->error_state = ret;
1730     return ret;
1731 }
1732 
1733 /*
1734  * Post a SEND message work request for the control channel
1735  * containing some data and block until the post completes.
1736  */
1737 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1738                                        RDMAControlHeader *head)
1739 {
1740     int ret = 0;
1741     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1742     struct ibv_send_wr *bad_wr;
1743     struct ibv_sge sge = {
1744                            .addr = (uintptr_t)(wr->control),
1745                            .length = head->len + sizeof(RDMAControlHeader),
1746                            .lkey = wr->control_mr->lkey,
1747                          };
1748     struct ibv_send_wr send_wr = {
1749                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1750                                    .opcode = IBV_WR_SEND,
1751                                    .send_flags = IBV_SEND_SIGNALED,
1752                                    .sg_list = &sge,
1753                                    .num_sge = 1,
1754                                 };
1755 
1756     trace_qemu_rdma_post_send_control(control_desc(head->type));
1757 
1758     /*
1759      * We don't actually need to do a memcpy() in here if we used
1760      * the "sge" properly, but since we're only sending control messages
1761      * (not RAM in a performance-critical path), it's OK for now.
1762      *
1763      * The copy makes the RDMAControlHeader simpler to manipulate
1764      * for the time being.
1765      */
1766     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1767     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1768     control_to_network((void *) wr->control);
1769 
1770     if (buf) {
1771         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1772     }
1773 
1774 
1775     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1776 
1777     if (ret > 0) {
1778         error_report("Failed to use post IB SEND for control");
1779         return -ret;
1780     }
1781 
1782     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1783     if (ret < 0) {
1784         error_report("rdma migration: send polling control error");
1785     }
1786 
1787     return ret;
1788 }
1789 
1790 /*
1791  * Post a RECV work request in anticipation of some future receipt
1792  * of data on the control channel.
1793  */
1794 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1795 {
1796     struct ibv_recv_wr *bad_wr;
1797     struct ibv_sge sge = {
1798                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
1799                             .length = RDMA_CONTROL_MAX_BUFFER,
1800                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1801                          };
1802 
1803     struct ibv_recv_wr recv_wr = {
1804                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1805                                     .sg_list = &sge,
1806                                     .num_sge = 1,
1807                                  };
1808 
1809 
1810     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1811         return -1;
1812     }
1813 
1814     return 0;
1815 }
1816 
1817 /*
1818  * Block and wait for a RECV control channel message to arrive.
1819  */
1820 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1821                 RDMAControlHeader *head, int expecting, int idx)
1822 {
1823     uint32_t byte_len;
1824     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1825                                        &byte_len);
1826 
1827     if (ret < 0) {
1828         error_report("rdma migration: recv polling control error!");
1829         return ret;
1830     }
1831 
1832     network_to_control((void *) rdma->wr_data[idx].control);
1833     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1834 
1835     trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1836 
1837     if (expecting == RDMA_CONTROL_NONE) {
1838         trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1839                                              head->type);
1840     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1841         error_report("Was expecting a %s (%d) control message"
1842                 ", but got: %s (%d), length: %d",
1843                 control_desc(expecting), expecting,
1844                 control_desc(head->type), head->type, head->len);
1845         if (head->type == RDMA_CONTROL_ERROR) {
1846             rdma->received_error = true;
1847         }
1848         return -EIO;
1849     }
1850     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1851         error_report("too long length: %d", head->len);
1852         return -EINVAL;
1853     }
1854     if (sizeof(*head) + head->len != byte_len) {
1855         error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1856         return -EINVAL;
1857     }
1858 
1859     return 0;
1860 }
1861 
1862 /*
1863  * When a RECV work request has completed, the work request's
1864  * buffer points at the control header.
1865  *
1866  * This advances the current pointer past the header to the
1867  * data portion of the control message that was populated
1868  * once the work request finished.
1869  */
1870 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1871                                   RDMAControlHeader *head)
1872 {
1873     rdma->wr_data[idx].control_len = head->len;
1874     rdma->wr_data[idx].control_curr =
1875         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1876 }
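
/*
 * A minimal sketch of the layout being exposed here, assuming only that the
 * RECV buffer holds an RDMAControlHeader immediately followed by head->len
 * payload bytes; the helper name is hypothetical.
 */
#if 0   /* illustration only, not compiled */
static uint8_t *example_control_payload(uint8_t *control_buf)
{
    /* The payload starts right after the (byte-order converted) header. */
    return control_buf + sizeof(RDMAControlHeader);
}
#endif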
1877 
1878 /*
1879  * This is an 'atomic' high-level operation to deliver a single, unified
1880  * control-channel message.
1881  *
1882  * Additionally, if the user is expecting some kind of reply to this message,
1883  * they can request a 'resp' response message be filled in by posting an
1884  * additional work request on behalf of the user and waiting for an additional
1885  * completion.
1886  *
1887  * The extra (optional) response is used during registration to save us from
1888  * having to perform an *additional* exchange of messages just to provide a
1889  * response, by instead piggy-backing on the acknowledgement.
1890  */
1891 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1892                                    uint8_t *data, RDMAControlHeader *resp,
1893                                    int *resp_idx,
1894                                    int (*callback)(RDMAContext *rdma))
1895 {
1896     int ret = 0;
1897 
1898     /*
1899      * Wait for the destination to signal that it is ready, i.e. wait
1900      * for a READY message, before attempting to deliver this message.
1901      */
1902     if (rdma->control_ready_expected) {
1903         RDMAControlHeader resp;
1904         ret = qemu_rdma_exchange_get_response(rdma,
1905                                     &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1906         if (ret < 0) {
1907             return ret;
1908         }
1909     }
1910 
1911     /*
1912      * If the user is expecting a response, post a WR in anticipation of it.
1913      */
1914     if (resp) {
1915         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1916         if (ret) {
1917             error_report("rdma migration: error posting"
1918                     " extra control recv for anticipated result!");
1919             return ret;
1920         }
1921     }
1922 
1923     /*
1924      * Post a WR to replace the one we just consumed for the READY message.
1925      */
1926     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1927     if (ret) {
1928         error_report("rdma migration: error posting first control recv!");
1929         return ret;
1930     }
1931 
1932     /*
1933      * Deliver the control message that was requested.
1934      */
1935     ret = qemu_rdma_post_send_control(rdma, data, head);
1936 
1937     if (ret < 0) {
1938         error_report("Failed to send control buffer!");
1939         return ret;
1940     }
1941 
1942     /*
1943      * If we're expecting a response, block and wait for it.
1944      */
1945     if (resp) {
1946         if (callback) {
1947             trace_qemu_rdma_exchange_send_issue_callback();
1948             ret = callback(rdma);
1949             if (ret < 0) {
1950                 return ret;
1951             }
1952         }
1953 
1954         trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1955         ret = qemu_rdma_exchange_get_response(rdma, resp,
1956                                               resp->type, RDMA_WRID_DATA);
1957 
1958         if (ret < 0) {
1959             return ret;
1960         }
1961 
1962         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1963         if (resp_idx) {
1964             *resp_idx = RDMA_WRID_DATA;
1965         }
1966         trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1967     }
1968 
1969     rdma->control_ready_expected = 1;
1970 
1971     return 0;
1972 }
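
/*
 * A sketch of the request/response form of qemu_rdma_exchange_send(),
 * mirroring the RDMA_CONTROL_REGISTER_REQUEST path in qemu_rdma_write_one()
 * below.  The helper name is hypothetical and 'reg' is assumed to be filled
 * in by the caller.
 */
#if 0   /* illustration only, not compiled */
static int example_request_registration(RDMAContext *rdma, RDMARegister *reg)
{
    RDMAControlHeader head = { .len = sizeof(*reg),
                               .type = RDMA_CONTROL_REGISTER_REQUEST,
                               .repeat = 1 };
    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
    int reg_result_idx;

    register_to_network(rdma, reg);
    return qemu_rdma_exchange_send(rdma, &head, (uint8_t *)reg,
                                   &resp, &reg_result_idx, NULL);
}
#endif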
1973 
1974 /*
1975  * This is an 'atomic' high-level operation to receive a single, unified
1976  * control-channel message.
1977  */
1978 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1979                                 int expecting)
1980 {
1981     RDMAControlHeader ready = {
1982                                 .len = 0,
1983                                 .type = RDMA_CONTROL_READY,
1984                                 .repeat = 1,
1985                               };
1986     int ret;
1987 
1988     /*
1989      * Inform the source that we're ready to receive a message.
1990      */
1991     ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1992 
1993     if (ret < 0) {
1994         error_report("Failed to send control buffer!");
1995         return ret;
1996     }
1997 
1998     /*
1999      * Block and wait for the message.
2000      */
2001     ret = qemu_rdma_exchange_get_response(rdma, head,
2002                                           expecting, RDMA_WRID_READY);
2003 
2004     if (ret < 0) {
2005         return ret;
2006     }
2007 
2008     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
2009 
2010     /*
2011      * Post a new RECV work request to replace the one we just consumed.
2012      */
2013     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2014     if (ret) {
2015         error_report("rdma migration: error posting second control recv!");
2016         return ret;
2017     }
2018 
2019     return 0;
2020 }
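
/*
 * A sketch of the receiving side: pair qemu_rdma_exchange_recv() with
 * qemu_rdma_fill() (defined further down) to copy the payload out of the
 * control buffer, as qio_channel_rdma_readv() does below.  The helper name
 * is hypothetical and error handling is minimal.
 */
#if 0   /* illustration only, not compiled */
static ssize_t example_recv_qemu_file(RDMAContext *rdma, uint8_t *buf, size_t size)
{
    RDMAControlHeader head;
    int ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);

    if (ret < 0) {
        return ret;
    }
    /* The payload now sits behind wr_data[0].control_curr. */
    return qemu_rdma_fill(rdma, buf, size, 0);
}
#endif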
2021 
2022 /*
2023  * Write an actual chunk of memory using RDMA.
2024  *
2025  * If we're using dynamic registration on the dest-side, we have to
2026  * send a registration command first.
2027  */
2028 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
2029                                int current_index, uint64_t current_addr,
2030                                uint64_t length)
2031 {
2032     struct ibv_sge sge;
2033     struct ibv_send_wr send_wr = { 0 };
2034     struct ibv_send_wr *bad_wr;
2035     int reg_result_idx, ret, count = 0;
2036     uint64_t chunk, chunks;
2037     uint8_t *chunk_start, *chunk_end;
2038     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
2039     RDMARegister reg;
2040     RDMARegisterResult *reg_result;
2041     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
2042     RDMAControlHeader head = { .len = sizeof(RDMARegister),
2043                                .type = RDMA_CONTROL_REGISTER_REQUEST,
2044                                .repeat = 1,
2045                              };
2046 
2047 retry:
2048     sge.addr = (uintptr_t)(block->local_host_addr +
2049                             (current_addr - block->offset));
2050     sge.length = length;
2051 
2052     chunk = ram_chunk_index(block->local_host_addr,
2053                             (uint8_t *)(uintptr_t)sge.addr);
2054     chunk_start = ram_chunk_start(block, chunk);
2055 
2056     if (block->is_ram_block) {
2057         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
2058 
2059         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2060             chunks--;
2061         }
2062     } else {
2063         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
2064 
2065         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2066             chunks--;
2067         }
2068     }
2069 
2070     trace_qemu_rdma_write_one_top(chunks + 1,
2071                                   (chunks + 1) *
2072                                   (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2073 
2074     chunk_end = ram_chunk_end(block, chunk + chunks);
2075 
2076 
2077     while (test_bit(chunk, block->transit_bitmap)) {
2078         (void)count;
2079         trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2080                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
2081 
2082         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2083 
2084         if (ret < 0) {
2085             error_report("Failed to Wait for previous write to complete "
2086                     "block %d chunk %" PRIu64
2087                     " current %" PRIu64 " len %" PRIu64 " %d",
2088                     current_index, chunk, sge.addr, length, rdma->nb_sent);
2089             return ret;
2090         }
2091     }
2092 
2093     if (!rdma->pin_all || !block->is_ram_block) {
2094         if (!block->remote_keys[chunk]) {
2095             /*
2096              * This chunk has not yet been registered, so first check to see
2097              * if the entire chunk is zero. If so, tell the other side to
2098              * memset() + madvise() the entire chunk without RDMA.
2099              */
2100 
2101             if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2102                 RDMACompress comp = {
2103                                         .offset = current_addr,
2104                                         .value = 0,
2105                                         .block_idx = current_index,
2106                                         .length = length,
2107                                     };
2108 
2109                 head.len = sizeof(comp);
2110                 head.type = RDMA_CONTROL_COMPRESS;
2111 
2112                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
2113                                                current_index, current_addr);
2114 
2115                 compress_to_network(rdma, &comp);
2116                 ret = qemu_rdma_exchange_send(rdma, &head,
2117                                 (uint8_t *) &comp, NULL, NULL, NULL);
2118 
2119                 if (ret < 0) {
2120                     return -EIO;
2121                 }
2122 
2123                 acct_update_position(f, sge.length, true);
2124 
2125                 return 1;
2126             }
2127 
2128             /*
2129              * Otherwise, tell other side to register.
2130              */
2131             reg.current_index = current_index;
2132             if (block->is_ram_block) {
2133                 reg.key.current_addr = current_addr;
2134             } else {
2135                 reg.key.chunk = chunk;
2136             }
2137             reg.chunks = chunks;
2138 
2139             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2140                                               current_addr);
2141 
2142             register_to_network(rdma, &reg);
2143             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2144                                     &resp, &reg_result_idx, NULL);
2145             if (ret < 0) {
2146                 return ret;
2147             }
2148 
2149             /* try to overlap this single registration with the one we sent. */
2150             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2151                                                 &sge.lkey, NULL, chunk,
2152                                                 chunk_start, chunk_end)) {
2153                 error_report("cannot get lkey");
2154                 return -EINVAL;
2155             }
2156 
2157             reg_result = (RDMARegisterResult *)
2158                     rdma->wr_data[reg_result_idx].control_curr;
2159 
2160             network_to_result(reg_result);
2161 
2162             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2163                                                  reg_result->rkey, chunk);
2164 
2165             block->remote_keys[chunk] = reg_result->rkey;
2166             block->remote_host_addr = reg_result->host_addr;
2167         } else {
2168             /* already registered before */
2169             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2170                                                 &sge.lkey, NULL, chunk,
2171                                                 chunk_start, chunk_end)) {
2172                 error_report("cannot get lkey!");
2173                 return -EINVAL;
2174             }
2175         }
2176 
2177         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2178     } else {
2179         send_wr.wr.rdma.rkey = block->remote_rkey;
2180 
2181         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2182                                                      &sge.lkey, NULL, chunk,
2183                                                      chunk_start, chunk_end)) {
2184             error_report("cannot get lkey!");
2185             return -EINVAL;
2186         }
2187     }
2188 
2189     /*
2190      * Encode the ram block index and chunk within this wrid.
2191      * We will use this information at the time of completion
2192      * to figure out which bitmap to check against and then which
2193      * chunk in the bitmap to look for.
2194      */
2195     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2196                                         current_index, chunk);
2197 
2198     send_wr.opcode = IBV_WR_RDMA_WRITE;
2199     send_wr.send_flags = IBV_SEND_SIGNALED;
2200     send_wr.sg_list = &sge;
2201     send_wr.num_sge = 1;
2202     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2203                                 (current_addr - block->offset);
2204 
2205     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2206                                    sge.length);
2207 
2208     /*
2209      * ibv_post_send() does not return negative error numbers,
2210      * per the specification they are positive - no idea why.
2211      */
2212     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2213 
2214     if (ret == ENOMEM) {
2215         trace_qemu_rdma_write_one_queue_full();
2216         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2217         if (ret < 0) {
2218             error_report("rdma migration: failed to make "
2219                          "room in full send queue! %d", ret);
2220             return ret;
2221         }
2222 
2223         goto retry;
2224 
2225     } else if (ret > 0) {
2226         perror("rdma migration: post rdma write failed");
2227         return -ret;
2228     }
2229 
2230     set_bit(chunk, block->transit_bitmap);
2231     acct_update_position(f, sge.length, false);
2232     rdma->total_writes++;
2233 
2234     return 0;
2235 }
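
/*
 * Every RDMA write carries the block index and chunk number packed into its
 * 64-bit wr_id (via qemu_rdma_make_wrid() above); the completion path
 * recovers them exactly as qemu_rdma_poll() does.  A minimal sketch; the
 * helper name is hypothetical.
 */
#if 0   /* illustration only, not compiled */
static void example_decode_write_wrid(uint64_t wr_id,
                                      uint64_t *block_index, uint64_t *chunk)
{
    *chunk       = (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
    *block_index = (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
}
#endif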
2236 
2237 /*
2238  * Push out any unwritten RDMA operations.
2239  *
2240  * We support sending out multiple chunks at the same time.
2241  * Not all of them need to get signaled in the completion queue.
2242  */
2243 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2244 {
2245     int ret;
2246 
2247     if (!rdma->current_length) {
2248         return 0;
2249     }
2250 
2251     ret = qemu_rdma_write_one(f, rdma,
2252             rdma->current_index, rdma->current_addr, rdma->current_length);
2253 
2254     if (ret < 0) {
2255         return ret;
2256     }
2257 
2258     if (ret == 0) {
2259         rdma->nb_sent++;
2260         trace_qemu_rdma_write_flush(rdma->nb_sent);
2261     }
2262 
2263     rdma->current_length = 0;
2264     rdma->current_addr = 0;
2265 
2266     return 0;
2267 }
2268 
2269 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2270                     uint64_t offset, uint64_t len)
2271 {
2272     RDMALocalBlock *block;
2273     uint8_t *host_addr;
2274     uint8_t *chunk_end;
2275 
2276     if (rdma->current_index < 0) {
2277         return 0;
2278     }
2279 
2280     if (rdma->current_chunk < 0) {
2281         return 0;
2282     }
2283 
2284     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2285     host_addr = block->local_host_addr + (offset - block->offset);
2286     chunk_end = ram_chunk_end(block, rdma->current_chunk);
2287 
2288     if (rdma->current_length == 0) {
2289         return 0;
2290     }
2291 
2292     /*
2293      * Only merge into chunk sequentially.
2294      */
2295     if (offset != (rdma->current_addr + rdma->current_length)) {
2296         return 0;
2297     }
2298 
2299     if (offset < block->offset) {
2300         return 0;
2301     }
2302 
2303     if ((offset + len) > (block->offset + block->length)) {
2304         return 0;
2305     }
2306 
2307     if ((host_addr + len) > chunk_end) {
2308         return 0;
2309     }
2310 
2311     return 1;
2312 }
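
/*
 * Two writes merge only when the new one starts exactly where the pending
 * data ends and the combined range still fits inside the current block and
 * chunk, as checked above.  A minimal sketch of the sequentiality test; the
 * helper name is hypothetical.
 */
#if 0   /* illustration only, not compiled */
static bool example_is_sequential(uint64_t pending_addr, uint64_t pending_len,
                                  uint64_t new_offset)
{
    return new_offset == pending_addr + pending_len;
}
#endif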
2313 
2314 /*
2315  * We're not actually writing here, but doing three things:
2316  *
2317  * 1. Identify the chunk the buffer belongs to.
2318  * 2. If the chunk is full or the buffer doesn't belong to the current
2319  *    chunk, then start a new chunk and flush() the old chunk.
2320  * 3. To keep the hardware busy, we also group chunks into batches
2321  *    and only require that a batch gets acknowledged in the completion
2322  *    queue instead of each individual chunk.
2323  */
2324 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2325                            uint64_t block_offset, uint64_t offset,
2326                            uint64_t len)
2327 {
2328     uint64_t current_addr = block_offset + offset;
2329     uint64_t index = rdma->current_index;
2330     uint64_t chunk = rdma->current_chunk;
2331     int ret;
2332 
2333     /* If we cannot merge it, we flush the current buffer first. */
2334     if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2335         ret = qemu_rdma_write_flush(f, rdma);
2336         if (ret) {
2337             return ret;
2338         }
2339         rdma->current_length = 0;
2340         rdma->current_addr = current_addr;
2341 
2342         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2343                                          offset, len, &index, &chunk);
2344         if (ret) {
2345             error_report("ram block search failed");
2346             return ret;
2347         }
2348         rdma->current_index = index;
2349         rdma->current_chunk = chunk;
2350     }
2351 
2352     /* merge it */
2353     rdma->current_length += len;
2354 
2355     /* flush it if buffer is too large */
2356     if (rdma->current_length >= RDMA_MERGE_MAX) {
2357         return qemu_rdma_write_flush(f, rdma);
2358     }
2359 
2360     return 0;
2361 }
2362 
2363 static void qemu_rdma_cleanup(RDMAContext *rdma)
2364 {
2365     int idx;
2366 
2367     if (rdma->cm_id && rdma->connected) {
2368         if ((rdma->error_state ||
2369              migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2370             !rdma->received_error) {
2371             RDMAControlHeader head = { .len = 0,
2372                                        .type = RDMA_CONTROL_ERROR,
2373                                        .repeat = 1,
2374                                      };
2375             error_report("Early error. Sending error.");
2376             qemu_rdma_post_send_control(rdma, NULL, &head);
2377         }
2378 
2379         rdma_disconnect(rdma->cm_id);
2380         trace_qemu_rdma_cleanup_disconnect();
2381         rdma->connected = false;
2382     }
2383 
2384     if (rdma->channel) {
2385         qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2386     }
2387     g_free(rdma->dest_blocks);
2388     rdma->dest_blocks = NULL;
2389 
2390     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2391         if (rdma->wr_data[idx].control_mr) {
2392             rdma->total_registrations--;
2393             ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2394         }
2395         rdma->wr_data[idx].control_mr = NULL;
2396     }
2397 
2398     if (rdma->local_ram_blocks.block) {
2399         while (rdma->local_ram_blocks.nb_blocks) {
2400             rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2401         }
2402     }
2403 
2404     if (rdma->qp) {
2405         rdma_destroy_qp(rdma->cm_id);
2406         rdma->qp = NULL;
2407     }
2408     if (rdma->recv_cq) {
2409         ibv_destroy_cq(rdma->recv_cq);
2410         rdma->recv_cq = NULL;
2411     }
2412     if (rdma->send_cq) {
2413         ibv_destroy_cq(rdma->send_cq);
2414         rdma->send_cq = NULL;
2415     }
2416     if (rdma->recv_comp_channel) {
2417         ibv_destroy_comp_channel(rdma->recv_comp_channel);
2418         rdma->recv_comp_channel = NULL;
2419     }
2420     if (rdma->send_comp_channel) {
2421         ibv_destroy_comp_channel(rdma->send_comp_channel);
2422         rdma->send_comp_channel = NULL;
2423     }
2424     if (rdma->pd) {
2425         ibv_dealloc_pd(rdma->pd);
2426         rdma->pd = NULL;
2427     }
2428     if (rdma->cm_id) {
2429         rdma_destroy_id(rdma->cm_id);
2430         rdma->cm_id = NULL;
2431     }
2432 
2433     /* on the destination side, listen_id and channel are shared */
2434     if (rdma->listen_id) {
2435         if (!rdma->is_return_path) {
2436             rdma_destroy_id(rdma->listen_id);
2437         }
2438         rdma->listen_id = NULL;
2439 
2440         if (rdma->channel) {
2441             if (!rdma->is_return_path) {
2442                 rdma_destroy_event_channel(rdma->channel);
2443             }
2444             rdma->channel = NULL;
2445         }
2446     }
2447 
2448     if (rdma->channel) {
2449         rdma_destroy_event_channel(rdma->channel);
2450         rdma->channel = NULL;
2451     }
2452     g_free(rdma->host);
2453     g_free(rdma->host_port);
2454     rdma->host = NULL;
2455     rdma->host_port = NULL;
2456 }
2457 
2458 
2459 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2460 {
2461     int ret, idx;
2462     Error *local_err = NULL, **temp = &local_err;
2463 
2464     /*
2465      * Will be validated against destination's actual capabilities
2466      * after the connect() completes.
2467      */
2468     rdma->pin_all = pin_all;
2469 
2470     ret = qemu_rdma_resolve_host(rdma, temp);
2471     if (ret) {
2472         goto err_rdma_source_init;
2473     }
2474 
2475     ret = qemu_rdma_alloc_pd_cq(rdma);
2476     if (ret) {
2477         ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2478                     " limits may be too low. Please check $ ulimit -a # and "
2479                     "search for 'ulimit -l' in the output");
2480         goto err_rdma_source_init;
2481     }
2482 
2483     ret = qemu_rdma_alloc_qp(rdma);
2484     if (ret) {
2485         ERROR(temp, "rdma migration: error allocating qp!");
2486         goto err_rdma_source_init;
2487     }
2488 
2489     ret = qemu_rdma_init_ram_blocks(rdma);
2490     if (ret) {
2491         ERROR(temp, "rdma migration: error initializing ram blocks!");
2492         goto err_rdma_source_init;
2493     }
2494 
2495     /* Build the hash that maps from offset to RAMBlock */
2496     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2497     for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2498         g_hash_table_insert(rdma->blockmap,
2499                 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2500                 &rdma->local_ram_blocks.block[idx]);
2501     }
2502 
2503     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2504         ret = qemu_rdma_reg_control(rdma, idx);
2505         if (ret) {
2506             ERROR(temp, "rdma migration: error registering %d control!",
2507                                                             idx);
2508             goto err_rdma_source_init;
2509         }
2510     }
2511 
2512     return 0;
2513 
2514 err_rdma_source_init:
2515     error_propagate(errp, local_err);
2516     qemu_rdma_cleanup(rdma);
2517     return -1;
2518 }
2519 
2520 static int qemu_get_cm_event_timeout(RDMAContext *rdma,
2521                                      struct rdma_cm_event **cm_event,
2522                                      long msec, Error **errp)
2523 {
2524     int ret;
2525     struct pollfd poll_fd = {
2526                                 .fd = rdma->channel->fd,
2527                                 .events = POLLIN,
2528                                 .revents = 0
2529                             };
2530 
2531     do {
2532         ret = poll(&poll_fd, 1, msec);
2533     } while (ret < 0 && errno == EINTR);
2534 
2535     if (ret == 0) {
2536         ERROR(errp, "poll cm event timeout");
2537         return -1;
2538     } else if (ret < 0) {
2539         ERROR(errp, "failed to poll cm event, errno=%i", errno);
2540         return -1;
2541     } else if (poll_fd.revents & POLLIN) {
2542         return rdma_get_cm_event(rdma->channel, cm_event);
2543     } else {
2544         ERROR(errp, "no POLLIN event, revent=%x", poll_fd.revents);
2545         return -1;
2546     }
2547 }
2548 
2549 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path)
2550 {
2551     RDMACapabilities cap = {
2552                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2553                                 .flags = 0,
2554                            };
2555     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2556                                           .retry_count = 5,
2557                                           .private_data = &cap,
2558                                           .private_data_len = sizeof(cap),
2559                                         };
2560     struct rdma_cm_event *cm_event;
2561     int ret;
2562 
2563     /*
2564      * Only negotiate the capability with destination if the user
2565      * on the source first requested the capability.
2566      */
2567     if (rdma->pin_all) {
2568         trace_qemu_rdma_connect_pin_all_requested();
2569         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2570     }
2571 
2572     caps_to_network(&cap);
2573 
2574     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2575     if (ret) {
2576         ERROR(errp, "posting second control recv");
2577         goto err_rdma_source_connect;
2578     }
2579 
2580     ret = rdma_connect(rdma->cm_id, &conn_param);
2581     if (ret) {
2582         perror("rdma_connect");
2583         ERROR(errp, "connecting to destination!");
2584         goto err_rdma_source_connect;
2585     }
2586 
2587     if (return_path) {
2588         ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
2589     } else {
2590         ret = rdma_get_cm_event(rdma->channel, &cm_event);
2591     }
2592     if (ret) {
2593         perror("rdma_get_cm_event after rdma_connect");
2594         ERROR(errp, "connecting to destination!");
2595         goto err_rdma_source_connect;
2596     }
2597 
2598     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2599         error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2600         ERROR(errp, "connecting to destination!");
2601         rdma_ack_cm_event(cm_event);
2602         goto err_rdma_source_connect;
2603     }
2604     rdma->connected = true;
2605 
2606     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2607     network_to_caps(&cap);
2608 
2609     /*
2610      * Verify that the *requested* capabilities are supported by the destination
2611      * and disable them otherwise.
2612      */
2613     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2614         ERROR(errp, "Server cannot support pinning all memory. "
2615                         "Will register memory dynamically.");
2616         rdma->pin_all = false;
2617     }
2618 
2619     trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2620 
2621     rdma_ack_cm_event(cm_event);
2622 
2623     rdma->control_ready_expected = 1;
2624     rdma->nb_sent = 0;
2625     return 0;
2626 
2627 err_rdma_source_connect:
2628     qemu_rdma_cleanup(rdma);
2629     return -1;
2630 }
2631 
2632 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2633 {
2634     int ret, idx;
2635     struct rdma_cm_id *listen_id;
2636     char ip[40] = "unknown";
2637     struct rdma_addrinfo *res, *e;
2638     char port_str[16];
2639     int reuse = 1;
2640 
2641     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2642         rdma->wr_data[idx].control_len = 0;
2643         rdma->wr_data[idx].control_curr = NULL;
2644     }
2645 
2646     if (!rdma->host || !rdma->host[0]) {
2647         ERROR(errp, "RDMA host is not set!");
2648         rdma->error_state = -EINVAL;
2649         return -1;
2650     }
2651     /* create CM channel */
2652     rdma->channel = rdma_create_event_channel();
2653     if (!rdma->channel) {
2654         ERROR(errp, "could not create rdma event channel");
2655         rdma->error_state = -EINVAL;
2656         return -1;
2657     }
2658 
2659     /* create CM id */
2660     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2661     if (ret) {
2662         ERROR(errp, "could not create cm_id!");
2663         goto err_dest_init_create_listen_id;
2664     }
2665 
2666     snprintf(port_str, 16, "%d", rdma->port);
2667     port_str[15] = '\0';
2668 
2669     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2670     if (ret < 0) {
2671         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2672         goto err_dest_init_bind_addr;
2673     }
2674 
2675     ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
2676                           &reuse, sizeof reuse);
2677     if (ret) {
2678         ERROR(errp, "Error: could not set REUSEADDR option");
2679         goto err_dest_init_bind_addr;
2680     }
2681     for (e = res; e != NULL; e = e->ai_next) {
2682         inet_ntop(e->ai_family,
2683             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2684         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2685         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2686         if (ret) {
2687             continue;
2688         }
2689         if (e->ai_family == AF_INET6) {
2690             ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
2691             if (ret) {
2692                 continue;
2693             }
2694         }
2695         break;
2696     }
2697 
2698     rdma_freeaddrinfo(res);
2699     if (!e) {
2700         ERROR(errp, "Error: could not rdma_bind_addr!");
2701         goto err_dest_init_bind_addr;
2702     }
2703 
2704     rdma->listen_id = listen_id;
2705     qemu_rdma_dump_gid("dest_init", listen_id);
2706     return 0;
2707 
2708 err_dest_init_bind_addr:
2709     rdma_destroy_id(listen_id);
2710 err_dest_init_create_listen_id:
2711     rdma_destroy_event_channel(rdma->channel);
2712     rdma->channel = NULL;
2713     rdma->error_state = ret;
2714     return ret;
2715 
2716 }
2717 
2718 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2719                                             RDMAContext *rdma)
2720 {
2721     int idx;
2722 
2723     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2724         rdma_return_path->wr_data[idx].control_len = 0;
2725         rdma_return_path->wr_data[idx].control_curr = NULL;
2726     }
2727 
2728     /* the CM channel and CM id are shared */
2729     rdma_return_path->channel = rdma->channel;
2730     rdma_return_path->listen_id = rdma->listen_id;
2731 
2732     rdma->return_path = rdma_return_path;
2733     rdma_return_path->return_path = rdma;
2734     rdma_return_path->is_return_path = true;
2735 }
2736 
2737 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2738 {
2739     RDMAContext *rdma = NULL;
2740     InetSocketAddress *addr;
2741 
2742     if (host_port) {
2743         rdma = g_new0(RDMAContext, 1);
2744         rdma->current_index = -1;
2745         rdma->current_chunk = -1;
2746 
2747         addr = g_new(InetSocketAddress, 1);
2748         if (!inet_parse(addr, host_port, NULL)) {
2749             rdma->port = atoi(addr->port);
2750             rdma->host = g_strdup(addr->host);
2751             rdma->host_port = g_strdup(host_port);
2752         } else {
2753             ERROR(errp, "bad RDMA migration address '%s'", host_port);
2754             g_free(rdma);
2755             rdma = NULL;
2756         }
2757 
2758         qapi_free_InetSocketAddress(addr);
2759     }
2760 
2761     return rdma;
2762 }
2763 
2764 /*
2765  * QEMUFile interface to the control channel.
2766  * SEND messages for control only.
2767  * VM's ram is handled with regular RDMA messages.
2768  */
2769 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2770                                        const struct iovec *iov,
2771                                        size_t niov,
2772                                        int *fds,
2773                                        size_t nfds,
2774                                        int flags,
2775                                        Error **errp)
2776 {
2777     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2778     QEMUFile *f = rioc->file;
2779     RDMAContext *rdma;
2780     int ret;
2781     ssize_t done = 0;
2782     size_t i;
2783     size_t len = 0;
2784 
2785     RCU_READ_LOCK_GUARD();
2786     rdma = qatomic_rcu_read(&rioc->rdmaout);
2787 
2788     if (!rdma) {
2789         error_setg(errp, "RDMA control channel output is not set");
2790         return -1;
2791     }
2792 
2793     CHECK_ERROR_STATE();
2794 
2795     /*
2796      * Push out any writes that
2797      * we've queued up for the VM's RAM.
2798      */
2799     ret = qemu_rdma_write_flush(f, rdma);
2800     if (ret < 0) {
2801         rdma->error_state = ret;
2802         error_setg(errp, "qemu_rdma_write_flush returned %d", ret);
2803         return -1;
2804     }
2805 
2806     for (i = 0; i < niov; i++) {
2807         size_t remaining = iov[i].iov_len;
2808         uint8_t * data = (void *)iov[i].iov_base;
2809         while (remaining) {
2810             RDMAControlHeader head;
2811 
2812             len = MIN(remaining, RDMA_SEND_INCREMENT);
2813             remaining -= len;
2814 
2815             head.len = len;
2816             head.type = RDMA_CONTROL_QEMU_FILE;
2817 
2818             ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2819 
2820             if (ret < 0) {
2821                 rdma->error_state = ret;
2822                 error_setg(errp, "qemu_rdma_exchange_send returned %d", ret);
2823                 return -1;
2824             }
2825 
2826             data += len;
2827             done += len;
2828         }
2829     }
2830 
2831     return done;
2832 }
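
/*
 * writev slices each iovec into SEND messages of at most RDMA_SEND_INCREMENT
 * payload bytes, so a buffer of total_len bytes needs the number of control
 * messages computed below.  A minimal sketch; the helper name is
 * hypothetical.
 */
#if 0   /* illustration only, not compiled */
static size_t example_num_control_messages(size_t total_len)
{
    return (total_len + RDMA_SEND_INCREMENT - 1) / RDMA_SEND_INCREMENT;
}
#endif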
2833 
2834 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2835                              size_t size, int idx)
2836 {
2837     size_t len = 0;
2838 
2839     if (rdma->wr_data[idx].control_len) {
2840         trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2841 
2842         len = MIN(size, rdma->wr_data[idx].control_len);
2843         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2844         rdma->wr_data[idx].control_curr += len;
2845         rdma->wr_data[idx].control_len -= len;
2846     }
2847 
2848     return len;
2849 }
2850 
2851 /*
2852  * QEMUFile interface to the control channel.
2853  * RDMA links don't use bytestreams, so we have to
2854  * return bytes to QEMUFile opportunistically.
2855  */
2856 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2857                                       const struct iovec *iov,
2858                                       size_t niov,
2859                                       int **fds,
2860                                       size_t *nfds,
2861                                       int flags,
2862                                       Error **errp)
2863 {
2864     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2865     RDMAContext *rdma;
2866     RDMAControlHeader head;
2867     int ret = 0;
2868     ssize_t i;
2869     size_t done = 0;
2870 
2871     RCU_READ_LOCK_GUARD();
2872     rdma = qatomic_rcu_read(&rioc->rdmain);
2873 
2874     if (!rdma) {
2875         error_setg(errp, "RDMA control channel input is not set");
2876         return -1;
2877     }
2878 
2879     CHECK_ERROR_STATE();
2880 
2881     for (i = 0; i < niov; i++) {
2882         size_t want = iov[i].iov_len;
2883         uint8_t *data = (void *)iov[i].iov_base;
2884 
2885         /*
2886          * First, we hold on to the last SEND message we
2887          * were given and dish out the bytes until we run
2888          * out of bytes.
2889          */
2890         ret = qemu_rdma_fill(rdma, data, want, 0);
2891         done += ret;
2892         want -= ret;
2893         /* Got what we needed, so go to next iovec */
2894         if (want == 0) {
2895             continue;
2896         }
2897 
2898         /* If we got any data so far, then don't wait
2899          * for more, just return what we have */
2900         if (done > 0) {
2901             break;
2902         }
2903 
2904 
2905         /* We've got nothing at all, so let's wait for
2906          * more to arrive
2907          */
2908         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2909 
2910         if (ret < 0) {
2911             rdma->error_state = ret;
2912             error_setg(errp, "qemu_rdma_exchange_recv returned %d", ret);
2913             return -1;
2914         }
2915 
2916         /*
2917          * SEND was received with new bytes, now try again.
2918          */
2919         ret = qemu_rdma_fill(rdma, data, want, 0);
2920         done += ret;
2921         want -= ret;
2922 
2923         /* Still didn't get enough, so let's just return */
2924         if (want) {
2925             if (done == 0) {
2926                 return QIO_CHANNEL_ERR_BLOCK;
2927             } else {
2928                 break;
2929             }
2930         }
2931     }
2932     return done;
2933 }
2934 
2935 /*
2936  * Block until all the outstanding chunks have been delivered by the hardware.
2937  */
2938 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2939 {
2940     int ret;
2941 
2942     if (qemu_rdma_write_flush(f, rdma) < 0) {
2943         return -EIO;
2944     }
2945 
2946     while (rdma->nb_sent) {
2947         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2948         if (ret < 0) {
2949             error_report("rdma migration: complete polling error!");
2950             return -EIO;
2951         }
2952     }
2953 
2954     qemu_rdma_unregister_waiting(rdma);
2955 
2956     return 0;
2957 }
2958 
2959 
2960 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2961                                          bool blocking,
2962                                          Error **errp)
2963 {
2964     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2965     /* XXX we should make readv/writev actually honour this :-) */
2966     rioc->blocking = blocking;
2967     return 0;
2968 }
2969 
2970 
2971 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2972 struct QIOChannelRDMASource {
2973     GSource parent;
2974     QIOChannelRDMA *rioc;
2975     GIOCondition condition;
2976 };
2977 
2978 static gboolean
2979 qio_channel_rdma_source_prepare(GSource *source,
2980                                 gint *timeout)
2981 {
2982     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2983     RDMAContext *rdma;
2984     GIOCondition cond = 0;
2985     *timeout = -1;
2986 
2987     RCU_READ_LOCK_GUARD();
2988     if (rsource->condition == G_IO_IN) {
2989         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2990     } else {
2991         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2992     }
2993 
2994     if (!rdma) {
2995         error_report("RDMAContext is NULL when prepare Gsource");
2996         return FALSE;
2997     }
2998 
2999     if (rdma->wr_data[0].control_len) {
3000         cond |= G_IO_IN;
3001     }
3002     cond |= G_IO_OUT;
3003 
3004     return cond & rsource->condition;
3005 }
3006 
3007 static gboolean
3008 qio_channel_rdma_source_check(GSource *source)
3009 {
3010     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3011     RDMAContext *rdma;
3012     GIOCondition cond = 0;
3013 
3014     RCU_READ_LOCK_GUARD();
3015     if (rsource->condition == G_IO_IN) {
3016         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3017     } else {
3018         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3019     }
3020 
3021     if (!rdma) {
3022         error_report("RDMAContext is NULL when check Gsource");
3023         return FALSE;
3024     }
3025 
3026     if (rdma->wr_data[0].control_len) {
3027         cond |= G_IO_IN;
3028     }
3029     cond |= G_IO_OUT;
3030 
3031     return cond & rsource->condition;
3032 }
3033 
3034 static gboolean
3035 qio_channel_rdma_source_dispatch(GSource *source,
3036                                  GSourceFunc callback,
3037                                  gpointer user_data)
3038 {
3039     QIOChannelFunc func = (QIOChannelFunc)callback;
3040     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3041     RDMAContext *rdma;
3042     GIOCondition cond = 0;
3043 
3044     RCU_READ_LOCK_GUARD();
3045     if (rsource->condition == G_IO_IN) {
3046         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3047     } else {
3048         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3049     }
3050 
3051     if (!rdma) {
3052         error_report("RDMAContext is NULL when dispatch Gsource");
3053         return FALSE;
3054     }
3055 
3056     if (rdma->wr_data[0].control_len) {
3057         cond |= G_IO_IN;
3058     }
3059     cond |= G_IO_OUT;
3060 
3061     return (*func)(QIO_CHANNEL(rsource->rioc),
3062                    (cond & rsource->condition),
3063                    user_data);
3064 }
3065 
3066 static void
3067 qio_channel_rdma_source_finalize(GSource *source)
3068 {
3069     QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
3070 
3071     object_unref(OBJECT(ssource->rioc));
3072 }
3073 
3074 GSourceFuncs qio_channel_rdma_source_funcs = {
3075     qio_channel_rdma_source_prepare,
3076     qio_channel_rdma_source_check,
3077     qio_channel_rdma_source_dispatch,
3078     qio_channel_rdma_source_finalize
3079 };
3080 
3081 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
3082                                               GIOCondition condition)
3083 {
3084     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3085     QIOChannelRDMASource *ssource;
3086     GSource *source;
3087 
3088     source = g_source_new(&qio_channel_rdma_source_funcs,
3089                           sizeof(QIOChannelRDMASource));
3090     ssource = (QIOChannelRDMASource *)source;
3091 
3092     ssource->rioc = rioc;
3093     object_ref(OBJECT(rioc));
3094 
3095     ssource->condition = condition;
3096 
3097     return source;
3098 }
3099 
3100 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
3101                                                   AioContext *ctx,
3102                                                   IOHandler *io_read,
3103                                                   IOHandler *io_write,
3104                                                   void *opaque)
3105 {
3106     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3107     if (io_read) {
3108         aio_set_fd_handler(ctx, rioc->rdmain->recv_comp_channel->fd,
3109                            false, io_read, io_write, NULL, NULL, opaque);
3110         aio_set_fd_handler(ctx, rioc->rdmain->send_comp_channel->fd,
3111                            false, io_read, io_write, NULL, NULL, opaque);
3112     } else {
3113         aio_set_fd_handler(ctx, rioc->rdmaout->recv_comp_channel->fd,
3114                            false, io_read, io_write, NULL, NULL, opaque);
3115         aio_set_fd_handler(ctx, rioc->rdmaout->send_comp_channel->fd,
3116                            false, io_read, io_write, NULL, NULL, opaque);
3117     }
3118 }
3119 
3120 struct rdma_close_rcu {
3121     struct rcu_head rcu;
3122     RDMAContext *rdmain;
3123     RDMAContext *rdmaout;
3124 };
3125 
3126 /* callback from qio_channel_rdma_close via call_rcu */
3127 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
3128 {
3129     if (rcu->rdmain) {
3130         qemu_rdma_cleanup(rcu->rdmain);
3131     }
3132 
3133     if (rcu->rdmaout) {
3134         qemu_rdma_cleanup(rcu->rdmaout);
3135     }
3136 
3137     g_free(rcu->rdmain);
3138     g_free(rcu->rdmaout);
3139     g_free(rcu);
3140 }
3141 
3142 static int qio_channel_rdma_close(QIOChannel *ioc,
3143                                   Error **errp)
3144 {
3145     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3146     RDMAContext *rdmain, *rdmaout;
3147     struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
3148 
3149     trace_qemu_rdma_close();
3150 
3151     rdmain = rioc->rdmain;
3152     if (rdmain) {
3153         qatomic_rcu_set(&rioc->rdmain, NULL);
3154     }
3155 
3156     rdmaout = rioc->rdmaout;
3157     if (rdmaout) {
3158         qatomic_rcu_set(&rioc->rdmaout, NULL);
3159     }
3160 
3161     rcu->rdmain = rdmain;
3162     rcu->rdmaout = rdmaout;
3163     call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
3164 
3165     return 0;
3166 }
3167 
3168 static int
3169 qio_channel_rdma_shutdown(QIOChannel *ioc,
3170                             QIOChannelShutdown how,
3171                             Error **errp)
3172 {
3173     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3174     RDMAContext *rdmain, *rdmaout;
3175 
3176     RCU_READ_LOCK_GUARD();
3177 
3178     rdmain = qatomic_rcu_read(&rioc->rdmain);
3179     rdmaout = qatomic_rcu_read(&rioc->rdmain);
3180 
3181     switch (how) {
3182     case QIO_CHANNEL_SHUTDOWN_READ:
3183         if (rdmain) {
3184             rdmain->error_state = -1;
3185         }
3186         break;
3187     case QIO_CHANNEL_SHUTDOWN_WRITE:
3188         if (rdmaout) {
3189             rdmaout->error_state = -1;
3190         }
3191         break;
3192     case QIO_CHANNEL_SHUTDOWN_BOTH:
3193     default:
3194         if (rdmain) {
3195             rdmain->error_state = -1;
3196         }
3197         if (rdmaout) {
3198             rdmaout->error_state = -1;
3199         }
3200         break;
3201     }
3202 
3203     return 0;
3204 }
3205 
3206 /*
3207  * Parameters:
3208  *    @offset == 0 :
3209  *        This means that 'block_offset' is a full virtual address that does not
3210  *        belong to a RAMBlock of the virtual machine and instead
3211  *        represents a private malloc'd memory area that the caller wishes to
3212  *        transfer.
3213  *
3214  *    @offset != 0 :
3215  *        Offset is an offset to be added to block_offset and used
3216  *        to also lookup the corresponding RAMBlock.
3217  *
3218  *    @size : Number of bytes to transfer
3219  *
3220  *    @bytes_sent : User-specified pointer to indicate how many bytes were
3221  *                  sent. Usually, this will not be more than a few bytes of
3222  *                  the protocol because most transfers are sent asynchronously.
3223  */
3224 static size_t qemu_rdma_save_page(QEMUFile *f,
3225                                   ram_addr_t block_offset, ram_addr_t offset,
3226                                   size_t size, uint64_t *bytes_sent)
3227 {
3228     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3229     RDMAContext *rdma;
3230     int ret;
3231 
3232     RCU_READ_LOCK_GUARD();
3233     rdma = qatomic_rcu_read(&rioc->rdmaout);
3234 
3235     if (!rdma) {
3236         return -EIO;
3237     }
3238 
3239     CHECK_ERROR_STATE();
3240 
3241     if (migration_in_postcopy()) {
3242         return RAM_SAVE_CONTROL_NOT_SUPP;
3243     }
3244 
3245     qemu_fflush(f);
3246 
3247     /*
3248      * Add this page to the current 'chunk'. If the chunk
3249      * is full, or the page doesn't belong to the current chunk,
3250      * an actual RDMA write will occur and a new chunk will be formed.
3251      */
3252     ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
3253     if (ret < 0) {
3254         error_report("rdma migration: write error! %d", ret);
3255         goto err;
3256     }
3257 
3258     /*
3259      * We always return 1 byte because the RDMA
3260      * protocol is completely asynchronous. We do not yet know
3261      * whether an identified chunk is zero or not because we're
3262      * waiting for other pages to potentially be merged with
3263      * the current chunk. So, we have to call qemu_update_position()
3264      * later on when the actual write occurs.
3265      */
3266     if (bytes_sent) {
3267         *bytes_sent = 1;
3268     }
3269 
3270     /*
3271      * Drain the Completion Queue if possible, but do not block,
3272      * just poll.
3273      *
3274      * If nothing to poll, the end of the iteration will do this
3275      * again to make sure we don't overflow the request queue.
3276      */
3277     while (1) {
3278         uint64_t wr_id, wr_id_in;
3279         int ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
3280         if (ret < 0) {
3281             error_report("rdma migration: polling error! %d", ret);
3282             goto err;
3283         }
3284 
3285         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3286 
3287         if (wr_id == RDMA_WRID_NONE) {
3288             break;
3289         }
3290     }
3291 
3292     while (1) {
3293         uint64_t wr_id, wr_id_in;
3294         int ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
3295         if (ret < 0) {
3296             error_report("rdma migration: polling error! %d", ret);
3297             goto err;
3298         }
3299 
3300         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3301 
3302         if (wr_id == RDMA_WRID_NONE) {
3303             break;
3304         }
3305     }
3306 
3307     return RAM_SAVE_CONTROL_DELAYED;
3308 err:
3309     rdma->error_state = ret;
3310     return ret;
3311 }
3312 
3313 static void rdma_accept_incoming_migration(void *opaque);
3314 
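     /*
      * Destination-side handler for connection-manager events that arrive
      * after the migration stream is up.  A DISCONNECTED or DEVICE_REMOVAL
      * event marks both directions as broken and wakes the incoming-migration
      * coroutine so it can clean up; every other event is simply acknowledged.
      */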
3315 static void rdma_cm_poll_handler(void *opaque)
3316 {
3317     RDMAContext *rdma = opaque;
3318     int ret;
3319     struct rdma_cm_event *cm_event;
3320     MigrationIncomingState *mis = migration_incoming_get_current();
3321 
3322     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3323     if (ret) {
3324         error_report("get_cm_event failed %d", errno);
3325         return;
3326     }
3327 
3328     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3329         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3330         if (!rdma->error_state &&
3331             migration_incoming_get_current()->state !=
3332               MIGRATION_STATUS_COMPLETED) {
3333             error_report("received cm event %d, connection broken", cm_event->event);
3334             rdma->error_state = -EPIPE;
3335             if (rdma->return_path) {
3336                 rdma->return_path->error_state = -EPIPE;
3337             }
3338         }
3339         rdma_ack_cm_event(cm_event);
3340 
3341         if (mis->migration_incoming_co) {
3342             qemu_coroutine_enter(mis->migration_incoming_co);
3343         }
3344         return;
3345     }
3346     rdma_ack_cm_event(cm_event);
3347 }
3348 
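     /*
      * Destination side of the connection handshake: wait for the source's
      * CONNECT_REQUEST, negotiate the capabilities carried in its private
      * data, allocate the protection domain, completion queues, queue pair
      * and control buffers, then accept the connection and wait for it to
      * become ESTABLISHED.
      */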
3349 static int qemu_rdma_accept(RDMAContext *rdma)
3350 {
3351     RDMACapabilities cap;
3352     struct rdma_conn_param conn_param = {
3353                                             .responder_resources = 2,
3354                                             .private_data = &cap,
3355                                             .private_data_len = sizeof(cap),
3356                                          };
3357     RDMAContext *rdma_return_path = NULL;
3358     struct rdma_cm_event *cm_event;
3359     struct ibv_context *verbs;
3360     int ret = -EINVAL;
3361     int idx;
3362 
3363     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3364     if (ret) {
3365         goto err_rdma_dest_wait;
3366     }
3367 
3368     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3369         rdma_ack_cm_event(cm_event);
3370         goto err_rdma_dest_wait;
3371     }
3372 
3373     /*
3374      * Initialize the RDMAContext for the return path (used by postcopy)
3375      * once the first connection request has arrived.
3376      */
3377     if ((migrate_postcopy() || migrate_return_path())
3378         && !rdma->is_return_path) {
3379         rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL);
3380         if (rdma_return_path == NULL) {
3381             rdma_ack_cm_event(cm_event);
3382             goto err_rdma_dest_wait;
3383         }
3384 
3385         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
3386     }
3387 
3388     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3389 
3390     network_to_caps(&cap);
3391 
3392     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3393         error_report("Unknown source RDMA version: %d, bailing...",
3394                      cap.version);
3395         rdma_ack_cm_event(cm_event);
3396         goto err_rdma_dest_wait;
3397     }
3398 
3399     /*
3400      * Respond with only the capabilities this version of QEMU knows about.
3401      */
3402     cap.flags &= known_capabilities;
3403 
3404     /*
3405      * Enable the ones that we do know about.
3406      * Add other checks here as new ones are introduced.
3407      */
3408     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3409         rdma->pin_all = true;
3410     }
3411 
3412     rdma->cm_id = cm_event->id;
3413     verbs = cm_event->id->verbs;
3414 
3415     rdma_ack_cm_event(cm_event);
3416 
3417     trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3418 
3419     caps_to_network(&cap);
3420 
3421     trace_qemu_rdma_accept_pin_verbsc(verbs);
3422 
3423     if (!rdma->verbs) {
3424         rdma->verbs = verbs;
3425     } else if (rdma->verbs != verbs) {
3426         error_report("ibv context not matching %p, %p!", rdma->verbs,
3427                      verbs);
3428         goto err_rdma_dest_wait;
3429     }
3430 
3431     qemu_rdma_dump_id("dest_init", verbs);
3432 
3433     ret = qemu_rdma_alloc_pd_cq(rdma);
3434     if (ret) {
3435         error_report("rdma migration: error allocating pd and cq!");
3436         goto err_rdma_dest_wait;
3437     }
3438 
3439     ret = qemu_rdma_alloc_qp(rdma);
3440     if (ret) {
3441         error_report("rdma migration: error allocating qp!");
3442         goto err_rdma_dest_wait;
3443     }
3444 
3445     ret = qemu_rdma_init_ram_blocks(rdma);
3446     if (ret) {
3447         error_report("rdma migration: error initializing ram blocks!");
3448         goto err_rdma_dest_wait;
3449     }
3450 
3451     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3452         ret = qemu_rdma_reg_control(rdma, idx);
3453         if (ret) {
3454             error_report("rdma: error registering %d control", idx);
3455             goto err_rdma_dest_wait;
3456         }
3457     }
3458 
3459     /* Accept the second connection request for the return path */
3460     if ((migrate_postcopy() || migrate_return_path())
3461         && !rdma->is_return_path) {
3462         qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3463                             NULL,
3464                             (void *)(intptr_t)rdma->return_path);
3465     } else {
3466         qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3467                             NULL, rdma);
3468     }
3469 
3470     ret = rdma_accept(rdma->cm_id, &conn_param);
3471     if (ret) {
3472         error_report("rdma_accept returns %d", ret);
3473         goto err_rdma_dest_wait;
3474     }
3475 
3476     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3477     if (ret) {
3478         error_report("rdma_accept get_cm_event failed %d", ret);
3479         goto err_rdma_dest_wait;
3480     }
3481 
3482     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3483         error_report("rdma_accept: connection not established");
3484         rdma_ack_cm_event(cm_event);
3485         goto err_rdma_dest_wait;
3486     }
3487 
3488     rdma_ack_cm_event(cm_event);
3489     rdma->connected = true;
3490 
3491     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3492     if (ret) {
3493         error_report("rdma migration: error posting second control recv");
3494         goto err_rdma_dest_wait;
3495     }
3496 
3497     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3498 
3499     return 0;
3500 
3501 err_rdma_dest_wait:
3502     rdma->error_state = ret;
3503     qemu_rdma_cleanup(rdma);
3504     g_free(rdma_return_path);
3505     return ret;
3506 }
3507 
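     /*
      * qsort() comparator: order the destination's RAMBlock list by the
      * source-side index recorded in src_index, so both sides agree on the
      * block numbering used by the control protocol.
      */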
3508 static int dest_ram_sort_func(const void *a, const void *b)
3509 {
3510     unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3511     unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3512 
3513     return (a_index < b_index) ? -1 : (a_index != b_index);
3514 }
3515 
3516 /*
3517  * During each iteration of the migration, we listen for instructions
3518  * from the source VM to perform dynamic page registrations before it
3519  * can perform RDMA operations.
3520  *
3521  * We respond with the 'rkey'.
3522  *
3523  * Keep doing this until the source tells us to stop.
3524  */
3525 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3526 {
3527     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3528                                .type = RDMA_CONTROL_REGISTER_RESULT,
3529                                .repeat = 0,
3530                              };
3531     RDMAControlHeader unreg_resp = { .len = 0,
3532                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3533                                .repeat = 0,
3534                              };
3535     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3536                                  .repeat = 1 };
3537     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3538     RDMAContext *rdma;
3539     RDMALocalBlocks *local;
3540     RDMAControlHeader head;
3541     RDMARegister *reg, *registers;
3542     RDMACompress *comp;
3543     RDMARegisterResult *reg_result;
3544     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3545     RDMALocalBlock *block;
3546     void *host_addr;
3547     int ret = 0;
3548     int idx = 0;
3549     int count = 0;
3550     int i = 0;
3551 
3552     RCU_READ_LOCK_GUARD();
3553     rdma = qatomic_rcu_read(&rioc->rdmain);
3554 
3555     if (!rdma) {
3556         return -EIO;
3557     }
3558 
3559     CHECK_ERROR_STATE();
3560 
3561     local = &rdma->local_ram_blocks;
3562     do {
3563         trace_qemu_rdma_registration_handle_wait();
3564 
3565         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3566 
3567         if (ret < 0) {
3568             break;
3569         }
3570 
3571         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3572             error_report("rdma: Too many requests in this message (%d). "
3573                          "Bailing.", head.repeat);
3574             ret = -EIO;
3575             break;
3576         }
3577 
3578         switch (head.type) {
3579         case RDMA_CONTROL_COMPRESS:
3580             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3581             network_to_compress(comp);
3582 
3583             trace_qemu_rdma_registration_handle_compress(comp->length,
3584                                                          comp->block_idx,
3585                                                          comp->offset);
3586             if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3587                 error_report("rdma: 'compress' bad block index %u (vs %d)",
3588                              (unsigned int)comp->block_idx,
3589                              rdma->local_ram_blocks.nb_blocks);
3590                 ret = -EIO;
3591                 goto out;
3592             }
3593             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3594 
3595             host_addr = block->local_host_addr +
3596                             (comp->offset - block->offset);
3597 
3598             ram_handle_compressed(host_addr, comp->value, comp->length);
3599             break;
3600 
3601         case RDMA_CONTROL_REGISTER_FINISHED:
3602             trace_qemu_rdma_registration_handle_finished();
3603             goto out;
3604 
3605         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3606             trace_qemu_rdma_registration_handle_ram_blocks();
3607 
3608             /* Sort our local RAM Block list so it's the same as the source's;
3609              * we can do this since we've filled in a src_index in the list
3610              * as we received the RAMBlock list earlier.
3611              */
3612             qsort(rdma->local_ram_blocks.block,
3613                   rdma->local_ram_blocks.nb_blocks,
3614                   sizeof(RDMALocalBlock), dest_ram_sort_func);
3615             for (i = 0; i < local->nb_blocks; i++) {
3616                 local->block[i].index = i;
3617             }
3618 
3619             if (rdma->pin_all) {
3620                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3621                 if (ret) {
3622                     error_report("rdma migration: error dest "
3623                                     "registering ram blocks");
3624                     goto out;
3625                 }
3626             }
3627 
3628             /*
3629              * Dest uses this to prepare to transmit the RAMBlock descriptions
3630              * to the source VM after connection setup.
3631              * Both sides use the "remote" structure to communicate and update
3632              * their "local" descriptions with what was sent.
3633              */
3634             for (i = 0; i < local->nb_blocks; i++) {
3635                 rdma->dest_blocks[i].remote_host_addr =
3636                     (uintptr_t)(local->block[i].local_host_addr);
3637 
3638                 if (rdma->pin_all) {
3639                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3640                 }
3641 
3642                 rdma->dest_blocks[i].offset = local->block[i].offset;
3643                 rdma->dest_blocks[i].length = local->block[i].length;
3644 
3645                 dest_block_to_network(&rdma->dest_blocks[i]);
3646                 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3647                     local->block[i].block_name,
3648                     local->block[i].offset,
3649                     local->block[i].length,
3650                     local->block[i].local_host_addr,
3651                     local->block[i].src_index);
3652             }
3653 
3654             blocks.len = rdma->local_ram_blocks.nb_blocks
3655                                                 * sizeof(RDMADestBlock);
3656 
3657 
3658             ret = qemu_rdma_post_send_control(rdma,
3659                                         (uint8_t *) rdma->dest_blocks, &blocks);
3660 
3661             if (ret < 0) {
3662                 error_report("rdma migration: error sending remote info");
3663                 goto out;
3664             }
3665 
3666             break;
3667         case RDMA_CONTROL_REGISTER_REQUEST:
3668             trace_qemu_rdma_registration_handle_register(head.repeat);
3669 
3670             reg_resp.repeat = head.repeat;
3671             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3672 
3673             for (count = 0; count < head.repeat; count++) {
3674                 uint64_t chunk;
3675                 uint8_t *chunk_start, *chunk_end;
3676 
3677                 reg = &registers[count];
3678                 network_to_register(reg);
3679 
3680                 reg_result = &results[count];
3681 
3682                 trace_qemu_rdma_registration_handle_register_loop(count,
3683                          reg->current_index, reg->key.current_addr, reg->chunks);
3684 
3685                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3686                     error_report("rdma: 'register' bad block index %u (vs %d)",
3687                                  (unsigned int)reg->current_index,
3688                                  rdma->local_ram_blocks.nb_blocks);
3689                     ret = -ENOENT;
3690                     goto out;
3691                 }
3692                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3693                 if (block->is_ram_block) {
3694                     if (block->offset > reg->key.current_addr) {
3695                         error_report("rdma: bad register address for block %s"
3696                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3697                             block->block_name, block->offset,
3698                             reg->key.current_addr);
3699                         ret = -ERANGE;
3700                         goto out;
3701                     }
3702                     host_addr = (block->local_host_addr +
3703                                 (reg->key.current_addr - block->offset));
3704                     chunk = ram_chunk_index(block->local_host_addr,
3705                                             (uint8_t *) host_addr);
3706                 } else {
3707                     chunk = reg->key.chunk;
3708                     host_addr = block->local_host_addr +
3709                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3710                     /* Check for particularly bad chunk value */
3711                     if (host_addr < (void *)block->local_host_addr) {
3712                         error_report("rdma: bad chunk for block %s"
3713                             " chunk: %" PRIx64,
3714                             block->block_name, reg->key.chunk);
3715                         ret = -ERANGE;
3716                         goto out;
3717                     }
3718                 }
3719                 chunk_start = ram_chunk_start(block, chunk);
3720                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3721                 /* avoid "-Waddress-of-packed-member" warning */
3722                 uint32_t tmp_rkey = 0;
3723                 if (qemu_rdma_register_and_get_keys(rdma, block,
3724                             (uintptr_t)host_addr, NULL, &tmp_rkey,
3725                             chunk, chunk_start, chunk_end)) {
3726                     error_report("cannot get rkey");
3727                     ret = -EINVAL;
3728                     goto out;
3729                 }
3730                 reg_result->rkey = tmp_rkey;
3731 
3732                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3733 
3734                 trace_qemu_rdma_registration_handle_register_rkey(
3735                                                            reg_result->rkey);
3736 
3737                 result_to_network(reg_result);
3738             }
3739 
3740             ret = qemu_rdma_post_send_control(rdma,
3741                             (uint8_t *) results, &reg_resp);
3742 
3743             if (ret < 0) {
3744                 error_report("Failed to send control buffer");
3745                 goto out;
3746             }
3747             break;
3748         case RDMA_CONTROL_UNREGISTER_REQUEST:
3749             trace_qemu_rdma_registration_handle_unregister(head.repeat);
3750             unreg_resp.repeat = head.repeat;
3751             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3752 
3753             for (count = 0; count < head.repeat; count++) {
3754                 reg = &registers[count];
3755                 network_to_register(reg);
3756 
3757                 trace_qemu_rdma_registration_handle_unregister_loop(count,
3758                            reg->current_index, reg->key.chunk);
3759 
3760                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3761 
3762                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3763                 block->pmr[reg->key.chunk] = NULL;
3764 
3765                 if (ret != 0) {
3766                     perror("rdma unregistration chunk failed");
3767                     ret = -ret;
3768                     goto out;
3769                 }
3770 
3771                 rdma->total_registrations--;
3772 
3773                 trace_qemu_rdma_registration_handle_unregister_success(
3774                                                        reg->key.chunk);
3775             }
3776 
3777             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3778 
3779             if (ret < 0) {
3780                 error_report("Failed to send control buffer");
3781                 goto out;
3782             }
3783             break;
3784         case RDMA_CONTROL_REGISTER_RESULT:
3785             error_report("Invalid RESULT message at dest.");
3786             ret = -EIO;
3787             goto out;
3788         default:
3789             error_report("Unknown control message %s", control_desc(head.type));
3790             ret = -EIO;
3791             goto out;
3792         }
3793     } while (1);
3794 out:
3795     if (ret < 0) {
3796         rdma->error_state = ret;
3797     }
3798     return ret;
3799 }
3800 
3801 /* Destination:
3802  * Called via a ram_control_load_hook during the initial RAM load section which
3803  * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3804  * on the source.
3805  * We've already built our local RAMBlock list, but not yet sent the list to
3806  * the source.
3807  */
3808 static int
3809 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3810 {
3811     RDMAContext *rdma;
3812     int curr;
3813     int found = -1;
3814 
3815     RCU_READ_LOCK_GUARD();
3816     rdma = qatomic_rcu_read(&rioc->rdmain);
3817 
3818     if (!rdma) {
3819         return -EIO;
3820     }
3821 
3822     /* Find the matching RAMBlock in our local list */
3823     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3824         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3825             found = curr;
3826             break;
3827         }
3828     }
3829 
3830     if (found == -1) {
3831         error_report("RAMBlock '%s' not found on destination", name);
3832         return -ENOENT;
3833     }
3834 
3835     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3836     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3837     rdma->next_src_index++;
3838 
3839     return 0;
3840 }
3841 
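     /*
      * Incoming-side ram_control_load_hook dispatcher: block-name
      * notifications record the source's RAMBlock ordering, while
      * RAM_CONTROL_HOOK drops into the registration handler loop above.
      */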
3842 static int rdma_load_hook(QEMUFile *f, uint64_t flags, void *data)
3843 {
3844     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3845     switch (flags) {
3846     case RAM_CONTROL_BLOCK_REG:
3847         return rdma_block_notification_handle(rioc, data);
3848 
3849     case RAM_CONTROL_HOOK:
3850         return qemu_rdma_registration_handle(f, rioc);
3851 
3852     default:
3853         /* Shouldn't be called with any other values */
3854         abort();
3855     }
3856 }
3857 
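     /*
      * Source side: at the start of each RAM iteration, emit
      * RAM_SAVE_FLAG_HOOK so the destination enters
      * qemu_rdma_registration_handle() and is ready to service registration
      * requests for the writes that follow.
      */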
3858 static int qemu_rdma_registration_start(QEMUFile *f,
3859                                         uint64_t flags, void *data)
3860 {
3861     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3862     RDMAContext *rdma;
3863 
3864     RCU_READ_LOCK_GUARD();
3865     rdma = qatomic_rcu_read(&rioc->rdmaout);
3866     if (!rdma) {
3867         return -EIO;
3868     }
3869 
3870     CHECK_ERROR_STATE();
3871 
3872     if (migration_in_postcopy()) {
3873         return 0;
3874     }
3875 
3876     trace_qemu_rdma_registration_start(flags);
3877     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3878     qemu_fflush(f);
3879 
3880     return 0;
3881 }
3882 
3883 /*
3884  * Inform dest that dynamic registrations are done for now.
3885  * First, flush writes, if any.
3886  */
3887 static int qemu_rdma_registration_stop(QEMUFile *f,
3888                                        uint64_t flags, void *data)
3889 {
3890     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3891     RDMAContext *rdma;
3892     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3893     int ret = 0;
3894 
3895     RCU_READ_LOCK_GUARD();
3896     rdma = qatomic_rcu_read(&rioc->rdmaout);
3897     if (!rdma) {
3898         return -EIO;
3899     }
3900 
3901     CHECK_ERROR_STATE();
3902 
3903     if (migration_in_postcopy()) {
3904         return 0;
3905     }
3906 
3907     qemu_fflush(f);
3908     ret = qemu_rdma_drain_cq(f, rdma);
3909 
3910     if (ret < 0) {
3911         goto err;
3912     }
3913 
3914     if (flags == RAM_CONTROL_SETUP) {
3915         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3916         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3917         int reg_result_idx, i, nb_dest_blocks;
3918 
3919         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3920         trace_qemu_rdma_registration_stop_ram();
3921 
3922         /*
3923          * Make sure that we parallelize the pinning on both sides.
3924          * For very large guests, doing this serially takes a really
3925          * long time, so we have to 'interleave' the pinning locally
3926          * with the control messages by performing the pinning on this
3927          * side before we receive the control response from the other
3928          * side that the pinning has completed.
3929          */
3930         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3931                     &reg_result_idx, rdma->pin_all ?
3932                     qemu_rdma_reg_whole_ram_blocks : NULL);
3933         if (ret < 0) {
3934             fprintf(stderr, "rdma migration: error receiving remote info!\n");
3935             return ret;
3936         }
3937 
3938         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3939 
3940         /*
3941          * The protocol uses two different sets of rkeys (mutually exclusive):
3942          * 1. One key to represent the virtual address of the entire ram block.
3943          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3944          * 2. One to represent individual chunks within a ram block.
3945          *    (dynamic chunk registration enabled - pin individual chunks.)
3946          *
3947          * Once the capability is successfully negotiated, the destination transmits
3948          * the keys to use (or sends them later) including the virtual addresses
3949  * and then propagates the remote ram block descriptions to its local copy.
3950          */
3951 
3952         if (local->nb_blocks != nb_dest_blocks) {
3953             fprintf(stderr, "ram blocks mismatch (Number of blocks %d vs %d) "
3954                     "Your QEMU command line parameters are probably "
3955                     "not identical on both the source and destination.\n",
3956                     local->nb_blocks, nb_dest_blocks);
3957             rdma->error_state = -EINVAL;
3958             return -EINVAL;
3959         }
3960 
3961         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3962         memcpy(rdma->dest_blocks,
3963             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3964         for (i = 0; i < nb_dest_blocks; i++) {
3965             network_to_dest_block(&rdma->dest_blocks[i]);
3966 
3967             /* We require that the blocks are in the same order */
3968             if (rdma->dest_blocks[i].length != local->block[i].length) {
3969                 fprintf(stderr, "Block %s/%d has a different length %" PRIu64
3970                         " vs %" PRIu64 "\n", local->block[i].block_name, i,
3971                         local->block[i].length,
3972                         rdma->dest_blocks[i].length);
3973                 rdma->error_state = -EINVAL;
3974                 return -EINVAL;
3975             }
3976             local->block[i].remote_host_addr =
3977                     rdma->dest_blocks[i].remote_host_addr;
3978             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3979         }
3980     }
3981 
3982     trace_qemu_rdma_registration_stop(flags);
3983 
3984     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3985     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3986 
3987     if (ret < 0) {
3988         goto err;
3989     }
3990 
3991     return 0;
3992 err:
3993     rdma->error_state = ret;
3994     return ret;
3995 }
3996 
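     /*
      * QEMUFile hook tables: the destination installs the load hook, while
      * the source installs the iteration and save_page hooks.
      */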
3997 static const QEMUFileHooks rdma_read_hooks = {
3998     .hook_ram_load = rdma_load_hook,
3999 };
4000 
4001 static const QEMUFileHooks rdma_write_hooks = {
4002     .before_ram_iterate = qemu_rdma_registration_start,
4003     .after_ram_iterate  = qemu_rdma_registration_stop,
4004     .save_page          = qemu_rdma_save_page,
4005 };
4006 
4007 
4008 static void qio_channel_rdma_finalize(Object *obj)
4009 {
4010     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
4011     if (rioc->rdmain) {
4012         qemu_rdma_cleanup(rioc->rdmain);
4013         g_free(rioc->rdmain);
4014         rioc->rdmain = NULL;
4015     }
4016     if (rioc->rdmaout) {
4017         qemu_rdma_cleanup(rioc->rdmaout);
4018         g_free(rioc->rdmaout);
4019         rioc->rdmaout = NULL;
4020     }
4021 }
4022 
4023 static void qio_channel_rdma_class_init(ObjectClass *klass,
4024                                         void *class_data G_GNUC_UNUSED)
4025 {
4026     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
4027 
4028     ioc_klass->io_writev = qio_channel_rdma_writev;
4029     ioc_klass->io_readv = qio_channel_rdma_readv;
4030     ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
4031     ioc_klass->io_close = qio_channel_rdma_close;
4032     ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
4033     ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
4034     ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
4035 }
4036 
4037 static const TypeInfo qio_channel_rdma_info = {
4038     .parent = TYPE_QIO_CHANNEL,
4039     .name = TYPE_QIO_CHANNEL_RDMA,
4040     .instance_size = sizeof(QIOChannelRDMA),
4041     .instance_finalize = qio_channel_rdma_finalize,
4042     .class_init = qio_channel_rdma_class_init,
4043 };
4044 
4045 static void qio_channel_rdma_register_types(void)
4046 {
4047     type_register_static(&qio_channel_rdma_info);
4048 }
4049 
4050 type_init(qio_channel_rdma_register_types);
4051 
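     /*
      * Wrap an RDMAContext in a QIOChannelRDMA and return a QEMUFile for it:
      * "w" mode produces the outgoing (source) file, otherwise the incoming
      * (destination) file is created.  The return-path context, if present,
      * is attached as the opposite direction of the same channel.
      */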
4052 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
4053 {
4054     QIOChannelRDMA *rioc;
4055 
4056     if (qemu_file_mode_is_not_valid(mode)) {
4057         return NULL;
4058     }
4059 
4060     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4061 
4062     if (mode[0] == 'w') {
4063         rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
4064         rioc->rdmaout = rdma;
4065         rioc->rdmain = rdma->return_path;
4066         qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
4067     } else {
4068         rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
4069         rioc->rdmain = rdma;
4070         rioc->rdmaout = rdma->return_path;
4071         qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
4072     }
4073 
4074     return rioc->file;
4075 }
4076 
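     /*
      * fd handler on the listening cm channel: accept the connection and,
      * unless this is the return-path context, hand the resulting QEMUFile
      * to the generic incoming-migration code.
      */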
4077 static void rdma_accept_incoming_migration(void *opaque)
4078 {
4079     RDMAContext *rdma = opaque;
4080     int ret;
4081     QEMUFile *f;
4082     Error *local_err = NULL;
4083 
4084     trace_qemu_rdma_accept_incoming_migration();
4085     ret = qemu_rdma_accept(rdma);
4086 
4087     if (ret) {
4088         fprintf(stderr, "RDMA ERROR: Migration initialization failed\n");
4089         return;
4090     }
4091 
4092     trace_qemu_rdma_accept_incoming_migration_accepted();
4093 
4094     if (rdma->is_return_path) {
4095         return;
4096     }
4097 
4098     f = qemu_fopen_rdma(rdma, "rb");
4099     if (f == NULL) {
4100         fprintf(stderr, "RDMA ERROR: could not qemu_fopen_rdma\n");
4101         qemu_rdma_cleanup(rdma);
4102         return;
4103     }
4104 
4105     rdma->migration_started_on_destination = 1;
4106     migration_fd_process_incoming(f, &local_err);
4107     if (local_err) {
4108         error_reportf_err(local_err, "RDMA ERROR: ");
4109     }
4110 }
4111 
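     /*
      * Entry point for an incoming "rdma:" migration: bind to the requested
      * address, start listening on the cm channel, and let
      * rdma_accept_incoming_migration() run from the main loop once a
      * connection request arrives.
      */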
4112 void rdma_start_incoming_migration(const char *host_port, Error **errp)
4113 {
4114     int ret;
4115     RDMAContext *rdma;
4116     Error *local_err = NULL;
4117 
4118     trace_rdma_start_incoming_migration();
4119 
4120     /* Avoid ram_block_discard_disable(), cannot change during migration. */
4121     if (ram_block_discard_is_required()) {
4122         error_setg(errp, "RDMA: cannot disable RAM discard");
4123         return;
4124     }
4125 
4126     rdma = qemu_rdma_data_init(host_port, &local_err);
4127     if (rdma == NULL) {
4128         goto err;
4129     }
4130 
4131     ret = qemu_rdma_dest_init(rdma, &local_err);
4132 
4133     if (ret) {
4134         goto err;
4135     }
4136 
4137     trace_rdma_start_incoming_migration_after_dest_init();
4138 
4139     ret = rdma_listen(rdma->listen_id, 5);
4140 
4141     if (ret) {
4142         ERROR(errp, "failed to listen on socket!");
4143         goto cleanup_rdma;
4144     }
4145 
4146     trace_rdma_start_incoming_migration_after_rdma_listen();
4147 
4148     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4149                         NULL, (void *)(intptr_t)rdma);
4150     return;
4151 
4152 cleanup_rdma:
4153     qemu_rdma_cleanup(rdma);
4154 err:
4155     error_propagate(errp, local_err);
4156     if (rdma) {
4157         g_free(rdma->host);
4158         g_free(rdma->host_port);
4159     }
4160     g_free(rdma);
4161 }
4162 
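     /*
      * Entry point for an outgoing "rdma:" migration: resolve and connect the
      * main channel (plus a second one for the return path when postcopy or
      * return-path is enabled), then hand the resulting QEMUFile to the
      * migration core via migrate_fd_connect().
      */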
4163 void rdma_start_outgoing_migration(void *opaque,
4164                             const char *host_port, Error **errp)
4165 {
4166     MigrationState *s = opaque;
4167     RDMAContext *rdma_return_path = NULL;
4168     RDMAContext *rdma;
4169     int ret = 0;
4170 
4171     /* Avoid ram_block_discard_disable(), cannot change during migration. */
4172     if (ram_block_discard_is_required()) {
4173         error_setg(errp, "RDMA: cannot disable RAM discard");
4174         return;
4175     }
4176 
4177     rdma = qemu_rdma_data_init(host_port, errp);
4178     if (rdma == NULL) {
4179         goto err;
4180     }
4181 
4182     ret = qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp);
4183 
4184     if (ret) {
4185         goto err;
4186     }
4187 
4188     trace_rdma_start_outgoing_migration_after_rdma_source_init();
4189     ret = qemu_rdma_connect(rdma, errp, false);
4190 
4191     if (ret) {
4192         goto err;
4193     }
4194 
4195     /* RDMA postcopy needs a separate queue pair for the return path */
4196     if (migrate_postcopy() || migrate_return_path()) {
4197         rdma_return_path = qemu_rdma_data_init(host_port, errp);
4198 
4199         if (rdma_return_path == NULL) {
4200             goto return_path_err;
4201         }
4202 
4203         ret = qemu_rdma_source_init(rdma_return_path,
4204                                     migrate_rdma_pin_all(), errp);
4205 
4206         if (ret) {
4207             goto return_path_err;
4208         }
4209 
4210         ret = qemu_rdma_connect(rdma_return_path, errp, true);
4211 
4212         if (ret) {
4213             goto return_path_err;
4214         }
4215 
4216         rdma->return_path = rdma_return_path;
4217         rdma_return_path->return_path = rdma;
4218         rdma_return_path->is_return_path = true;
4219     }
4220 
4221     trace_rdma_start_outgoing_migration_after_rdma_connect();
4222 
4223     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
4224     migrate_fd_connect(s, NULL);
4225     return;
4226 return_path_err:
4227     qemu_rdma_cleanup(rdma);
4228 err:
4229     g_free(rdma);
4230     g_free(rdma_return_path);
4231 }
4232