1 /*
2 * RDMA protocol and interfaces
3 *
4 * Copyright IBM, Corp. 2010-2013
5 * Copyright Red Hat, Inc. 2015-2016
6 *
7 * Authors:
8 * Michael R. Hines <mrhines@us.ibm.com>
9 * Jiuxing Liu <jl@us.ibm.com>
10 * Daniel P. Berrange <berrange@redhat.com>
11 *
12 * This work is licensed under the terms of the GNU GPL, version 2 or
13 * later. See the COPYING file in the top-level directory.
14 *
15 */
16
17 #include "qemu/osdep.h"
18 #include "qapi/error.h"
19 #include "qemu/cutils.h"
20 #include "exec/target_page.h"
21 #include "rdma.h"
22 #include "migration.h"
23 #include "migration-stats.h"
24 #include "qemu-file.h"
25 #include "ram.h"
26 #include "qemu/error-report.h"
27 #include "qemu/main-loop.h"
28 #include "qemu/module.h"
29 #include "qemu/rcu.h"
30 #include "qemu/sockets.h"
31 #include "qemu/bitmap.h"
32 #include "qemu/coroutine.h"
33 #include "exec/memory.h"
34 #include <sys/socket.h>
35 #include <netdb.h>
36 #include <arpa/inet.h>
37 #include <rdma/rdma_cma.h>
38 #include "trace.h"
39 #include "qom/object.h"
40 #include "options.h"
41 #include <poll.h>
42
43 #define RDMA_RESOLVE_TIMEOUT_MS 10000
44
45 /* Do not merge data if larger than this. */
46 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
47 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
48
49 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
50
51 /*
52 * This is only for non-live state being migrated.
53 * Instead of RDMA_WRITE messages, we use RDMA_SEND
54 * messages for that state, which requires a different
55 * delivery design than main memory.
56 */
57 #define RDMA_SEND_INCREMENT 32768
58
59 /*
60 * Maximum size of an infiniband SEND message
61 */
62 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
63 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
64
65 #define RDMA_CONTROL_VERSION_CURRENT 1
66 /*
67 * Capabilities for negotiation.
68 */
69 #define RDMA_CAPABILITY_PIN_ALL 0x01
70
71 /*
72 * Add the other flags above to this list of known capabilities
73 * as they are introduced.
74 */
75 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
76
77 /*
78 * A work request ID is 64-bits and we split up these bits
79 * into 3 parts:
80 *
81 * bits 0-15 : type of control message, 2^16
82 * bits 16-29: ram block index, 2^14
83 * bits 30-63: ram block chunk number, 2^34
84 *
85 * The last two bit ranges are only used for RDMA writes,
86 * in order to track their completion and potentially
87 * also track unregistration status of the message.
88 */
89 #define RDMA_WRID_TYPE_SHIFT 0UL
90 #define RDMA_WRID_BLOCK_SHIFT 16UL
91 #define RDMA_WRID_CHUNK_SHIFT 30UL
92
93 #define RDMA_WRID_TYPE_MASK \
94 ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
95
96 #define RDMA_WRID_BLOCK_MASK \
97 (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
98
99 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
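
/*
 * Illustration (derived from the shifts and masks above, not part of the
 * original comments): a write for chunk 17 of RAM block 3 is tagged with
 *
 *   wr_id = RDMA_WRID_RDMA_WRITE
 *         | (3  << RDMA_WRID_BLOCK_SHIFT)
 *         | (17 << RDMA_WRID_CHUNK_SHIFT)
 *
 * (see qemu_rdma_make_wrid() below), and the completion handler recovers
 * block and chunk by masking with RDMA_WRID_BLOCK_MASK / RDMA_WRID_CHUNK_MASK
 * and shifting back down.
 */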
100
101 /*
102 * RDMA migration protocol:
103 * 1. RDMA Writes (data messages, i.e. RAM)
104 * 2. IB Send/Recv (control channel messages)
105 */
106 enum {
107 RDMA_WRID_NONE = 0,
108 RDMA_WRID_RDMA_WRITE = 1,
109 RDMA_WRID_SEND_CONTROL = 2000,
110 RDMA_WRID_RECV_CONTROL = 4000,
111 };
112
113 /*
114 * Work request IDs for IB SEND messages only (not RDMA writes).
115 * This is used by the migration protocol to transmit
116 * control messages (such as device state and registration commands)
117 *
118 * We could use more WRs, but we have enough for now.
119 */
120 enum {
121 RDMA_WRID_READY = 0,
122 RDMA_WRID_DATA,
123 RDMA_WRID_CONTROL,
124 RDMA_WRID_MAX,
125 };
126
127 /*
128 * SEND/RECV IB Control Messages.
129 */
130 enum {
131 RDMA_CONTROL_NONE = 0,
132 RDMA_CONTROL_ERROR,
133 RDMA_CONTROL_READY, /* ready to receive */
134 RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */
135 RDMA_CONTROL_RAM_BLOCKS_REQUEST, /* RAMBlock synchronization */
136 RDMA_CONTROL_RAM_BLOCKS_RESULT, /* RAMBlock synchronization */
137 RDMA_CONTROL_COMPRESS, /* page contains repeat values */
138 RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */
139 RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */
140 RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */
141 RDMA_CONTROL_UNREGISTER_REQUEST, /* dynamic UN-registration */
142 RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
143 };
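
/*
 * Example of how these fit together (a sketch of the flow, not a normative
 * description): for dynamic page registration the source sends
 * RDMA_CONTROL_REGISTER_REQUEST carrying an RDMARegister, and the dest
 * replies with RDMA_CONTROL_REGISTER_RESULT carrying the new rkey in an
 * RDMARegisterResult. RDMA_CONTROL_READY tells the peer that a receive
 * buffer has been posted and it is safe to send.
 */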
144
145
146 /*
147 * Memory and MR structures used to represent an IB Send/Recv work request.
148 * This is *not* used for RDMA writes, only IB Send/Recv.
149 */
150 typedef struct {
151 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
152 struct ibv_mr *control_mr; /* registration metadata */
153 size_t control_len; /* length of the message */
154 uint8_t *control_curr; /* start of unconsumed bytes */
155 } RDMAWorkRequestData;
156
157 /*
158 * Negotiate RDMA capabilities during connection-setup time.
159 */
160 typedef struct {
161 uint32_t version;
162 uint32_t flags;
163 } RDMACapabilities;
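
/*
 * These are exchanged once at connection-setup time; version is expected to
 * match RDMA_CONTROL_VERSION_CURRENT and flags to carry only bits from
 * known_capabilities (currently just RDMA_CAPABILITY_PIN_ALL).
 */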
164
165 static void caps_to_network(RDMACapabilities *cap)
166 {
167 cap->version = htonl(cap->version);
168 cap->flags = htonl(cap->flags);
169 }
170
171 static void network_to_caps(RDMACapabilities *cap)
172 {
173 cap->version = ntohl(cap->version);
174 cap->flags = ntohl(cap->flags);
175 }
176
177 /*
178 * Representation of a RAMBlock from an RDMA perspective.
179 * This is not transmitted, only local.
180 * This and subsequent structures cannot be linked lists
181 * because we're using a single IB message to transmit
182 * the information. It's small anyway, so a list is overkill.
183 */
184 typedef struct RDMALocalBlock {
185 char *block_name;
186 uint8_t *local_host_addr; /* local virtual address */
187 uint64_t remote_host_addr; /* remote virtual address */
188 uint64_t offset;
189 uint64_t length;
190 struct ibv_mr **pmr; /* MRs for chunk-level registration */
191 struct ibv_mr *mr; /* MR for non-chunk-level registration */
192 uint32_t *remote_keys; /* rkeys for chunk-level registration */
193 uint32_t remote_rkey; /* rkeys for non-chunk-level registration */
194 int index; /* which block are we */
195 unsigned int src_index; /* (Only used on dest) */
196 bool is_ram_block;
197 int nb_chunks;
198 unsigned long *transit_bitmap;
199 unsigned long *unregister_bitmap;
200 } RDMALocalBlock;
201
202 /*
203 * Also represents a RAMblock, but only on the dest.
204 * This gets transmitted by the dest during connection-time
205 * to the source VM and then is used to populate the
206 * corresponding RDMALocalBlock with
207 * the information needed to perform the actual RDMA.
208 */
209 typedef struct QEMU_PACKED RDMADestBlock {
210 uint64_t remote_host_addr;
211 uint64_t offset;
212 uint64_t length;
213 uint32_t remote_rkey;
214 uint32_t padding;
215 } RDMADestBlock;
216
217 static const char *control_desc(unsigned int rdma_control)
218 {
219 static const char *strs[] = {
220 [RDMA_CONTROL_NONE] = "NONE",
221 [RDMA_CONTROL_ERROR] = "ERROR",
222 [RDMA_CONTROL_READY] = "READY",
223 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
224 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
225 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
226 [RDMA_CONTROL_COMPRESS] = "COMPRESS",
227 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
228 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
229 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
230 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
231 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
232 };
233
234 if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
235 return "??BAD CONTROL VALUE??";
236 }
237
238 return strs[rdma_control];
239 }
240
241 static uint64_t htonll(uint64_t v)
242 {
243 union { uint32_t lv[2]; uint64_t llv; } u;
244 u.lv[0] = htonl(v >> 32);
245 u.lv[1] = htonl(v & 0xFFFFFFFFULL);
246 return u.llv;
247 }
248
249 static uint64_t ntohll(uint64_t v)
250 {
251 union { uint32_t lv[2]; uint64_t llv; } u;
252 u.llv = v;
253 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
254 }
255
256 static void dest_block_to_network(RDMADestBlock *db)
257 {
258 db->remote_host_addr = htonll(db->remote_host_addr);
259 db->offset = htonll(db->offset);
260 db->length = htonll(db->length);
261 db->remote_rkey = htonl(db->remote_rkey);
262 }
263
264 static void network_to_dest_block(RDMADestBlock *db)
265 {
266 db->remote_host_addr = ntohll(db->remote_host_addr);
267 db->offset = ntohll(db->offset);
268 db->length = ntohll(db->length);
269 db->remote_rkey = ntohl(db->remote_rkey);
270 }
271
272 /*
273 * Virtual address of the above structures used for transmitting
274 * the RAMBlock descriptions at connection-time.
275 * This structure is *not* transmitted.
276 */
277 typedef struct RDMALocalBlocks {
278 int nb_blocks;
279 bool init; /* main memory init complete */
280 RDMALocalBlock *block;
281 } RDMALocalBlocks;
282
283 /*
284 * Main data structure for RDMA state.
285 * While there is only one copy of this structure being allocated right now,
286 * this is the place where you would start if you wanted to consider
287 * having more than one RDMA connection open at the same time.
288 */
289 typedef struct RDMAContext {
290 char *host;
291 int port;
292 char *host_port;
293
294 RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
295
296 /*
297 * This is used by *_exchange_send() to figure out whether or not
298 * the initial "READY" message has already been received.
299 * This is because other functions may potentially poll() and detect
300 * the READY message before send() does, in which case we need to
301 * know if it completed.
302 */
303 int control_ready_expected;
304
305 /* number of outstanding writes */
306 int nb_sent;
307
308 /* store info about current buffer so that we can
309 merge it with future sends */
310 uint64_t current_addr;
311 uint64_t current_length;
312 /* index of ram block the current buffer belongs to */
313 int current_index;
314 /* index of the chunk in the current ram block */
315 int current_chunk;
316
317 bool pin_all;
318
319 /*
320 * infiniband-specific variables for opening the device
321 * and maintaining connection state and so forth.
322 *
323 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
324 * cm_id->verbs, cm_id->channel, and cm_id->qp.
325 */
326 struct rdma_cm_id *cm_id; /* connection manager ID */
327 struct rdma_cm_id *listen_id;
328 bool connected;
329
330 struct ibv_context *verbs;
331 struct rdma_event_channel *channel;
332 struct ibv_qp *qp; /* queue pair */
333 struct ibv_comp_channel *recv_comp_channel; /* recv completion channel */
334 struct ibv_comp_channel *send_comp_channel; /* send completion channel */
335 struct ibv_pd *pd; /* protection domain */
336 struct ibv_cq *recv_cq; /* receive completion queue */
337 struct ibv_cq *send_cq; /* send completion queue */
338
339 /*
340 * If a previous write failed (perhaps because of a failed
341 * memory registration), then do not attempt any future work
342 * and remember the error state.
343 */
344 bool errored;
345 bool error_reported;
346 bool received_error;
347
348 /*
349 * Description of ram blocks used throughout the code.
350 */
351 RDMALocalBlocks local_ram_blocks;
352 RDMADestBlock *dest_blocks;
353
354 /* Index of the next RAMBlock received during block registration */
355 unsigned int next_src_index;
356
357 /*
358 * Migration on the *destination* has started. If so, wait for
359 * completions with the coroutine yield function.
360 * The source runs in a thread, so we don't care there.
361 */
362 int migration_started_on_destination;
363
364 int total_registrations;
365 int total_writes;
366
367 int unregister_current, unregister_next;
368 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
369
370 GHashTable *blockmap;
371
372 /* the RDMAContext for return path */
373 struct RDMAContext *return_path;
374 bool is_return_path;
375 } RDMAContext;
376
377 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
378 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)
379
380
381
382 struct QIOChannelRDMA {
383 QIOChannel parent;
384 RDMAContext *rdmain;
385 RDMAContext *rdmaout;
386 QEMUFile *file;
387 bool blocking; /* XXX we don't actually honour this yet */
388 };
389
390 /*
391 * Main structure for IB Send/Recv control messages.
392 * This gets prepended at the beginning of every Send/Recv.
393 */
394 typedef struct QEMU_PACKED {
395 uint32_t len; /* Total length of data portion */
396 uint32_t type; /* which control command to perform */
397 uint32_t repeat; /* number of commands in data portion of same type */
398 uint32_t padding;
399 } RDMAControlHeader;
400
401 static void control_to_network(RDMAControlHeader *control)
402 {
403 control->type = htonl(control->type);
404 control->len = htonl(control->len);
405 control->repeat = htonl(control->repeat);
406 }
407
408 static void network_to_control(RDMAControlHeader *control)
409 {
410 control->type = ntohl(control->type);
411 control->len = ntohl(control->len);
412 control->repeat = ntohl(control->repeat);
413 }
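
/*
 * On the wire, every control message is this (byte-swapped) header followed
 * immediately by head->len bytes of payload; e.g. a REGISTER_REQUEST with
 * repeat == 2 carries two consecutive RDMARegister structures in the data
 * portion. See qemu_rdma_post_send_control(), which copies header and data
 * into a single registered control buffer before posting the SEND.
 */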
414
415 /*
416 * Register a single Chunk.
417 * Information sent by the source VM to inform the dest
418 * to register a single chunk of memory before we can perform
419 * the actual RDMA operation.
420 */
421 typedef struct QEMU_PACKED {
422 union QEMU_PACKED {
423 uint64_t current_addr; /* offset into the ram_addr_t space */
424 uint64_t chunk; /* chunk to lookup if unregistering */
425 } key;
426 uint32_t current_index; /* which ramblock the chunk belongs to */
427 uint32_t padding;
428 uint64_t chunks; /* how many sequential chunks to register */
429 } RDMARegister;
430
431 static bool rdma_errored(RDMAContext *rdma)
432 {
433 if (rdma->errored && !rdma->error_reported) {
434 error_report("RDMA is in an error state waiting migration"
435 " to abort!");
436 rdma->error_reported = true;
437 }
438 return rdma->errored;
439 }
440
441 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
442 {
443 RDMALocalBlock *local_block;
444 local_block = &rdma->local_ram_blocks.block[reg->current_index];
445
446 if (local_block->is_ram_block) {
447 /*
448 * current_addr as passed in is an address in the local ram_addr_t
449 * space, we need to translate this for the destination
450 */
451 reg->key.current_addr -= local_block->offset;
452 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
453 }
454 reg->key.current_addr = htonll(reg->key.current_addr);
455 reg->current_index = htonl(reg->current_index);
456 reg->chunks = htonll(reg->chunks);
457 }
458
459 static void network_to_register(RDMARegister *reg)
460 {
461 reg->key.current_addr = ntohll(reg->key.current_addr);
462 reg->current_index = ntohl(reg->current_index);
463 reg->chunks = ntohll(reg->chunks);
464 }
465
466 typedef struct QEMU_PACKED {
467 uint32_t value; /* if zero, we will madvise() */
468 uint32_t block_idx; /* which ram block index */
469 uint64_t offset; /* Address in remote ram_addr_t space */
470 uint64_t length; /* length of the chunk */
471 } RDMACompress;
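
/*
 * For instance (an illustration of the 'value' field above): a run of pages
 * whose bytes are all zero can be announced with a single
 * RDMA_CONTROL_COMPRESS message with value == 0, letting the dest madvise()
 * the pages away instead of transferring the data.
 */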
472
473 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
474 {
475 comp->value = htonl(comp->value);
476 /*
477 * comp->offset as passed in is an address in the local ram_addr_t
478 * space, we need to translate this for the destination
479 */
480 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
481 comp->offset += rdma->dest_blocks[comp->block_idx].offset;
482 comp->block_idx = htonl(comp->block_idx);
483 comp->offset = htonll(comp->offset);
484 comp->length = htonll(comp->length);
485 }
486
487 static void network_to_compress(RDMACompress *comp)
488 {
489 comp->value = ntohl(comp->value);
490 comp->block_idx = ntohl(comp->block_idx);
491 comp->offset = ntohll(comp->offset);
492 comp->length = ntohll(comp->length);
493 }
494
495 /*
496 * The result of the dest's memory registration produces an "rkey"
497 * which the source VM must reference in order to perform
498 * the RDMA operation.
499 */
500 typedef struct QEMU_PACKED {
501 uint32_t rkey;
502 uint32_t padding;
503 uint64_t host_addr;
504 } RDMARegisterResult;
505
506 static void result_to_network(RDMARegisterResult *result)
507 {
508 result->rkey = htonl(result->rkey);
509 result->host_addr = htonll(result->host_addr);
510 };
511
512 static void network_to_result(RDMARegisterResult *result)
513 {
514 result->rkey = ntohl(result->rkey);
515 result->host_addr = ntohll(result->host_addr);
516 };
517
518 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
519 uint8_t *data, RDMAControlHeader *resp,
520 int *resp_idx,
521 int (*callback)(RDMAContext *rdma));
522
523 static inline uint64_t ram_chunk_index(const uint8_t *start,
524 const uint8_t *host)
525 {
526 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
527 }
528
529 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
530 uint64_t i)
531 {
532 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
533 (i << RDMA_REG_CHUNK_SHIFT));
534 }
535
536 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
537 uint64_t i)
538 {
539 uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
540 (1UL << RDMA_REG_CHUNK_SHIFT);
541
542 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
543 result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
544 }
545
546 return result;
547 }
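
/*
 * Worked example of the chunk arithmetic above: with RDMA_REG_CHUNK_SHIFT
 * of 20, chunks are 1 MiB. A host pointer 3.5 MiB into a block gives
 * ram_chunk_index() == 3, ram_chunk_start(block, 3) == base + 3 MiB and
 * ram_chunk_end(block, 3) == base + 4 MiB, clamped to the block's end for
 * the final, possibly partial, chunk.
 */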
548
549 static void rdma_add_block(RDMAContext *rdma, const char *block_name,
550 void *host_addr,
551 ram_addr_t block_offset, uint64_t length)
552 {
553 RDMALocalBlocks *local = &rdma->local_ram_blocks;
554 RDMALocalBlock *block;
555 RDMALocalBlock *old = local->block;
556
557 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
558
559 if (local->nb_blocks) {
560 int x;
561
562 if (rdma->blockmap) {
563 for (x = 0; x < local->nb_blocks; x++) {
564 g_hash_table_remove(rdma->blockmap,
565 (void *)(uintptr_t)old[x].offset);
566 g_hash_table_insert(rdma->blockmap,
567 (void *)(uintptr_t)old[x].offset,
568 &local->block[x]);
569 }
570 }
571 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
572 g_free(old);
573 }
574
575 block = &local->block[local->nb_blocks];
576
577 block->block_name = g_strdup(block_name);
578 block->local_host_addr = host_addr;
579 block->offset = block_offset;
580 block->length = length;
581 block->index = local->nb_blocks;
582 block->src_index = ~0U; /* Filled in by the receipt of the block list */
583 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
584 block->transit_bitmap = bitmap_new(block->nb_chunks);
585 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
586 block->unregister_bitmap = bitmap_new(block->nb_chunks);
587 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
588 block->remote_keys = g_new0(uint32_t, block->nb_chunks);
589
590 block->is_ram_block = local->init ? false : true;
591
592 if (rdma->blockmap) {
593 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
594 }
595
596 trace_rdma_add_block(block_name, local->nb_blocks,
597 (uintptr_t) block->local_host_addr,
598 block->offset, block->length,
599 (uintptr_t) (block->local_host_addr + block->length),
600 BITS_TO_LONGS(block->nb_chunks) *
601 sizeof(unsigned long) * 8,
602 block->nb_chunks);
603
604 local->nb_blocks++;
605 }
606
607 /*
608 * Memory regions need to be registered with the device and queue pairs set up
609 * in advance before the migration starts. This tells us where the RAM blocks
610 * are so that we can register them individually.
611 */
612 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
613 {
614 const char *block_name = qemu_ram_get_idstr(rb);
615 void *host_addr = qemu_ram_get_host_addr(rb);
616 ram_addr_t block_offset = qemu_ram_get_offset(rb);
617 ram_addr_t length = qemu_ram_get_used_length(rb);
618 rdma_add_block(opaque, block_name, host_addr, block_offset, length);
619 return 0;
620 }
621
622 /*
623 * Identify the RAMBlocks and their quantity. They will be used as references
624 * to identify chunk boundaries inside each RAMBlock and also be referenced
625 * during dynamic page registration.
626 */
627 static void qemu_rdma_init_ram_blocks(RDMAContext *rdma)
628 {
629 RDMALocalBlocks *local = &rdma->local_ram_blocks;
630 int ret;
631
632 assert(rdma->blockmap == NULL);
633 memset(local, 0, sizeof *local);
634 ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
635 assert(!ret);
636 trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
637 rdma->dest_blocks = g_new0(RDMADestBlock,
638 rdma->local_ram_blocks.nb_blocks);
639 local->init = true;
640 }
641
642 /*
643 * Note: If used outside of cleanup, the caller must ensure that the destination
644 * block structures are also updated
645 */
646 static void rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
647 {
648 RDMALocalBlocks *local = &rdma->local_ram_blocks;
649 RDMALocalBlock *old = local->block;
650 int x;
651
652 if (rdma->blockmap) {
653 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
654 }
655 if (block->pmr) {
656 int j;
657
658 for (j = 0; j < block->nb_chunks; j++) {
659 if (!block->pmr[j]) {
660 continue;
661 }
662 ibv_dereg_mr(block->pmr[j]);
663 rdma->total_registrations--;
664 }
665 g_free(block->pmr);
666 block->pmr = NULL;
667 }
668
669 if (block->mr) {
670 ibv_dereg_mr(block->mr);
671 rdma->total_registrations--;
672 block->mr = NULL;
673 }
674
675 g_free(block->transit_bitmap);
676 block->transit_bitmap = NULL;
677
678 g_free(block->unregister_bitmap);
679 block->unregister_bitmap = NULL;
680
681 g_free(block->remote_keys);
682 block->remote_keys = NULL;
683
684 g_free(block->block_name);
685 block->block_name = NULL;
686
687 if (rdma->blockmap) {
688 for (x = 0; x < local->nb_blocks; x++) {
689 g_hash_table_remove(rdma->blockmap,
690 (void *)(uintptr_t)old[x].offset);
691 }
692 }
693
694 if (local->nb_blocks > 1) {
695
696 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
697
698 if (block->index) {
699 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
700 }
701
702 if (block->index < (local->nb_blocks - 1)) {
703 memcpy(local->block + block->index, old + (block->index + 1),
704 sizeof(RDMALocalBlock) *
705 (local->nb_blocks - (block->index + 1)));
706 for (x = block->index; x < local->nb_blocks - 1; x++) {
707 local->block[x].index--;
708 }
709 }
710 } else {
711 assert(block == local->block);
712 local->block = NULL;
713 }
714
715 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
716 block->offset, block->length,
717 (uintptr_t)(block->local_host_addr + block->length),
718 BITS_TO_LONGS(block->nb_chunks) *
719 sizeof(unsigned long) * 8, block->nb_chunks);
720
721 g_free(old);
722
723 local->nb_blocks--;
724
725 if (local->nb_blocks && rdma->blockmap) {
726 for (x = 0; x < local->nb_blocks; x++) {
727 g_hash_table_insert(rdma->blockmap,
728 (void *)(uintptr_t)local->block[x].offset,
729 &local->block[x]);
730 }
731 }
732 }
733
734 /*
735 * Put in the log file which RDMA device was opened and the details
736 * associated with that device.
737 */
738 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
739 {
740 struct ibv_port_attr port;
741
742 if (ibv_query_port(verbs, 1, &port)) {
743 error_report("Failed to query port information");
744 return;
745 }
746
747 printf("%s RDMA Device opened: kernel name %s "
748 "uverbs device name %s, "
749 "infiniband_verbs class device path %s, "
750 "infiniband class device path %s, "
751 "transport: (%d) %s\n",
752 who,
753 verbs->device->name,
754 verbs->device->dev_name,
755 verbs->device->dev_path,
756 verbs->device->ibdev_path,
757 port.link_layer,
758 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
759 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
760 ? "Ethernet" : "Unknown"));
761 }
762
763 /*
764 * Put in the log file the RDMA gid addressing information,
765 * useful for folks who have trouble understanding the
766 * RDMA device hierarchy in the kernel.
767 */
768 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
769 {
770 char sgid[33];
771 char dgid[33];
772 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
773 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
774 trace_qemu_rdma_dump_gid(who, sgid, dgid);
775 }
776
777 /*
778 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
779 * We will try the next addrinfo struct, and fail if there are
780 * no other valid addresses to bind against.
781 *
782 * If the user is listening on '[::]', then we will not have opened a device
783 * yet and have no way of verifying if the device is RoCE or not.
784 *
785 * In this case, the source VM will throw an error for ALL types of
786 * connections (both IPv4 and IPv6) if the destination machine does not have
787 * a regular infiniband network available for use.
788 *
789 * The only way to guarantee that an error is thrown for broken kernels is
790 * for the management software to choose a *specific* interface at bind time
791 * and validate what type of hardware it is.
792 *
793 * Unfortunately, this puts the user in a fix:
794 *
795 * If the source VM connects with an IPv4 address without knowing that the
796 * destination has bound to '[::]' the migration will unconditionally fail
797 * unless the management software is explicitly listening on the IPv4
798 * address while using a RoCE-based device.
799 *
800 * If the source VM connects with an IPv6 address, then we're OK because we can
801 * throw an error on the source (and similarly on the destination).
802 *
803 * But in mixed environments, this will be broken for a while until it is fixed
804 * inside linux.
805 *
806 * We do provide a *tiny* bit of help in this function: We can list all of the
807 * devices in the system and check to see if all the devices are RoCE or
808 * Infiniband.
809 *
810 * If we detect that we have a *pure* RoCE environment, then we can safely
811 * throw an error even if the management software has specified '[::]' as the
812 * bind address.
813 *
814 * However, if there are multiple heterogeneous devices, then we cannot make
815 * this assumption and the user just has to be sure they know what they are
816 * doing.
817 *
818 * Patches are being reviewed on linux-rdma.
819 */
820 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
821 {
822 /* This bug only exists in linux, to our knowledge. */
823 #ifdef CONFIG_LINUX
824 struct ibv_port_attr port_attr;
825
826 /*
827 * Verbs are only NULL if management has bound to '[::]'.
828 *
829 * Let's iterate through all the devices and see if there are any pure IB
830 * devices (non-ethernet).
831 *
832 * If not, then we can safely proceed with the migration.
833 * Otherwise, there are no guarantees until the bug is fixed in linux.
834 */
835 if (!verbs) {
836 int num_devices, x;
837 struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
838 bool roce_found = false;
839 bool ib_found = false;
840
841 for (x = 0; x < num_devices; x++) {
842 verbs = ibv_open_device(dev_list[x]);
843 /*
844 * ibv_open_device() is not documented to set errno. If
845 * it does, it's somebody else's doc bug. If it doesn't,
846 * the use of errno below is wrong.
847 * TODO Find out whether ibv_open_device() sets errno.
848 */
849 if (!verbs) {
850 if (errno == EPERM) {
851 continue;
852 } else {
853 error_setg_errno(errp, errno,
854 "could not open RDMA device context");
855 return -1;
856 }
857 }
858
859 if (ibv_query_port(verbs, 1, &port_attr)) {
860 ibv_close_device(verbs);
861 error_setg(errp,
862 "RDMA ERROR: Could not query initial IB port");
863 return -1;
864 }
865
866 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
867 ib_found = true;
868 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
869 roce_found = true;
870 }
871
872 ibv_close_device(verbs);
873
874 }
875
876 if (roce_found) {
877 if (ib_found) {
878 fprintf(stderr, "WARN: migrations may fail:"
879 " IPv6 over RoCE / iWARP in linux"
880 " is broken. But since you appear to have a"
881 " mixed RoCE / IB environment, be sure to only"
882 " migrate over the IB fabric until the kernel "
883 " fixes the bug.\n");
884 } else {
885 error_setg(errp, "RDMA ERROR: "
886 "You only have RoCE / iWARP devices in your systems"
887 " and your management software has specified '[::]'"
888 ", but IPv6 over RoCE / iWARP is not supported in Linux.");
889 return -1;
890 }
891 }
892
893 return 0;
894 }
895
896 /*
897 * If we have a verbs context, that means that something other than '[::]' was
898 * used by the management software for binding, in which case we can
899 * actually warn the user about a potentially broken kernel.
900 */
901
902 /* IB ports start with 1, not 0 */
903 if (ibv_query_port(verbs, 1, &port_attr)) {
904 error_setg(errp, "RDMA ERROR: Could not query initial IB port");
905 return -1;
906 }
907
908 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
909 error_setg(errp, "RDMA ERROR: "
910 "Linux kernel's RoCE / iWARP does not support IPv6 "
911 "(but patches on linux-rdma in progress)");
912 return -1;
913 }
914
915 #endif
916
917 return 0;
918 }
919
920 /*
921 * Figure out which RDMA device corresponds to the requested IP hostname
922 * Also create the initial connection manager identifiers for opening
923 * the connection.
924 */
925 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
926 {
927 Error *err = NULL;
928 int ret;
929 struct rdma_addrinfo *res;
930 char port_str[16];
931 struct rdma_cm_event *cm_event;
932 char ip[40] = "unknown";
933 struct rdma_addrinfo *e;
934
935 if (rdma->host == NULL || !strcmp(rdma->host, "")) {
936 error_setg(errp, "RDMA ERROR: RDMA hostname has not been set");
937 return -1;
938 }
939
940 /* create CM channel */
941 rdma->channel = rdma_create_event_channel();
942 if (!rdma->channel) {
943 error_setg(errp, "RDMA ERROR: could not create CM channel");
944 return -1;
945 }
946
947 /* create CM id */
948 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
949 if (ret < 0) {
950 error_setg(errp, "RDMA ERROR: could not create channel id");
951 goto err_resolve_create_id;
952 }
953
954 snprintf(port_str, 16, "%d", rdma->port);
955 port_str[15] = '\0';
956
957 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
958 if (ret) {
959 error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
960 rdma->host);
961 goto err_resolve_get_addr;
962 }
963
964 /* Try all addresses, saving the first error in @err */
965 for (e = res; e != NULL; e = e->ai_next) {
966 Error **local_errp = err ? NULL : &err;
967
968 inet_ntop(e->ai_family,
969 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
970 trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
971
972 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
973 RDMA_RESOLVE_TIMEOUT_MS);
974 if (ret >= 0) {
975 if (e->ai_family == AF_INET6) {
976 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs,
977 local_errp);
978 if (ret < 0) {
979 continue;
980 }
981 }
982 error_free(err);
983 goto route;
984 }
985 }
986
987 rdma_freeaddrinfo(res);
988 if (err) {
989 error_propagate(errp, err);
990 } else {
991 error_setg(errp, "RDMA ERROR: could not resolve address %s",
992 rdma->host);
993 }
994 goto err_resolve_get_addr;
995
996 route:
997 rdma_freeaddrinfo(res);
998 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
999
1000 ret = rdma_get_cm_event(rdma->channel, &cm_event);
1001 if (ret < 0) {
1002 error_setg(errp, "RDMA ERROR: could not perform event_addr_resolved");
1003 goto err_resolve_get_addr;
1004 }
1005
1006 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
1007 error_setg(errp,
1008 "RDMA ERROR: result not equal to event_addr_resolved %s",
1009 rdma_event_str(cm_event->event));
1010 error_report("rdma_resolve_addr");
1011 rdma_ack_cm_event(cm_event);
1012 goto err_resolve_get_addr;
1013 }
1014 rdma_ack_cm_event(cm_event);
1015
1016 /* resolve route */
1017 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1018 if (ret < 0) {
1019 error_setg(errp, "RDMA ERROR: could not resolve rdma route");
1020 goto err_resolve_get_addr;
1021 }
1022
1023 ret = rdma_get_cm_event(rdma->channel, &cm_event);
1024 if (ret < 0) {
1025 error_setg(errp, "RDMA ERROR: could not perform event_route_resolved");
1026 goto err_resolve_get_addr;
1027 }
1028 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1029 error_setg(errp, "RDMA ERROR: "
1030 "result not equal to event_route_resolved: %s",
1031 rdma_event_str(cm_event->event));
1032 rdma_ack_cm_event(cm_event);
1033 goto err_resolve_get_addr;
1034 }
1035 rdma_ack_cm_event(cm_event);
1036 rdma->verbs = rdma->cm_id->verbs;
1037 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1038 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1039 return 0;
1040
1041 err_resolve_get_addr:
1042 rdma_destroy_id(rdma->cm_id);
1043 rdma->cm_id = NULL;
1044 err_resolve_create_id:
1045 rdma_destroy_event_channel(rdma->channel);
1046 rdma->channel = NULL;
1047 return -1;
1048 }
1049
1050 /*
1051 * Create protection domain and completion queues
1052 */
1053 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1054 {
1055 /* allocate pd */
1056 rdma->pd = ibv_alloc_pd(rdma->verbs);
1057 if (!rdma->pd) {
1058 error_report("failed to allocate protection domain");
1059 return -1;
1060 }
1061
1062 /* create receive completion channel */
1063 rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
1064 if (!rdma->recv_comp_channel) {
1065 error_report("failed to allocate receive completion channel");
1066 goto err_alloc_pd_cq;
1067 }
1068
1069 /*
1070 * Completion queue can be filled by read work requests.
1071 */
1072 rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1073 NULL, rdma->recv_comp_channel, 0);
1074 if (!rdma->recv_cq) {
1075 error_report("failed to allocate receive completion queue");
1076 goto err_alloc_pd_cq;
1077 }
1078
1079 /* create send completion channel */
1080 rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
1081 if (!rdma->send_comp_channel) {
1082 error_report("failed to allocate send completion channel");
1083 goto err_alloc_pd_cq;
1084 }
1085
1086 rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1087 NULL, rdma->send_comp_channel, 0);
1088 if (!rdma->send_cq) {
1089 error_report("failed to allocate send completion queue");
1090 goto err_alloc_pd_cq;
1091 }
1092
1093 return 0;
1094
1095 err_alloc_pd_cq:
1096 if (rdma->pd) {
1097 ibv_dealloc_pd(rdma->pd);
1098 }
1099 if (rdma->recv_comp_channel) {
1100 ibv_destroy_comp_channel(rdma->recv_comp_channel);
1101 }
1102 if (rdma->send_comp_channel) {
1103 ibv_destroy_comp_channel(rdma->send_comp_channel);
1104 }
1105 if (rdma->recv_cq) {
1106 ibv_destroy_cq(rdma->recv_cq);
1107 rdma->recv_cq = NULL;
1108 }
1109 rdma->pd = NULL;
1110 rdma->recv_comp_channel = NULL;
1111 rdma->send_comp_channel = NULL;
1112 return -1;
1113
1114 }
1115
1116 /*
1117 * Create queue pairs.
1118 */
1119 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1120 {
1121 struct ibv_qp_init_attr attr = { 0 };
1122 int ret;
1123
1124 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1125 attr.cap.max_recv_wr = 3;
1126 attr.cap.max_send_sge = 1;
1127 attr.cap.max_recv_sge = 1;
1128 attr.send_cq = rdma->send_cq;
1129 attr.recv_cq = rdma->recv_cq;
1130 attr.qp_type = IBV_QPT_RC;
1131
1132 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1133 if (ret < 0) {
1134 return -1;
1135 }
1136
1137 rdma->qp = rdma->cm_id->qp;
1138 return 0;
1139 }
1140
1141 /* Check whether On-Demand Paging is supported by the RDMA device */
1142 static bool rdma_support_odp(struct ibv_context *dev)
1143 {
1144 struct ibv_device_attr_ex attr = {0};
1145 int ret = ibv_query_device_ex(dev, NULL, &attr);
1146 if (ret) {
1147 return false;
1148 }
1149
1150 if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
1151 return true;
1152 }
1153
1154 return false;
1155 }
1156
1157 /*
1158 * Use ibv_advise_mr() to avoid RNR NAK errors as far as possible.
1159 * An MR registered with ODP on the responder side will send an RNR NAK
1160 * back to the requester when it hits a page fault.
1161 */
1162 static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
1163 uint32_t len, uint32_t lkey,
1164 const char *name, bool wr)
1165 {
1166 #ifdef HAVE_IBV_ADVISE_MR
1167 int ret;
1168 int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE :
1169 IBV_ADVISE_MR_ADVICE_PREFETCH;
1170 struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len};
1171
1172 ret = ibv_advise_mr(pd, advice,
1173 IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1);
1174 /* ignore the error */
1175 trace_qemu_rdma_advise_mr(name, len, addr, strerror(ret));
1176 #endif
1177 }
1178
1179 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1180 {
1181 int i;
1182 RDMALocalBlocks *local = &rdma->local_ram_blocks;
1183
1184 for (i = 0; i < local->nb_blocks; i++) {
1185 int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
1186
1187 local->block[i].mr =
1188 ibv_reg_mr(rdma->pd,
1189 local->block[i].local_host_addr,
1190 local->block[i].length, access
1191 );
1192 /*
1193 * ibv_reg_mr() is not documented to set errno. If it does,
1194 * it's somebody else's doc bug. If it doesn't, the use of
1195 * errno below is wrong.
1196 * TODO Find out whether ibv_reg_mr() sets errno.
1197 */
1198 if (!local->block[i].mr &&
1199 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1200 access |= IBV_ACCESS_ON_DEMAND;
1201 /* register ODP mr */
1202 local->block[i].mr =
1203 ibv_reg_mr(rdma->pd,
1204 local->block[i].local_host_addr,
1205 local->block[i].length, access);
1206 trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
1207
1208 if (local->block[i].mr) {
1209 qemu_rdma_advise_prefetch_mr(rdma->pd,
1210 (uintptr_t)local->block[i].local_host_addr,
1211 local->block[i].length,
1212 local->block[i].mr->lkey,
1213 local->block[i].block_name,
1214 true);
1215 }
1216 }
1217
1218 if (!local->block[i].mr) {
1219 perror("Failed to register local dest ram block!");
1220 break;
1221 }
1222 rdma->total_registrations++;
1223 }
1224
1225 if (i >= local->nb_blocks) {
1226 return 0;
1227 }
1228
1229 for (i--; i >= 0; i--) {
1230 ibv_dereg_mr(local->block[i].mr);
1231 local->block[i].mr = NULL;
1232 rdma->total_registrations--;
1233 }
1234
1235 return -1;
1236
1237 }
1238
1239 /*
1240 * Find the ram block that corresponds to the page requested to be
1241 * transmitted by QEMU.
1242 *
1243 * Once the block is found, also identify which 'chunk' within that
1244 * block the page belongs to.
1245 */
1246 static void qemu_rdma_search_ram_block(RDMAContext *rdma,
1247 uintptr_t block_offset,
1248 uint64_t offset,
1249 uint64_t length,
1250 uint64_t *block_index,
1251 uint64_t *chunk_index)
1252 {
1253 uint64_t current_addr = block_offset + offset;
1254 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1255 (void *) block_offset);
1256 assert(block);
1257 assert(current_addr >= block->offset);
1258 assert((current_addr + length) <= (block->offset + block->length));
1259
1260 *block_index = block->index;
1261 *chunk_index = ram_chunk_index(block->local_host_addr,
1262 block->local_host_addr + (current_addr - block->offset));
1263 }
1264
1265 /*
1266 * Register a chunk with IB. If the chunk was already registered
1267 * previously, then skip.
1268 *
1269 * Also return the keys associated with the registration needed
1270 * to perform the actual RDMA operation.
1271 */
1272 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1273 RDMALocalBlock *block, uintptr_t host_addr,
1274 uint32_t *lkey, uint32_t *rkey, int chunk,
1275 uint8_t *chunk_start, uint8_t *chunk_end)
1276 {
1277 if (block->mr) {
1278 if (lkey) {
1279 *lkey = block->mr->lkey;
1280 }
1281 if (rkey) {
1282 *rkey = block->mr->rkey;
1283 }
1284 return 0;
1285 }
1286
1287 /* allocate memory to store chunk MRs */
1288 if (!block->pmr) {
1289 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1290 }
1291
1292 /*
1293 * If 'rkey', then we're the destination, so grant access to the source.
1294 *
1295 * If 'lkey', then we're the source VM, so grant access only to ourselves.
1296 */
1297 if (!block->pmr[chunk]) {
1298 uint64_t len = chunk_end - chunk_start;
1299 int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
1300 0;
1301
1302 trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1303
1304 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1305 /*
1306 * ibv_reg_mr() is not documented to set errno. If it does,
1307 * it's somebody else's doc bug. If it doesn't, the use of
1308 * errno below is wrong.
1309 * TODO Find out whether ibv_reg_mr() sets errno.
1310 */
1311 if (!block->pmr[chunk] &&
1312 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1313 access |= IBV_ACCESS_ON_DEMAND;
1314 /* register ODP mr */
1315 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1316 trace_qemu_rdma_register_odp_mr(block->block_name);
1317
1318 if (block->pmr[chunk]) {
1319 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
1320 len, block->pmr[chunk]->lkey,
1321 block->block_name, rkey);
1322
1323 }
1324 }
1325 }
1326 if (!block->pmr[chunk]) {
1327 perror("Failed to register chunk!");
1328 fprintf(stderr, "Chunk details: block: %d chunk index %d"
1329 " start %" PRIuPTR " end %" PRIuPTR
1330 " host %" PRIuPTR
1331 " local %" PRIuPTR " registrations: %d\n",
1332 block->index, chunk, (uintptr_t)chunk_start,
1333 (uintptr_t)chunk_end, host_addr,
1334 (uintptr_t)block->local_host_addr,
1335 rdma->total_registrations);
1336 return -1;
1337 }
1338 rdma->total_registrations++;
1339
1340 if (lkey) {
1341 *lkey = block->pmr[chunk]->lkey;
1342 }
1343 if (rkey) {
1344 *rkey = block->pmr[chunk]->rkey;
1345 }
1346 return 0;
1347 }
1348
1349 /*
1350 * Register (at connection time) the memory used for control
1351 * channel messages.
1352 */
1353 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1354 {
1355 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1356 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1357 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1358 if (rdma->wr_data[idx].control_mr) {
1359 rdma->total_registrations++;
1360 return 0;
1361 }
1362 error_report("qemu_rdma_reg_control failed");
1363 return -1;
1364 }
1365
1366 /*
1367 * Perform a non-optimized memory unregistration after every transfer
1368 * for demonstration purposes, only if pin-all is not requested.
1369 *
1370 * Potential optimizations:
1371 * 1. Start a new thread to run this function continuously
1372 - for bit clearing
1373 - and for receipt of unregister messages
1374 * 2. Use an LRU.
1375 * 3. Use workload hints.
1376 */
1377 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1378 {
1379 while (rdma->unregistrations[rdma->unregister_current]) {
1380 int ret;
1381 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1382 uint64_t chunk =
1383 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1384 uint64_t index =
1385 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1386 RDMALocalBlock *block =
1387 &(rdma->local_ram_blocks.block[index]);
1388 RDMARegister reg = { .current_index = index };
1389 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1390 };
1391 RDMAControlHeader head = { .len = sizeof(RDMARegister),
1392 .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1393 .repeat = 1,
1394 };
1395
1396 trace_qemu_rdma_unregister_waiting_proc(chunk,
1397 rdma->unregister_current);
1398
1399 rdma->unregistrations[rdma->unregister_current] = 0;
1400 rdma->unregister_current++;
1401
1402 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1403 rdma->unregister_current = 0;
1404 }
1405
1406
1407 /*
1408 * Unregistration is speculative (because migration is single-threaded
1409 * and we cannot break the protocol's InfiniBand message ordering).
1410 * Thus, if the memory is currently being used for transmission,
1411 * then abort the attempt to unregister and try again
1412 * later the next time a completion is received for this memory.
1413 */
1414 clear_bit(chunk, block->unregister_bitmap);
1415
1416 if (test_bit(chunk, block->transit_bitmap)) {
1417 trace_qemu_rdma_unregister_waiting_inflight(chunk);
1418 continue;
1419 }
1420
1421 trace_qemu_rdma_unregister_waiting_send(chunk);
1422
1423 ret = ibv_dereg_mr(block->pmr[chunk]);
1424 block->pmr[chunk] = NULL;
1425 block->remote_keys[chunk] = 0;
1426
1427 if (ret != 0) {
1428 /*
1429 * FIXME perror() is problematic, because ibv_dereg_mr() is
1430 * not documented to set errno. Will go away later in
1431 * this series.
1432 */
1433 perror("unregistration chunk failed");
1434 return -1;
1435 }
1436 rdma->total_registrations--;
1437
1438 reg.key.chunk = chunk;
1439 register_to_network(rdma, &reg);
1440 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1441 &resp, NULL, NULL);
1442 if (ret < 0) {
1443 return -1;
1444 }
1445
1446 trace_qemu_rdma_unregister_waiting_complete(chunk);
1447 }
1448
1449 return 0;
1450 }
1451
1452 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1453 uint64_t chunk)
1454 {
1455 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1456
1457 result |= (index << RDMA_WRID_BLOCK_SHIFT);
1458 result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1459
1460 return result;
1461 }
1462
1463 /*
1464 * Consult the completion queue to see whether a work request
1465 * (of any kind) has completed.
1466 * Return the work request ID that completed.
1467 */
1468 static int qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
1469 uint64_t *wr_id_out, uint32_t *byte_len)
1470 {
1471 int ret;
1472 struct ibv_wc wc;
1473 uint64_t wr_id;
1474
1475 ret = ibv_poll_cq(cq, 1, &wc);
1476
1477 if (!ret) {
1478 *wr_id_out = RDMA_WRID_NONE;
1479 return 0;
1480 }
1481
1482 if (ret < 0) {
1483 error_report("ibv_poll_cq failed");
1484 return -1;
1485 }
1486
1487 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1488
1489 if (wc.status != IBV_WC_SUCCESS) {
1490 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1491 wc.status, ibv_wc_status_str(wc.status));
1492 fprintf(stderr, "ibv_poll_cq wrid=%" PRIu64 "!\n", wr_id);
1493
1494 return -1;
1495 }
1496
1497 if (rdma->control_ready_expected &&
1498 (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1499 trace_qemu_rdma_poll_recv(wr_id - RDMA_WRID_RECV_CONTROL, wr_id,
1500 rdma->nb_sent);
1501 rdma->control_ready_expected = 0;
1502 }
1503
1504 if (wr_id == RDMA_WRID_RDMA_WRITE) {
1505 uint64_t chunk =
1506 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1507 uint64_t index =
1508 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1509 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1510
1511 trace_qemu_rdma_poll_write(wr_id, rdma->nb_sent,
1512 index, chunk, block->local_host_addr,
1513 (void *)(uintptr_t)block->remote_host_addr);
1514
1515 clear_bit(chunk, block->transit_bitmap);
1516
1517 if (rdma->nb_sent > 0) {
1518 rdma->nb_sent--;
1519 }
1520 } else {
1521 trace_qemu_rdma_poll_other(wr_id, rdma->nb_sent);
1522 }
1523
1524 *wr_id_out = wc.wr_id;
1525 if (byte_len) {
1526 *byte_len = wc.byte_len;
1527 }
1528
1529 return 0;
1530 }
1531
1532 /* Wait for activity on the completion channel.
1533 * Returns 0 on success, non-zero on error.
1534 */
1535 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
1536 struct ibv_comp_channel *comp_channel)
1537 {
1538 struct rdma_cm_event *cm_event;
1539 int ret;
1540
1541 /*
1542 * Coroutine doesn't start until migration_fd_process_incoming()
1543 * so don't yield unless we know we're running inside of a coroutine.
1544 */
1545 if (rdma->migration_started_on_destination &&
1546 migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1547 yield_until_fd_readable(comp_channel->fd);
1548 } else {
1549 /* This is the source side (we're in a separate thread), or the
1550 * destination prior to migration_fd_process_incoming(); after
1551 * postcopy the destination is also in a separate thread.
1552 * We can't yield, so we have to poll the fd.
1553 * But we need to be able to handle 'cancel' or an error
1554 * without hanging forever.
1555 */
1556 while (!rdma->errored && !rdma->received_error) {
1557 GPollFD pfds[2];
1558 pfds[0].fd = comp_channel->fd;
1559 pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1560 pfds[0].revents = 0;
1561
1562 pfds[1].fd = rdma->channel->fd;
1563 pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1564 pfds[1].revents = 0;
1565
1566 /* 0.1s timeout, should be fine for a 'cancel' */
1567 switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1568 case 2:
1569 case 1: /* fd active */
1570 if (pfds[0].revents) {
1571 return 0;
1572 }
1573
1574 if (pfds[1].revents) {
1575 ret = rdma_get_cm_event(rdma->channel, &cm_event);
1576 if (ret < 0) {
1577 error_report("failed to get cm event while wait "
1578 "completion channel");
1579 return -1;
1580 }
1581
1582 error_report("receive cm event while wait comp channel,"
1583 "cm event is %d", cm_event->event);
1584 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1585 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1586 rdma_ack_cm_event(cm_event);
1587 return -1;
1588 }
1589 rdma_ack_cm_event(cm_event);
1590 }
1591 break;
1592
1593 case 0: /* Timeout, go around again */
1594 break;
1595
1596 default: /* Error of some type -
1597 * I don't trust errno from qemu_poll_ns
1598 */
1599 error_report("%s: poll failed", __func__);
1600 return -1;
1601 }
1602
1603 if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1604 /* Bail out and let the cancellation happen */
1605 return -1;
1606 }
1607 }
1608 }
1609
1610 if (rdma->received_error) {
1611 return -1;
1612 }
1613 return -rdma->errored;
1614 }
1615
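
/*
 * Map a work request ID to the completion channel (and CQ) it completes on:
 * SEND and RDMA WRITE work request IDs are below RDMA_WRID_RECV_CONTROL and
 * use the send CQ; RECV control work requests use the receive CQ.
 */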
1616 static struct ibv_comp_channel *to_channel(RDMAContext *rdma, uint64_t wrid)
1617 {
1618 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
1619 rdma->recv_comp_channel;
1620 }
1621
1622 static struct ibv_cq *to_cq(RDMAContext *rdma, uint64_t wrid)
1623 {
1624 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
1625 }
1626
1627 /*
1628 * Block until the next work request has completed.
1629 *
1630 * First poll to see if a work request has already completed,
1631 * otherwise block.
1632 *
1633 * If we encounter completed work requests for IDs other than
1634 * the one we're interested in, then that's generally an error.
1635 *
1636 * The only exception is actual RDMA Write completions. These
1637 * completions only need to be recorded, but do not actually
1638 * need further processing.
1639 */
1640 static int qemu_rdma_block_for_wrid(RDMAContext *rdma,
1641 uint64_t wrid_requested,
1642 uint32_t *byte_len)
1643 {
1644 int num_cq_events = 0, ret;
1645 struct ibv_cq *cq;
1646 void *cq_ctx;
1647 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1648 struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
1649 struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
1650
1651 if (ibv_req_notify_cq(poll_cq, 0)) {
1652 return -1;
1653 }
1654 /* poll cq first */
1655 while (wr_id != wrid_requested) {
1656 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1657 if (ret < 0) {
1658 return -1;
1659 }
1660
1661 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1662
1663 if (wr_id == RDMA_WRID_NONE) {
1664 break;
1665 }
1666 if (wr_id != wrid_requested) {
1667 trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1668 }
1669 }
1670
1671 if (wr_id == wrid_requested) {
1672 return 0;
1673 }
1674
1675 while (1) {
1676 ret = qemu_rdma_wait_comp_channel(rdma, ch);
1677 if (ret < 0) {
1678 goto err_block_for_wrid;
1679 }
1680
1681 ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
1682 if (ret < 0) {
1683 /*
1684 * FIXME perror() is problematic, because ibv_get_cq_event() is
1685 * not documented to set errno. Will go away later in
1686 * this series.
1687 */
1688 perror("ibv_get_cq_event");
1689 goto err_block_for_wrid;
1690 }
1691
1692 num_cq_events++;
1693
1694 if (ibv_req_notify_cq(cq, 0)) {
1695 goto err_block_for_wrid;
1696 }
1697
1698 while (wr_id != wrid_requested) {
1699 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1700 if (ret < 0) {
1701 goto err_block_for_wrid;
1702 }
1703
1704 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1705
1706 if (wr_id == RDMA_WRID_NONE) {
1707 break;
1708 }
1709 if (wr_id != wrid_requested) {
1710 trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1711 }
1712 }
1713
1714 if (wr_id == wrid_requested) {
1715 goto success_block_for_wrid;
1716 }
1717 }
1718
1719 success_block_for_wrid:
1720 if (num_cq_events) {
1721 ibv_ack_cq_events(cq, num_cq_events);
1722 }
1723 return 0;
1724
1725 err_block_for_wrid:
1726 if (num_cq_events) {
1727 ibv_ack_cq_events(cq, num_cq_events);
1728 }
1729
1730 rdma->errored = true;
1731 return -1;
1732 }
1733
1734 /*
1735 * Post a SEND message work request for the control channel
1736 * containing some data and block until the post completes.
1737 */
1738 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1739 RDMAControlHeader *head)
1740 {
1741 int ret;
1742 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1743 struct ibv_send_wr *bad_wr;
1744 struct ibv_sge sge = {
1745 .addr = (uintptr_t)(wr->control),
1746 .length = head->len + sizeof(RDMAControlHeader),
1747 .lkey = wr->control_mr->lkey,
1748 };
1749 struct ibv_send_wr send_wr = {
1750 .wr_id = RDMA_WRID_SEND_CONTROL,
1751 .opcode = IBV_WR_SEND,
1752 .send_flags = IBV_SEND_SIGNALED,
1753 .sg_list = &sge,
1754 .num_sge = 1,
1755 };
1756
1757 trace_qemu_rdma_post_send_control(control_desc(head->type));
1758
1759 /*
1760 * We don't actually need to do a memcpy() in here if we used
1761 * the "sge" properly, but since we're only sending control messages
1762 * (not RAM in a performance-critical path), it's OK for now.
1763 *
1764 * The copy makes the RDMAControlHeader simpler to manipulate
1765 * for the time being.
1766 */
1767 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1768 memcpy(wr->control, head, sizeof(RDMAControlHeader));
1769 control_to_network((void *) wr->control);
1770
1771 if (buf) {
1772 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1773 }
1774
1775
1776 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1777
1778 if (ret > 0) {
1779 error_report("Failed to use post IB SEND for control");
1780 return -1;
1781 }
1782
1783 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1784 if (ret < 0) {
1785 error_report("rdma migration: send polling control error");
1786 return -1;
1787 }
1788
1789 return 0;
1790 }
1791
1792 /*
1793 * Post a RECV work request in anticipation of some future receipt
1794 * of data on the control channel.
1795 */
1796 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1797 {
1798 struct ibv_recv_wr *bad_wr;
1799 struct ibv_sge sge = {
1800 .addr = (uintptr_t)(rdma->wr_data[idx].control),
1801 .length = RDMA_CONTROL_MAX_BUFFER,
1802 .lkey = rdma->wr_data[idx].control_mr->lkey,
1803 };
1804
1805 struct ibv_recv_wr recv_wr = {
1806 .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1807 .sg_list = &sge,
1808 .num_sge = 1,
1809 };
1810
1811
1812 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1813 return -1;
1814 }
1815
1816 return 0;
1817 }
1818
1819 /*
1820 * Block and wait for a RECV control channel message to arrive.
1821 */
1822 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1823 RDMAControlHeader *head, uint32_t expecting, int idx)
1824 {
1825 uint32_t byte_len;
1826 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1827 &byte_len);
1828
1829 if (ret < 0) {
1830 error_report("rdma migration: recv polling control error!");
1831 return -1;
1832 }
1833
1834 network_to_control((void *) rdma->wr_data[idx].control);
1835 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1836
1837 trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1838
1839 if (expecting == RDMA_CONTROL_NONE) {
1840 trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1841 head->type);
1842 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1843 error_report("Was expecting a %s (%d) control message"
1844 ", but got: %s (%d), length: %d",
1845 control_desc(expecting), expecting,
1846 control_desc(head->type), head->type, head->len);
1847 if (head->type == RDMA_CONTROL_ERROR) {
1848 rdma->received_error = true;
1849 }
1850 return -1;
1851 }
1852 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1853 error_report("too long length: %d", head->len);
1854 return -1;
1855 }
1856 if (sizeof(*head) + head->len != byte_len) {
1857 error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1858 return -1;
1859 }
1860
1861 return 0;
1862 }
1863
1864 /*
1865 * When a RECV work request has completed, the work request's
1866 * buffer is pointed at the header.
1867 *
1868 * This advances the pointer to the data portion of the control
1869 * message in the work request's buffer, which was populated after
1870 * the work request finished.
1871 */
1872 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1873 RDMAControlHeader *head)
1874 {
1875 rdma->wr_data[idx].control_len = head->len;
1876 rdma->wr_data[idx].control_curr =
1877 rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1878 }
1879
1880 /*
1881 * This is an 'atomic' high-level operation to deliver a single, unified
1882 * control-channel message.
1883 *
1884 * Additionally, if the user is expecting some kind of reply to this message,
1885 * they can request a 'resp' response message be filled in by posting an
1886 * additional work request on behalf of the user and waiting for an additional
1887 * completion.
1888 *
1889 * The extra (optional) response is used during registration to save us from
1890 * having to perform an *additional* exchange of messages just to provide a
1891 * response, by instead piggy-backing on the acknowledgement.
1892 */
1893 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1894 uint8_t *data, RDMAControlHeader *resp,
1895 int *resp_idx,
1896 int (*callback)(RDMAContext *rdma))
1897 {
1898 int ret;
1899
1900 /*
1901 * Wait until the dest is ready before attempting to deliver the message
1902 * by waiting for a READY message.
1903 */
1904 if (rdma->control_ready_expected) {
1905 RDMAControlHeader resp_ignored;
1906
1907 ret = qemu_rdma_exchange_get_response(rdma, &resp_ignored,
1908 RDMA_CONTROL_READY,
1909 RDMA_WRID_READY);
1910 if (ret < 0) {
1911 return -1;
1912 }
1913 }
1914
1915 /*
1916 * If the user is expecting a response, post a WR in anticipation of it.
1917 */
1918 if (resp) {
1919 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1920 if (ret < 0) {
1921 error_report("rdma migration: error posting"
1922 " extra control recv for anticipated result!");
1923 return -1;
1924 }
1925 }
1926
1927 /*
1928 * Post a WR to replace the one we just consumed for the READY message.
1929 */
1930 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1931 if (ret < 0) {
1932 error_report("rdma migration: error posting first control recv!");
1933 return -1;
1934 }
1935
1936 /*
1937 * Deliver the control message that was requested.
1938 */
1939 ret = qemu_rdma_post_send_control(rdma, data, head);
1940
1941 if (ret < 0) {
1942 error_report("Failed to send control buffer!");
1943 return -1;
1944 }
1945
1946 /*
1947 * If we're expecting a response, block and wait for it.
1948 */
1949 if (resp) {
1950 if (callback) {
1951 trace_qemu_rdma_exchange_send_issue_callback();
1952 ret = callback(rdma);
1953 if (ret < 0) {
1954 return -1;
1955 }
1956 }
1957
1958 trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1959 ret = qemu_rdma_exchange_get_response(rdma, resp,
1960 resp->type, RDMA_WRID_DATA);
1961
1962 if (ret < 0) {
1963 return -1;
1964 }
1965
1966 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1967 if (resp_idx) {
1968 *resp_idx = RDMA_WRID_DATA;
1969 }
1970 trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1971 }
1972
1973 rdma->control_ready_expected = 1;
1974
1975 return 0;
1976 }
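/*
 * Typical callers: qemu_rdma_write_one() below sends an
 * RDMA_CONTROL_REGISTER_REQUEST and picks up the piggy-backed
 * RDMA_CONTROL_REGISTER_RESULT through @resp / @resp_idx, while
 * qio_channel_rdma_writev() sends RDMA_CONTROL_QEMU_FILE messages with no
 * response expected (resp == NULL).
 */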
1977
1978 /*
1979 * This is an 'atomic' high-level operation to receive a single, unified
1980 * control-channel message.
1981 */
1982 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1983 uint32_t expecting)
1984 {
1985 RDMAControlHeader ready = {
1986 .len = 0,
1987 .type = RDMA_CONTROL_READY,
1988 .repeat = 1,
1989 };
1990 int ret;
1991
1992 /*
1993 * Inform the source that we're ready to receive a message.
1994 */
1995 ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1996
1997 if (ret < 0) {
1998 error_report("Failed to send control buffer!");
1999 return -1;
2000 }
2001
2002 /*
2003 * Block and wait for the message.
2004 */
2005 ret = qemu_rdma_exchange_get_response(rdma, head,
2006 expecting, RDMA_WRID_READY);
2007
2008 if (ret < 0) {
2009 return -1;
2010 }
2011
2012 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
2013
2014 /*
2015 * Post a new RECV work request to replace the one we just consumed.
2016 */
2017 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2018 if (ret < 0) {
2019 error_report("rdma migration: error posting second control recv!");
2020 return -1;
2021 }
2022
2023 return 0;
2024 }
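/*
 * Together, qemu_rdma_exchange_send() and qemu_rdma_exchange_recv() form a
 * ping-pong: the receiver first SENDs an RDMA_CONTROL_READY message, which
 * is exactly what the sender blocks on when control_ready_expected is set,
 * and each side re-posts the RECV work request it just consumed before
 * carrying on.
 */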
2025
2026 /*
2027 * Write an actual chunk of memory using RDMA.
2028 *
2029 * If we're using dynamic registration on the dest-side, we have to
2030 * send a registration command first.
2031 */
2032 static int qemu_rdma_write_one(RDMAContext *rdma,
2033 int current_index, uint64_t current_addr,
2034 uint64_t length)
2035 {
2036 struct ibv_sge sge;
2037 struct ibv_send_wr send_wr = { 0 };
2038 struct ibv_send_wr *bad_wr;
2039 int reg_result_idx, ret, count = 0;
2040 uint64_t chunk, chunks;
2041 uint8_t *chunk_start, *chunk_end;
2042 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
2043 RDMARegister reg;
2044 RDMARegisterResult *reg_result;
2045 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
2046 RDMAControlHeader head = { .len = sizeof(RDMARegister),
2047 .type = RDMA_CONTROL_REGISTER_REQUEST,
2048 .repeat = 1,
2049 };
2050
2051 retry:
2052 sge.addr = (uintptr_t)(block->local_host_addr +
2053 (current_addr - block->offset));
2054 sge.length = length;
2055
2056 chunk = ram_chunk_index(block->local_host_addr,
2057 (uint8_t *)(uintptr_t)sge.addr);
2058 chunk_start = ram_chunk_start(block, chunk);
2059
2060 if (block->is_ram_block) {
2061 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
2062
2063 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2064 chunks--;
2065 }
2066 } else {
2067 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
2068
2069 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2070 chunks--;
2071 }
2072 }
2073
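/*
 * 'chunks' counts the additional chunks beyond the first one.  For
 * example, a length of exactly two chunk sizes gives length / chunk_size
 * == 2 with a zero remainder, so chunks is decremented to 1, the write
 * spans chunk .. chunk + 1, and the trace below reports chunks + 1 == 2.
 */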
2074 trace_qemu_rdma_write_one_top(chunks + 1,
2075 (chunks + 1) *
2076 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2077
2078 chunk_end = ram_chunk_end(block, chunk + chunks);
2079
2080
2081 while (test_bit(chunk, block->transit_bitmap)) {
2082 (void)count;
2083 trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2084 sge.addr, length, rdma->nb_sent, block->nb_chunks);
2085
2086 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2087
2088 if (ret < 0) {
2089 error_report("Failed to wait for previous write to complete "
2090 "block %d chunk %" PRIu64
2091 " current %" PRIu64 " len %" PRIu64 " %d",
2092 current_index, chunk, sge.addr, length, rdma->nb_sent);
2093 return -1;
2094 }
2095 }
2096
2097 if (!rdma->pin_all || !block->is_ram_block) {
2098 if (!block->remote_keys[chunk]) {
2099 /*
2100 * This chunk has not yet been registered, so first check to see
2101 * if the entire chunk is zero. If so, tell the other side to
2102 * memset() + madvise() the entire chunk without RDMA.
2103 */
2104
2105 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2106 RDMACompress comp = {
2107 .offset = current_addr,
2108 .value = 0,
2109 .block_idx = current_index,
2110 .length = length,
2111 };
2112
2113 head.len = sizeof(comp);
2114 head.type = RDMA_CONTROL_COMPRESS;
2115
2116 trace_qemu_rdma_write_one_zero(chunk, sge.length,
2117 current_index, current_addr);
2118
2119 compress_to_network(rdma, &comp);
2120 ret = qemu_rdma_exchange_send(rdma, &head,
2121 (uint8_t *) &comp, NULL, NULL, NULL);
2122
2123 if (ret < 0) {
2124 return -1;
2125 }
2126
2127 /*
2128 * TODO: Here we are sending something, but we are not
2129 * accounting for anything transferred. The following is wrong:
2130 *
2131 * stat64_add(&mig_stats.rdma_bytes, sge.length);
2132 *
2133 * because we are using some kind of compression. I
2134 * would think that head.len is closer to the correct
2135 * value.
2136 */
2137 stat64_add(&mig_stats.zero_pages,
2138 sge.length / qemu_target_page_size());
2139 return 1;
2140 }
2141
2142 /*
2143 * Otherwise, tell other side to register.
2144 */
2145 reg.current_index = current_index;
2146 if (block->is_ram_block) {
2147 reg.key.current_addr = current_addr;
2148 } else {
2149 reg.key.chunk = chunk;
2150 }
2151 reg.chunks = chunks;
2152
2153 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2154 current_addr);
2155
2156 register_to_network(rdma, &reg);
2157 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2158 &resp, &reg_result_idx, NULL);
2159 if (ret < 0) {
2160 return -1;
2161 }
2162
2163 /* try to overlap this single registration with the one we sent. */
2164 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2165 &sge.lkey, NULL, chunk,
2166 chunk_start, chunk_end)) {
2167 error_report("cannot get lkey");
2168 return -1;
2169 }
2170
2171 reg_result = (RDMARegisterResult *)
2172 rdma->wr_data[reg_result_idx].control_curr;
2173
2174 network_to_result(reg_result);
2175
2176 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2177 reg_result->rkey, chunk);
2178
2179 block->remote_keys[chunk] = reg_result->rkey;
2180 block->remote_host_addr = reg_result->host_addr;
2181 } else {
2182 /* already registered before */
2183 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2184 &sge.lkey, NULL, chunk,
2185 chunk_start, chunk_end)) {
2186 error_report("cannot get lkey!");
2187 return -1;
2188 }
2189 }
2190
2191 send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2192 } else {
2193 send_wr.wr.rdma.rkey = block->remote_rkey;
2194
2195 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2196 &sge.lkey, NULL, chunk,
2197 chunk_start, chunk_end)) {
2198 error_report("cannot get lkey!");
2199 return -1;
2200 }
2201 }
2202
2203 /*
2204 * Encode the ram block index and chunk within this wrid.
2205 * We will use this information at the time of completion
2206 * to figure out which bitmap to check against and then which
2207 * chunk in the bitmap to look for.
2208 */
2209 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2210 current_index, chunk);
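/*
 * The completion path recovers the message type with, e.g.:
 *
 *     uint64_t type = wr_id & RDMA_WRID_TYPE_MASK;
 *
 * (see the polling loops in qemu_rdma_save_page() below); the block index
 * and chunk number travel in the upper bits.
 */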
2211
2212 send_wr.opcode = IBV_WR_RDMA_WRITE;
2213 send_wr.send_flags = IBV_SEND_SIGNALED;
2214 send_wr.sg_list = &sge;
2215 send_wr.num_sge = 1;
2216 send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2217 (current_addr - block->offset);
2218
2219 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2220 sge.length);
2221
2222 /*
2223 * ibv_post_send() does not return negative error numbers;
2224 * per the specification they are positive - no idea why.
2225 */
2226 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2227
2228 if (ret == ENOMEM) {
2229 trace_qemu_rdma_write_one_queue_full();
2230 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2231 if (ret < 0) {
2232 error_report("rdma migration: failed to make "
2233 "room in full send queue!");
2234 return -1;
2235 }
2236
2237 goto retry;
2238
2239 } else if (ret > 0) {
2240 /*
2241 * FIXME perror() is problematic, because whether
2242 * ibv_post_send() sets errno is unclear. Will go away later
2243 * in this series.
2244 */
2245 perror("rdma migration: post rdma write failed");
2246 return -1;
2247 }
2248
2249 set_bit(chunk, block->transit_bitmap);
2250 stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size());
2251 /*
2252 * We are adding the amount of data written to the transferred counter,
2253 * but no overhead at all. I will assume that RDMA is magical and doesn't
2254 * need to transfer (at least) the addresses where it wants to
2255 * write the pages. Here it looks like it should be something
2256 * like:
2257 * sizeof(send_wr) + sge.length
2258 * but this being RDMA, who knows.
2259 */
2260 stat64_add(&mig_stats.rdma_bytes, sge.length);
2261 ram_transferred_add(sge.length);
2262 rdma->total_writes++;
2263
2264 return 0;
2265 }
2266
2267 /*
2268 * Push out any unwritten RDMA operations.
2269 *
2270 * We support sending out multiple chunks at the same time.
2271 * Not all of them need to get signaled in the completion queue.
2272 */
2273 static int qemu_rdma_write_flush(RDMAContext *rdma)
2274 {
2275 int ret;
2276
2277 if (!rdma->current_length) {
2278 return 0;
2279 }
2280
2281 ret = qemu_rdma_write_one(rdma,
2282 rdma->current_index, rdma->current_addr, rdma->current_length);
2283
2284 if (ret < 0) {
2285 return -1;
2286 }
2287
2288 if (ret == 0) {
2289 rdma->nb_sent++;
2290 trace_qemu_rdma_write_flush(rdma->nb_sent);
2291 }
2292
2293 rdma->current_length = 0;
2294 rdma->current_addr = 0;
2295
2296 return 0;
2297 }
2298
2299 static inline bool qemu_rdma_buffer_mergeable(RDMAContext *rdma,
2300 uint64_t offset, uint64_t len)
2301 {
2302 RDMALocalBlock *block;
2303 uint8_t *host_addr;
2304 uint8_t *chunk_end;
2305
2306 if (rdma->current_index < 0) {
2307 return false;
2308 }
2309
2310 if (rdma->current_chunk < 0) {
2311 return false;
2312 }
2313
2314 block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2315 host_addr = block->local_host_addr + (offset - block->offset);
2316 chunk_end = ram_chunk_end(block, rdma->current_chunk);
2317
2318 if (rdma->current_length == 0) {
2319 return false;
2320 }
2321
2322 /*
2323 * Only merge into chunk sequentially.
2324 */
2325 if (offset != (rdma->current_addr + rdma->current_length)) {
2326 return false;
2327 }
2328
2329 if (offset < block->offset) {
2330 return false;
2331 }
2332
2333 if ((offset + len) > (block->offset + block->length)) {
2334 return false;
2335 }
2336
2337 if ((host_addr + len) > chunk_end) {
2338 return false;
2339 }
2340
2341 return true;
2342 }
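/*
 * In short: a new buffer is only merged if it extends the current one
 * contiguously, stays within the same RAMBlock, and does not cross the
 * current chunk boundary.  Anything else makes qemu_rdma_write() flush
 * the pending write and start a new chunk.
 */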
2343
2344 /*
2345 * We're not actually writing here, but doing three things:
2346 *
2347 * 1. Identify the chunk the buffer belongs to.
2348 * 2. If the chunk is full or the buffer doesn't belong to the current
2349 * chunk, then start a new chunk and flush() the old chunk.
2350 * 3. To keep the hardware busy, we also group chunks into batches
2351 * and only require that a batch gets acknowledged in the completion
2352 * queue instead of each individual chunk.
2353 */
2354 static int qemu_rdma_write(RDMAContext *rdma,
2355 uint64_t block_offset, uint64_t offset,
2356 uint64_t len)
2357 {
2358 uint64_t current_addr = block_offset + offset;
2359 uint64_t index = rdma->current_index;
2360 uint64_t chunk = rdma->current_chunk;
2361 int ret;
2362
2363 /* If we cannot merge it, we flush the current buffer first. */
2364 if (!qemu_rdma_buffer_mergeable(rdma, current_addr, len)) {
2365 ret = qemu_rdma_write_flush(rdma);
2366 if (ret < 0) {
2367 return -1;
2368 }
2369 rdma->current_length = 0;
2370 rdma->current_addr = current_addr;
2371
2372 qemu_rdma_search_ram_block(rdma, block_offset,
2373 offset, len, &index, &chunk);
2374 rdma->current_index = index;
2375 rdma->current_chunk = chunk;
2376 }
2377
2378 /* merge it */
2379 rdma->current_length += len;
2380
2381 /* flush it if buffer is too large */
2382 if (rdma->current_length >= RDMA_MERGE_MAX) {
2383 return qemu_rdma_write_flush(rdma);
2384 }
2385
2386 return 0;
2387 }
2388
2389 static void qemu_rdma_cleanup(RDMAContext *rdma)
2390 {
2391 int idx;
2392
2393 if (rdma->cm_id && rdma->connected) {
2394 if ((rdma->errored ||
2395 migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2396 !rdma->received_error) {
2397 RDMAControlHeader head = { .len = 0,
2398 .type = RDMA_CONTROL_ERROR,
2399 .repeat = 1,
2400 };
2401 error_report("Early error. Sending error.");
2402 qemu_rdma_post_send_control(rdma, NULL, &head);
2403 }
2404
2405 rdma_disconnect(rdma->cm_id);
2406 trace_qemu_rdma_cleanup_disconnect();
2407 rdma->connected = false;
2408 }
2409
2410 if (rdma->channel) {
2411 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2412 }
2413 g_free(rdma->dest_blocks);
2414 rdma->dest_blocks = NULL;
2415
2416 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2417 if (rdma->wr_data[idx].control_mr) {
2418 rdma->total_registrations--;
2419 ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2420 }
2421 rdma->wr_data[idx].control_mr = NULL;
2422 }
2423
2424 if (rdma->local_ram_blocks.block) {
2425 while (rdma->local_ram_blocks.nb_blocks) {
2426 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2427 }
2428 }
2429
2430 if (rdma->qp) {
2431 rdma_destroy_qp(rdma->cm_id);
2432 rdma->qp = NULL;
2433 }
2434 if (rdma->recv_cq) {
2435 ibv_destroy_cq(rdma->recv_cq);
2436 rdma->recv_cq = NULL;
2437 }
2438 if (rdma->send_cq) {
2439 ibv_destroy_cq(rdma->send_cq);
2440 rdma->send_cq = NULL;
2441 }
2442 if (rdma->recv_comp_channel) {
2443 ibv_destroy_comp_channel(rdma->recv_comp_channel);
2444 rdma->recv_comp_channel = NULL;
2445 }
2446 if (rdma->send_comp_channel) {
2447 ibv_destroy_comp_channel(rdma->send_comp_channel);
2448 rdma->send_comp_channel = NULL;
2449 }
2450 if (rdma->pd) {
2451 ibv_dealloc_pd(rdma->pd);
2452 rdma->pd = NULL;
2453 }
2454 if (rdma->cm_id) {
2455 rdma_destroy_id(rdma->cm_id);
2456 rdma->cm_id = NULL;
2457 }
2458
2459 /* on the destination side, the listen_id and channel are shared */
2460 if (rdma->listen_id) {
2461 if (!rdma->is_return_path) {
2462 rdma_destroy_id(rdma->listen_id);
2463 }
2464 rdma->listen_id = NULL;
2465
2466 if (rdma->channel) {
2467 if (!rdma->is_return_path) {
2468 rdma_destroy_event_channel(rdma->channel);
2469 }
2470 rdma->channel = NULL;
2471 }
2472 }
2473
2474 if (rdma->channel) {
2475 rdma_destroy_event_channel(rdma->channel);
2476 rdma->channel = NULL;
2477 }
2478 g_free(rdma->host);
2479 g_free(rdma->host_port);
2480 rdma->host = NULL;
2481 rdma->host_port = NULL;
2482 }
2483
2484
2485 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2486 {
2487 int ret, idx;
2488
2489 /*
2490 * Will be validated against destination's actual capabilities
2491 * after the connect() completes.
2492 */
2493 rdma->pin_all = pin_all;
2494
2495 ret = qemu_rdma_resolve_host(rdma, errp);
2496 if (ret < 0) {
2497 goto err_rdma_source_init;
2498 }
2499
2500 ret = qemu_rdma_alloc_pd_cq(rdma);
2501 if (ret < 0) {
2502 error_setg(errp, "RDMA ERROR: "
2503 "rdma migration: error allocating pd and cq! Your mlock()"
2504 " limits may be too low. Please check $ ulimit -a # and "
2505 "search for 'ulimit -l' in the output");
2506 goto err_rdma_source_init;
2507 }
2508
2509 ret = qemu_rdma_alloc_qp(rdma);
2510 if (ret < 0) {
2511 error_setg(errp, "RDMA ERROR: rdma migration: error allocating qp!");
2512 goto err_rdma_source_init;
2513 }
2514
2515 qemu_rdma_init_ram_blocks(rdma);
2516
2517 /* Build the hash that maps from offset to RAMBlock */
2518 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2519 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2520 g_hash_table_insert(rdma->blockmap,
2521 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2522 &rdma->local_ram_blocks.block[idx]);
2523 }
2524
2525 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2526 ret = qemu_rdma_reg_control(rdma, idx);
2527 if (ret < 0) {
2528 error_setg(errp,
2529 "RDMA ERROR: rdma migration: error registering %d control!",
2530 idx);
2531 goto err_rdma_source_init;
2532 }
2533 }
2534
2535 return 0;
2536
2537 err_rdma_source_init:
2538 qemu_rdma_cleanup(rdma);
2539 return -1;
2540 }
2541
2542 static int qemu_get_cm_event_timeout(RDMAContext *rdma,
2543 struct rdma_cm_event **cm_event,
2544 long msec, Error **errp)
2545 {
2546 int ret;
2547 struct pollfd poll_fd = {
2548 .fd = rdma->channel->fd,
2549 .events = POLLIN,
2550 .revents = 0
2551 };
2552
2553 do {
2554 ret = poll(&poll_fd, 1, msec);
2555 } while (ret < 0 && errno == EINTR);
2556
2557 if (ret == 0) {
2558 error_setg(errp, "RDMA ERROR: poll cm event timeout");
2559 return -1;
2560 } else if (ret < 0) {
2561 error_setg(errp, "RDMA ERROR: failed to poll cm event, errno=%i",
2562 errno);
2563 return -1;
2564 } else if (poll_fd.revents & POLLIN) {
2565 if (rdma_get_cm_event(rdma->channel, cm_event) < 0) {
2566 error_setg(errp, "RDMA ERROR: failed to get cm event");
2567 return -1;
2568 }
2569 return 0;
2570 } else {
2571 error_setg(errp, "RDMA ERROR: no POLLIN event, revent=%x",
2572 poll_fd.revents);
2573 return -1;
2574 }
2575 }
2576
2577 static int qemu_rdma_connect(RDMAContext *rdma, bool return_path,
2578 Error **errp)
2579 {
2580 RDMACapabilities cap = {
2581 .version = RDMA_CONTROL_VERSION_CURRENT,
2582 .flags = 0,
2583 };
2584 struct rdma_conn_param conn_param = { .initiator_depth = 2,
2585 .retry_count = 5,
2586 .private_data = &cap,
2587 .private_data_len = sizeof(cap),
2588 };
2589 struct rdma_cm_event *cm_event;
2590 int ret;
2591
2592 /*
2593 * Only negotiate the capability with destination if the user
2594 * on the source first requested the capability.
2595 */
2596 if (rdma->pin_all) {
2597 trace_qemu_rdma_connect_pin_all_requested();
2598 cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2599 }
2600
2601 caps_to_network(&cap);
2602
2603 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2604 if (ret < 0) {
2605 error_setg(errp, "RDMA ERROR: posting second control recv");
2606 goto err_rdma_source_connect;
2607 }
2608
2609 ret = rdma_connect(rdma->cm_id, &conn_param);
2610 if (ret < 0) {
2611 perror("rdma_connect");
2612 error_setg(errp, "RDMA ERROR: connecting to destination!");
2613 goto err_rdma_source_connect;
2614 }
2615
2616 if (return_path) {
2617 ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
2618 } else {
2619 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2620 if (ret < 0) {
2621 error_setg(errp, "RDMA ERROR: failed to get cm event");
2622 }
2623 }
2624 if (ret < 0) {
2625 /*
2626 * FIXME perror() is wrong, because
2627 * qemu_get_cm_event_timeout() can fail without setting errno.
2628 * Will go away later in this series.
2629 */
2630 perror("rdma_get_cm_event after rdma_connect");
2631 goto err_rdma_source_connect;
2632 }
2633
2634 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2635 error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2636 error_setg(errp, "RDMA ERROR: connecting to destination!");
2637 rdma_ack_cm_event(cm_event);
2638 goto err_rdma_source_connect;
2639 }
2640 rdma->connected = true;
2641
2642 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2643 network_to_caps(&cap);
2644
2645 /*
2646 * Verify that the *requested* capabilities are supported by the destination
2647 * and disable them otherwise.
2648 */
2649 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2650 warn_report("RDMA: Server cannot support pinning all memory. "
2651 "Will register memory dynamically.");
2652 rdma->pin_all = false;
2653 }
2654
2655 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2656
2657 rdma_ack_cm_event(cm_event);
2658
2659 rdma->control_ready_expected = 1;
2660 rdma->nb_sent = 0;
2661 return 0;
2662
2663 err_rdma_source_connect:
2664 qemu_rdma_cleanup(rdma);
2665 return -1;
2666 }
2667
2668 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2669 {
2670 Error *err = NULL;
2671 int ret, idx;
2672 struct rdma_cm_id *listen_id;
2673 char ip[40] = "unknown";
2674 struct rdma_addrinfo *res, *e;
2675 char port_str[16];
2676 int reuse = 1;
2677
2678 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2679 rdma->wr_data[idx].control_len = 0;
2680 rdma->wr_data[idx].control_curr = NULL;
2681 }
2682
2683 if (!rdma->host || !rdma->host[0]) {
2684 error_setg(errp, "RDMA ERROR: RDMA host is not set!");
2685 rdma->errored = true;
2686 return -1;
2687 }
2688 /* create CM channel */
2689 rdma->channel = rdma_create_event_channel();
2690 if (!rdma->channel) {
2691 error_setg(errp, "RDMA ERROR: could not create rdma event channel");
2692 rdma->errored = true;
2693 return -1;
2694 }
2695
2696 /* create CM id */
2697 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2698 if (ret < 0) {
2699 error_setg(errp, "RDMA ERROR: could not create cm_id!");
2700 goto err_dest_init_create_listen_id;
2701 }
2702
2703 snprintf(port_str, 16, "%d", rdma->port);
2704 port_str[15] = '\0';
2705
2706 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2707 if (ret) {
2708 error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
2709 rdma->host);
2710 goto err_dest_init_bind_addr;
2711 }
2712
2713 ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
2714 &reuse, sizeof reuse);
2715 if (ret < 0) {
2716 error_setg(errp, "RDMA ERROR: could not set REUSEADDR option");
2717 goto err_dest_init_bind_addr;
2718 }
2719
2720 /* Try all addresses, saving the first error in @err */
2721 for (e = res; e != NULL; e = e->ai_next) {
2722 Error **local_errp = err ? NULL : &err;
2723
2724 inet_ntop(e->ai_family,
2725 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2726 trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2727 ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2728 if (ret < 0) {
2729 continue;
2730 }
2731 if (e->ai_family == AF_INET6) {
2732 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs,
2733 local_errp);
2734 if (ret < 0) {
2735 continue;
2736 }
2737 }
2738 error_free(err);
2739 break;
2740 }
2741
2742 rdma_freeaddrinfo(res);
2743 if (!e) {
2744 if (err) {
2745 error_propagate(errp, err);
2746 } else {
2747 error_setg(errp, "RDMA ERROR: could not rdma_bind_addr!");
2748 }
2749 goto err_dest_init_bind_addr;
2750 }
2751
2752 rdma->listen_id = listen_id;
2753 qemu_rdma_dump_gid("dest_init", listen_id);
2754 return 0;
2755
2756 err_dest_init_bind_addr:
2757 rdma_destroy_id(listen_id);
2758 err_dest_init_create_listen_id:
2759 rdma_destroy_event_channel(rdma->channel);
2760 rdma->channel = NULL;
2761 rdma->errored = true;
2762 return -1;
2763
2764 }
2765
2766 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2767 RDMAContext *rdma)
2768 {
2769 int idx;
2770
2771 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2772 rdma_return_path->wr_data[idx].control_len = 0;
2773 rdma_return_path->wr_data[idx].control_curr = NULL;
2774 }
2775
2776 /* the CM channel and CM id are shared */
2777 rdma_return_path->channel = rdma->channel;
2778 rdma_return_path->listen_id = rdma->listen_id;
2779
2780 rdma->return_path = rdma_return_path;
2781 rdma_return_path->return_path = rdma;
2782 rdma_return_path->is_return_path = true;
2783 }
2784
2785 static RDMAContext *qemu_rdma_data_init(const char *host_port, Error **errp)
2786 {
2787 RDMAContext *rdma = NULL;
2788 InetSocketAddress *addr;
2789
2790 rdma = g_new0(RDMAContext, 1);
2791 rdma->current_index = -1;
2792 rdma->current_chunk = -1;
2793
2794 addr = g_new(InetSocketAddress, 1);
2795 if (!inet_parse(addr, host_port, NULL)) {
2796 rdma->port = atoi(addr->port);
2797 rdma->host = g_strdup(addr->host);
2798 rdma->host_port = g_strdup(host_port);
2799 } else {
2800 error_setg(errp, "RDMA ERROR: bad RDMA migration address '%s'",
2801 host_port);
2802 g_free(rdma);
2803 rdma = NULL;
2804 }
2805
2806 qapi_free_InetSocketAddress(addr);
2807 return rdma;
2808 }
2809
2810 /*
2811 * QEMUFile interface to the control channel.
2812 * SEND messages for control only.
2813 * VM's ram is handled with regular RDMA messages.
2814 */
2815 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2816 const struct iovec *iov,
2817 size_t niov,
2818 int *fds,
2819 size_t nfds,
2820 int flags,
2821 Error **errp)
2822 {
2823 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2824 RDMAContext *rdma;
2825 int ret;
2826 ssize_t done = 0;
2827 size_t i, len;
2828
2829 RCU_READ_LOCK_GUARD();
2830 rdma = qatomic_rcu_read(&rioc->rdmaout);
2831
2832 if (!rdma) {
2833 error_setg(errp, "RDMA control channel output is not set");
2834 return -1;
2835 }
2836
2837 if (rdma->errored) {
2838 error_setg(errp,
2839 "RDMA is in an error state waiting migration to abort!");
2840 return -1;
2841 }
2842
2843 /*
2844 * Push out any writes that
2845 * we've queued up for the VM's RAM.
2846 */
2847 ret = qemu_rdma_write_flush(rdma);
2848 if (ret < 0) {
2849 rdma->errored = true;
2850 error_setg(errp, "qemu_rdma_write_flush failed");
2851 return -1;
2852 }
2853
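/*
 * Each iovec is carved into RDMA_CONTROL_QEMU_FILE messages of at most
 * RDMA_SEND_INCREMENT bytes, since this data travels over the IB SEND
 * control channel rather than as RDMA writes.
 */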
2854 for (i = 0; i < niov; i++) {
2855 size_t remaining = iov[i].iov_len;
2856 uint8_t * data = (void *)iov[i].iov_base;
2857 while (remaining) {
2858 RDMAControlHeader head = {};
2859
2860 len = MIN(remaining, RDMA_SEND_INCREMENT);
2861 remaining -= len;
2862
2863 head.len = len;
2864 head.type = RDMA_CONTROL_QEMU_FILE;
2865
2866 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2867
2868 if (ret < 0) {
2869 rdma->errored = true;
2870 error_setg(errp, "qemu_rdma_exchange_send failed");
2871 return -1;
2872 }
2873
2874 data += len;
2875 done += len;
2876 }
2877 }
2878
2879 return done;
2880 }
2881
2882 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2883 size_t size, int idx)
2884 {
2885 size_t len = 0;
2886
2887 if (rdma->wr_data[idx].control_len) {
2888 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2889
2890 len = MIN(size, rdma->wr_data[idx].control_len);
2891 memcpy(buf, rdma->wr_data[idx].control_curr, len);
2892 rdma->wr_data[idx].control_curr += len;
2893 rdma->wr_data[idx].control_len -= len;
2894 }
2895
2896 return len;
2897 }
2898
2899 /*
2900 * QEMUFile interface to the control channel.
2901 * RDMA links don't use bytestreams, so we have to
2902 * return bytes to QEMUFile opportunistically.
2903 */
2904 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2905 const struct iovec *iov,
2906 size_t niov,
2907 int **fds,
2908 size_t *nfds,
2909 int flags,
2910 Error **errp)
2911 {
2912 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2913 RDMAContext *rdma;
2914 RDMAControlHeader head;
2915 int ret;
2916 ssize_t done = 0;
2917 size_t i, len;
2918
2919 RCU_READ_LOCK_GUARD();
2920 rdma = qatomic_rcu_read(&rioc->rdmain);
2921
2922 if (!rdma) {
2923 error_setg(errp, "RDMA control channel input is not set");
2924 return -1;
2925 }
2926
2927 if (rdma->errored) {
2928 error_setg(errp,
2929 "RDMA is in an error state waiting migration to abort!");
2930 return -1;
2931 }
2932
2933 for (i = 0; i < niov; i++) {
2934 size_t want = iov[i].iov_len;
2935 uint8_t *data = (void *)iov[i].iov_base;
2936
2937 /*
2938 * First, we hold on to the last SEND message we
2939 * were given and dish out the bytes until we run
2940 * out of bytes.
2941 */
2942 len = qemu_rdma_fill(rdma, data, want, 0);
2943 done += len;
2944 want -= len;
2945 /* Got what we needed, so go to next iovec */
2946 if (want == 0) {
2947 continue;
2948 }
2949
2950 /* If we got any data so far, then don't wait
2951 * for more, just return what we have */
2952 if (done > 0) {
2953 break;
2954 }
2955
2956
2957 /* We've got nothing at all, so let's wait for
2958 * more to arrive
2959 */
2960 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2961
2962 if (ret < 0) {
2963 rdma->errored = true;
2964 error_setg(errp, "qemu_rdma_exchange_recv failed");
2965 return -1;
2966 }
2967
2968 /*
2969 * SEND was received with new bytes, now try again.
2970 */
2971 len = qemu_rdma_fill(rdma, data, want, 0);
2972 done += len;
2973 want -= len;
2974
2975 /* Still didn't get enough, so let's just return */
2976 if (want) {
2977 if (done == 0) {
2978 return QIO_CHANNEL_ERR_BLOCK;
2979 } else {
2980 break;
2981 }
2982 }
2983 }
2984 return done;
2985 }
2986
2987 /*
2988 * Block until all the outstanding chunks have been delivered by the hardware.
2989 */
2990 static int qemu_rdma_drain_cq(RDMAContext *rdma)
2991 {
2992 int ret;
2993
2994 if (qemu_rdma_write_flush(rdma) < 0) {
2995 return -1;
2996 }
2997
2998 while (rdma->nb_sent) {
2999 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
3000 if (ret < 0) {
3001 error_report("rdma migration: complete polling error!");
3002 return -1;
3003 }
3004 }
3005
3006 qemu_rdma_unregister_waiting(rdma);
3007
3008 return 0;
3009 }
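/*
 * Called from qemu_rdma_registration_stop() before signalling
 * RDMA_CONTROL_REGISTER_FINISHED, so every posted RDMA write has
 * completed by the time the destination leaves its registration loop.
 */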
3010
3011
3012 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
3013 bool blocking,
3014 Error **errp)
3015 {
3016 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3017 /* XXX we should make readv/writev actually honour this :-) */
3018 rioc->blocking = blocking;
3019 return 0;
3020 }
3021
3022
3023 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
3024 struct QIOChannelRDMASource {
3025 GSource parent;
3026 QIOChannelRDMA *rioc;
3027 GIOCondition condition;
3028 };
3029
3030 static gboolean
3031 qio_channel_rdma_source_prepare(GSource *source,
3032 gint *timeout)
3033 {
3034 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3035 RDMAContext *rdma;
3036 GIOCondition cond = 0;
3037 *timeout = -1;
3038
3039 RCU_READ_LOCK_GUARD();
3040 if (rsource->condition == G_IO_IN) {
3041 rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3042 } else {
3043 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3044 }
3045
3046 if (!rdma) {
3047 error_report("RDMAContext is NULL when preparing GSource");
3048 return FALSE;
3049 }
3050
3051 if (rdma->wr_data[0].control_len) {
3052 cond |= G_IO_IN;
3053 }
3054 cond |= G_IO_OUT;
3055
3056 return cond & rsource->condition;
3057 }
3058
3059 static gboolean
3060 qio_channel_rdma_source_check(GSource *source)
3061 {
3062 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3063 RDMAContext *rdma;
3064 GIOCondition cond = 0;
3065
3066 RCU_READ_LOCK_GUARD();
3067 if (rsource->condition == G_IO_IN) {
3068 rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3069 } else {
3070 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3071 }
3072
3073 if (!rdma) {
3074 error_report("RDMAContext is NULL when checking GSource");
3075 return FALSE;
3076 }
3077
3078 if (rdma->wr_data[0].control_len) {
3079 cond |= G_IO_IN;
3080 }
3081 cond |= G_IO_OUT;
3082
3083 return cond & rsource->condition;
3084 }
3085
3086 static gboolean
3087 qio_channel_rdma_source_dispatch(GSource *source,
3088 GSourceFunc callback,
3089 gpointer user_data)
3090 {
3091 QIOChannelFunc func = (QIOChannelFunc)callback;
3092 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3093 RDMAContext *rdma;
3094 GIOCondition cond = 0;
3095
3096 RCU_READ_LOCK_GUARD();
3097 if (rsource->condition == G_IO_IN) {
3098 rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3099 } else {
3100 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3101 }
3102
3103 if (!rdma) {
3104 error_report("RDMAContext is NULL when dispatching GSource");
3105 return FALSE;
3106 }
3107
3108 if (rdma->wr_data[0].control_len) {
3109 cond |= G_IO_IN;
3110 }
3111 cond |= G_IO_OUT;
3112
3113 return (*func)(QIO_CHANNEL(rsource->rioc),
3114 (cond & rsource->condition),
3115 user_data);
3116 }
3117
3118 static void
3119 qio_channel_rdma_source_finalize(GSource *source)
3120 {
3121 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
3122
3123 object_unref(OBJECT(ssource->rioc));
3124 }
3125
3126 static GSourceFuncs qio_channel_rdma_source_funcs = {
3127 qio_channel_rdma_source_prepare,
3128 qio_channel_rdma_source_check,
3129 qio_channel_rdma_source_dispatch,
3130 qio_channel_rdma_source_finalize
3131 };
3132
3133 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
3134 GIOCondition condition)
3135 {
3136 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3137 QIOChannelRDMASource *ssource;
3138 GSource *source;
3139
3140 source = g_source_new(&qio_channel_rdma_source_funcs,
3141 sizeof(QIOChannelRDMASource));
3142 ssource = (QIOChannelRDMASource *)source;
3143
3144 ssource->rioc = rioc;
3145 object_ref(OBJECT(rioc));
3146
3147 ssource->condition = condition;
3148
3149 return source;
3150 }
3151
3152 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
3153 AioContext *read_ctx,
3154 IOHandler *io_read,
3155 AioContext *write_ctx,
3156 IOHandler *io_write,
3157 void *opaque)
3158 {
3159 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3160 if (io_read) {
3161 aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd,
3162 io_read, io_write, NULL, NULL, opaque);
3163 aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd,
3164 io_read, io_write, NULL, NULL, opaque);
3165 } else {
3166 aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd,
3167 io_read, io_write, NULL, NULL, opaque);
3168 aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd,
3169 io_read, io_write, NULL, NULL, opaque);
3170 }
3171 }
3172
3173 struct rdma_close_rcu {
3174 struct rcu_head rcu;
3175 RDMAContext *rdmain;
3176 RDMAContext *rdmaout;
3177 };
3178
3179 /* callback from qio_channel_rdma_close via call_rcu */
3180 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
3181 {
3182 if (rcu->rdmain) {
3183 qemu_rdma_cleanup(rcu->rdmain);
3184 }
3185
3186 if (rcu->rdmaout) {
3187 qemu_rdma_cleanup(rcu->rdmaout);
3188 }
3189
3190 g_free(rcu->rdmain);
3191 g_free(rcu->rdmaout);
3192 g_free(rcu);
3193 }
3194
3195 static int qio_channel_rdma_close(QIOChannel *ioc,
3196 Error **errp)
3197 {
3198 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3199 RDMAContext *rdmain, *rdmaout;
3200 struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
3201
3202 trace_qemu_rdma_close();
3203
3204 rdmain = rioc->rdmain;
3205 if (rdmain) {
3206 qatomic_rcu_set(&rioc->rdmain, NULL);
3207 }
3208
3209 rdmaout = rioc->rdmaout;
3210 if (rdmaout) {
3211 qatomic_rcu_set(&rioc->rdmaout, NULL);
3212 }
3213
3214 rcu->rdmain = rdmain;
3215 rcu->rdmaout = rdmaout;
3216 call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
3217
3218 return 0;
3219 }
3220
3221 static int
3222 qio_channel_rdma_shutdown(QIOChannel *ioc,
3223 QIOChannelShutdown how,
3224 Error **errp)
3225 {
3226 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3227 RDMAContext *rdmain, *rdmaout;
3228
3229 RCU_READ_LOCK_GUARD();
3230
3231 rdmain = qatomic_rcu_read(&rioc->rdmain);
3232 rdmaout = qatomic_rcu_read(&rioc->rdmaout);
3233
3234 switch (how) {
3235 case QIO_CHANNEL_SHUTDOWN_READ:
3236 if (rdmain) {
3237 rdmain->errored = true;
3238 }
3239 break;
3240 case QIO_CHANNEL_SHUTDOWN_WRITE:
3241 if (rdmaout) {
3242 rdmaout->errored = true;
3243 }
3244 break;
3245 case QIO_CHANNEL_SHUTDOWN_BOTH:
3246 default:
3247 if (rdmain) {
3248 rdmain->errored = true;
3249 }
3250 if (rdmaout) {
3251 rdmaout->errored = true;
3252 }
3253 break;
3254 }
3255
3256 return 0;
3257 }
3258
3259 /*
3260 * Parameters:
3261 * @offset == 0 :
3262 * This means that 'block_offset' is a full virtual address that does not
3263 * belong to a RAMBlock of the virtual machine and instead
3264 * represents a private malloc'd memory area that the caller wishes to
3265 * transfer.
3266 *
3267 * @offset != 0 :
3268 * Offset is an offset to be added to block_offset and used
3269 * to also lookup the corresponding RAMBlock.
3270 *
3271 * @size : Number of bytes to transfer
3276 */
3277 static int qemu_rdma_save_page(QEMUFile *f, ram_addr_t block_offset,
3278 ram_addr_t offset, size_t size)
3279 {
3280 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3281 RDMAContext *rdma;
3282 int ret;
3283
3284 if (migration_in_postcopy()) {
3285 return RAM_SAVE_CONTROL_NOT_SUPP;
3286 }
3287
3288 RCU_READ_LOCK_GUARD();
3289 rdma = qatomic_rcu_read(&rioc->rdmaout);
3290
3291 if (!rdma) {
3292 return -1;
3293 }
3294
3295 if (rdma_errored(rdma)) {
3296 return -1;
3297 }
3298
3299 qemu_fflush(f);
3300
3301 /*
3302 * Add this page to the current 'chunk'. If the chunk
3303 * is full, or the page doesn't belong to the current chunk,
3304 * an actual RDMA write will occur and a new chunk will be formed.
3305 */
3306 ret = qemu_rdma_write(rdma, block_offset, offset, size);
3307 if (ret < 0) {
3308 error_report("rdma migration: write error");
3309 goto err;
3310 }
3311
3312 /*
3313 * Drain the Completion Queue if possible, but do not block,
3314 * just poll.
3315 *
3316 * If nothing to poll, the end of the iteration will do this
3317 * again to make sure we don't overflow the request queue.
3318 */
3319 while (1) {
3320 uint64_t wr_id, wr_id_in;
3321 ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
3322
3323 if (ret < 0) {
3324 error_report("rdma migration: polling error");
3325 goto err;
3326 }
3327
3328 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3329
3330 if (wr_id == RDMA_WRID_NONE) {
3331 break;
3332 }
3333 }
3334
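/* Likewise drain the send completion queue without blocking. */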
3335 while (1) {
3336 uint64_t wr_id, wr_id_in;
3337 ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
3338
3339 if (ret < 0) {
3340 error_report("rdma migration: polling error");
3341 goto err;
3342 }
3343
3344 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3345
3346 if (wr_id == RDMA_WRID_NONE) {
3347 break;
3348 }
3349 }
3350
3351 return RAM_SAVE_CONTROL_DELAYED;
3352
3353 err:
3354 rdma->errored = true;
3355 return -1;
3356 }
3357
3358 static void rdma_accept_incoming_migration(void *opaque);
3359
3360 static void rdma_cm_poll_handler(void *opaque)
3361 {
3362 RDMAContext *rdma = opaque;
3363 int ret;
3364 struct rdma_cm_event *cm_event;
3365 MigrationIncomingState *mis = migration_incoming_get_current();
3366
3367 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3368 if (ret < 0) {
3369 error_report("get_cm_event failed %d", errno);
3370 return;
3371 }
3372
3373 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3374 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3375 if (!rdma->errored &&
3376 migration_incoming_get_current()->state !=
3377 MIGRATION_STATUS_COMPLETED) {
3378 error_report("received unexpected cm event %d", cm_event->event);
3379 rdma->errored = true;
3380 if (rdma->return_path) {
3381 rdma->return_path->errored = true;
3382 }
3383 }
3384 rdma_ack_cm_event(cm_event);
3385 if (mis->loadvm_co) {
3386 qemu_coroutine_enter(mis->loadvm_co);
3387 }
3388 return;
3389 }
3390 rdma_ack_cm_event(cm_event);
3391 }
3392
3393 static int qemu_rdma_accept(RDMAContext *rdma)
3394 {
3395 RDMACapabilities cap;
3396 struct rdma_conn_param conn_param = {
3397 .responder_resources = 2,
3398 .private_data = &cap,
3399 .private_data_len = sizeof(cap),
3400 };
3401 RDMAContext *rdma_return_path = NULL;
3402 struct rdma_cm_event *cm_event;
3403 struct ibv_context *verbs;
3404 int ret;
3405 int idx;
3406
3407 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3408 if (ret < 0) {
3409 goto err_rdma_dest_wait;
3410 }
3411
3412 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3413 rdma_ack_cm_event(cm_event);
3414 goto err_rdma_dest_wait;
3415 }
3416
3417 /*
3418 * Initialize the RDMAContext for the return path for postcopy after the
3419 * first connection request has arrived.
3420 */
3421 if ((migrate_postcopy() || migrate_return_path())
3422 && !rdma->is_return_path) {
3423 rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL);
3424 if (rdma_return_path == NULL) {
3425 rdma_ack_cm_event(cm_event);
3426 goto err_rdma_dest_wait;
3427 }
3428
3429 qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
3430 }
3431
3432 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3433
3434 network_to_caps(&cap);
3435
3436 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3437 error_report("Unknown source RDMA version: %d, bailing...",
3438 cap.version);
3439 rdma_ack_cm_event(cm_event);
3440 goto err_rdma_dest_wait;
3441 }
3442
3443 /*
3444 * Respond with only the capabilities this version of QEMU knows about.
3445 */
3446 cap.flags &= known_capabilities;
3447
3448 /*
3449 * Enable the ones that we do know about.
3450 * Add other checks here as new ones are introduced.
3451 */
3452 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3453 rdma->pin_all = true;
3454 }
3455
3456 rdma->cm_id = cm_event->id;
3457 verbs = cm_event->id->verbs;
3458
3459 rdma_ack_cm_event(cm_event);
3460
3461 trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3462
3463 caps_to_network(&cap);
3464
3465 trace_qemu_rdma_accept_pin_verbsc(verbs);
3466
3467 if (!rdma->verbs) {
3468 rdma->verbs = verbs;
3469 } else if (rdma->verbs != verbs) {
3470 error_report("ibv context not matching %p, %p!", rdma->verbs,
3471 verbs);
3472 goto err_rdma_dest_wait;
3473 }
3474
3475 qemu_rdma_dump_id("dest_init", verbs);
3476
3477 ret = qemu_rdma_alloc_pd_cq(rdma);
3478 if (ret < 0) {
3479 error_report("rdma migration: error allocating pd and cq!");
3480 goto err_rdma_dest_wait;
3481 }
3482
3483 ret = qemu_rdma_alloc_qp(rdma);
3484 if (ret < 0) {
3485 error_report("rdma migration: error allocating qp!");
3486 goto err_rdma_dest_wait;
3487 }
3488
3489 qemu_rdma_init_ram_blocks(rdma);
3490
3491 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3492 ret = qemu_rdma_reg_control(rdma, idx);
3493 if (ret < 0) {
3494 error_report("rdma: error registering %d control", idx);
3495 goto err_rdma_dest_wait;
3496 }
3497 }
3498
3499 /* Accept the second connection request for return path */
3500 if ((migrate_postcopy() || migrate_return_path())
3501 && !rdma->is_return_path) {
3502 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3503 NULL,
3504 (void *)(intptr_t)rdma->return_path);
3505 } else {
3506 qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3507 NULL, rdma);
3508 }
3509
3510 ret = rdma_accept(rdma->cm_id, &conn_param);
3511 if (ret < 0) {
3512 error_report("rdma_accept failed");
3513 goto err_rdma_dest_wait;
3514 }
3515
3516 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3517 if (ret < 0) {
3518 error_report("rdma_accept get_cm_event failed");
3519 goto err_rdma_dest_wait;
3520 }
3521
3522 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3523 error_report("rdma_accept: cm event is not ESTABLISHED");
3524 rdma_ack_cm_event(cm_event);
3525 goto err_rdma_dest_wait;
3526 }
3527
3528 rdma_ack_cm_event(cm_event);
3529 rdma->connected = true;
3530
3531 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3532 if (ret < 0) {
3533 error_report("rdma migration: error posting second control recv");
3534 goto err_rdma_dest_wait;
3535 }
3536
3537 qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3538
3539 return 0;
3540
3541 err_rdma_dest_wait:
3542 rdma->errored = true;
3543 qemu_rdma_cleanup(rdma);
3544 g_free(rdma_return_path);
3545 return -1;
3546 }
3547
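/*
 * Three-way comparison on src_index (-1, 0 or 1) so that qsort() in the
 * RDMA_CONTROL_RAM_BLOCKS_REQUEST handler below reproduces the source's
 * RAMBlock order and the block indices used in the control messages match
 * on both sides.
 */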
3548 static int dest_ram_sort_func(const void *a, const void *b)
3549 {
3550 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3551 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3552
3553 return (a_index < b_index) ? -1 : (a_index != b_index);
3554 }
3555
3556 /*
3557 * During each iteration of the migration, we listen for instructions
3558 * from the source VM to perform dynamic page registrations before it
3559 * can perform RDMA operations.
3560 *
3561 * We respond with the 'rkey'.
3562 *
3563 * Keep doing this until the source tells us to stop.
3564 */
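/*
 * The message loop below handles RDMA_CONTROL_COMPRESS (fill a range with a
 * constant byte, used for zero pages), RDMA_CONTROL_RAM_BLOCKS_REQUEST
 * (reply with our RAMBlock list), RDMA_CONTROL_REGISTER_REQUEST (register
 * chunks and reply with their rkeys) and RDMA_CONTROL_UNREGISTER_REQUEST,
 * and returns once RDMA_CONTROL_REGISTER_FINISHED arrives.
 */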
3565 static int qemu_rdma_registration_handle(QEMUFile *f)
3566 {
3567 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3568 .type = RDMA_CONTROL_REGISTER_RESULT,
3569 .repeat = 0,
3570 };
3571 RDMAControlHeader unreg_resp = { .len = 0,
3572 .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3573 .repeat = 0,
3574 };
3575 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3576 .repeat = 1 };
3577 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3578 RDMAContext *rdma;
3579 RDMALocalBlocks *local;
3580 RDMAControlHeader head;
3581 RDMARegister *reg, *registers;
3582 RDMACompress *comp;
3583 RDMARegisterResult *reg_result;
3584 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3585 RDMALocalBlock *block;
3586 void *host_addr;
3587 int ret;
3588 int idx = 0;
3589 int count = 0;
3590 int i = 0;
3591
3592 RCU_READ_LOCK_GUARD();
3593 rdma = qatomic_rcu_read(&rioc->rdmain);
3594
3595 if (!rdma) {
3596 return -1;
3597 }
3598
3599 if (rdma_errored(rdma)) {
3600 return -1;
3601 }
3602
3603 local = &rdma->local_ram_blocks;
3604 do {
3605 trace_qemu_rdma_registration_handle_wait();
3606
3607 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3608
3609 if (ret < 0) {
3610 break;
3611 }
3612
3613 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3614 error_report("rdma: Too many requests in this message (%d). "
3615 "Bailing.", head.repeat);
3616 break;
3617 }
3618
3619 switch (head.type) {
3620 case RDMA_CONTROL_COMPRESS:
3621 comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3622 network_to_compress(comp);
3623
3624 trace_qemu_rdma_registration_handle_compress(comp->length,
3625 comp->block_idx,
3626 comp->offset);
3627 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3628 error_report("rdma: 'compress' bad block index %u (vs %d)",
3629 (unsigned int)comp->block_idx,
3630 rdma->local_ram_blocks.nb_blocks);
3631 goto err;
3632 }
3633 block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3634
3635 host_addr = block->local_host_addr +
3636 (comp->offset - block->offset);
3637
3638 ram_handle_compressed(host_addr, comp->value, comp->length);
3639 break;
3640
3641 case RDMA_CONTROL_REGISTER_FINISHED:
3642 trace_qemu_rdma_registration_handle_finished();
3643 return 0;
3644
3645 case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3646 trace_qemu_rdma_registration_handle_ram_blocks();
3647
3648 /* Sort our local RAM Block list so it's the same as the source's;
3649 * we can do this since we've filled in a src_index in the list
3650 * as we received the RAMBlock list earlier.
3651 */
3652 qsort(rdma->local_ram_blocks.block,
3653 rdma->local_ram_blocks.nb_blocks,
3654 sizeof(RDMALocalBlock), dest_ram_sort_func);
3655 for (i = 0; i < local->nb_blocks; i++) {
3656 local->block[i].index = i;
3657 }
3658
3659 if (rdma->pin_all) {
3660 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3661 if (ret < 0) {
3662 error_report("rdma migration: error dest "
3663 "registering ram blocks");
3664 goto err;
3665 }
3666 }
3667
3668 /*
3669 * Dest uses this to prepare to transmit the RAMBlock descriptions
3670 * to the source VM after connection setup.
3671 * Both sides use the "remote" structure to communicate and update
3672 * their "local" descriptions with what was sent.
3673 */
3674 for (i = 0; i < local->nb_blocks; i++) {
3675 rdma->dest_blocks[i].remote_host_addr =
3676 (uintptr_t)(local->block[i].local_host_addr);
3677
3678 if (rdma->pin_all) {
3679 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3680 }
3681
3682 rdma->dest_blocks[i].offset = local->block[i].offset;
3683 rdma->dest_blocks[i].length = local->block[i].length;
3684
3685 dest_block_to_network(&rdma->dest_blocks[i]);
3686 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3687 local->block[i].block_name,
3688 local->block[i].offset,
3689 local->block[i].length,
3690 local->block[i].local_host_addr,
3691 local->block[i].src_index);
3692 }
3693
3694 blocks.len = rdma->local_ram_blocks.nb_blocks
3695 * sizeof(RDMADestBlock);
3696
3697
3698 ret = qemu_rdma_post_send_control(rdma,
3699 (uint8_t *) rdma->dest_blocks, &blocks);
3700
3701 if (ret < 0) {
3702 error_report("rdma migration: error sending remote info");
3703 goto err;
3704 }
3705
3706 break;
3707 case RDMA_CONTROL_REGISTER_REQUEST:
3708 trace_qemu_rdma_registration_handle_register(head.repeat);
3709
3710 reg_resp.repeat = head.repeat;
3711 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3712
3713 for (count = 0; count < head.repeat; count++) {
3714 uint64_t chunk;
3715 uint8_t *chunk_start, *chunk_end;
3716
3717 reg = &registers[count];
3718 network_to_register(reg);
3719
3720 reg_result = &results[count];
3721
3722 trace_qemu_rdma_registration_handle_register_loop(count,
3723 reg->current_index, reg->key.current_addr, reg->chunks);
3724
3725 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3726 error_report("rdma: 'register' bad block index %u (vs %d)",
3727 (unsigned int)reg->current_index,
3728 rdma->local_ram_blocks.nb_blocks);
3729 goto err;
3730 }
3731 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3732 if (block->is_ram_block) {
3733 if (block->offset > reg->key.current_addr) {
3734 error_report("rdma: bad register address for block %s"
3735 " offset: %" PRIx64 " current_addr: %" PRIx64,
3736 block->block_name, block->offset,
3737 reg->key.current_addr);
3738 goto err;
3739 }
3740 host_addr = (block->local_host_addr +
3741 (reg->key.current_addr - block->offset));
3742 chunk = ram_chunk_index(block->local_host_addr,
3743 (uint8_t *) host_addr);
3744 } else {
3745 chunk = reg->key.chunk;
3746 host_addr = block->local_host_addr +
3747 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3748 /* Check for particularly bad chunk value */
3749 if (host_addr < (void *)block->local_host_addr) {
3750 error_report("rdma: bad chunk for block %s"
3751 " chunk: %" PRIx64,
3752 block->block_name, reg->key.chunk);
3753 goto err;
3754 }
3755 }
3756 chunk_start = ram_chunk_start(block, chunk);
3757 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3758 /* avoid "-Waddress-of-packed-member" warning */
3759 uint32_t tmp_rkey = 0;
3760 if (qemu_rdma_register_and_get_keys(rdma, block,
3761 (uintptr_t)host_addr, NULL, &tmp_rkey,
3762 chunk, chunk_start, chunk_end)) {
3763 error_report("cannot get rkey");
3764 goto err;
3765 }
3766 reg_result->rkey = tmp_rkey;
3767
3768 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3769
3770 trace_qemu_rdma_registration_handle_register_rkey(
3771 reg_result->rkey);
3772
3773 result_to_network(reg_result);
3774 }
3775
3776 ret = qemu_rdma_post_send_control(rdma,
3777 (uint8_t *) results, &reg_resp);
3778
3779 if (ret < 0) {
3780 error_report("Failed to send control buffer");
3781 goto err;
3782 }
3783 break;
3784 case RDMA_CONTROL_UNREGISTER_REQUEST:
3785 trace_qemu_rdma_registration_handle_unregister(head.repeat);
3786 unreg_resp.repeat = head.repeat;
3787 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3788
3789 for (count = 0; count < head.repeat; count++) {
3790 reg = &registers[count];
3791 network_to_register(reg);
3792
3793 trace_qemu_rdma_registration_handle_unregister_loop(count,
3794 reg->current_index, reg->key.chunk);
3795
3796 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3797
3798 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3799 block->pmr[reg->key.chunk] = NULL;
3800
3801 if (ret != 0) {
3802 perror("rdma unregistration chunk failed");
3803 goto err;
3804 }
3805
3806 rdma->total_registrations--;
3807
3808 trace_qemu_rdma_registration_handle_unregister_success(
3809 reg->key.chunk);
3810 }
3811
3812 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3813
3814 if (ret < 0) {
3815 error_report("Failed to send control buffer");
3816 goto err;
3817 }
3818 break;
3819 case RDMA_CONTROL_REGISTER_RESULT:
3820 error_report("Invalid RESULT message at dest.");
3821 goto err;
3822 default:
3823 error_report("Unknown control message %s", control_desc(head.type));
3824 goto err;
3825 }
3826 } while (1);
3827
3828 err:
3829 rdma->errored = true;
3830 return -1;
3831 }
3832
3833 /* Destination:
3834 * Called via a ram_control_load_hook during the initial RAM load section which
3835 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks
3836 * on the source.
3837 * We've already built our local RAMBlock list, but not yet sent the list to
3838 * the source.
3839 */
3840 static int
3841 rdma_block_notification_handle(QEMUFile *f, const char *name)
3842 {
3843 RDMAContext *rdma;
3844 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3845 int curr;
3846 int found = -1;
3847
3848 RCU_READ_LOCK_GUARD();
3849 rdma = qatomic_rcu_read(&rioc->rdmain);
3850
3851 if (!rdma) {
3852 return -1;
3853 }
3854
3855 /* Find the matching RAMBlock in our local list */
3856 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3857 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3858 found = curr;
3859 break;
3860 }
3861 }
3862
3863 if (found == -1) {
3864 error_report("RAMBlock '%s' not found on destination", name);
3865 return -1;
3866 }
3867
3868 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3869 trace_rdma_block_notification_handle(name, rdma->next_src_index);
3870 rdma->next_src_index++;
3871
3872 return 0;
3873 }
3874
3875 static int rdma_load_hook(QEMUFile *f, uint64_t flags, void *data)
3876 {
3877 switch (flags) {
3878 case RAM_CONTROL_BLOCK_REG:
3879 return rdma_block_notification_handle(f, data);
3880
3881 case RAM_CONTROL_HOOK:
3882 return qemu_rdma_registration_handle(f);
3883
3884 default:
3885 /* Shouldn't be called with any other values */
3886 abort();
3887 }
3888 }
3889
3890 static int qemu_rdma_registration_start(QEMUFile *f,
3891 uint64_t flags, void *data)
3892 {
3893 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3894 RDMAContext *rdma;
3895
3896 if (migration_in_postcopy()) {
3897 return 0;
3898 }
3899
3900 RCU_READ_LOCK_GUARD();
3901 rdma = qatomic_rcu_read(&rioc->rdmaout);
3902 if (!rdma) {
3903 return -1;
3904 }
3905
3906 if (rdma_errored(rdma)) {
3907 return -1;
3908 }
3909
3910 trace_qemu_rdma_registration_start(flags);
3911 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3912 qemu_fflush(f);
3913
3914 return 0;
3915 }
3916
3917 /*
3918 * Inform dest that dynamic registrations are done for now.
3919 * First, flush writes, if any.
3920 */
3921 static int qemu_rdma_registration_stop(QEMUFile *f,
3922 uint64_t flags, void *data)
3923 {
3924 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3925 RDMAContext *rdma;
3926 RDMAControlHeader head = { .len = 0, .repeat = 1 };
3927 int ret;
3928
3929 if (migration_in_postcopy()) {
3930 return 0;
3931 }
3932
3933 RCU_READ_LOCK_GUARD();
3934 rdma = qatomic_rcu_read(&rioc->rdmaout);
3935 if (!rdma) {
3936 return -1;
3937 }
3938
3939 if (rdma_errored(rdma)) {
3940 return -1;
3941 }
3942
3943 qemu_fflush(f);
3944 ret = qemu_rdma_drain_cq(rdma);
3945
3946 if (ret < 0) {
3947 goto err;
3948 }
3949
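/*
 * On the setup round only: exchange the RAM block list with the destination
 * and sanity-check that both sides agree on block count and lengths before
 * any RDMA writes are issued.
 */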
3950 if (flags == RAM_CONTROL_SETUP) {
3951 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3952 RDMALocalBlocks *local = &rdma->local_ram_blocks;
3953 int reg_result_idx, i, nb_dest_blocks;
3954
3955 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3956 trace_qemu_rdma_registration_stop_ram();
3957
3958 /*
3959 * Make sure that we parallelize the pinning on both sides.
3960 * For very large guests, doing this serially takes a really
3961 * long time, so we have to 'interleave' the pinning locally
3962 * with the control messages by performing the pinning on this
3963 * side before we receive the control response from the other
3964 * side that the pinning has completed.
3965 */
3966 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3967 &reg_result_idx, rdma->pin_all ?
3968 qemu_rdma_reg_whole_ram_blocks : NULL);
3969 if (ret < 0) {
3970 error_report("Failed to receive remote RAM block info");
3971 return -1;
3972 }
3973
3974 nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3975
3976 /*
3977 * The protocol uses two different sets of rkeys (mutually exclusive):
3978 * 1. One key to represent the virtual address of the entire ram block.
3979 * (dynamic chunk registration disabled - pin everything with one rkey.)
3980 * 2. One to represent individual chunks within a ram block.
3981 * (dynamic chunk registration enabled - pin individual chunks.)
3982 *
3983 * Once the capability is successfully negotiated, the destination transmits
3984 * the keys to use (or sends them later), including the virtual addresses,
3985 * and then propagates the remote ram block descriptions to its local copy.
3986 */
3987
3988 if (local->nb_blocks != nb_dest_blocks) {
3989 error_report("RAM block count mismatch (%d local vs %d remote); "
3990 "the QEMU command line parameters are probably "
3991 "not identical on the source and destination",
3992 local->nb_blocks, nb_dest_blocks);
3993 rdma->errored = true;
3994 return -1;
3995 }
3996
3997 qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3998 memcpy(rdma->dest_blocks,
3999 rdma->wr_data[reg_result_idx].control_curr, resp.len);
4000 for (i = 0; i < nb_dest_blocks; i++) {
4001 network_to_dest_block(&rdma->dest_blocks[i]);
4002
4003 /* We require that the blocks are in the same order */
4004 if (rdma->dest_blocks[i].length != local->block[i].length) {
4005 error_report("Block %s/%d has a different length %" PRIu64
4006 " vs %" PRIu64, local->block[i].block_name, i,
4007 local->block[i].length,
4008 rdma->dest_blocks[i].length);
4009 rdma->errored = true;
4010 return -1;
4011 }
4012 local->block[i].remote_host_addr =
4013 rdma->dest_blocks[i].remote_host_addr;
4014 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
4015 }
4016 }
4017
4018 trace_qemu_rdma_registration_stop(flags);
4019
4020 head.type = RDMA_CONTROL_REGISTER_FINISHED;
4021 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
4022
4023 if (ret < 0) {
4024 goto err;
4025 }
4026
4027 return 0;
4028 err:
4029 rdma->errored = true;
4030 return -1;
4031 }
4032
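/*
 * Hook tables wired into the QEMUFile layer: rdma_new_input() installs the
 * read hooks on the destination's file, rdma_new_output() installs the write
 * hooks on the source's file (see below).
 */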
4033 static const QEMUFileHooks rdma_read_hooks = {
4034 .hook_ram_load = rdma_load_hook,
4035 };
4036
4037 static const QEMUFileHooks rdma_write_hooks = {
4038 .before_ram_iterate = qemu_rdma_registration_start,
4039 .after_ram_iterate = qemu_rdma_registration_stop,
4040 .save_page = qemu_rdma_save_page,
4041 };
4042
4043
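/*
 * Object finalizer: tear down and free whichever RDMA contexts are still
 * attached to this channel when its last reference goes away.
 */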
4044 static void qio_channel_rdma_finalize(Object *obj)
4045 {
4046 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
4047 if (rioc->rdmain) {
4048 qemu_rdma_cleanup(rioc->rdmain);
4049 g_free(rioc->rdmain);
4050 rioc->rdmain = NULL;
4051 }
4052 if (rioc->rdmaout) {
4053 qemu_rdma_cleanup(rioc->rdmaout);
4054 g_free(rioc->rdmaout);
4055 rioc->rdmaout = NULL;
4056 }
4057 }
4058
4059 static void qio_channel_rdma_class_init(ObjectClass *klass,
4060 void *class_data G_GNUC_UNUSED)
4061 {
4062 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
4063
4064 ioc_klass->io_writev = qio_channel_rdma_writev;
4065 ioc_klass->io_readv = qio_channel_rdma_readv;
4066 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
4067 ioc_klass->io_close = qio_channel_rdma_close;
4068 ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
4069 ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
4070 ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
4071 }
4072
4073 static const TypeInfo qio_channel_rdma_info = {
4074 .parent = TYPE_QIO_CHANNEL,
4075 .name = TYPE_QIO_CHANNEL_RDMA,
4076 .instance_size = sizeof(QIOChannelRDMA),
4077 .instance_finalize = qio_channel_rdma_finalize,
4078 .class_init = qio_channel_rdma_class_init,
4079 };
4080
4081 static void qio_channel_rdma_register_types(void)
4082 {
4083 type_register_static(&qio_channel_rdma_info);
4084 }
4085
4086 type_init(qio_channel_rdma_register_types);
4087
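/*
 * Wrap an RDMAContext in a QIOChannelRDMA and return the QEMUFile that the
 * migration core will actually read from or write to.  Note the cross-link:
 * the opposite direction of the channel is populated from rdma->return_path,
 * which is typically only set up when a postcopy/return path is in use.
 */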
4088 static QEMUFile *rdma_new_input(RDMAContext *rdma)
4089 {
4090 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4091
4092 rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
4093 rioc->rdmain = rdma;
4094 rioc->rdmaout = rdma->return_path;
4095 qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
4096
4097 return rioc->file;
4098 }
4099
4100 static QEMUFile *rdma_new_output(RDMAContext *rdma)
4101 {
4102 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4103
4104 rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
4105 rioc->rdmaout = rdma;
4106 rioc->rdmain = rdma->return_path;
4107 qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
4108
4109 return rioc->file;
4110 }
4111
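/*
 * Destination: fd handler armed by rdma_start_incoming_migration() on the
 * rdma_cm event channel.  Runs when a connection request arrives, accepts
 * it and, unless this is the return-path context, feeds the resulting
 * QEMUFile into the normal incoming-migration machinery.
 */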
4112 static void rdma_accept_incoming_migration(void *opaque)
4113 {
4114 RDMAContext *rdma = opaque;
4115 int ret;
4116 QEMUFile *f;
4117 Error *local_err = NULL;
4118
4119 trace_qemu_rdma_accept_incoming_migration();
4120 ret = qemu_rdma_accept(rdma);
4121
4122 if (ret < 0) {
4123 error_report("RDMA ERROR: Migration initialization failed");
4124 return;
4125 }
4126
4127 trace_qemu_rdma_accept_incoming_migration_accepted();
4128
4129 if (rdma->is_return_path) {
4130 return;
4131 }
4132
4133 f = rdma_new_input(rdma);
4134 if (f == NULL) {
4135 error_report("RDMA ERROR: could not open RDMA for input");
4136 qemu_rdma_cleanup(rdma);
4137 return;
4138 }
4139
4140 rdma->migration_started_on_destination = 1;
4141 migration_fd_process_incoming(f, &local_err);
4142 if (local_err) {
4143 error_reportf_err(local_err, "RDMA ERROR: ");
4144 }
4145 }
4146
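/*
 * Destination entry point.  Presumably called by the migration core when the
 * incoming URI uses the rdma: transport, with host_port holding the
 * "host:port" part.  Sets up the listening rdma_cm id and defers the actual
 * accept to rdma_accept_incoming_migration() above.
 */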
4147 void rdma_start_incoming_migration(const char *host_port, Error **errp)
4148 {
4149 int ret;
4150 RDMAContext *rdma;
4151
4152 trace_rdma_start_incoming_migration();
4153
4154 /* Avoid ram_block_discard_disable(), cannot change during migration. */
4155 if (ram_block_discard_is_required()) {
4156 error_setg(errp, "RDMA: cannot disable RAM discard");
4157 return;
4158 }
4159
4160 rdma = qemu_rdma_data_init(host_port, errp);
4161 if (rdma == NULL) {
4162 goto err;
4163 }
4164
4165 ret = qemu_rdma_dest_init(rdma, errp);
4166 if (ret < 0) {
4167 goto err;
4168 }
4169
4170 trace_rdma_start_incoming_migration_after_dest_init();
4171
4172 ret = rdma_listen(rdma->listen_id, 5);
4173
4174 if (ret < 0) {
4175 error_setg(errp, "RDMA ERROR: failed to listen on RDMA socket");
4176 goto cleanup_rdma;
4177 }
4178
4179 trace_rdma_start_incoming_migration_after_rdma_listen();
4180
4181 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4182 NULL, (void *)(intptr_t)rdma);
4183 return;
4184
4185 cleanup_rdma:
4186 qemu_rdma_cleanup(rdma);
4187 err:
4188 if (rdma) {
4189 g_free(rdma->host);
4190 g_free(rdma->host_port);
4191 }
4192 g_free(rdma);
4193 }
4194
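/*
 * Source entry point, the outgoing counterpart of the function above:
 * resolves and connects the main connection, optionally a second one for
 * the postcopy/return path, then hands the resulting QEMUFile to
 * migrate_fd_connect().
 */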
4195 void rdma_start_outgoing_migration(void *opaque,
4196 const char *host_port, Error **errp)
4197 {
4198 MigrationState *s = opaque;
4199 RDMAContext *rdma_return_path = NULL;
4200 RDMAContext *rdma;
4201 int ret;
4202
4203 /* Avoid ram_block_discard_disable(), cannot change during migration. */
4204 if (ram_block_discard_is_required()) {
4205 error_setg(errp, "RDMA: cannot disable RAM discard");
4206 return;
4207 }
4208
4209 rdma = qemu_rdma_data_init(host_port, errp);
4210 if (rdma == NULL) {
4211 goto err;
4212 }
4213
4214 ret = qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp);
4215
4216 if (ret < 0) {
4217 goto err;
4218 }
4219
4220 trace_rdma_start_outgoing_migration_after_rdma_source_init();
4221 ret = qemu_rdma_connect(rdma, false, errp);
4222
4223 if (ret < 0) {
4224 goto err;
4225 }
4226
4227 /* RDMA postcopy needs a separate queue pair for the return path */
4228 if (migrate_postcopy() || migrate_return_path()) {
4229 rdma_return_path = qemu_rdma_data_init(host_port, errp);
4230
4231 if (rdma_return_path == NULL) {
4232 goto return_path_err;
4233 }
4234
4235 ret = qemu_rdma_source_init(rdma_return_path,
4236 migrate_rdma_pin_all(), errp);
4237
4238 if (ret < 0) {
4239 goto return_path_err;
4240 }
4241
4242 ret = qemu_rdma_connect(rdma_return_path, true, errp);
4243
4244 if (ret < 0) {
4245 goto return_path_err;
4246 }
4247
4248 rdma->return_path = rdma_return_path;
4249 rdma_return_path->return_path = rdma;
4250 rdma_return_path->is_return_path = true;
4251 }
4252
4253 trace_rdma_start_outgoing_migration_after_rdma_connect();
4254
4255 s->to_dst_file = rdma_new_output(rdma);
4256 migrate_fd_connect(s, NULL);
4257 return;
4258 return_path_err:
4259 qemu_rdma_cleanup(rdma);
4260 err:
4261 g_free(rdma);
4262 g_free(rdma_return_path);
4263 }