]> git.proxmox.com Git - mirror_qemu.git/blame - migration/rdma.c
migration: create a dedicated connection for rdma return path
[mirror_qemu.git] / migration / rdma.c
CommitLineData
2da776db
MH
1/*
2 * RDMA protocol and interfaces
3 *
4 * Copyright IBM, Corp. 2010-2013
6ddd2d76 5 * Copyright Red Hat, Inc. 2015-2016
2da776db
MH
6 *
7 * Authors:
8 * Michael R. Hines <mrhines@us.ibm.com>
9 * Jiuxing Liu <jl@us.ibm.com>
6ddd2d76 10 * Daniel P. Berrange <berrange@redhat.com>
2da776db
MH
11 *
12 * This work is licensed under the terms of the GNU GPL, version 2 or
13 * later. See the COPYING file in the top-level directory.
14 *
15 */
1393a485 16#include "qemu/osdep.h"
da34e65c 17#include "qapi/error.h"
2da776db 18#include "qemu-common.h"
f348b6d1 19#include "qemu/cutils.h"
e1a3ecee 20#include "rdma.h"
6666c96a 21#include "migration.h"
08a0aee1 22#include "qemu-file.h"
7b1e1a22 23#include "ram.h"
40014d81 24#include "qemu-file-channel.h"
d49b6836 25#include "qemu/error-report.h"
2da776db
MH
26#include "qemu/main-loop.h"
27#include "qemu/sockets.h"
28#include "qemu/bitmap.h"
10817bf0 29#include "qemu/coroutine.h"
2da776db
MH
30#include <sys/socket.h>
31#include <netdb.h>
32#include <arpa/inet.h>
2da776db 33#include <rdma/rdma_cma.h>
733252de 34#include "trace.h"
2da776db
MH
35
/*
 * Print an error on stderr and, if the caller passed an Error ** that has
 * not been set yet, record the same message there as well.
 *
 * @errp: Error ** to fill in; may be NULL.  Evaluated more than once, so
 *        it must not have side effects.
 * @fmt:  printf-style format; must be a string literal because it is
 *        pasted onto the "RDMA ERROR: " prefix.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if ((errp) && (*(errp) == NULL)) { \
            error_setg((errp), "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)
46
/* Timeout, in milliseconds, passed to the rdma_resolve_*() calls. */
#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Data larger than this many bytes is not merged. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

/* log2 of the registration chunk size: 1 << 20 bytes == 1 MB. */
#define RDMA_REG_CHUNK_SHIFT 20

/*
 * Only relevant for the non-live state being migrated.
 * That state travels in RDMA_SEND messages rather than RDMA_WRITE
 * messages, which calls for a different delivery design than the one
 * used for main memory.
 */
#define RDMA_SEND_INCREMENT 32768

/* Largest infiniband SEND message we use. */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

#define RDMA_CONTROL_VERSION_CURRENT 1

/* Capability bits exchanged during negotiation. */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/*
 * Every capability this build knows about.  OR new capability flags
 * into this value as they get introduced above.
 */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
80
/*
 * Leave the calling function early when the RDMA context has already
 * failed.  Relies on a variable named 'rdma' being in scope at the point
 * of use and makes the *caller* return rdma->error_state.  The message is
 * emitted only the first time; after that error_reported suppresses it.
 */
#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                error_report( \
                    "RDMA is in an error state waiting migration to abort!"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0)
2da776db
MH
92
/*
 * A work request ID is 64 bits wide and is split into three fields:
 *
 * bits 0-15 : type of control message, 2^16
 * bits 16-29: ram block index, 2^14
 * bits 30-63: ram block chunk number, 2^34
 *
 * The last two fields are only used for RDMA writes, in order to track
 * their completion and potentially also the unregistration status of
 * the message.
 *
 * The masks are built from UINT64_C() constants so that they are 64 bits
 * wide on every host.  With a plain 'unsigned long' the chunk mask would
 * lose bits 32-63 wherever long is 32 bits.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

#define RDMA_WRID_TYPE_MASK \
    ((UINT64_C(1) << RDMA_WRID_BLOCK_SHIFT) - UINT64_C(1))

#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & \
     ((UINT64_C(1) << RDMA_WRID_CHUNK_SHIFT) - UINT64_C(1)))

#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
116
/*
 * RDMA migration protocol:
 * 1. RDMA Writes (data messages, i.e. RAM)
 * 2. IB Send/Recv (control channel messages)
 *
 * These are the work request types that go with that protocol.
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,
    RDMA_WRID_SEND_CONTROL = 2000,
    RDMA_WRID_RECV_CONTROL = 4000,
};

/*
 * Printable names indexed by the work request types above.  The table is
 * sparse: entries without an initializer are NULL.
 */
static const char *wrid_desc[] = {
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    [RDMA_WRID_NONE] = "NONE",
};
135
/*
 * Work request IDs for IB SEND messages only (not RDMA writes).
 * The migration protocol uses them to transmit control messages such as
 * device state and registration commands.
 *
 * More WRs could be used, but these are enough for now.
 * RDMA_WRID_MAX is the number of slots, not a slot itself.
 */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA = 1,
    RDMA_WRID_CONTROL = 2,
    RDMA_WRID_MAX = 3,
};
149
/*
 * SEND/RECV IB control message types.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR = 1,
    RDMA_CONTROL_READY = 2,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE = 3,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST = 4,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT = 5,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS = 6,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST = 7,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT = 8,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED = 9,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST = 10, /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED = 11, /* unpinning finished */
};
167
2da776db
MH
168
169/*
170 * Memory and MR structures used to represent an IB Send/Recv work request.
171 * This is *not* used for RDMA writes, only IB Send/Recv.
172 */
173typedef struct {
174 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
175 struct ibv_mr *control_mr; /* registration metadata */
176 size_t control_len; /* length of the message */
177 uint8_t *control_curr; /* start of unconsumed bytes */
178} RDMAWorkRequestData;
179
/*
 * RDMA capabilities, negotiated at connection-setup time.
 */
typedef struct {
    uint32_t version;
    uint32_t flags;
} RDMACapabilities;

/* Convert both fields of @cap, in place, from host to network byte order. */
static void caps_to_network(RDMACapabilities *cap)
{
    const uint32_t host_version = cap->version;
    const uint32_t host_flags = cap->flags;

    cap->version = htonl(host_version);
    cap->flags = htonl(host_flags);
}

/* Convert both fields of @cap, in place, from network to host byte order. */
static void network_to_caps(RDMACapabilities *cap)
{
    const uint32_t wire_version = cap->version;
    const uint32_t wire_flags = cap->flags;

    cap->version = ntohl(wire_version);
    cap->flags = ntohl(wire_flags);
}
199
/*
 * A RAMBlock as seen from the RDMA code.
 * This is local-only state and is never transmitted.
 * This and the following structures cannot be linked lists because a
 * single IB message is used to transmit the information.  The data is
 * small anyway, so a list would be overkill.
 */
typedef struct RDMALocalBlock {
    char *block_name;
    uint8_t *local_host_addr;   /* local virtual address */
    uint64_t remote_host_addr;  /* remote virtual address */
    uint64_t offset;
    uint64_t length;
    struct ibv_mr **pmr;        /* MRs for chunk-level registration */
    struct ibv_mr *mr;          /* MR for non-chunk-level registration */
    uint32_t *remote_keys;      /* rkeys for chunk-level registration */
    uint32_t remote_rkey;       /* rkeys for non-chunk-level registration */
    int index;                  /* which block are we */
    unsigned int src_index;     /* (Only used on dest) */
    bool is_ram_block;
    int nb_chunks;
    /* The two bitmaps below are allocated with one bit per chunk. */
    unsigned long *transit_bitmap;
    unsigned long *unregister_bitmap;
} RDMALocalBlock;
224
225/*
226 * Also represents a RAMblock, but only on the dest.
227 * This gets transmitted by the dest during connection-time
228 * to the source VM and then is used to populate the
229 * corresponding RDMALocalBlock with
230 * the information needed to perform the actual RDMA.
231 */
a97270ad 232typedef struct QEMU_PACKED RDMADestBlock {
2da776db
MH
233 uint64_t remote_host_addr;
234 uint64_t offset;
235 uint64_t length;
236 uint32_t remote_rkey;
237 uint32_t padding;
a97270ad 238} RDMADestBlock;
2da776db 239
482a33c5
DDAG
240static const char *control_desc(unsigned int rdma_control)
241{
242 static const char *strs[] = {
243 [RDMA_CONTROL_NONE] = "NONE",
244 [RDMA_CONTROL_ERROR] = "ERROR",
245 [RDMA_CONTROL_READY] = "READY",
246 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
247 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
248 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
249 [RDMA_CONTROL_COMPRESS] = "COMPRESS",
250 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
251 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
252 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
253 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
254 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
255 };
256
257 if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
258 return "??BAD CONTROL VALUE??";
259 }
260
261 return strs[rdma_control];
262}
263
2da776db
MH
/*
 * Convert a 64-bit value from host to network (big-endian) byte order.
 * The upper 32 bits are emitted first, each half converted with htonl().
 */
static uint64_t htonll(uint64_t v)
{
    const uint32_t high_word = (uint32_t)(v >> 32);
    const uint32_t low_word = (uint32_t)(v & 0xFFFFFFFFULL);
    union {
        uint32_t words[2];
        uint64_t whole;
    } wire;

    wire.words[0] = htonl(high_word);
    wire.words[1] = htonl(low_word);
    return wire.whole;
}
271
/*
 * Convert a 64-bit value from network (big-endian) to host byte order.
 * The first 32-bit word in memory becomes the upper half of the result.
 */
static uint64_t ntohll(uint64_t v)
{
    union {
        uint32_t words[2];
        uint64_t whole;
    } wire;
    uint64_t high_half;
    uint64_t low_half;

    wire.whole = v;
    high_half = ntohl(wire.words[0]);
    low_half = ntohl(wire.words[1]);
    return (high_half << 32) | low_half;
}
277
a97270ad 278static void dest_block_to_network(RDMADestBlock *db)
2da776db 279{
a97270ad
DDAG
280 db->remote_host_addr = htonll(db->remote_host_addr);
281 db->offset = htonll(db->offset);
282 db->length = htonll(db->length);
283 db->remote_rkey = htonl(db->remote_rkey);
2da776db
MH
284}
285
a97270ad 286static void network_to_dest_block(RDMADestBlock *db)
2da776db 287{
a97270ad
DDAG
288 db->remote_host_addr = ntohll(db->remote_host_addr);
289 db->offset = ntohll(db->offset);
290 db->length = ntohll(db->length);
291 db->remote_rkey = ntohl(db->remote_rkey);
2da776db
MH
292}
293
294/*
295 * Virtual address of the above structures used for transmitting
296 * the RAMBlock descriptions at connection-time.
297 * This structure is *not* transmitted.
298 */
299typedef struct RDMALocalBlocks {
300 int nb_blocks;
301 bool init; /* main memory init complete */
302 RDMALocalBlock *block;
303} RDMALocalBlocks;
304
305/*
306 * Main data structure for RDMA state.
307 * While there is only one copy of this structure being allocated right now,
308 * this is the place where one would start if you wanted to consider
309 * having more than one RDMA connection open at the same time.
310 */
311typedef struct RDMAContext {
312 char *host;
313 int port;
314
1f22364b 315 RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
2da776db
MH
316
317 /*
318 * This is used by *_exchange_send() to figure out whether or not
319 * the initial "READY" message has already been received or not.
320 * This is because other functions may potentially poll() and detect
321 * the READY message before send() does, in which case we need to
322 * know if it completed.
323 */
324 int control_ready_expected;
325
326 /* number of outstanding writes */
327 int nb_sent;
328
329 /* store info about current buffer so that we can
330 merge it with future sends */
331 uint64_t current_addr;
332 uint64_t current_length;
333 /* index of ram block the current buffer belongs to */
334 int current_index;
335 /* index of the chunk in the current ram block */
336 int current_chunk;
337
338 bool pin_all;
339
340 /*
341 * infiniband-specific variables for opening the device
342 * and maintaining connection state and so forth.
343 *
344 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
345 * cm_id->verbs, cm_id->channel, and cm_id->qp.
346 */
347 struct rdma_cm_id *cm_id; /* connection manager ID */
348 struct rdma_cm_id *listen_id;
5a91337c 349 bool connected;
2da776db
MH
350
351 struct ibv_context *verbs;
352 struct rdma_event_channel *channel;
353 struct ibv_qp *qp; /* queue pair */
354 struct ibv_comp_channel *comp_channel; /* completion channel */
355 struct ibv_pd *pd; /* protection domain */
356 struct ibv_cq *cq; /* completion queue */
357
358 /*
359 * If a previous write failed (perhaps because of a failed
360 * memory registration, then do not attempt any future work
361 * and remember the error state.
362 */
363 int error_state;
364 int error_reported;
cd5ea070 365 int received_error;
2da776db
MH
366
367 /*
368 * Description of ram blocks used throughout the code.
369 */
370 RDMALocalBlocks local_ram_blocks;
a97270ad 371 RDMADestBlock *dest_blocks;
2da776db 372
e4d63320
DDAG
373 /* Index of the next RAMBlock received during block registration */
374 unsigned int next_src_index;
375
2da776db
MH
376 /*
377 * Migration on *destination* started.
378 * Then use coroutine yield function.
379 * Source runs in a thread, so we don't care.
380 */
381 int migration_started_on_destination;
382
383 int total_registrations;
384 int total_writes;
385
386 int unregister_current, unregister_next;
387 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
388
389 GHashTable *blockmap;
55cc1b59
LC
390
391 /* the RDMAContext for return path */
392 struct RDMAContext *return_path;
393 bool is_return_path;
2da776db
MH
394} RDMAContext;
395
6ddd2d76
DB
396#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
397#define QIO_CHANNEL_RDMA(obj) \
398 OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
399
400typedef struct QIOChannelRDMA QIOChannelRDMA;
401
402
403struct QIOChannelRDMA {
404 QIOChannel parent;
2da776db 405 RDMAContext *rdma;
6ddd2d76 406 QEMUFile *file;
6ddd2d76
DB
407 bool blocking; /* XXX we don't actually honour this yet */
408};
2da776db
MH
409
/*
 * Main structure for IB Send/Recv control messages.
 * It is prepended at the beginning of every Send/Recv.
 */
typedef struct QEMU_PACKED {
    uint32_t len;     /* Total length of data portion */
    uint32_t type;    /* which control command to perform */
    uint32_t repeat;  /* number of commands in data portion of same type */
    uint32_t padding;
} RDMAControlHeader;

/*
 * Convert a control header, in place, to network byte order.
 * The padding field is left untouched.
 */
static void control_to_network(RDMAControlHeader *control)
{
    const uint32_t host_len = control->len;
    const uint32_t host_type = control->type;
    const uint32_t host_repeat = control->repeat;

    control->len = htonl(host_len);
    control->type = htonl(host_type);
    control->repeat = htonl(host_repeat);
}

/*
 * Convert a control header, in place, back to host byte order.
 * The padding field is left untouched.
 */
static void network_to_control(RDMAControlHeader *control)
{
    const uint32_t wire_len = control->len;
    const uint32_t wire_type = control->type;
    const uint32_t wire_repeat = control->repeat;

    control->len = ntohl(wire_len);
    control->type = ntohl(wire_type);
    control->repeat = ntohl(wire_repeat);
}
434
435/*
436 * Register a single Chunk.
437 * Information sent by the source VM to inform the dest
438 * to register an single chunk of memory before we can perform
439 * the actual RDMA operation.
440 */
441typedef struct QEMU_PACKED {
442 union QEMU_PACKED {
b12f7777 443 uint64_t current_addr; /* offset into the ram_addr_t space */
2da776db
MH
444 uint64_t chunk; /* chunk to lookup if unregistering */
445 } key;
446 uint32_t current_index; /* which ramblock the chunk belongs to */
447 uint32_t padding;
448 uint64_t chunks; /* how many sequential chunks to register */
449} RDMARegister;
450
b12f7777 451static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
2da776db 452{
b12f7777
DDAG
453 RDMALocalBlock *local_block;
454 local_block = &rdma->local_ram_blocks.block[reg->current_index];
455
456 if (local_block->is_ram_block) {
457 /*
458 * current_addr as passed in is an address in the local ram_addr_t
459 * space, we need to translate this for the destination
460 */
461 reg->key.current_addr -= local_block->offset;
462 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
463 }
2da776db
MH
464 reg->key.current_addr = htonll(reg->key.current_addr);
465 reg->current_index = htonl(reg->current_index);
466 reg->chunks = htonll(reg->chunks);
467}
468
469static void network_to_register(RDMARegister *reg)
470{
471 reg->key.current_addr = ntohll(reg->key.current_addr);
472 reg->current_index = ntohl(reg->current_index);
473 reg->chunks = ntohll(reg->chunks);
474}
475
476typedef struct QEMU_PACKED {
477 uint32_t value; /* if zero, we will madvise() */
478 uint32_t block_idx; /* which ram block index */
b12f7777 479 uint64_t offset; /* Address in remote ram_addr_t space */
2da776db
MH
480 uint64_t length; /* length of the chunk */
481} RDMACompress;
482
b12f7777 483static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
2da776db
MH
484{
485 comp->value = htonl(comp->value);
b12f7777
DDAG
486 /*
487 * comp->offset as passed in is an address in the local ram_addr_t
488 * space, we need to translate this for the destination
489 */
490 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
491 comp->offset += rdma->dest_blocks[comp->block_idx].offset;
2da776db
MH
492 comp->block_idx = htonl(comp->block_idx);
493 comp->offset = htonll(comp->offset);
494 comp->length = htonll(comp->length);
495}
496
497static void network_to_compress(RDMACompress *comp)
498{
499 comp->value = ntohl(comp->value);
500 comp->block_idx = ntohl(comp->block_idx);
501 comp->offset = ntohll(comp->offset);
502 comp->length = ntohll(comp->length);
503}
504
/*
 * The result of the dest's memory registration produces an "rkey"
 * which the source VM must reference in order to perform
 * the RDMA operation.
 */
typedef struct QEMU_PACKED {
    uint32_t rkey;
    uint32_t padding;
    uint64_t host_addr;
} RDMARegisterResult;

/*
 * Convert a registration result, in place, to network byte order.
 * The padding field is left untouched.
 */
static void result_to_network(RDMARegisterResult *result)
{
    result->rkey = htonl(result->rkey);
    result->host_addr = htonll(result->host_addr);
}

/*
 * Convert a registration result, in place, back to host byte order.
 * The padding field is left untouched.
 */
static void network_to_result(RDMARegisterResult *result)
{
    result->rkey = ntohl(result->rkey);
    result->host_addr = ntohll(result->host_addr);
}
527
528const char *print_wrid(int wrid);
529static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
530 uint8_t *data, RDMAControlHeader *resp,
531 int *resp_idx,
532 int (*callback)(RDMAContext *rdma));
533
dd286ed7
IY
534static inline uint64_t ram_chunk_index(const uint8_t *start,
535 const uint8_t *host)
2da776db
MH
536{
537 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
538}
539
dd286ed7 540static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
2da776db
MH
541 uint64_t i)
542{
fbce8c25
SW
543 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
544 (i << RDMA_REG_CHUNK_SHIFT));
2da776db
MH
545}
546
dd286ed7
IY
547static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
548 uint64_t i)
2da776db
MH
549{
550 uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
551 (1UL << RDMA_REG_CHUNK_SHIFT);
552
553 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
554 result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
555 }
556
557 return result;
558}
559
4fb5364b
DDAG
560static int rdma_add_block(RDMAContext *rdma, const char *block_name,
561 void *host_addr,
2da776db
MH
562 ram_addr_t block_offset, uint64_t length)
563{
564 RDMALocalBlocks *local = &rdma->local_ram_blocks;
760ff4be 565 RDMALocalBlock *block;
2da776db
MH
566 RDMALocalBlock *old = local->block;
567
97f3ad35 568 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
2da776db
MH
569
570 if (local->nb_blocks) {
571 int x;
572
760ff4be
DDAG
573 if (rdma->blockmap) {
574 for (x = 0; x < local->nb_blocks; x++) {
575 g_hash_table_remove(rdma->blockmap,
576 (void *)(uintptr_t)old[x].offset);
577 g_hash_table_insert(rdma->blockmap,
578 (void *)(uintptr_t)old[x].offset,
579 &local->block[x]);
580 }
2da776db
MH
581 }
582 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
583 g_free(old);
584 }
585
586 block = &local->block[local->nb_blocks];
587
4fb5364b 588 block->block_name = g_strdup(block_name);
2da776db
MH
589 block->local_host_addr = host_addr;
590 block->offset = block_offset;
591 block->length = length;
592 block->index = local->nb_blocks;
e4d63320 593 block->src_index = ~0U; /* Filled in by the receipt of the block list */
2da776db
MH
594 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
595 block->transit_bitmap = bitmap_new(block->nb_chunks);
596 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
597 block->unregister_bitmap = bitmap_new(block->nb_chunks);
598 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
97f3ad35 599 block->remote_keys = g_new0(uint32_t, block->nb_chunks);
2da776db
MH
600
601 block->is_ram_block = local->init ? false : true;
602
760ff4be 603 if (rdma->blockmap) {
80e60c6e 604 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
760ff4be 605 }
2da776db 606
4fb5364b
DDAG
607 trace_rdma_add_block(block_name, local->nb_blocks,
608 (uintptr_t) block->local_host_addr,
ba795761 609 block->offset, block->length,
fbce8c25 610 (uintptr_t) (block->local_host_addr + block->length),
ba795761
DDAG
611 BITS_TO_LONGS(block->nb_chunks) *
612 sizeof(unsigned long) * 8,
613 block->nb_chunks);
2da776db
MH
614
615 local->nb_blocks++;
616
617 return 0;
618}
619
620/*
621 * Memory regions need to be registered with the device and queue pairs setup
622 * in advanced before the migration starts. This tells us where the RAM blocks
623 * are so that we can register them individually.
624 */
e3807054 625static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
2da776db
MH
626 ram_addr_t block_offset, ram_addr_t length, void *opaque)
627{
4fb5364b 628 return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
2da776db
MH
629}
630
631/*
632 * Identify the RAMBlocks and their quantity. They will be references to
633 * identify chunk boundaries inside each RAMBlock and also be referenced
634 * during dynamic page registration.
635 */
636static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
637{
638 RDMALocalBlocks *local = &rdma->local_ram_blocks;
639
640 assert(rdma->blockmap == NULL);
2da776db 641 memset(local, 0, sizeof *local);
ff0769a4 642 qemu_ram_foreach_migratable_block(qemu_rdma_init_one_block, rdma);
733252de 643 trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
97f3ad35
MA
644 rdma->dest_blocks = g_new0(RDMADestBlock,
645 rdma->local_ram_blocks.nb_blocks);
2da776db
MH
646 local->init = true;
647 return 0;
648}
649
03fcab38
DDAG
650/*
651 * Note: If used outside of cleanup, the caller must ensure that the destination
652 * block structures are also updated
653 */
654static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
2da776db
MH
655{
656 RDMALocalBlocks *local = &rdma->local_ram_blocks;
2da776db
MH
657 RDMALocalBlock *old = local->block;
658 int x;
659
03fcab38
DDAG
660 if (rdma->blockmap) {
661 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
662 }
2da776db
MH
663 if (block->pmr) {
664 int j;
665
666 for (j = 0; j < block->nb_chunks; j++) {
667 if (!block->pmr[j]) {
668 continue;
669 }
670 ibv_dereg_mr(block->pmr[j]);
671 rdma->total_registrations--;
672 }
673 g_free(block->pmr);
674 block->pmr = NULL;
675 }
676
677 if (block->mr) {
678 ibv_dereg_mr(block->mr);
679 rdma->total_registrations--;
680 block->mr = NULL;
681 }
682
683 g_free(block->transit_bitmap);
684 block->transit_bitmap = NULL;
685
686 g_free(block->unregister_bitmap);
687 block->unregister_bitmap = NULL;
688
689 g_free(block->remote_keys);
690 block->remote_keys = NULL;
691
4fb5364b
DDAG
692 g_free(block->block_name);
693 block->block_name = NULL;
694
03fcab38
DDAG
695 if (rdma->blockmap) {
696 for (x = 0; x < local->nb_blocks; x++) {
697 g_hash_table_remove(rdma->blockmap,
698 (void *)(uintptr_t)old[x].offset);
699 }
2da776db
MH
700 }
701
702 if (local->nb_blocks > 1) {
703
97f3ad35 704 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
2da776db
MH
705
706 if (block->index) {
707 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
708 }
709
710 if (block->index < (local->nb_blocks - 1)) {
711 memcpy(local->block + block->index, old + (block->index + 1),
712 sizeof(RDMALocalBlock) *
713 (local->nb_blocks - (block->index + 1)));
71cd7306
LC
714 for (x = block->index; x < local->nb_blocks - 1; x++) {
715 local->block[x].index--;
716 }
2da776db
MH
717 }
718 } else {
719 assert(block == local->block);
720 local->block = NULL;
721 }
722
03fcab38 723 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
733252de 724 block->offset, block->length,
fbce8c25 725 (uintptr_t)(block->local_host_addr + block->length),
733252de
DDAG
726 BITS_TO_LONGS(block->nb_chunks) *
727 sizeof(unsigned long) * 8, block->nb_chunks);
2da776db
MH
728
729 g_free(old);
730
731 local->nb_blocks--;
732
03fcab38 733 if (local->nb_blocks && rdma->blockmap) {
2da776db 734 for (x = 0; x < local->nb_blocks; x++) {
fbce8c25
SW
735 g_hash_table_insert(rdma->blockmap,
736 (void *)(uintptr_t)local->block[x].offset,
737 &local->block[x]);
2da776db
MH
738 }
739 }
740
741 return 0;
742}
743
744/*
745 * Put in the log file which RDMA device was opened and the details
746 * associated with that device.
747 */
748static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
749{
7fc5b13f
MH
750 struct ibv_port_attr port;
751
752 if (ibv_query_port(verbs, 1, &port)) {
733252de 753 error_report("Failed to query port information");
7fc5b13f
MH
754 return;
755 }
756
2da776db
MH
757 printf("%s RDMA Device opened: kernel name %s "
758 "uverbs device name %s, "
7fc5b13f
MH
759 "infiniband_verbs class device path %s, "
760 "infiniband class device path %s, "
761 "transport: (%d) %s\n",
2da776db
MH
762 who,
763 verbs->device->name,
764 verbs->device->dev_name,
765 verbs->device->dev_path,
7fc5b13f
MH
766 verbs->device->ibdev_path,
767 port.link_layer,
768 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
02942db7 769 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
7fc5b13f 770 ? "Ethernet" : "Unknown"));
2da776db
MH
771}
772
773/*
774 * Put in the log file the RDMA gid addressing information,
775 * useful for folks who have trouble understanding the
776 * RDMA device hierarchy in the kernel.
777 */
778static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
779{
780 char sgid[33];
781 char dgid[33];
782 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
783 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
733252de 784 trace_qemu_rdma_dump_gid(who, sgid, dgid);
2da776db
MH
785}
786
7fc5b13f
MH
787/*
788 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
789 * We will try the next addrinfo struct, and fail if there are
790 * no other valid addresses to bind against.
791 *
792 * If user is listening on '[::]', then we will not have a opened a device
793 * yet and have no way of verifying if the device is RoCE or not.
794 *
795 * In this case, the source VM will throw an error for ALL types of
796 * connections (both IPv4 and IPv6) if the destination machine does not have
797 * a regular infiniband network available for use.
798 *
4c293dc6 799 * The only way to guarantee that an error is thrown for broken kernels is
7fc5b13f
MH
800 * for the management software to choose a *specific* interface at bind time
801 * and validate what time of hardware it is.
802 *
803 * Unfortunately, this puts the user in a fix:
02942db7 804 *
7fc5b13f
MH
805 * If the source VM connects with an IPv4 address without knowing that the
806 * destination has bound to '[::]' the migration will unconditionally fail
b6af0975 807 * unless the management software is explicitly listening on the IPv4
7fc5b13f
MH
808 * address while using a RoCE-based device.
809 *
810 * If the source VM connects with an IPv6 address, then we're OK because we can
811 * throw an error on the source (and similarly on the destination).
02942db7 812 *
7fc5b13f
MH
813 * But in mixed environments, this will be broken for a while until it is fixed
814 * inside linux.
815 *
816 * We do provide a *tiny* bit of help in this function: We can list all of the
817 * devices in the system and check to see if all the devices are RoCE or
02942db7 818 * Infiniband.
7fc5b13f
MH
819 *
820 * If we detect that we have a *pure* RoCE environment, then we can safely
4c293dc6 821 * thrown an error even if the management software has specified '[::]' as the
7fc5b13f
MH
822 * bind address.
823 *
824 * However, if there is are multiple hetergeneous devices, then we cannot make
825 * this assumption and the user just has to be sure they know what they are
826 * doing.
827 *
828 * Patches are being reviewed on linux-rdma.
829 */
bbfb89e3 830static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
7fc5b13f
MH
831{
832 struct ibv_port_attr port_attr;
833
834 /* This bug only exists in linux, to our knowledge. */
835#ifdef CONFIG_LINUX
836
02942db7 837 /*
7fc5b13f 838 * Verbs are only NULL if management has bound to '[::]'.
02942db7 839 *
7fc5b13f
MH
840 * Let's iterate through all the devices and see if there any pure IB
841 * devices (non-ethernet).
02942db7 842 *
7fc5b13f 843 * If not, then we can safely proceed with the migration.
4c293dc6 844 * Otherwise, there are no guarantees until the bug is fixed in linux.
7fc5b13f
MH
845 */
846 if (!verbs) {
02942db7 847 int num_devices, x;
7fc5b13f
MH
848 struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
849 bool roce_found = false;
850 bool ib_found = false;
851
852 for (x = 0; x < num_devices; x++) {
853 verbs = ibv_open_device(dev_list[x]);
5b61d575
PR
854 if (!verbs) {
855 if (errno == EPERM) {
856 continue;
857 } else {
858 return -EINVAL;
859 }
860 }
7fc5b13f
MH
861
862 if (ibv_query_port(verbs, 1, &port_attr)) {
863 ibv_close_device(verbs);
864 ERROR(errp, "Could not query initial IB port");
865 return -EINVAL;
866 }
867
868 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
869 ib_found = true;
870 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
871 roce_found = true;
872 }
873
874 ibv_close_device(verbs);
875
876 }
877
878 if (roce_found) {
879 if (ib_found) {
880 fprintf(stderr, "WARN: migrations may fail:"
881 " IPv6 over RoCE / iWARP in linux"
882 " is broken. But since you appear to have a"
883 " mixed RoCE / IB environment, be sure to only"
884 " migrate over the IB fabric until the kernel "
885 " fixes the bug.\n");
886 } else {
887 ERROR(errp, "You only have RoCE / iWARP devices in your systems"
888 " and your management software has specified '[::]'"
889 ", but IPv6 over RoCE / iWARP is not supported in Linux.");
890 return -ENONET;
891 }
892 }
893
894 return 0;
895 }
896
897 /*
898 * If we have a verbs context, that means that some other than '[::]' was
02942db7
SW
899 * used by the management software for binding. In which case we can
900 * actually warn the user about a potentially broken kernel.
7fc5b13f
MH
901 */
902
903 /* IB ports start with 1, not 0 */
904 if (ibv_query_port(verbs, 1, &port_attr)) {
905 ERROR(errp, "Could not query initial IB port");
906 return -EINVAL;
907 }
908
909 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
910 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
911 "(but patches on linux-rdma in progress)");
912 return -ENONET;
913 }
914
915#endif
916
917 return 0;
918}
919
2da776db
MH
920/*
921 * Figure out which RDMA device corresponds to the requested IP hostname
922 * Also create the initial connection manager identifiers for opening
923 * the connection.
924 */
925static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
926{
927 int ret;
7fc5b13f 928 struct rdma_addrinfo *res;
2da776db
MH
929 char port_str[16];
930 struct rdma_cm_event *cm_event;
931 char ip[40] = "unknown";
7fc5b13f 932 struct rdma_addrinfo *e;
2da776db
MH
933
934 if (rdma->host == NULL || !strcmp(rdma->host, "")) {
66988941 935 ERROR(errp, "RDMA hostname has not been set");
7fc5b13f 936 return -EINVAL;
2da776db
MH
937 }
938
939 /* create CM channel */
940 rdma->channel = rdma_create_event_channel();
941 if (!rdma->channel) {
66988941 942 ERROR(errp, "could not create CM channel");
7fc5b13f 943 return -EINVAL;
2da776db
MH
944 }
945
946 /* create CM id */
947 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
948 if (ret) {
66988941 949 ERROR(errp, "could not create channel id");
2da776db
MH
950 goto err_resolve_create_id;
951 }
952
953 snprintf(port_str, 16, "%d", rdma->port);
954 port_str[15] = '\0';
955
7fc5b13f 956 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2da776db 957 if (ret < 0) {
7fc5b13f 958 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2da776db
MH
959 goto err_resolve_get_addr;
960 }
961
6470215b
MH
962 for (e = res; e != NULL; e = e->ai_next) {
963 inet_ntop(e->ai_family,
7fc5b13f 964 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
733252de 965 trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
2da776db 966
7fc5b13f 967 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
6470215b
MH
968 RDMA_RESOLVE_TIMEOUT_MS);
969 if (!ret) {
c89aa2f1 970 if (e->ai_family == AF_INET6) {
bbfb89e3 971 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
c89aa2f1
MH
972 if (ret) {
973 continue;
974 }
7fc5b13f 975 }
6470215b
MH
976 goto route;
977 }
2da776db
MH
978 }
979
6470215b
MH
980 ERROR(errp, "could not resolve address %s", rdma->host);
981 goto err_resolve_get_addr;
982
983route:
2da776db
MH
984 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
985
986 ret = rdma_get_cm_event(rdma->channel, &cm_event);
987 if (ret) {
66988941 988 ERROR(errp, "could not perform event_addr_resolved");
2da776db
MH
989 goto err_resolve_get_addr;
990 }
991
992 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
66988941 993 ERROR(errp, "result not equal to event_addr_resolved %s",
2da776db
MH
994 rdma_event_str(cm_event->event));
995 perror("rdma_resolve_addr");
2a934347 996 rdma_ack_cm_event(cm_event);
7fc5b13f 997 ret = -EINVAL;
2da776db
MH
998 goto err_resolve_get_addr;
999 }
1000 rdma_ack_cm_event(cm_event);
1001
1002 /* resolve route */
1003 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1004 if (ret) {
66988941 1005 ERROR(errp, "could not resolve rdma route");
2da776db
MH
1006 goto err_resolve_get_addr;
1007 }
1008
1009 ret = rdma_get_cm_event(rdma->channel, &cm_event);
1010 if (ret) {
66988941 1011 ERROR(errp, "could not perform event_route_resolved");
2da776db
MH
1012 goto err_resolve_get_addr;
1013 }
1014 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
66988941 1015 ERROR(errp, "result not equal to event_route_resolved: %s",
2da776db
MH
1016 rdma_event_str(cm_event->event));
1017 rdma_ack_cm_event(cm_event);
7fc5b13f 1018 ret = -EINVAL;
2da776db
MH
1019 goto err_resolve_get_addr;
1020 }
1021 rdma_ack_cm_event(cm_event);
1022 rdma->verbs = rdma->cm_id->verbs;
1023 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1024 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1025 return 0;
1026
1027err_resolve_get_addr:
1028 rdma_destroy_id(rdma->cm_id);
1029 rdma->cm_id = NULL;
1030err_resolve_create_id:
1031 rdma_destroy_event_channel(rdma->channel);
1032 rdma->channel = NULL;
7fc5b13f 1033 return ret;
2da776db
MH
1034}
1035
1036/*
1037 * Create protection domain and completion queues
1038 */
1039static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1040{
1041 /* allocate pd */
1042 rdma->pd = ibv_alloc_pd(rdma->verbs);
1043 if (!rdma->pd) {
733252de 1044 error_report("failed to allocate protection domain");
2da776db
MH
1045 return -1;
1046 }
1047
1048 /* create completion channel */
1049 rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1050 if (!rdma->comp_channel) {
733252de 1051 error_report("failed to allocate completion channel");
2da776db
MH
1052 goto err_alloc_pd_cq;
1053 }
1054
1055 /*
1056 * Completion queue can be filled by both read and write work requests,
1057 * so must reflect the sum of both possible queue sizes.
1058 */
1059 rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1060 NULL, rdma->comp_channel, 0);
1061 if (!rdma->cq) {
733252de 1062 error_report("failed to allocate completion queue");
2da776db
MH
1063 goto err_alloc_pd_cq;
1064 }
1065
1066 return 0;
1067
1068err_alloc_pd_cq:
1069 if (rdma->pd) {
1070 ibv_dealloc_pd(rdma->pd);
1071 }
1072 if (rdma->comp_channel) {
1073 ibv_destroy_comp_channel(rdma->comp_channel);
1074 }
1075 rdma->pd = NULL;
1076 rdma->comp_channel = NULL;
1077 return -1;
1078
1079}
1080
1081/*
1082 * Create queue pairs.
1083 */
1084static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1085{
1086 struct ibv_qp_init_attr attr = { 0 };
1087 int ret;
1088
1089 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1090 attr.cap.max_recv_wr = 3;
1091 attr.cap.max_send_sge = 1;
1092 attr.cap.max_recv_sge = 1;
1093 attr.send_cq = rdma->cq;
1094 attr.recv_cq = rdma->cq;
1095 attr.qp_type = IBV_QPT_RC;
1096
1097 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1098 if (ret) {
1099 return -1;
1100 }
1101
1102 rdma->qp = rdma->cm_id->qp;
1103 return 0;
1104}
1105
1106static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1107{
1108 int i;
1109 RDMALocalBlocks *local = &rdma->local_ram_blocks;
1110
1111 for (i = 0; i < local->nb_blocks; i++) {
1112 local->block[i].mr =
1113 ibv_reg_mr(rdma->pd,
1114 local->block[i].local_host_addr,
1115 local->block[i].length,
1116 IBV_ACCESS_LOCAL_WRITE |
1117 IBV_ACCESS_REMOTE_WRITE
1118 );
1119 if (!local->block[i].mr) {
1120 perror("Failed to register local dest ram block!\n");
1121 break;
1122 }
1123 rdma->total_registrations++;
1124 }
1125
1126 if (i >= local->nb_blocks) {
1127 return 0;
1128 }
1129
1130 for (i--; i >= 0; i--) {
1131 ibv_dereg_mr(local->block[i].mr);
1132 rdma->total_registrations--;
1133 }
1134
1135 return -1;
1136
1137}
1138
1139/*
1140 * Find the ram block that corresponds to the page requested to be
1141 * transmitted by QEMU.
1142 *
1143 * Once the block is found, also identify which 'chunk' within that
1144 * block that the page belongs to.
1145 *
1146 * This search cannot fail or the migration will fail.
1147 */
1148static int qemu_rdma_search_ram_block(RDMAContext *rdma,
fbce8c25 1149 uintptr_t block_offset,
2da776db
MH
1150 uint64_t offset,
1151 uint64_t length,
1152 uint64_t *block_index,
1153 uint64_t *chunk_index)
1154{
1155 uint64_t current_addr = block_offset + offset;
1156 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1157 (void *) block_offset);
1158 assert(block);
1159 assert(current_addr >= block->offset);
1160 assert((current_addr + length) <= (block->offset + block->length));
1161
1162 *block_index = block->index;
1163 *chunk_index = ram_chunk_index(block->local_host_addr,
1164 block->local_host_addr + (current_addr - block->offset));
1165
1166 return 0;
1167}
1168
1169/*
1170 * Register a chunk with IB. If the chunk was already registered
1171 * previously, then skip.
1172 *
1173 * Also return the keys associated with the registration needed
1174 * to perform the actual RDMA operation.
1175 */
1176static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
3ac040c0 1177 RDMALocalBlock *block, uintptr_t host_addr,
2da776db
MH
1178 uint32_t *lkey, uint32_t *rkey, int chunk,
1179 uint8_t *chunk_start, uint8_t *chunk_end)
1180{
1181 if (block->mr) {
1182 if (lkey) {
1183 *lkey = block->mr->lkey;
1184 }
1185 if (rkey) {
1186 *rkey = block->mr->rkey;
1187 }
1188 return 0;
1189 }
1190
1191 /* allocate memory to store chunk MRs */
1192 if (!block->pmr) {
97f3ad35 1193 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
2da776db
MH
1194 }
1195
1196 /*
1197 * If 'rkey', then we're the destination, so grant access to the source.
1198 *
1199 * If 'lkey', then we're the source VM, so grant access only to ourselves.
1200 */
1201 if (!block->pmr[chunk]) {
1202 uint64_t len = chunk_end - chunk_start;
1203
733252de 1204 trace_qemu_rdma_register_and_get_keys(len, chunk_start);
2da776db
MH
1205
1206 block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1207 chunk_start, len,
1208 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1209 IBV_ACCESS_REMOTE_WRITE) : 0));
1210
1211 if (!block->pmr[chunk]) {
1212 perror("Failed to register chunk!");
1213 fprintf(stderr, "Chunk details: block: %d chunk index %d"
3ac040c0
SW
1214 " start %" PRIuPTR " end %" PRIuPTR
1215 " host %" PRIuPTR
1216 " local %" PRIuPTR " registrations: %d\n",
1217 block->index, chunk, (uintptr_t)chunk_start,
1218 (uintptr_t)chunk_end, host_addr,
1219 (uintptr_t)block->local_host_addr,
2da776db
MH
1220 rdma->total_registrations);
1221 return -1;
1222 }
1223 rdma->total_registrations++;
1224 }
1225
1226 if (lkey) {
1227 *lkey = block->pmr[chunk]->lkey;
1228 }
1229 if (rkey) {
1230 *rkey = block->pmr[chunk]->rkey;
1231 }
1232 return 0;
1233}
1234
1235/*
1236 * Register (at connection time) the memory used for control
1237 * channel messages.
1238 */
1239static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1240{
1241 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1242 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1243 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1244 if (rdma->wr_data[idx].control_mr) {
1245 rdma->total_registrations++;
1246 return 0;
1247 }
733252de 1248 error_report("qemu_rdma_reg_control failed");
2da776db
MH
1249 return -1;
1250}
1251
1252const char *print_wrid(int wrid)
1253{
1254 if (wrid >= RDMA_WRID_RECV_CONTROL) {
1255 return wrid_desc[RDMA_WRID_RECV_CONTROL];
1256 }
1257 return wrid_desc[wrid];
1258}
1259
1260/*
1261 * RDMA requires memory registration (mlock/pinning), but this is not good for
1262 * overcommitment.
1263 *
1264 * In preparation for the future where LRU information or workload-specific
1265 * writable working set memory access behavior is available to QEMU
1266 * it would be nice to have in place the ability to UN-register/UN-pin
1267 * particular memory regions from the RDMA hardware when it is determined that
1268 * those regions of memory will likely not be accessed again in the near future.
1269 *
1270 * While we do not yet have such information right now, the following
1271 * compile-time option allows us to perform a non-optimized version of this
1272 * behavior.
1273 *
1274 * By uncommenting this option, you will cause *all* RDMA transfers to be
1275 * unregistered immediately after the transfer completes on both sides of the
1276 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1277 *
1278 * This will have a terrible impact on migration performance, so until future
1279 * workload information or LRU information is available, do not attempt to use
1280 * this feature except for basic testing.
1281 */
1282//#define RDMA_UNREGISTRATION_EXAMPLE
1283
1284/*
1285 * Perform a non-optimized memory unregistration after every transfer
24ec68ef 1286 * for demonstration purposes, only if pin-all is not requested.
2da776db
MH
1287 *
1288 * Potential optimizations:
1289 * 1. Start a new thread to run this function continuously
1290 - for bit clearing
1291 - and for receipt of unregister messages
1292 * 2. Use an LRU.
1293 * 3. Use workload hints.
1294 */
1295static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1296{
1297 while (rdma->unregistrations[rdma->unregister_current]) {
1298 int ret;
1299 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1300 uint64_t chunk =
1301 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1302 uint64_t index =
1303 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1304 RDMALocalBlock *block =
1305 &(rdma->local_ram_blocks.block[index]);
1306 RDMARegister reg = { .current_index = index };
1307 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1308 };
1309 RDMAControlHeader head = { .len = sizeof(RDMARegister),
1310 .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1311 .repeat = 1,
1312 };
1313
733252de
DDAG
1314 trace_qemu_rdma_unregister_waiting_proc(chunk,
1315 rdma->unregister_current);
2da776db
MH
1316
1317 rdma->unregistrations[rdma->unregister_current] = 0;
1318 rdma->unregister_current++;
1319
1320 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1321 rdma->unregister_current = 0;
1322 }
1323
1324
1325 /*
1326 * Unregistration is speculative (because migration is single-threaded
1327 * and we cannot break the protocol's inifinband message ordering).
1328 * Thus, if the memory is currently being used for transmission,
1329 * then abort the attempt to unregister and try again
1330 * later the next time a completion is received for this memory.
1331 */
1332 clear_bit(chunk, block->unregister_bitmap);
1333
1334 if (test_bit(chunk, block->transit_bitmap)) {
733252de 1335 trace_qemu_rdma_unregister_waiting_inflight(chunk);
2da776db
MH
1336 continue;
1337 }
1338
733252de 1339 trace_qemu_rdma_unregister_waiting_send(chunk);
2da776db
MH
1340
1341 ret = ibv_dereg_mr(block->pmr[chunk]);
1342 block->pmr[chunk] = NULL;
1343 block->remote_keys[chunk] = 0;
1344
1345 if (ret != 0) {
1346 perror("unregistration chunk failed");
1347 return -ret;
1348 }
1349 rdma->total_registrations--;
1350
1351 reg.key.chunk = chunk;
b12f7777 1352 register_to_network(rdma, &reg);
2da776db
MH
1353 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1354 &resp, NULL, NULL);
1355 if (ret < 0) {
1356 return ret;
1357 }
1358
733252de 1359 trace_qemu_rdma_unregister_waiting_complete(chunk);
2da776db
MH
1360 }
1361
1362 return 0;
1363}
1364
1365static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1366 uint64_t chunk)
1367{
1368 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1369
1370 result |= (index << RDMA_WRID_BLOCK_SHIFT);
1371 result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1372
1373 return result;
1374}
1375
1376/*
1377 * Set bit for unregistration in the next iteration.
1378 * We cannot transmit right here, but will unpin later.
1379 */
1380static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1381 uint64_t chunk, uint64_t wr_id)
1382{
1383 if (rdma->unregistrations[rdma->unregister_next] != 0) {
733252de 1384 error_report("rdma migration: queue is full");
2da776db
MH
1385 } else {
1386 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1387
1388 if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
733252de
DDAG
1389 trace_qemu_rdma_signal_unregister_append(chunk,
1390 rdma->unregister_next);
2da776db
MH
1391
1392 rdma->unregistrations[rdma->unregister_next++] =
1393 qemu_rdma_make_wrid(wr_id, index, chunk);
1394
1395 if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1396 rdma->unregister_next = 0;
1397 }
1398 } else {
733252de 1399 trace_qemu_rdma_signal_unregister_already(chunk);
2da776db
MH
1400 }
1401 }
1402}
1403
1404/*
1405 * Consult the connection manager to see a work request
1406 * (of any kind) has completed.
1407 * Return the work request ID that completed.
1408 */
88571882
IY
1409static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1410 uint32_t *byte_len)
2da776db
MH
1411{
1412 int ret;
1413 struct ibv_wc wc;
1414 uint64_t wr_id;
1415
1416 ret = ibv_poll_cq(rdma->cq, 1, &wc);
1417
1418 if (!ret) {
1419 *wr_id_out = RDMA_WRID_NONE;
1420 return 0;
1421 }
1422
1423 if (ret < 0) {
733252de 1424 error_report("ibv_poll_cq return %d", ret);
2da776db
MH
1425 return ret;
1426 }
1427
1428 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1429
1430 if (wc.status != IBV_WC_SUCCESS) {
1431 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1432 wc.status, ibv_wc_status_str(wc.status));
1433 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1434
1435 return -1;
1436 }
1437
1438 if (rdma->control_ready_expected &&
1439 (wr_id >= RDMA_WRID_RECV_CONTROL)) {
733252de 1440 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
2da776db
MH
1441 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1442 rdma->control_ready_expected = 0;
1443 }
1444
1445 if (wr_id == RDMA_WRID_RDMA_WRITE) {
1446 uint64_t chunk =
1447 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1448 uint64_t index =
1449 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1450 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1451
733252de 1452 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
fbce8c25
SW
1453 index, chunk, block->local_host_addr,
1454 (void *)(uintptr_t)block->remote_host_addr);
2da776db
MH
1455
1456 clear_bit(chunk, block->transit_bitmap);
1457
1458 if (rdma->nb_sent > 0) {
1459 rdma->nb_sent--;
1460 }
1461
1462 if (!rdma->pin_all) {
1463 /*
1464 * FYI: If one wanted to signal a specific chunk to be unregistered
1465 * using LRU or workload-specific information, this is the function
1466 * you would call to do so. That chunk would then get asynchronously
1467 * unregistered later.
1468 */
1469#ifdef RDMA_UNREGISTRATION_EXAMPLE
1470 qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1471#endif
1472 }
1473 } else {
733252de 1474 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
2da776db
MH
1475 }
1476
1477 *wr_id_out = wc.wr_id;
88571882
IY
1478 if (byte_len) {
1479 *byte_len = wc.byte_len;
1480 }
2da776db
MH
1481
1482 return 0;
1483}
1484
9c98cfbe
DDAG
1485/* Wait for activity on the completion channel.
1486 * Returns 0 on success, none-0 on error.
1487 */
1488static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
1489{
1490 /*
1491 * Coroutine doesn't start until migration_fd_process_incoming()
1492 * so don't yield unless we know we're running inside of a coroutine.
1493 */
1494 if (rdma->migration_started_on_destination) {
1495 yield_until_fd_readable(rdma->comp_channel->fd);
1496 } else {
1497 /* This is the source side, we're in a separate thread
1498 * or destination prior to migration_fd_process_incoming()
1499 * we can't yield; so we have to poll the fd.
1500 * But we need to be able to handle 'cancel' or an error
1501 * without hanging forever.
1502 */
1503 while (!rdma->error_state && !rdma->received_error) {
1504 GPollFD pfds[1];
1505 pfds[0].fd = rdma->comp_channel->fd;
1506 pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1507 /* 0.1s timeout, should be fine for a 'cancel' */
1508 switch (qemu_poll_ns(pfds, 1, 100 * 1000 * 1000)) {
1509 case 1: /* fd active */
1510 return 0;
1511
1512 case 0: /* Timeout, go around again */
1513 break;
1514
1515 default: /* Error of some type -
1516 * I don't trust errno from qemu_poll_ns
1517 */
1518 error_report("%s: poll failed", __func__);
1519 return -EPIPE;
1520 }
1521
1522 if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1523 /* Bail out and let the cancellation happen */
1524 return -EPIPE;
1525 }
1526 }
1527 }
1528
1529 if (rdma->received_error) {
1530 return -EPIPE;
1531 }
1532 return rdma->error_state;
1533}
1534
2da776db
MH
1535/*
1536 * Block until the next work request has completed.
1537 *
1538 * First poll to see if a work request has already completed,
1539 * otherwise block.
1540 *
1541 * If we encounter completed work requests for IDs other than
1542 * the one we're interested in, then that's generally an error.
1543 *
1544 * The only exception is actual RDMA Write completions. These
1545 * completions only need to be recorded, but do not actually
1546 * need further processing.
1547 */
88571882
IY
1548static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1549 uint32_t *byte_len)
2da776db
MH
1550{
1551 int num_cq_events = 0, ret = 0;
1552 struct ibv_cq *cq;
1553 void *cq_ctx;
1554 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1555
1556 if (ibv_req_notify_cq(rdma->cq, 0)) {
1557 return -1;
1558 }
1559 /* poll cq first */
1560 while (wr_id != wrid_requested) {
88571882 1561 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
2da776db
MH
1562 if (ret < 0) {
1563 return ret;
1564 }
1565
1566 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1567
1568 if (wr_id == RDMA_WRID_NONE) {
1569 break;
1570 }
1571 if (wr_id != wrid_requested) {
733252de
DDAG
1572 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1573 wrid_requested, print_wrid(wr_id), wr_id);
2da776db
MH
1574 }
1575 }
1576
1577 if (wr_id == wrid_requested) {
1578 return 0;
1579 }
1580
1581 while (1) {
9c98cfbe
DDAG
1582 ret = qemu_rdma_wait_comp_channel(rdma);
1583 if (ret) {
1584 goto err_block_for_wrid;
2da776db
MH
1585 }
1586
0b3c15f0
DDAG
1587 ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
1588 if (ret) {
2da776db
MH
1589 perror("ibv_get_cq_event");
1590 goto err_block_for_wrid;
1591 }
1592
1593 num_cq_events++;
1594
0b3c15f0
DDAG
1595 ret = -ibv_req_notify_cq(cq, 0);
1596 if (ret) {
2da776db
MH
1597 goto err_block_for_wrid;
1598 }
1599
1600 while (wr_id != wrid_requested) {
88571882 1601 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
2da776db
MH
1602 if (ret < 0) {
1603 goto err_block_for_wrid;
1604 }
1605
1606 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1607
1608 if (wr_id == RDMA_WRID_NONE) {
1609 break;
1610 }
1611 if (wr_id != wrid_requested) {
733252de
DDAG
1612 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1613 wrid_requested, print_wrid(wr_id), wr_id);
2da776db
MH
1614 }
1615 }
1616
1617 if (wr_id == wrid_requested) {
1618 goto success_block_for_wrid;
1619 }
1620 }
1621
1622success_block_for_wrid:
1623 if (num_cq_events) {
1624 ibv_ack_cq_events(cq, num_cq_events);
1625 }
1626 return 0;
1627
1628err_block_for_wrid:
1629 if (num_cq_events) {
1630 ibv_ack_cq_events(cq, num_cq_events);
1631 }
0b3c15f0
DDAG
1632
1633 rdma->error_state = ret;
2da776db
MH
1634 return ret;
1635}
1636
1637/*
1638 * Post a SEND message work request for the control channel
1639 * containing some data and block until the post completes.
1640 */
1641static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1642 RDMAControlHeader *head)
1643{
1644 int ret = 0;
1f22364b 1645 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
2da776db
MH
1646 struct ibv_send_wr *bad_wr;
1647 struct ibv_sge sge = {
fbce8c25 1648 .addr = (uintptr_t)(wr->control),
2da776db
MH
1649 .length = head->len + sizeof(RDMAControlHeader),
1650 .lkey = wr->control_mr->lkey,
1651 };
1652 struct ibv_send_wr send_wr = {
1653 .wr_id = RDMA_WRID_SEND_CONTROL,
1654 .opcode = IBV_WR_SEND,
1655 .send_flags = IBV_SEND_SIGNALED,
1656 .sg_list = &sge,
1657 .num_sge = 1,
1658 };
1659
482a33c5 1660 trace_qemu_rdma_post_send_control(control_desc(head->type));
2da776db
MH
1661
1662 /*
1663 * We don't actually need to do a memcpy() in here if we used
1664 * the "sge" properly, but since we're only sending control messages
1665 * (not RAM in a performance-critical path), then its OK for now.
1666 *
1667 * The copy makes the RDMAControlHeader simpler to manipulate
1668 * for the time being.
1669 */
6f1484ed 1670 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
2da776db
MH
1671 memcpy(wr->control, head, sizeof(RDMAControlHeader));
1672 control_to_network((void *) wr->control);
1673
1674 if (buf) {
1675 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1676 }
1677
1678
e325b49a 1679 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2da776db 1680
e325b49a 1681 if (ret > 0) {
733252de 1682 error_report("Failed to use post IB SEND for control");
e325b49a 1683 return -ret;
2da776db
MH
1684 }
1685
88571882 1686 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
2da776db 1687 if (ret < 0) {
733252de 1688 error_report("rdma migration: send polling control error");
2da776db
MH
1689 }
1690
1691 return ret;
1692}
1693
1694/*
1695 * Post a RECV work request in anticipation of some future receipt
1696 * of data on the control channel.
1697 */
1698static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1699{
1700 struct ibv_recv_wr *bad_wr;
1701 struct ibv_sge sge = {
fbce8c25 1702 .addr = (uintptr_t)(rdma->wr_data[idx].control),
2da776db
MH
1703 .length = RDMA_CONTROL_MAX_BUFFER,
1704 .lkey = rdma->wr_data[idx].control_mr->lkey,
1705 };
1706
1707 struct ibv_recv_wr recv_wr = {
1708 .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1709 .sg_list = &sge,
1710 .num_sge = 1,
1711 };
1712
1713
1714 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1715 return -1;
1716 }
1717
1718 return 0;
1719}
1720
1721/*
1722 * Block and wait for a RECV control channel message to arrive.
1723 */
1724static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1725 RDMAControlHeader *head, int expecting, int idx)
1726{
88571882
IY
1727 uint32_t byte_len;
1728 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1729 &byte_len);
2da776db
MH
1730
1731 if (ret < 0) {
733252de 1732 error_report("rdma migration: recv polling control error!");
2da776db
MH
1733 return ret;
1734 }
1735
1736 network_to_control((void *) rdma->wr_data[idx].control);
1737 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1738
482a33c5 1739 trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
2da776db
MH
1740
1741 if (expecting == RDMA_CONTROL_NONE) {
482a33c5 1742 trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
733252de 1743 head->type);
2da776db 1744 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
733252de
DDAG
1745 error_report("Was expecting a %s (%d) control message"
1746 ", but got: %s (%d), length: %d",
482a33c5
DDAG
1747 control_desc(expecting), expecting,
1748 control_desc(head->type), head->type, head->len);
cd5ea070
DDAG
1749 if (head->type == RDMA_CONTROL_ERROR) {
1750 rdma->received_error = true;
1751 }
2da776db
MH
1752 return -EIO;
1753 }
6f1484ed 1754 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
81b07353 1755 error_report("too long length: %d", head->len);
6f1484ed
IY
1756 return -EINVAL;
1757 }
88571882 1758 if (sizeof(*head) + head->len != byte_len) {
733252de 1759 error_report("Malformed length: %d byte_len %d", head->len, byte_len);
88571882
IY
1760 return -EINVAL;
1761 }
2da776db
MH
1762
1763 return 0;
1764}
1765
1766/*
1767 * When a RECV work request has completed, the work request's
1768 * buffer is pointed at the header.
1769 *
1770 * This will advance the pointer to the data portion
1771 * of the control message of the work request's buffer that
1772 * was populated after the work request finished.
1773 */
1774static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1775 RDMAControlHeader *head)
1776{
1777 rdma->wr_data[idx].control_len = head->len;
1778 rdma->wr_data[idx].control_curr =
1779 rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1780}
1781
1782/*
1783 * This is an 'atomic' high-level operation to deliver a single, unified
1784 * control-channel message.
1785 *
1786 * Additionally, if the user is expecting some kind of reply to this message,
1787 * they can request a 'resp' response message be filled in by posting an
1788 * additional work request on behalf of the user and waiting for an additional
1789 * completion.
1790 *
1791 * The extra (optional) response is used during registration to us from having
1792 * to perform an *additional* exchange of message just to provide a response by
1793 * instead piggy-backing on the acknowledgement.
1794 */
1795static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1796 uint8_t *data, RDMAControlHeader *resp,
1797 int *resp_idx,
1798 int (*callback)(RDMAContext *rdma))
1799{
1800 int ret = 0;
1801
1802 /*
1803 * Wait until the dest is ready before attempting to deliver the message
1804 * by waiting for a READY message.
1805 */
1806 if (rdma->control_ready_expected) {
1807 RDMAControlHeader resp;
1808 ret = qemu_rdma_exchange_get_response(rdma,
1809 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1810 if (ret < 0) {
1811 return ret;
1812 }
1813 }
1814
1815 /*
1816 * If the user is expecting a response, post a WR in anticipation of it.
1817 */
1818 if (resp) {
1819 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1820 if (ret) {
733252de 1821 error_report("rdma migration: error posting"
2da776db
MH
1822 " extra control recv for anticipated result!");
1823 return ret;
1824 }
1825 }
1826
1827 /*
1828 * Post a WR to replace the one we just consumed for the READY message.
1829 */
1830 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1831 if (ret) {
733252de 1832 error_report("rdma migration: error posting first control recv!");
2da776db
MH
1833 return ret;
1834 }
1835
1836 /*
1837 * Deliver the control message that was requested.
1838 */
1839 ret = qemu_rdma_post_send_control(rdma, data, head);
1840
1841 if (ret < 0) {
733252de 1842 error_report("Failed to send control buffer!");
2da776db
MH
1843 return ret;
1844 }
1845
1846 /*
1847 * If we're expecting a response, block and wait for it.
1848 */
1849 if (resp) {
1850 if (callback) {
733252de 1851 trace_qemu_rdma_exchange_send_issue_callback();
2da776db
MH
1852 ret = callback(rdma);
1853 if (ret < 0) {
1854 return ret;
1855 }
1856 }
1857
482a33c5 1858 trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
2da776db
MH
1859 ret = qemu_rdma_exchange_get_response(rdma, resp,
1860 resp->type, RDMA_WRID_DATA);
1861
1862 if (ret < 0) {
1863 return ret;
1864 }
1865
1866 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1867 if (resp_idx) {
1868 *resp_idx = RDMA_WRID_DATA;
1869 }
482a33c5 1870 trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
2da776db
MH
1871 }
1872
1873 rdma->control_ready_expected = 1;
1874
1875 return 0;
1876}
1877
1878/*
1879 * This is an 'atomic' high-level operation to receive a single, unified
1880 * control-channel message.
1881 */
1882static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1883 int expecting)
1884{
1885 RDMAControlHeader ready = {
1886 .len = 0,
1887 .type = RDMA_CONTROL_READY,
1888 .repeat = 1,
1889 };
1890 int ret;
1891
1892 /*
1893 * Inform the source that we're ready to receive a message.
1894 */
1895 ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1896
1897 if (ret < 0) {
733252de 1898 error_report("Failed to send control buffer!");
2da776db
MH
1899 return ret;
1900 }
1901
1902 /*
1903 * Block and wait for the message.
1904 */
1905 ret = qemu_rdma_exchange_get_response(rdma, head,
1906 expecting, RDMA_WRID_READY);
1907
1908 if (ret < 0) {
1909 return ret;
1910 }
1911
1912 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1913
1914 /*
1915 * Post a new RECV work request to replace the one we just consumed.
1916 */
1917 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1918 if (ret) {
733252de 1919 error_report("rdma migration: error posting second control recv!");
2da776db
MH
1920 return ret;
1921 }
1922
1923 return 0;
1924}
1925
1926/*
1927 * Write an actual chunk of memory using RDMA.
1928 *
1929 * If we're using dynamic registration on the dest-side, we have to
1930 * send a registration command first.
1931 */
1932static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1933 int current_index, uint64_t current_addr,
1934 uint64_t length)
1935{
1936 struct ibv_sge sge;
1937 struct ibv_send_wr send_wr = { 0 };
1938 struct ibv_send_wr *bad_wr;
1939 int reg_result_idx, ret, count = 0;
1940 uint64_t chunk, chunks;
1941 uint8_t *chunk_start, *chunk_end;
1942 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1943 RDMARegister reg;
1944 RDMARegisterResult *reg_result;
1945 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1946 RDMAControlHeader head = { .len = sizeof(RDMARegister),
1947 .type = RDMA_CONTROL_REGISTER_REQUEST,
1948 .repeat = 1,
1949 };
1950
1951retry:
fbce8c25 1952 sge.addr = (uintptr_t)(block->local_host_addr +
2da776db
MH
1953 (current_addr - block->offset));
1954 sge.length = length;
1955
fbce8c25
SW
1956 chunk = ram_chunk_index(block->local_host_addr,
1957 (uint8_t *)(uintptr_t)sge.addr);
2da776db
MH
1958 chunk_start = ram_chunk_start(block, chunk);
1959
1960 if (block->is_ram_block) {
1961 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1962
1963 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1964 chunks--;
1965 }
1966 } else {
1967 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1968
1969 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1970 chunks--;
1971 }
1972 }
1973
733252de
DDAG
1974 trace_qemu_rdma_write_one_top(chunks + 1,
1975 (chunks + 1) *
1976 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2da776db
MH
1977
1978 chunk_end = ram_chunk_end(block, chunk + chunks);
1979
1980 if (!rdma->pin_all) {
1981#ifdef RDMA_UNREGISTRATION_EXAMPLE
1982 qemu_rdma_unregister_waiting(rdma);
1983#endif
1984 }
1985
1986 while (test_bit(chunk, block->transit_bitmap)) {
1987 (void)count;
733252de 1988 trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2da776db
MH
1989 sge.addr, length, rdma->nb_sent, block->nb_chunks);
1990
88571882 1991 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2da776db
MH
1992
1993 if (ret < 0) {
733252de 1994 error_report("Failed to Wait for previous write to complete "
2da776db 1995 "block %d chunk %" PRIu64
733252de 1996 " current %" PRIu64 " len %" PRIu64 " %d",
2da776db
MH
1997 current_index, chunk, sge.addr, length, rdma->nb_sent);
1998 return ret;
1999 }
2000 }
2001
2002 if (!rdma->pin_all || !block->is_ram_block) {
2003 if (!block->remote_keys[chunk]) {
2004 /*
2005 * This chunk has not yet been registered, so first check to see
2006 * if the entire chunk is zero. If so, tell the other size to
2007 * memset() + madvise() the entire chunk without RDMA.
2008 */
2009
a1febc49 2010 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2da776db
MH
2011 RDMACompress comp = {
2012 .offset = current_addr,
2013 .value = 0,
2014 .block_idx = current_index,
2015 .length = length,
2016 };
2017
2018 head.len = sizeof(comp);
2019 head.type = RDMA_CONTROL_COMPRESS;
2020
733252de
DDAG
2021 trace_qemu_rdma_write_one_zero(chunk, sge.length,
2022 current_index, current_addr);
2da776db 2023
b12f7777 2024 compress_to_network(rdma, &comp);
2da776db
MH
2025 ret = qemu_rdma_exchange_send(rdma, &head,
2026 (uint8_t *) &comp, NULL, NULL, NULL);
2027
2028 if (ret < 0) {
2029 return -EIO;
2030 }
2031
2032 acct_update_position(f, sge.length, true);
2033
2034 return 1;
2035 }
2036
2037 /*
2038 * Otherwise, tell other side to register.
2039 */
2040 reg.current_index = current_index;
2041 if (block->is_ram_block) {
2042 reg.key.current_addr = current_addr;
2043 } else {
2044 reg.key.chunk = chunk;
2045 }
2046 reg.chunks = chunks;
2047
733252de
DDAG
2048 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2049 current_addr);
2da776db 2050
b12f7777 2051 register_to_network(rdma, &reg);
2da776db
MH
2052 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2053 &resp, &reg_result_idx, NULL);
2054 if (ret < 0) {
2055 return ret;
2056 }
2057
2058 /* try to overlap this single registration with the one we sent. */
3ac040c0 2059 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2da776db
MH
2060 &sge.lkey, NULL, chunk,
2061 chunk_start, chunk_end)) {
733252de 2062 error_report("cannot get lkey");
2da776db
MH
2063 return -EINVAL;
2064 }
2065
2066 reg_result = (RDMARegisterResult *)
2067 rdma->wr_data[reg_result_idx].control_curr;
2068
2069 network_to_result(reg_result);
2070
733252de
DDAG
2071 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2072 reg_result->rkey, chunk);
2da776db
MH
2073
2074 block->remote_keys[chunk] = reg_result->rkey;
2075 block->remote_host_addr = reg_result->host_addr;
2076 } else {
2077 /* already registered before */
3ac040c0 2078 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2da776db
MH
2079 &sge.lkey, NULL, chunk,
2080 chunk_start, chunk_end)) {
733252de 2081 error_report("cannot get lkey!");
2da776db
MH
2082 return -EINVAL;
2083 }
2084 }
2085
2086 send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2087 } else {
2088 send_wr.wr.rdma.rkey = block->remote_rkey;
2089
3ac040c0 2090 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2da776db
MH
2091 &sge.lkey, NULL, chunk,
2092 chunk_start, chunk_end)) {
733252de 2093 error_report("cannot get lkey!");
2da776db
MH
2094 return -EINVAL;
2095 }
2096 }
2097
2098 /*
2099 * Encode the ram block index and chunk within this wrid.
2100 * We will use this information at the time of completion
2101 * to figure out which bitmap to check against and then which
2102 * chunk in the bitmap to look for.
2103 */
2104 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2105 current_index, chunk);
2106
2107 send_wr.opcode = IBV_WR_RDMA_WRITE;
2108 send_wr.send_flags = IBV_SEND_SIGNALED;
2109 send_wr.sg_list = &sge;
2110 send_wr.num_sge = 1;
2111 send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2112 (current_addr - block->offset);
2113
733252de
DDAG
2114 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2115 sge.length);
2da776db
MH
2116
2117 /*
2118 * ibv_post_send() does not return negative error numbers,
2119 * per the specification they are positive - no idea why.
2120 */
2121 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2122
2123 if (ret == ENOMEM) {
733252de 2124 trace_qemu_rdma_write_one_queue_full();
88571882 2125 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2da776db 2126 if (ret < 0) {
733252de
DDAG
2127 error_report("rdma migration: failed to make "
2128 "room in full send queue! %d", ret);
2da776db
MH
2129 return ret;
2130 }
2131
2132 goto retry;
2133
2134 } else if (ret > 0) {
2135 perror("rdma migration: post rdma write failed");
2136 return -ret;
2137 }
2138
2139 set_bit(chunk, block->transit_bitmap);
2140 acct_update_position(f, sge.length, false);
2141 rdma->total_writes++;
2142
2143 return 0;
2144}
2145
2146/*
2147 * Push out any unwritten RDMA operations.
2148 *
2149 * We support sending out multiple chunks at the same time.
2150 * Not all of them need to get signaled in the completion queue.
2151 */
2152static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2153{
2154 int ret;
2155
2156 if (!rdma->current_length) {
2157 return 0;
2158 }
2159
2160 ret = qemu_rdma_write_one(f, rdma,
2161 rdma->current_index, rdma->current_addr, rdma->current_length);
2162
2163 if (ret < 0) {
2164 return ret;
2165 }
2166
2167 if (ret == 0) {
2168 rdma->nb_sent++;
733252de 2169 trace_qemu_rdma_write_flush(rdma->nb_sent);
2da776db
MH
2170 }
2171
2172 rdma->current_length = 0;
2173 rdma->current_addr = 0;
2174
2175 return 0;
2176}
2177
2178static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2179 uint64_t offset, uint64_t len)
2180{
44b59494
IY
2181 RDMALocalBlock *block;
2182 uint8_t *host_addr;
2183 uint8_t *chunk_end;
2184
2185 if (rdma->current_index < 0) {
2186 return 0;
2187 }
2188
2189 if (rdma->current_chunk < 0) {
2190 return 0;
2191 }
2192
2193 block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2194 host_addr = block->local_host_addr + (offset - block->offset);
2195 chunk_end = ram_chunk_end(block, rdma->current_chunk);
2da776db
MH
2196
2197 if (rdma->current_length == 0) {
2198 return 0;
2199 }
2200
2201 /*
2202 * Only merge into chunk sequentially.
2203 */
2204 if (offset != (rdma->current_addr + rdma->current_length)) {
2205 return 0;
2206 }
2207
2da776db
MH
2208 if (offset < block->offset) {
2209 return 0;
2210 }
2211
2212 if ((offset + len) > (block->offset + block->length)) {
2213 return 0;
2214 }
2215
2da776db
MH
2216 if ((host_addr + len) > chunk_end) {
2217 return 0;
2218 }
2219
2220 return 1;
2221}
2222
2223/*
2224 * We're not actually writing here, but doing three things:
2225 *
2226 * 1. Identify the chunk the buffer belongs to.
2227 * 2. If the chunk is full or the buffer doesn't belong to the current
2228 * chunk, then start a new chunk and flush() the old chunk.
2229 * 3. To keep the hardware busy, we also group chunks into batches
2230 * and only require that a batch gets acknowledged in the completion
2231 * qeueue instead of each individual chunk.
2232 */
2233static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2234 uint64_t block_offset, uint64_t offset,
2235 uint64_t len)
2236{
2237 uint64_t current_addr = block_offset + offset;
2238 uint64_t index = rdma->current_index;
2239 uint64_t chunk = rdma->current_chunk;
2240 int ret;
2241
2242 /* If we cannot merge it, we flush the current buffer first. */
2243 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2244 ret = qemu_rdma_write_flush(f, rdma);
2245 if (ret) {
2246 return ret;
2247 }
2248 rdma->current_length = 0;
2249 rdma->current_addr = current_addr;
2250
2251 ret = qemu_rdma_search_ram_block(rdma, block_offset,
2252 offset, len, &index, &chunk);
2253 if (ret) {
733252de 2254 error_report("ram block search failed");
2da776db
MH
2255 return ret;
2256 }
2257 rdma->current_index = index;
2258 rdma->current_chunk = chunk;
2259 }
2260
2261 /* merge it */
2262 rdma->current_length += len;
2263
2264 /* flush it if buffer is too large */
2265 if (rdma->current_length >= RDMA_MERGE_MAX) {
2266 return qemu_rdma_write_flush(f, rdma);
2267 }
2268
2269 return 0;
2270}
2271
2272static void qemu_rdma_cleanup(RDMAContext *rdma)
2273{
c5e76115 2274 int idx;
2da776db 2275
5a91337c 2276 if (rdma->cm_id && rdma->connected) {
32bce196
DDAG
2277 if ((rdma->error_state ||
2278 migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2279 !rdma->received_error) {
2da776db
MH
2280 RDMAControlHeader head = { .len = 0,
2281 .type = RDMA_CONTROL_ERROR,
2282 .repeat = 1,
2283 };
733252de 2284 error_report("Early error. Sending error.");
2da776db
MH
2285 qemu_rdma_post_send_control(rdma, NULL, &head);
2286 }
2287
c5e76115 2288 rdma_disconnect(rdma->cm_id);
733252de 2289 trace_qemu_rdma_cleanup_disconnect();
5a91337c 2290 rdma->connected = false;
2da776db
MH
2291 }
2292
a97270ad
DDAG
2293 g_free(rdma->dest_blocks);
2294 rdma->dest_blocks = NULL;
2da776db 2295
1f22364b 2296 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2da776db
MH
2297 if (rdma->wr_data[idx].control_mr) {
2298 rdma->total_registrations--;
2299 ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2300 }
2301 rdma->wr_data[idx].control_mr = NULL;
2302 }
2303
2304 if (rdma->local_ram_blocks.block) {
2305 while (rdma->local_ram_blocks.nb_blocks) {
03fcab38 2306 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2da776db
MH
2307 }
2308 }
2309
80b262e1
PR
2310 if (rdma->qp) {
2311 rdma_destroy_qp(rdma->cm_id);
2312 rdma->qp = NULL;
2313 }
2da776db
MH
2314 if (rdma->cq) {
2315 ibv_destroy_cq(rdma->cq);
2316 rdma->cq = NULL;
2317 }
2318 if (rdma->comp_channel) {
2319 ibv_destroy_comp_channel(rdma->comp_channel);
2320 rdma->comp_channel = NULL;
2321 }
2322 if (rdma->pd) {
2323 ibv_dealloc_pd(rdma->pd);
2324 rdma->pd = NULL;
2325 }
2da776db
MH
2326 if (rdma->cm_id) {
2327 rdma_destroy_id(rdma->cm_id);
2328 rdma->cm_id = NULL;
2329 }
55cc1b59
LC
2330
2331 /* the destination side, listen_id and channel is shared */
80b262e1 2332 if (rdma->listen_id) {
55cc1b59
LC
2333 if (!rdma->is_return_path) {
2334 rdma_destroy_id(rdma->listen_id);
2335 }
80b262e1 2336 rdma->listen_id = NULL;
55cc1b59
LC
2337
2338 if (rdma->channel) {
2339 if (!rdma->is_return_path) {
2340 rdma_destroy_event_channel(rdma->channel);
2341 }
2342 rdma->channel = NULL;
2343 }
80b262e1 2344 }
55cc1b59 2345
2da776db
MH
2346 if (rdma->channel) {
2347 rdma_destroy_event_channel(rdma->channel);
2348 rdma->channel = NULL;
2349 }
e1d0fb37
IY
2350 g_free(rdma->host);
2351 rdma->host = NULL;
2da776db
MH
2352}
2353
2354
bbfb89e3 2355static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2da776db
MH
2356{
2357 int ret, idx;
2358 Error *local_err = NULL, **temp = &local_err;
2359
2360 /*
2361 * Will be validated against destination's actual capabilities
2362 * after the connect() completes.
2363 */
2364 rdma->pin_all = pin_all;
2365
2366 ret = qemu_rdma_resolve_host(rdma, temp);
2367 if (ret) {
2368 goto err_rdma_source_init;
2369 }
2370
2371 ret = qemu_rdma_alloc_pd_cq(rdma);
2372 if (ret) {
2373 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2374 " limits may be too low. Please check $ ulimit -a # and "
66988941 2375 "search for 'ulimit -l' in the output");
2da776db
MH
2376 goto err_rdma_source_init;
2377 }
2378
2379 ret = qemu_rdma_alloc_qp(rdma);
2380 if (ret) {
66988941 2381 ERROR(temp, "rdma migration: error allocating qp!");
2da776db
MH
2382 goto err_rdma_source_init;
2383 }
2384
2385 ret = qemu_rdma_init_ram_blocks(rdma);
2386 if (ret) {
66988941 2387 ERROR(temp, "rdma migration: error initializing ram blocks!");
2da776db
MH
2388 goto err_rdma_source_init;
2389 }
2390
760ff4be
DDAG
2391 /* Build the hash that maps from offset to RAMBlock */
2392 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2393 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2394 g_hash_table_insert(rdma->blockmap,
2395 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2396 &rdma->local_ram_blocks.block[idx]);
2397 }
2398
1f22364b 2399 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2da776db
MH
2400 ret = qemu_rdma_reg_control(rdma, idx);
2401 if (ret) {
66988941 2402 ERROR(temp, "rdma migration: error registering %d control!",
2da776db
MH
2403 idx);
2404 goto err_rdma_source_init;
2405 }
2406 }
2407
2408 return 0;
2409
2410err_rdma_source_init:
2411 error_propagate(errp, local_err);
2412 qemu_rdma_cleanup(rdma);
2413 return -1;
2414}
2415
2416static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2417{
2418 RDMACapabilities cap = {
2419 .version = RDMA_CONTROL_VERSION_CURRENT,
2420 .flags = 0,
2421 };
2422 struct rdma_conn_param conn_param = { .initiator_depth = 2,
2423 .retry_count = 5,
2424 .private_data = &cap,
2425 .private_data_len = sizeof(cap),
2426 };
2427 struct rdma_cm_event *cm_event;
2428 int ret;
2429
2430 /*
2431 * Only negotiate the capability with destination if the user
2432 * on the source first requested the capability.
2433 */
2434 if (rdma->pin_all) {
733252de 2435 trace_qemu_rdma_connect_pin_all_requested();
2da776db
MH
2436 cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2437 }
2438
2439 caps_to_network(&cap);
2440
9cf2bab2
DDAG
2441 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2442 if (ret) {
2443 ERROR(errp, "posting second control recv");
2444 goto err_rdma_source_connect;
2445 }
2446
2da776db
MH
2447 ret = rdma_connect(rdma->cm_id, &conn_param);
2448 if (ret) {
2449 perror("rdma_connect");
66988941 2450 ERROR(errp, "connecting to destination!");
2da776db
MH
2451 goto err_rdma_source_connect;
2452 }
2453
2454 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2455 if (ret) {
2456 perror("rdma_get_cm_event after rdma_connect");
66988941 2457 ERROR(errp, "connecting to destination!");
2da776db 2458 rdma_ack_cm_event(cm_event);
2da776db
MH
2459 goto err_rdma_source_connect;
2460 }
2461
2462 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2463 perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
66988941 2464 ERROR(errp, "connecting to destination!");
2da776db 2465 rdma_ack_cm_event(cm_event);
2da776db
MH
2466 goto err_rdma_source_connect;
2467 }
5a91337c 2468 rdma->connected = true;
2da776db
MH
2469
2470 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2471 network_to_caps(&cap);
2472
2473 /*
2474 * Verify that the *requested* capabilities are supported by the destination
2475 * and disable them otherwise.
2476 */
2477 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2478 ERROR(errp, "Server cannot support pinning all memory. "
66988941 2479 "Will register memory dynamically.");
2da776db
MH
2480 rdma->pin_all = false;
2481 }
2482
733252de 2483 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2da776db
MH
2484
2485 rdma_ack_cm_event(cm_event);
2486
2da776db
MH
2487 rdma->control_ready_expected = 1;
2488 rdma->nb_sent = 0;
2489 return 0;
2490
2491err_rdma_source_connect:
2492 qemu_rdma_cleanup(rdma);
2493 return -1;
2494}
2495
2496static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2497{
1dbd2fd9 2498 int ret, idx;
2da776db
MH
2499 struct rdma_cm_id *listen_id;
2500 char ip[40] = "unknown";
1dbd2fd9 2501 struct rdma_addrinfo *res, *e;
b58c8552 2502 char port_str[16];
2da776db 2503
1f22364b 2504 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2da776db
MH
2505 rdma->wr_data[idx].control_len = 0;
2506 rdma->wr_data[idx].control_curr = NULL;
2507 }
2508
1dbd2fd9 2509 if (!rdma->host || !rdma->host[0]) {
66988941 2510 ERROR(errp, "RDMA host is not set!");
2da776db
MH
2511 rdma->error_state = -EINVAL;
2512 return -1;
2513 }
2514 /* create CM channel */
2515 rdma->channel = rdma_create_event_channel();
2516 if (!rdma->channel) {
66988941 2517 ERROR(errp, "could not create rdma event channel");
2da776db
MH
2518 rdma->error_state = -EINVAL;
2519 return -1;
2520 }
2521
2522 /* create CM id */
2523 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2524 if (ret) {
66988941 2525 ERROR(errp, "could not create cm_id!");
2da776db
MH
2526 goto err_dest_init_create_listen_id;
2527 }
2528
b58c8552
MH
2529 snprintf(port_str, 16, "%d", rdma->port);
2530 port_str[15] = '\0';
2da776db 2531
1dbd2fd9
MT
2532 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2533 if (ret < 0) {
2534 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2535 goto err_dest_init_bind_addr;
2536 }
6470215b 2537
1dbd2fd9
MT
2538 for (e = res; e != NULL; e = e->ai_next) {
2539 inet_ntop(e->ai_family,
2540 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2541 trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2542 ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2543 if (ret) {
2544 continue;
2da776db 2545 }
1dbd2fd9 2546 if (e->ai_family == AF_INET6) {
bbfb89e3 2547 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
1dbd2fd9
MT
2548 if (ret) {
2549 continue;
6470215b
MH
2550 }
2551 }
1dbd2fd9
MT
2552 break;
2553 }
b58c8552 2554
1dbd2fd9 2555 if (!e) {
6470215b
MH
2556 ERROR(errp, "Error: could not rdma_bind_addr!");
2557 goto err_dest_init_bind_addr;
2da776db 2558 }
2da776db
MH
2559
2560 rdma->listen_id = listen_id;
2561 qemu_rdma_dump_gid("dest_init", listen_id);
2562 return 0;
2563
2564err_dest_init_bind_addr:
2565 rdma_destroy_id(listen_id);
2566err_dest_init_create_listen_id:
2567 rdma_destroy_event_channel(rdma->channel);
2568 rdma->channel = NULL;
2569 rdma->error_state = ret;
2570 return ret;
2571
2572}
2573
55cc1b59
LC
2574static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2575 RDMAContext *rdma)
2576{
2577 int idx;
2578
2579 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2580 rdma_return_path->wr_data[idx].control_len = 0;
2581 rdma_return_path->wr_data[idx].control_curr = NULL;
2582 }
2583
2584 /*the CM channel and CM id is shared*/
2585 rdma_return_path->channel = rdma->channel;
2586 rdma_return_path->listen_id = rdma->listen_id;
2587
2588 rdma->return_path = rdma_return_path;
2589 rdma_return_path->return_path = rdma;
2590 rdma_return_path->is_return_path = true;
2591}
2592
2da776db
MH
2593static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2594{
2595 RDMAContext *rdma = NULL;
2596 InetSocketAddress *addr;
2597
2598 if (host_port) {
97f3ad35 2599 rdma = g_new0(RDMAContext, 1);
2da776db
MH
2600 rdma->current_index = -1;
2601 rdma->current_chunk = -1;
2602
0785bd7a
MA
2603 addr = g_new(InetSocketAddress, 1);
2604 if (!inet_parse(addr, host_port, NULL)) {
2da776db
MH
2605 rdma->port = atoi(addr->port);
2606 rdma->host = g_strdup(addr->host);
2607 } else {
2608 ERROR(errp, "bad RDMA migration address '%s'", host_port);
2609 g_free(rdma);
e325b49a 2610 rdma = NULL;
2da776db 2611 }
e325b49a
MH
2612
2613 qapi_free_InetSocketAddress(addr);
2da776db
MH
2614 }
2615
2616 return rdma;
2617}
2618
2619/*
2620 * QEMUFile interface to the control channel.
2621 * SEND messages for control only.
971ae6ef 2622 * VM's ram is handled with regular RDMA messages.
2da776db 2623 */
6ddd2d76
DB
2624static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2625 const struct iovec *iov,
2626 size_t niov,
2627 int *fds,
2628 size_t nfds,
2629 Error **errp)
2630{
2631 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2632 QEMUFile *f = rioc->file;
2633 RDMAContext *rdma = rioc->rdma;
2da776db 2634 int ret;
6ddd2d76
DB
2635 ssize_t done = 0;
2636 size_t i;
f38f6d41 2637 size_t len = 0;
2da776db
MH
2638
2639 CHECK_ERROR_STATE();
2640
2641 /*
2642 * Push out any writes that
971ae6ef 2643 * we're queued up for VM's ram.
2da776db
MH
2644 */
2645 ret = qemu_rdma_write_flush(f, rdma);
2646 if (ret < 0) {
2647 rdma->error_state = ret;
2648 return ret;
2649 }
2650
6ddd2d76
DB
2651 for (i = 0; i < niov; i++) {
2652 size_t remaining = iov[i].iov_len;
2653 uint8_t * data = (void *)iov[i].iov_base;
2654 while (remaining) {
2655 RDMAControlHeader head;
2da776db 2656
f38f6d41
LC
2657 len = MIN(remaining, RDMA_SEND_INCREMENT);
2658 remaining -= len;
2da776db 2659
f38f6d41 2660 head.len = len;
6ddd2d76 2661 head.type = RDMA_CONTROL_QEMU_FILE;
2da776db 2662
6ddd2d76 2663 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2da776db 2664
6ddd2d76
DB
2665 if (ret < 0) {
2666 rdma->error_state = ret;
2667 return ret;
2668 }
2da776db 2669
f38f6d41
LC
2670 data += len;
2671 done += len;
6ddd2d76 2672 }
2da776db
MH
2673 }
2674
6ddd2d76 2675 return done;
2da776db
MH
2676}
2677
2678static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
a202a4c0 2679 size_t size, int idx)
2da776db
MH
2680{
2681 size_t len = 0;
2682
2683 if (rdma->wr_data[idx].control_len) {
733252de 2684 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2da776db
MH
2685
2686 len = MIN(size, rdma->wr_data[idx].control_len);
2687 memcpy(buf, rdma->wr_data[idx].control_curr, len);
2688 rdma->wr_data[idx].control_curr += len;
2689 rdma->wr_data[idx].control_len -= len;
2690 }
2691
2692 return len;
2693}
2694
2695/*
2696 * QEMUFile interface to the control channel.
2697 * RDMA links don't use bytestreams, so we have to
2698 * return bytes to QEMUFile opportunistically.
2699 */
6ddd2d76
DB
2700static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2701 const struct iovec *iov,
2702 size_t niov,
2703 int **fds,
2704 size_t *nfds,
2705 Error **errp)
2706{
2707 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2708 RDMAContext *rdma = rioc->rdma;
2da776db
MH
2709 RDMAControlHeader head;
2710 int ret = 0;
6ddd2d76
DB
2711 ssize_t i;
2712 size_t done = 0;
2da776db
MH
2713
2714 CHECK_ERROR_STATE();
2715
6ddd2d76
DB
2716 for (i = 0; i < niov; i++) {
2717 size_t want = iov[i].iov_len;
2718 uint8_t *data = (void *)iov[i].iov_base;
2da776db 2719
6ddd2d76
DB
2720 /*
2721 * First, we hold on to the last SEND message we
2722 * were given and dish out the bytes until we run
2723 * out of bytes.
2724 */
2725 ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2726 done += ret;
2727 want -= ret;
2728 /* Got what we needed, so go to next iovec */
2729 if (want == 0) {
2730 continue;
2731 }
2da776db 2732
6ddd2d76
DB
2733 /* If we got any data so far, then don't wait
2734 * for more, just return what we have */
2735 if (done > 0) {
2736 break;
2737 }
2da776db 2738
6ddd2d76
DB
2739
2740 /* We've got nothing at all, so lets wait for
2741 * more to arrive
2742 */
2743 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2744
2745 if (ret < 0) {
2746 rdma->error_state = ret;
2747 return ret;
2748 }
2749
2750 /*
2751 * SEND was received with new bytes, now try again.
2752 */
2753 ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2754 done += ret;
2755 want -= ret;
2756
2757 /* Still didn't get enough, so lets just return */
2758 if (want) {
2759 if (done == 0) {
2760 return QIO_CHANNEL_ERR_BLOCK;
2761 } else {
2762 break;
2763 }
2764 }
2765 }
f38f6d41 2766 return done;
2da776db
MH
2767}
2768
2769/*
2770 * Block until all the outstanding chunks have been delivered by the hardware.
2771 */
2772static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2773{
2774 int ret;
2775
2776 if (qemu_rdma_write_flush(f, rdma) < 0) {
2777 return -EIO;
2778 }
2779
2780 while (rdma->nb_sent) {
88571882 2781 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2da776db 2782 if (ret < 0) {
733252de 2783 error_report("rdma migration: complete polling error!");
2da776db
MH
2784 return -EIO;
2785 }
2786 }
2787
2788 qemu_rdma_unregister_waiting(rdma);
2789
2790 return 0;
2791}
2792
6ddd2d76
DB
2793
2794static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2795 bool blocking,
2796 Error **errp)
2797{
2798 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2799 /* XXX we should make readv/writev actually honour this :-) */
2800 rioc->blocking = blocking;
2801 return 0;
2802}
2803
2804
2805typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2806struct QIOChannelRDMASource {
2807 GSource parent;
2808 QIOChannelRDMA *rioc;
2809 GIOCondition condition;
2810};
2811
2812static gboolean
2813qio_channel_rdma_source_prepare(GSource *source,
2814 gint *timeout)
2815{
2816 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2817 RDMAContext *rdma = rsource->rioc->rdma;
2818 GIOCondition cond = 0;
2819 *timeout = -1;
2820
2821 if (rdma->wr_data[0].control_len) {
2822 cond |= G_IO_IN;
2823 }
2824 cond |= G_IO_OUT;
2825
2826 return cond & rsource->condition;
2827}
2828
2829static gboolean
2830qio_channel_rdma_source_check(GSource *source)
2831{
2832 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2833 RDMAContext *rdma = rsource->rioc->rdma;
2834 GIOCondition cond = 0;
2835
2836 if (rdma->wr_data[0].control_len) {
2837 cond |= G_IO_IN;
2838 }
2839 cond |= G_IO_OUT;
2840
2841 return cond & rsource->condition;
2842}
2843
2844static gboolean
2845qio_channel_rdma_source_dispatch(GSource *source,
2846 GSourceFunc callback,
2847 gpointer user_data)
2848{
2849 QIOChannelFunc func = (QIOChannelFunc)callback;
2850 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2851 RDMAContext *rdma = rsource->rioc->rdma;
2852 GIOCondition cond = 0;
2853
2854 if (rdma->wr_data[0].control_len) {
2855 cond |= G_IO_IN;
2856 }
2857 cond |= G_IO_OUT;
2858
2859 return (*func)(QIO_CHANNEL(rsource->rioc),
2860 (cond & rsource->condition),
2861 user_data);
2862}
2863
2864static void
2865qio_channel_rdma_source_finalize(GSource *source)
2866{
2867 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2868
2869 object_unref(OBJECT(ssource->rioc));
2870}
2871
2872GSourceFuncs qio_channel_rdma_source_funcs = {
2873 qio_channel_rdma_source_prepare,
2874 qio_channel_rdma_source_check,
2875 qio_channel_rdma_source_dispatch,
2876 qio_channel_rdma_source_finalize
2877};
2878
2879static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2880 GIOCondition condition)
2da776db 2881{
6ddd2d76
DB
2882 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2883 QIOChannelRDMASource *ssource;
2884 GSource *source;
2885
2886 source = g_source_new(&qio_channel_rdma_source_funcs,
2887 sizeof(QIOChannelRDMASource));
2888 ssource = (QIOChannelRDMASource *)source;
2889
2890 ssource->rioc = rioc;
2891 object_ref(OBJECT(rioc));
2892
2893 ssource->condition = condition;
2894
2895 return source;
2896}
2897
2898
2899static int qio_channel_rdma_close(QIOChannel *ioc,
2900 Error **errp)
2901{
2902 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
733252de 2903 trace_qemu_rdma_close();
6ddd2d76 2904 if (rioc->rdma) {
12c67ffb
DDAG
2905 if (!rioc->rdma->error_state) {
2906 rioc->rdma->error_state = qemu_file_get_error(rioc->file);
2907 }
6ddd2d76
DB
2908 qemu_rdma_cleanup(rioc->rdma);
2909 g_free(rioc->rdma);
2910 rioc->rdma = NULL;
2da776db 2911 }
2da776db
MH
2912 return 0;
2913}
2914
2915/*
2916 * Parameters:
2917 * @offset == 0 :
2918 * This means that 'block_offset' is a full virtual address that does not
2919 * belong to a RAMBlock of the virtual machine and instead
2920 * represents a private malloc'd memory area that the caller wishes to
2921 * transfer.
2922 *
2923 * @offset != 0 :
2924 * Offset is an offset to be added to block_offset and used
2925 * to also lookup the corresponding RAMBlock.
2926 *
2927 * @size > 0 :
2928 * Initiate an transfer this size.
2929 *
2930 * @size == 0 :
2931 * A 'hint' or 'advice' that means that we wish to speculatively
2932 * and asynchronously unregister this memory. In this case, there is no
52f35022 2933 * guarantee that the unregister will actually happen, for example,
2da776db
MH
2934 * if the memory is being actively transmitted. Additionally, the memory
2935 * may be re-registered at any future time if a write within the same
2936 * chunk was requested again, even if you attempted to unregister it
2937 * here.
2938 *
2939 * @size < 0 : TODO, not yet supported
2940 * Unregister the memory NOW. This means that the caller does not
2941 * expect there to be any future RDMA transfers and we just want to clean
2942 * things up. This is used in case the upper layer owns the memory and
2943 * cannot wait for qemu_fclose() to occur.
2944 *
2945 * @bytes_sent : User-specificed pointer to indicate how many bytes were
2946 * sent. Usually, this will not be more than a few bytes of
2947 * the protocol because most transfers are sent asynchronously.
2948 */
2949static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2950 ram_addr_t block_offset, ram_addr_t offset,
6e1dea46 2951 size_t size, uint64_t *bytes_sent)
2da776db 2952{
6ddd2d76
DB
2953 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
2954 RDMAContext *rdma = rioc->rdma;
2da776db
MH
2955 int ret;
2956
2957 CHECK_ERROR_STATE();
2958
ccb7e1b5
LC
2959 if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
2960 return RAM_SAVE_CONTROL_NOT_SUPP;
2961 }
2962
2da776db
MH
2963 qemu_fflush(f);
2964
2965 if (size > 0) {
2966 /*
2967 * Add this page to the current 'chunk'. If the chunk
2968 * is full, or the page doen't belong to the current chunk,
2969 * an actual RDMA write will occur and a new chunk will be formed.
2970 */
2971 ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2972 if (ret < 0) {
733252de 2973 error_report("rdma migration: write error! %d", ret);
2da776db
MH
2974 goto err;
2975 }
2976
2977 /*
2978 * We always return 1 bytes because the RDMA
2979 * protocol is completely asynchronous. We do not yet know
2980 * whether an identified chunk is zero or not because we're
2981 * waiting for other pages to potentially be merged with
2982 * the current chunk. So, we have to call qemu_update_position()
2983 * later on when the actual write occurs.
2984 */
2985 if (bytes_sent) {
2986 *bytes_sent = 1;
2987 }
2988 } else {
2989 uint64_t index, chunk;
2990
2991 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
2992 if (size < 0) {
2993 ret = qemu_rdma_drain_cq(f, rdma);
2994 if (ret < 0) {
2995 fprintf(stderr, "rdma: failed to synchronously drain"
2996 " completion queue before unregistration.\n");
2997 goto err;
2998 }
2999 }
3000 */
3001
3002 ret = qemu_rdma_search_ram_block(rdma, block_offset,
3003 offset, size, &index, &chunk);
3004
3005 if (ret) {
733252de 3006 error_report("ram block search failed");
2da776db
MH
3007 goto err;
3008 }
3009
3010 qemu_rdma_signal_unregister(rdma, index, chunk, 0);
3011
3012 /*
52f35022 3013 * TODO: Synchronous, guaranteed unregistration (should not occur during
2da776db
MH
3014 * fast-path). Otherwise, unregisters will process on the next call to
3015 * qemu_rdma_drain_cq()
3016 if (size < 0) {
3017 qemu_rdma_unregister_waiting(rdma);
3018 }
3019 */
3020 }
3021
3022 /*
3023 * Drain the Completion Queue if possible, but do not block,
3024 * just poll.
3025 *
3026 * If nothing to poll, the end of the iteration will do this
3027 * again to make sure we don't overflow the request queue.
3028 */
3029 while (1) {
3030 uint64_t wr_id, wr_id_in;
88571882 3031 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
2da776db 3032 if (ret < 0) {
733252de 3033 error_report("rdma migration: polling error! %d", ret);
2da776db
MH
3034 goto err;
3035 }
3036
3037 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3038
3039 if (wr_id == RDMA_WRID_NONE) {
3040 break;
3041 }
3042 }
3043
3044 return RAM_SAVE_CONTROL_DELAYED;
3045err:
3046 rdma->error_state = ret;
3047 return ret;
3048}
3049
55cc1b59
LC
3050static void rdma_accept_incoming_migration(void *opaque);
3051
2da776db
MH
3052static int qemu_rdma_accept(RDMAContext *rdma)
3053{
3054 RDMACapabilities cap;
3055 struct rdma_conn_param conn_param = {
3056 .responder_resources = 2,
3057 .private_data = &cap,
3058 .private_data_len = sizeof(cap),
3059 };
3060 struct rdma_cm_event *cm_event;
3061 struct ibv_context *verbs;
3062 int ret = -EINVAL;
3063 int idx;
3064
3065 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3066 if (ret) {
3067 goto err_rdma_dest_wait;
3068 }
3069
3070 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3071 rdma_ack_cm_event(cm_event);
3072 goto err_rdma_dest_wait;
3073 }
3074
3075 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3076
3077 network_to_caps(&cap);
3078
3079 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
733252de 3080 error_report("Unknown source RDMA version: %d, bailing...",
2da776db
MH
3081 cap.version);
3082 rdma_ack_cm_event(cm_event);
3083 goto err_rdma_dest_wait;
3084 }
3085
3086 /*
3087 * Respond with only the capabilities this version of QEMU knows about.
3088 */
3089 cap.flags &= known_capabilities;
3090
3091 /*
3092 * Enable the ones that we do know about.
3093 * Add other checks here as new ones are introduced.
3094 */
3095 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3096 rdma->pin_all = true;
3097 }
3098
3099 rdma->cm_id = cm_event->id;
3100 verbs = cm_event->id->verbs;
3101
3102 rdma_ack_cm_event(cm_event);
3103
733252de 3104 trace_qemu_rdma_accept_pin_state(rdma->pin_all);
2da776db
MH
3105
3106 caps_to_network(&cap);
3107
733252de 3108 trace_qemu_rdma_accept_pin_verbsc(verbs);
2da776db
MH
3109
3110 if (!rdma->verbs) {
3111 rdma->verbs = verbs;
3112 } else if (rdma->verbs != verbs) {
733252de
DDAG
3113 error_report("ibv context not matching %p, %p!", rdma->verbs,
3114 verbs);
2da776db
MH
3115 goto err_rdma_dest_wait;
3116 }
3117
3118 qemu_rdma_dump_id("dest_init", verbs);
3119
3120 ret = qemu_rdma_alloc_pd_cq(rdma);
3121 if (ret) {
733252de 3122 error_report("rdma migration: error allocating pd and cq!");
2da776db
MH
3123 goto err_rdma_dest_wait;
3124 }
3125
3126 ret = qemu_rdma_alloc_qp(rdma);
3127 if (ret) {
733252de 3128 error_report("rdma migration: error allocating qp!");
2da776db
MH
3129 goto err_rdma_dest_wait;
3130 }
3131
3132 ret = qemu_rdma_init_ram_blocks(rdma);
3133 if (ret) {
733252de 3134 error_report("rdma migration: error initializing ram blocks!");
2da776db
MH
3135 goto err_rdma_dest_wait;
3136 }
3137
1f22364b 3138 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2da776db
MH
3139 ret = qemu_rdma_reg_control(rdma, idx);
3140 if (ret) {
733252de 3141 error_report("rdma: error registering %d control", idx);
2da776db
MH
3142 goto err_rdma_dest_wait;
3143 }
3144 }
3145
55cc1b59
LC
3146 /* Accept the second connection request for return path */
3147 if (migrate_postcopy() && !rdma->is_return_path) {
3148 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3149 NULL,
3150 (void *)(intptr_t)rdma->return_path);
3151 } else {
3152 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
3153 }
2da776db
MH
3154
3155 ret = rdma_accept(rdma->cm_id, &conn_param);
3156 if (ret) {
733252de 3157 error_report("rdma_accept returns %d", ret);
2da776db
MH
3158 goto err_rdma_dest_wait;
3159 }
3160
3161 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3162 if (ret) {
733252de 3163 error_report("rdma_accept get_cm_event failed %d", ret);
2da776db
MH
3164 goto err_rdma_dest_wait;
3165 }
3166
3167 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
733252de 3168 error_report("rdma_accept not event established");
2da776db
MH
3169 rdma_ack_cm_event(cm_event);
3170 goto err_rdma_dest_wait;
3171 }
3172
3173 rdma_ack_cm_event(cm_event);
5a91337c 3174 rdma->connected = true;
2da776db 3175
87772639 3176 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2da776db 3177 if (ret) {
733252de 3178 error_report("rdma migration: error posting second control recv");
2da776db
MH
3179 goto err_rdma_dest_wait;
3180 }
3181
3182 qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3183
3184 return 0;
3185
3186err_rdma_dest_wait:
3187 rdma->error_state = ret;
3188 qemu_rdma_cleanup(rdma);
3189 return ret;
3190}
3191
e4d63320
DDAG
3192static int dest_ram_sort_func(const void *a, const void *b)
3193{
3194 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3195 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3196
3197 return (a_index < b_index) ? -1 : (a_index != b_index);
3198}
3199
2da776db
MH
3200/*
3201 * During each iteration of the migration, we listen for instructions
3202 * by the source VM to perform dynamic page registrations before they
3203 * can perform RDMA operations.
3204 *
3205 * We respond with the 'rkey'.
3206 *
3207 * Keep doing this until the source tells us to stop.
3208 */
632e3a5c 3209static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
2da776db
MH
3210{
3211 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3212 .type = RDMA_CONTROL_REGISTER_RESULT,
3213 .repeat = 0,
3214 };
3215 RDMAControlHeader unreg_resp = { .len = 0,
3216 .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3217 .repeat = 0,
3218 };
3219 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3220 .repeat = 1 };
6ddd2d76
DB
3221 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3222 RDMAContext *rdma = rioc->rdma;
2da776db
MH
3223 RDMALocalBlocks *local = &rdma->local_ram_blocks;
3224 RDMAControlHeader head;
3225 RDMARegister *reg, *registers;
3226 RDMACompress *comp;
3227 RDMARegisterResult *reg_result;
3228 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3229 RDMALocalBlock *block;
3230 void *host_addr;
3231 int ret = 0;
3232 int idx = 0;
3233 int count = 0;
3234 int i = 0;
3235
3236 CHECK_ERROR_STATE();
3237
3238 do {
632e3a5c 3239 trace_qemu_rdma_registration_handle_wait();
2da776db
MH
3240
3241 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3242
3243 if (ret < 0) {
3244 break;
3245 }
3246
3247 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
733252de
DDAG
3248 error_report("rdma: Too many requests in this message (%d)."
3249 "Bailing.", head.repeat);
2da776db
MH
3250 ret = -EIO;
3251 break;
3252 }
3253
3254 switch (head.type) {
3255 case RDMA_CONTROL_COMPRESS:
3256 comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3257 network_to_compress(comp);
3258
733252de
DDAG
3259 trace_qemu_rdma_registration_handle_compress(comp->length,
3260 comp->block_idx,
3261 comp->offset);
afcddefd
DDAG
3262 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3263 error_report("rdma: 'compress' bad block index %u (vs %d)",
3264 (unsigned int)comp->block_idx,
3265 rdma->local_ram_blocks.nb_blocks);
3266 ret = -EIO;
24b41d66 3267 goto out;
afcddefd 3268 }
2da776db
MH
3269 block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3270
3271 host_addr = block->local_host_addr +
3272 (comp->offset - block->offset);
3273
3274 ram_handle_compressed(host_addr, comp->value, comp->length);
3275 break;
3276
3277 case RDMA_CONTROL_REGISTER_FINISHED:
733252de 3278 trace_qemu_rdma_registration_handle_finished();
2da776db
MH
3279 goto out;
3280
3281 case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
733252de 3282 trace_qemu_rdma_registration_handle_ram_blocks();
2da776db 3283
e4d63320
DDAG
3284 /* Sort our local RAM Block list so it's the same as the source,
3285 * we can do this since we've filled in a src_index in the list
3286 * as we received the RAMBlock list earlier.
3287 */
3288 qsort(rdma->local_ram_blocks.block,
3289 rdma->local_ram_blocks.nb_blocks,
3290 sizeof(RDMALocalBlock), dest_ram_sort_func);
71cd7306
LC
3291 for (i = 0; i < local->nb_blocks; i++) {
3292 local->block[i].index = i;
3293 }
3294
2da776db
MH
3295 if (rdma->pin_all) {
3296 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3297 if (ret) {
733252de
DDAG
3298 error_report("rdma migration: error dest "
3299 "registering ram blocks");
2da776db
MH
3300 goto out;
3301 }
3302 }
3303
3304 /*
3305 * Dest uses this to prepare to transmit the RAMBlock descriptions
3306 * to the source VM after connection setup.
3307 * Both sides use the "remote" structure to communicate and update
3308 * their "local" descriptions with what was sent.
3309 */
3310 for (i = 0; i < local->nb_blocks; i++) {
a97270ad 3311 rdma->dest_blocks[i].remote_host_addr =
fbce8c25 3312 (uintptr_t)(local->block[i].local_host_addr);
2da776db
MH
3313
3314 if (rdma->pin_all) {
a97270ad 3315 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
2da776db
MH
3316 }
3317
a97270ad
DDAG
3318 rdma->dest_blocks[i].offset = local->block[i].offset;
3319 rdma->dest_blocks[i].length = local->block[i].length;
2da776db 3320
a97270ad 3321 dest_block_to_network(&rdma->dest_blocks[i]);
e4d63320
DDAG
3322 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3323 local->block[i].block_name,
3324 local->block[i].offset,
3325 local->block[i].length,
3326 local->block[i].local_host_addr,
3327 local->block[i].src_index);
2da776db
MH
3328 }
3329
3330 blocks.len = rdma->local_ram_blocks.nb_blocks
a97270ad 3331 * sizeof(RDMADestBlock);
2da776db
MH
3332
3333
3334 ret = qemu_rdma_post_send_control(rdma,
a97270ad 3335 (uint8_t *) rdma->dest_blocks, &blocks);
2da776db
MH
3336
3337 if (ret < 0) {
733252de 3338 error_report("rdma migration: error sending remote info");
2da776db
MH
3339 goto out;
3340 }
3341
3342 break;
3343 case RDMA_CONTROL_REGISTER_REQUEST:
733252de 3344 trace_qemu_rdma_registration_handle_register(head.repeat);
2da776db
MH
3345
3346 reg_resp.repeat = head.repeat;
3347 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3348
3349 for (count = 0; count < head.repeat; count++) {
3350 uint64_t chunk;
3351 uint8_t *chunk_start, *chunk_end;
3352
3353 reg = &registers[count];
3354 network_to_register(reg);
3355
3356 reg_result = &results[count];
3357
733252de 3358 trace_qemu_rdma_registration_handle_register_loop(count,
2da776db
MH
3359 reg->current_index, reg->key.current_addr, reg->chunks);
3360
afcddefd
DDAG
3361 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3362 error_report("rdma: 'register' bad block index %u (vs %d)",
3363 (unsigned int)reg->current_index,
3364 rdma->local_ram_blocks.nb_blocks);
3365 ret = -ENOENT;
24b41d66 3366 goto out;
afcddefd 3367 }
2da776db
MH
3368 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3369 if (block->is_ram_block) {
afcddefd
DDAG
3370 if (block->offset > reg->key.current_addr) {
3371 error_report("rdma: bad register address for block %s"
3372 " offset: %" PRIx64 " current_addr: %" PRIx64,
3373 block->block_name, block->offset,
3374 reg->key.current_addr);
3375 ret = -ERANGE;
24b41d66 3376 goto out;
afcddefd 3377 }
2da776db
MH
3378 host_addr = (block->local_host_addr +
3379 (reg->key.current_addr - block->offset));
3380 chunk = ram_chunk_index(block->local_host_addr,
3381 (uint8_t *) host_addr);
3382 } else {
3383 chunk = reg->key.chunk;
3384 host_addr = block->local_host_addr +
3385 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
afcddefd
DDAG
3386 /* Check for particularly bad chunk value */
3387 if (host_addr < (void *)block->local_host_addr) {
3388 error_report("rdma: bad chunk for block %s"
3389 " chunk: %" PRIx64,
3390 block->block_name, reg->key.chunk);
3391 ret = -ERANGE;
24b41d66 3392 goto out;
afcddefd 3393 }
2da776db
MH
3394 }
3395 chunk_start = ram_chunk_start(block, chunk);
3396 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3397 if (qemu_rdma_register_and_get_keys(rdma, block,
3ac040c0 3398 (uintptr_t)host_addr, NULL, &reg_result->rkey,
2da776db 3399 chunk, chunk_start, chunk_end)) {
733252de 3400 error_report("cannot get rkey");
2da776db
MH
3401 ret = -EINVAL;
3402 goto out;
3403 }
3404
fbce8c25 3405 reg_result->host_addr = (uintptr_t)block->local_host_addr;
2da776db 3406
733252de
DDAG
3407 trace_qemu_rdma_registration_handle_register_rkey(
3408 reg_result->rkey);
2da776db
MH
3409
3410 result_to_network(reg_result);
3411 }
3412
3413 ret = qemu_rdma_post_send_control(rdma,
3414 (uint8_t *) results, &reg_resp);
3415
3416 if (ret < 0) {
733252de 3417 error_report("Failed to send control buffer");
2da776db
MH
3418 goto out;
3419 }
3420 break;
3421 case RDMA_CONTROL_UNREGISTER_REQUEST:
733252de 3422 trace_qemu_rdma_registration_handle_unregister(head.repeat);
2da776db
MH
3423 unreg_resp.repeat = head.repeat;
3424 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3425
3426 for (count = 0; count < head.repeat; count++) {
3427 reg = &registers[count];
3428 network_to_register(reg);
3429
733252de
DDAG
3430 trace_qemu_rdma_registration_handle_unregister_loop(count,
3431 reg->current_index, reg->key.chunk);
2da776db
MH
3432
3433 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3434
3435 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3436 block->pmr[reg->key.chunk] = NULL;
3437
3438 if (ret != 0) {
3439 perror("rdma unregistration chunk failed");
3440 ret = -ret;
3441 goto out;
3442 }
3443
3444 rdma->total_registrations--;
3445
733252de
DDAG
3446 trace_qemu_rdma_registration_handle_unregister_success(
3447 reg->key.chunk);
2da776db
MH
3448 }
3449
3450 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3451
3452 if (ret < 0) {
733252de 3453 error_report("Failed to send control buffer");
2da776db
MH
3454 goto out;
3455 }
3456 break;
3457 case RDMA_CONTROL_REGISTER_RESULT:
733252de 3458 error_report("Invalid RESULT message at dest.");
2da776db
MH
3459 ret = -EIO;
3460 goto out;
3461 default:
482a33c5 3462 error_report("Unknown control message %s", control_desc(head.type));
2da776db
MH
3463 ret = -EIO;
3464 goto out;
3465 }
3466 } while (1);
3467out:
3468 if (ret < 0) {
3469 rdma->error_state = ret;
3470 }
3471 return ret;
3472}
3473
e4d63320
DDAG
3474/* Destination:
3475 * Called via a ram_control_load_hook during the initial RAM load section which
3476 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks
3477 * on the source.
3478 * We've already built our local RAMBlock list, but not yet sent the list to
3479 * the source.
3480 */
6ddd2d76
DB
3481static int
3482rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
e4d63320 3483{
6ddd2d76 3484 RDMAContext *rdma = rioc->rdma;
e4d63320
DDAG
3485 int curr;
3486 int found = -1;
3487
3488 /* Find the matching RAMBlock in our local list */
3489 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3490 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3491 found = curr;
3492 break;
3493 }
3494 }
3495
3496 if (found == -1) {
3497 error_report("RAMBlock '%s' not found on destination", name);
3498 return -ENOENT;
3499 }
3500
3501 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3502 trace_rdma_block_notification_handle(name, rdma->next_src_index);
3503 rdma->next_src_index++;
3504
3505 return 0;
3506}
3507
632e3a5c
DDAG
3508static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3509{
3510 switch (flags) {
3511 case RAM_CONTROL_BLOCK_REG:
e4d63320 3512 return rdma_block_notification_handle(opaque, data);
632e3a5c
DDAG
3513
3514 case RAM_CONTROL_HOOK:
3515 return qemu_rdma_registration_handle(f, opaque);
3516
3517 default:
3518 /* Shouldn't be called with any other values */
3519 abort();
3520 }
3521}
3522
2da776db 3523static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
632e3a5c 3524 uint64_t flags, void *data)
2da776db 3525{
6ddd2d76
DB
3526 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3527 RDMAContext *rdma = rioc->rdma;
2da776db
MH
3528
3529 CHECK_ERROR_STATE();
3530
ccb7e1b5
LC
3531 if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
3532 return 0;
3533 }
3534
733252de 3535 trace_qemu_rdma_registration_start(flags);
2da776db
MH
3536 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3537 qemu_fflush(f);
3538
3539 return 0;
3540}
3541
3542/*
3543 * Inform dest that dynamic registrations are done for now.
3544 * First, flush writes, if any.
3545 */
3546static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
632e3a5c 3547 uint64_t flags, void *data)
2da776db
MH
3548{
3549 Error *local_err = NULL, **errp = &local_err;
6ddd2d76
DB
3550 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3551 RDMAContext *rdma = rioc->rdma;
2da776db
MH
3552 RDMAControlHeader head = { .len = 0, .repeat = 1 };
3553 int ret = 0;
3554
3555 CHECK_ERROR_STATE();
3556
ccb7e1b5
LC
3557 if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
3558 return 0;
3559 }
3560
2da776db
MH
3561 qemu_fflush(f);
3562 ret = qemu_rdma_drain_cq(f, rdma);
3563
3564 if (ret < 0) {
3565 goto err;
3566 }
3567
3568 if (flags == RAM_CONTROL_SETUP) {
3569 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3570 RDMALocalBlocks *local = &rdma->local_ram_blocks;
e4d63320 3571 int reg_result_idx, i, nb_dest_blocks;
2da776db
MH
3572
3573 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
733252de 3574 trace_qemu_rdma_registration_stop_ram();
2da776db
MH
3575
3576 /*
3577 * Make sure that we parallelize the pinning on both sides.
3578 * For very large guests, doing this serially takes a really
3579 * long time, so we have to 'interleave' the pinning locally
3580 * with the control messages by performing the pinning on this
3581 * side before we receive the control response from the other
3582 * side that the pinning has completed.
3583 */
3584 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3585 &reg_result_idx, rdma->pin_all ?
3586 qemu_rdma_reg_whole_ram_blocks : NULL);
3587 if (ret < 0) {
66988941 3588 ERROR(errp, "receiving remote info!");
2da776db
MH
3589 return ret;
3590 }
3591
a97270ad 3592 nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
2da776db
MH
3593
3594 /*
3595 * The protocol uses two different sets of rkeys (mutually exclusive):
3596 * 1. One key to represent the virtual address of the entire ram block.
3597 * (dynamic chunk registration disabled - pin everything with one rkey.)
3598 * 2. One to represent individual chunks within a ram block.
3599 * (dynamic chunk registration enabled - pin individual chunks.)
3600 *
3601 * Once the capability is successfully negotiated, the destination transmits
3602 * the keys to use (or sends them later) including the virtual addresses
3603 * and then propagates the remote ram block descriptions to his local copy.
3604 */
3605
a97270ad 3606 if (local->nb_blocks != nb_dest_blocks) {
e4d63320 3607 ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) "
2da776db 3608 "Your QEMU command line parameters are probably "
e4d63320
DDAG
3609 "not identical on both the source and destination.",
3610 local->nb_blocks, nb_dest_blocks);
ef4b722d 3611 rdma->error_state = -EINVAL;
2da776db
MH
3612 return -EINVAL;
3613 }
3614
885e8f98 3615 qemu_rdma_move_header(rdma, reg_result_idx, &resp);
a97270ad 3616 memcpy(rdma->dest_blocks,
885e8f98 3617 rdma->wr_data[reg_result_idx].control_curr, resp.len);
a97270ad
DDAG
3618 for (i = 0; i < nb_dest_blocks; i++) {
3619 network_to_dest_block(&rdma->dest_blocks[i]);
2da776db 3620
e4d63320
DDAG
3621 /* We require that the blocks are in the same order */
3622 if (rdma->dest_blocks[i].length != local->block[i].length) {
3623 ERROR(errp, "Block %s/%d has a different length %" PRIu64
3624 "vs %" PRIu64, local->block[i].block_name, i,
3625 local->block[i].length,
3626 rdma->dest_blocks[i].length);
ef4b722d 3627 rdma->error_state = -EINVAL;
2da776db
MH
3628 return -EINVAL;
3629 }
e4d63320
DDAG
3630 local->block[i].remote_host_addr =
3631 rdma->dest_blocks[i].remote_host_addr;
3632 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
2da776db
MH
3633 }
3634 }
3635
733252de 3636 trace_qemu_rdma_registration_stop(flags);
2da776db
MH
3637
3638 head.type = RDMA_CONTROL_REGISTER_FINISHED;
3639 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3640
3641 if (ret < 0) {
3642 goto err;
3643 }
3644
3645 return 0;
3646err:
3647 rdma->error_state = ret;
3648 return ret;
3649}
3650
0436e09f 3651static const QEMUFileHooks rdma_read_hooks = {
632e3a5c 3652 .hook_ram_load = rdma_load_hook,
2da776db
MH
3653};
3654
0436e09f 3655static const QEMUFileHooks rdma_write_hooks = {
2da776db
MH
3656 .before_ram_iterate = qemu_rdma_registration_start,
3657 .after_ram_iterate = qemu_rdma_registration_stop,
3658 .save_page = qemu_rdma_save_page,
3659};
3660
6ddd2d76
DB
3661
3662static void qio_channel_rdma_finalize(Object *obj)
3663{
3664 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3665 if (rioc->rdma) {
3666 qemu_rdma_cleanup(rioc->rdma);
3667 g_free(rioc->rdma);
3668 rioc->rdma = NULL;
3669 }
3670}
3671
3672static void qio_channel_rdma_class_init(ObjectClass *klass,
3673 void *class_data G_GNUC_UNUSED)
3674{
3675 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3676
3677 ioc_klass->io_writev = qio_channel_rdma_writev;
3678 ioc_klass->io_readv = qio_channel_rdma_readv;
3679 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3680 ioc_klass->io_close = qio_channel_rdma_close;
3681 ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3682}
3683
3684static const TypeInfo qio_channel_rdma_info = {
3685 .parent = TYPE_QIO_CHANNEL,
3686 .name = TYPE_QIO_CHANNEL_RDMA,
3687 .instance_size = sizeof(QIOChannelRDMA),
3688 .instance_finalize = qio_channel_rdma_finalize,
3689 .class_init = qio_channel_rdma_class_init,
3690};
3691
3692static void qio_channel_rdma_register_types(void)
3693{
3694 type_register_static(&qio_channel_rdma_info);
3695}
3696
3697type_init(qio_channel_rdma_register_types);
3698
3699static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
2da776db 3700{
6ddd2d76 3701 QIOChannelRDMA *rioc;
2da776db
MH
3702
3703 if (qemu_file_mode_is_not_valid(mode)) {
3704 return NULL;
3705 }
3706
6ddd2d76
DB
3707 rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3708 rioc->rdma = rdma;
2da776db
MH
3709
3710 if (mode[0] == 'w') {
6ddd2d76
DB
3711 rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
3712 qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
2da776db 3713 } else {
6ddd2d76
DB
3714 rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
3715 qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
2da776db
MH
3716 }
3717
6ddd2d76 3718 return rioc->file;
2da776db
MH
3719}
3720
3721static void rdma_accept_incoming_migration(void *opaque)
3722{
3723 RDMAContext *rdma = opaque;
3724 int ret;
3725 QEMUFile *f;
3726 Error *local_err = NULL, **errp = &local_err;
3727
24ec68ef 3728 trace_qemu_rdma_accept_incoming_migration();
2da776db
MH
3729 ret = qemu_rdma_accept(rdma);
3730
3731 if (ret) {
66988941 3732 ERROR(errp, "RDMA Migration initialization failed!");
2da776db
MH
3733 return;
3734 }
3735
24ec68ef 3736 trace_qemu_rdma_accept_incoming_migration_accepted();
2da776db 3737
55cc1b59
LC
3738 if (rdma->is_return_path) {
3739 return;
3740 }
3741
2da776db
MH
3742 f = qemu_fopen_rdma(rdma, "rb");
3743 if (f == NULL) {
66988941 3744 ERROR(errp, "could not qemu_fopen_rdma!");
2da776db
MH
3745 qemu_rdma_cleanup(rdma);
3746 return;
3747 }
3748
3749 rdma->migration_started_on_destination = 1;
22724f49 3750 migration_fd_process_incoming(f);
2da776db
MH
3751}
3752
3753void rdma_start_incoming_migration(const char *host_port, Error **errp)
3754{
3755 int ret;
55cc1b59 3756 RDMAContext *rdma, *rdma_return_path;
2da776db
MH
3757 Error *local_err = NULL;
3758
733252de 3759 trace_rdma_start_incoming_migration();
2da776db
MH
3760 rdma = qemu_rdma_data_init(host_port, &local_err);
3761
3762 if (rdma == NULL) {
3763 goto err;
3764 }
3765
3766 ret = qemu_rdma_dest_init(rdma, &local_err);
3767
3768 if (ret) {
3769 goto err;
3770 }
3771
733252de 3772 trace_rdma_start_incoming_migration_after_dest_init();
2da776db
MH
3773
3774 ret = rdma_listen(rdma->listen_id, 5);
3775
3776 if (ret) {
66988941 3777 ERROR(errp, "listening on socket!");
2da776db
MH
3778 goto err;
3779 }
3780
733252de 3781 trace_rdma_start_incoming_migration_after_rdma_listen();
2da776db 3782
55cc1b59
LC
3783 /* initialize the RDMAContext for return path */
3784 if (migrate_postcopy()) {
3785 rdma_return_path = qemu_rdma_data_init(host_port, &local_err);
3786
3787 if (rdma_return_path == NULL) {
3788 goto err;
3789 }
3790
3791 qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
3792 }
3793
82e1cc4b
FZ
3794 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3795 NULL, (void *)(intptr_t)rdma);
2da776db
MH
3796 return;
3797err:
3798 error_propagate(errp, local_err);
3799 g_free(rdma);
55cc1b59 3800 g_free(rdma_return_path);
2da776db
MH
3801}
3802
3803void rdma_start_outgoing_migration(void *opaque,
3804 const char *host_port, Error **errp)
3805{
3806 MigrationState *s = opaque;
d59ce6f3 3807 RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
55cc1b59 3808 RDMAContext *rdma_return_path = NULL;
2da776db
MH
3809 int ret = 0;
3810
3811 if (rdma == NULL) {
2da776db
MH
3812 goto err;
3813 }
3814
bbfb89e3
FZ
3815 ret = qemu_rdma_source_init(rdma,
3816 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
2da776db
MH
3817
3818 if (ret) {
3819 goto err;
3820 }
3821
733252de 3822 trace_rdma_start_outgoing_migration_after_rdma_source_init();
d59ce6f3 3823 ret = qemu_rdma_connect(rdma, errp);
2da776db
MH
3824
3825 if (ret) {
3826 goto err;
3827 }
3828
55cc1b59
LC
3829 /* RDMA postcopy need a seprate queue pair for return path */
3830 if (migrate_postcopy()) {
3831 rdma_return_path = qemu_rdma_data_init(host_port, errp);
3832
3833 if (rdma_return_path == NULL) {
3834 goto err;
3835 }
3836
3837 ret = qemu_rdma_source_init(rdma_return_path,
3838 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
3839
3840 if (ret) {
3841 goto err;
3842 }
3843
3844 ret = qemu_rdma_connect(rdma_return_path, errp);
3845
3846 if (ret) {
3847 goto err;
3848 }
3849
3850 rdma->return_path = rdma_return_path;
3851 rdma_return_path->return_path = rdma;
3852 rdma_return_path->is_return_path = true;
3853 }
3854
733252de 3855 trace_rdma_start_outgoing_migration_after_rdma_connect();
2da776db 3856
89a02a9f 3857 s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
cce8040b 3858 migrate_fd_connect(s, NULL);
2da776db
MH
3859 return;
3860err:
2da776db 3861 g_free(rdma);
55cc1b59 3862 g_free(rdma_return_path);
2da776db 3863}