migration/rdma: Use error_report() & friends instead of stderr
[mirror_qemu.git] / migration / rdma.c
1 /*
2 * RDMA protocol and interfaces
3 *
4 * Copyright IBM, Corp. 2010-2013
5 * Copyright Red Hat, Inc. 2015-2016
6 *
7 * Authors:
8 * Michael R. Hines <mrhines@us.ibm.com>
9 * Jiuxing Liu <jl@us.ibm.com>
10 * Daniel P. Berrange <berrange@redhat.com>
11 *
12 * This work is licensed under the terms of the GNU GPL, version 2 or
13 * later. See the COPYING file in the top-level directory.
14 *
15 */
16
17 #include "qemu/osdep.h"
18 #include "qapi/error.h"
19 #include "qemu/cutils.h"
20 #include "exec/target_page.h"
21 #include "rdma.h"
22 #include "migration.h"
23 #include "migration-stats.h"
24 #include "qemu-file.h"
25 #include "ram.h"
26 #include "qemu/error-report.h"
27 #include "qemu/main-loop.h"
28 #include "qemu/module.h"
29 #include "qemu/rcu.h"
30 #include "qemu/sockets.h"
31 #include "qemu/bitmap.h"
32 #include "qemu/coroutine.h"
33 #include "exec/memory.h"
34 #include <sys/socket.h>
35 #include <netdb.h>
36 #include <arpa/inet.h>
37 #include <rdma/rdma_cma.h>
38 #include "trace.h"
39 #include "qom/object.h"
40 #include "options.h"
41 #include <poll.h>
42
43 #define RDMA_RESOLVE_TIMEOUT_MS 10000
44
45 /* Do not merge data if larger than this. */
46 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
47 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
48
49 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
50
51 /*
52 * This is only for non-live state being migrated.
53 * Instead of RDMA_WRITE messages, we use RDMA_SEND
54 * messages for that state, which requires a different
55 * delivery design than main memory.
56 */
57 #define RDMA_SEND_INCREMENT 32768
58
59 /*
60 * Maximum size infiniband SEND message
61 */
62 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
63 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
64
65 #define RDMA_CONTROL_VERSION_CURRENT 1
66 /*
67 * Capabilities for negotiation.
68 */
69 #define RDMA_CAPABILITY_PIN_ALL 0x01
70
71 /*
72 * Add the other flags above to this list of known capabilities
73 * as they are introduced.
74 */
75 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
76
77 /*
78 * A work request ID is 64-bits and we split up these bits
79 * into 3 parts:
80 *
81 * bits 0-15 : type of control message, 2^16
82 * bits 16-29: ram block index, 2^14
83 * bits 30-63: ram block chunk number, 2^34
84 *
85 * The last two bit ranges are only used for RDMA writes,
86 * in order to track their completion and potentially
87 * also track unregistration status of the message.
88 */
89 #define RDMA_WRID_TYPE_SHIFT 0UL
90 #define RDMA_WRID_BLOCK_SHIFT 16UL
91 #define RDMA_WRID_CHUNK_SHIFT 30UL
92
93 #define RDMA_WRID_TYPE_MASK \
94 ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
95
96 #define RDMA_WRID_BLOCK_MASK \
97 (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
98
99 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
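/*
 * For illustration: the wrid for an RDMA write of chunk 5 in RAM block 3
 * (see qemu_rdma_make_wrid() below) is
 *     (5 << RDMA_WRID_CHUNK_SHIFT) | (3 << RDMA_WRID_BLOCK_SHIFT) | RDMA_WRID_RDMA_WRITE
 *       = 0x140030001
 * and the three masks above recover the chunk number, block index and type.
 */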
100
101 /*
102 * RDMA migration protocol:
103 * 1. RDMA Writes (data messages, i.e. RAM)
104 * 2. IB Send/Recv (control channel messages)
105 */
106 enum {
107 RDMA_WRID_NONE = 0,
108 RDMA_WRID_RDMA_WRITE = 1,
109 RDMA_WRID_SEND_CONTROL = 2000,
110 RDMA_WRID_RECV_CONTROL = 4000,
111 };
112
113 /*
114 * Work request IDs for IB SEND messages only (not RDMA writes).
115 * This is used by the migration protocol to transmit
116 * control messages (such as device state and registration commands)
117 *
118 * We could use more WRs, but we have enough for now.
119 */
120 enum {
121 RDMA_WRID_READY = 0,
122 RDMA_WRID_DATA,
123 RDMA_WRID_CONTROL,
124 RDMA_WRID_MAX,
125 };
126
127 /*
128 * SEND/RECV IB Control Messages.
129 */
130 enum {
131 RDMA_CONTROL_NONE = 0,
132 RDMA_CONTROL_ERROR,
133 RDMA_CONTROL_READY, /* ready to receive */
134 RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */
135 RDMA_CONTROL_RAM_BLOCKS_REQUEST, /* RAMBlock synchronization */
136 RDMA_CONTROL_RAM_BLOCKS_RESULT, /* RAMBlock synchronization */
137 RDMA_CONTROL_COMPRESS, /* page contains repeat values */
138 RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */
139 RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */
140 RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */
141 RDMA_CONTROL_UNREGISTER_REQUEST, /* dynamic UN-registration */
142 RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
143 };
144
145
146 /*
147 * Memory and MR structures used to represent an IB Send/Recv work request.
148 * This is *not* used for RDMA writes, only IB Send/Recv.
149 */
150 typedef struct {
151 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
152 struct ibv_mr *control_mr; /* registration metadata */
153 size_t control_len; /* length of the message */
154 uint8_t *control_curr; /* start of unconsumed bytes */
155 } RDMAWorkRequestData;
156
157 /*
158 * Negotiate RDMA capabilities during connection-setup time.
159 */
160 typedef struct {
161 uint32_t version;
162 uint32_t flags;
163 } RDMACapabilities;
164
165 static void caps_to_network(RDMACapabilities *cap)
166 {
167 cap->version = htonl(cap->version);
168 cap->flags = htonl(cap->flags);
169 }
170
171 static void network_to_caps(RDMACapabilities *cap)
172 {
173 cap->version = ntohl(cap->version);
174 cap->flags = ntohl(cap->flags);
175 }
176
177 /*
178 * Representation of a RAMBlock from an RDMA perspective.
179 * This is not transmitted, only local.
180 * This and subsequent structures cannot be linked lists
181 * because we're using a single IB message to transmit
182 * the information. It's small anyway, so a list is overkill.
183 */
184 typedef struct RDMALocalBlock {
185 char *block_name;
186 uint8_t *local_host_addr; /* local virtual address */
187 uint64_t remote_host_addr; /* remote virtual address */
188 uint64_t offset;
189 uint64_t length;
190 struct ibv_mr **pmr; /* MRs for chunk-level registration */
191 struct ibv_mr *mr; /* MR for non-chunk-level registration */
192 uint32_t *remote_keys; /* rkeys for chunk-level registration */
193 uint32_t remote_rkey; /* rkey for non-chunk-level registration */
194 int index; /* which block are we */
195 unsigned int src_index; /* (Only used on dest) */
196 bool is_ram_block;
197 int nb_chunks;
198 unsigned long *transit_bitmap;
199 unsigned long *unregister_bitmap;
200 } RDMALocalBlock;
201
202 /*
203 * Also represents a RAMblock, but only on the dest.
204 * This gets transmitted by the dest during connection-time
205 * to the source VM and then is used to populate the
206 * corresponding RDMALocalBlock with
207 * the information needed to perform the actual RDMA.
208 */
209 typedef struct QEMU_PACKED RDMADestBlock {
210 uint64_t remote_host_addr;
211 uint64_t offset;
212 uint64_t length;
213 uint32_t remote_rkey;
214 uint32_t padding;
215 } RDMADestBlock;
216
217 static const char *control_desc(unsigned int rdma_control)
218 {
219 static const char *strs[] = {
220 [RDMA_CONTROL_NONE] = "NONE",
221 [RDMA_CONTROL_ERROR] = "ERROR",
222 [RDMA_CONTROL_READY] = "READY",
223 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
224 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
225 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
226 [RDMA_CONTROL_COMPRESS] = "COMPRESS",
227 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
228 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
229 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
230 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
231 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
232 };
233
234 if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
235 return "??BAD CONTROL VALUE??";
236 }
237
238 return strs[rdma_control];
239 }
240
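/*
 * 64-bit byte-order helpers: htonl()/ntohl() only cover 32 bits, so
 * convert the two halves separately and splice them back together.
 */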
241 static uint64_t htonll(uint64_t v)
242 {
243 union { uint32_t lv[2]; uint64_t llv; } u;
244 u.lv[0] = htonl(v >> 32);
245 u.lv[1] = htonl(v & 0xFFFFFFFFULL);
246 return u.llv;
247 }
248
249 static uint64_t ntohll(uint64_t v)
250 {
251 union { uint32_t lv[2]; uint64_t llv; } u;
252 u.llv = v;
253 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
254 }
255
256 static void dest_block_to_network(RDMADestBlock *db)
257 {
258 db->remote_host_addr = htonll(db->remote_host_addr);
259 db->offset = htonll(db->offset);
260 db->length = htonll(db->length);
261 db->remote_rkey = htonl(db->remote_rkey);
262 }
263
264 static void network_to_dest_block(RDMADestBlock *db)
265 {
266 db->remote_host_addr = ntohll(db->remote_host_addr);
267 db->offset = ntohll(db->offset);
268 db->length = ntohll(db->length);
269 db->remote_rkey = ntohl(db->remote_rkey);
270 }
271
272 /*
273 * Virtual address of the above structures used for transmitting
274 * the RAMBlock descriptions at connection-time.
275 * This structure is *not* transmitted.
276 */
277 typedef struct RDMALocalBlocks {
278 int nb_blocks;
279 bool init; /* main memory init complete */
280 RDMALocalBlock *block;
281 } RDMALocalBlocks;
282
283 /*
284 * Main data structure for RDMA state.
285 * While there is only one copy of this structure being allocated right now,
286 * this is the place where one would start if you wanted to consider
287 * having more than one RDMA connection open at the same time.
288 */
289 typedef struct RDMAContext {
290 char *host;
291 int port;
292 char *host_port;
293
294 RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
295
296 /*
297 * This is used by *_exchange_send() to figure out whether or not
298 * the initial "READY" message has already been received.
299 * This is because other functions may potentially poll() and detect
300 * the READY message before send() does, in which case we need to
301 * know if it completed.
302 */
303 int control_ready_expected;
304
305 /* number of outstanding writes */
306 int nb_sent;
307
308 /* store info about current buffer so that we can
309 merge it with future sends */
310 uint64_t current_addr;
311 uint64_t current_length;
312 /* index of ram block the current buffer belongs to */
313 int current_index;
314 /* index of the chunk in the current ram block */
315 int current_chunk;
316
317 bool pin_all;
318
319 /*
320 * infiniband-specific variables for opening the device
321 * and maintaining connection state and so forth.
322 *
323 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
324 * cm_id->verbs, cm_id->channel, and cm_id->qp.
325 */
326 struct rdma_cm_id *cm_id; /* connection manager ID */
327 struct rdma_cm_id *listen_id;
328 bool connected;
329
330 struct ibv_context *verbs;
331 struct rdma_event_channel *channel;
332 struct ibv_qp *qp; /* queue pair */
333 struct ibv_comp_channel *recv_comp_channel; /* recv completion channel */
334 struct ibv_comp_channel *send_comp_channel; /* send completion channel */
335 struct ibv_pd *pd; /* protection domain */
336 struct ibv_cq *recv_cq; /* receive completion queue */
337 struct ibv_cq *send_cq; /* send completion queue */
338
339 /*
340 * If a previous write failed (perhaps because of a failed
341 * memory registration), then do not attempt any future work
342 * and remember the error state.
343 */
344 bool errored;
345 bool error_reported;
346 bool received_error;
347
348 /*
349 * Description of ram blocks used throughout the code.
350 */
351 RDMALocalBlocks local_ram_blocks;
352 RDMADestBlock *dest_blocks;
353
354 /* Index of the next RAMBlock received during block registration */
355 unsigned int next_src_index;
356
357 /*
358 * Migration on *destination* started.
359 * Then use coroutine yield function.
360 * Source runs in a thread, so we don't care.
361 */
362 int migration_started_on_destination;
363
364 int total_registrations;
365 int total_writes;
366
367 int unregister_current, unregister_next;
368 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
369
370 GHashTable *blockmap;
371
372 /* the RDMAContext for return path */
373 struct RDMAContext *return_path;
374 bool is_return_path;
375 } RDMAContext;
376
377 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
378 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)
379
380
381
382 struct QIOChannelRDMA {
383 QIOChannel parent;
384 RDMAContext *rdmain;
385 RDMAContext *rdmaout;
386 QEMUFile *file;
387 bool blocking; /* XXX we don't actually honour this yet */
388 };
389
390 /*
391 * Main structure for IB Send/Recv control messages.
392 * This gets prepended at the beginning of every Send/Recv.
393 */
394 typedef struct QEMU_PACKED {
395 uint32_t len; /* Total length of data portion */
396 uint32_t type; /* which control command to perform */
397 uint32_t repeat; /* number of commands in data portion of same type */
398 uint32_t padding;
399 } RDMAControlHeader;
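/*
 * On the wire, every control message is this 16-byte header (len, type
 * and repeat converted to network byte order below) followed by 'len'
 * bytes of payload, all carried in a single IB SEND into a pre-posted
 * RECV buffer of RDMA_CONTROL_MAX_BUFFER bytes.
 */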
400
401 static void control_to_network(RDMAControlHeader *control)
402 {
403 control->type = htonl(control->type);
404 control->len = htonl(control->len);
405 control->repeat = htonl(control->repeat);
406 }
407
408 static void network_to_control(RDMAControlHeader *control)
409 {
410 control->type = ntohl(control->type);
411 control->len = ntohl(control->len);
412 control->repeat = ntohl(control->repeat);
413 }
414
415 /*
416 * Register a single Chunk.
417 * Information sent by the source VM to inform the dest
418 * to register a single chunk of memory before we can perform
419 * the actual RDMA operation.
420 */
421 typedef struct QEMU_PACKED {
422 union QEMU_PACKED {
423 uint64_t current_addr; /* offset into the ram_addr_t space */
424 uint64_t chunk; /* chunk to lookup if unregistering */
425 } key;
426 uint32_t current_index; /* which ramblock the chunk belongs to */
427 uint32_t padding;
428 uint64_t chunks; /* how many sequential chunks to register */
429 } RDMARegister;
430
431 static bool rdma_errored(RDMAContext *rdma)
432 {
433 if (rdma->errored && !rdma->error_reported) {
434 error_report("RDMA is in an error state waiting migration"
435 " to abort!");
436 rdma->error_reported = true;
437 }
438 return rdma->errored;
439 }
440
441 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
442 {
443 RDMALocalBlock *local_block;
444 local_block = &rdma->local_ram_blocks.block[reg->current_index];
445
446 if (local_block->is_ram_block) {
447 /*
448 * current_addr as passed in is an address in the local ram_addr_t
449 * space, we need to translate this for the destination
450 */
451 reg->key.current_addr -= local_block->offset;
452 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
453 }
454 reg->key.current_addr = htonll(reg->key.current_addr);
455 reg->current_index = htonl(reg->current_index);
456 reg->chunks = htonll(reg->chunks);
457 }
458
459 static void network_to_register(RDMARegister *reg)
460 {
461 reg->key.current_addr = ntohll(reg->key.current_addr);
462 reg->current_index = ntohl(reg->current_index);
463 reg->chunks = ntohll(reg->chunks);
464 }
465
466 typedef struct QEMU_PACKED {
467 uint32_t value; /* if zero, we will madvise() */
468 uint32_t block_idx; /* which ram block index */
469 uint64_t offset; /* Address in remote ram_addr_t space */
470 uint64_t length; /* length of the chunk */
471 } RDMACompress;
472
473 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
474 {
475 comp->value = htonl(comp->value);
476 /*
477 * comp->offset as passed in is an address in the local ram_addr_t
478 * space, we need to translate this for the destination
479 */
480 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
481 comp->offset += rdma->dest_blocks[comp->block_idx].offset;
482 comp->block_idx = htonl(comp->block_idx);
483 comp->offset = htonll(comp->offset);
484 comp->length = htonll(comp->length);
485 }
486
487 static void network_to_compress(RDMACompress *comp)
488 {
489 comp->value = ntohl(comp->value);
490 comp->block_idx = ntohl(comp->block_idx);
491 comp->offset = ntohll(comp->offset);
492 comp->length = ntohll(comp->length);
493 }
494
495 /*
496 * The result of the dest's memory registration produces an "rkey"
497 * which the source VM must reference in order to perform
498 * the RDMA operation.
499 */
500 typedef struct QEMU_PACKED {
501 uint32_t rkey;
502 uint32_t padding;
503 uint64_t host_addr;
504 } RDMARegisterResult;
505
506 static void result_to_network(RDMARegisterResult *result)
507 {
508 result->rkey = htonl(result->rkey);
509 result->host_addr = htonll(result->host_addr);
510 };
511
512 static void network_to_result(RDMARegisterResult *result)
513 {
514 result->rkey = ntohl(result->rkey);
515 result->host_addr = ntohll(result->host_addr);
516 };
517
518 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
519 uint8_t *data, RDMAControlHeader *resp,
520 int *resp_idx,
521 int (*callback)(RDMAContext *rdma,
522 Error **errp),
523 Error **errp);
524
525 static inline uint64_t ram_chunk_index(const uint8_t *start,
526 const uint8_t *host)
527 {
528 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
529 }
530
531 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
532 uint64_t i)
533 {
534 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
535 (i << RDMA_REG_CHUNK_SHIFT));
536 }
537
538 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
539 uint64_t i)
540 {
541 uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
542 (1UL << RDMA_REG_CHUNK_SHIFT);
543
544 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
545 result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
546 }
547
548 return result;
549 }
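/*
 * With RDMA_REG_CHUNK_SHIFT of 20, each chunk covers 1 MiB. For a block
 * starting at host address H, the address H + 0x2A0000 lands in chunk
 * index 2, whose [start, end) range is [H + 0x200000, H + 0x300000),
 * unless the block ends first, in which case ram_chunk_end() clamps the
 * end to the end of the block.
 */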
550
551 static void rdma_add_block(RDMAContext *rdma, const char *block_name,
552 void *host_addr,
553 ram_addr_t block_offset, uint64_t length)
554 {
555 RDMALocalBlocks *local = &rdma->local_ram_blocks;
556 RDMALocalBlock *block;
557 RDMALocalBlock *old = local->block;
558
559 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
560
561 if (local->nb_blocks) {
562 int x;
563
564 if (rdma->blockmap) {
565 for (x = 0; x < local->nb_blocks; x++) {
566 g_hash_table_remove(rdma->blockmap,
567 (void *)(uintptr_t)old[x].offset);
568 g_hash_table_insert(rdma->blockmap,
569 (void *)(uintptr_t)old[x].offset,
570 &local->block[x]);
571 }
572 }
573 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
574 g_free(old);
575 }
576
577 block = &local->block[local->nb_blocks];
578
579 block->block_name = g_strdup(block_name);
580 block->local_host_addr = host_addr;
581 block->offset = block_offset;
582 block->length = length;
583 block->index = local->nb_blocks;
584 block->src_index = ~0U; /* Filled in by the receipt of the block list */
585 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
586 block->transit_bitmap = bitmap_new(block->nb_chunks);
587 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
588 block->unregister_bitmap = bitmap_new(block->nb_chunks);
589 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
590 block->remote_keys = g_new0(uint32_t, block->nb_chunks);
591
592 block->is_ram_block = local->init ? false : true;
593
594 if (rdma->blockmap) {
595 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
596 }
597
598 trace_rdma_add_block(block_name, local->nb_blocks,
599 (uintptr_t) block->local_host_addr,
600 block->offset, block->length,
601 (uintptr_t) (block->local_host_addr + block->length),
602 BITS_TO_LONGS(block->nb_chunks) *
603 sizeof(unsigned long) * 8,
604 block->nb_chunks);
605
606 local->nb_blocks++;
607 }
608
609 /*
610 * Memory regions need to be registered with the device and queue pairs set up
611 * in advance before the migration starts. This tells us where the RAM blocks
612 * are so that we can register them individually.
613 */
614 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
615 {
616 const char *block_name = qemu_ram_get_idstr(rb);
617 void *host_addr = qemu_ram_get_host_addr(rb);
618 ram_addr_t block_offset = qemu_ram_get_offset(rb);
619 ram_addr_t length = qemu_ram_get_used_length(rb);
620 rdma_add_block(opaque, block_name, host_addr, block_offset, length);
621 return 0;
622 }
623
624 /*
625 * Identify the RAMBlocks and their quantity. They will be used to
626 * identify chunk boundaries inside each RAMBlock and also be referenced
627 * during dynamic page registration.
628 */
629 static void qemu_rdma_init_ram_blocks(RDMAContext *rdma)
630 {
631 RDMALocalBlocks *local = &rdma->local_ram_blocks;
632 int ret;
633
634 assert(rdma->blockmap == NULL);
635 memset(local, 0, sizeof *local);
636 ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
637 assert(!ret);
638 trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
639 rdma->dest_blocks = g_new0(RDMADestBlock,
640 rdma->local_ram_blocks.nb_blocks);
641 local->init = true;
642 }
643
644 /*
645 * Note: If used outside of cleanup, the caller must ensure that the destination
646 * block structures are also updated
647 */
648 static void rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
649 {
650 RDMALocalBlocks *local = &rdma->local_ram_blocks;
651 RDMALocalBlock *old = local->block;
652 int x;
653
654 if (rdma->blockmap) {
655 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
656 }
657 if (block->pmr) {
658 int j;
659
660 for (j = 0; j < block->nb_chunks; j++) {
661 if (!block->pmr[j]) {
662 continue;
663 }
664 ibv_dereg_mr(block->pmr[j]);
665 rdma->total_registrations--;
666 }
667 g_free(block->pmr);
668 block->pmr = NULL;
669 }
670
671 if (block->mr) {
672 ibv_dereg_mr(block->mr);
673 rdma->total_registrations--;
674 block->mr = NULL;
675 }
676
677 g_free(block->transit_bitmap);
678 block->transit_bitmap = NULL;
679
680 g_free(block->unregister_bitmap);
681 block->unregister_bitmap = NULL;
682
683 g_free(block->remote_keys);
684 block->remote_keys = NULL;
685
686 g_free(block->block_name);
687 block->block_name = NULL;
688
689 if (rdma->blockmap) {
690 for (x = 0; x < local->nb_blocks; x++) {
691 g_hash_table_remove(rdma->blockmap,
692 (void *)(uintptr_t)old[x].offset);
693 }
694 }
695
696 if (local->nb_blocks > 1) {
697
698 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
699
700 if (block->index) {
701 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
702 }
703
704 if (block->index < (local->nb_blocks - 1)) {
705 memcpy(local->block + block->index, old + (block->index + 1),
706 sizeof(RDMALocalBlock) *
707 (local->nb_blocks - (block->index + 1)));
708 for (x = block->index; x < local->nb_blocks - 1; x++) {
709 local->block[x].index--;
710 }
711 }
712 } else {
713 assert(block == local->block);
714 local->block = NULL;
715 }
716
717 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
718 block->offset, block->length,
719 (uintptr_t)(block->local_host_addr + block->length),
720 BITS_TO_LONGS(block->nb_chunks) *
721 sizeof(unsigned long) * 8, block->nb_chunks);
722
723 g_free(old);
724
725 local->nb_blocks--;
726
727 if (local->nb_blocks && rdma->blockmap) {
728 for (x = 0; x < local->nb_blocks; x++) {
729 g_hash_table_insert(rdma->blockmap,
730 (void *)(uintptr_t)local->block[x].offset,
731 &local->block[x]);
732 }
733 }
734 }
735
736 /*
737 * Put in the log file which RDMA device was opened and the details
738 * associated with that device.
739 */
740 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
741 {
742 struct ibv_port_attr port;
743
744 if (ibv_query_port(verbs, 1, &port)) {
745 error_report("Failed to query port information");
746 return;
747 }
748
749 printf("%s RDMA Device opened: kernel name %s "
750 "uverbs device name %s, "
751 "infiniband_verbs class device path %s, "
752 "infiniband class device path %s, "
753 "transport: (%d) %s\n",
754 who,
755 verbs->device->name,
756 verbs->device->dev_name,
757 verbs->device->dev_path,
758 verbs->device->ibdev_path,
759 port.link_layer,
760 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
761 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
762 ? "Ethernet" : "Unknown"));
763 }
764
765 /*
766 * Put in the log file the RDMA gid addressing information,
767 * useful for folks who have trouble understanding the
768 * RDMA device hierarchy in the kernel.
769 */
770 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
771 {
772 char sgid[33];
773 char dgid[33];
774 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
775 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
776 trace_qemu_rdma_dump_gid(who, sgid, dgid);
777 }
778
779 /*
780 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
781 * We will try the next addrinfo struct, and fail if there are
782 * no other valid addresses to bind against.
783 *
784 * If the user is listening on '[::]', then we will not have opened a device
785 * yet and have no way of verifying if the device is RoCE or not.
786 *
787 * In this case, the source VM will throw an error for ALL types of
788 * connections (both IPv4 and IPv6) if the destination machine does not have
789 * a regular infiniband network available for use.
790 *
791 * The only way to guarantee that an error is thrown for broken kernels is
792 * for the management software to choose a *specific* interface at bind time
793 * and validate what type of hardware it is.
794 *
795 * Unfortunately, this puts the user in a fix:
796 *
797 * If the source VM connects with an IPv4 address without knowing that the
798 * destination has bound to '[::]' the migration will unconditionally fail
799 * unless the management software is explicitly listening on the IPv4
800 * address while using a RoCE-based device.
801 *
802 * If the source VM connects with an IPv6 address, then we're OK because we can
803 * throw an error on the source (and similarly on the destination).
804 *
805 * But in mixed environments, this will be broken for a while until it is fixed
806 * inside linux.
807 *
808 * We do provide a *tiny* bit of help in this function: We can list all of the
809 * devices in the system and check to see if all the devices are RoCE or
810 * Infiniband.
811 *
812 * If we detect that we have a *pure* RoCE environment, then we can safely
813 * throw an error even if the management software has specified '[::]' as the
814 * bind address.
815 *
816 * However, if there are multiple heterogeneous devices, then we cannot make
817 * this assumption and the user just has to be sure they know what they are
818 * doing.
819 *
820 * Patches are being reviewed on linux-rdma.
821 */
822 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
823 {
824 /* This bug only exists in linux, to our knowledge. */
825 #ifdef CONFIG_LINUX
826 struct ibv_port_attr port_attr;
827
828 /*
829 * Verbs are only NULL if management has bound to '[::]'.
830 *
831 * Let's iterate through all the devices and see if there are any pure IB
832 * devices (non-ethernet).
833 *
834 * If not, then we can safely proceed with the migration.
835 * Otherwise, there are no guarantees until the bug is fixed in linux.
836 */
837 if (!verbs) {
838 int num_devices, x;
839 struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
840 bool roce_found = false;
841 bool ib_found = false;
842
843 for (x = 0; x < num_devices; x++) {
844 verbs = ibv_open_device(dev_list[x]);
845 /*
846 * ibv_open_device() is not documented to set errno. If
847 * it does, it's somebody else's doc bug. If it doesn't,
848 * the use of errno below is wrong.
849 * TODO Find out whether ibv_open_device() sets errno.
850 */
851 if (!verbs) {
852 if (errno == EPERM) {
853 continue;
854 } else {
855 error_setg_errno(errp, errno,
856 "could not open RDMA device context");
857 return -1;
858 }
859 }
860
861 if (ibv_query_port(verbs, 1, &port_attr)) {
862 ibv_close_device(verbs);
863 error_setg(errp,
864 "RDMA ERROR: Could not query initial IB port");
865 return -1;
866 }
867
868 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
869 ib_found = true;
870 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
871 roce_found = true;
872 }
873
874 ibv_close_device(verbs);
875
876 }
877
878 if (roce_found) {
879 if (ib_found) {
880 warn_report("migrations may fail:"
881 " IPv6 over RoCE / iWARP in linux"
882 " is broken. But since you appear to have a"
883 " mixed RoCE / IB environment, be sure to only"
884 " migrate over the IB fabric until the kernel "
885 " fixes the bug.");
886 } else {
887 error_setg(errp, "RDMA ERROR: "
888 "You only have RoCE / iWARP devices in your systems"
889 " and your management software has specified '[::]'"
890 ", but IPv6 over RoCE / iWARP is not supported in Linux.");
891 return -1;
892 }
893 }
894
895 return 0;
896 }
897
898 /*
899 * If we have a verbs context, that means that something other than '[::]' was
900 * used by the management software for binding, in which case we can
901 * actually warn the user about a potentially broken kernel.
902 */
903
904 /* IB ports start with 1, not 0 */
905 if (ibv_query_port(verbs, 1, &port_attr)) {
906 error_setg(errp, "RDMA ERROR: Could not query initial IB port");
907 return -1;
908 }
909
910 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
911 error_setg(errp, "RDMA ERROR: "
912 "Linux kernel's RoCE / iWARP does not support IPv6 "
913 "(but patches on linux-rdma in progress)");
914 return -1;
915 }
916
917 #endif
918
919 return 0;
920 }
921
922 /*
923 * Figure out which RDMA device corresponds to the requested IP hostname
924 * Also create the initial connection manager identifiers for opening
925 * the connection.
926 */
927 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
928 {
929 Error *err = NULL;
930 int ret;
931 struct rdma_addrinfo *res;
932 char port_str[16];
933 struct rdma_cm_event *cm_event;
934 char ip[40] = "unknown";
935 struct rdma_addrinfo *e;
936
937 if (rdma->host == NULL || !strcmp(rdma->host, "")) {
938 error_setg(errp, "RDMA ERROR: RDMA hostname has not been set");
939 return -1;
940 }
941
942 /* create CM channel */
943 rdma->channel = rdma_create_event_channel();
944 if (!rdma->channel) {
945 error_setg(errp, "RDMA ERROR: could not create CM channel");
946 return -1;
947 }
948
949 /* create CM id */
950 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
951 if (ret < 0) {
952 error_setg(errp, "RDMA ERROR: could not create channel id");
953 goto err_resolve_create_id;
954 }
955
956 snprintf(port_str, 16, "%d", rdma->port);
957 port_str[15] = '\0';
958
959 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
960 if (ret) {
961 error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
962 rdma->host);
963 goto err_resolve_get_addr;
964 }
965
966 /* Try all addresses, saving the first error in @err */
967 for (e = res; e != NULL; e = e->ai_next) {
968 Error **local_errp = err ? NULL : &err;
969
970 inet_ntop(e->ai_family,
971 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
972 trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
973
974 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
975 RDMA_RESOLVE_TIMEOUT_MS);
976 if (ret >= 0) {
977 if (e->ai_family == AF_INET6) {
978 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs,
979 local_errp);
980 if (ret < 0) {
981 continue;
982 }
983 }
984 error_free(err);
985 goto route;
986 }
987 }
988
989 rdma_freeaddrinfo(res);
990 if (err) {
991 error_propagate(errp, err);
992 } else {
993 error_setg(errp, "RDMA ERROR: could not resolve address %s",
994 rdma->host);
995 }
996 goto err_resolve_get_addr;
997
998 route:
999 rdma_freeaddrinfo(res);
1000 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
1001
1002 ret = rdma_get_cm_event(rdma->channel, &cm_event);
1003 if (ret < 0) {
1004 error_setg(errp, "RDMA ERROR: could not perform event_addr_resolved");
1005 goto err_resolve_get_addr;
1006 }
1007
1008 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
1009 error_setg(errp,
1010 "RDMA ERROR: result not equal to event_addr_resolved %s",
1011 rdma_event_str(cm_event->event));
1012 rdma_ack_cm_event(cm_event);
1013 goto err_resolve_get_addr;
1014 }
1015 rdma_ack_cm_event(cm_event);
1016
1017 /* resolve route */
1018 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1019 if (ret < 0) {
1020 error_setg(errp, "RDMA ERROR: could not resolve rdma route");
1021 goto err_resolve_get_addr;
1022 }
1023
1024 ret = rdma_get_cm_event(rdma->channel, &cm_event);
1025 if (ret < 0) {
1026 error_setg(errp, "RDMA ERROR: could not perform event_route_resolved");
1027 goto err_resolve_get_addr;
1028 }
1029 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1030 error_setg(errp, "RDMA ERROR: "
1031 "result not equal to event_route_resolved: %s",
1032 rdma_event_str(cm_event->event));
1033 rdma_ack_cm_event(cm_event);
1034 goto err_resolve_get_addr;
1035 }
1036 rdma_ack_cm_event(cm_event);
1037 rdma->verbs = rdma->cm_id->verbs;
1038 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1039 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1040 return 0;
1041
1042 err_resolve_get_addr:
1043 rdma_destroy_id(rdma->cm_id);
1044 rdma->cm_id = NULL;
1045 err_resolve_create_id:
1046 rdma_destroy_event_channel(rdma->channel);
1047 rdma->channel = NULL;
1048 return -1;
1049 }
1050
1051 /*
1052 * Create protection domain and completion queues
1053 */
1054 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma, Error **errp)
1055 {
1056 /* allocate pd */
1057 rdma->pd = ibv_alloc_pd(rdma->verbs);
1058 if (!rdma->pd) {
1059 error_setg(errp, "failed to allocate protection domain");
1060 return -1;
1061 }
1062
1063 /* create receive completion channel */
1064 rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
1065 if (!rdma->recv_comp_channel) {
1066 error_setg(errp, "failed to allocate receive completion channel");
1067 goto err_alloc_pd_cq;
1068 }
1069
1070 /*
1071 * Completion queue can be filled by read work requests.
1072 */
1073 rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1074 NULL, rdma->recv_comp_channel, 0);
1075 if (!rdma->recv_cq) {
1076 error_setg(errp, "failed to allocate receive completion queue");
1077 goto err_alloc_pd_cq;
1078 }
1079
1080 /* create send completion channel */
1081 rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
1082 if (!rdma->send_comp_channel) {
1083 error_setg(errp, "failed to allocate send completion channel");
1084 goto err_alloc_pd_cq;
1085 }
1086
1087 rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1088 NULL, rdma->send_comp_channel, 0);
1089 if (!rdma->send_cq) {
1090 error_setg(errp, "failed to allocate send completion queue");
1091 goto err_alloc_pd_cq;
1092 }
1093
1094 return 0;
1095
1096 err_alloc_pd_cq:
1097 if (rdma->pd) {
1098 ibv_dealloc_pd(rdma->pd);
1099 }
1100 if (rdma->recv_comp_channel) {
1101 ibv_destroy_comp_channel(rdma->recv_comp_channel);
1102 }
1103 if (rdma->send_comp_channel) {
1104 ibv_destroy_comp_channel(rdma->send_comp_channel);
1105 }
1106 if (rdma->recv_cq) {
1107 ibv_destroy_cq(rdma->recv_cq);
1108 rdma->recv_cq = NULL;
1109 }
1110 rdma->pd = NULL;
1111 rdma->recv_comp_channel = NULL;
1112 rdma->send_comp_channel = NULL;
1113 return -1;
1114
1115 }
1116
1117 /*
1118 * Create queue pairs.
1119 */
1120 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1121 {
1122 struct ibv_qp_init_attr attr = { 0 };
1123 int ret;
1124
1125 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1126 attr.cap.max_recv_wr = 3;
1127 attr.cap.max_send_sge = 1;
1128 attr.cap.max_recv_sge = 1;
1129 attr.send_cq = rdma->send_cq;
1130 attr.recv_cq = rdma->recv_cq;
1131 attr.qp_type = IBV_QPT_RC;
1132
1133 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1134 if (ret < 0) {
1135 return -1;
1136 }
1137
1138 rdma->qp = rdma->cm_id->qp;
1139 return 0;
1140 }
1141
1142 /* Check whether On-Demand Paging is supported by the RDMA device */
1143 static bool rdma_support_odp(struct ibv_context *dev)
1144 {
1145 struct ibv_device_attr_ex attr = {0};
1146 int ret = ibv_query_device_ex(dev, NULL, &attr);
1147 if (ret) {
1148 return false;
1149 }
1150
1151 if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
1152 return true;
1153 }
1154
1155 return false;
1156 }
1157
1158 /*
1159 * Use ibv_advise_mr() to avoid RNR NAK errors as far as possible.
1160 * A responder MR registered with ODP will send an RNR NAK back to
1161 * the requester in the face of a page fault.
1162 */
1163 static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
1164 uint32_t len, uint32_t lkey,
1165 const char *name, bool wr)
1166 {
1167 #ifdef HAVE_IBV_ADVISE_MR
1168 int ret;
1169 int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE :
1170 IBV_ADVISE_MR_ADVICE_PREFETCH;
1171 struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len};
1172
1173 ret = ibv_advise_mr(pd, advice,
1174 IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1);
1175 /* ignore the error */
1176 trace_qemu_rdma_advise_mr(name, len, addr, strerror(ret));
1177 #endif
1178 }
1179
1180 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma, Error **errp)
1181 {
1182 int i;
1183 RDMALocalBlocks *local = &rdma->local_ram_blocks;
1184
1185 for (i = 0; i < local->nb_blocks; i++) {
1186 int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
1187
1188 local->block[i].mr =
1189 ibv_reg_mr(rdma->pd,
1190 local->block[i].local_host_addr,
1191 local->block[i].length, access
1192 );
1193 /*
1194 * ibv_reg_mr() is not documented to set errno. If it does,
1195 * it's somebody else's doc bug. If it doesn't, the use of
1196 * errno below is wrong.
1197 * TODO Find out whether ibv_reg_mr() sets errno.
1198 */
1199 if (!local->block[i].mr &&
1200 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1201 access |= IBV_ACCESS_ON_DEMAND;
1202 /* register ODP mr */
1203 local->block[i].mr =
1204 ibv_reg_mr(rdma->pd,
1205 local->block[i].local_host_addr,
1206 local->block[i].length, access);
1207 trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
1208
1209 if (local->block[i].mr) {
1210 qemu_rdma_advise_prefetch_mr(rdma->pd,
1211 (uintptr_t)local->block[i].local_host_addr,
1212 local->block[i].length,
1213 local->block[i].mr->lkey,
1214 local->block[i].block_name,
1215 true);
1216 }
1217 }
1218
1219 if (!local->block[i].mr) {
1220 error_setg_errno(errp, errno,
1221 "Failed to register local dest ram block!");
1222 goto err;
1223 }
1224 rdma->total_registrations++;
1225 }
1226
1227 return 0;
1228
1229 err:
1230 for (i--; i >= 0; i--) {
1231 ibv_dereg_mr(local->block[i].mr);
1232 local->block[i].mr = NULL;
1233 rdma->total_registrations--;
1234 }
1235
1236 return -1;
1237
1238 }
1239
1240 /*
1241 * Find the ram block that corresponds to the page requested to be
1242 * transmitted by QEMU.
1243 *
1244 * Once the block is found, also identify which 'chunk' within that
1245 * block that the page belongs to.
1246 */
1247 static void qemu_rdma_search_ram_block(RDMAContext *rdma,
1248 uintptr_t block_offset,
1249 uint64_t offset,
1250 uint64_t length,
1251 uint64_t *block_index,
1252 uint64_t *chunk_index)
1253 {
1254 uint64_t current_addr = block_offset + offset;
1255 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1256 (void *) block_offset);
1257 assert(block);
1258 assert(current_addr >= block->offset);
1259 assert((current_addr + length) <= (block->offset + block->length));
1260
1261 *block_index = block->index;
1262 *chunk_index = ram_chunk_index(block->local_host_addr,
1263 block->local_host_addr + (current_addr - block->offset));
1264 }
1265
1266 /*
1267 * Register a chunk with IB. If the chunk was already registered
1268 * previously, then skip.
1269 *
1270 * Also return the keys associated with the registration needed
1271 * to perform the actual RDMA operation.
1272 */
1273 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1274 RDMALocalBlock *block, uintptr_t host_addr,
1275 uint32_t *lkey, uint32_t *rkey, int chunk,
1276 uint8_t *chunk_start, uint8_t *chunk_end)
1277 {
1278 if (block->mr) {
1279 if (lkey) {
1280 *lkey = block->mr->lkey;
1281 }
1282 if (rkey) {
1283 *rkey = block->mr->rkey;
1284 }
1285 return 0;
1286 }
1287
1288 /* allocate memory to store chunk MRs */
1289 if (!block->pmr) {
1290 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1291 }
1292
1293 /*
1294 * If 'rkey', then we're the destination, so grant access to the source.
1295 *
1296 * If 'lkey', then we're the source VM, so grant access only to ourselves.
1297 */
1298 if (!block->pmr[chunk]) {
1299 uint64_t len = chunk_end - chunk_start;
1300 int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
1301 0;
1302
1303 trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1304
1305 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1306 /*
1307 * ibv_reg_mr() is not documented to set errno. If it does,
1308 * it's somebody else's doc bug. If it doesn't, the use of
1309 * errno below is wrong.
1310 * TODO Find out whether ibv_reg_mr() sets errno.
1311 */
1312 if (!block->pmr[chunk] &&
1313 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1314 access |= IBV_ACCESS_ON_DEMAND;
1315 /* register ODP mr */
1316 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1317 trace_qemu_rdma_register_odp_mr(block->block_name);
1318
1319 if (block->pmr[chunk]) {
1320 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
1321 len, block->pmr[chunk]->lkey,
1322 block->block_name, rkey);
1323
1324 }
1325 }
1326 }
1327 if (!block->pmr[chunk]) {
1328 return -1;
1329 }
1330 rdma->total_registrations++;
1331
1332 if (lkey) {
1333 *lkey = block->pmr[chunk]->lkey;
1334 }
1335 if (rkey) {
1336 *rkey = block->pmr[chunk]->rkey;
1337 }
1338 return 0;
1339 }
1340
1341 /*
1342 * Register (at connection time) the memory used for control
1343 * channel messages.
1344 */
1345 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1346 {
1347 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1348 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1349 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1350 if (rdma->wr_data[idx].control_mr) {
1351 rdma->total_registrations++;
1352 return 0;
1353 }
1354 return -1;
1355 }
1356
1357 /*
1358 * Perform a non-optimized memory unregistration after every transfer
1359 * for demonstration purposes, only if pin-all is not requested.
1360 *
1361 * Potential optimizations:
1362 * 1. Start a new thread to run this function continuously
1363 *    - for bit clearing
1364 *    - and for receipt of unregister messages
1365 * 2. Use an LRU.
1366 * 3. Use workload hints.
1367 */
1368 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1369 {
1370 Error *err = NULL;
1371
1372 while (rdma->unregistrations[rdma->unregister_current]) {
1373 int ret;
1374 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1375 uint64_t chunk =
1376 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1377 uint64_t index =
1378 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1379 RDMALocalBlock *block =
1380 &(rdma->local_ram_blocks.block[index]);
1381 RDMARegister reg = { .current_index = index };
1382 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1383 };
1384 RDMAControlHeader head = { .len = sizeof(RDMARegister),
1385 .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1386 .repeat = 1,
1387 };
1388
1389 trace_qemu_rdma_unregister_waiting_proc(chunk,
1390 rdma->unregister_current);
1391
1392 rdma->unregistrations[rdma->unregister_current] = 0;
1393 rdma->unregister_current++;
1394
1395 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1396 rdma->unregister_current = 0;
1397 }
1398
1399
1400 /*
1401 * Unregistration is speculative (because migration is single-threaded
1402 * and we cannot break the protocol's infiniband message ordering).
1403 * Thus, if the memory is currently being used for transmission,
1404 * then abort the attempt to unregister and try again
1405 * later the next time a completion is received for this memory.
1406 */
1407 clear_bit(chunk, block->unregister_bitmap);
1408
1409 if (test_bit(chunk, block->transit_bitmap)) {
1410 trace_qemu_rdma_unregister_waiting_inflight(chunk);
1411 continue;
1412 }
1413
1414 trace_qemu_rdma_unregister_waiting_send(chunk);
1415
1416 ret = ibv_dereg_mr(block->pmr[chunk]);
1417 block->pmr[chunk] = NULL;
1418 block->remote_keys[chunk] = 0;
1419
1420 if (ret != 0) {
1421 error_report("unregistration chunk failed: %s",
1422 strerror(ret));
1423 return -1;
1424 }
1425 rdma->total_registrations--;
1426
1427 reg.key.chunk = chunk;
1428 register_to_network(rdma, &reg);
1429 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1430 &resp, NULL, NULL, &err);
1431 if (ret < 0) {
1432 error_report_err(err);
1433 return -1;
1434 }
1435
1436 trace_qemu_rdma_unregister_waiting_complete(chunk);
1437 }
1438
1439 return 0;
1440 }
1441
1442 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1443 uint64_t chunk)
1444 {
1445 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1446
1447 result |= (index << RDMA_WRID_BLOCK_SHIFT);
1448 result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1449
1450 return result;
1451 }
1452
1453 /*
1454 * Consult the connection manager to see if a work request
1455 * (of any kind) has completed.
1456 * Return the work request ID that completed.
1457 */
1458 static int qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
1459 uint64_t *wr_id_out, uint32_t *byte_len)
1460 {
1461 int ret;
1462 struct ibv_wc wc;
1463 uint64_t wr_id;
1464
1465 ret = ibv_poll_cq(cq, 1, &wc);
1466
1467 if (!ret) {
1468 *wr_id_out = RDMA_WRID_NONE;
1469 return 0;
1470 }
1471
1472 if (ret < 0) {
1473 return -1;
1474 }
1475
1476 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1477
1478 if (wc.status != IBV_WC_SUCCESS) {
1479 return -1;
1480 }
1481
1482 if (rdma->control_ready_expected &&
1483 (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1484 trace_qemu_rdma_poll_recv(wr_id - RDMA_WRID_RECV_CONTROL, wr_id,
1485 rdma->nb_sent);
1486 rdma->control_ready_expected = 0;
1487 }
1488
1489 if (wr_id == RDMA_WRID_RDMA_WRITE) {
1490 uint64_t chunk =
1491 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1492 uint64_t index =
1493 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1494 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1495
1496 trace_qemu_rdma_poll_write(wr_id, rdma->nb_sent,
1497 index, chunk, block->local_host_addr,
1498 (void *)(uintptr_t)block->remote_host_addr);
1499
1500 clear_bit(chunk, block->transit_bitmap);
1501
1502 if (rdma->nb_sent > 0) {
1503 rdma->nb_sent--;
1504 }
1505 } else {
1506 trace_qemu_rdma_poll_other(wr_id, rdma->nb_sent);
1507 }
1508
1509 *wr_id_out = wc.wr_id;
1510 if (byte_len) {
1511 *byte_len = wc.byte_len;
1512 }
1513
1514 return 0;
1515 }
1516
1517 /* Wait for activity on the completion channel.
1518 * Returns 0 on success, non-0 on error.
1519 */
1520 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
1521 struct ibv_comp_channel *comp_channel)
1522 {
1523 struct rdma_cm_event *cm_event;
1524 int ret;
1525
1526 /*
1527 * Coroutine doesn't start until migration_fd_process_incoming()
1528 * so don't yield unless we know we're running inside of a coroutine.
1529 */
1530 if (rdma->migration_started_on_destination &&
1531 migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1532 yield_until_fd_readable(comp_channel->fd);
1533 } else {
1534 /* This is the source side (which runs in a separate thread), or the
1535 * destination prior to migration_fd_process_incoming(); after
1536 * postcopy, the destination also runs in a separate thread.
1537 * Either way we can't yield, so we have to poll the fd.
1538 * But we need to be able to handle 'cancel' or an error
1539 * without hanging forever.
1540 */
1541 while (!rdma->errored && !rdma->received_error) {
1542 GPollFD pfds[2];
1543 pfds[0].fd = comp_channel->fd;
1544 pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1545 pfds[0].revents = 0;
1546
1547 pfds[1].fd = rdma->channel->fd;
1548 pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1549 pfds[1].revents = 0;
1550
1551 /* 0.1s timeout, should be fine for a 'cancel' */
1552 switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1553 case 2:
1554 case 1: /* fd active */
1555 if (pfds[0].revents) {
1556 return 0;
1557 }
1558
1559 if (pfds[1].revents) {
1560 ret = rdma_get_cm_event(rdma->channel, &cm_event);
1561 if (ret < 0) {
1562 return -1;
1563 }
1564
1565 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1566 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1567 rdma_ack_cm_event(cm_event);
1568 return -1;
1569 }
1570 rdma_ack_cm_event(cm_event);
1571 }
1572 break;
1573
1574 case 0: /* Timeout, go around again */
1575 break;
1576
1577 default: /* Error of some type -
1578 * I don't trust errno from qemu_poll_ns
1579 */
1580 return -1;
1581 }
1582
1583 if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1584 /* Bail out and let the cancellation happen */
1585 return -1;
1586 }
1587 }
1588 }
1589
1590 if (rdma->received_error) {
1591 return -1;
1592 }
1593 return -rdma->errored;
1594 }
1595
1596 static struct ibv_comp_channel *to_channel(RDMAContext *rdma, uint64_t wrid)
1597 {
1598 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
1599 rdma->recv_comp_channel;
1600 }
1601
1602 static struct ibv_cq *to_cq(RDMAContext *rdma, uint64_t wrid)
1603 {
1604 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
1605 }
1606
1607 /*
1608 * Block until the next work request has completed.
1609 *
1610 * First poll to see if a work request has already completed,
1611 * otherwise block.
1612 *
1613 * If we encounter completed work requests for IDs other than
1614 * the one we're interested in, then that's generally an error.
1615 *
1616 * The only exception is actual RDMA Write completions. These
1617 * completions only need to be recorded, but do not actually
1618 * need further processing.
1619 */
1620 static int qemu_rdma_block_for_wrid(RDMAContext *rdma,
1621 uint64_t wrid_requested,
1622 uint32_t *byte_len)
1623 {
1624 int num_cq_events = 0, ret;
1625 struct ibv_cq *cq;
1626 void *cq_ctx;
1627 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1628 struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
1629 struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
1630
1631 if (ibv_req_notify_cq(poll_cq, 0)) {
1632 return -1;
1633 }
1634 /* poll cq first */
1635 while (wr_id != wrid_requested) {
1636 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1637 if (ret < 0) {
1638 return -1;
1639 }
1640
1641 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1642
1643 if (wr_id == RDMA_WRID_NONE) {
1644 break;
1645 }
1646 if (wr_id != wrid_requested) {
1647 trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1648 }
1649 }
1650
1651 if (wr_id == wrid_requested) {
1652 return 0;
1653 }
1654
1655 while (1) {
1656 ret = qemu_rdma_wait_comp_channel(rdma, ch);
1657 if (ret < 0) {
1658 goto err_block_for_wrid;
1659 }
1660
1661 ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
1662 if (ret < 0) {
1663 goto err_block_for_wrid;
1664 }
1665
1666 num_cq_events++;
1667
1668 if (ibv_req_notify_cq(cq, 0)) {
1669 goto err_block_for_wrid;
1670 }
1671
1672 while (wr_id != wrid_requested) {
1673 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1674 if (ret < 0) {
1675 goto err_block_for_wrid;
1676 }
1677
1678 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1679
1680 if (wr_id == RDMA_WRID_NONE) {
1681 break;
1682 }
1683 if (wr_id != wrid_requested) {
1684 trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1685 }
1686 }
1687
1688 if (wr_id == wrid_requested) {
1689 goto success_block_for_wrid;
1690 }
1691 }
1692
1693 success_block_for_wrid:
1694 if (num_cq_events) {
1695 ibv_ack_cq_events(cq, num_cq_events);
1696 }
1697 return 0;
1698
1699 err_block_for_wrid:
1700 if (num_cq_events) {
1701 ibv_ack_cq_events(cq, num_cq_events);
1702 }
1703
1704 rdma->errored = true;
1705 return -1;
1706 }
1707
1708 /*
1709 * Post a SEND message work request for the control channel
1710 * containing some data and block until the post completes.
1711 */
1712 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1713 RDMAControlHeader *head,
1714 Error **errp)
1715 {
1716 int ret;
1717 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1718 struct ibv_send_wr *bad_wr;
1719 struct ibv_sge sge = {
1720 .addr = (uintptr_t)(wr->control),
1721 .length = head->len + sizeof(RDMAControlHeader),
1722 .lkey = wr->control_mr->lkey,
1723 };
1724 struct ibv_send_wr send_wr = {
1725 .wr_id = RDMA_WRID_SEND_CONTROL,
1726 .opcode = IBV_WR_SEND,
1727 .send_flags = IBV_SEND_SIGNALED,
1728 .sg_list = &sge,
1729 .num_sge = 1,
1730 };
1731
1732 trace_qemu_rdma_post_send_control(control_desc(head->type));
1733
1734 /*
1735 * We don't actually need to do a memcpy() in here if we used
1736 * the "sge" properly, but since we're only sending control messages
1737 * (not RAM in a performance-critical path), it's OK for now.
1738 *
1739 * The copy makes the RDMAControlHeader simpler to manipulate
1740 * for the time being.
1741 */
1742 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1743 memcpy(wr->control, head, sizeof(RDMAControlHeader));
1744 control_to_network((void *) wr->control);
1745
1746 if (buf) {
1747 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1748 }
1749
1750
1751 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1752
1753 if (ret > 0) {
1754 error_setg(errp, "Failed to use post IB SEND for control");
1755 return -1;
1756 }
1757
1758 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1759 if (ret < 0) {
1760 error_setg(errp, "rdma migration: send polling control error");
1761 return -1;
1762 }
1763
1764 return 0;
1765 }
1766
1767 /*
1768 * Post a RECV work request in anticipation of some future receipt
1769 * of data on the control channel.
1770 */
1771 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx,
1772 Error **errp)
1773 {
1774 struct ibv_recv_wr *bad_wr;
1775 struct ibv_sge sge = {
1776 .addr = (uintptr_t)(rdma->wr_data[idx].control),
1777 .length = RDMA_CONTROL_MAX_BUFFER,
1778 .lkey = rdma->wr_data[idx].control_mr->lkey,
1779 };
1780
1781 struct ibv_recv_wr recv_wr = {
1782 .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1783 .sg_list = &sge,
1784 .num_sge = 1,
1785 };
1786
1787
1788 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1789 error_setg(errp, "error posting control recv");
1790 return -1;
1791 }
1792
1793 return 0;
1794 }
1795
1796 /*
1797 * Block and wait for a RECV control channel message to arrive.
1798 */
1799 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1800 RDMAControlHeader *head, uint32_t expecting, int idx,
1801 Error **errp)
1802 {
1803 uint32_t byte_len;
1804 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1805 &byte_len);
1806
1807 if (ret < 0) {
1808 error_setg(errp, "rdma migration: recv polling control error!");
1809 return -1;
1810 }
1811
1812 network_to_control((void *) rdma->wr_data[idx].control);
1813 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1814
1815 trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1816
1817 if (expecting == RDMA_CONTROL_NONE) {
1818 trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1819 head->type);
1820 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1821 error_setg(errp, "Was expecting a %s (%d) control message"
1822 ", but got: %s (%d), length: %d",
1823 control_desc(expecting), expecting,
1824 control_desc(head->type), head->type, head->len);
1825 if (head->type == RDMA_CONTROL_ERROR) {
1826 rdma->received_error = true;
1827 }
1828 return -1;
1829 }
1830 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1831 error_setg(errp, "too long length: %d", head->len);
1832 return -1;
1833 }
1834 if (sizeof(*head) + head->len != byte_len) {
1835 error_setg(errp, "Malformed length: %d byte_len %d",
1836 head->len, byte_len);
1837 return -1;
1838 }
1839
1840 return 0;
1841 }
1842
1843 /*
1844 * When a RECV work request has completed, the work request's
1845 * buffer is pointed at the header.
1846 *
1847 * This advances the pointer to the data portion of the control
1848 * message in the work request's buffer, which was populated
1849 * after the work request finished.
1850 */
1851 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1852 RDMAControlHeader *head)
1853 {
1854 rdma->wr_data[idx].control_len = head->len;
1855 rdma->wr_data[idx].control_curr =
1856 rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1857 }
1858
1859 /*
1860 * This is an 'atomic' high-level operation to deliver a single, unified
1861 * control-channel message.
1862 *
1863 * Additionally, if the user is expecting some kind of reply to this message,
1864 * they can request a 'resp' response message be filled in by posting an
1865 * additional work request on behalf of the user and waiting for an additional
1866 * completion.
1867 *
1868 * The extra (optional) response is used during registration to save us from
1869 * having to perform an *additional* exchange of messages just to provide a
1870 * response, by instead piggy-backing on the acknowledgement.
1871 */
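/*
 * Rough shape of one exchange (a sketch, not a verbatim trace):
 *
 *   source (exchange_send)             destination (exchange_recv)
 *   ----------------------             ---------------------------
 *   block for READY           <------  SEND READY
 *   [post RECV for response]
 *   post RECV for next READY
 *   SEND head (+ data)        ------>  block for the message
 *   [block for response       <------  SEND response, e.g. from the
 *                                       registration handler]
 */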
1872 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1873 uint8_t *data, RDMAControlHeader *resp,
1874 int *resp_idx,
1875 int (*callback)(RDMAContext *rdma,
1876 Error **errp),
1877 Error **errp)
1878 {
1879 int ret;
1880
1881 /*
1882 * Wait until the dest is ready before attempting to deliver the message
1883 * by waiting for a READY message.
1884 */
1885 if (rdma->control_ready_expected) {
1886 RDMAControlHeader resp_ignored;
1887
1888 ret = qemu_rdma_exchange_get_response(rdma, &resp_ignored,
1889 RDMA_CONTROL_READY,
1890 RDMA_WRID_READY, errp);
1891 if (ret < 0) {
1892 return -1;
1893 }
1894 }
1895
1896 /*
1897 * If the user is expecting a response, post a WR in anticipation of it.
1898 */
1899 if (resp) {
1900 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA, errp);
1901 if (ret < 0) {
1902 return -1;
1903 }
1904 }
1905
1906 /*
1907 * Post a WR to replace the one we just consumed for the READY message.
1908 */
1909 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1910 if (ret < 0) {
1911 return -1;
1912 }
1913
1914 /*
1915 * Deliver the control message that was requested.
1916 */
1917 ret = qemu_rdma_post_send_control(rdma, data, head, errp);
1918
1919 if (ret < 0) {
1920 return -1;
1921 }
1922
1923 /*
1924 * If we're expecting a response, block and wait for it.
1925 */
1926 if (resp) {
1927 if (callback) {
1928 trace_qemu_rdma_exchange_send_issue_callback();
1929 ret = callback(rdma, errp);
1930 if (ret < 0) {
1931 return -1;
1932 }
1933 }
1934
1935 trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1936 ret = qemu_rdma_exchange_get_response(rdma, resp,
1937 resp->type, RDMA_WRID_DATA,
1938 errp);
1939
1940 if (ret < 0) {
1941 return -1;
1942 }
1943
1944 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1945 if (resp_idx) {
1946 *resp_idx = RDMA_WRID_DATA;
1947 }
1948 trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1949 }
1950
1951 rdma->control_ready_expected = 1;
1952
1953 return 0;
1954 }
1955
1956 /*
1957 * This is an 'atomic' high-level operation to receive a single, unified
1958 * control-channel message.
1959 */
1960 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1961 uint32_t expecting, Error **errp)
1962 {
1963 RDMAControlHeader ready = {
1964 .len = 0,
1965 .type = RDMA_CONTROL_READY,
1966 .repeat = 1,
1967 };
1968 int ret;
1969
1970 /*
1971 * Inform the source that we're ready to receive a message.
1972 */
1973 ret = qemu_rdma_post_send_control(rdma, NULL, &ready, errp);
1974
1975 if (ret < 0) {
1976 return -1;
1977 }
1978
1979 /*
1980 * Block and wait for the message.
1981 */
1982 ret = qemu_rdma_exchange_get_response(rdma, head,
1983 expecting, RDMA_WRID_READY, errp);
1984
1985 if (ret < 0) {
1986 return -1;
1987 }
1988
1989 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1990
1991 /*
1992 * Post a new RECV work request to replace the one we just consumed.
1993 */
1994 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1995 if (ret < 0) {
1996 return -1;
1997 }
1998
1999 return 0;
2000 }
2001
2002 /*
2003 * Write an actual chunk of memory using RDMA.
2004 *
2005 * If we're using dynamic registration on the dest-side, we have to
2006 * send a registration command first.
2007 */
2008 static int qemu_rdma_write_one(RDMAContext *rdma,
2009 int current_index, uint64_t current_addr,
2010 uint64_t length, Error **errp)
2011 {
2012 struct ibv_sge sge;
2013 struct ibv_send_wr send_wr = { 0 };
2014 struct ibv_send_wr *bad_wr;
2015 int reg_result_idx, ret, count = 0;
2016 uint64_t chunk, chunks;
2017 uint8_t *chunk_start, *chunk_end;
2018 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
2019 RDMARegister reg;
2020 RDMARegisterResult *reg_result;
2021 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
2022 RDMAControlHeader head = { .len = sizeof(RDMARegister),
2023 .type = RDMA_CONTROL_REGISTER_REQUEST,
2024 .repeat = 1,
2025 };
2026
2027 retry:
2028 sge.addr = (uintptr_t)(block->local_host_addr +
2029 (current_addr - block->offset));
2030 sge.length = length;
2031
2032 chunk = ram_chunk_index(block->local_host_addr,
2033 (uint8_t *)(uintptr_t)sge.addr);
2034 chunk_start = ram_chunk_start(block, chunk);
2035
2036 if (block->is_ram_block) {
2037 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
2038
2039 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2040 chunks--;
2041 }
2042 } else {
2043 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
2044
2045 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2046 chunks--;
2047 }
2048 }
2049
2050 trace_qemu_rdma_write_one_top(chunks + 1,
2051 (chunks + 1) *
2052 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2053
2054 chunk_end = ram_chunk_end(block, chunk + chunks);
2055
2056
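/*
 * If a previous RDMA write to this chunk is still in flight (its bit in
 * the transit bitmap is set), wait for write completions before posting
 * another write that touches the same chunk.
 */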
2057 while (test_bit(chunk, block->transit_bitmap)) {
2058 (void)count;
2059 trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2060 sge.addr, length, rdma->nb_sent, block->nb_chunks);
2061
2062 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2063
2064 if (ret < 0) {
2065 error_setg(errp, "Failed to Wait for previous write to complete "
2066 "block %d chunk %" PRIu64
2067 " current %" PRIu64 " len %" PRIu64 " %d",
2068 current_index, chunk, sge.addr, length, rdma->nb_sent);
2069 return -1;
2070 }
2071 }
2072
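/*
 * Dynamic chunk registration: when we are not pinning everything up
 * front, remote keys are obtained per chunk.  On first use either send
 * a COMPRESS message (if the chunk is entirely zero) or ask the
 * destination to register the chunk and return an rkey; afterwards the
 * cached rkey is reused.
 */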
2073 if (!rdma->pin_all || !block->is_ram_block) {
2074 if (!block->remote_keys[chunk]) {
2075 /*
2076 * This chunk has not yet been registered, so first check to see
2077 * if the entire chunk is zero. If so, tell the other side to
2078 * memset() + madvise() the entire chunk without RDMA.
2079 */
2080
2081 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2082 RDMACompress comp = {
2083 .offset = current_addr,
2084 .value = 0,
2085 .block_idx = current_index,
2086 .length = length,
2087 };
2088
2089 head.len = sizeof(comp);
2090 head.type = RDMA_CONTROL_COMPRESS;
2091
2092 trace_qemu_rdma_write_one_zero(chunk, sge.length,
2093 current_index, current_addr);
2094
2095 compress_to_network(rdma, &comp);
2096 ret = qemu_rdma_exchange_send(rdma, &head,
2097 (uint8_t *) &comp, NULL, NULL, NULL, errp);
2098
2099 if (ret < 0) {
2100 return -1;
2101 }
2102
2103 /*
2104 * TODO: Here we are sending something, but we are not
2105 * accounting for anything transferred. The following is wrong:
2106 *
2107 * stat64_add(&mig_stats.rdma_bytes, sge.length);
2108 *
2109 * because we are using some kind of compression. I
2110 * would think that head.len would be closer to the
2111 * correct value.
2112 */
2113 stat64_add(&mig_stats.zero_pages,
2114 sge.length / qemu_target_page_size());
2115 return 1;
2116 }
2117
2118 /*
2119 * Otherwise, tell other side to register.
2120 */
2121 reg.current_index = current_index;
2122 if (block->is_ram_block) {
2123 reg.key.current_addr = current_addr;
2124 } else {
2125 reg.key.chunk = chunk;
2126 }
2127 reg.chunks = chunks;
2128
2129 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2130 current_addr);
2131
2132 register_to_network(rdma, &reg);
2133 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2134 &resp, &reg_result_idx, NULL, errp);
2135 if (ret < 0) {
2136 return -1;
2137 }
2138
2139 /* try to overlap this single registration with the one we sent. */
2140 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2141 &sge.lkey, NULL, chunk,
2142 chunk_start, chunk_end)) {
2143 error_setg(errp, "cannot get lkey");
2144 return -1;
2145 }
2146
2147 reg_result = (RDMARegisterResult *)
2148 rdma->wr_data[reg_result_idx].control_curr;
2149
2150 network_to_result(reg_result);
2151
2152 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2153 reg_result->rkey, chunk);
2154
2155 block->remote_keys[chunk] = reg_result->rkey;
2156 block->remote_host_addr = reg_result->host_addr;
2157 } else {
2158 /* already registered before */
2159 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2160 &sge.lkey, NULL, chunk,
2161 chunk_start, chunk_end)) {
2162 error_setg(errp, "cannot get lkey!");
2163 return -1;
2164 }
2165 }
2166
2167 send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2168 } else {
2169 send_wr.wr.rdma.rkey = block->remote_rkey;
2170
2171 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2172 &sge.lkey, NULL, chunk,
2173 chunk_start, chunk_end)) {
2174 error_setg(errp, "cannot get lkey!");
2175 return -1;
2176 }
2177 }
2178
2179 /*
2180 * Encode the ram block index and chunk within this wrid.
2181 * We will use this information at the time of completion
2182 * to figure out which bitmap to check against and then which
2183 * chunk in the bitmap to look for.
2184 */
2185 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2186 current_index, chunk);
2187
2188 send_wr.opcode = IBV_WR_RDMA_WRITE;
2189 send_wr.send_flags = IBV_SEND_SIGNALED;
2190 send_wr.sg_list = &sge;
2191 send_wr.num_sge = 1;
2192 send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2193 (current_addr - block->offset);
2194
2195 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2196 sge.length);
2197
2198 /*
2199 * ibv_post_send() does not return negative error numbers,
2200 * per the specification they are positive - no idea why.
2201 */
2202 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2203
2204 if (ret == ENOMEM) {
2205 trace_qemu_rdma_write_one_queue_full();
2206 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2207 if (ret < 0) {
2208 error_setg(errp, "rdma migration: failed to make "
2209 "room in full send queue!");
2210 return -1;
2211 }
2212
2213 goto retry;
2214
2215 } else if (ret > 0) {
2216 error_setg_errno(errp, ret,
2217 "rdma migration: post rdma write failed");
2218 return -1;
2219 }
2220
2221 set_bit(chunk, block->transit_bitmap);
2222 stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size());
2223 /*
2224 * We are adding the amount of data written to the transferred total,
2225 * but no overhead at all. I will assume that RDMA is magical and
2226 * doesn't need to transfer (at least) the addresses where it wants to
2227 * write the pages. Here it looks like it should be something
2228 * like:
2229 * sizeof(send_wr) + sge.length
2230 * but this being RDMA, who knows.
2231 */
2232 stat64_add(&mig_stats.rdma_bytes, sge.length);
2233 ram_transferred_add(sge.length);
2234 rdma->total_writes++;
2235
2236 return 0;
2237 }
2238
2239 /*
2240 * Push out any unwritten RDMA operations.
2241 *
2242 * We support sending out multiple chunks at the same time.
2243 * Not all of them need to get signaled in the completion queue.
2244 */
2245 static int qemu_rdma_write_flush(RDMAContext *rdma, Error **errp)
2246 {
2247 int ret;
2248
2249 if (!rdma->current_length) {
2250 return 0;
2251 }
2252
2253 ret = qemu_rdma_write_one(rdma, rdma->current_index, rdma->current_addr,
2254 rdma->current_length, errp);
2255
2256 if (ret < 0) {
2257 return -1;
2258 }
2259
2260 if (ret == 0) {
2261 rdma->nb_sent++;
2262 trace_qemu_rdma_write_flush(rdma->nb_sent);
2263 }
2264
2265 rdma->current_length = 0;
2266 rdma->current_addr = 0;
2267
2268 return 0;
2269 }
2270
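/*
 * Return true if the [offset, offset + len) range can simply be
 * appended to the write currently being batched: it must sit in the
 * same RAM block and chunk and be contiguous with the data batched so
 * far.
 */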
2271 static inline bool qemu_rdma_buffer_mergeable(RDMAContext *rdma,
2272 uint64_t offset, uint64_t len)
2273 {
2274 RDMALocalBlock *block;
2275 uint8_t *host_addr;
2276 uint8_t *chunk_end;
2277
2278 if (rdma->current_index < 0) {
2279 return false;
2280 }
2281
2282 if (rdma->current_chunk < 0) {
2283 return false;
2284 }
2285
2286 block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2287 host_addr = block->local_host_addr + (offset - block->offset);
2288 chunk_end = ram_chunk_end(block, rdma->current_chunk);
2289
2290 if (rdma->current_length == 0) {
2291 return false;
2292 }
2293
2294 /*
2295 * Only merge into chunk sequentially.
2296 */
2297 if (offset != (rdma->current_addr + rdma->current_length)) {
2298 return false;
2299 }
2300
2301 if (offset < block->offset) {
2302 return false;
2303 }
2304
2305 if ((offset + len) > (block->offset + block->length)) {
2306 return false;
2307 }
2308
2309 if ((host_addr + len) > chunk_end) {
2310 return false;
2311 }
2312
2313 return true;
2314 }
2315
2316 /*
2317 * We're not actually writing here, but doing three things:
2318 *
2319 * 1. Identify the chunk the buffer belongs to.
2320 * 2. If the chunk is full or the buffer doesn't belong to the current
2321 * chunk, then start a new chunk and flush() the old chunk.
2322 * 3. To keep the hardware busy, we also group chunks into batches
2323 * and only require that a batch gets acknowledged in the completion
2324 * queue instead of each individual chunk.
2325 */
2326 static int qemu_rdma_write(RDMAContext *rdma,
2327 uint64_t block_offset, uint64_t offset,
2328 uint64_t len, Error **errp)
2329 {
2330 uint64_t current_addr = block_offset + offset;
2331 uint64_t index = rdma->current_index;
2332 uint64_t chunk = rdma->current_chunk;
2333 int ret;
2334
2335 /* If we cannot merge it, we flush the current buffer first. */
2336 if (!qemu_rdma_buffer_mergeable(rdma, current_addr, len)) {
2337 ret = qemu_rdma_write_flush(rdma, errp);
2338 if (ret < 0) {
2339 return -1;
2340 }
2341 rdma->current_length = 0;
2342 rdma->current_addr = current_addr;
2343
2344 qemu_rdma_search_ram_block(rdma, block_offset,
2345 offset, len, &index, &chunk);
2346 rdma->current_index = index;
2347 rdma->current_chunk = chunk;
2348 }
2349
2350 /* merge it */
2351 rdma->current_length += len;
2352
2353 /* flush it if buffer is too large */
2354 if (rdma->current_length >= RDMA_MERGE_MAX) {
2355 return qemu_rdma_write_flush(rdma, errp);
2356 }
2357
2358 return 0;
2359 }
2360
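/*
 * Tear down the connection (notifying the peer first if we are bailing
 * out early because of an error or cancellation) and release the
 * ibverbs / rdmacm resources in roughly the reverse order of creation.
 */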
2361 static void qemu_rdma_cleanup(RDMAContext *rdma)
2362 {
2363 Error *err = NULL;
2364 int idx;
2365
2366 if (rdma->cm_id && rdma->connected) {
2367 if ((rdma->errored ||
2368 migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2369 !rdma->received_error) {
2370 RDMAControlHeader head = { .len = 0,
2371 .type = RDMA_CONTROL_ERROR,
2372 .repeat = 1,
2373 };
2374 warn_report("Early error. Sending error.");
2375 if (qemu_rdma_post_send_control(rdma, NULL, &head, &err) < 0) {
2376 warn_report_err(err);
2377 }
2378 }
2379
2380 rdma_disconnect(rdma->cm_id);
2381 trace_qemu_rdma_cleanup_disconnect();
2382 rdma->connected = false;
2383 }
2384
2385 if (rdma->channel) {
2386 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2387 }
2388 g_free(rdma->dest_blocks);
2389 rdma->dest_blocks = NULL;
2390
2391 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2392 if (rdma->wr_data[idx].control_mr) {
2393 rdma->total_registrations--;
2394 ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2395 }
2396 rdma->wr_data[idx].control_mr = NULL;
2397 }
2398
2399 if (rdma->local_ram_blocks.block) {
2400 while (rdma->local_ram_blocks.nb_blocks) {
2401 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2402 }
2403 }
2404
2405 if (rdma->qp) {
2406 rdma_destroy_qp(rdma->cm_id);
2407 rdma->qp = NULL;
2408 }
2409 if (rdma->recv_cq) {
2410 ibv_destroy_cq(rdma->recv_cq);
2411 rdma->recv_cq = NULL;
2412 }
2413 if (rdma->send_cq) {
2414 ibv_destroy_cq(rdma->send_cq);
2415 rdma->send_cq = NULL;
2416 }
2417 if (rdma->recv_comp_channel) {
2418 ibv_destroy_comp_channel(rdma->recv_comp_channel);
2419 rdma->recv_comp_channel = NULL;
2420 }
2421 if (rdma->send_comp_channel) {
2422 ibv_destroy_comp_channel(rdma->send_comp_channel);
2423 rdma->send_comp_channel = NULL;
2424 }
2425 if (rdma->pd) {
2426 ibv_dealloc_pd(rdma->pd);
2427 rdma->pd = NULL;
2428 }
2429 if (rdma->cm_id) {
2430 rdma_destroy_id(rdma->cm_id);
2431 rdma->cm_id = NULL;
2432 }
2433
2434 /* on the destination side, listen_id and channel are shared */
2435 if (rdma->listen_id) {
2436 if (!rdma->is_return_path) {
2437 rdma_destroy_id(rdma->listen_id);
2438 }
2439 rdma->listen_id = NULL;
2440
2441 if (rdma->channel) {
2442 if (!rdma->is_return_path) {
2443 rdma_destroy_event_channel(rdma->channel);
2444 }
2445 rdma->channel = NULL;
2446 }
2447 }
2448
2449 if (rdma->channel) {
2450 rdma_destroy_event_channel(rdma->channel);
2451 rdma->channel = NULL;
2452 }
2453 g_free(rdma->host);
2454 g_free(rdma->host_port);
2455 rdma->host = NULL;
2456 rdma->host_port = NULL;
2457 }
2458
2459
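/*
 * Source-side setup: resolve the destination, allocate the protection
 * domain, completion queues and queue pair, build the local RAM block
 * list plus its offset hash table, and register the control buffers.
 */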
2460 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2461 {
2462 int ret, idx;
2463
2464 /*
2465 * Will be validated against destination's actual capabilities
2466 * after the connect() completes.
2467 */
2468 rdma->pin_all = pin_all;
2469
2470 ret = qemu_rdma_resolve_host(rdma, errp);
2471 if (ret < 0) {
2472 goto err_rdma_source_init;
2473 }
2474
2475 ret = qemu_rdma_alloc_pd_cq(rdma, errp);
2476 if (ret < 0) {
2477 goto err_rdma_source_init;
2478 }
2479
2480 ret = qemu_rdma_alloc_qp(rdma);
2481 if (ret < 0) {
2482 error_setg(errp, "RDMA ERROR: rdma migration: error allocating qp!");
2483 goto err_rdma_source_init;
2484 }
2485
2486 qemu_rdma_init_ram_blocks(rdma);
2487
2488 /* Build the hash that maps from offset to RAMBlock */
2489 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2490 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2491 g_hash_table_insert(rdma->blockmap,
2492 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2493 &rdma->local_ram_blocks.block[idx]);
2494 }
2495
2496 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2497 ret = qemu_rdma_reg_control(rdma, idx);
2498 if (ret < 0) {
2499 error_setg(errp,
2500 "RDMA ERROR: rdma migration: error registering %d control!",
2501 idx);
2502 goto err_rdma_source_init;
2503 }
2504 }
2505
2506 return 0;
2507
2508 err_rdma_source_init:
2509 qemu_rdma_cleanup(rdma);
2510 return -1;
2511 }
2512
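/*
 * Wait up to @msec milliseconds for a connection-manager event on the
 * CM channel, retrying poll() if it is interrupted by a signal.
 */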
2513 static int qemu_get_cm_event_timeout(RDMAContext *rdma,
2514 struct rdma_cm_event **cm_event,
2515 long msec, Error **errp)
2516 {
2517 int ret;
2518 struct pollfd poll_fd = {
2519 .fd = rdma->channel->fd,
2520 .events = POLLIN,
2521 .revents = 0
2522 };
2523
2524 do {
2525 ret = poll(&poll_fd, 1, msec);
2526 } while (ret < 0 && errno == EINTR);
2527
2528 if (ret == 0) {
2529 error_setg(errp, "RDMA ERROR: poll cm event timeout");
2530 return -1;
2531 } else if (ret < 0) {
2532 error_setg(errp, "RDMA ERROR: failed to poll cm event, errno=%i",
2533 errno);
2534 return -1;
2535 } else if (poll_fd.revents & POLLIN) {
2536 if (rdma_get_cm_event(rdma->channel, cm_event) < 0) {
2537 error_setg(errp, "RDMA ERROR: failed to get cm event");
2538 return -1;
2539 }
2540 return 0;
2541 } else {
2542 error_setg(errp, "RDMA ERROR: no POLLIN event, revent=%x",
2543 poll_fd.revents);
2544 return -1;
2545 }
2546 }
2547
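/*
 * Source side: advertise our capabilities in the private data of the
 * connect request, pre-post a RECV for the first READY message and wait
 * for the connection to be established.
 */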
2548 static int qemu_rdma_connect(RDMAContext *rdma, bool return_path,
2549 Error **errp)
2550 {
2551 RDMACapabilities cap = {
2552 .version = RDMA_CONTROL_VERSION_CURRENT,
2553 .flags = 0,
2554 };
2555 struct rdma_conn_param conn_param = { .initiator_depth = 2,
2556 .retry_count = 5,
2557 .private_data = &cap,
2558 .private_data_len = sizeof(cap),
2559 };
2560 struct rdma_cm_event *cm_event;
2561 int ret;
2562
2563 /*
2564 * Only negotiate the capability with destination if the user
2565 * on the source first requested the capability.
2566 */
2567 if (rdma->pin_all) {
2568 trace_qemu_rdma_connect_pin_all_requested();
2569 cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2570 }
2571
2572 caps_to_network(&cap);
2573
2574 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
2575 if (ret < 0) {
2576 goto err_rdma_source_connect;
2577 }
2578
2579 ret = rdma_connect(rdma->cm_id, &conn_param);
2580 if (ret < 0) {
2581 error_setg_errno(errp, errno,
2582 "RDMA ERROR: connecting to destination!");
2583 goto err_rdma_source_connect;
2584 }
2585
2586 if (return_path) {
2587 ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
2588 } else {
2589 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2590 if (ret < 0) {
2591 error_setg_errno(errp, errno,
2592 "RDMA ERROR: failed to get cm event");
2593 }
2594 }
2595 if (ret < 0) {
2596 goto err_rdma_source_connect;
2597 }
2598
2599 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2600 error_setg(errp, "RDMA ERROR: connecting to destination!");
2601 rdma_ack_cm_event(cm_event);
2602 goto err_rdma_source_connect;
2603 }
2604 rdma->connected = true;
2605
2606 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2607 network_to_caps(&cap);
2608
2609 /*
2610 * Verify that the *requested* capabilities are supported by the destination
2611 * and disable them otherwise.
2612 */
2613 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2614 warn_report("RDMA: Server cannot support pinning all memory. "
2615 "Will register memory dynamically.");
2616 rdma->pin_all = false;
2617 }
2618
2619 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2620
2621 rdma_ack_cm_event(cm_event);
2622
2623 rdma->control_ready_expected = 1;
2624 rdma->nb_sent = 0;
2625 return 0;
2626
2627 err_rdma_source_connect:
2628 qemu_rdma_cleanup(rdma);
2629 return -1;
2630 }
2631
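/*
 * Destination side: create the CM event channel and listening id, then
 * bind to the first usable address returned by rdma_getaddrinfo().
 */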
2632 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2633 {
2634 Error *err = NULL;
2635 int ret, idx;
2636 struct rdma_cm_id *listen_id;
2637 char ip[40] = "unknown";
2638 struct rdma_addrinfo *res, *e;
2639 char port_str[16];
2640 int reuse = 1;
2641
2642 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2643 rdma->wr_data[idx].control_len = 0;
2644 rdma->wr_data[idx].control_curr = NULL;
2645 }
2646
2647 if (!rdma->host || !rdma->host[0]) {
2648 error_setg(errp, "RDMA ERROR: RDMA host is not set!");
2649 rdma->errored = true;
2650 return -1;
2651 }
2652 /* create CM channel */
2653 rdma->channel = rdma_create_event_channel();
2654 if (!rdma->channel) {
2655 error_setg(errp, "RDMA ERROR: could not create rdma event channel");
2656 rdma->errored = true;
2657 return -1;
2658 }
2659
2660 /* create CM id */
2661 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2662 if (ret < 0) {
2663 error_setg(errp, "RDMA ERROR: could not create cm_id!");
2664 goto err_dest_init_create_listen_id;
2665 }
2666
2667 snprintf(port_str, 16, "%d", rdma->port);
2668 port_str[15] = '\0';
2669
2670 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2671 if (ret) {
2672 error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
2673 rdma->host);
2674 goto err_dest_init_bind_addr;
2675 }
2676
2677 ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
2678 &reuse, sizeof reuse);
2679 if (ret < 0) {
2680 error_setg(errp, "RDMA ERROR: Error: could not set REUSEADDR option");
2681 goto err_dest_init_bind_addr;
2682 }
2683
2684 /* Try all addresses, saving the first error in @err */
2685 for (e = res; e != NULL; e = e->ai_next) {
2686 Error **local_errp = err ? NULL : &err;
2687
2688 inet_ntop(e->ai_family,
2689 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2690 trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2691 ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2692 if (ret < 0) {
2693 continue;
2694 }
2695 if (e->ai_family == AF_INET6) {
2696 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs,
2697 local_errp);
2698 if (ret < 0) {
2699 continue;
2700 }
2701 }
2702 error_free(err);
2703 break;
2704 }
2705
2706 rdma_freeaddrinfo(res);
2707 if (!e) {
2708 if (err) {
2709 error_propagate(errp, err);
2710 } else {
2711 error_setg(errp, "RDMA ERROR: Error: could not rdma_bind_addr!");
2712 }
2713 goto err_dest_init_bind_addr;
2714 }
2715
2716 rdma->listen_id = listen_id;
2717 qemu_rdma_dump_gid("dest_init", listen_id);
2718 return 0;
2719
2720 err_dest_init_bind_addr:
2721 rdma_destroy_id(listen_id);
2722 err_dest_init_create_listen_id:
2723 rdma_destroy_event_channel(rdma->channel);
2724 rdma->channel = NULL;
2725 rdma->errored = true;
2726 return -1;
2727
2728 }
2729
2730 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2731 RDMAContext *rdma)
2732 {
2733 int idx;
2734
2735 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2736 rdma_return_path->wr_data[idx].control_len = 0;
2737 rdma_return_path->wr_data[idx].control_curr = NULL;
2738 }
2739
2740 /* the CM channel and CM id are shared */
2741 rdma_return_path->channel = rdma->channel;
2742 rdma_return_path->listen_id = rdma->listen_id;
2743
2744 rdma->return_path = rdma_return_path;
2745 rdma_return_path->return_path = rdma;
2746 rdma_return_path->is_return_path = true;
2747 }
2748
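/*
 * Allocate an RDMAContext and fill in the host and port parsed from a
 * "host:port" style address string.
 */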
2749 static RDMAContext *qemu_rdma_data_init(const char *host_port, Error **errp)
2750 {
2751 RDMAContext *rdma = NULL;
2752 InetSocketAddress *addr;
2753
2754 rdma = g_new0(RDMAContext, 1);
2755 rdma->current_index = -1;
2756 rdma->current_chunk = -1;
2757
2758 addr = g_new(InetSocketAddress, 1);
2759 if (!inet_parse(addr, host_port, NULL)) {
2760 rdma->port = atoi(addr->port);
2761 rdma->host = g_strdup(addr->host);
2762 rdma->host_port = g_strdup(host_port);
2763 } else {
2764 error_setg(errp, "RDMA ERROR: bad RDMA migration address '%s'",
2765 host_port);
2766 g_free(rdma);
2767 rdma = NULL;
2768 }
2769
2770 qapi_free_InetSocketAddress(addr);
2771 return rdma;
2772 }
2773
2774 /*
2775 * QEMUFile interface to the control channel.
2776 * SEND messages for control only.
2777 * VM's ram is handled with regular RDMA messages.
2778 */
2779 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2780 const struct iovec *iov,
2781 size_t niov,
2782 int *fds,
2783 size_t nfds,
2784 int flags,
2785 Error **errp)
2786 {
2787 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2788 RDMAContext *rdma;
2789 int ret;
2790 ssize_t done = 0;
2791 size_t i, len;
2792
2793 RCU_READ_LOCK_GUARD();
2794 rdma = qatomic_rcu_read(&rioc->rdmaout);
2795
2796 if (!rdma) {
2797 error_setg(errp, "RDMA control channel output is not set");
2798 return -1;
2799 }
2800
2801 if (rdma->errored) {
2802 error_setg(errp,
2803 "RDMA is in an error state waiting migration to abort!");
2804 return -1;
2805 }
2806
2807 /*
2808 * Push out any writes that
2809 * we've queued up for the VM's RAM.
2810 */
2811 ret = qemu_rdma_write_flush(rdma, errp);
2812 if (ret < 0) {
2813 rdma->errored = true;
2814 return -1;
2815 }
2816
2817 for (i = 0; i < niov; i++) {
2818 size_t remaining = iov[i].iov_len;
2819 uint8_t * data = (void *)iov[i].iov_base;
2820 while (remaining) {
2821 RDMAControlHeader head = {};
2822
2823 len = MIN(remaining, RDMA_SEND_INCREMENT);
2824 remaining -= len;
2825
2826 head.len = len;
2827 head.type = RDMA_CONTROL_QEMU_FILE;
2828
2829 ret = qemu_rdma_exchange_send(rdma, &head,
2830 data, NULL, NULL, NULL, errp);
2831
2832 if (ret < 0) {
2833 rdma->errored = true;
2834 return -1;
2835 }
2836
2837 data += len;
2838 done += len;
2839 }
2840 }
2841
2842 return done;
2843 }
2844
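/*
 * Copy up to @size bytes of any buffered control-message payload into
 * @buf and advance the cursor so the next read continues where this
 * one stopped.
 */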
2845 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2846 size_t size, int idx)
2847 {
2848 size_t len = 0;
2849
2850 if (rdma->wr_data[idx].control_len) {
2851 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2852
2853 len = MIN(size, rdma->wr_data[idx].control_len);
2854 memcpy(buf, rdma->wr_data[idx].control_curr, len);
2855 rdma->wr_data[idx].control_curr += len;
2856 rdma->wr_data[idx].control_len -= len;
2857 }
2858
2859 return len;
2860 }
2861
2862 /*
2863 * QEMUFile interface to the control channel.
2864 * RDMA links don't use bytestreams, so we have to
2865 * return bytes to QEMUFile opportunistically.
2866 */
2867 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2868 const struct iovec *iov,
2869 size_t niov,
2870 int **fds,
2871 size_t *nfds,
2872 int flags,
2873 Error **errp)
2874 {
2875 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2876 RDMAContext *rdma;
2877 RDMAControlHeader head;
2878 int ret;
2879 ssize_t done = 0;
2880 size_t i, len;
2881
2882 RCU_READ_LOCK_GUARD();
2883 rdma = qatomic_rcu_read(&rioc->rdmain);
2884
2885 if (!rdma) {
2886 error_setg(errp, "RDMA control channel input is not set");
2887 return -1;
2888 }
2889
2890 if (rdma->errored) {
2891 error_setg(errp,
2892 "RDMA is in an error state waiting migration to abort!");
2893 return -1;
2894 }
2895
2896 for (i = 0; i < niov; i++) {
2897 size_t want = iov[i].iov_len;
2898 uint8_t *data = (void *)iov[i].iov_base;
2899
2900 /*
2901 * First, we hold on to the last SEND message we
2902 * were given and dish out the bytes until we run
2903 * out of bytes.
2904 */
2905 len = qemu_rdma_fill(rdma, data, want, 0);
2906 done += len;
2907 want -= len;
2908 /* Got what we needed, so go to next iovec */
2909 if (want == 0) {
2910 continue;
2911 }
2912
2913 /* If we got any data so far, then don't wait
2914 * for more, just return what we have */
2915 if (done > 0) {
2916 break;
2917 }
2918
2919
2920 /* We've got nothing at all, so let's wait for
2921 * more to arrive
2922 */
2923 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE,
2924 errp);
2925
2926 if (ret < 0) {
2927 rdma->errored = true;
2928 return -1;
2929 }
2930
2931 /*
2932 * SEND was received with new bytes, now try again.
2933 */
2934 len = qemu_rdma_fill(rdma, data, want, 0);
2935 done += len;
2936 want -= len;
2937
2938 /* Still didn't get enough, so let's just return */
2939 if (want) {
2940 if (done == 0) {
2941 return QIO_CHANNEL_ERR_BLOCK;
2942 } else {
2943 break;
2944 }
2945 }
2946 }
2947 return done;
2948 }
2949
2950 /*
2951 * Block until all the outstanding chunks have been delivered by the hardware.
2952 */
2953 static int qemu_rdma_drain_cq(RDMAContext *rdma)
2954 {
2955 Error *err = NULL;
2956 int ret;
2957
2958 if (qemu_rdma_write_flush(rdma, &err) < 0) {
2959 error_report_err(err);
2960 return -1;
2961 }
2962
2963 while (rdma->nb_sent) {
2964 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2965 if (ret < 0) {
2966 error_report("rdma migration: complete polling error!");
2967 return -1;
2968 }
2969 }
2970
2971 qemu_rdma_unregister_waiting(rdma);
2972
2973 return 0;
2974 }
2975
2976
2977 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2978 bool blocking,
2979 Error **errp)
2980 {
2981 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2982 /* XXX we should make readv/writev actually honour this :-) */
2983 rioc->blocking = blocking;
2984 return 0;
2985 }
2986
2987
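/*
 * GSource glue for the RDMA channel: it is reported readable whenever
 * buffered control data is pending and is always reported writable.
 */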
2988 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2989 struct QIOChannelRDMASource {
2990 GSource parent;
2991 QIOChannelRDMA *rioc;
2992 GIOCondition condition;
2993 };
2994
2995 static gboolean
2996 qio_channel_rdma_source_prepare(GSource *source,
2997 gint *timeout)
2998 {
2999 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3000 RDMAContext *rdma;
3001 GIOCondition cond = 0;
3002 *timeout = -1;
3003
3004 RCU_READ_LOCK_GUARD();
3005 if (rsource->condition == G_IO_IN) {
3006 rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3007 } else {
3008 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3009 }
3010
3011 if (!rdma) {
3012 error_report("RDMAContext is NULL when prepare Gsource");
3013 return FALSE;
3014 }
3015
3016 if (rdma->wr_data[0].control_len) {
3017 cond |= G_IO_IN;
3018 }
3019 cond |= G_IO_OUT;
3020
3021 return cond & rsource->condition;
3022 }
3023
3024 static gboolean
3025 qio_channel_rdma_source_check(GSource *source)
3026 {
3027 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3028 RDMAContext *rdma;
3029 GIOCondition cond = 0;
3030
3031 RCU_READ_LOCK_GUARD();
3032 if (rsource->condition == G_IO_IN) {
3033 rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3034 } else {
3035 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3036 }
3037
3038 if (!rdma) {
3039 error_report("RDMAContext is NULL when check Gsource");
3040 return FALSE;
3041 }
3042
3043 if (rdma->wr_data[0].control_len) {
3044 cond |= G_IO_IN;
3045 }
3046 cond |= G_IO_OUT;
3047
3048 return cond & rsource->condition;
3049 }
3050
3051 static gboolean
3052 qio_channel_rdma_source_dispatch(GSource *source,
3053 GSourceFunc callback,
3054 gpointer user_data)
3055 {
3056 QIOChannelFunc func = (QIOChannelFunc)callback;
3057 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3058 RDMAContext *rdma;
3059 GIOCondition cond = 0;
3060
3061 RCU_READ_LOCK_GUARD();
3062 if (rsource->condition == G_IO_IN) {
3063 rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3064 } else {
3065 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3066 }
3067
3068 if (!rdma) {
3069 error_report("RDMAContext is NULL when dispatch Gsource");
3070 return FALSE;
3071 }
3072
3073 if (rdma->wr_data[0].control_len) {
3074 cond |= G_IO_IN;
3075 }
3076 cond |= G_IO_OUT;
3077
3078 return (*func)(QIO_CHANNEL(rsource->rioc),
3079 (cond & rsource->condition),
3080 user_data);
3081 }
3082
3083 static void
3084 qio_channel_rdma_source_finalize(GSource *source)
3085 {
3086 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
3087
3088 object_unref(OBJECT(ssource->rioc));
3089 }
3090
3091 static GSourceFuncs qio_channel_rdma_source_funcs = {
3092 qio_channel_rdma_source_prepare,
3093 qio_channel_rdma_source_check,
3094 qio_channel_rdma_source_dispatch,
3095 qio_channel_rdma_source_finalize
3096 };
3097
3098 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
3099 GIOCondition condition)
3100 {
3101 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3102 QIOChannelRDMASource *ssource;
3103 GSource *source;
3104
3105 source = g_source_new(&qio_channel_rdma_source_funcs,
3106 sizeof(QIOChannelRDMASource));
3107 ssource = (QIOChannelRDMASource *)source;
3108
3109 ssource->rioc = rioc;
3110 object_ref(OBJECT(rioc));
3111
3112 ssource->condition = condition;
3113
3114 return source;
3115 }
3116
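/*
 * Register the completion-channel fds of the relevant context (rdmain
 * for reads, rdmaout for writes) with the given AioContext.
 */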
3117 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
3118 AioContext *read_ctx,
3119 IOHandler *io_read,
3120 AioContext *write_ctx,
3121 IOHandler *io_write,
3122 void *opaque)
3123 {
3124 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3125 if (io_read) {
3126 aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd,
3127 io_read, io_write, NULL, NULL, opaque);
3128 aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd,
3129 io_read, io_write, NULL, NULL, opaque);
3130 } else {
3131 aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd,
3132 io_read, io_write, NULL, NULL, opaque);
3133 aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd,
3134 io_read, io_write, NULL, NULL, opaque);
3135 }
3136 }
3137
3138 struct rdma_close_rcu {
3139 struct rcu_head rcu;
3140 RDMAContext *rdmain;
3141 RDMAContext *rdmaout;
3142 };
3143
3144 /* callback from qio_channel_rdma_close via call_rcu */
3145 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
3146 {
3147 if (rcu->rdmain) {
3148 qemu_rdma_cleanup(rcu->rdmain);
3149 }
3150
3151 if (rcu->rdmaout) {
3152 qemu_rdma_cleanup(rcu->rdmaout);
3153 }
3154
3155 g_free(rcu->rdmain);
3156 g_free(rcu->rdmaout);
3157 g_free(rcu);
3158 }
3159
3160 static int qio_channel_rdma_close(QIOChannel *ioc,
3161 Error **errp)
3162 {
3163 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3164 RDMAContext *rdmain, *rdmaout;
3165 struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
3166
3167 trace_qemu_rdma_close();
3168
3169 rdmain = rioc->rdmain;
3170 if (rdmain) {
3171 qatomic_rcu_set(&rioc->rdmain, NULL);
3172 }
3173
3174 rdmaout = rioc->rdmaout;
3175 if (rdmaout) {
3176 qatomic_rcu_set(&rioc->rdmaout, NULL);
3177 }
3178
3179 rcu->rdmain = rdmain;
3180 rcu->rdmaout = rdmaout;
3181 call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
3182
3183 return 0;
3184 }
3185
3186 static int
3187 qio_channel_rdma_shutdown(QIOChannel *ioc,
3188 QIOChannelShutdown how,
3189 Error **errp)
3190 {
3191 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3192 RDMAContext *rdmain, *rdmaout;
3193
3194 RCU_READ_LOCK_GUARD();
3195
3196 rdmain = qatomic_rcu_read(&rioc->rdmain);
3197 rdmaout = qatomic_rcu_read(&rioc->rdmain);
3198
3199 switch (how) {
3200 case QIO_CHANNEL_SHUTDOWN_READ:
3201 if (rdmain) {
3202 rdmain->errored = true;
3203 }
3204 break;
3205 case QIO_CHANNEL_SHUTDOWN_WRITE:
3206 if (rdmaout) {
3207 rdmaout->errored = true;
3208 }
3209 break;
3210 case QIO_CHANNEL_SHUTDOWN_BOTH:
3211 default:
3212 if (rdmain) {
3213 rdmain->errored = true;
3214 }
3215 if (rdmaout) {
3216 rdmaout->errored = true;
3217 }
3218 break;
3219 }
3220
3221 return 0;
3222 }
3223
3224 /*
3225 * Parameters:
3226 * @offset == 0 :
3227 * This means that 'block_offset' is a full virtual address that does not
3228 * belong to a RAMBlock of the virtual machine and instead
3229 * represents a private malloc'd memory area that the caller wishes to
3230 * transfer.
3231 *
3232 * @offset != 0 :
3233 * Offset is an offset to be added to block_offset and used
3234 * to also lookup the corresponding RAMBlock.
3235 *
3236 * @size : Number of bytes to transfer
3237 *
3238 * @pages_sent : User-specified pointer to indicate how many pages were
3239 * sent. Usually, this will not be more than a few bytes of
3240 * the protocol because most transfers are sent asynchronously.
3241 */
3242 static int qemu_rdma_save_page(QEMUFile *f, ram_addr_t block_offset,
3243 ram_addr_t offset, size_t size)
3244 {
3245 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3246 Error *err = NULL;
3247 RDMAContext *rdma;
3248 int ret;
3249
3250 if (migration_in_postcopy()) {
3251 return RAM_SAVE_CONTROL_NOT_SUPP;
3252 }
3253
3254 RCU_READ_LOCK_GUARD();
3255 rdma = qatomic_rcu_read(&rioc->rdmaout);
3256
3257 if (!rdma) {
3258 return -1;
3259 }
3260
3261 if (rdma_errored(rdma)) {
3262 return -1;
3263 }
3264
3265 qemu_fflush(f);
3266
3267 /*
3268 * Add this page to the current 'chunk'. If the chunk
3269 * is full, or the page doesn't belong to the current chunk,
3270 * an actual RDMA write will occur and a new chunk will be formed.
3271 */
3272 ret = qemu_rdma_write(rdma, block_offset, offset, size, &err);
3273 if (ret < 0) {
3274 error_report_err(err);
3275 goto err;
3276 }
3277
3278 /*
3279 * Drain the Completion Queue if possible, but do not block,
3280 * just poll.
3281 *
3282 * If nothing to poll, the end of the iteration will do this
3283 * again to make sure we don't overflow the request queue.
3284 */
3285 while (1) {
3286 uint64_t wr_id, wr_id_in;
3287 ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
3288
3289 if (ret < 0) {
3290 error_report("rdma migration: polling error");
3291 goto err;
3292 }
3293
3294 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3295
3296 if (wr_id == RDMA_WRID_NONE) {
3297 break;
3298 }
3299 }
3300
3301 while (1) {
3302 uint64_t wr_id, wr_id_in;
3303 ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
3304
3305 if (ret < 0) {
3306 error_report("rdma migration: polling error");
3307 goto err;
3308 }
3309
3310 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3311
3312 if (wr_id == RDMA_WRID_NONE) {
3313 break;
3314 }
3315 }
3316
3317 return RAM_SAVE_CONTROL_DELAYED;
3318
3319 err:
3320 rdma->errored = true;
3321 return -1;
3322 }
3323
3324 static void rdma_accept_incoming_migration(void *opaque);
3325
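/*
 * fd handler for the destination's CM event channel: a DISCONNECTED or
 * DEVICE_REMOVAL event before the migration has completed marks the
 * context (and its return path) as errored and wakes the load
 * coroutine, if any.
 */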
3326 static void rdma_cm_poll_handler(void *opaque)
3327 {
3328 RDMAContext *rdma = opaque;
3329 int ret;
3330 struct rdma_cm_event *cm_event;
3331 MigrationIncomingState *mis = migration_incoming_get_current();
3332
3333 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3334 if (ret < 0) {
3335 error_report("get_cm_event failed %d", errno);
3336 return;
3337 }
3338
3339 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3340 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3341 if (!rdma->errored &&
3342 migration_incoming_get_current()->state !=
3343 MIGRATION_STATUS_COMPLETED) {
3344 error_report("receive cm event, cm event is %d", cm_event->event);
3345 rdma->errored = true;
3346 if (rdma->return_path) {
3347 rdma->return_path->errored = true;
3348 }
3349 }
3350 rdma_ack_cm_event(cm_event);
3351 if (mis->loadvm_co) {
3352 qemu_coroutine_enter(mis->loadvm_co);
3353 }
3354 return;
3355 }
3356 rdma_ack_cm_event(cm_event);
3357 }
3358
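/*
 * Destination side: consume the CONNECT_REQUEST event, negotiate the
 * capabilities carried in its private data, set up the verbs resources
 * and accept the connection.
 */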
3359 static int qemu_rdma_accept(RDMAContext *rdma)
3360 {
3361 Error *err = NULL;
3362 RDMACapabilities cap;
3363 struct rdma_conn_param conn_param = {
3364 .responder_resources = 2,
3365 .private_data = &cap,
3366 .private_data_len = sizeof(cap),
3367 };
3368 RDMAContext *rdma_return_path = NULL;
3369 struct rdma_cm_event *cm_event;
3370 struct ibv_context *verbs;
3371 int ret;
3372 int idx;
3373
3374 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3375 if (ret < 0) {
3376 goto err_rdma_dest_wait;
3377 }
3378
3379 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3380 rdma_ack_cm_event(cm_event);
3381 goto err_rdma_dest_wait;
3382 }
3383
3384 /*
3385 * initialize the RDMAContext for return path for postcopy after first
3386 * connection request reached.
3387 */
3388 if ((migrate_postcopy() || migrate_return_path())
3389 && !rdma->is_return_path) {
3390 rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL);
3391 if (rdma_return_path == NULL) {
3392 rdma_ack_cm_event(cm_event);
3393 goto err_rdma_dest_wait;
3394 }
3395
3396 qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
3397 }
3398
3399 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3400
3401 network_to_caps(&cap);
3402
3403 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3404 error_report("Unknown source RDMA version: %d, bailing...",
3405 cap.version);
3406 rdma_ack_cm_event(cm_event);
3407 goto err_rdma_dest_wait;
3408 }
3409
3410 /*
3411 * Respond with only the capabilities this version of QEMU knows about.
3412 */
3413 cap.flags &= known_capabilities;
3414
3415 /*
3416 * Enable the ones that we do know about.
3417 * Add other checks here as new ones are introduced.
3418 */
3419 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3420 rdma->pin_all = true;
3421 }
3422
3423 rdma->cm_id = cm_event->id;
3424 verbs = cm_event->id->verbs;
3425
3426 rdma_ack_cm_event(cm_event);
3427
3428 trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3429
3430 caps_to_network(&cap);
3431
3432 trace_qemu_rdma_accept_pin_verbsc(verbs);
3433
3434 if (!rdma->verbs) {
3435 rdma->verbs = verbs;
3436 } else if (rdma->verbs != verbs) {
3437 error_report("ibv context not matching %p, %p!", rdma->verbs,
3438 verbs);
3439 goto err_rdma_dest_wait;
3440 }
3441
3442 qemu_rdma_dump_id("dest_init", verbs);
3443
3444 ret = qemu_rdma_alloc_pd_cq(rdma, &err);
3445 if (ret < 0) {
3446 error_report_err(err);
3447 goto err_rdma_dest_wait;
3448 }
3449
3450 ret = qemu_rdma_alloc_qp(rdma);
3451 if (ret < 0) {
3452 error_report("rdma migration: error allocating qp!");
3453 goto err_rdma_dest_wait;
3454 }
3455
3456 qemu_rdma_init_ram_blocks(rdma);
3457
3458 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3459 ret = qemu_rdma_reg_control(rdma, idx);
3460 if (ret < 0) {
3461 error_report("rdma: error registering %d control", idx);
3462 goto err_rdma_dest_wait;
3463 }
3464 }
3465
3466 /* Accept the second connection request for return path */
3467 if ((migrate_postcopy() || migrate_return_path())
3468 && !rdma->is_return_path) {
3469 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3470 NULL,
3471 (void *)(intptr_t)rdma->return_path);
3472 } else {
3473 qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3474 NULL, rdma);
3475 }
3476
3477 ret = rdma_accept(rdma->cm_id, &conn_param);
3478 if (ret < 0) {
3479 error_report("rdma_accept failed");
3480 goto err_rdma_dest_wait;
3481 }
3482
3483 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3484 if (ret < 0) {
3485 error_report("rdma_accept get_cm_event failed");
3486 goto err_rdma_dest_wait;
3487 }
3488
3489 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3490 error_report("rdma_accept not event established");
3491 rdma_ack_cm_event(cm_event);
3492 goto err_rdma_dest_wait;
3493 }
3494
3495 rdma_ack_cm_event(cm_event);
3496 rdma->connected = true;
3497
3498 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, &err);
3499 if (ret < 0) {
3500 error_report_err(err);
3501 goto err_rdma_dest_wait;
3502 }
3503
3504 qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3505
3506 return 0;
3507
3508 err_rdma_dest_wait:
3509 rdma->errored = true;
3510 qemu_rdma_cleanup(rdma);
3511 g_free(rdma_return_path);
3512 return -1;
3513 }
3514
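/* qsort() comparator ordering local RAM blocks by the source's index. */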
3515 static int dest_ram_sort_func(const void *a, const void *b)
3516 {
3517 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3518 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3519
3520 return (a_index < b_index) ? -1 : (a_index != b_index);
3521 }
3522
3523 /*
3524 * During each iteration of the migration, we listen for instructions
3525 * from the source VM to perform dynamic page registrations before it
3526 * can perform RDMA operations.
3527 *
3528 * We respond with the 'rkey'.
3529 *
3530 * Keep doing this until the source tells us to stop.
3531 */
3532 static int qemu_rdma_registration_handle(QEMUFile *f)
3533 {
3534 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3535 .type = RDMA_CONTROL_REGISTER_RESULT,
3536 .repeat = 0,
3537 };
3538 RDMAControlHeader unreg_resp = { .len = 0,
3539 .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3540 .repeat = 0,
3541 };
3542 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3543 .repeat = 1 };
3544 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3545 Error *err = NULL;
3546 RDMAContext *rdma;
3547 RDMALocalBlocks *local;
3548 RDMAControlHeader head;
3549 RDMARegister *reg, *registers;
3550 RDMACompress *comp;
3551 RDMARegisterResult *reg_result;
3552 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3553 RDMALocalBlock *block;
3554 void *host_addr;
3555 int ret;
3556 int idx = 0;
3557 int count = 0;
3558 int i = 0;
3559
3560 RCU_READ_LOCK_GUARD();
3561 rdma = qatomic_rcu_read(&rioc->rdmain);
3562
3563 if (!rdma) {
3564 return -1;
3565 }
3566
3567 if (rdma_errored(rdma)) {
3568 return -1;
3569 }
3570
3571 local = &rdma->local_ram_blocks;
3572 do {
3573 trace_qemu_rdma_registration_handle_wait();
3574
3575 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE, &err);
3576
3577 if (ret < 0) {
3578 error_report_err(err);
3579 break;
3580 }
3581
3582 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3583 error_report("rdma: Too many requests in this message (%d)."
3584 "Bailing.", head.repeat);
3585 break;
3586 }
3587
3588 switch (head.type) {
3589 case RDMA_CONTROL_COMPRESS:
3590 comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3591 network_to_compress(comp);
3592
3593 trace_qemu_rdma_registration_handle_compress(comp->length,
3594 comp->block_idx,
3595 comp->offset);
3596 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3597 error_report("rdma: 'compress' bad block index %u (vs %d)",
3598 (unsigned int)comp->block_idx,
3599 rdma->local_ram_blocks.nb_blocks);
3600 goto err;
3601 }
3602 block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3603
3604 host_addr = block->local_host_addr +
3605 (comp->offset - block->offset);
3606
3607 ram_handle_compressed(host_addr, comp->value, comp->length);
3608 break;
3609
3610 case RDMA_CONTROL_REGISTER_FINISHED:
3611 trace_qemu_rdma_registration_handle_finished();
3612 return 0;
3613
3614 case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3615 trace_qemu_rdma_registration_handle_ram_blocks();
3616
3617 /* Sort our local RAM Block list so it's the same as the source's;
3618 * we can do this since we've filled in a src_index in the list
3619 * as we received the RAMBlock list earlier.
3620 */
3621 qsort(rdma->local_ram_blocks.block,
3622 rdma->local_ram_blocks.nb_blocks,
3623 sizeof(RDMALocalBlock), dest_ram_sort_func);
3624 for (i = 0; i < local->nb_blocks; i++) {
3625 local->block[i].index = i;
3626 }
3627
3628 if (rdma->pin_all) {
3629 ret = qemu_rdma_reg_whole_ram_blocks(rdma, &err);
3630 if (ret < 0) {
3631 error_report_err(err);
3632 goto err;
3633 }
3634 }
3635
3636 /*
3637 * Dest uses this to prepare to transmit the RAMBlock descriptions
3638 * to the source VM after connection setup.
3639 * Both sides use the "remote" structure to communicate and update
3640 * their "local" descriptions with what was sent.
3641 */
3642 for (i = 0; i < local->nb_blocks; i++) {
3643 rdma->dest_blocks[i].remote_host_addr =
3644 (uintptr_t)(local->block[i].local_host_addr);
3645
3646 if (rdma->pin_all) {
3647 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3648 }
3649
3650 rdma->dest_blocks[i].offset = local->block[i].offset;
3651 rdma->dest_blocks[i].length = local->block[i].length;
3652
3653 dest_block_to_network(&rdma->dest_blocks[i]);
3654 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3655 local->block[i].block_name,
3656 local->block[i].offset,
3657 local->block[i].length,
3658 local->block[i].local_host_addr,
3659 local->block[i].src_index);
3660 }
3661
3662 blocks.len = rdma->local_ram_blocks.nb_blocks
3663 * sizeof(RDMADestBlock);
3664
3665
3666 ret = qemu_rdma_post_send_control(rdma,
3667 (uint8_t *) rdma->dest_blocks, &blocks,
3668 &err);
3669
3670 if (ret < 0) {
3671 error_report_err(err);
3672 goto err;
3673 }
3674
3675 break;
3676 case RDMA_CONTROL_REGISTER_REQUEST:
3677 trace_qemu_rdma_registration_handle_register(head.repeat);
3678
3679 reg_resp.repeat = head.repeat;
3680 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3681
3682 for (count = 0; count < head.repeat; count++) {
3683 uint64_t chunk;
3684 uint8_t *chunk_start, *chunk_end;
3685
3686 reg = &registers[count];
3687 network_to_register(reg);
3688
3689 reg_result = &results[count];
3690
3691 trace_qemu_rdma_registration_handle_register_loop(count,
3692 reg->current_index, reg->key.current_addr, reg->chunks);
3693
3694 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3695 error_report("rdma: 'register' bad block index %u (vs %d)",
3696 (unsigned int)reg->current_index,
3697 rdma->local_ram_blocks.nb_blocks);
3698 goto err;
3699 }
3700 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3701 if (block->is_ram_block) {
3702 if (block->offset > reg->key.current_addr) {
3703 error_report("rdma: bad register address for block %s"
3704 " offset: %" PRIx64 " current_addr: %" PRIx64,
3705 block->block_name, block->offset,
3706 reg->key.current_addr);
3707 goto err;
3708 }
3709 host_addr = (block->local_host_addr +
3710 (reg->key.current_addr - block->offset));
3711 chunk = ram_chunk_index(block->local_host_addr,
3712 (uint8_t *) host_addr);
3713 } else {
3714 chunk = reg->key.chunk;
3715 host_addr = block->local_host_addr +
3716 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3717 /* Check for particularly bad chunk value */
3718 if (host_addr < (void *)block->local_host_addr) {
3719 error_report("rdma: bad chunk for block %s"
3720 " chunk: %" PRIx64,
3721 block->block_name, reg->key.chunk);
3722 goto err;
3723 }
3724 }
3725 chunk_start = ram_chunk_start(block, chunk);
3726 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3727 /* avoid "-Waddress-of-packed-member" warning */
3728 uint32_t tmp_rkey = 0;
3729 if (qemu_rdma_register_and_get_keys(rdma, block,
3730 (uintptr_t)host_addr, NULL, &tmp_rkey,
3731 chunk, chunk_start, chunk_end)) {
3732 error_report("cannot get rkey");
3733 goto err;
3734 }
3735 reg_result->rkey = tmp_rkey;
3736
3737 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3738
3739 trace_qemu_rdma_registration_handle_register_rkey(
3740 reg_result->rkey);
3741
3742 result_to_network(reg_result);
3743 }
3744
3745 ret = qemu_rdma_post_send_control(rdma,
3746 (uint8_t *) results, &reg_resp, &err);
3747
3748 if (ret < 0) {
3749 error_report_err(err);
3750 goto err;
3751 }
3752 break;
3753 case RDMA_CONTROL_UNREGISTER_REQUEST:
3754 trace_qemu_rdma_registration_handle_unregister(head.repeat);
3755 unreg_resp.repeat = head.repeat;
3756 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3757
3758 for (count = 0; count < head.repeat; count++) {
3759 reg = &registers[count];
3760 network_to_register(reg);
3761
3762 trace_qemu_rdma_registration_handle_unregister_loop(count,
3763 reg->current_index, reg->key.chunk);
3764
3765 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3766
3767 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3768 block->pmr[reg->key.chunk] = NULL;
3769
3770 if (ret != 0) {
3771 error_report("rdma unregistration chunk failed: %s",
3772 strerror(errno));
3773 goto err;
3774 }
3775
3776 rdma->total_registrations--;
3777
3778 trace_qemu_rdma_registration_handle_unregister_success(
3779 reg->key.chunk);
3780 }
3781
3782 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp, &err);
3783
3784 if (ret < 0) {
3785 error_report_err(err);
3786 goto err;
3787 }
3788 break;
3789 case RDMA_CONTROL_REGISTER_RESULT:
3790 error_report("Invalid RESULT message at dest.");
3791 goto err;
3792 default:
3793 error_report("Unknown control message %s", control_desc(head.type));
3794 goto err;
3795 }
3796 } while (1);
3797
3798 err:
3799 rdma->errored = true;
3800 return -1;
3801 }
3802
3803 /* Destination:
3804 * Called via a ram_control_load_hook during the initial RAM load section which
3805 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks
3806 * on the source.
3807 * We've already built our local RAMBlock list, but not yet sent the list to
3808 * the source.
3809 */
3810 static int
3811 rdma_block_notification_handle(QEMUFile *f, const char *name)
3812 {
3813 RDMAContext *rdma;
3814 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3815 int curr;
3816 int found = -1;
3817
3818 RCU_READ_LOCK_GUARD();
3819 rdma = qatomic_rcu_read(&rioc->rdmain);
3820
3821 if (!rdma) {
3822 return -1;
3823 }
3824
3825 /* Find the matching RAMBlock in our local list */
3826 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3827 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3828 found = curr;
3829 break;
3830 }
3831 }
3832
3833 if (found == -1) {
3834 error_report("RAMBlock '%s' not found on destination", name);
3835 return -1;
3836 }
3837
3838 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3839 trace_rdma_block_notification_handle(name, rdma->next_src_index);
3840 rdma->next_src_index++;
3841
3842 return 0;
3843 }
3844
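/*
 * Dispatch an incoming load hook: either record the name of a RAMBlock
 * announced by the source or enter the registration-handling loop.
 */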
3845 static int rdma_load_hook(QEMUFile *f, uint64_t flags, void *data)
3846 {
3847 switch (flags) {
3848 case RAM_CONTROL_BLOCK_REG:
3849 return rdma_block_notification_handle(f, data);
3850
3851 case RAM_CONTROL_HOOK:
3852 return qemu_rdma_registration_handle(f);
3853
3854 default:
3855 /* Shouldn't be called with any other values */
3856 abort();
3857 }
3858 }
3859
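/*
 * Source side: called before each RAM iteration; emits the
 * RAM_SAVE_FLAG_HOOK marker so that the destination enters
 * qemu_rdma_registration_handle().
 */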
3860 static int qemu_rdma_registration_start(QEMUFile *f,
3861 uint64_t flags, void *data)
3862 {
3863 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3864 RDMAContext *rdma;
3865
3866 if (migration_in_postcopy()) {
3867 return 0;
3868 }
3869
3870 RCU_READ_LOCK_GUARD();
3871 rdma = qatomic_rcu_read(&rioc->rdmaout);
3872 if (!rdma) {
3873 return -1;
3874 }
3875
3876 if (rdma_errored(rdma)) {
3877 return -1;
3878 }
3879
3880 trace_qemu_rdma_registration_start(flags);
3881 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3882 qemu_fflush(f);
3883
3884 return 0;
3885 }
3886
3887 /*
3888 * Inform dest that dynamic registrations are done for now.
3889 * First, flush writes, if any.
3890 */
3891 static int qemu_rdma_registration_stop(QEMUFile *f,
3892 uint64_t flags, void *data)
3893 {
3894 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3895 Error *err = NULL;
3896 RDMAContext *rdma;
3897 RDMAControlHeader head = { .len = 0, .repeat = 1 };
3898 int ret;
3899
3900 if (migration_in_postcopy()) {
3901 return 0;
3902 }
3903
3904 RCU_READ_LOCK_GUARD();
3905 rdma = qatomic_rcu_read(&rioc->rdmaout);
3906 if (!rdma) {
3907 return -1;
3908 }
3909
3910 if (rdma_errored(rdma)) {
3911 return -1;
3912 }
3913
3914 qemu_fflush(f);
3915 ret = qemu_rdma_drain_cq(rdma);
3916
3917 if (ret < 0) {
3918 goto err;
3919 }
3920
3921 if (flags == RAM_CONTROL_SETUP) {
3922 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3923 RDMALocalBlocks *local = &rdma->local_ram_blocks;
3924 int reg_result_idx, i, nb_dest_blocks;
3925
3926 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3927 trace_qemu_rdma_registration_stop_ram();
3928
3929 /*
3930 * Make sure that we parallelize the pinning on both sides.
3931 * For very large guests, doing this serially takes a really
3932 * long time, so we have to 'interleave' the pinning locally
3933 * with the control messages by performing the pinning on this
3934 * side before we receive the control response from the other
3935 * side that the pinning has completed.
3936 */
3937 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3938 &reg_result_idx, rdma->pin_all ?
3939 qemu_rdma_reg_whole_ram_blocks : NULL,
3940 &err);
3941 if (ret < 0) {
3942 error_report_err(err);
3943 return -1;
3944 }
3945
3946 nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3947
3948 /*
3949 * The protocol uses two different sets of rkeys (mutually exclusive):
3950 * 1. One key to represent the virtual address of the entire ram block.
3951 * (dynamic chunk registration disabled - pin everything with one rkey.)
3952 * 2. One to represent individual chunks within a ram block.
3953 * (dynamic chunk registration enabled - pin individual chunks.)
3954 *
3955 * Once the capability has been negotiated, the destination transmits
3956 * the rkeys to use (or sends them later), including the virtual addresses,
3957 * and the source then merges those remote block descriptions into its local copy.
3958 */
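/*
 * Each RDMADestBlock consumed below carries at least the block's
 * remote_host_addr, its length and the remote_rkey to use for writes;
 * network_to_dest_block() converts those fields from network byte order.
 */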
3959
3960 if (local->nb_blocks != nb_dest_blocks) {
3961 error_report("ram blocks mismatch (Number of blocks %d vs %d)",
3962 local->nb_blocks, nb_dest_blocks);
3963 error_printf("Your QEMU command line parameters are probably "
3964 "not identical on both the source and destination.");
3965 rdma->errored = true;
3966 return -1;
3967 }
3968
3969 qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3970 memcpy(rdma->dest_blocks,
3971 rdma->wr_data[reg_result_idx].control_curr, resp.len);
3972 for (i = 0; i < nb_dest_blocks; i++) {
3973 network_to_dest_block(&rdma->dest_blocks[i]);
3974
3975 /* We require that the blocks are in the same order */
3976 if (rdma->dest_blocks[i].length != local->block[i].length) {
3977 error_report("Block %s/%d has a different length %" PRIu64
3978 "vs %" PRIu64,
3979 local->block[i].block_name, i,
3980 local->block[i].length,
3981 rdma->dest_blocks[i].length);
3982 rdma->errored = true;
3983 return -1;
3984 }
3985 local->block[i].remote_host_addr =
3986 rdma->dest_blocks[i].remote_host_addr;
3987 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3988 }
3989 }
3990
3991 trace_qemu_rdma_registration_stop(flags);
3992
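/*
 * RDMA_CONTROL_REGISTER_FINISHED tells the destination's
 * qemu_rdma_registration_handle() loop that this round of dynamic
 * registrations is complete, so it can return to the normal load path.
 */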
3993 head.type = RDMA_CONTROL_REGISTER_FINISHED;
3994 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL, &err);
3995
3996 if (ret < 0) {
3997 error_report_err(err);
3998 goto err;
3999 }
4000
4001 return 0;
4002 err:
4003 rdma->errored = true;
4004 return -1;
4005 }
4006
4007 static const QEMUFileHooks rdma_read_hooks = {
4008 .hook_ram_load = rdma_load_hook,
4009 };
4010
4011 static const QEMUFileHooks rdma_write_hooks = {
4012 .before_ram_iterate = qemu_rdma_registration_start,
4013 .after_ram_iterate = qemu_rdma_registration_stop,
4014 .save_page = qemu_rdma_save_page,
4015 };
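/*
 * Rough sketch of how the generic RAM migration code drives these hooks
 * (not part of this file; the wrapper names follow the QEMUFileHooks
 * machinery and may differ in detail):
 *
 *     ram_control_before_iterate(f, RAM_CONTROL_SETUP);  // registration_start
 *     ram_control_save_page(f, block_offset, offset, size, &bytes_sent);
 *                                                        // save_page
 *     ram_control_after_iterate(f, RAM_CONTROL_ROUND);   // registration_stop
 */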
4016
4017
4018 static void qio_channel_rdma_finalize(Object *obj)
4019 {
4020 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
4021 if (rioc->rdmain) {
4022 qemu_rdma_cleanup(rioc->rdmain);
4023 g_free(rioc->rdmain);
4024 rioc->rdmain = NULL;
4025 }
4026 if (rioc->rdmaout) {
4027 qemu_rdma_cleanup(rioc->rdmaout);
4028 g_free(rioc->rdmaout);
4029 rioc->rdmaout = NULL;
4030 }
4031 }
4032
4033 static void qio_channel_rdma_class_init(ObjectClass *klass,
4034 void *class_data G_GNUC_UNUSED)
4035 {
4036 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
4037
4038 ioc_klass->io_writev = qio_channel_rdma_writev;
4039 ioc_klass->io_readv = qio_channel_rdma_readv;
4040 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
4041 ioc_klass->io_close = qio_channel_rdma_close;
4042 ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
4043 ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
4044 ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
4045 }
4046
4047 static const TypeInfo qio_channel_rdma_info = {
4048 .parent = TYPE_QIO_CHANNEL,
4049 .name = TYPE_QIO_CHANNEL_RDMA,
4050 .instance_size = sizeof(QIOChannelRDMA),
4051 .instance_finalize = qio_channel_rdma_finalize,
4052 .class_init = qio_channel_rdma_class_init,
4053 };
4054
4055 static void qio_channel_rdma_register_types(void)
4056 {
4057 type_register_static(&qio_channel_rdma_info);
4058 }
4059
4060 type_init(qio_channel_rdma_register_types);
4061
4062 static QEMUFile *rdma_new_input(RDMAContext *rdma)
4063 {
4064 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4065
4066 rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
4067 rioc->rdmain = rdma;
4068 rioc->rdmaout = rdma->return_path;
4069 qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
4070
4071 return rioc->file;
4072 }
4073
4074 static QEMUFile *rdma_new_output(RDMAContext *rdma)
4075 {
4076 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4077
4078 rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
4079 rioc->rdmaout = rdma;
4080 rioc->rdmain = rdma->return_path;
4081 qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
4082
4083 return rioc->file;
4084 }
4085
4086 static void rdma_accept_incoming_migration(void *opaque)
4087 {
4088 RDMAContext *rdma = opaque;
4089 int ret;
4090 QEMUFile *f;
4091 Error *local_err = NULL;
4092
4093 trace_qemu_rdma_accept_incoming_migration();
4094 ret = qemu_rdma_accept(rdma);
4095
4096 if (ret < 0) {
4097 error_report("RDMA ERROR: Migration initialization failed");
4098 return;
4099 }
4100
4101 trace_qemu_rdma_accept_incoming_migration_accepted();
4102
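/*
 * A return-path connection only completes the second queue pair for an
 * incoming migration that is already in progress; there is no new
 * QEMUFile to create for it.
 */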
4103 if (rdma->is_return_path) {
4104 return;
4105 }
4106
4107 f = rdma_new_input(rdma);
4108 if (f == NULL) {
4109 error_report("RDMA ERROR: could not open RDMA for input");
4110 qemu_rdma_cleanup(rdma);
4111 return;
4112 }
4113
4114 rdma->migration_started_on_destination = 1;
4115 migration_fd_process_incoming(f, &local_err);
4116 if (local_err) {
4117 error_reportf_err(local_err, "RDMA ERROR: ");
4118 }
4119 }
4120
4121 void rdma_start_incoming_migration(const char *host_port, Error **errp)
4122 {
4123 int ret;
4124 RDMAContext *rdma;
4125
4126 trace_rdma_start_incoming_migration();
4127
4128 /* Avoid ram_block_discard_disable(), cannot change during migration. */
4129 if (ram_block_discard_is_required()) {
4130 error_setg(errp, "RDMA: cannot disable RAM discard");
4131 return;
4132 }
4133
4134 rdma = qemu_rdma_data_init(host_port, errp);
4135 if (rdma == NULL) {
4136 goto err;
4137 }
4138
4139 ret = qemu_rdma_dest_init(rdma, errp);
4140 if (ret < 0) {
4141 goto err;
4142 }
4143
4144 trace_rdma_start_incoming_migration_after_dest_init();
4145
4146 ret = rdma_listen(rdma->listen_id, 5);
4147
4148 if (ret < 0) {
4149 error_setg(errp, "RDMA ERROR: listening on socket!");
4150 goto cleanup_rdma;
4151 }
4152
4153 trace_rdma_start_incoming_migration_after_rdma_listen();
4154
4155 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4156 NULL, rdma);
4157 return;
4158
4159 cleanup_rdma:
4160 qemu_rdma_cleanup(rdma);
4161 err:
4162 if (rdma) {
4163 g_free(rdma->host);
4164 g_free(rdma->host_port);
4165 }
4166 g_free(rdma);
4167 }
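/*
 * Usage example (assuming the usual migration URI syntax): starting the
 * destination with
 *
 *     qemu-system-x86_64 ... -incoming rdma:192.168.1.1:4444
 *
 * makes the generic migration code strip the "rdma:" prefix and pass
 * "192.168.1.1:4444" as host_port here.
 */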
4168
4169 void rdma_start_outgoing_migration(void *opaque,
4170 const char *host_port, Error **errp)
4171 {
4172 MigrationState *s = opaque;
4173 RDMAContext *rdma_return_path = NULL;
4174 RDMAContext *rdma;
4175 int ret;
4176
4177 /* Avoid ram_block_discard_disable(), cannot change during migration. */
4178 if (ram_block_discard_is_required()) {
4179 error_setg(errp, "RDMA: cannot disable RAM discard");
4180 return;
4181 }
4182
4183 rdma = qemu_rdma_data_init(host_port, errp);
4184 if (rdma == NULL) {
4185 goto err;
4186 }
4187
4188 ret = qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp);
4189
4190 if (ret < 0) {
4191 goto err;
4192 }
4193
4194 trace_rdma_start_outgoing_migration_after_rdma_source_init();
4195 ret = qemu_rdma_connect(rdma, false, errp);
4196
4197 if (ret < 0) {
4198 goto err;
4199 }
4200
4201 /* RDMA postcopy needs a separate queue pair for the return path */
4202 if (migrate_postcopy() || migrate_return_path()) {
4203 rdma_return_path = qemu_rdma_data_init(host_port, errp);
4204
4205 if (rdma_return_path == NULL) {
4206 goto return_path_err;
4207 }
4208
4209 ret = qemu_rdma_source_init(rdma_return_path,
4210 migrate_rdma_pin_all(), errp);
4211
4212 if (ret < 0) {
4213 goto return_path_err;
4214 }
4215
4216 ret = qemu_rdma_connect(rdma_return_path, true, errp);
4217
4218 if (ret < 0) {
4219 goto return_path_err;
4220 }
4221
4222 rdma->return_path = rdma_return_path;
4223 rdma_return_path->return_path = rdma;
4224 rdma_return_path->is_return_path = true;
4225 }
4226
4227 trace_rdma_start_outgoing_migration_after_rdma_connect();
4228
4229 s->to_dst_file = rdma_new_output(rdma);
4230 migrate_fd_connect(s, NULL);
4231 return;
4232 return_path_err:
4233 qemu_rdma_cleanup(rdma);
4234 err:
4235 g_free(rdma);
4236 g_free(rdma_return_path);
4237 }
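/*
 * Usage example (assuming the usual migration commands): on the source,
 *
 *     (qemu) migrate_set_capability rdma-pin-all on    (optional)
 *     (qemu) migrate -d rdma:192.168.1.1:4444
 *
 * ends up here with host_port = "192.168.1.1:4444"; the rdma-pin-all
 * capability is what migrate_rdma_pin_all() reports above.
 */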