/*
 * RDMA protocol and interfaces
 *
 * Copyright IBM, Corp. 2010-2013
 * Copyright Red Hat, Inc. 2015-2016
 *
 * Authors:
 *  Michael R. Hines <mrhines@us.ibm.com>
 *  Jiuxing Liu <jl@us.ibm.com>
 *  Daniel P. Berrange <berrange@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later. See the COPYING file in the top-level directory.
 *
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "rdma.h"
#include "migration.h"
#include "qemu-file.h"
#include "ram.h"
#include "qemu-file-channel.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/module.h"
#include "qemu/rcu.h"
#include "qemu/sockets.h"
#include "qemu/bitmap.h"
#include "qemu/coroutine.h"
#include "exec/memory.h"
#include <sys/socket.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <rdma/rdma_cma.h>
#include "trace.h"
#include "qom/object.h"
#include <poll.h>

/*
 * Print an error on both the Monitor and the Log file.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if (errp && (*(errp) == NULL)) { \
            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)
51
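/*
 * Illustrative note (added for clarity, not part of the original file):
 * ERROR() both prints to stderr and fills in *errp exactly once when a
 * non-NULL Error pointer is supplied, so call sites below can simply do
 *
 *     ERROR(errp, "could not create CM channel");
 *
 * and then return an error code.
 */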
#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */

/*
 * This is only for non-live state being migrated.
 * Instead of RDMA_WRITE messages, we use RDMA_SEND
 * messages for that state, which requires a different
 * delivery design than main memory.
 */
#define RDMA_SEND_INCREMENT 32768

/*
 * Maximum size infiniband SEND message
 */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

#define RDMA_CONTROL_VERSION_CURRENT 1
/*
 * Capabilities for negotiation.
 */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/*
 * Add the other flags above to this list of known capabilities
 * as they are introduced.
 */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
85
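/*
 * Hedged sketch (assumption, for illustration only): during connection
 * setup the peers exchange an RDMACapabilities blob, and a receiver is
 * expected to reject any flag bits it does not know about, roughly:
 *
 *     if (cap.flags & ~known_capabilities) {
 *         error_report("Unknown capabilities requested: 0x%x", cap.flags);
 *     }
 *
 * The actual negotiation code lives further down in this file.
 */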
#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                error_report("RDMA is in an error state waiting migration" \
                             " to abort!"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0)
97
/*
 * A work request ID is 64 bits wide; we split those bits into three parts:
 *
 *   bits 0-15 : type of control message, 2^16
 *   bits 16-29: ram block index, 2^14
 *   bits 30-63: ram block chunk number, 2^34
 *
 * The last two bit ranges are only used for RDMA writes,
 * in order to track their completion and potentially
 * also track unregistration status of the message.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

#define RDMA_WRID_TYPE_MASK \
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)

#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))

#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
121
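/*
 * Worked example (illustrative values only): a completion for an RDMA
 * write of chunk 42 in ram block 3 carries the work request ID
 *
 *     wr_id = RDMA_WRID_RDMA_WRITE
 *             | (3UL  << RDMA_WRID_BLOCK_SHIFT)
 *             | (42UL << RDMA_WRID_CHUNK_SHIFT);
 *
 * which the completion path decodes with the masks above, e.g.
 * (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT == 42.
 * qemu_rdma_make_wrid() further down performs exactly this composition.
 */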
/*
 * RDMA migration protocol:
 * 1. RDMA Writes (data messages, i.e. RAM)
 * 2. IB Send/Recv (control channel messages)
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,
    RDMA_WRID_SEND_CONTROL = 2000,
    RDMA_WRID_RECV_CONTROL = 4000,
};

static const char *wrid_desc[] = {
    [RDMA_WRID_NONE] = "NONE",
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};

/*
 * Work request IDs for IB SEND messages only (not RDMA writes).
 * This is used by the migration protocol to transmit
 * control messages (such as device state and registration commands)
 *
 * We could use more WRs, but we have enough for now.
 */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,
};
154
/*
 * SEND/RECV IB Control Messages.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};
172
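/*
 * Informal sketch of the control-channel exchange, inferred from the
 * message types above and the exchange helpers later in this file:
 * the destination posts a READY message whenever it can accept a new
 * command; the source then SENDs a command such as REGISTER_REQUEST and,
 * for commands that expect a reply, blocks until the matching result
 * arrives (e.g. REGISTER_RESULT carrying the rkey needed for the
 * subsequent RDMA write).
 */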
173
174/*
175 * Memory and MR structures used to represent an IB Send/Recv work request.
176 * This is *not* used for RDMA writes, only IB Send/Recv.
177 */
178typedef struct {
179 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
180 struct ibv_mr *control_mr; /* registration metadata */
181 size_t control_len; /* length of the message */
182 uint8_t *control_curr; /* start of unconsumed bytes */
183} RDMAWorkRequestData;
184
185/*
186 * Negotiate RDMA capabilities during connection-setup time.
187 */
188typedef struct {
189 uint32_t version;
190 uint32_t flags;
191} RDMACapabilities;
192
193static void caps_to_network(RDMACapabilities *cap)
194{
195 cap->version = htonl(cap->version);
196 cap->flags = htonl(cap->flags);
197}
198
199static void network_to_caps(RDMACapabilities *cap)
200{
201 cap->version = ntohl(cap->version);
202 cap->flags = ntohl(cap->flags);
203}
204
205/*
206 * Representation of a RAMBlock from an RDMA perspective.
207 * This is not transmitted, only local.
208 * This and subsequent structures cannot be linked lists
209 * because we're using a single IB message to transmit
210 * the information. It's small anyway, so a list is overkill.
211 */
typedef struct RDMALocalBlock {
    char *block_name;
    uint8_t *local_host_addr;    /* local virtual address */
    uint64_t remote_host_addr;   /* remote virtual address */
    uint64_t offset;
    uint64_t length;
    struct ibv_mr **pmr;         /* MRs for chunk-level registration */
    struct ibv_mr *mr;           /* MR for non-chunk-level registration */
    uint32_t *remote_keys;       /* rkeys for chunk-level registration */
    uint32_t remote_rkey;        /* rkeys for non-chunk-level registration */
    int index;                   /* which block are we */
    unsigned int src_index;      /* (Only used on dest) */
    bool is_ram_block;
    int nb_chunks;
    unsigned long *transit_bitmap;
    unsigned long *unregister_bitmap;
} RDMALocalBlock;
229
/*
 * Also represents a RAMBlock, but only on the dest.
 * This gets transmitted by the dest during connection-time
 * to the source VM and then is used to populate the
 * corresponding RDMALocalBlock with
 * the information needed to perform the actual RDMA.
 */
typedef struct QEMU_PACKED RDMADestBlock {
    uint64_t remote_host_addr;
    uint64_t offset;
    uint64_t length;
    uint32_t remote_rkey;
    uint32_t padding;
} RDMADestBlock;
2da776db 244
245static const char *control_desc(unsigned int rdma_control)
246{
247 static const char *strs[] = {
248 [RDMA_CONTROL_NONE] = "NONE",
249 [RDMA_CONTROL_ERROR] = "ERROR",
250 [RDMA_CONTROL_READY] = "READY",
251 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
252 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
253 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
254 [RDMA_CONTROL_COMPRESS] = "COMPRESS",
255 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
256 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
257 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
258 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
259 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
260 };
261
262 if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
263 return "??BAD CONTROL VALUE??";
264 }
265
266 return strs[rdma_control];
267}
268
269static uint64_t htonll(uint64_t v)
270{
271 union { uint32_t lv[2]; uint64_t llv; } u;
272 u.lv[0] = htonl(v >> 32);
273 u.lv[1] = htonl(v & 0xFFFFFFFFULL);
274 return u.llv;
275}
276
277static uint64_t ntohll(uint64_t v)
278{
279 union { uint32_t lv[2]; uint64_t llv; } u;
280 u.llv = v;
281 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
282}
283
a97270ad 284static void dest_block_to_network(RDMADestBlock *db)
2da776db 285{
286 db->remote_host_addr = htonll(db->remote_host_addr);
287 db->offset = htonll(db->offset);
288 db->length = htonll(db->length);
289 db->remote_rkey = htonl(db->remote_rkey);
290}
291
a97270ad 292static void network_to_dest_block(RDMADestBlock *db)
2da776db 293{
294 db->remote_host_addr = ntohll(db->remote_host_addr);
295 db->offset = ntohll(db->offset);
296 db->length = ntohll(db->length);
297 db->remote_rkey = ntohl(db->remote_rkey);
298}
299
300/*
301 * Virtual address of the above structures used for transmitting
302 * the RAMBlock descriptions at connection-time.
303 * This structure is *not* transmitted.
304 */
305typedef struct RDMALocalBlocks {
306 int nb_blocks;
307 bool init; /* main memory init complete */
308 RDMALocalBlock *block;
309} RDMALocalBlocks;
310
/*
 * Main data structure for RDMA state.
 * While there is only one copy of this structure allocated right now,
 * this is the place to start if you ever want to support
 * more than one RDMA connection open at the same time.
 */
typedef struct RDMAContext {
    char *host;
    int port;
    char *host_port;

    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];

    /*
     * This is used by *_exchange_send() to figure out whether or not
     * the initial "READY" message has already been received.
     * This is because other functions may potentially poll() and detect
     * the READY message before send() does, in which case we need to
     * know if it completed.
     */
    int control_ready_expected;

    /* number of outstanding writes */
    int nb_sent;

    /* store info about current buffer so that we can
       merge it with future sends */
    uint64_t current_addr;
    uint64_t current_length;
    /* index of ram block the current buffer belongs to */
    int current_index;
    /* index of the chunk in the current ram block */
    int current_chunk;

    bool pin_all;

    /*
     * infiniband-specific variables for opening the device
     * and maintaining connection state and so forth.
     *
     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
     * cm_id->verbs, cm_id->channel, and cm_id->qp.
     */
    struct rdma_cm_id *cm_id;               /* connection manager ID */
    struct rdma_cm_id *listen_id;
    bool connected;

    struct ibv_context *verbs;
    struct rdma_event_channel *channel;
    struct ibv_qp *qp;                      /* queue pair */
    struct ibv_comp_channel *recv_comp_channel; /* recv completion channel */
    struct ibv_comp_channel *send_comp_channel; /* send completion channel */
    struct ibv_pd *pd;                      /* protection domain */
    struct ibv_cq *recv_cq;                 /* receive completion queue */
    struct ibv_cq *send_cq;                 /* send completion queue */

    /*
     * If a previous write failed (perhaps because of a failed
     * memory registration), then do not attempt any future work
     * and remember the error state.
     */
    int error_state;
    int error_reported;
    int received_error;

    /*
     * Description of ram blocks used throughout the code.
     */
    RDMALocalBlocks local_ram_blocks;
    RDMADestBlock *dest_blocks;

    /* Index of the next RAMBlock received during block registration */
    unsigned int next_src_index;

    /*
     * Migration on the *destination* has started;
     * then we use the coroutine yield function.
     * The source runs in a thread, so we don't care.
     */
    int migration_started_on_destination;

    int total_registrations;
    int total_writes;

    int unregister_current, unregister_next;
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];

    GHashTable *blockmap;

    /* the RDMAContext for return path */
    struct RDMAContext *return_path;
    bool is_return_path;
} RDMAContext;
404
#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)


struct QIOChannelRDMA {
    QIOChannel parent;
    RDMAContext *rdmain;
    RDMAContext *rdmaout;
    QEMUFile *file;
    bool blocking; /* XXX we don't actually honour this yet */
};
417
418/*
419 * Main structure for IB Send/Recv control messages.
420 * This gets prepended at the beginning of every Send/Recv.
421 */
422typedef struct QEMU_PACKED {
423 uint32_t len; /* Total length of data portion */
424 uint32_t type; /* which control command to perform */
425 uint32_t repeat; /* number of commands in data portion of same type */
426 uint32_t padding;
427} RDMAControlHeader;
428
429static void control_to_network(RDMAControlHeader *control)
430{
431 control->type = htonl(control->type);
432 control->len = htonl(control->len);
433 control->repeat = htonl(control->repeat);
434}
435
436static void network_to_control(RDMAControlHeader *control)
437{
438 control->type = ntohl(control->type);
439 control->len = ntohl(control->len);
440 control->repeat = ntohl(control->repeat);
441}
442
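/*
 * Illustrative example (hypothetical values): a 1200-byte chunk of
 * QEMUFile data travels as a control message whose header is
 *
 *     { .len = 1200, .type = RDMA_CONTROL_QEMU_FILE, .repeat = 1 }
 *
 * converted to network byte order by control_to_network() before the
 * SEND is posted and converted back by network_to_control() on receipt.
 */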
/*
 * Register a single Chunk.
 * Information sent by the source VM to inform the dest
 * to register a single chunk of memory before we can perform
 * the actual RDMA operation.
 */
typedef struct QEMU_PACKED {
    union QEMU_PACKED {
        uint64_t current_addr;  /* offset into the ram_addr_t space */
        uint64_t chunk;         /* chunk to lookup if unregistering */
    } key;
    uint32_t current_index;     /* which ramblock the chunk belongs to */
    uint32_t padding;
    uint64_t chunks;            /* how many sequential chunks to register */
} RDMARegister;
458
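/*
 * Illustrative example (hypothetical values): to have the destination
 * register one chunk of ram block 3, the source fills in
 *
 *     RDMARegister reg = { .key.current_addr = <ram_addr of the chunk>,
 *                          .current_index = 3, .chunks = 1 };
 *
 * and ships it as the data portion of a REGISTER_REQUEST control message;
 * register_to_network() below rebases current_addr into the destination's
 * ram_addr_t space and byte-swaps the fields before transmission.
 */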
b12f7777 459static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
2da776db 460{
461 RDMALocalBlock *local_block;
462 local_block = &rdma->local_ram_blocks.block[reg->current_index];
463
464 if (local_block->is_ram_block) {
465 /*
466 * current_addr as passed in is an address in the local ram_addr_t
467 * space, we need to translate this for the destination
468 */
469 reg->key.current_addr -= local_block->offset;
470 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
471 }
472 reg->key.current_addr = htonll(reg->key.current_addr);
473 reg->current_index = htonl(reg->current_index);
474 reg->chunks = htonll(reg->chunks);
475}
476
477static void network_to_register(RDMARegister *reg)
478{
479 reg->key.current_addr = ntohll(reg->key.current_addr);
480 reg->current_index = ntohl(reg->current_index);
481 reg->chunks = ntohll(reg->chunks);
482}
483
484typedef struct QEMU_PACKED {
485 uint32_t value; /* if zero, we will madvise() */
486 uint32_t block_idx; /* which ram block index */
b12f7777 487 uint64_t offset; /* Address in remote ram_addr_t space */
488 uint64_t length; /* length of the chunk */
489} RDMACompress;
490
b12f7777 491static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
492{
493 comp->value = htonl(comp->value);
494 /*
495 * comp->offset as passed in is an address in the local ram_addr_t
496 * space, we need to translate this for the destination
497 */
498 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
499 comp->offset += rdma->dest_blocks[comp->block_idx].offset;
500 comp->block_idx = htonl(comp->block_idx);
501 comp->offset = htonll(comp->offset);
502 comp->length = htonll(comp->length);
503}
504
505static void network_to_compress(RDMACompress *comp)
506{
507 comp->value = ntohl(comp->value);
508 comp->block_idx = ntohl(comp->block_idx);
509 comp->offset = ntohll(comp->offset);
510 comp->length = ntohll(comp->length);
511}
512
513/*
514 * The result of the dest's memory registration produces an "rkey"
515 * which the source VM must reference in order to perform
516 * the RDMA operation.
517 */
518typedef struct QEMU_PACKED {
519 uint32_t rkey;
520 uint32_t padding;
521 uint64_t host_addr;
522} RDMARegisterResult;
523
524static void result_to_network(RDMARegisterResult *result)
525{
526 result->rkey = htonl(result->rkey);
527 result->host_addr = htonll(result->host_addr);
528};
529
530static void network_to_result(RDMARegisterResult *result)
531{
532 result->rkey = ntohl(result->rkey);
533 result->host_addr = ntohll(result->host_addr);
534};
535
536const char *print_wrid(int wrid);
537static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
538 uint8_t *data, RDMAControlHeader *resp,
539 int *resp_idx,
540 int (*callback)(RDMAContext *rdma));
541
542static inline uint64_t ram_chunk_index(const uint8_t *start,
543 const uint8_t *host)
544{
545 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
546}
547
dd286ed7 548static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
549 uint64_t i)
550{
551 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
552 (i << RDMA_REG_CHUNK_SHIFT));
553}
554
555static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
556 uint64_t i)
557{
558 uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
559 (1UL << RDMA_REG_CHUNK_SHIFT);
560
561 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
562 result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
563 }
564
565 return result;
566}
567
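/*
 * Worked example (illustrative only): with RDMA_REG_CHUNK_SHIFT == 20 a
 * chunk is 1 MiB, so for a host pointer 5 MiB + 4 KiB past the start of a
 * block, ram_chunk_index() returns 5, ram_chunk_start() returns
 * local_host_addr + 5 MiB, and ram_chunk_end() returns local_host_addr +
 * 6 MiB (the exclusive end, clamped to the end of the block).
 */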
568static int rdma_add_block(RDMAContext *rdma, const char *block_name,
569 void *host_addr,
570 ram_addr_t block_offset, uint64_t length)
571{
572 RDMALocalBlocks *local = &rdma->local_ram_blocks;
760ff4be 573 RDMALocalBlock *block;
574 RDMALocalBlock *old = local->block;
575
97f3ad35 576 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
577
578 if (local->nb_blocks) {
579 int x;
580
581 if (rdma->blockmap) {
582 for (x = 0; x < local->nb_blocks; x++) {
583 g_hash_table_remove(rdma->blockmap,
584 (void *)(uintptr_t)old[x].offset);
585 g_hash_table_insert(rdma->blockmap,
586 (void *)(uintptr_t)old[x].offset,
587 &local->block[x]);
588 }
589 }
590 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
591 g_free(old);
592 }
593
594 block = &local->block[local->nb_blocks];
595
4fb5364b 596 block->block_name = g_strdup(block_name);
597 block->local_host_addr = host_addr;
598 block->offset = block_offset;
599 block->length = length;
600 block->index = local->nb_blocks;
e4d63320 601 block->src_index = ~0U; /* Filled in by the receipt of the block list */
602 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
603 block->transit_bitmap = bitmap_new(block->nb_chunks);
604 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
605 block->unregister_bitmap = bitmap_new(block->nb_chunks);
606 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
97f3ad35 607 block->remote_keys = g_new0(uint32_t, block->nb_chunks);
608
609 block->is_ram_block = local->init ? false : true;
610
760ff4be 611 if (rdma->blockmap) {
80e60c6e 612 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
760ff4be 613 }
2da776db 614
615 trace_rdma_add_block(block_name, local->nb_blocks,
616 (uintptr_t) block->local_host_addr,
ba795761 617 block->offset, block->length,
fbce8c25 618 (uintptr_t) (block->local_host_addr + block->length),
619 BITS_TO_LONGS(block->nb_chunks) *
620 sizeof(unsigned long) * 8,
621 block->nb_chunks);
622
623 local->nb_blocks++;
624
625 return 0;
626}
627
/*
 * Memory regions need to be registered with the device and queue pairs set
 * up in advance before the migration starts. This tells us where the RAM
 * blocks are so that we can register them individually.
 */
754cb9c0 633static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
2da776db 634{
635 const char *block_name = qemu_ram_get_idstr(rb);
636 void *host_addr = qemu_ram_get_host_addr(rb);
637 ram_addr_t block_offset = qemu_ram_get_offset(rb);
638 ram_addr_t length = qemu_ram_get_used_length(rb);
4fb5364b 639 return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
640}
641
/*
 * Identify the RAMBlocks and their quantity. They will be used as references
 * to identify chunk boundaries inside each RAMBlock and also be referenced
 * during dynamic page registration.
 */
647static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
648{
649 RDMALocalBlocks *local = &rdma->local_ram_blocks;
281496bb 650 int ret;
651
652 assert(rdma->blockmap == NULL);
2da776db 653 memset(local, 0, sizeof *local);
654 ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
655 if (ret) {
656 return ret;
657 }
733252de 658 trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
659 rdma->dest_blocks = g_new0(RDMADestBlock,
660 rdma->local_ram_blocks.nb_blocks);
661 local->init = true;
662 return 0;
663}
664
665/*
666 * Note: If used outside of cleanup, the caller must ensure that the destination
667 * block structures are also updated
668 */
669static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
670{
671 RDMALocalBlocks *local = &rdma->local_ram_blocks;
672 RDMALocalBlock *old = local->block;
673 int x;
674
675 if (rdma->blockmap) {
676 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
677 }
678 if (block->pmr) {
679 int j;
680
681 for (j = 0; j < block->nb_chunks; j++) {
682 if (!block->pmr[j]) {
683 continue;
684 }
685 ibv_dereg_mr(block->pmr[j]);
686 rdma->total_registrations--;
687 }
688 g_free(block->pmr);
689 block->pmr = NULL;
690 }
691
692 if (block->mr) {
693 ibv_dereg_mr(block->mr);
694 rdma->total_registrations--;
695 block->mr = NULL;
696 }
697
698 g_free(block->transit_bitmap);
699 block->transit_bitmap = NULL;
700
701 g_free(block->unregister_bitmap);
702 block->unregister_bitmap = NULL;
703
704 g_free(block->remote_keys);
705 block->remote_keys = NULL;
706
707 g_free(block->block_name);
708 block->block_name = NULL;
709
710 if (rdma->blockmap) {
711 for (x = 0; x < local->nb_blocks; x++) {
712 g_hash_table_remove(rdma->blockmap,
713 (void *)(uintptr_t)old[x].offset);
714 }
715 }
716
717 if (local->nb_blocks > 1) {
718
97f3ad35 719 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
720
721 if (block->index) {
722 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
723 }
724
725 if (block->index < (local->nb_blocks - 1)) {
726 memcpy(local->block + block->index, old + (block->index + 1),
727 sizeof(RDMALocalBlock) *
728 (local->nb_blocks - (block->index + 1)));
729 for (x = block->index; x < local->nb_blocks - 1; x++) {
730 local->block[x].index--;
731 }
2da776db
MH
732 }
733 } else {
734 assert(block == local->block);
735 local->block = NULL;
736 }
737
03fcab38 738 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
733252de 739 block->offset, block->length,
fbce8c25 740 (uintptr_t)(block->local_host_addr + block->length),
741 BITS_TO_LONGS(block->nb_chunks) *
742 sizeof(unsigned long) * 8, block->nb_chunks);
743
744 g_free(old);
745
746 local->nb_blocks--;
747
03fcab38 748 if (local->nb_blocks && rdma->blockmap) {
2da776db 749 for (x = 0; x < local->nb_blocks; x++) {
750 g_hash_table_insert(rdma->blockmap,
751 (void *)(uintptr_t)local->block[x].offset,
752 &local->block[x]);
753 }
754 }
755
756 return 0;
757}
758
759/*
760 * Put in the log file which RDMA device was opened and the details
761 * associated with that device.
762 */
763static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
764{
765 struct ibv_port_attr port;
766
767 if (ibv_query_port(verbs, 1, &port)) {
733252de 768 error_report("Failed to query port information");
769 return;
770 }
771
772 printf("%s RDMA Device opened: kernel name %s "
773 "uverbs device name %s, "
774 "infiniband_verbs class device path %s, "
775 "infiniband class device path %s, "
776 "transport: (%d) %s\n",
777 who,
778 verbs->device->name,
779 verbs->device->dev_name,
780 verbs->device->dev_path,
781 verbs->device->ibdev_path,
782 port.link_layer,
783 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
02942db7 784 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
7fc5b13f 785 ? "Ethernet" : "Unknown"));
786}
787
788/*
789 * Put in the log file the RDMA gid addressing information,
790 * useful for folks who have trouble understanding the
791 * RDMA device hierarchy in the kernel.
792 */
793static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
794{
795 char sgid[33];
796 char dgid[33];
797 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
798 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
733252de 799 trace_qemu_rdma_dump_gid(who, sgid, dgid);
800}
801
/*
 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
 * We will try the next addrinfo struct, and fail if there are
 * no other valid addresses to bind against.
 *
 * If the user is listening on '[::]', then we will not have opened a device
 * yet and have no way of verifying if the device is RoCE or not.
 *
 * In this case, the source VM will throw an error for ALL types of
 * connections (both IPv4 and IPv6) if the destination machine does not have
 * a regular infiniband network available for use.
 *
 * The only way to guarantee that an error is thrown for broken kernels is
 * for the management software to choose a *specific* interface at bind time
 * and validate what type of hardware it is.
 *
 * Unfortunately, this puts the user in a fix:
 *
 * If the source VM connects with an IPv4 address without knowing that the
 * destination has bound to '[::]' the migration will unconditionally fail
 * unless the management software is explicitly listening on the IPv4
 * address while using a RoCE-based device.
 *
 * If the source VM connects with an IPv6 address, then we're OK because we can
 * throw an error on the source (and similarly on the destination).
 *
 * But in mixed environments, this will be broken for a while until it is fixed
 * inside linux.
 *
 * We do provide a *tiny* bit of help in this function: We can list all of the
 * devices in the system and check to see if all the devices are RoCE or
 * Infiniband.
 *
 * If we detect that we have a *pure* RoCE environment, then we can safely
 * throw an error even if the management software has specified '[::]' as the
 * bind address.
 *
 * However, if there are multiple heterogeneous devices, then we cannot make
 * this assumption and the user just has to be sure they know what they are
 * doing.
 *
 * Patches are being reviewed on linux-rdma.
 */
bbfb89e3 845static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
7fc5b13f 846{
847 /* This bug only exists in linux, to our knowledge. */
848#ifdef CONFIG_LINUX
1f4abd81 849 struct ibv_port_attr port_attr;
7fc5b13f 850
02942db7 851 /*
7fc5b13f 852 * Verbs are only NULL if management has bound to '[::]'.
02942db7 853 *
7fc5b13f
MH
854 * Let's iterate through all the devices and see if there any pure IB
855 * devices (non-ethernet).
02942db7 856 *
7fc5b13f 857 * If not, then we can safely proceed with the migration.
4c293dc6 858 * Otherwise, there are no guarantees until the bug is fixed in linux.
7fc5b13f
MH
859 */
860 if (!verbs) {
02942db7 861 int num_devices, x;
0bcae623 862 struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
7fc5b13f
MH
863 bool roce_found = false;
864 bool ib_found = false;
865
866 for (x = 0; x < num_devices; x++) {
867 verbs = ibv_open_device(dev_list[x]);
868 if (!verbs) {
869 if (errno == EPERM) {
870 continue;
871 } else {
872 return -EINVAL;
873 }
874 }
7fc5b13f
MH
875
876 if (ibv_query_port(verbs, 1, &port_attr)) {
877 ibv_close_device(verbs);
878 ERROR(errp, "Could not query initial IB port");
879 return -EINVAL;
880 }
881
882 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
883 ib_found = true;
884 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
885 roce_found = true;
886 }
887
888 ibv_close_device(verbs);
889
890 }
891
892 if (roce_found) {
893 if (ib_found) {
894 fprintf(stderr, "WARN: migrations may fail:"
895 " IPv6 over RoCE / iWARP in linux"
896 " is broken. But since you appear to have a"
897 " mixed RoCE / IB environment, be sure to only"
898 " migrate over the IB fabric until the kernel "
899 " fixes the bug.\n");
900 } else {
901 ERROR(errp, "You only have RoCE / iWARP devices in your systems"
902 " and your management software has specified '[::]'"
903 ", but IPv6 over RoCE / iWARP is not supported in Linux.");
904 return -ENONET;
905 }
906 }
907
908 return 0;
909 }
910
    /*
     * If we have a verbs context, that means that something other than '[::]'
     * was used by the management software for binding, in which case we can
     * actually warn the user about a potentially broken kernel.
     */
916
917 /* IB ports start with 1, not 0 */
918 if (ibv_query_port(verbs, 1, &port_attr)) {
919 ERROR(errp, "Could not query initial IB port");
920 return -EINVAL;
921 }
922
923 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
924 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
925 "(but patches on linux-rdma in progress)");
926 return -ENONET;
927 }
928
929#endif
930
931 return 0;
932}
933
934/*
935 * Figure out which RDMA device corresponds to the requested IP hostname
936 * Also create the initial connection manager identifiers for opening
937 * the connection.
938 */
939static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
940{
941 int ret;
7fc5b13f 942 struct rdma_addrinfo *res;
943 char port_str[16];
944 struct rdma_cm_event *cm_event;
945 char ip[40] = "unknown";
7fc5b13f 946 struct rdma_addrinfo *e;
947
948 if (rdma->host == NULL || !strcmp(rdma->host, "")) {
66988941 949 ERROR(errp, "RDMA hostname has not been set");
7fc5b13f 950 return -EINVAL;
951 }
952
953 /* create CM channel */
954 rdma->channel = rdma_create_event_channel();
955 if (!rdma->channel) {
66988941 956 ERROR(errp, "could not create CM channel");
7fc5b13f 957 return -EINVAL;
958 }
959
960 /* create CM id */
961 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
962 if (ret) {
66988941 963 ERROR(errp, "could not create channel id");
964 goto err_resolve_create_id;
965 }
966
967 snprintf(port_str, 16, "%d", rdma->port);
968 port_str[15] = '\0';
969
7fc5b13f 970 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2da776db 971 if (ret < 0) {
7fc5b13f 972 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
973 goto err_resolve_get_addr;
974 }
975
976 for (e = res; e != NULL; e = e->ai_next) {
977 inet_ntop(e->ai_family,
7fc5b13f 978 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
733252de 979 trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
2da776db 980
7fc5b13f 981 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
982 RDMA_RESOLVE_TIMEOUT_MS);
983 if (!ret) {
c89aa2f1 984 if (e->ai_family == AF_INET6) {
bbfb89e3 985 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
986 if (ret) {
987 continue;
988 }
7fc5b13f 989 }
990 goto route;
991 }
992 }
993
f53b450a 994 rdma_freeaddrinfo(res);
995 ERROR(errp, "could not resolve address %s", rdma->host);
996 goto err_resolve_get_addr;
997
998route:
f53b450a 999 rdma_freeaddrinfo(res);
1000 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
1001
1002 ret = rdma_get_cm_event(rdma->channel, &cm_event);
1003 if (ret) {
66988941 1004 ERROR(errp, "could not perform event_addr_resolved");
1005 goto err_resolve_get_addr;
1006 }
1007
1008 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
66988941 1009 ERROR(errp, "result not equal to event_addr_resolved %s",
2da776db 1010 rdma_event_str(cm_event->event));
e5f60791 1011 error_report("rdma_resolve_addr");
2a934347 1012 rdma_ack_cm_event(cm_event);
7fc5b13f 1013 ret = -EINVAL;
1014 goto err_resolve_get_addr;
1015 }
1016 rdma_ack_cm_event(cm_event);
1017
1018 /* resolve route */
1019 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1020 if (ret) {
66988941 1021 ERROR(errp, "could not resolve rdma route");
1022 goto err_resolve_get_addr;
1023 }
1024
1025 ret = rdma_get_cm_event(rdma->channel, &cm_event);
1026 if (ret) {
66988941 1027 ERROR(errp, "could not perform event_route_resolved");
1028 goto err_resolve_get_addr;
1029 }
1030 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
66988941 1031 ERROR(errp, "result not equal to event_route_resolved: %s",
1032 rdma_event_str(cm_event->event));
1033 rdma_ack_cm_event(cm_event);
7fc5b13f 1034 ret = -EINVAL;
1035 goto err_resolve_get_addr;
1036 }
1037 rdma_ack_cm_event(cm_event);
1038 rdma->verbs = rdma->cm_id->verbs;
1039 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1040 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1041 return 0;
1042
1043err_resolve_get_addr:
1044 rdma_destroy_id(rdma->cm_id);
1045 rdma->cm_id = NULL;
1046err_resolve_create_id:
1047 rdma_destroy_event_channel(rdma->channel);
1048 rdma->channel = NULL;
7fc5b13f 1049 return ret;
1050}
1051
1052/*
1053 * Create protection domain and completion queues
1054 */
1055static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1056{
1057 /* allocate pd */
1058 rdma->pd = ibv_alloc_pd(rdma->verbs);
1059 if (!rdma->pd) {
733252de 1060 error_report("failed to allocate protection domain");
1061 return -1;
1062 }
1063
1064 /* create receive completion channel */
1065 rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
1066 if (!rdma->recv_comp_channel) {
1067 error_report("failed to allocate receive completion channel");
1068 goto err_alloc_pd_cq;
1069 }
1070
1071 /*
b390afd8 1072 * Completion queue can be filled by read work requests.
2da776db 1073 */
1074 rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1075 NULL, rdma->recv_comp_channel, 0);
1076 if (!rdma->recv_cq) {
1077 error_report("failed to allocate receive completion queue");
1078 goto err_alloc_pd_cq;
1079 }
1080
1081 /* create send completion channel */
1082 rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
1083 if (!rdma->send_comp_channel) {
1084 error_report("failed to allocate send completion channel");
1085 goto err_alloc_pd_cq;
1086 }
1087
1088 rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1089 NULL, rdma->send_comp_channel, 0);
1090 if (!rdma->send_cq) {
1091 error_report("failed to allocate send completion queue");
1092 goto err_alloc_pd_cq;
1093 }
1094
1095 return 0;
1096
1097err_alloc_pd_cq:
1098 if (rdma->pd) {
1099 ibv_dealloc_pd(rdma->pd);
1100 }
1101 if (rdma->recv_comp_channel) {
1102 ibv_destroy_comp_channel(rdma->recv_comp_channel);
1103 }
1104 if (rdma->send_comp_channel) {
1105 ibv_destroy_comp_channel(rdma->send_comp_channel);
1106 }
1107 if (rdma->recv_cq) {
1108 ibv_destroy_cq(rdma->recv_cq);
1109 rdma->recv_cq = NULL;
1110 }
1111 rdma->pd = NULL;
1112 rdma->recv_comp_channel = NULL;
1113 rdma->send_comp_channel = NULL;
1114 return -1;
1115
1116}
1117
1118/*
1119 * Create queue pairs.
1120 */
1121static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1122{
1123 struct ibv_qp_init_attr attr = { 0 };
1124 int ret;
1125
1126 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1127 attr.cap.max_recv_wr = 3;
1128 attr.cap.max_send_sge = 1;
1129 attr.cap.max_recv_sge = 1;
1130 attr.send_cq = rdma->send_cq;
1131 attr.recv_cq = rdma->recv_cq;
1132 attr.qp_type = IBV_QPT_RC;
1133
1134 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1135 if (ret) {
1136 return -1;
1137 }
1138
1139 rdma->qp = rdma->cm_id->qp;
1140 return 0;
1141}
1142
/* Check whether On-Demand Paging is supported by RDMA device */
1144static bool rdma_support_odp(struct ibv_context *dev)
1145{
1146 struct ibv_device_attr_ex attr = {0};
1147 int ret = ibv_query_device_ex(dev, NULL, &attr);
1148 if (ret) {
1149 return false;
1150 }
1151
1152 if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
1153 return true;
1154 }
1155
1156 return false;
1157}
1158
/*
 * ibv_advise_mr to avoid RNR NAK error as far as possible.
 * A responder MR registered with ODP will send an RNR NAK back to
 * the requester in the face of a page fault.
 */
1164static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
1165 uint32_t len, uint32_t lkey,
1166 const char *name, bool wr)
1167{
1168#ifdef HAVE_IBV_ADVISE_MR
1169 int ret;
1170 int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE :
1171 IBV_ADVISE_MR_ADVICE_PREFETCH;
1172 struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len};
1173
1174 ret = ibv_advise_mr(pd, advice,
1175 IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1);
1176 /* ignore the error */
1177 if (ret) {
1178 trace_qemu_rdma_advise_mr(name, len, addr, strerror(errno));
1179 } else {
1180 trace_qemu_rdma_advise_mr(name, len, addr, "successed");
1181 }
1182#endif
1183}
1184
1185static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1186{
1187 int i;
1188 RDMALocalBlocks *local = &rdma->local_ram_blocks;
1189
1190 for (i = 0; i < local->nb_blocks; i++) {
1191 int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
1192
1193 local->block[i].mr =
1194 ibv_reg_mr(rdma->pd,
1195 local->block[i].local_host_addr,
e2daccb0 1196 local->block[i].length, access
2da776db 1197 );
1198
1199 if (!local->block[i].mr &&
1200 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1201 access |= IBV_ACCESS_ON_DEMAND;
1202 /* register ODP mr */
1203 local->block[i].mr =
1204 ibv_reg_mr(rdma->pd,
1205 local->block[i].local_host_addr,
1206 local->block[i].length, access);
1207 trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
1208
1209 if (local->block[i].mr) {
1210 qemu_rdma_advise_prefetch_mr(rdma->pd,
1211 (uintptr_t)local->block[i].local_host_addr,
1212 local->block[i].length,
1213 local->block[i].mr->lkey,
1214 local->block[i].block_name,
1215 true);
1216 }
1217 }
1218
2da776db 1219 if (!local->block[i].mr) {
eb1960aa 1220 perror("Failed to register local dest ram block!");
1221 break;
1222 }
1223 rdma->total_registrations++;
1224 }
1225
1226 if (i >= local->nb_blocks) {
1227 return 0;
1228 }
1229
1230 for (i--; i >= 0; i--) {
1231 ibv_dereg_mr(local->block[i].mr);
224f364a 1232 local->block[i].mr = NULL;
1233 rdma->total_registrations--;
1234 }
1235
1236 return -1;
1237
1238}
1239
1240/*
1241 * Find the ram block that corresponds to the page requested to be
1242 * transmitted by QEMU.
1243 *
1244 * Once the block is found, also identify which 'chunk' within that
1245 * block that the page belongs to.
1246 *
1247 * This search cannot fail or the migration will fail.
1248 */
1249static int qemu_rdma_search_ram_block(RDMAContext *rdma,
fbce8c25 1250 uintptr_t block_offset,
1251 uint64_t offset,
1252 uint64_t length,
1253 uint64_t *block_index,
1254 uint64_t *chunk_index)
1255{
1256 uint64_t current_addr = block_offset + offset;
1257 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1258 (void *) block_offset);
1259 assert(block);
1260 assert(current_addr >= block->offset);
1261 assert((current_addr + length) <= (block->offset + block->length));
1262
1263 *block_index = block->index;
1264 *chunk_index = ram_chunk_index(block->local_host_addr,
1265 block->local_host_addr + (current_addr - block->offset));
1266
1267 return 0;
1268}
1269
1270/*
1271 * Register a chunk with IB. If the chunk was already registered
1272 * previously, then skip.
1273 *
1274 * Also return the keys associated with the registration needed
1275 * to perform the actual RDMA operation.
1276 */
1277static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
3ac040c0 1278 RDMALocalBlock *block, uintptr_t host_addr,
2da776db
MH
1279 uint32_t *lkey, uint32_t *rkey, int chunk,
1280 uint8_t *chunk_start, uint8_t *chunk_end)
1281{
1282 if (block->mr) {
1283 if (lkey) {
1284 *lkey = block->mr->lkey;
1285 }
1286 if (rkey) {
1287 *rkey = block->mr->rkey;
1288 }
1289 return 0;
1290 }
1291
1292 /* allocate memory to store chunk MRs */
1293 if (!block->pmr) {
97f3ad35 1294 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1295 }
1296
1297 /*
1298 * If 'rkey', then we're the destination, so grant access to the source.
1299 *
1300 * If 'lkey', then we're the source VM, so grant access only to ourselves.
1301 */
1302 if (!block->pmr[chunk]) {
1303 uint64_t len = chunk_end - chunk_start;
1304 int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
1305 0;
2da776db 1306
733252de 1307 trace_qemu_rdma_register_and_get_keys(len, chunk_start);
2da776db 1308
1309 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1310 if (!block->pmr[chunk] &&
1311 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1312 access |= IBV_ACCESS_ON_DEMAND;
1313 /* register ODP mr */
1314 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1315 trace_qemu_rdma_register_odp_mr(block->block_name);
1316
1317 if (block->pmr[chunk]) {
1318 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
1319 len, block->pmr[chunk]->lkey,
1320 block->block_name, rkey);
1321
1322 }
2da776db 1323 }
2da776db 1324 }
1325 if (!block->pmr[chunk]) {
1326 perror("Failed to register chunk!");
1327 fprintf(stderr, "Chunk details: block: %d chunk index %d"
1328 " start %" PRIuPTR " end %" PRIuPTR
1329 " host %" PRIuPTR
1330 " local %" PRIuPTR " registrations: %d\n",
1331 block->index, chunk, (uintptr_t)chunk_start,
1332 (uintptr_t)chunk_end, host_addr,
1333 (uintptr_t)block->local_host_addr,
1334 rdma->total_registrations);
1335 return -1;
1336 }
1337 rdma->total_registrations++;
1338
1339 if (lkey) {
1340 *lkey = block->pmr[chunk]->lkey;
1341 }
1342 if (rkey) {
1343 *rkey = block->pmr[chunk]->rkey;
1344 }
1345 return 0;
1346}
1347
1348/*
1349 * Register (at connection time) the memory used for control
1350 * channel messages.
1351 */
1352static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1353{
1354 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1355 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1356 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1357 if (rdma->wr_data[idx].control_mr) {
1358 rdma->total_registrations++;
1359 return 0;
1360 }
733252de 1361 error_report("qemu_rdma_reg_control failed");
2da776db
MH
1362 return -1;
1363}
1364
1365const char *print_wrid(int wrid)
1366{
1367 if (wrid >= RDMA_WRID_RECV_CONTROL) {
1368 return wrid_desc[RDMA_WRID_RECV_CONTROL];
1369 }
1370 return wrid_desc[wrid];
1371}
1372
1373/*
1374 * Perform a non-optimized memory unregistration after every transfer
24ec68ef 1375 * for demonstration purposes, only if pin-all is not requested.
1376 *
1377 * Potential optimizations:
1378 * 1. Start a new thread to run this function continuously
1379 - for bit clearing
1380 - and for receipt of unregister messages
1381 * 2. Use an LRU.
1382 * 3. Use workload hints.
1383 */
1384static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1385{
1386 while (rdma->unregistrations[rdma->unregister_current]) {
1387 int ret;
1388 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1389 uint64_t chunk =
1390 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1391 uint64_t index =
1392 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1393 RDMALocalBlock *block =
1394 &(rdma->local_ram_blocks.block[index]);
1395 RDMARegister reg = { .current_index = index };
1396 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1397 };
1398 RDMAControlHeader head = { .len = sizeof(RDMARegister),
1399 .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1400 .repeat = 1,
1401 };
1402
1403 trace_qemu_rdma_unregister_waiting_proc(chunk,
1404 rdma->unregister_current);
1405
1406 rdma->unregistrations[rdma->unregister_current] = 0;
1407 rdma->unregister_current++;
1408
1409 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1410 rdma->unregister_current = 0;
1411 }
1412
1413
        /*
         * Unregistration is speculative (because migration is single-threaded
         * and we cannot break the protocol's infiniband message ordering).
         * Thus, if the memory is currently being used for transmission,
         * then abort the attempt to unregister and try again
         * later the next time a completion is received for this memory.
         */
1421 clear_bit(chunk, block->unregister_bitmap);
1422
1423 if (test_bit(chunk, block->transit_bitmap)) {
733252de 1424 trace_qemu_rdma_unregister_waiting_inflight(chunk);
2da776db
MH
1425 continue;
1426 }
1427
733252de 1428 trace_qemu_rdma_unregister_waiting_send(chunk);
1429
1430 ret = ibv_dereg_mr(block->pmr[chunk]);
1431 block->pmr[chunk] = NULL;
1432 block->remote_keys[chunk] = 0;
1433
1434 if (ret != 0) {
1435 perror("unregistration chunk failed");
1436 return -ret;
1437 }
1438 rdma->total_registrations--;
1439
1440 reg.key.chunk = chunk;
b12f7777 1441 register_to_network(rdma, &reg);
1442 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1443 &resp, NULL, NULL);
1444 if (ret < 0) {
1445 return ret;
1446 }
1447
733252de 1448 trace_qemu_rdma_unregister_waiting_complete(chunk);
1449 }
1450
1451 return 0;
1452}
1453
1454static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1455 uint64_t chunk)
1456{
1457 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1458
1459 result |= (index << RDMA_WRID_BLOCK_SHIFT);
1460 result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1461
1462 return result;
1463}
1464
/*
 * Consult the connection manager to see if a work request
 * (of any kind) has completed.
 * Return the work request ID that completed.
 */
1470static uint64_t qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
1471 uint64_t *wr_id_out, uint32_t *byte_len)
1472{
1473 int ret;
1474 struct ibv_wc wc;
1475 uint64_t wr_id;
1476
b390afd8 1477 ret = ibv_poll_cq(cq, 1, &wc);
1478
1479 if (!ret) {
1480 *wr_id_out = RDMA_WRID_NONE;
1481 return 0;
1482 }
1483
1484 if (ret < 0) {
733252de 1485 error_report("ibv_poll_cq return %d", ret);
1486 return ret;
1487 }
1488
1489 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1490
1491 if (wc.status != IBV_WC_SUCCESS) {
1492 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1493 wc.status, ibv_wc_status_str(wc.status));
1494 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1495
1496 return -1;
1497 }
1498
1499 if (rdma->control_ready_expected &&
1500 (wr_id >= RDMA_WRID_RECV_CONTROL)) {
733252de 1501 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1502 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1503 rdma->control_ready_expected = 0;
1504 }
1505
1506 if (wr_id == RDMA_WRID_RDMA_WRITE) {
1507 uint64_t chunk =
1508 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1509 uint64_t index =
1510 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1511 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1512
733252de 1513 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1514 index, chunk, block->local_host_addr,
1515 (void *)(uintptr_t)block->remote_host_addr);
1516
1517 clear_bit(chunk, block->transit_bitmap);
1518
1519 if (rdma->nb_sent > 0) {
1520 rdma->nb_sent--;
1521 }
2da776db 1522 } else {
733252de 1523 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1524 }
1525
1526 *wr_id_out = wc.wr_id;
1527 if (byte_len) {
1528 *byte_len = wc.byte_len;
1529 }
1530
1531 return 0;
1532}
1533
/* Wait for activity on the completion channel.
 * Returns 0 on success, non-0 on error.
 */
1537static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
1538 struct ibv_comp_channel *comp_channel)
9c98cfbe 1539{
1540 struct rdma_cm_event *cm_event;
1541 int ret = -1;
1542
1543 /*
1544 * Coroutine doesn't start until migration_fd_process_incoming()
1545 * so don't yield unless we know we're running inside of a coroutine.
1546 */
1547 if (rdma->migration_started_on_destination &&
1548 migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
b390afd8 1549 yield_until_fd_readable(comp_channel->fd);
1550 } else {
        /* This is the source side; we're in a separate thread,
         * or the destination prior to migration_fd_process_incoming()
         * (after postcopy, the destination is also in a separate thread);
         * we can't yield, so we have to poll the fd.
         * But we need to be able to handle 'cancel' or an error
         * without hanging forever.
         */
1558 while (!rdma->error_state && !rdma->received_error) {
9c98cfbe 1561 pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
d5882995
LC
1562 pfds[0].revents = 0;
1563
1564 pfds[1].fd = rdma->channel->fd;
1565 pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1566 pfds[1].revents = 0;
1567
9c98cfbe 1568 /* 0.1s timeout, should be fine for a 'cancel' */
1569 switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1570 case 2:
9c98cfbe 1571 case 1: /* fd active */
1572 if (pfds[0].revents) {
1573 return 0;
1574 }
1575
1576 if (pfds[1].revents) {
1577 ret = rdma_get_cm_event(rdma->channel, &cm_event);
1578 if (ret) {
1579 error_report("failed to get cm event while wait "
1580 "completion channel");
1581 return -EPIPE;
1582 }
1583
1584 error_report("receive cm event while wait comp channel,"
1585 "cm event is %d", cm_event->event);
1586 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1587 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
6b8c2eb5 1588 rdma_ack_cm_event(cm_event);
1589 return -EPIPE;
1590 }
6b8c2eb5 1591 rdma_ack_cm_event(cm_event);
1592 }
1593 break;
1594
1595 case 0: /* Timeout, go around again */
1596 break;
1597
1598 default: /* Error of some type -
1599 * I don't trust errno from qemu_poll_ns
1600 */
1601 error_report("%s: poll failed", __func__);
1602 return -EPIPE;
1603 }
1604
1605 if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1606 /* Bail out and let the cancellation happen */
1607 return -EPIPE;
1608 }
1609 }
1610 }
1611
1612 if (rdma->received_error) {
1613 return -EPIPE;
1614 }
1615 return rdma->error_state;
1616}
1617
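/*
 * Descriptive note (added for clarity): completions for RDMA writes and
 * control SENDs (wrid < RDMA_WRID_RECV_CONTROL) are reaped from the send
 * completion queue/channel, while control RECV completions come from the
 * receive completion queue/channel; the two helpers below encode that
 * routing decision.
 */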
1618static struct ibv_comp_channel *to_channel(RDMAContext *rdma, int wrid)
1619{
1620 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
1621 rdma->recv_comp_channel;
1622}
1623
1624static struct ibv_cq *to_cq(RDMAContext *rdma, int wrid)
1625{
1626 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
1627}
1628
1629/*
1630 * Block until the next work request has completed.
1631 *
1632 * First poll to see if a work request has already completed,
1633 * otherwise block.
1634 *
1635 * If we encounter completed work requests for IDs other than
1636 * the one we're interested in, then that's generally an error.
1637 *
1638 * The only exception is actual RDMA Write completions. These
1639 * completions only need to be recorded, but do not actually
1640 * need further processing.
1641 */
1642static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1643 uint32_t *byte_len)
1644{
1645 int num_cq_events = 0, ret = 0;
1646 struct ibv_cq *cq;
1647 void *cq_ctx;
1648 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1649 struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
1650 struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
2da776db 1651
b390afd8 1652 if (ibv_req_notify_cq(poll_cq, 0)) {
1653 return -1;
1654 }
1655 /* poll cq first */
1656 while (wr_id != wrid_requested) {
b390afd8 1657 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1658 if (ret < 0) {
1659 return ret;
1660 }
1661
1662 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1663
1664 if (wr_id == RDMA_WRID_NONE) {
1665 break;
1666 }
1667 if (wr_id != wrid_requested) {
1668 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1669 wrid_requested, print_wrid(wr_id), wr_id);
1670 }
1671 }
1672
1673 if (wr_id == wrid_requested) {
1674 return 0;
1675 }
1676
1677 while (1) {
b390afd8 1678 ret = qemu_rdma_wait_comp_channel(rdma, ch);
1679 if (ret) {
1680 goto err_block_for_wrid;
1681 }
1682
b390afd8 1683 ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
0b3c15f0 1684 if (ret) {
1685 perror("ibv_get_cq_event");
1686 goto err_block_for_wrid;
1687 }
1688
1689 num_cq_events++;
1690
1691 ret = -ibv_req_notify_cq(cq, 0);
1692 if (ret) {
1693 goto err_block_for_wrid;
1694 }
1695
1696 while (wr_id != wrid_requested) {
b390afd8 1697 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1698 if (ret < 0) {
1699 goto err_block_for_wrid;
1700 }
1701
1702 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1703
1704 if (wr_id == RDMA_WRID_NONE) {
1705 break;
1706 }
1707 if (wr_id != wrid_requested) {
1708 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1709 wrid_requested, print_wrid(wr_id), wr_id);
1710 }
1711 }
1712
1713 if (wr_id == wrid_requested) {
1714 goto success_block_for_wrid;
1715 }
1716 }
1717
1718success_block_for_wrid:
1719 if (num_cq_events) {
1720 ibv_ack_cq_events(cq, num_cq_events);
1721 }
1722 return 0;
1723
1724err_block_for_wrid:
1725 if (num_cq_events) {
1726 ibv_ack_cq_events(cq, num_cq_events);
1727 }
1728
1729 rdma->error_state = ret;
1730 return ret;
1731}
1732
1733/*
1734 * Post a SEND message work request for the control channel
1735 * containing some data and block until the post completes.
1736 */
1737static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1738 RDMAControlHeader *head)
1739{
1740 int ret = 0;
1f22364b 1741 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1742 struct ibv_send_wr *bad_wr;
1743 struct ibv_sge sge = {
fbce8c25 1744 .addr = (uintptr_t)(wr->control),
2da776db
MH
1745 .length = head->len + sizeof(RDMAControlHeader),
1746 .lkey = wr->control_mr->lkey,
1747 };
1748 struct ibv_send_wr send_wr = {
1749 .wr_id = RDMA_WRID_SEND_CONTROL,
1750 .opcode = IBV_WR_SEND,
1751 .send_flags = IBV_SEND_SIGNALED,
1752 .sg_list = &sge,
1753 .num_sge = 1,
1754 };
1755
482a33c5 1756 trace_qemu_rdma_post_send_control(control_desc(head->type));
2da776db
MH
1757
1758 /*
1759 * We don't actually need to do a memcpy() in here if we used
1760 * the "sge" properly, but since we're only sending control messages
1761 * (not RAM in a performance-critical path), then it's OK for now.
1762 *
1763 * The copy makes the RDMAControlHeader simpler to manipulate
1764 * for the time being.
1765 */
6f1484ed 1766 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
2da776db
MH
1767 memcpy(wr->control, head, sizeof(RDMAControlHeader));
1768 control_to_network((void *) wr->control);
1769
1770 if (buf) {
1771 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1772 }
1773
1774
e325b49a 1775 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2da776db 1776
e325b49a 1777 if (ret > 0) {
733252de 1778 error_report("Failed to post IB SEND for control");
e325b49a 1779 return -ret;
2da776db
MH
1780 }
1781
88571882 1782 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
2da776db 1783 if (ret < 0) {
733252de 1784 error_report("rdma migration: send polling control error");
2da776db
MH
1785 }
1786
1787 return ret;
1788}
1789
1790/*
1791 * Post a RECV work request in anticipation of some future receipt
1792 * of data on the control channel.
1793 */
1794static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1795{
1796 struct ibv_recv_wr *bad_wr;
1797 struct ibv_sge sge = {
fbce8c25 1798 .addr = (uintptr_t)(rdma->wr_data[idx].control),
2da776db
MH
1799 .length = RDMA_CONTROL_MAX_BUFFER,
1800 .lkey = rdma->wr_data[idx].control_mr->lkey,
1801 };
1802
1803 struct ibv_recv_wr recv_wr = {
1804 .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1805 .sg_list = &sge,
1806 .num_sge = 1,
1807 };
1808
1809
1810 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1811 return -1;
1812 }
1813
1814 return 0;
1815}
1816
1817/*
1818 * Block and wait for a RECV control channel message to arrive.
1819 */
1820static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1821 RDMAControlHeader *head, int expecting, int idx)
1822{
88571882
IY
1823 uint32_t byte_len;
1824 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1825 &byte_len);
2da776db
MH
1826
1827 if (ret < 0) {
733252de 1828 error_report("rdma migration: recv polling control error!");
2da776db
MH
1829 return ret;
1830 }
1831
1832 network_to_control((void *) rdma->wr_data[idx].control);
1833 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1834
482a33c5 1835 trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
2da776db
MH
1836
1837 if (expecting == RDMA_CONTROL_NONE) {
482a33c5 1838 trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
733252de 1839 head->type);
2da776db 1840 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
733252de
DDAG
1841 error_report("Was expecting a %s (%d) control message"
1842 ", but got: %s (%d), length: %d",
482a33c5
DDAG
1843 control_desc(expecting), expecting,
1844 control_desc(head->type), head->type, head->len);
cd5ea070
DDAG
1845 if (head->type == RDMA_CONTROL_ERROR) {
1846 rdma->received_error = true;
1847 }
2da776db
MH
1848 return -EIO;
1849 }
6f1484ed 1850 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
81b07353 1851 error_report("control message length too long: %d", head->len);
6f1484ed
IY
1852 return -EINVAL;
1853 }
88571882 1854 if (sizeof(*head) + head->len != byte_len) {
733252de 1855 error_report("Malformed length: %d byte_len %d", head->len, byte_len);
88571882
IY
1856 return -EINVAL;
1857 }
2da776db
MH
1858
1859 return 0;
1860}
1861
1862/*
1863 * When a RECV work request has completed, the work request's
1864 * buffer is pointed at the header.
1865 *
1866 * This advances the pointer past the header, to the data portion
1867 * of the control message in the work request's buffer, which was
1868 * populated once the work request finished.
1869 */
1870static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1871 RDMAControlHeader *head)
1872{
1873 rdma->wr_data[idx].control_len = head->len;
1874 rdma->wr_data[idx].control_curr =
1875 rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1876}
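/*
 * Illustrative layout, an assumption implied by the code above rather than
 * part of the original file: after qemu_rdma_move_header() the slot's buffer
 * is viewed as
 *
 *   wr_data[idx].control:  [ RDMAControlHeader | head->len payload bytes ]
 *                                               ^
 *                          control_curr --------+   control_len == head->len
 */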
1877
1878/*
1879 * This is an 'atomic' high-level operation to deliver a single, unified
1880 * control-channel message.
1881 *
1882 * Additionally, if the user is expecting some kind of reply to this message,
1883 * they can request a 'resp' response message be filled in by posting an
1884 * additional work request on behalf of the user and waiting for an additional
1885 * completion.
1886 *
1887 * The extra (optional) response is used during registration to save us from
1888 * having to perform an *additional* exchange of messages just to provide a
1889 * response, by instead piggy-backing on the acknowledgement.
1890 */
1891static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1892 uint8_t *data, RDMAControlHeader *resp,
1893 int *resp_idx,
1894 int (*callback)(RDMAContext *rdma))
1895{
1896 int ret = 0;
1897
1898 /*
1899 * Wait until the dest is ready before attempting to deliver the message
1900 * by waiting for a READY message.
1901 */
1902 if (rdma->control_ready_expected) {
1903 RDMAControlHeader resp;
1904 ret = qemu_rdma_exchange_get_response(rdma,
1905 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1906 if (ret < 0) {
1907 return ret;
1908 }
1909 }
1910
1911 /*
1912 * If the user is expecting a response, post a WR in anticipation of it.
1913 */
1914 if (resp) {
1915 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1916 if (ret) {
733252de 1917 error_report("rdma migration: error posting"
2da776db
MH
1918 " extra control recv for anticipated result!");
1919 return ret;
1920 }
1921 }
1922
1923 /*
1924 * Post a WR to replace the one we just consumed for the READY message.
1925 */
1926 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1927 if (ret) {
733252de 1928 error_report("rdma migration: error posting first control recv!");
2da776db
MH
1929 return ret;
1930 }
1931
1932 /*
1933 * Deliver the control message that was requested.
1934 */
1935 ret = qemu_rdma_post_send_control(rdma, data, head);
1936
1937 if (ret < 0) {
733252de 1938 error_report("Failed to send control buffer!");
2da776db
MH
1939 return ret;
1940 }
1941
1942 /*
1943 * If we're expecting a response, block and wait for it.
1944 */
1945 if (resp) {
1946 if (callback) {
733252de 1947 trace_qemu_rdma_exchange_send_issue_callback();
2da776db
MH
1948 ret = callback(rdma);
1949 if (ret < 0) {
1950 return ret;
1951 }
1952 }
1953
482a33c5 1954 trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
2da776db
MH
1955 ret = qemu_rdma_exchange_get_response(rdma, resp,
1956 resp->type, RDMA_WRID_DATA);
1957
1958 if (ret < 0) {
1959 return ret;
1960 }
1961
1962 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1963 if (resp_idx) {
1964 *resp_idx = RDMA_WRID_DATA;
1965 }
482a33c5 1966 trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
2da776db
MH
1967 }
1968
1969 rdma->control_ready_expected = 1;
1970
1971 return 0;
1972}
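/*
 * Illustrative sketch, not part of the original file: a rough picture of the
 * handshake implemented by qemu_rdma_exchange_send() above and
 * qemu_rdma_exchange_recv() below.
 *
 *   source (exchange_send)                destination (exchange_recv)
 *   ----------------------                ---------------------------
 *   wait for READY             <--------  SEND READY
 *   post RECV for READY (and for DATA
 *   when a response is expected)
 *   SEND the request            -------->  wait for the request
 *   wait for the response       <--------  SEND the response (optional)
 */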
1973
1974/*
1975 * This is an 'atomic' high-level operation to receive a single, unified
1976 * control-channel message.
1977 */
1978static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1979 int expecting)
1980{
1981 RDMAControlHeader ready = {
1982 .len = 0,
1983 .type = RDMA_CONTROL_READY,
1984 .repeat = 1,
1985 };
1986 int ret;
1987
1988 /*
1989 * Inform the source that we're ready to receive a message.
1990 */
1991 ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1992
1993 if (ret < 0) {
733252de 1994 error_report("Failed to send control buffer!");
2da776db
MH
1995 return ret;
1996 }
1997
1998 /*
1999 * Block and wait for the message.
2000 */
2001 ret = qemu_rdma_exchange_get_response(rdma, head,
2002 expecting, RDMA_WRID_READY);
2003
2004 if (ret < 0) {
2005 return ret;
2006 }
2007
2008 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
2009
2010 /*
2011 * Post a new RECV work request to replace the one we just consumed.
2012 */
2013 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2014 if (ret) {
733252de 2015 error_report("rdma migration: error posting second control recv!");
2da776db
MH
2016 return ret;
2017 }
2018
2019 return 0;
2020}
2021
2022/*
2023 * Write an actual chunk of memory using RDMA.
2024 *
2025 * If we're using dynamic registration on the dest-side, we have to
2026 * send a registration command first.
2027 */
2028static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
2029 int current_index, uint64_t current_addr,
2030 uint64_t length)
2031{
2032 struct ibv_sge sge;
2033 struct ibv_send_wr send_wr = { 0 };
2034 struct ibv_send_wr *bad_wr;
2035 int reg_result_idx, ret, count = 0;
2036 uint64_t chunk, chunks;
2037 uint8_t *chunk_start, *chunk_end;
2038 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
2039 RDMARegister reg;
2040 RDMARegisterResult *reg_result;
2041 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
2042 RDMAControlHeader head = { .len = sizeof(RDMARegister),
2043 .type = RDMA_CONTROL_REGISTER_REQUEST,
2044 .repeat = 1,
2045 };
2046
2047retry:
fbce8c25 2048 sge.addr = (uintptr_t)(block->local_host_addr +
2da776db
MH
2049 (current_addr - block->offset));
2050 sge.length = length;
2051
fbce8c25
SW
2052 chunk = ram_chunk_index(block->local_host_addr,
2053 (uint8_t *)(uintptr_t)sge.addr);
2da776db
MH
2054 chunk_start = ram_chunk_start(block, chunk);
2055
2056 if (block->is_ram_block) {
2057 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
2058
2059 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2060 chunks--;
2061 }
2062 } else {
2063 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
2064
2065 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2066 chunks--;
2067 }
2068 }
2069
733252de
DDAG
2070 trace_qemu_rdma_write_one_top(chunks + 1,
2071 (chunks + 1) *
2072 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2da776db
MH
2073
2074 chunk_end = ram_chunk_end(block, chunk + chunks);
2075
2da776db
MH
2076
2077 while (test_bit(chunk, block->transit_bitmap)) {
2078 (void)count;
733252de 2079 trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2da776db
MH
2080 sge.addr, length, rdma->nb_sent, block->nb_chunks);
2081
88571882 2082 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2da776db
MH
2083
2084 if (ret < 0) {
733252de 2085 error_report("Failed to wait for previous write to complete "
2da776db 2086 "block %d chunk %" PRIu64
733252de 2087 " current %" PRIu64 " len %" PRIu64 " %d",
2da776db
MH
2088 current_index, chunk, sge.addr, length, rdma->nb_sent);
2089 return ret;
2090 }
2091 }
2092
2093 if (!rdma->pin_all || !block->is_ram_block) {
2094 if (!block->remote_keys[chunk]) {
2095 /*
2096 * This chunk has not yet been registered, so first check to see
2097 * if the entire chunk is zero. If so, tell the other side to
2098 * memset() + madvise() the entire chunk without RDMA.
2099 */
2100
a1febc49 2101 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2da776db
MH
2102 RDMACompress comp = {
2103 .offset = current_addr,
2104 .value = 0,
2105 .block_idx = current_index,
2106 .length = length,
2107 };
2108
2109 head.len = sizeof(comp);
2110 head.type = RDMA_CONTROL_COMPRESS;
2111
733252de
DDAG
2112 trace_qemu_rdma_write_one_zero(chunk, sge.length,
2113 current_index, current_addr);
2da776db 2114
b12f7777 2115 compress_to_network(rdma, &comp);
2da776db
MH
2116 ret = qemu_rdma_exchange_send(rdma, &head,
2117 (uint8_t *) &comp, NULL, NULL, NULL);
2118
2119 if (ret < 0) {
2120 return -EIO;
2121 }
2122
2123 acct_update_position(f, sge.length, true);
2124
2125 return 1;
2126 }
2127
2128 /*
2129 * Otherwise, tell other side to register.
2130 */
2131 reg.current_index = current_index;
2132 if (block->is_ram_block) {
2133 reg.key.current_addr = current_addr;
2134 } else {
2135 reg.key.chunk = chunk;
2136 }
2137 reg.chunks = chunks;
2138
733252de
DDAG
2139 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2140 current_addr);
2da776db 2141
b12f7777 2142 register_to_network(rdma, &reg);
2da776db
MH
2143 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2144 &resp, &reg_result_idx, NULL);
2145 if (ret < 0) {
2146 return ret;
2147 }
2148
2149 /* try to overlap this single registration with the one we sent. */
3ac040c0 2150 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2da776db
MH
2151 &sge.lkey, NULL, chunk,
2152 chunk_start, chunk_end)) {
733252de 2153 error_report("cannot get lkey");
2da776db
MH
2154 return -EINVAL;
2155 }
2156
2157 reg_result = (RDMARegisterResult *)
2158 rdma->wr_data[reg_result_idx].control_curr;
2159
2160 network_to_result(reg_result);
2161
733252de
DDAG
2162 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2163 reg_result->rkey, chunk);
2da776db
MH
2164
2165 block->remote_keys[chunk] = reg_result->rkey;
2166 block->remote_host_addr = reg_result->host_addr;
2167 } else {
2168 /* already registered before */
3ac040c0 2169 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2da776db
MH
2170 &sge.lkey, NULL, chunk,
2171 chunk_start, chunk_end)) {
733252de 2172 error_report("cannot get lkey!");
2da776db
MH
2173 return -EINVAL;
2174 }
2175 }
2176
2177 send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2178 } else {
2179 send_wr.wr.rdma.rkey = block->remote_rkey;
2180
3ac040c0 2181 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2da776db
MH
2182 &sge.lkey, NULL, chunk,
2183 chunk_start, chunk_end)) {
733252de 2184 error_report("cannot get lkey!");
2da776db
MH
2185 return -EINVAL;
2186 }
2187 }
2188
2189 /*
2190 * Encode the ram block index and chunk within this wrid.
2191 * We will use this information at the time of completion
2192 * to figure out which bitmap to check against and then which
2193 * chunk in the bitmap to look for.
2194 */
2195 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2196 current_index, chunk);
2197
2198 send_wr.opcode = IBV_WR_RDMA_WRITE;
2199 send_wr.send_flags = IBV_SEND_SIGNALED;
2200 send_wr.sg_list = &sge;
2201 send_wr.num_sge = 1;
2202 send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2203 (current_addr - block->offset);
2204
733252de
DDAG
2205 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2206 sge.length);
2da776db
MH
2207
2208 /*
2209 * ibv_post_send() does not return negative error numbers;
2210 * per the specification they are positive (no idea why).
2211 */
2212 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2213
2214 if (ret == ENOMEM) {
733252de 2215 trace_qemu_rdma_write_one_queue_full();
88571882 2216 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2da776db 2217 if (ret < 0) {
733252de
DDAG
2218 error_report("rdma migration: failed to make "
2219 "room in full send queue! %d", ret);
2da776db
MH
2220 return ret;
2221 }
2222
2223 goto retry;
2224
2225 } else if (ret > 0) {
2226 perror("rdma migration: post rdma write failed");
2227 return -ret;
2228 }
2229
2230 set_bit(chunk, block->transit_bitmap);
2231 acct_update_position(f, sge.length, false);
2232 rdma->total_writes++;
2233
2234 return 0;
2235}
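/*
 * Illustrative summary, not part of the original file: the paths taken by
 * qemu_rdma_write_one() above when dynamic registration is in use.
 *
 *   chunk state              action
 *   -----------------------  -----------------------------------------------
 *   contents are all zeroes  send RDMA_CONTROL_COMPRESS, skip the RDMA write
 *   not yet registered       send RDMA_CONTROL_REGISTER_REQUEST, cache the
 *                            returned rkey, then post the RDMA write
 *   already registered       reuse block->remote_keys[chunk] and post the
 *                            RDMA write
 */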
2236
2237/*
2238 * Push out any unwritten RDMA operations.
2239 *
2240 * We support sending out multiple chunks at the same time.
2241 * Not all of them need to get signaled in the completion queue.
2242 */
2243static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2244{
2245 int ret;
2246
2247 if (!rdma->current_length) {
2248 return 0;
2249 }
2250
2251 ret = qemu_rdma_write_one(f, rdma,
2252 rdma->current_index, rdma->current_addr, rdma->current_length);
2253
2254 if (ret < 0) {
2255 return ret;
2256 }
2257
2258 if (ret == 0) {
2259 rdma->nb_sent++;
733252de 2260 trace_qemu_rdma_write_flush(rdma->nb_sent);
2da776db
MH
2261 }
2262
2263 rdma->current_length = 0;
2264 rdma->current_addr = 0;
2265
2266 return 0;
2267}
2268
2269static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2270 uint64_t offset, uint64_t len)
2271{
44b59494
IY
2272 RDMALocalBlock *block;
2273 uint8_t *host_addr;
2274 uint8_t *chunk_end;
2275
2276 if (rdma->current_index < 0) {
2277 return 0;
2278 }
2279
2280 if (rdma->current_chunk < 0) {
2281 return 0;
2282 }
2283
2284 block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2285 host_addr = block->local_host_addr + (offset - block->offset);
2286 chunk_end = ram_chunk_end(block, rdma->current_chunk);
2da776db
MH
2287
2288 if (rdma->current_length == 0) {
2289 return 0;
2290 }
2291
2292 /*
2293 * Only merge into chunk sequentially.
2294 */
2295 if (offset != (rdma->current_addr + rdma->current_length)) {
2296 return 0;
2297 }
2298
2da776db
MH
2299 if (offset < block->offset) {
2300 return 0;
2301 }
2302
2303 if ((offset + len) > (block->offset + block->length)) {
2304 return 0;
2305 }
2306
2da776db
MH
2307 if ((host_addr + len) > chunk_end) {
2308 return 0;
2309 }
2310
2311 return 1;
2312}
2313
2314/*
2315 * We're not actually writing here, but doing three things:
2316 *
2317 * 1. Identify the chunk the buffer belongs to.
2318 * 2. If the chunk is full or the buffer doesn't belong to the current
2319 * chunk, then start a new chunk and flush() the old chunk.
2320 * 3. To keep the hardware busy, we also group chunks into batches
2321 * and only require that a batch gets acknowledged in the completion
3a4452d8 2322 * queue instead of each individual chunk.
2da776db
MH
2323 */
2324static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2325 uint64_t block_offset, uint64_t offset,
2326 uint64_t len)
2327{
2328 uint64_t current_addr = block_offset + offset;
2329 uint64_t index = rdma->current_index;
2330 uint64_t chunk = rdma->current_chunk;
2331 int ret;
2332
2333 /* If we cannot merge it, we flush the current buffer first. */
2334 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2335 ret = qemu_rdma_write_flush(f, rdma);
2336 if (ret) {
2337 return ret;
2338 }
2339 rdma->current_length = 0;
2340 rdma->current_addr = current_addr;
2341
2342 ret = qemu_rdma_search_ram_block(rdma, block_offset,
2343 offset, len, &index, &chunk);
2344 if (ret) {
733252de 2345 error_report("ram block search failed");
2da776db
MH
2346 return ret;
2347 }
2348 rdma->current_index = index;
2349 rdma->current_chunk = chunk;
2350 }
2351
2352 /* merge it */
2353 rdma->current_length += len;
2354
2355 /* flush it if buffer is too large */
2356 if (rdma->current_length >= RDMA_MERGE_MAX) {
2357 return qemu_rdma_write_flush(f, rdma);
2358 }
2359
2360 return 0;
2361}
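/*
 * Illustrative sketch, an assumption rather than part of the original file:
 * consecutive pages of the same chunk are merged by qemu_rdma_write() and
 * only flushed once RDMA_MERGE_MAX is reached or a non-mergeable page
 * arrives.  'f', 'rdma' and 'block_offset' are assumed to be in scope and
 * the offsets below are hypothetical.
 */
#if 0
    /* Two contiguous 4K pages: the second merges into the current buffer. */
    qemu_rdma_write(f, rdma, block_offset, 0x0000, 4096);
    qemu_rdma_write(f, rdma, block_offset, 0x1000, 4096);
    /* A non-contiguous page forces a flush of the merged buffer first. */
    qemu_rdma_write(f, rdma, block_offset, 0x8000, 4096);
#endif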
2362
2363static void qemu_rdma_cleanup(RDMAContext *rdma)
2364{
c5e76115 2365 int idx;
2da776db 2366
5a91337c 2367 if (rdma->cm_id && rdma->connected) {
32bce196
DDAG
2368 if ((rdma->error_state ||
2369 migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2370 !rdma->received_error) {
2da776db
MH
2371 RDMAControlHeader head = { .len = 0,
2372 .type = RDMA_CONTROL_ERROR,
2373 .repeat = 1,
2374 };
733252de 2375 error_report("Early error. Sending error.");
2da776db
MH
2376 qemu_rdma_post_send_control(rdma, NULL, &head);
2377 }
2378
c5e76115 2379 rdma_disconnect(rdma->cm_id);
733252de 2380 trace_qemu_rdma_cleanup_disconnect();
5a91337c 2381 rdma->connected = false;
2da776db
MH
2382 }
2383
cf75e268
DDAG
2384 if (rdma->channel) {
2385 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2386 }
a97270ad
DDAG
2387 g_free(rdma->dest_blocks);
2388 rdma->dest_blocks = NULL;
2da776db 2389
1f22364b 2390 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2da776db
MH
2391 if (rdma->wr_data[idx].control_mr) {
2392 rdma->total_registrations--;
2393 ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2394 }
2395 rdma->wr_data[idx].control_mr = NULL;
2396 }
2397
2398 if (rdma->local_ram_blocks.block) {
2399 while (rdma->local_ram_blocks.nb_blocks) {
03fcab38 2400 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2da776db
MH
2401 }
2402 }
2403
80b262e1
PR
2404 if (rdma->qp) {
2405 rdma_destroy_qp(rdma->cm_id);
2406 rdma->qp = NULL;
2407 }
b390afd8
LZ
2408 if (rdma->recv_cq) {
2409 ibv_destroy_cq(rdma->recv_cq);
2410 rdma->recv_cq = NULL;
2411 }
2412 if (rdma->send_cq) {
2413 ibv_destroy_cq(rdma->send_cq);
2414 rdma->send_cq = NULL;
2415 }
2416 if (rdma->recv_comp_channel) {
2417 ibv_destroy_comp_channel(rdma->recv_comp_channel);
2418 rdma->recv_comp_channel = NULL;
2da776db 2419 }
b390afd8
LZ
2420 if (rdma->send_comp_channel) {
2421 ibv_destroy_comp_channel(rdma->send_comp_channel);
2422 rdma->send_comp_channel = NULL;
2da776db
MH
2423 }
2424 if (rdma->pd) {
2425 ibv_dealloc_pd(rdma->pd);
2426 rdma->pd = NULL;
2427 }
2da776db
MH
2428 if (rdma->cm_id) {
2429 rdma_destroy_id(rdma->cm_id);
2430 rdma->cm_id = NULL;
2431 }
55cc1b59
LC
2432
2433 /* on the destination side, the listen_id and channel are shared */
80b262e1 2434 if (rdma->listen_id) {
55cc1b59
LC
2435 if (!rdma->is_return_path) {
2436 rdma_destroy_id(rdma->listen_id);
2437 }
80b262e1 2438 rdma->listen_id = NULL;
55cc1b59
LC
2439
2440 if (rdma->channel) {
2441 if (!rdma->is_return_path) {
2442 rdma_destroy_event_channel(rdma->channel);
2443 }
2444 rdma->channel = NULL;
2445 }
80b262e1 2446 }
55cc1b59 2447
2da776db
MH
2448 if (rdma->channel) {
2449 rdma_destroy_event_channel(rdma->channel);
2450 rdma->channel = NULL;
2451 }
e1d0fb37 2452 g_free(rdma->host);
44bcfd45 2453 g_free(rdma->host_port);
e1d0fb37 2454 rdma->host = NULL;
44bcfd45 2455 rdma->host_port = NULL;
2da776db
MH
2456}
2457
2458
bbfb89e3 2459static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2da776db
MH
2460{
2461 int ret, idx;
2462 Error *local_err = NULL, **temp = &local_err;
2463
2464 /*
2465 * Will be validated against destination's actual capabilities
2466 * after the connect() completes.
2467 */
2468 rdma->pin_all = pin_all;
2469
2470 ret = qemu_rdma_resolve_host(rdma, temp);
2471 if (ret) {
2472 goto err_rdma_source_init;
2473 }
2474
2475 ret = qemu_rdma_alloc_pd_cq(rdma);
2476 if (ret) {
2477 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2478 " limits may be too low. Please check $ ulimit -a # and "
66988941 2479 "search for 'ulimit -l' in the output");
2da776db
MH
2480 goto err_rdma_source_init;
2481 }
2482
2483 ret = qemu_rdma_alloc_qp(rdma);
2484 if (ret) {
66988941 2485 ERROR(temp, "rdma migration: error allocating qp!");
2da776db
MH
2486 goto err_rdma_source_init;
2487 }
2488
2489 ret = qemu_rdma_init_ram_blocks(rdma);
2490 if (ret) {
66988941 2491 ERROR(temp, "rdma migration: error initializing ram blocks!");
2da776db
MH
2492 goto err_rdma_source_init;
2493 }
2494
760ff4be
DDAG
2495 /* Build the hash that maps from offset to RAMBlock */
2496 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2497 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2498 g_hash_table_insert(rdma->blockmap,
2499 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2500 &rdma->local_ram_blocks.block[idx]);
2501 }
2502
1f22364b 2503 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2da776db
MH
2504 ret = qemu_rdma_reg_control(rdma, idx);
2505 if (ret) {
66988941 2506 ERROR(temp, "rdma migration: error registering %d control!",
2da776db
MH
2507 idx);
2508 goto err_rdma_source_init;
2509 }
2510 }
2511
2512 return 0;
2513
2514err_rdma_source_init:
2515 error_propagate(errp, local_err);
2516 qemu_rdma_cleanup(rdma);
2517 return -1;
2518}
2519
e49e49dd
LZ
2520static int qemu_get_cm_event_timeout(RDMAContext *rdma,
2521 struct rdma_cm_event **cm_event,
2522 long msec, Error **errp)
2523{
2524 int ret;
2525 struct pollfd poll_fd = {
2526 .fd = rdma->channel->fd,
2527 .events = POLLIN,
2528 .revents = 0
2529 };
2530
2531 do {
2532 ret = poll(&poll_fd, 1, msec);
2533 } while (ret < 0 && errno == EINTR);
2534
2535 if (ret == 0) {
2536 ERROR(errp, "poll cm event timeout");
2537 return -1;
2538 } else if (ret < 0) {
2539 ERROR(errp, "failed to poll cm event, errno=%i", errno);
2540 return -1;
2541 } else if (poll_fd.revents & POLLIN) {
2542 return rdma_get_cm_event(rdma->channel, cm_event);
2543 } else {
2544 ERROR(errp, "no POLLIN event, revent=%x", poll_fd.revents);
2545 return -1;
2546 }
2547}
2548
2549static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path)
2da776db
MH
2550{
2551 RDMACapabilities cap = {
2552 .version = RDMA_CONTROL_VERSION_CURRENT,
2553 .flags = 0,
2554 };
2555 struct rdma_conn_param conn_param = { .initiator_depth = 2,
2556 .retry_count = 5,
2557 .private_data = &cap,
2558 .private_data_len = sizeof(cap),
2559 };
2560 struct rdma_cm_event *cm_event;
2561 int ret;
2562
2563 /*
2564 * Only negotiate the capability with destination if the user
2565 * on the source first requested the capability.
2566 */
2567 if (rdma->pin_all) {
733252de 2568 trace_qemu_rdma_connect_pin_all_requested();
2da776db
MH
2569 cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2570 }
2571
2572 caps_to_network(&cap);
2573
9cf2bab2
DDAG
2574 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2575 if (ret) {
2576 ERROR(errp, "posting second control recv");
2577 goto err_rdma_source_connect;
2578 }
2579
2da776db
MH
2580 ret = rdma_connect(rdma->cm_id, &conn_param);
2581 if (ret) {
2582 perror("rdma_connect");
66988941 2583 ERROR(errp, "connecting to destination!");
2da776db
MH
2584 goto err_rdma_source_connect;
2585 }
2586
e49e49dd
LZ
2587 if (return_path) {
2588 ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
2589 } else {
2590 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2591 }
2da776db
MH
2592 if (ret) {
2593 perror("rdma_get_cm_event after rdma_connect");
66988941 2594 ERROR(errp, "connecting to destination!");
2da776db
MH
2595 goto err_rdma_source_connect;
2596 }
2597
2598 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
e5f60791 2599 error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
66988941 2600 ERROR(errp, "connecting to destination!");
2da776db 2601 rdma_ack_cm_event(cm_event);
2da776db
MH
2602 goto err_rdma_source_connect;
2603 }
5a91337c 2604 rdma->connected = true;
2da776db
MH
2605
2606 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2607 network_to_caps(&cap);
2608
2609 /*
2610 * Verify that the *requested* capabilities are supported by the destination
2611 * and disable them otherwise.
2612 */
2613 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2614 ERROR(errp, "Server cannot support pinning all memory. "
66988941 2615 "Will register memory dynamically.");
2da776db
MH
2616 rdma->pin_all = false;
2617 }
2618
733252de 2619 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2da776db
MH
2620
2621 rdma_ack_cm_event(cm_event);
2622
2da776db
MH
2623 rdma->control_ready_expected = 1;
2624 rdma->nb_sent = 0;
2625 return 0;
2626
2627err_rdma_source_connect:
2628 qemu_rdma_cleanup(rdma);
2629 return -1;
2630}
2631
2632static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2633{
1dbd2fd9 2634 int ret, idx;
2da776db
MH
2635 struct rdma_cm_id *listen_id;
2636 char ip[40] = "unknown";
1dbd2fd9 2637 struct rdma_addrinfo *res, *e;
b58c8552 2638 char port_str[16];
f736e414 2639 int reuse = 1;
2da776db 2640
1f22364b 2641 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2da776db
MH
2642 rdma->wr_data[idx].control_len = 0;
2643 rdma->wr_data[idx].control_curr = NULL;
2644 }
2645
1dbd2fd9 2646 if (!rdma->host || !rdma->host[0]) {
66988941 2647 ERROR(errp, "RDMA host is not set!");
2da776db
MH
2648 rdma->error_state = -EINVAL;
2649 return -1;
2650 }
2651 /* create CM channel */
2652 rdma->channel = rdma_create_event_channel();
2653 if (!rdma->channel) {
66988941 2654 ERROR(errp, "could not create rdma event channel");
2da776db
MH
2655 rdma->error_state = -EINVAL;
2656 return -1;
2657 }
2658
2659 /* create CM id */
2660 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2661 if (ret) {
66988941 2662 ERROR(errp, "could not create cm_id!");
2da776db
MH
2663 goto err_dest_init_create_listen_id;
2664 }
2665
b58c8552
MH
2666 snprintf(port_str, 16, "%d", rdma->port);
2667 port_str[15] = '\0';
2da776db 2668
1dbd2fd9
MT
2669 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2670 if (ret < 0) {
2671 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2672 goto err_dest_init_bind_addr;
2673 }
6470215b 2674
f736e414
JW
2675 ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
2676 &reuse, sizeof reuse);
2677 if (ret) {
2678 ERROR(errp, "Error: could not set REUSEADDR option");
2679 goto err_dest_init_bind_addr;
2680 }
1dbd2fd9
MT
2681 for (e = res; e != NULL; e = e->ai_next) {
2682 inet_ntop(e->ai_family,
2683 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2684 trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2685 ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2686 if (ret) {
2687 continue;
2da776db 2688 }
1dbd2fd9 2689 if (e->ai_family == AF_INET6) {
bbfb89e3 2690 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
1dbd2fd9
MT
2691 if (ret) {
2692 continue;
6470215b
MH
2693 }
2694 }
1dbd2fd9
MT
2695 break;
2696 }
b58c8552 2697
f53b450a 2698 rdma_freeaddrinfo(res);
1dbd2fd9 2699 if (!e) {
6470215b
MH
2700 ERROR(errp, "Error: could not rdma_bind_addr!");
2701 goto err_dest_init_bind_addr;
2da776db 2702 }
2da776db
MH
2703
2704 rdma->listen_id = listen_id;
2705 qemu_rdma_dump_gid("dest_init", listen_id);
2706 return 0;
2707
2708err_dest_init_bind_addr:
2709 rdma_destroy_id(listen_id);
2710err_dest_init_create_listen_id:
2711 rdma_destroy_event_channel(rdma->channel);
2712 rdma->channel = NULL;
2713 rdma->error_state = ret;
2714 return ret;
2715
2716}
2717
55cc1b59
LC
2718static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2719 RDMAContext *rdma)
2720{
2721 int idx;
2722
2723 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2724 rdma_return_path->wr_data[idx].control_len = 0;
2725 rdma_return_path->wr_data[idx].control_curr = NULL;
2726 }
2727
2728 /* the CM channel and CM id are shared */
2729 rdma_return_path->channel = rdma->channel;
2730 rdma_return_path->listen_id = rdma->listen_id;
2731
2732 rdma->return_path = rdma_return_path;
2733 rdma_return_path->return_path = rdma;
2734 rdma_return_path->is_return_path = true;
2735}
2736
2da776db
MH
2737static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2738{
2739 RDMAContext *rdma = NULL;
2740 InetSocketAddress *addr;
2741
2742 if (host_port) {
97f3ad35 2743 rdma = g_new0(RDMAContext, 1);
2da776db
MH
2744 rdma->current_index = -1;
2745 rdma->current_chunk = -1;
2746
0785bd7a
MA
2747 addr = g_new(InetSocketAddress, 1);
2748 if (!inet_parse(addr, host_port, NULL)) {
2da776db
MH
2749 rdma->port = atoi(addr->port);
2750 rdma->host = g_strdup(addr->host);
44bcfd45 2751 rdma->host_port = g_strdup(host_port);
2da776db
MH
2752 } else {
2753 ERROR(errp, "bad RDMA migration address '%s'", host_port);
2754 g_free(rdma);
e325b49a 2755 rdma = NULL;
2da776db 2756 }
e325b49a
MH
2757
2758 qapi_free_InetSocketAddress(addr);
2da776db
MH
2759 }
2760
2761 return rdma;
2762}
2763
2764/*
2765 * QEMUFile interface to the control channel.
2766 * SEND messages for control only.
971ae6ef 2767 * The VM's RAM is handled with regular RDMA writes.
2da776db 2768 */
6ddd2d76
DB
2769static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2770 const struct iovec *iov,
2771 size_t niov,
2772 int *fds,
2773 size_t nfds,
b88651cb 2774 int flags,
6ddd2d76
DB
2775 Error **errp)
2776{
2777 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2778 QEMUFile *f = rioc->file;
74637e6f 2779 RDMAContext *rdma;
2da776db 2780 int ret;
6ddd2d76
DB
2781 ssize_t done = 0;
2782 size_t i;
f38f6d41 2783 size_t len = 0;
2da776db 2784
987ab2a5 2785 RCU_READ_LOCK_GUARD();
d73415a3 2786 rdma = qatomic_rcu_read(&rioc->rdmaout);
74637e6f
LC
2787
2788 if (!rdma) {
74637e6f
LC
2789 return -EIO;
2790 }
2791
2da776db
MH
2792 CHECK_ERROR_STATE();
2793
2794 /*
2795 * Push out any writes that
971ae6ef 2796 * we've queued up for the VM's RAM.
2da776db
MH
2797 */
2798 ret = qemu_rdma_write_flush(f, rdma);
2799 if (ret < 0) {
2800 rdma->error_state = ret;
2801 return ret;
2802 }
2803
6ddd2d76
DB
2804 for (i = 0; i < niov; i++) {
2805 size_t remaining = iov[i].iov_len;
2806 uint8_t * data = (void *)iov[i].iov_base;
2807 while (remaining) {
2808 RDMAControlHeader head;
2da776db 2809
f38f6d41
LC
2810 len = MIN(remaining, RDMA_SEND_INCREMENT);
2811 remaining -= len;
2da776db 2812
f38f6d41 2813 head.len = len;
6ddd2d76 2814 head.type = RDMA_CONTROL_QEMU_FILE;
2da776db 2815
6ddd2d76 2816 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2da776db 2817
6ddd2d76
DB
2818 if (ret < 0) {
2819 rdma->error_state = ret;
2820 return ret;
2821 }
2da776db 2822
f38f6d41
LC
2823 data += len;
2824 done += len;
6ddd2d76 2825 }
2da776db
MH
2826 }
2827
6ddd2d76 2828 return done;
2da776db
MH
2829}
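/*
 * Illustrative note, not part of the original file: each iovec above is
 * chopped into pieces of at most RDMA_SEND_INCREMENT bytes and every piece
 * travels as one RDMA_CONTROL_QEMU_FILE message, so the destination's
 * qemu_rdma_fill() below sees a sequence of self-contained control buffers
 * rather than a byte stream.
 */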
2830
2831static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
a202a4c0 2832 size_t size, int idx)
2da776db
MH
2833{
2834 size_t len = 0;
2835
2836 if (rdma->wr_data[idx].control_len) {
733252de 2837 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2da776db
MH
2838
2839 len = MIN(size, rdma->wr_data[idx].control_len);
2840 memcpy(buf, rdma->wr_data[idx].control_curr, len);
2841 rdma->wr_data[idx].control_curr += len;
2842 rdma->wr_data[idx].control_len -= len;
2843 }
2844
2845 return len;
2846}
2847
2848/*
2849 * QEMUFile interface to the control channel.
2850 * RDMA links don't use bytestreams, so we have to
2851 * return bytes to QEMUFile opportunistically.
2852 */
6ddd2d76
DB
2853static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2854 const struct iovec *iov,
2855 size_t niov,
2856 int **fds,
2857 size_t *nfds,
2858 Error **errp)
2859{
2860 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
74637e6f 2861 RDMAContext *rdma;
2da776db
MH
2862 RDMAControlHeader head;
2863 int ret = 0;
6ddd2d76
DB
2864 ssize_t i;
2865 size_t done = 0;
2da776db 2866
987ab2a5 2867 RCU_READ_LOCK_GUARD();
d73415a3 2868 rdma = qatomic_rcu_read(&rioc->rdmain);
74637e6f
LC
2869
2870 if (!rdma) {
74637e6f
LC
2871 return -EIO;
2872 }
2873
2da776db
MH
2874 CHECK_ERROR_STATE();
2875
6ddd2d76
DB
2876 for (i = 0; i < niov; i++) {
2877 size_t want = iov[i].iov_len;
2878 uint8_t *data = (void *)iov[i].iov_base;
2da776db 2879
6ddd2d76
DB
2880 /*
2881 * First, we hold on to the last SEND message we
2882 * were given and dish out the bytes until we run
2883 * out of bytes.
2884 */
74637e6f 2885 ret = qemu_rdma_fill(rdma, data, want, 0);
6ddd2d76
DB
2886 done += ret;
2887 want -= ret;
2888 /* Got what we needed, so go to next iovec */
2889 if (want == 0) {
2890 continue;
2891 }
2da776db 2892
6ddd2d76
DB
2893 /* If we got any data so far, then don't wait
2894 * for more, just return what we have */
2895 if (done > 0) {
2896 break;
2897 }
2da776db 2898
6ddd2d76
DB
2899
2900 /* We've got nothing at all, so let's wait for
2901 * more to arrive
2902 */
2903 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2904
2905 if (ret < 0) {
2906 rdma->error_state = ret;
2907 return ret;
2908 }
2909
2910 /*
2911 * SEND was received with new bytes, now try again.
2912 */
74637e6f 2913 ret = qemu_rdma_fill(rdma, data, want, 0);
6ddd2d76
DB
2914 done += ret;
2915 want -= ret;
2916
2918 /* Still didn't get enough, so let's just return */
2918 if (want) {
2919 if (done == 0) {
2920 return QIO_CHANNEL_ERR_BLOCK;
2921 } else {
2922 break;
2923 }
2924 }
2925 }
f38f6d41 2926 return done;
2da776db
MH
2927}
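/*
 * Illustrative note, not part of the original file: the readv above first
 * drains any bytes left over from the last control message; only when
 * nothing has been produced yet does it block in qemu_rdma_exchange_recv(),
 * and if even that yields no usable bytes the caller gets
 * QIO_CHANNEL_ERR_BLOCK.
 */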
2928
2929/*
2930 * Block until all the outstanding chunks have been delivered by the hardware.
2931 */
2932static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2933{
2934 int ret;
2935
2936 if (qemu_rdma_write_flush(f, rdma) < 0) {
2937 return -EIO;
2938 }
2939
2940 while (rdma->nb_sent) {
88571882 2941 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2da776db 2942 if (ret < 0) {
733252de 2943 error_report("rdma migration: complete polling error!");
2da776db
MH
2944 return -EIO;
2945 }
2946 }
2947
2948 qemu_rdma_unregister_waiting(rdma);
2949
2950 return 0;
2951}
2952
6ddd2d76
DB
2953
2954static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2955 bool blocking,
2956 Error **errp)
2957{
2958 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2959 /* XXX we should make readv/writev actually honour this :-) */
2960 rioc->blocking = blocking;
2961 return 0;
2962}
2963
2964
2965typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2966struct QIOChannelRDMASource {
2967 GSource parent;
2968 QIOChannelRDMA *rioc;
2969 GIOCondition condition;
2970};
2971
2972static gboolean
2973qio_channel_rdma_source_prepare(GSource *source,
2974 gint *timeout)
2975{
2976 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
74637e6f 2977 RDMAContext *rdma;
6ddd2d76
DB
2978 GIOCondition cond = 0;
2979 *timeout = -1;
2980
987ab2a5 2981 RCU_READ_LOCK_GUARD();
74637e6f 2982 if (rsource->condition == G_IO_IN) {
d73415a3 2983 rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
74637e6f 2984 } else {
d73415a3 2985 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
74637e6f
LC
2986 }
2987
2988 if (!rdma) {
2989 error_report("RDMAContext is NULL when prepare Gsource");
74637e6f
LC
2990 return FALSE;
2991 }
2992
6ddd2d76
DB
2993 if (rdma->wr_data[0].control_len) {
2994 cond |= G_IO_IN;
2995 }
2996 cond |= G_IO_OUT;
2997
2998 return cond & rsource->condition;
2999}
3000
3001static gboolean
3002qio_channel_rdma_source_check(GSource *source)
3003{
3004 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
74637e6f 3005 RDMAContext *rdma;
6ddd2d76
DB
3006 GIOCondition cond = 0;
3007
987ab2a5 3008 RCU_READ_LOCK_GUARD();
74637e6f 3009 if (rsource->condition == G_IO_IN) {
d73415a3 3010 rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
74637e6f 3011 } else {
d73415a3 3012 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
74637e6f
LC
3013 }
3014
3015 if (!rdma) {
3016 error_report("RDMAContext is NULL when check Gsource");
74637e6f
LC
3017 return FALSE;
3018 }
3019
6ddd2d76
DB
3020 if (rdma->wr_data[0].control_len) {
3021 cond |= G_IO_IN;
3022 }
3023 cond |= G_IO_OUT;
3024
3025 return cond & rsource->condition;
3026}
3027
3028static gboolean
3029qio_channel_rdma_source_dispatch(GSource *source,
3030 GSourceFunc callback,
3031 gpointer user_data)
3032{
3033 QIOChannelFunc func = (QIOChannelFunc)callback;
3034 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
74637e6f 3035 RDMAContext *rdma;
6ddd2d76
DB
3036 GIOCondition cond = 0;
3037
987ab2a5 3038 RCU_READ_LOCK_GUARD();
74637e6f 3039 if (rsource->condition == G_IO_IN) {
d73415a3 3040 rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
74637e6f 3041 } else {
d73415a3 3042 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
74637e6f
LC
3043 }
3044
3045 if (!rdma) {
3046 error_report("RDMAContext is NULL when dispatch Gsource");
74637e6f
LC
3047 return FALSE;
3048 }
3049
6ddd2d76
DB
3050 if (rdma->wr_data[0].control_len) {
3051 cond |= G_IO_IN;
3052 }
3053 cond |= G_IO_OUT;
3054
3055 return (*func)(QIO_CHANNEL(rsource->rioc),
3056 (cond & rsource->condition),
3057 user_data);
3058}
3059
3060static void
3061qio_channel_rdma_source_finalize(GSource *source)
3062{
3063 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
3064
3065 object_unref(OBJECT(ssource->rioc));
3066}
3067
3068GSourceFuncs qio_channel_rdma_source_funcs = {
3069 qio_channel_rdma_source_prepare,
3070 qio_channel_rdma_source_check,
3071 qio_channel_rdma_source_dispatch,
3072 qio_channel_rdma_source_finalize
3073};
3074
3075static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
3076 GIOCondition condition)
2da776db 3077{
6ddd2d76
DB
3078 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3079 QIOChannelRDMASource *ssource;
3080 GSource *source;
3081
3082 source = g_source_new(&qio_channel_rdma_source_funcs,
3083 sizeof(QIOChannelRDMASource));
3084 ssource = (QIOChannelRDMASource *)source;
3085
3086 ssource->rioc = rioc;
3087 object_ref(OBJECT(rioc));
3088
3089 ssource->condition = condition;
3090
3091 return source;
3092}
3093
4d9f675b
LC
3094static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
3095 AioContext *ctx,
3096 IOHandler *io_read,
3097 IOHandler *io_write,
3098 void *opaque)
3099{
3100 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3101 if (io_read) {
b390afd8 3102 aio_set_fd_handler(ctx, rioc->rdmain->recv_comp_channel->fd,
826cc324 3103 false, io_read, io_write, NULL, NULL, opaque);
b390afd8 3104 aio_set_fd_handler(ctx, rioc->rdmain->send_comp_channel->fd,
826cc324 3105 false, io_read, io_write, NULL, NULL, opaque);
4d9f675b 3106 } else {
b390afd8 3107 aio_set_fd_handler(ctx, rioc->rdmaout->recv_comp_channel->fd,
826cc324 3108 false, io_read, io_write, NULL, NULL, opaque);
b390afd8 3109 aio_set_fd_handler(ctx, rioc->rdmaout->send_comp_channel->fd,
826cc324 3110 false, io_read, io_write, NULL, NULL, opaque);
4d9f675b
LC
3111 }
3112}
6ddd2d76 3113
d46a4847
DDAG
3114struct rdma_close_rcu {
3115 struct rcu_head rcu;
3116 RDMAContext *rdmain;
3117 RDMAContext *rdmaout;
3118};
3119
3120/* callback from qio_channel_rdma_close via call_rcu */
3121static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
3122{
3123 if (rcu->rdmain) {
3124 qemu_rdma_cleanup(rcu->rdmain);
3125 }
3126
3127 if (rcu->rdmaout) {
3128 qemu_rdma_cleanup(rcu->rdmaout);
3129 }
3130
3131 g_free(rcu->rdmain);
3132 g_free(rcu->rdmaout);
3133 g_free(rcu);
3134}
3135
6ddd2d76
DB
3136static int qio_channel_rdma_close(QIOChannel *ioc,
3137 Error **errp)
3138{
3139 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
74637e6f 3140 RDMAContext *rdmain, *rdmaout;
d46a4847
DDAG
3141 struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
3142
733252de 3143 trace_qemu_rdma_close();
74637e6f
LC
3144
3145 rdmain = rioc->rdmain;
3146 if (rdmain) {
d73415a3 3147 qatomic_rcu_set(&rioc->rdmain, NULL);
74637e6f
LC
3148 }
3149
3150 rdmaout = rioc->rdmaout;
3151 if (rdmaout) {
d73415a3 3152 qatomic_rcu_set(&rioc->rdmaout, NULL);
2da776db 3153 }
74637e6f 3154
d46a4847
DDAG
3155 rcu->rdmain = rdmain;
3156 rcu->rdmaout = rdmaout;
3157 call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
74637e6f 3158
2da776db
MH
3159 return 0;
3160}
3161
54db882f
LC
3162static int
3163qio_channel_rdma_shutdown(QIOChannel *ioc,
3164 QIOChannelShutdown how,
3165 Error **errp)
3166{
3167 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3168 RDMAContext *rdmain, *rdmaout;
3169
987ab2a5 3170 RCU_READ_LOCK_GUARD();
54db882f 3171
d73415a3
SH
3172 rdmain = qatomic_rcu_read(&rioc->rdmain);
3173 rdmaout = qatomic_rcu_read(&rioc->rdmaout);
54db882f
LC
3174
3175 switch (how) {
3176 case QIO_CHANNEL_SHUTDOWN_READ:
3177 if (rdmain) {
3178 rdmain->error_state = -1;
3179 }
3180 break;
3181 case QIO_CHANNEL_SHUTDOWN_WRITE:
3182 if (rdmaout) {
3183 rdmaout->error_state = -1;
3184 }
3185 break;
3186 case QIO_CHANNEL_SHUTDOWN_BOTH:
3187 default:
3188 if (rdmain) {
3189 rdmain->error_state = -1;
3190 }
3191 if (rdmaout) {
3192 rdmaout->error_state = -1;
3193 }
3194 break;
3195 }
3196
54db882f
LC
3197 return 0;
3198}
3199
2da776db
MH
3200/*
3201 * Parameters:
3202 * @offset == 0 :
3203 * This means that 'block_offset' is a full virtual address that does not
3204 * belong to a RAMBlock of the virtual machine and instead
3205 * represents a private malloc'd memory area that the caller wishes to
3206 * transfer.
3207 *
3208 * @offset != 0 :
3209 * Offset is an offset to be added to block_offset and used
3210 * to also lookup the corresponding RAMBlock.
3211 *
246683c2 3212 * @size : Number of bytes to transfer
2da776db
MH
3213 *
3214 * @bytes_sent : User-specified pointer to indicate how many bytes were
3215 * sent. Usually, this will not be more than a few bytes of
3216 * the protocol because most transfers are sent asynchronously.
3217 */
365c0463 3218static size_t qemu_rdma_save_page(QEMUFile *f,
2da776db 3219 ram_addr_t block_offset, ram_addr_t offset,
6e1dea46 3220 size_t size, uint64_t *bytes_sent)
2da776db 3221{
365c0463 3222 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
74637e6f 3223 RDMAContext *rdma;
2da776db
MH
3224 int ret;
3225
987ab2a5 3226 RCU_READ_LOCK_GUARD();
d73415a3 3227 rdma = qatomic_rcu_read(&rioc->rdmaout);
74637e6f
LC
3228
3229 if (!rdma) {
74637e6f
LC
3230 return -EIO;
3231 }
3232
2da776db
MH
3233 CHECK_ERROR_STATE();
3234
6a88eb2b 3235 if (migration_in_postcopy()) {
ccb7e1b5
LC
3236 return RAM_SAVE_CONTROL_NOT_SUPP;
3237 }
3238
2da776db
MH
3239 qemu_fflush(f);
3240
246683c2
DB
3241 /*
3242 * Add this page to the current 'chunk'. If the chunk
3243 * is full, or the page doesn't belong to the current chunk,
3244 * an actual RDMA write will occur and a new chunk will be formed.
3245 */
3246 ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
3247 if (ret < 0) {
3248 error_report("rdma migration: write error! %d", ret);
3249 goto err;
3250 }
2da776db 3251
246683c2
DB
3252 /*
3253 * We always return 1 byte because the RDMA
3254 * protocol is completely asynchronous. We do not yet know
3255 * whether an identified chunk is zero or not because we're
3256 * waiting for other pages to potentially be merged with
3257 * the current chunk. So, we have to call qemu_update_position()
3258 * later on when the actual write occurs.
3259 */
3260 if (bytes_sent) {
3261 *bytes_sent = 1;
2da776db
MH
3262 }
3263
3264 /*
3265 * Drain the Completion Queue if possible, but do not block,
3266 * just poll.
3267 *
3268 * If nothing to poll, the end of the iteration will do this
3269 * again to make sure we don't overflow the request queue.
3270 */
3271 while (1) {
3272 uint64_t wr_id, wr_id_in;
b390afd8
LZ
3273 int ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
3274 if (ret < 0) {
3275 error_report("rdma migration: polling error! %d", ret);
3276 goto err;
3277 }
3278
3279 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3280
3281 if (wr_id == RDMA_WRID_NONE) {
3282 break;
3283 }
3284 }
3285
3286 while (1) {
3287 uint64_t wr_id, wr_id_in;
3288 int ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
2da776db 3289 if (ret < 0) {
733252de 3290 error_report("rdma migration: polling error! %d", ret);
2da776db
MH
3291 goto err;
3292 }
3293
3294 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3295
3296 if (wr_id == RDMA_WRID_NONE) {
3297 break;
3298 }
3299 }
3300
3301 return RAM_SAVE_CONTROL_DELAYED;
3302err:
3303 rdma->error_state = ret;
3304 return ret;
3305}
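/*
 * Illustrative note, not part of the original file: callers of
 * qemu_rdma_save_page() normally see RAM_SAVE_CONTROL_DELAYED with
 * *bytes_sent == 1, because the real byte accounting happens later via
 * acct_update_position() in qemu_rdma_write_one() once the chunk is
 * actually written or compressed away.
 */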
3306
55cc1b59
LC
3307static void rdma_accept_incoming_migration(void *opaque);
3308
92370989
LC
3309static void rdma_cm_poll_handler(void *opaque)
3310{
3311 RDMAContext *rdma = opaque;
3312 int ret;
3313 struct rdma_cm_event *cm_event;
3314 MigrationIncomingState *mis = migration_incoming_get_current();
3315
3316 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3317 if (ret) {
3318 error_report("get_cm_event failed %d", errno);
3319 return;
3320 }
92370989
LC
3321
3322 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3323 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
de8434a3
DDAG
3324 if (!rdma->error_state &&
3325 migration_incoming_get_current()->state !=
3326 MIGRATION_STATUS_COMPLETED) {
3327 error_report("receive cm event, cm event is %d", cm_event->event);
3328 rdma->error_state = -EPIPE;
3329 if (rdma->return_path) {
3330 rdma->return_path->error_state = -EPIPE;
3331 }
92370989 3332 }
6b8c2eb5 3333 rdma_ack_cm_event(cm_event);
92370989
LC
3334
3335 if (mis->migration_incoming_co) {
3336 qemu_coroutine_enter(mis->migration_incoming_co);
3337 }
3338 return;
3339 }
6b8c2eb5 3340 rdma_ack_cm_event(cm_event);
92370989
LC
3341}
3342
2da776db
MH
3343static int qemu_rdma_accept(RDMAContext *rdma)
3344{
3345 RDMACapabilities cap;
3346 struct rdma_conn_param conn_param = {
3347 .responder_resources = 2,
3348 .private_data = &cap,
3349 .private_data_len = sizeof(cap),
3350 };
44bcfd45 3351 RDMAContext *rdma_return_path = NULL;
2da776db
MH
3352 struct rdma_cm_event *cm_event;
3353 struct ibv_context *verbs;
3354 int ret = -EINVAL;
3355 int idx;
3356
3357 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3358 if (ret) {
3359 goto err_rdma_dest_wait;
3360 }
3361
3362 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3363 rdma_ack_cm_event(cm_event);
3364 goto err_rdma_dest_wait;
3365 }
3366
44bcfd45
LZ
3367 /*
3368 * initialize the RDMAContext for the postcopy return path after the
3369 * first connection request has arrived.
3370 */
3371 if (migrate_postcopy() && !rdma->is_return_path) {
3372 rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL);
3373 if (rdma_return_path == NULL) {
3374 rdma_ack_cm_event(cm_event);
3375 goto err_rdma_dest_wait;
3376 }
3377
3378 qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
3379 }
3380
2da776db
MH
3381 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3382
3383 network_to_caps(&cap);
3384
3385 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
733252de 3386 error_report("Unknown source RDMA version: %d, bailing...",
2da776db
MH
3387 cap.version);
3388 rdma_ack_cm_event(cm_event);
3389 goto err_rdma_dest_wait;
3390 }
3391
3392 /*
3393 * Respond with only the capabilities this version of QEMU knows about.
3394 */
3395 cap.flags &= known_capabilities;
3396
3397 /*
3398 * Enable the ones that we do know about.
3399 * Add other checks here as new ones are introduced.
3400 */
3401 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3402 rdma->pin_all = true;
3403 }
3404
3405 rdma->cm_id = cm_event->id;
3406 verbs = cm_event->id->verbs;
3407
3408 rdma_ack_cm_event(cm_event);
3409
733252de 3410 trace_qemu_rdma_accept_pin_state(rdma->pin_all);
2da776db
MH
3411
3412 caps_to_network(&cap);
3413
733252de 3414 trace_qemu_rdma_accept_pin_verbsc(verbs);
2da776db
MH
3415
3416 if (!rdma->verbs) {
3417 rdma->verbs = verbs;
3418 } else if (rdma->verbs != verbs) {
733252de
DDAG
3419 error_report("ibv context not matching %p, %p!", rdma->verbs,
3420 verbs);
2da776db
MH
3421 goto err_rdma_dest_wait;
3422 }
3423
3424 qemu_rdma_dump_id("dest_init", verbs);
3425
3426 ret = qemu_rdma_alloc_pd_cq(rdma);
3427 if (ret) {
733252de 3428 error_report("rdma migration: error allocating pd and cq!");
2da776db
MH
3429 goto err_rdma_dest_wait;
3430 }
3431
3432 ret = qemu_rdma_alloc_qp(rdma);
3433 if (ret) {
733252de 3434 error_report("rdma migration: error allocating qp!");
2da776db
MH
3435 goto err_rdma_dest_wait;
3436 }
3437
3438 ret = qemu_rdma_init_ram_blocks(rdma);
3439 if (ret) {
733252de 3440 error_report("rdma migration: error initializing ram blocks!");
2da776db
MH
3441 goto err_rdma_dest_wait;
3442 }
3443
1f22364b 3444 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2da776db
MH
3445 ret = qemu_rdma_reg_control(rdma, idx);
3446 if (ret) {
733252de 3447 error_report("rdma: error registering %d control", idx);
2da776db
MH
3448 goto err_rdma_dest_wait;
3449 }
3450 }
3451
55cc1b59
LC
3452 /* Accept the second connection request for return path */
3453 if (migrate_postcopy() && !rdma->is_return_path) {
3454 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3455 NULL,
3456 (void *)(intptr_t)rdma->return_path);
3457 } else {
92370989
LC
3458 qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3459 NULL, rdma);
55cc1b59 3460 }
2da776db
MH
3461
3462 ret = rdma_accept(rdma->cm_id, &conn_param);
3463 if (ret) {
733252de 3464 error_report("rdma_accept returns %d", ret);
2da776db
MH
3465 goto err_rdma_dest_wait;
3466 }
3467
3468 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3469 if (ret) {
733252de 3470 error_report("rdma_accept get_cm_event failed %d", ret);
2da776db
MH
3471 goto err_rdma_dest_wait;
3472 }
3473
3474 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
733252de 3475 error_report("rdma_accept: event is not ESTABLISHED");
2da776db
MH
3476 rdma_ack_cm_event(cm_event);
3477 goto err_rdma_dest_wait;
3478 }
3479
3480 rdma_ack_cm_event(cm_event);
5a91337c 3481 rdma->connected = true;
2da776db 3482
87772639 3483 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2da776db 3484 if (ret) {
733252de 3485 error_report("rdma migration: error posting second control recv");
2da776db
MH
3486 goto err_rdma_dest_wait;
3487 }
3488
3489 qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3490
3491 return 0;
3492
3493err_rdma_dest_wait:
3494 rdma->error_state = ret;
3495 qemu_rdma_cleanup(rdma);
44bcfd45 3496 g_free(rdma_return_path);
2da776db
MH
3497 return ret;
3498}
3499
e4d63320
DDAG
3500static int dest_ram_sort_func(const void *a, const void *b)
3501{
3502 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3503 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3504
3505 return (a_index < b_index) ? -1 : (a_index != b_index);
3506}
3507
2da776db
MH
3508/*
3509 * During each iteration of the migration, we listen for instructions
3510 * from the source VM to perform dynamic page registrations before it
3511 * can perform RDMA operations.
3512 *
3513 * We respond with the 'rkey'.
3514 *
3515 * Keep doing this until the source tells us to stop.
3516 */
632e3a5c 3517static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
2da776db
MH
3518{
3519 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3520 .type = RDMA_CONTROL_REGISTER_RESULT,
3521 .repeat = 0,
3522 };
3523 RDMAControlHeader unreg_resp = { .len = 0,
3524 .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3525 .repeat = 0,
3526 };
3527 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3528 .repeat = 1 };
6ddd2d76 3529 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
74637e6f
LC
3530 RDMAContext *rdma;
3531 RDMALocalBlocks *local;
2da776db
MH
3532 RDMAControlHeader head;
3533 RDMARegister *reg, *registers;
3534 RDMACompress *comp;
3535 RDMARegisterResult *reg_result;
3536 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3537 RDMALocalBlock *block;
3538 void *host_addr;
3539 int ret = 0;
3540 int idx = 0;
3541 int count = 0;
3542 int i = 0;
3543
987ab2a5 3544 RCU_READ_LOCK_GUARD();
d73415a3 3545 rdma = qatomic_rcu_read(&rioc->rdmain);
74637e6f
LC
3546
3547 if (!rdma) {
74637e6f
LC
3548 return -EIO;
3549 }
3550
2da776db
MH
3551 CHECK_ERROR_STATE();
3552
74637e6f 3553 local = &rdma->local_ram_blocks;
2da776db 3554 do {
632e3a5c 3555 trace_qemu_rdma_registration_handle_wait();
2da776db
MH
3556
3557 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3558
3559 if (ret < 0) {
3560 break;
3561 }
3562
3563 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
733252de
DDAG
3564 error_report("rdma: Too many requests in this message (%d)."
3565 "Bailing.", head.repeat);
2da776db
MH
3566 ret = -EIO;
3567 break;
3568 }
3569
3570 switch (head.type) {
3571 case RDMA_CONTROL_COMPRESS:
3572 comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3573 network_to_compress(comp);
3574
733252de
DDAG
3575 trace_qemu_rdma_registration_handle_compress(comp->length,
3576 comp->block_idx,
3577 comp->offset);
afcddefd
DDAG
3578 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3579 error_report("rdma: 'compress' bad block index %u (vs %d)",
3580 (unsigned int)comp->block_idx,
3581 rdma->local_ram_blocks.nb_blocks);
3582 ret = -EIO;
24b41d66 3583 goto out;
afcddefd 3584 }
2da776db
MH
3585 block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3586
3587 host_addr = block->local_host_addr +
3588 (comp->offset - block->offset);
3589
3590 ram_handle_compressed(host_addr, comp->value, comp->length);
3591 break;
3592
3593 case RDMA_CONTROL_REGISTER_FINISHED:
733252de 3594 trace_qemu_rdma_registration_handle_finished();
2da776db
MH
3595 goto out;
3596
3597 case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
733252de 3598 trace_qemu_rdma_registration_handle_ram_blocks();
2da776db 3599
e4d63320
DDAG
3600 /* Sort our local RAM Block list so it's the same as the source's;
3601 * we can do this since we've filled in a src_index in the list
3602 * as we received the RAMBlock list earlier.
3603 */
3604 qsort(rdma->local_ram_blocks.block,
3605 rdma->local_ram_blocks.nb_blocks,
3606 sizeof(RDMALocalBlock), dest_ram_sort_func);
71cd7306
LC
3607 for (i = 0; i < local->nb_blocks; i++) {
3608 local->block[i].index = i;
3609 }
3610
2da776db
MH
3611 if (rdma->pin_all) {
3612 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3613 if (ret) {
733252de
DDAG
3614 error_report("rdma migration: error dest "
3615 "registering ram blocks");
2da776db
MH
3616 goto out;
3617 }
3618 }
3619
3620 /*
3621 * Dest uses this to prepare to transmit the RAMBlock descriptions
3622 * to the source VM after connection setup.
3623 * Both sides use the "remote" structure to communicate and update
3624 * their "local" descriptions with what was sent.
3625 */
3626 for (i = 0; i < local->nb_blocks; i++) {
a97270ad 3627 rdma->dest_blocks[i].remote_host_addr =
fbce8c25 3628 (uintptr_t)(local->block[i].local_host_addr);
2da776db
MH
3629
3630 if (rdma->pin_all) {
a97270ad 3631 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
2da776db
MH
3632 }
3633
a97270ad
DDAG
3634 rdma->dest_blocks[i].offset = local->block[i].offset;
3635 rdma->dest_blocks[i].length = local->block[i].length;
2da776db 3636
a97270ad 3637 dest_block_to_network(&rdma->dest_blocks[i]);
e4d63320
DDAG
3638 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3639 local->block[i].block_name,
3640 local->block[i].offset,
3641 local->block[i].length,
3642 local->block[i].local_host_addr,
3643 local->block[i].src_index);
2da776db
MH
3644 }
3645
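        /*
         * The reply payload is the whole RDMADestBlock table; the source
         * recovers the number of blocks from the reply length
         * (resp.len / sizeof(RDMADestBlock)) rather than from a count field.
         */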
3646 blocks.len = rdma->local_ram_blocks.nb_blocks
a97270ad 3647 * sizeof(RDMADestBlock);
2da776db
MH
3648
3649
3650 ret = qemu_rdma_post_send_control(rdma,
a97270ad 3651 (uint8_t *) rdma->dest_blocks, &blocks);
2da776db
MH
3652
3653 if (ret < 0) {
733252de 3654 error_report("rdma migration: error sending remote info");
2da776db
MH
3655 goto out;
3656 }
3657
3658 break;
3659 case RDMA_CONTROL_REGISTER_REQUEST:
733252de 3660 trace_qemu_rdma_registration_handle_register(head.repeat);
2da776db
MH
3661
3662 reg_resp.repeat = head.repeat;
3663 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3664
3665 for (count = 0; count < head.repeat; count++) {
3666 uint64_t chunk;
3667 uint8_t *chunk_start, *chunk_end;
3668
3669 reg = &registers[count];
3670 network_to_register(reg);
3671
3672 reg_result = &results[count];
3673
733252de 3674 trace_qemu_rdma_registration_handle_register_loop(count,
2da776db
MH
3675 reg->current_index, reg->key.current_addr, reg->chunks);
3676
afcddefd
DDAG
3677 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3678 error_report("rdma: 'register' bad block index %u (vs %d)",
3679 (unsigned int)reg->current_index,
3680 rdma->local_ram_blocks.nb_blocks);
3681 ret = -ENOENT;
24b41d66 3682 goto out;
afcddefd 3683 }
2da776db
MH
3684 block = &(rdma->local_ram_blocks.block[reg->current_index]);
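            /*
             * reg->key is interpreted according to the block type: for RAM
             * blocks the source sends a guest address (key.current_addr),
             * converted to a chunk index here; for other blocks it sends the
             * chunk number (key.chunk) directly.
             */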
3685 if (block->is_ram_block) {
afcddefd
DDAG
3686 if (block->offset > reg->key.current_addr) {
3687 error_report("rdma: bad register address for block %s"
3688 " offset: %" PRIx64 " current_addr: %" PRIx64,
3689 block->block_name, block->offset,
3690 reg->key.current_addr);
3691 ret = -ERANGE;
24b41d66 3692 goto out;
afcddefd 3693 }
2da776db
MH
3694 host_addr = (block->local_host_addr +
3695 (reg->key.current_addr - block->offset));
3696 chunk = ram_chunk_index(block->local_host_addr,
3697 (uint8_t *) host_addr);
3698 } else {
3699 chunk = reg->key.chunk;
3700 host_addr = block->local_host_addr +
3701 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
afcddefd
DDAG
3702 /* Check for particularly bad chunk value */
3703 if (host_addr < (void *)block->local_host_addr) {
3704 error_report("rdma: bad chunk for block %s"
3705 " chunk: %" PRIx64,
3706 block->block_name, reg->key.chunk);
3707 ret = -ERANGE;
24b41d66 3708 goto out;
afcddefd 3709 }
2da776db
MH
3710 }
3711 chunk_start = ram_chunk_start(block, chunk);
3712 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
9589e763
MA
3713 /* avoid "-Waddress-of-packed-member" warning */
3714 uint32_t tmp_rkey = 0;
2da776db 3715 if (qemu_rdma_register_and_get_keys(rdma, block,
9589e763 3716 (uintptr_t)host_addr, NULL, &tmp_rkey,
2da776db 3717 chunk, chunk_start, chunk_end)) {
733252de 3718 error_report("cannot get rkey");
2da776db
MH
3719 ret = -EINVAL;
3720 goto out;
3721 }
9589e763 3722 reg_result->rkey = tmp_rkey;
2da776db 3723
fbce8c25 3724 reg_result->host_addr = (uintptr_t)block->local_host_addr;
2da776db 3725
733252de
DDAG
3726 trace_qemu_rdma_registration_handle_register_rkey(
3727 reg_result->rkey);
2da776db
MH
3728
3729 result_to_network(reg_result);
3730 }
3731
3732 ret = qemu_rdma_post_send_control(rdma,
3733 (uint8_t *) results, &reg_resp);
3734
3735 if (ret < 0) {
733252de 3736 error_report("Failed to send control buffer");
2da776db
MH
3737 goto out;
3738 }
3739 break;
3740 case RDMA_CONTROL_UNREGISTER_REQUEST:
733252de 3741 trace_qemu_rdma_registration_handle_unregister(head.repeat);
2da776db
MH
3742 unreg_resp.repeat = head.repeat;
3743 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3744
3745 for (count = 0; count < head.repeat; count++) {
3746 reg = &registers[count];
3747 network_to_register(reg);
3748
733252de
DDAG
3749 trace_qemu_rdma_registration_handle_unregister_loop(count,
3750 reg->current_index, reg->key.chunk);
2da776db
MH
3751
3752 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3753
3754 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3755 block->pmr[reg->key.chunk] = NULL;
3756
3757 if (ret != 0) {
3758 perror("rdma chunk unregistration failed");
3759 ret = -ret;
3760 goto out;
3761 }
3762
3763 rdma->total_registrations--;
3764
733252de
DDAG
3765 trace_qemu_rdma_registration_handle_unregister_success(
3766 reg->key.chunk);
2da776db
MH
3767 }
3768
3769 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3770
3771 if (ret < 0) {
733252de 3772 error_report("Failed to send control buffer");
2da776db
MH
3773 goto out;
3774 }
3775 break;
3776 case RDMA_CONTROL_REGISTER_RESULT:
733252de 3777 error_report("Invalid RESULT message at dest.");
2da776db
MH
3778 ret = -EIO;
3779 goto out;
3780 default:
482a33c5 3781 error_report("Unknown control message %s", control_desc(head.type));
2da776db
MH
3782 ret = -EIO;
3783 goto out;
3784 }
3785 } while (1);
3786out:
3787 if (ret < 0) {
3788 rdma->error_state = ret;
3789 }
3790 return ret;
3791}
3792
e4d63320
DDAG
3793/* Destination:
3794 * Called via a ram_control_load_hook during the initial RAM load section which
3795 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks
3796 * on the source.
3797 * We've already built our local RAMBlock list, but not yet sent the list to
3798 * the source.
3799 */
6ddd2d76
DB
3800static int
3801rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
e4d63320 3802{
74637e6f 3803 RDMAContext *rdma;
e4d63320
DDAG
3804 int curr;
3805 int found = -1;
3806
987ab2a5 3807 RCU_READ_LOCK_GUARD();
d73415a3 3808 rdma = qatomic_rcu_read(&rioc->rdmain);
74637e6f
LC
3809
3810 if (!rdma) {
74637e6f
LC
3811 return -EIO;
3812 }
3813
e4d63320
DDAG
3814 /* Find the matching RAMBlock in our local list */
3815 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3816 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3817 found = curr;
3818 break;
3819 }
3820 }
3821
3822 if (found == -1) {
3823 error_report("RAMBlock '%s' not found on destination", name);
3824 return -ENOENT;
3825 }
3826
3827 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3828 trace_rdma_block_notification_handle(name, rdma->next_src_index);
3829 rdma->next_src_index++;
3830
3831 return 0;
3832}
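/*
 * Illustrative sketch (not part of the original source): the sort performed
 * in qemu_rdma_registration_handle() relies on the src_index recorded above.
 * A qsort() comparator of roughly this shape is enough to put the local list
 * into the source's order:
 *
 *     static int sort_by_src_index(const void *a, const void *b)
 *     {
 *         unsigned int ai = ((const RDMALocalBlock *)a)->src_index;
 *         unsigned int bi = ((const RDMALocalBlock *)b)->src_index;
 *
 *         return (ai < bi) ? -1 : (ai != bi);
 *     }
 *
 * The comparator actually used by the qsort() call there is
 * dest_ram_sort_func().
 */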
3833
365c0463 3834static int rdma_load_hook(QEMUFile *f, uint64_t flags, void *data)
632e3a5c 3835{
365c0463 3836 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
632e3a5c
DDAG
3837 switch (flags) {
3838 case RAM_CONTROL_BLOCK_REG:
365c0463 3839 return rdma_block_notification_handle(rioc, data);
632e3a5c
DDAG
3840
3841 case RAM_CONTROL_HOOK:
365c0463 3842 return qemu_rdma_registration_handle(f, rioc);
632e3a5c
DDAG
3843
3844 default:
3845 /* Shouldn't be called with any other values */
3846 abort();
3847 }
3848}
3849
365c0463 3850static int qemu_rdma_registration_start(QEMUFile *f,
632e3a5c 3851 uint64_t flags, void *data)
2da776db 3852{
365c0463 3853 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
74637e6f
LC
3854 RDMAContext *rdma;
3855
987ab2a5 3856 RCU_READ_LOCK_GUARD();
d73415a3 3857 rdma = qatomic_rcu_read(&rioc->rdmaout);
74637e6f 3858 if (!rdma) {
74637e6f
LC
3859 return -EIO;
3860 }
2da776db
MH
3861
3862 CHECK_ERROR_STATE();
3863
6a88eb2b 3864 if (migration_in_postcopy()) {
ccb7e1b5
LC
3865 return 0;
3866 }
3867
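    /*
     * Writing RAM_SAVE_FLAG_HOOK into the stream makes the destination's
     * ram_control_load_hook (rdma_load_hook() above, with RAM_CONTROL_HOOK)
     * run qemu_rdma_registration_handle() at the matching point of the load.
     */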
733252de 3868 trace_qemu_rdma_registration_start(flags);
2da776db
MH
3869 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3870 qemu_fflush(f);
3871
3872 return 0;
3873}
3874
3875/*
3876 * Inform dest that dynamic registrations are done for now.
3877 * First, flush writes, if any.
3878 */
365c0463 3879static int qemu_rdma_registration_stop(QEMUFile *f,
632e3a5c 3880 uint64_t flags, void *data)
2da776db 3881{
365c0463 3882 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
74637e6f 3883 RDMAContext *rdma;
2da776db
MH
3884 RDMAControlHeader head = { .len = 0, .repeat = 1 };
3885 int ret = 0;
3886
987ab2a5 3887 RCU_READ_LOCK_GUARD();
d73415a3 3888 rdma = qatomic_rcu_read(&rioc->rdmaout);
74637e6f 3889 if (!rdma) {
74637e6f
LC
3890 return -EIO;
3891 }
3892
2da776db
MH
3893 CHECK_ERROR_STATE();
3894
6a88eb2b 3895 if (migration_in_postcopy()) {
ccb7e1b5
LC
3896 return 0;
3897 }
3898
2da776db
MH
3899 qemu_fflush(f);
3900 ret = qemu_rdma_drain_cq(f, rdma);
3901
3902 if (ret < 0) {
3903 goto err;
3904 }
3905
3906 if (flags == RAM_CONTROL_SETUP) {
3907 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3908 RDMALocalBlocks *local = &rdma->local_ram_blocks;
e4d63320 3909 int reg_result_idx, i, nb_dest_blocks;
2da776db
MH
3910
3911 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
733252de 3912 trace_qemu_rdma_registration_stop_ram();
2da776db
MH
3913
3914 /*
3915 * Make sure that we parallelize the pinning on both sides.
3916 * For very large guests, doing this serially takes a really
3917 * long time, so we have to 'interleave' the pinning locally
3918 * with the control messages by performing the pinning on this
3919 * side before we receive the control response from the other
3920 * side that the pinning has completed.
3921 */
3922 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3923 &reg_result_idx, rdma->pin_all ?
3924 qemu_rdma_reg_whole_ram_blocks : NULL);
3925 if (ret < 0) {
9cde9caa 3926 fprintf(stderr, "rdma migration: error receiving remote info!\n");
2da776db
MH
3927 return ret;
3928 }
3929
a97270ad 3930 nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
2da776db
MH
3931
3932 /*
3933 * The protocol uses two different sets of rkeys (mutually exclusive):
3934 * 1. One key to represent the virtual address of the entire ram block.
3935 * (dynamic chunk registration disabled - pin everything with one rkey.)
3936 * 2. One to represent individual chunks within a ram block.
3937 * (dynamic chunk registration enabled - pin individual chunks.)
3938 *
3939 * Once the capability is successfully negotiated, the destination transmits
3940 * the keys to use (or sends them later), including the virtual addresses,
3941 * and the source then merges those remote block descriptions into its local copy.
3942 */
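        /*
         * Illustrative sketch (not part of the original source): on the
         * source side, the RDMA write path chooses its rkey according to the
         * two schemes above, roughly:
         *
         *     if (rdma->pin_all) {
         *         rkey = block->remote_rkey;  // one rkey for the whole block
         *     } else {
         *         // send RDMA_CONTROL_REGISTER_REQUEST for the chunk and
         *         // use the per-chunk rkey from the RDMARegisterResult reply
         *     }
         *
         * See the RDMA write path earlier in this file for the real logic.
         */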
3943
a97270ad 3944 if (local->nb_blocks != nb_dest_blocks) {
9cde9caa
MA
3945 fprintf(stderr, "ram blocks mismatch (number of blocks %d vs %d). "
3946 "Your QEMU command line parameters are probably "
3947 "not identical on both the source and destination.\n",
3948 local->nb_blocks, nb_dest_blocks);
ef4b722d 3949 rdma->error_state = -EINVAL;
2da776db
MH
3950 return -EINVAL;
3951 }
3952
885e8f98 3953 qemu_rdma_move_header(rdma, reg_result_idx, &resp);
a97270ad 3954 memcpy(rdma->dest_blocks,
885e8f98 3955 rdma->wr_data[reg_result_idx].control_curr, resp.len);
a97270ad
DDAG
3956 for (i = 0; i < nb_dest_blocks; i++) {
3957 network_to_dest_block(&rdma->dest_blocks[i]);
2da776db 3958
e4d63320
DDAG
3959 /* We require that the blocks are in the same order */
3960 if (rdma->dest_blocks[i].length != local->block[i].length) {
9cde9caa
MA
3961 fprintf(stderr, "Block %s/%d has a different length %" PRIu64
3962 " vs %" PRIu64 "\n", local->block[i].block_name, i,
3963 local->block[i].length,
3964 rdma->dest_blocks[i].length);
ef4b722d 3965 rdma->error_state = -EINVAL;
2da776db
MH
3966 return -EINVAL;
3967 }
e4d63320
DDAG
3968 local->block[i].remote_host_addr =
3969 rdma->dest_blocks[i].remote_host_addr;
3970 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
2da776db
MH
3971 }
3972 }
3973
733252de 3974 trace_qemu_rdma_registration_stop(flags);
2da776db
MH
3975
3976 head.type = RDMA_CONTROL_REGISTER_FINISHED;
3977 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3978
3979 if (ret < 0) {
3980 goto err;
3981 }
3982
3983 return 0;
3984err:
3985 rdma->error_state = ret;
3986 return ret;
3987}
3988
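/*
 * The load hook runs on the incoming (destination) side of the migration
 * stream; the iterate/save hooks run on the outgoing (source) side.  See
 * qemu_fopen_rdma() below for how each set is attached.
 */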
0436e09f 3989static const QEMUFileHooks rdma_read_hooks = {
632e3a5c 3990 .hook_ram_load = rdma_load_hook,
2da776db
MH
3991};
3992
0436e09f 3993static const QEMUFileHooks rdma_write_hooks = {
2da776db
MH
3994 .before_ram_iterate = qemu_rdma_registration_start,
3995 .after_ram_iterate = qemu_rdma_registration_stop,
3996 .save_page = qemu_rdma_save_page,
3997};
3998
6ddd2d76
DB
3999
4000static void qio_channel_rdma_finalize(Object *obj)
4001{
4002 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
74637e6f
LC
4003 if (rioc->rdmain) {
4004 qemu_rdma_cleanup(rioc->rdmain);
4005 g_free(rioc->rdmain);
4006 rioc->rdmain = NULL;
4007 }
4008 if (rioc->rdmaout) {
4009 qemu_rdma_cleanup(rioc->rdmaout);
4010 g_free(rioc->rdmaout);
4011 rioc->rdmaout = NULL;
6ddd2d76
DB
4012 }
4013}
4014
4015static void qio_channel_rdma_class_init(ObjectClass *klass,
4016 void *class_data G_GNUC_UNUSED)
4017{
4018 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
4019
4020 ioc_klass->io_writev = qio_channel_rdma_writev;
4021 ioc_klass->io_readv = qio_channel_rdma_readv;
4022 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
4023 ioc_klass->io_close = qio_channel_rdma_close;
4024 ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
4d9f675b 4025 ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
54db882f 4026 ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
6ddd2d76
DB
4027}
4028
4029static const TypeInfo qio_channel_rdma_info = {
4030 .parent = TYPE_QIO_CHANNEL,
4031 .name = TYPE_QIO_CHANNEL_RDMA,
4032 .instance_size = sizeof(QIOChannelRDMA),
4033 .instance_finalize = qio_channel_rdma_finalize,
4034 .class_init = qio_channel_rdma_class_init,
4035};
4036
4037static void qio_channel_rdma_register_types(void)
4038{
4039 type_register_static(&qio_channel_rdma_info);
4040}
4041
4042type_init(qio_channel_rdma_register_types);
4043
4044static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
2da776db 4045{
6ddd2d76 4046 QIOChannelRDMA *rioc;
2da776db
MH
4047
4048 if (qemu_file_mode_is_not_valid(mode)) {
4049 return NULL;
4050 }
4051
6ddd2d76 4052 rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
2da776db
MH
4053
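    /*
     * For a write channel the main RDMAContext carries outbound traffic and
     * the postcopy return path (if any) handles inbound; a read channel
     * mirrors this, so each QIOChannelRDMA ends up with an rdmain/rdmaout
     * pair referring to the two contexts.
     */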
4054 if (mode[0] == 'w') {
6ddd2d76 4055 rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
74637e6f
LC
4056 rioc->rdmaout = rdma;
4057 rioc->rdmain = rdma->return_path;
6ddd2d76 4058 qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
2da776db 4059 } else {
6ddd2d76 4060 rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
74637e6f
LC
4061 rioc->rdmain = rdma;
4062 rioc->rdmaout = rdma->return_path;
6ddd2d76 4063 qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
2da776db
MH
4064 }
4065
6ddd2d76 4066 return rioc->file;
2da776db
MH
4067}
4068
4069static void rdma_accept_incoming_migration(void *opaque)
4070{
4071 RDMAContext *rdma = opaque;
4072 int ret;
4073 QEMUFile *f;
2a1bc8bd 4074 Error *local_err = NULL;
2da776db 4075
24ec68ef 4076 trace_qemu_rdma_accept_incoming_migration();
2da776db
MH
4077 ret = qemu_rdma_accept(rdma);
4078
4079 if (ret) {
2a1bc8bd 4080 fprintf(stderr, "RDMA ERROR: Migration initialization failed\n");
2da776db
MH
4081 return;
4082 }
4083
24ec68ef 4084 trace_qemu_rdma_accept_incoming_migration_accepted();
2da776db 4085
55cc1b59
LC
4086 if (rdma->is_return_path) {
4087 return;
4088 }
4089
2da776db
MH
4090 f = qemu_fopen_rdma(rdma, "rb");
4091 if (f == NULL) {
2a1bc8bd 4092 fprintf(stderr, "RDMA ERROR: could not qemu_fopen_rdma\n");
2da776db
MH
4093 qemu_rdma_cleanup(rdma);
4094 return;
4095 }
4096
4097 rdma->migration_started_on_destination = 1;
2a1bc8bd
DDAG
4098 migration_fd_process_incoming(f, &local_err);
4099 if (local_err) {
4100 error_reportf_err(local_err, "RDMA ERROR: ");
4101 }
2da776db
MH
4102}
4103
4104void rdma_start_incoming_migration(const char *host_port, Error **errp)
4105{
4106 int ret;
449f91b2 4107 RDMAContext *rdma, *rdma_return_path = NULL;
2da776db
MH
4108 Error *local_err = NULL;
4109
733252de 4110 trace_rdma_start_incoming_migration();
2da776db 4111
5f1f1902
DH
4112 /* Avoid ram_block_discard_disable(), cannot change during migration. */
4113 if (ram_block_discard_is_required()) {
4114 error_setg(errp, "RDMA: cannot disable RAM discard");
4115 return;
4116 }
4117
4118 rdma = qemu_rdma_data_init(host_port, &local_err);
2da776db
MH
4119 if (rdma == NULL) {
4120 goto err;
4121 }
4122
4123 ret = qemu_rdma_dest_init(rdma, &local_err);
4124
4125 if (ret) {
4126 goto err;
4127 }
4128
733252de 4129 trace_rdma_start_incoming_migration_after_dest_init();
2da776db
MH
4130
4131 ret = rdma_listen(rdma->listen_id, 5);
4132
4133 if (ret) {
66988941 4134 ERROR(errp, "listening on socket!");
4e812d23 4135 goto cleanup_rdma;
2da776db
MH
4136 }
4137
733252de 4138 trace_rdma_start_incoming_migration_after_rdma_listen();
2da776db 4139
82e1cc4b
FZ
4140 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4141 NULL, (void *)(intptr_t)rdma);
2da776db 4142 return;
4e812d23
LZ
4143
4144cleanup_rdma:
4145 qemu_rdma_cleanup(rdma);
2da776db
MH
4146err:
4147 error_propagate(errp, local_err);
3b59ee72
PN
4148 if (rdma) {
4149 g_free(rdma->host);
44bcfd45 4150 g_free(rdma->host_port);
3b59ee72 4151 }
2da776db 4152 g_free(rdma);
55cc1b59 4153 g_free(rdma_return_path);
2da776db
MH
4154}
4155
4156void rdma_start_outgoing_migration(void *opaque,
4157 const char *host_port, Error **errp)
4158{
4159 MigrationState *s = opaque;
55cc1b59 4160 RDMAContext *rdma_return_path = NULL;
5f1f1902 4161 RDMAContext *rdma;
2da776db
MH
4162 int ret = 0;
4163
5f1f1902
DH
4164 /* Avoid ram_block_discard_disable(), cannot change during migration. */
4165 if (ram_block_discard_is_required()) {
4166 error_setg(errp, "RDMA: cannot disable RAM discard");
4167 return;
4168 }
4169
4170 rdma = qemu_rdma_data_init(host_port, errp);
2da776db 4171 if (rdma == NULL) {
2da776db
MH
4172 goto err;
4173 }
4174
bbfb89e3
FZ
4175 ret = qemu_rdma_source_init(rdma,
4176 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
2da776db
MH
4177
4178 if (ret) {
4179 goto err;
4180 }
4181
733252de 4182 trace_rdma_start_outgoing_migration_after_rdma_source_init();
e49e49dd 4183 ret = qemu_rdma_connect(rdma, errp, false);
2da776db
MH
4184
4185 if (ret) {
4186 goto err;
4187 }
4188
3a4452d8 4189 /* RDMA postcopy needs a separate queue pair for the return path */
55cc1b59
LC
4190 if (migrate_postcopy()) {
4191 rdma_return_path = qemu_rdma_data_init(host_port, errp);
4192
4193 if (rdma_return_path == NULL) {
2f0c285a 4194 goto return_path_err;
55cc1b59
LC
4195 }
4196
4197 ret = qemu_rdma_source_init(rdma_return_path,
4198 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4199
4200 if (ret) {
2f0c285a 4201 goto return_path_err;
55cc1b59
LC
4202 }
4203
e49e49dd 4204 ret = qemu_rdma_connect(rdma_return_path, errp, true);
55cc1b59
LC
4205
4206 if (ret) {
2f0c285a 4207 goto return_path_err;
55cc1b59
LC
4208 }
4209
4210 rdma->return_path = rdma_return_path;
4211 rdma_return_path->return_path = rdma;
4212 rdma_return_path->is_return_path = true;
4213 }
4214
733252de 4215 trace_rdma_start_outgoing_migration_after_rdma_connect();
2da776db 4216
89a02a9f 4217 s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
cce8040b 4218 migrate_fd_connect(s, NULL);
2da776db 4219 return;
2f0c285a
PN
4220return_path_err:
4221 qemu_rdma_cleanup(rdma);
2da776db 4222err:
2da776db 4223 g_free(rdma);
55cc1b59 4224 g_free(rdma_return_path);
2da776db 4225}