]> git.proxmox.com Git - mirror_qemu.git/blob - migration/rdma.c
migration/rdma: fix qemu_rdma_block_for_wrid error paths
[mirror_qemu.git] / migration / rdma.c
1 /*
2 * RDMA protocol and interfaces
3 *
4 * Copyright IBM, Corp. 2010-2013
5 * Copyright Red Hat, Inc. 2015-2016
6 *
7 * Authors:
8 * Michael R. Hines <mrhines@us.ibm.com>
9 * Jiuxing Liu <jl@us.ibm.com>
10 * Daniel P. Berrange <berrange@redhat.com>
11 *
12 * This work is licensed under the terms of the GNU GPL, version 2 or
13 * later. See the COPYING file in the top-level directory.
14 *
15 */
16 #include "qemu/osdep.h"
17 #include "qapi/error.h"
18 #include "qemu-common.h"
19 #include "qemu/cutils.h"
20 #include "rdma.h"
21 #include "migration.h"
22 #include "qemu-file.h"
23 #include "ram.h"
24 #include "qemu-file-channel.h"
25 #include "qemu/error-report.h"
26 #include "qemu/main-loop.h"
27 #include "qemu/sockets.h"
28 #include "qemu/bitmap.h"
29 #include "qemu/coroutine.h"
30 #include <sys/socket.h>
31 #include <netdb.h>
32 #include <arpa/inet.h>
33 #include <rdma/rdma_cma.h>
34 #include "trace.h"
35
/*
 * Print an error on stderr and, when the caller supplied an Error **
 * that does not already hold an error, record the same message there.
 *
 * An error already stored in *errp is left untouched, so the first
 * failure reported through this macro is the one that is kept.
 * Note that errp is evaluated more than once; it is parenthesized at
 * every use so that any pointer-valued expression may be passed.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if ((errp) && (*(errp) == NULL)) { \
            error_setg((errp), "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)
46
/* Timeout handed to rdma_resolve_addr() / rdma_resolve_route(). */
#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
/* Derived from the merge limit, counted in 4096-byte units. */
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

/* Registration chunk size, expressed as a shift. */
#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */

/*
 * Only non-live state travels this way: such state is carried by
 * RDMA_SEND messages rather than RDMA_WRITE, and therefore needs a
 * delivery design that differs from the one used for main memory.
 */
#define RDMA_SEND_INCREMENT 32768

/*
 * Maximum size infiniband SEND message
 */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

#define RDMA_CONTROL_VERSION_CURRENT 1

/*
 * Capability bits used during negotiation.
 */
#define RDMA_CAPABILITY_PIN_ALL 0x01
74
75 /*
76 * Add the other flags above to this list of known capabilities
77 * as they are introduced.
78 */
79 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
80
/*
 * Bail out of the calling function when the RDMA context is already in
 * an error state.
 *
 * Expects a variable named 'rdma' (pointer to the RDMA context) in the
 * caller's scope.  If rdma->error_state is non-zero the macro reports the
 * condition once (tracked through rdma->error_reported) and makes the
 * *calling function* return rdma->error_state.
 *
 * The definition deliberately does not end in a semicolon: the caller
 * writes CHECK_ERROR_STATE(); so that the macro behaves as exactly one
 * statement, including inside an unbraced if/else.
 */
#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                error_report("RDMA is in an error state waiting migration" \
                             " to abort!"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0)
92
/*
 * A work request ID is 64-bits and we split up these bits
 * into 3 parts:
 *
 * bits 0-15 : type of control message, 2^16
 * bits 16-29: ram block index, 2^14
 * bits 30-63: ram block chunk number, 2^34
 *
 * The last two bit ranges are only used for RDMA writes,
 * in order to track their completion and potentially
 * also track unregistration status of the message.
 *
 * The masks are built from 1ULL so that they are 64 bits wide on every
 * host; with 'unsigned long' they would be truncated to 32 bits wherever
 * long is 32 bits, losing the upper chunk-number bits.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

#define RDMA_WRID_TYPE_MASK \
    ((1ULL << RDMA_WRID_BLOCK_SHIFT) - 1ULL)

#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1ULL << RDMA_WRID_CHUNK_SHIFT) - 1ULL))

#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
116
/*
 * The migration protocol uses two kinds of traffic:
 *   1. RDMA Writes   - data messages, i.e. RAM
 *   2. IB Send/Recv  - control channel messages
 *
 * These values tag work request IDs accordingly.
 */
enum {
    RDMA_WRID_NONE         = 0,
    RDMA_WRID_RDMA_WRITE   = 1,
    RDMA_WRID_SEND_CONTROL = 2000,
    RDMA_WRID_RECV_CONTROL = 4000,
};

/*
 * Printable names, indexed by the enum values above.  The table is
 * sparse: slots without an initializer are NULL.
 */
static const char *wrid_desc[] = {
    [RDMA_WRID_NONE]         = "NONE",
    [RDMA_WRID_RDMA_WRITE]   = "WRITE RDMA",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};
135
/*
 * Work request slots for IB SEND messages only (RDMA writes do not use
 * them).  The migration protocol sends its control messages, such as
 * device state and registration commands, through these.
 *
 * More WRs could be used, but this is enough for now.
 * RDMA_WRID_MAX is the number of slots, not a slot itself.
 */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,
};
149
/*
 * Message types carried on the SEND/RECV IB control channel.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};

/* Printable name of each control message type, indexed by the enum. */
static const char *control_desc[] = {
    [RDMA_CONTROL_NONE]                = "NONE",
    [RDMA_CONTROL_ERROR]               = "ERROR",
    [RDMA_CONTROL_READY]               = "READY",
    [RDMA_CONTROL_QEMU_FILE]           = "QEMU FILE",
    [RDMA_CONTROL_RAM_BLOCKS_REQUEST]  = "RAM BLOCKS REQUEST",
    [RDMA_CONTROL_RAM_BLOCKS_RESULT]   = "RAM BLOCKS RESULT",
    [RDMA_CONTROL_COMPRESS]            = "COMPRESS",
    [RDMA_CONTROL_REGISTER_REQUEST]    = "REGISTER REQUEST",
    [RDMA_CONTROL_REGISTER_RESULT]     = "REGISTER RESULT",
    [RDMA_CONTROL_REGISTER_FINISHED]   = "REGISTER FINISHED",
    [RDMA_CONTROL_UNREGISTER_REQUEST]  = "UNREGISTER REQUEST",
    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
};
182
183 /*
184 * Memory and MR structures used to represent an IB Send/Recv work request.
185 * This is *not* used for RDMA writes, only IB Send/Recv.
186 */
187 typedef struct {
188 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
189 struct ibv_mr *control_mr; /* registration metadata */
190 size_t control_len; /* length of the message */
191 uint8_t *control_curr; /* start of unconsumed bytes */
192 } RDMAWorkRequestData;
193
/*
 * Capability record exchanged at connection-setup time to negotiate
 * RDMA features.
 */
typedef struct {
    uint32_t version;
    uint32_t flags;
} RDMACapabilities;
201
202 static void caps_to_network(RDMACapabilities *cap)
203 {
204 cap->version = htonl(cap->version);
205 cap->flags = htonl(cap->flags);
206 }
207
208 static void network_to_caps(RDMACapabilities *cap)
209 {
210 cap->version = ntohl(cap->version);
211 cap->flags = ntohl(cap->flags);
212 }
213
/*
 * A RAMBlock as seen by the RDMA code.  This is local state only and is
 * never put on the wire.
 *
 * This and the structures that follow are plain arrays rather than
 * linked lists because a single IB message is used to transmit the
 * information; it is small anyway, so a list would be overkill.
 */
typedef struct RDMALocalBlock {
    char *block_name;
    /* local virtual address */
    uint8_t *local_host_addr;
    /* remote virtual address */
    uint64_t remote_host_addr;
    uint64_t offset;
    uint64_t length;
    /* MRs for chunk-level registration */
    struct ibv_mr **pmr;
    /* MR for non-chunk-level registration */
    struct ibv_mr *mr;
    /* rkeys for chunk-level registration */
    uint32_t *remote_keys;
    /* rkeys for non-chunk-level registration */
    uint32_t remote_rkey;
    /* which block are we */
    int index;
    /* (Only used on dest) */
    unsigned int src_index;
    bool is_ram_block;
    int nb_chunks;
    unsigned long *transit_bitmap;
    unsigned long *unregister_bitmap;
} RDMALocalBlock;
238
239 /*
240 * Also represents a RAMblock, but only on the dest.
241 * This gets transmitted by the dest during connection-time
242 * to the source VM and then is used to populate the
243 * corresponding RDMALocalBlock with
244 * the information needed to perform the actual RDMA.
245 */
246 typedef struct QEMU_PACKED RDMADestBlock {
247 uint64_t remote_host_addr;
248 uint64_t offset;
249 uint64_t length;
250 uint32_t remote_rkey;
251 uint32_t padding;
252 } RDMADestBlock;
253
/*
 * Convert a 64-bit value from host to network byte order.
 *
 * The upper 32 bits are converted with htonl() and stored first, the
 * lower 32 bits second; the returned integer is that 8-byte sequence
 * reinterpreted as a uint64_t.
 */
static uint64_t htonll(uint64_t v)
{
    const uint32_t halves[2] = {
        htonl(v >> 32),
        htonl(v & 0xFFFFFFFFULL),
    };
    uint64_t wire;

    memcpy(&wire, halves, sizeof(wire));
    return wire;
}
261
/*
 * Convert a 64-bit value from network to host byte order.
 *
 * The first four bytes of @v (in memory order) become the upper 32 bits
 * of the result and the last four bytes the lower 32 bits, each half
 * converted with ntohl().
 */
static uint64_t ntohll(uint64_t v)
{
    uint32_t halves[2];
    uint64_t upper;
    uint64_t lower;

    memcpy(halves, &v, sizeof(halves));
    upper = (uint64_t)ntohl(halves[0]);
    lower = (uint64_t)ntohl(halves[1]);
    return (upper << 32) | lower;
}
267
268 static void dest_block_to_network(RDMADestBlock *db)
269 {
270 db->remote_host_addr = htonll(db->remote_host_addr);
271 db->offset = htonll(db->offset);
272 db->length = htonll(db->length);
273 db->remote_rkey = htonl(db->remote_rkey);
274 }
275
276 static void network_to_dest_block(RDMADestBlock *db)
277 {
278 db->remote_host_addr = ntohll(db->remote_host_addr);
279 db->offset = ntohll(db->offset);
280 db->length = ntohll(db->length);
281 db->remote_rkey = ntohl(db->remote_rkey);
282 }
283
284 /*
285 * Virtual address of the above structures used for transmitting
286 * the RAMBlock descriptions at connection-time.
287 * This structure is *not* transmitted.
288 */
289 typedef struct RDMALocalBlocks {
290 int nb_blocks;
291 bool init; /* main memory init complete */
292 RDMALocalBlock *block;
293 } RDMALocalBlocks;
294
295 /*
296 * Main data structure for RDMA state.
297 * While there is only one copy of this structure being allocated right now,
298 * this is the place where one would start if you wanted to consider
299 * having more than one RDMA connection open at the same time.
300 */
301 typedef struct RDMAContext {
302 char *host;
303 int port;
304
305 RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
306
307 /*
308 * This is used by *_exchange_send() to figure out whether or not
309 * the initial "READY" message has already been received or not.
310 * This is because other functions may potentially poll() and detect
311 * the READY message before send() does, in which case we need to
312 * know if it completed.
313 */
314 int control_ready_expected;
315
316 /* number of outstanding writes */
317 int nb_sent;
318
319 /* store info about current buffer so that we can
320 merge it with future sends */
321 uint64_t current_addr;
322 uint64_t current_length;
323 /* index of ram block the current buffer belongs to */
324 int current_index;
325 /* index of the chunk in the current ram block */
326 int current_chunk;
327
328 bool pin_all;
329
330 /*
331 * infiniband-specific variables for opening the device
332 * and maintaining connection state and so forth.
333 *
334 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
335 * cm_id->verbs, cm_id->channel, and cm_id->qp.
336 */
337 struct rdma_cm_id *cm_id; /* connection manager ID */
338 struct rdma_cm_id *listen_id;
339 bool connected;
340
341 struct ibv_context *verbs;
342 struct rdma_event_channel *channel;
343 struct ibv_qp *qp; /* queue pair */
344 struct ibv_comp_channel *comp_channel; /* completion channel */
345 struct ibv_pd *pd; /* protection domain */
346 struct ibv_cq *cq; /* completion queue */
347
348 /*
349 * If a previous write failed (perhaps because of a failed
350 * memory registration, then do not attempt any future work
351 * and remember the error state.
352 */
353 int error_state;
354 int error_reported;
355 int received_error;
356
357 /*
358 * Description of ram blocks used throughout the code.
359 */
360 RDMALocalBlocks local_ram_blocks;
361 RDMADestBlock *dest_blocks;
362
363 /* Index of the next RAMBlock received during block registration */
364 unsigned int next_src_index;
365
366 /*
367 * Migration on *destination* started.
368 * Then use coroutine yield function.
369 * Source runs in a thread, so we don't care.
370 */
371 int migration_started_on_destination;
372
373 int total_registrations;
374 int total_writes;
375
376 int unregister_current, unregister_next;
377 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
378
379 GHashTable *blockmap;
380 } RDMAContext;
381
382 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
383 #define QIO_CHANNEL_RDMA(obj) \
384 OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
385
386 typedef struct QIOChannelRDMA QIOChannelRDMA;
387
388
389 struct QIOChannelRDMA {
390 QIOChannel parent;
391 RDMAContext *rdma;
392 QEMUFile *file;
393 size_t len;
394 bool blocking; /* XXX we don't actually honour this yet */
395 };
396
/*
 * Header of every IB Send/Recv control message; it is prepended at the
 * very beginning of each Send/Recv.
 */
typedef struct QEMU_PACKED {
    /* Total length of data portion */
    uint32_t len;
    /* which control command to perform */
    uint32_t type;
    /* number of commands in data portion of same type */
    uint32_t repeat;
    uint32_t padding;
} RDMAControlHeader;
407
408 static void control_to_network(RDMAControlHeader *control)
409 {
410 control->type = htonl(control->type);
411 control->len = htonl(control->len);
412 control->repeat = htonl(control->repeat);
413 }
414
415 static void network_to_control(RDMAControlHeader *control)
416 {
417 control->type = ntohl(control->type);
418 control->len = ntohl(control->len);
419 control->repeat = ntohl(control->repeat);
420 }
421
422 /*
423 * Register a single Chunk.
424 * Information sent by the source VM to inform the dest
425 * to register an single chunk of memory before we can perform
426 * the actual RDMA operation.
427 */
428 typedef struct QEMU_PACKED {
429 union QEMU_PACKED {
430 uint64_t current_addr; /* offset into the ram_addr_t space */
431 uint64_t chunk; /* chunk to lookup if unregistering */
432 } key;
433 uint32_t current_index; /* which ramblock the chunk belongs to */
434 uint32_t padding;
435 uint64_t chunks; /* how many sequential chunks to register */
436 } RDMARegister;
437
438 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
439 {
440 RDMALocalBlock *local_block;
441 local_block = &rdma->local_ram_blocks.block[reg->current_index];
442
443 if (local_block->is_ram_block) {
444 /*
445 * current_addr as passed in is an address in the local ram_addr_t
446 * space, we need to translate this for the destination
447 */
448 reg->key.current_addr -= local_block->offset;
449 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
450 }
451 reg->key.current_addr = htonll(reg->key.current_addr);
452 reg->current_index = htonl(reg->current_index);
453 reg->chunks = htonll(reg->chunks);
454 }
455
456 static void network_to_register(RDMARegister *reg)
457 {
458 reg->key.current_addr = ntohll(reg->key.current_addr);
459 reg->current_index = ntohl(reg->current_index);
460 reg->chunks = ntohll(reg->chunks);
461 }
462
/* Payload of a compress control message. */
typedef struct QEMU_PACKED {
    /* if zero, we will madvise() */
    uint32_t value;
    /* which ram block index */
    uint32_t block_idx;
    /* Address in remote ram_addr_t space */
    uint64_t offset;
    /* length of the chunk */
    uint64_t length;
} RDMACompress;
469
470 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
471 {
472 comp->value = htonl(comp->value);
473 /*
474 * comp->offset as passed in is an address in the local ram_addr_t
475 * space, we need to translate this for the destination
476 */
477 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
478 comp->offset += rdma->dest_blocks[comp->block_idx].offset;
479 comp->block_idx = htonl(comp->block_idx);
480 comp->offset = htonll(comp->offset);
481 comp->length = htonll(comp->length);
482 }
483
484 static void network_to_compress(RDMACompress *comp)
485 {
486 comp->value = ntohl(comp->value);
487 comp->block_idx = ntohl(comp->block_idx);
488 comp->offset = ntohll(comp->offset);
489 comp->length = ntohll(comp->length);
490 }
491
/*
 * Outcome of a memory registration performed by the destination: the
 * resulting "rkey", which the source VM must reference in order to
 * perform the RDMA operation, together with a host address.
 */
typedef struct QEMU_PACKED {
    uint32_t rkey;
    uint32_t padding;
    uint64_t host_addr;
} RDMARegisterResult;
502
503 static void result_to_network(RDMARegisterResult *result)
504 {
505 result->rkey = htonl(result->rkey);
506 result->host_addr = htonll(result->host_addr);
507 };
508
509 static void network_to_result(RDMARegisterResult *result)
510 {
511 result->rkey = ntohl(result->rkey);
512 result->host_addr = ntohll(result->host_addr);
513 };
514
515 const char *print_wrid(int wrid);
516 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
517 uint8_t *data, RDMAControlHeader *resp,
518 int *resp_idx,
519 int (*callback)(RDMAContext *rdma));
520
521 static inline uint64_t ram_chunk_index(const uint8_t *start,
522 const uint8_t *host)
523 {
524 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
525 }
526
527 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
528 uint64_t i)
529 {
530 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
531 (i << RDMA_REG_CHUNK_SHIFT));
532 }
533
534 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
535 uint64_t i)
536 {
537 uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
538 (1UL << RDMA_REG_CHUNK_SHIFT);
539
540 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
541 result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
542 }
543
544 return result;
545 }
546
547 static int rdma_add_block(RDMAContext *rdma, const char *block_name,
548 void *host_addr,
549 ram_addr_t block_offset, uint64_t length)
550 {
551 RDMALocalBlocks *local = &rdma->local_ram_blocks;
552 RDMALocalBlock *block;
553 RDMALocalBlock *old = local->block;
554
555 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
556
557 if (local->nb_blocks) {
558 int x;
559
560 if (rdma->blockmap) {
561 for (x = 0; x < local->nb_blocks; x++) {
562 g_hash_table_remove(rdma->blockmap,
563 (void *)(uintptr_t)old[x].offset);
564 g_hash_table_insert(rdma->blockmap,
565 (void *)(uintptr_t)old[x].offset,
566 &local->block[x]);
567 }
568 }
569 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
570 g_free(old);
571 }
572
573 block = &local->block[local->nb_blocks];
574
575 block->block_name = g_strdup(block_name);
576 block->local_host_addr = host_addr;
577 block->offset = block_offset;
578 block->length = length;
579 block->index = local->nb_blocks;
580 block->src_index = ~0U; /* Filled in by the receipt of the block list */
581 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
582 block->transit_bitmap = bitmap_new(block->nb_chunks);
583 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
584 block->unregister_bitmap = bitmap_new(block->nb_chunks);
585 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
586 block->remote_keys = g_new0(uint32_t, block->nb_chunks);
587
588 block->is_ram_block = local->init ? false : true;
589
590 if (rdma->blockmap) {
591 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
592 }
593
594 trace_rdma_add_block(block_name, local->nb_blocks,
595 (uintptr_t) block->local_host_addr,
596 block->offset, block->length,
597 (uintptr_t) (block->local_host_addr + block->length),
598 BITS_TO_LONGS(block->nb_chunks) *
599 sizeof(unsigned long) * 8,
600 block->nb_chunks);
601
602 local->nb_blocks++;
603
604 return 0;
605 }
606
607 /*
608 * Memory regions need to be registered with the device and queue pairs setup
609 * in advanced before the migration starts. This tells us where the RAM blocks
610 * are so that we can register them individually.
611 */
612 static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
613 ram_addr_t block_offset, ram_addr_t length, void *opaque)
614 {
615 return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
616 }
617
618 /*
619 * Identify the RAMBlocks and their quantity. They will be references to
620 * identify chunk boundaries inside each RAMBlock and also be referenced
621 * during dynamic page registration.
622 */
623 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
624 {
625 RDMALocalBlocks *local = &rdma->local_ram_blocks;
626
627 assert(rdma->blockmap == NULL);
628 memset(local, 0, sizeof *local);
629 qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
630 trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
631 rdma->dest_blocks = g_new0(RDMADestBlock,
632 rdma->local_ram_blocks.nb_blocks);
633 local->init = true;
634 return 0;
635 }
636
637 /*
638 * Note: If used outside of cleanup, the caller must ensure that the destination
639 * block structures are also updated
640 */
641 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
642 {
643 RDMALocalBlocks *local = &rdma->local_ram_blocks;
644 RDMALocalBlock *old = local->block;
645 int x;
646
647 if (rdma->blockmap) {
648 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
649 }
650 if (block->pmr) {
651 int j;
652
653 for (j = 0; j < block->nb_chunks; j++) {
654 if (!block->pmr[j]) {
655 continue;
656 }
657 ibv_dereg_mr(block->pmr[j]);
658 rdma->total_registrations--;
659 }
660 g_free(block->pmr);
661 block->pmr = NULL;
662 }
663
664 if (block->mr) {
665 ibv_dereg_mr(block->mr);
666 rdma->total_registrations--;
667 block->mr = NULL;
668 }
669
670 g_free(block->transit_bitmap);
671 block->transit_bitmap = NULL;
672
673 g_free(block->unregister_bitmap);
674 block->unregister_bitmap = NULL;
675
676 g_free(block->remote_keys);
677 block->remote_keys = NULL;
678
679 g_free(block->block_name);
680 block->block_name = NULL;
681
682 if (rdma->blockmap) {
683 for (x = 0; x < local->nb_blocks; x++) {
684 g_hash_table_remove(rdma->blockmap,
685 (void *)(uintptr_t)old[x].offset);
686 }
687 }
688
689 if (local->nb_blocks > 1) {
690
691 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
692
693 if (block->index) {
694 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
695 }
696
697 if (block->index < (local->nb_blocks - 1)) {
698 memcpy(local->block + block->index, old + (block->index + 1),
699 sizeof(RDMALocalBlock) *
700 (local->nb_blocks - (block->index + 1)));
701 }
702 } else {
703 assert(block == local->block);
704 local->block = NULL;
705 }
706
707 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
708 block->offset, block->length,
709 (uintptr_t)(block->local_host_addr + block->length),
710 BITS_TO_LONGS(block->nb_chunks) *
711 sizeof(unsigned long) * 8, block->nb_chunks);
712
713 g_free(old);
714
715 local->nb_blocks--;
716
717 if (local->nb_blocks && rdma->blockmap) {
718 for (x = 0; x < local->nb_blocks; x++) {
719 g_hash_table_insert(rdma->blockmap,
720 (void *)(uintptr_t)local->block[x].offset,
721 &local->block[x]);
722 }
723 }
724
725 return 0;
726 }
727
728 /*
729 * Put in the log file which RDMA device was opened and the details
730 * associated with that device.
731 */
732 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
733 {
734 struct ibv_port_attr port;
735
736 if (ibv_query_port(verbs, 1, &port)) {
737 error_report("Failed to query port information");
738 return;
739 }
740
741 printf("%s RDMA Device opened: kernel name %s "
742 "uverbs device name %s, "
743 "infiniband_verbs class device path %s, "
744 "infiniband class device path %s, "
745 "transport: (%d) %s\n",
746 who,
747 verbs->device->name,
748 verbs->device->dev_name,
749 verbs->device->dev_path,
750 verbs->device->ibdev_path,
751 port.link_layer,
752 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
753 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
754 ? "Ethernet" : "Unknown"));
755 }
756
757 /*
758 * Put in the log file the RDMA gid addressing information,
759 * useful for folks who have trouble understanding the
760 * RDMA device hierarchy in the kernel.
761 */
762 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
763 {
764 char sgid[33];
765 char dgid[33];
766 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
767 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
768 trace_qemu_rdma_dump_gid(who, sgid, dgid);
769 }
770
771 /*
772 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
773 * We will try the next addrinfo struct, and fail if there are
774 * no other valid addresses to bind against.
775 *
776 * If user is listening on '[::]', then we will not have a opened a device
777 * yet and have no way of verifying if the device is RoCE or not.
778 *
779 * In this case, the source VM will throw an error for ALL types of
780 * connections (both IPv4 and IPv6) if the destination machine does not have
781 * a regular infiniband network available for use.
782 *
783 * The only way to guarantee that an error is thrown for broken kernels is
784 * for the management software to choose a *specific* interface at bind time
785 * and validate what time of hardware it is.
786 *
787 * Unfortunately, this puts the user in a fix:
788 *
789 * If the source VM connects with an IPv4 address without knowing that the
790 * destination has bound to '[::]' the migration will unconditionally fail
791 * unless the management software is explicitly listening on the IPv4
792 * address while using a RoCE-based device.
793 *
794 * If the source VM connects with an IPv6 address, then we're OK because we can
795 * throw an error on the source (and similarly on the destination).
796 *
797 * But in mixed environments, this will be broken for a while until it is fixed
798 * inside linux.
799 *
800 * We do provide a *tiny* bit of help in this function: We can list all of the
801 * devices in the system and check to see if all the devices are RoCE or
802 * Infiniband.
803 *
804 * If we detect that we have a *pure* RoCE environment, then we can safely
805 * thrown an error even if the management software has specified '[::]' as the
806 * bind address.
807 *
808 * However, if there is are multiple hetergeneous devices, then we cannot make
809 * this assumption and the user just has to be sure they know what they are
810 * doing.
811 *
812 * Patches are being reviewed on linux-rdma.
813 */
814 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
815 {
816 struct ibv_port_attr port_attr;
817
818 /* This bug only exists in linux, to our knowledge. */
819 #ifdef CONFIG_LINUX
820
821 /*
822 * Verbs are only NULL if management has bound to '[::]'.
823 *
824 * Let's iterate through all the devices and see if there any pure IB
825 * devices (non-ethernet).
826 *
827 * If not, then we can safely proceed with the migration.
828 * Otherwise, there are no guarantees until the bug is fixed in linux.
829 */
830 if (!verbs) {
831 int num_devices, x;
832 struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
833 bool roce_found = false;
834 bool ib_found = false;
835
836 for (x = 0; x < num_devices; x++) {
837 verbs = ibv_open_device(dev_list[x]);
838 if (!verbs) {
839 if (errno == EPERM) {
840 continue;
841 } else {
842 return -EINVAL;
843 }
844 }
845
846 if (ibv_query_port(verbs, 1, &port_attr)) {
847 ibv_close_device(verbs);
848 ERROR(errp, "Could not query initial IB port");
849 return -EINVAL;
850 }
851
852 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
853 ib_found = true;
854 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
855 roce_found = true;
856 }
857
858 ibv_close_device(verbs);
859
860 }
861
862 if (roce_found) {
863 if (ib_found) {
864 fprintf(stderr, "WARN: migrations may fail:"
865 " IPv6 over RoCE / iWARP in linux"
866 " is broken. But since you appear to have a"
867 " mixed RoCE / IB environment, be sure to only"
868 " migrate over the IB fabric until the kernel "
869 " fixes the bug.\n");
870 } else {
871 ERROR(errp, "You only have RoCE / iWARP devices in your systems"
872 " and your management software has specified '[::]'"
873 ", but IPv6 over RoCE / iWARP is not supported in Linux.");
874 return -ENONET;
875 }
876 }
877
878 return 0;
879 }
880
881 /*
882 * If we have a verbs context, that means that some other than '[::]' was
883 * used by the management software for binding. In which case we can
884 * actually warn the user about a potentially broken kernel.
885 */
886
887 /* IB ports start with 1, not 0 */
888 if (ibv_query_port(verbs, 1, &port_attr)) {
889 ERROR(errp, "Could not query initial IB port");
890 return -EINVAL;
891 }
892
893 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
894 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
895 "(but patches on linux-rdma in progress)");
896 return -ENONET;
897 }
898
899 #endif
900
901 return 0;
902 }
903
904 /*
905 * Figure out which RDMA device corresponds to the requested IP hostname
906 * Also create the initial connection manager identifiers for opening
907 * the connection.
908 */
909 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
910 {
911 int ret;
912 struct rdma_addrinfo *res;
913 char port_str[16];
914 struct rdma_cm_event *cm_event;
915 char ip[40] = "unknown";
916 struct rdma_addrinfo *e;
917
918 if (rdma->host == NULL || !strcmp(rdma->host, "")) {
919 ERROR(errp, "RDMA hostname has not been set");
920 return -EINVAL;
921 }
922
923 /* create CM channel */
924 rdma->channel = rdma_create_event_channel();
925 if (!rdma->channel) {
926 ERROR(errp, "could not create CM channel");
927 return -EINVAL;
928 }
929
930 /* create CM id */
931 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
932 if (ret) {
933 ERROR(errp, "could not create channel id");
934 goto err_resolve_create_id;
935 }
936
937 snprintf(port_str, 16, "%d", rdma->port);
938 port_str[15] = '\0';
939
940 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
941 if (ret < 0) {
942 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
943 goto err_resolve_get_addr;
944 }
945
946 for (e = res; e != NULL; e = e->ai_next) {
947 inet_ntop(e->ai_family,
948 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
949 trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
950
951 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
952 RDMA_RESOLVE_TIMEOUT_MS);
953 if (!ret) {
954 if (e->ai_family == AF_INET6) {
955 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
956 if (ret) {
957 continue;
958 }
959 }
960 goto route;
961 }
962 }
963
964 ERROR(errp, "could not resolve address %s", rdma->host);
965 goto err_resolve_get_addr;
966
967 route:
968 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
969
970 ret = rdma_get_cm_event(rdma->channel, &cm_event);
971 if (ret) {
972 ERROR(errp, "could not perform event_addr_resolved");
973 goto err_resolve_get_addr;
974 }
975
976 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
977 ERROR(errp, "result not equal to event_addr_resolved %s",
978 rdma_event_str(cm_event->event));
979 perror("rdma_resolve_addr");
980 rdma_ack_cm_event(cm_event);
981 ret = -EINVAL;
982 goto err_resolve_get_addr;
983 }
984 rdma_ack_cm_event(cm_event);
985
986 /* resolve route */
987 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
988 if (ret) {
989 ERROR(errp, "could not resolve rdma route");
990 goto err_resolve_get_addr;
991 }
992
993 ret = rdma_get_cm_event(rdma->channel, &cm_event);
994 if (ret) {
995 ERROR(errp, "could not perform event_route_resolved");
996 goto err_resolve_get_addr;
997 }
998 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
999 ERROR(errp, "result not equal to event_route_resolved: %s",
1000 rdma_event_str(cm_event->event));
1001 rdma_ack_cm_event(cm_event);
1002 ret = -EINVAL;
1003 goto err_resolve_get_addr;
1004 }
1005 rdma_ack_cm_event(cm_event);
1006 rdma->verbs = rdma->cm_id->verbs;
1007 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1008 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1009 return 0;
1010
1011 err_resolve_get_addr:
1012 rdma_destroy_id(rdma->cm_id);
1013 rdma->cm_id = NULL;
1014 err_resolve_create_id:
1015 rdma_destroy_event_channel(rdma->channel);
1016 rdma->channel = NULL;
1017 return ret;
1018 }
1019
1020 /*
1021 * Create protection domain and completion queues
1022 */
1023 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1024 {
1025 /* allocate pd */
1026 rdma->pd = ibv_alloc_pd(rdma->verbs);
1027 if (!rdma->pd) {
1028 error_report("failed to allocate protection domain");
1029 return -1;
1030 }
1031
1032 /* create completion channel */
1033 rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1034 if (!rdma->comp_channel) {
1035 error_report("failed to allocate completion channel");
1036 goto err_alloc_pd_cq;
1037 }
1038
1039 /*
1040 * Completion queue can be filled by both read and write work requests,
1041 * so must reflect the sum of both possible queue sizes.
1042 */
1043 rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1044 NULL, rdma->comp_channel, 0);
1045 if (!rdma->cq) {
1046 error_report("failed to allocate completion queue");
1047 goto err_alloc_pd_cq;
1048 }
1049
1050 return 0;
1051
1052 err_alloc_pd_cq:
1053 if (rdma->pd) {
1054 ibv_dealloc_pd(rdma->pd);
1055 }
1056 if (rdma->comp_channel) {
1057 ibv_destroy_comp_channel(rdma->comp_channel);
1058 }
1059 rdma->pd = NULL;
1060 rdma->comp_channel = NULL;
1061 return -1;
1062
1063 }
1064
1065 /*
1066 * Create queue pairs.
1067 */
1068 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1069 {
1070 struct ibv_qp_init_attr attr = { 0 };
1071 int ret;
1072
1073 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1074 attr.cap.max_recv_wr = 3;
1075 attr.cap.max_send_sge = 1;
1076 attr.cap.max_recv_sge = 1;
1077 attr.send_cq = rdma->cq;
1078 attr.recv_cq = rdma->cq;
1079 attr.qp_type = IBV_QPT_RC;
1080
1081 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1082 if (ret) {
1083 return -1;
1084 }
1085
1086 rdma->qp = rdma->cm_id->qp;
1087 return 0;
1088 }
1089
1090 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1091 {
1092 int i;
1093 RDMALocalBlocks *local = &rdma->local_ram_blocks;
1094
1095 for (i = 0; i < local->nb_blocks; i++) {
1096 local->block[i].mr =
1097 ibv_reg_mr(rdma->pd,
1098 local->block[i].local_host_addr,
1099 local->block[i].length,
1100 IBV_ACCESS_LOCAL_WRITE |
1101 IBV_ACCESS_REMOTE_WRITE
1102 );
1103 if (!local->block[i].mr) {
1104 perror("Failed to register local dest ram block!\n");
1105 break;
1106 }
1107 rdma->total_registrations++;
1108 }
1109
1110 if (i >= local->nb_blocks) {
1111 return 0;
1112 }
1113
1114 for (i--; i >= 0; i--) {
1115 ibv_dereg_mr(local->block[i].mr);
1116 rdma->total_registrations--;
1117 }
1118
1119 return -1;
1120
1121 }
1122
1123 /*
1124 * Find the ram block that corresponds to the page requested to be
1125 * transmitted by QEMU.
1126 *
1127 * Once the block is found, also identify which 'chunk' within that
1128 * block that the page belongs to.
1129 *
1130 * This search cannot fail or the migration will fail.
1131 */
1132 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1133 uintptr_t block_offset,
1134 uint64_t offset,
1135 uint64_t length,
1136 uint64_t *block_index,
1137 uint64_t *chunk_index)
1138 {
1139 uint64_t current_addr = block_offset + offset;
1140 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1141 (void *) block_offset);
1142 assert(block);
1143 assert(current_addr >= block->offset);
1144 assert((current_addr + length) <= (block->offset + block->length));
1145
1146 *block_index = block->index;
1147 *chunk_index = ram_chunk_index(block->local_host_addr,
1148 block->local_host_addr + (current_addr - block->offset));
1149
1150 return 0;
1151 }
1152
1153 /*
1154 * Register a chunk with IB. If the chunk was already registered
1155 * previously, then skip.
1156 *
1157 * Also return the keys associated with the registration needed
1158 * to perform the actual RDMA operation.
1159 */
1160 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1161 RDMALocalBlock *block, uintptr_t host_addr,
1162 uint32_t *lkey, uint32_t *rkey, int chunk,
1163 uint8_t *chunk_start, uint8_t *chunk_end)
1164 {
1165 if (block->mr) {
1166 if (lkey) {
1167 *lkey = block->mr->lkey;
1168 }
1169 if (rkey) {
1170 *rkey = block->mr->rkey;
1171 }
1172 return 0;
1173 }
1174
1175 /* allocate memory to store chunk MRs */
1176 if (!block->pmr) {
1177 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1178 }
1179
1180 /*
1181 * If 'rkey', then we're the destination, so grant access to the source.
1182 *
1183 * If 'lkey', then we're the source VM, so grant access only to ourselves.
1184 */
1185 if (!block->pmr[chunk]) {
1186 uint64_t len = chunk_end - chunk_start;
1187
1188 trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1189
1190 block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1191 chunk_start, len,
1192 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1193 IBV_ACCESS_REMOTE_WRITE) : 0));
1194
1195 if (!block->pmr[chunk]) {
1196 perror("Failed to register chunk!");
1197 fprintf(stderr, "Chunk details: block: %d chunk index %d"
1198 " start %" PRIuPTR " end %" PRIuPTR
1199 " host %" PRIuPTR
1200 " local %" PRIuPTR " registrations: %d\n",
1201 block->index, chunk, (uintptr_t)chunk_start,
1202 (uintptr_t)chunk_end, host_addr,
1203 (uintptr_t)block->local_host_addr,
1204 rdma->total_registrations);
1205 return -1;
1206 }
1207 rdma->total_registrations++;
1208 }
1209
1210 if (lkey) {
1211 *lkey = block->pmr[chunk]->lkey;
1212 }
1213 if (rkey) {
1214 *rkey = block->pmr[chunk]->rkey;
1215 }
1216 return 0;
1217 }
1218
1219 /*
1220 * Register (at connection time) the memory used for control
1221 * channel messages.
1222 */
1223 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1224 {
1225 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1226 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1227 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1228 if (rdma->wr_data[idx].control_mr) {
1229 rdma->total_registrations++;
1230 return 0;
1231 }
1232 error_report("qemu_rdma_reg_control failed");
1233 return -1;
1234 }
1235
1236 const char *print_wrid(int wrid)
1237 {
1238 if (wrid >= RDMA_WRID_RECV_CONTROL) {
1239 return wrid_desc[RDMA_WRID_RECV_CONTROL];
1240 }
1241 return wrid_desc[wrid];
1242 }
1243
1244 /*
1245 * RDMA requires memory registration (mlock/pinning), but this is not good for
1246 * overcommitment.
1247 *
1248 * In preparation for the future where LRU information or workload-specific
1249 * writable working set memory access behavior is available to QEMU
1250 * it would be nice to have in place the ability to UN-register/UN-pin
1251 * particular memory regions from the RDMA hardware when it is determined that
1252 * those regions of memory will likely not be accessed again in the near future.
1253 *
1254 * While we do not yet have such information right now, the following
1255 * compile-time option allows us to perform a non-optimized version of this
1256 * behavior.
1257 *
1258 * By uncommenting this option, you will cause *all* RDMA transfers to be
1259 * unregistered immediately after the transfer completes on both sides of the
1260 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1261 *
1262 * This will have a terrible impact on migration performance, so until future
1263 * workload information or LRU information is available, do not attempt to use
1264 * this feature except for basic testing.
1265 */
1266 //#define RDMA_UNREGISTRATION_EXAMPLE
1267
1268 /*
1269 * Perform a non-optimized memory unregistration after every transfer
1270 * for demonstration purposes, only if pin-all is not requested.
1271 *
1272 * Potential optimizations:
1273 * 1. Start a new thread to run this function continuously
1274 - for bit clearing
1275 - and for receipt of unregister messages
1276 * 2. Use an LRU.
1277 * 3. Use workload hints.
1278 */
1279 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1280 {
1281 while (rdma->unregistrations[rdma->unregister_current]) {
1282 int ret;
1283 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1284 uint64_t chunk =
1285 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1286 uint64_t index =
1287 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1288 RDMALocalBlock *block =
1289 &(rdma->local_ram_blocks.block[index]);
1290 RDMARegister reg = { .current_index = index };
1291 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1292 };
1293 RDMAControlHeader head = { .len = sizeof(RDMARegister),
1294 .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1295 .repeat = 1,
1296 };
1297
1298 trace_qemu_rdma_unregister_waiting_proc(chunk,
1299 rdma->unregister_current);
1300
1301 rdma->unregistrations[rdma->unregister_current] = 0;
1302 rdma->unregister_current++;
1303
1304 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1305 rdma->unregister_current = 0;
1306 }
1307
1308
1309 /*
1310 * Unregistration is speculative (because migration is single-threaded
1311 * and we cannot break the protocol's inifinband message ordering).
1312 * Thus, if the memory is currently being used for transmission,
1313 * then abort the attempt to unregister and try again
1314 * later the next time a completion is received for this memory.
1315 */
1316 clear_bit(chunk, block->unregister_bitmap);
1317
1318 if (test_bit(chunk, block->transit_bitmap)) {
1319 trace_qemu_rdma_unregister_waiting_inflight(chunk);
1320 continue;
1321 }
1322
1323 trace_qemu_rdma_unregister_waiting_send(chunk);
1324
1325 ret = ibv_dereg_mr(block->pmr[chunk]);
1326 block->pmr[chunk] = NULL;
1327 block->remote_keys[chunk] = 0;
1328
1329 if (ret != 0) {
1330 perror("unregistration chunk failed");
1331 return -ret;
1332 }
1333 rdma->total_registrations--;
1334
1335 reg.key.chunk = chunk;
1336 register_to_network(rdma, &reg);
1337 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1338 &resp, NULL, NULL);
1339 if (ret < 0) {
1340 return ret;
1341 }
1342
1343 trace_qemu_rdma_unregister_waiting_complete(chunk);
1344 }
1345
1346 return 0;
1347 }
1348
1349 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1350 uint64_t chunk)
1351 {
1352 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1353
1354 result |= (index << RDMA_WRID_BLOCK_SHIFT);
1355 result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1356
1357 return result;
1358 }
1359
1360 /*
1361 * Set bit for unregistration in the next iteration.
1362 * We cannot transmit right here, but will unpin later.
1363 */
1364 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1365 uint64_t chunk, uint64_t wr_id)
1366 {
1367 if (rdma->unregistrations[rdma->unregister_next] != 0) {
1368 error_report("rdma migration: queue is full");
1369 } else {
1370 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1371
1372 if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1373 trace_qemu_rdma_signal_unregister_append(chunk,
1374 rdma->unregister_next);
1375
1376 rdma->unregistrations[rdma->unregister_next++] =
1377 qemu_rdma_make_wrid(wr_id, index, chunk);
1378
1379 if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1380 rdma->unregister_next = 0;
1381 }
1382 } else {
1383 trace_qemu_rdma_signal_unregister_already(chunk);
1384 }
1385 }
1386 }
1387
1388 /*
1389 * Consult the connection manager to see a work request
1390 * (of any kind) has completed.
1391 * Return the work request ID that completed.
1392 */
1393 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1394 uint32_t *byte_len)
1395 {
1396 int ret;
1397 struct ibv_wc wc;
1398 uint64_t wr_id;
1399
1400 ret = ibv_poll_cq(rdma->cq, 1, &wc);
1401
1402 if (!ret) {
1403 *wr_id_out = RDMA_WRID_NONE;
1404 return 0;
1405 }
1406
1407 if (ret < 0) {
1408 error_report("ibv_poll_cq return %d", ret);
1409 return ret;
1410 }
1411
1412 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1413
1414 if (wc.status != IBV_WC_SUCCESS) {
1415 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1416 wc.status, ibv_wc_status_str(wc.status));
1417 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1418
1419 return -1;
1420 }
1421
1422 if (rdma->control_ready_expected &&
1423 (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1424 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1425 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1426 rdma->control_ready_expected = 0;
1427 }
1428
1429 if (wr_id == RDMA_WRID_RDMA_WRITE) {
1430 uint64_t chunk =
1431 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1432 uint64_t index =
1433 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1434 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1435
1436 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1437 index, chunk, block->local_host_addr,
1438 (void *)(uintptr_t)block->remote_host_addr);
1439
1440 clear_bit(chunk, block->transit_bitmap);
1441
1442 if (rdma->nb_sent > 0) {
1443 rdma->nb_sent--;
1444 }
1445
1446 if (!rdma->pin_all) {
1447 /*
1448 * FYI: If one wanted to signal a specific chunk to be unregistered
1449 * using LRU or workload-specific information, this is the function
1450 * you would call to do so. That chunk would then get asynchronously
1451 * unregistered later.
1452 */
1453 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1454 qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1455 #endif
1456 }
1457 } else {
1458 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1459 }
1460
1461 *wr_id_out = wc.wr_id;
1462 if (byte_len) {
1463 *byte_len = wc.byte_len;
1464 }
1465
1466 return 0;
1467 }
1468
1469 /*
1470 * Block until the next work request has completed.
1471 *
1472 * First poll to see if a work request has already completed,
1473 * otherwise block.
1474 *
1475 * If we encounter completed work requests for IDs other than
1476 * the one we're interested in, then that's generally an error.
1477 *
1478 * The only exception is actual RDMA Write completions. These
1479 * completions only need to be recorded, but do not actually
1480 * need further processing.
1481 */
1482 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1483 uint32_t *byte_len)
1484 {
1485 int num_cq_events = 0, ret = 0;
1486 struct ibv_cq *cq;
1487 void *cq_ctx;
1488 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1489
1490 if (ibv_req_notify_cq(rdma->cq, 0)) {
1491 return -1;
1492 }
1493 /* poll cq first */
1494 while (wr_id != wrid_requested) {
1495 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1496 if (ret < 0) {
1497 return ret;
1498 }
1499
1500 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1501
1502 if (wr_id == RDMA_WRID_NONE) {
1503 break;
1504 }
1505 if (wr_id != wrid_requested) {
1506 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1507 wrid_requested, print_wrid(wr_id), wr_id);
1508 }
1509 }
1510
1511 if (wr_id == wrid_requested) {
1512 return 0;
1513 }
1514
1515 while (1) {
1516 /*
1517 * Coroutine doesn't start until migration_fd_process_incoming()
1518 * so don't yield unless we know we're running inside of a coroutine.
1519 */
1520 if (rdma->migration_started_on_destination) {
1521 yield_until_fd_readable(rdma->comp_channel->fd);
1522 }
1523
1524 ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
1525 if (ret) {
1526 perror("ibv_get_cq_event");
1527 goto err_block_for_wrid;
1528 }
1529
1530 num_cq_events++;
1531
1532 ret = -ibv_req_notify_cq(cq, 0);
1533 if (ret) {
1534 goto err_block_for_wrid;
1535 }
1536
1537 while (wr_id != wrid_requested) {
1538 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1539 if (ret < 0) {
1540 goto err_block_for_wrid;
1541 }
1542
1543 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1544
1545 if (wr_id == RDMA_WRID_NONE) {
1546 break;
1547 }
1548 if (wr_id != wrid_requested) {
1549 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1550 wrid_requested, print_wrid(wr_id), wr_id);
1551 }
1552 }
1553
1554 if (wr_id == wrid_requested) {
1555 goto success_block_for_wrid;
1556 }
1557 }
1558
1559 success_block_for_wrid:
1560 if (num_cq_events) {
1561 ibv_ack_cq_events(cq, num_cq_events);
1562 }
1563 return 0;
1564
1565 err_block_for_wrid:
1566 if (num_cq_events) {
1567 ibv_ack_cq_events(cq, num_cq_events);
1568 }
1569
1570 rdma->error_state = ret;
1571 return ret;
1572 }
1573
1574 /*
1575 * Post a SEND message work request for the control channel
1576 * containing some data and block until the post completes.
1577 */
1578 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1579 RDMAControlHeader *head)
1580 {
1581 int ret = 0;
1582 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1583 struct ibv_send_wr *bad_wr;
1584 struct ibv_sge sge = {
1585 .addr = (uintptr_t)(wr->control),
1586 .length = head->len + sizeof(RDMAControlHeader),
1587 .lkey = wr->control_mr->lkey,
1588 };
1589 struct ibv_send_wr send_wr = {
1590 .wr_id = RDMA_WRID_SEND_CONTROL,
1591 .opcode = IBV_WR_SEND,
1592 .send_flags = IBV_SEND_SIGNALED,
1593 .sg_list = &sge,
1594 .num_sge = 1,
1595 };
1596
1597 trace_qemu_rdma_post_send_control(control_desc[head->type]);
1598
1599 /*
1600 * We don't actually need to do a memcpy() in here if we used
1601 * the "sge" properly, but since we're only sending control messages
1602 * (not RAM in a performance-critical path), then its OK for now.
1603 *
1604 * The copy makes the RDMAControlHeader simpler to manipulate
1605 * for the time being.
1606 */
1607 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1608 memcpy(wr->control, head, sizeof(RDMAControlHeader));
1609 control_to_network((void *) wr->control);
1610
1611 if (buf) {
1612 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1613 }
1614
1615
1616 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1617
1618 if (ret > 0) {
1619 error_report("Failed to use post IB SEND for control");
1620 return -ret;
1621 }
1622
1623 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1624 if (ret < 0) {
1625 error_report("rdma migration: send polling control error");
1626 }
1627
1628 return ret;
1629 }
1630
1631 /*
1632 * Post a RECV work request in anticipation of some future receipt
1633 * of data on the control channel.
1634 */
1635 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1636 {
1637 struct ibv_recv_wr *bad_wr;
1638 struct ibv_sge sge = {
1639 .addr = (uintptr_t)(rdma->wr_data[idx].control),
1640 .length = RDMA_CONTROL_MAX_BUFFER,
1641 .lkey = rdma->wr_data[idx].control_mr->lkey,
1642 };
1643
1644 struct ibv_recv_wr recv_wr = {
1645 .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1646 .sg_list = &sge,
1647 .num_sge = 1,
1648 };
1649
1650
1651 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1652 return -1;
1653 }
1654
1655 return 0;
1656 }
1657
1658 /*
1659 * Block and wait for a RECV control channel message to arrive.
1660 */
1661 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1662 RDMAControlHeader *head, int expecting, int idx)
1663 {
1664 uint32_t byte_len;
1665 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1666 &byte_len);
1667
1668 if (ret < 0) {
1669 error_report("rdma migration: recv polling control error!");
1670 return ret;
1671 }
1672
1673 network_to_control((void *) rdma->wr_data[idx].control);
1674 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1675
1676 trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
1677
1678 if (expecting == RDMA_CONTROL_NONE) {
1679 trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
1680 head->type);
1681 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1682 error_report("Was expecting a %s (%d) control message"
1683 ", but got: %s (%d), length: %d",
1684 control_desc[expecting], expecting,
1685 control_desc[head->type], head->type, head->len);
1686 if (head->type == RDMA_CONTROL_ERROR) {
1687 rdma->received_error = true;
1688 }
1689 return -EIO;
1690 }
1691 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1692 error_report("too long length: %d", head->len);
1693 return -EINVAL;
1694 }
1695 if (sizeof(*head) + head->len != byte_len) {
1696 error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1697 return -EINVAL;
1698 }
1699
1700 return 0;
1701 }
1702
1703 /*
1704 * When a RECV work request has completed, the work request's
1705 * buffer is pointed at the header.
1706 *
1707 * This will advance the pointer to the data portion
1708 * of the control message of the work request's buffer that
1709 * was populated after the work request finished.
1710 */
1711 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1712 RDMAControlHeader *head)
1713 {
1714 rdma->wr_data[idx].control_len = head->len;
1715 rdma->wr_data[idx].control_curr =
1716 rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1717 }
1718
1719 /*
1720 * This is an 'atomic' high-level operation to deliver a single, unified
1721 * control-channel message.
1722 *
1723 * Additionally, if the user is expecting some kind of reply to this message,
1724 * they can request a 'resp' response message be filled in by posting an
1725 * additional work request on behalf of the user and waiting for an additional
1726 * completion.
1727 *
1728 * The extra (optional) response is used during registration to us from having
1729 * to perform an *additional* exchange of message just to provide a response by
1730 * instead piggy-backing on the acknowledgement.
1731 */
1732 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1733 uint8_t *data, RDMAControlHeader *resp,
1734 int *resp_idx,
1735 int (*callback)(RDMAContext *rdma))
1736 {
1737 int ret = 0;
1738
1739 /*
1740 * Wait until the dest is ready before attempting to deliver the message
1741 * by waiting for a READY message.
1742 */
1743 if (rdma->control_ready_expected) {
1744 RDMAControlHeader resp;
1745 ret = qemu_rdma_exchange_get_response(rdma,
1746 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1747 if (ret < 0) {
1748 return ret;
1749 }
1750 }
1751
1752 /*
1753 * If the user is expecting a response, post a WR in anticipation of it.
1754 */
1755 if (resp) {
1756 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1757 if (ret) {
1758 error_report("rdma migration: error posting"
1759 " extra control recv for anticipated result!");
1760 return ret;
1761 }
1762 }
1763
1764 /*
1765 * Post a WR to replace the one we just consumed for the READY message.
1766 */
1767 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1768 if (ret) {
1769 error_report("rdma migration: error posting first control recv!");
1770 return ret;
1771 }
1772
1773 /*
1774 * Deliver the control message that was requested.
1775 */
1776 ret = qemu_rdma_post_send_control(rdma, data, head);
1777
1778 if (ret < 0) {
1779 error_report("Failed to send control buffer!");
1780 return ret;
1781 }
1782
1783 /*
1784 * If we're expecting a response, block and wait for it.
1785 */
1786 if (resp) {
1787 if (callback) {
1788 trace_qemu_rdma_exchange_send_issue_callback();
1789 ret = callback(rdma);
1790 if (ret < 0) {
1791 return ret;
1792 }
1793 }
1794
1795 trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
1796 ret = qemu_rdma_exchange_get_response(rdma, resp,
1797 resp->type, RDMA_WRID_DATA);
1798
1799 if (ret < 0) {
1800 return ret;
1801 }
1802
1803 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1804 if (resp_idx) {
1805 *resp_idx = RDMA_WRID_DATA;
1806 }
1807 trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
1808 }
1809
1810 rdma->control_ready_expected = 1;
1811
1812 return 0;
1813 }
1814
1815 /*
1816 * This is an 'atomic' high-level operation to receive a single, unified
1817 * control-channel message.
1818 */
1819 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1820 int expecting)
1821 {
1822 RDMAControlHeader ready = {
1823 .len = 0,
1824 .type = RDMA_CONTROL_READY,
1825 .repeat = 1,
1826 };
1827 int ret;
1828
1829 /*
1830 * Inform the source that we're ready to receive a message.
1831 */
1832 ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1833
1834 if (ret < 0) {
1835 error_report("Failed to send control buffer!");
1836 return ret;
1837 }
1838
1839 /*
1840 * Block and wait for the message.
1841 */
1842 ret = qemu_rdma_exchange_get_response(rdma, head,
1843 expecting, RDMA_WRID_READY);
1844
1845 if (ret < 0) {
1846 return ret;
1847 }
1848
1849 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1850
1851 /*
1852 * Post a new RECV work request to replace the one we just consumed.
1853 */
1854 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1855 if (ret) {
1856 error_report("rdma migration: error posting second control recv!");
1857 return ret;
1858 }
1859
1860 return 0;
1861 }
1862
/*
 * Write an actual chunk of memory using RDMA.
 *
 * If we're using dynamic registration on the dest-side, we have to
 * send a registration command first.
 *
 * @f:             file whose position accounting is updated
 * @current_index: index of the local RAM block being written
 * @current_addr:  address of the data (block->offset based)
 * @length:        number of bytes to write
 *
 * Returns:
 *    0  an RDMA write work request was posted; the chunk's bit is set in
 *       the block's transit bitmap and total_writes is incremented
 *    1  the data was all zero and was sent as a COMPRESS control message
 *       instead; no RDMA write was posted
 *   <0  on error
 */
static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
                               int current_index, uint64_t current_addr,
                               uint64_t length)
{
    struct ibv_sge sge;
    struct ibv_send_wr send_wr = { 0 };
    struct ibv_send_wr *bad_wr;
    int reg_result_idx, ret, count = 0;
    uint64_t chunk, chunks;
    uint8_t *chunk_start, *chunk_end;
    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
    RDMARegister reg;
    RDMARegisterResult *reg_result;
    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
    RDMAControlHeader head = { .len = sizeof(RDMARegister),
                               .type = RDMA_CONTROL_REGISTER_REQUEST,
                               .repeat = 1,
                             };

    /* Re-entered from the bottom when the send queue was full (ENOMEM). */
retry:
    /* Translate the block-relative address into a local host address. */
    sge.addr = (uintptr_t)(block->local_host_addr +
                            (current_addr - block->offset));
    sge.length = length;

    chunk = ram_chunk_index(block->local_host_addr,
                            (uint8_t *)(uintptr_t)sge.addr);
    chunk_start = ram_chunk_start(block, chunk);

    /*
     * 'chunks' is the number of *additional* chunks beyond 'chunk' covered
     * by this transfer: derived from 'length' for RAM blocks and from the
     * whole block's length otherwise.  An exact multiple of the chunk size
     * does not spill into a further chunk.
     */
    if (block->is_ram_block) {
        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    } else {
        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    }

    trace_qemu_rdma_write_one_top(chunks + 1,
                                  (chunks + 1) *
                                  (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);

    chunk_end = ram_chunk_end(block, chunk + chunks);

    if (!rdma->pin_all) {
#ifdef RDMA_UNREGISTRATION_EXAMPLE
        qemu_rdma_unregister_waiting(rdma);
#endif
    }

    /*
     * A previous write to this chunk is still in flight: wait for RDMA
     * write completions until its transit bit has been cleared.
     */
    while (test_bit(chunk, block->transit_bitmap)) {
        /* 'count' is only consumed by the trace point below. */
        (void)count;
        trace_qemu_rdma_write_one_block(count++, current_index, chunk,
                sge.addr, length, rdma->nb_sent, block->nb_chunks);

        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);

        if (ret < 0) {
            error_report("Failed to Wait for previous write to complete "
                    "block %d chunk %" PRIu64
                    " current %" PRIu64 " len %" PRIu64 " %d",
                    current_index, chunk, sge.addr, length, rdma->nb_sent);
            return ret;
        }
    }

    if (!rdma->pin_all || !block->is_ram_block) {
        /* Per-chunk registration: we need a remote key for this chunk. */
        if (!block->remote_keys[chunk]) {
            /*
             * This chunk has not yet been registered, so first check to see
             * if the entire chunk is zero. If so, tell the other side to
             * memset() + madvise() the entire chunk without RDMA.
             */

            if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
                RDMACompress comp = {
                                        .offset = current_addr,
                                        .value = 0,
                                        .block_idx = current_index,
                                        .length = length,
                                    };

                head.len = sizeof(comp);
                head.type = RDMA_CONTROL_COMPRESS;

                trace_qemu_rdma_write_one_zero(chunk, sge.length,
                                               current_index, current_addr);

                compress_to_network(rdma, &comp);
                /* No response is requested for the COMPRESS message. */
                ret = qemu_rdma_exchange_send(rdma, &head,
                                (uint8_t *) &comp, NULL, NULL, NULL);

                if (ret < 0) {
                    return -EIO;
                }

                acct_update_position(f, sge.length, true);

                /* Tells the caller that no RDMA write was posted. */
                return 1;
            }

            /*
             * Otherwise, tell other side to register.
             */
            reg.current_index = current_index;
            if (block->is_ram_block) {
                reg.key.current_addr = current_addr;
            } else {
                reg.key.chunk = chunk;
            }
            reg.chunks = chunks;

            trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
                                              current_addr);

            register_to_network(rdma, &reg);
            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                    &resp, &reg_result_idx, NULL);
            if (ret < 0) {
                return ret;
            }

            /* try to overlap this single registration with the one we sent. */
            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                &sge.lkey, NULL, chunk,
                                                chunk_start, chunk_end)) {
                error_report("cannot get lkey");
                return -EINVAL;
            }

            /* The reply payload sits in the slot reported by the exchange. */
            reg_result = (RDMARegisterResult *)
                    rdma->wr_data[reg_result_idx].control_curr;

            network_to_result(reg_result);

            trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
                                                 reg_result->rkey, chunk);

            /* Remember the remote key so later writes skip the exchange. */
            block->remote_keys[chunk] = reg_result->rkey;
            block->remote_host_addr = reg_result->host_addr;
        } else {
            /* already registered before */
            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                &sge.lkey, NULL, chunk,
                                                chunk_start, chunk_end)) {
                error_report("cannot get lkey!");
                return -EINVAL;
            }
        }

        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
    } else {
        /* pin-all on a RAM block: one remote key covers the whole block. */
        send_wr.wr.rdma.rkey = block->remote_rkey;

        if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                     &sge.lkey, NULL, chunk,
                                                     chunk_start, chunk_end)) {
            error_report("cannot get lkey!");
            return -EINVAL;
        }
    }

    /*
     * Encode the ram block index and chunk within this wrid.
     * We will use this information at the time of completion
     * to figure out which bitmap to check against and then which
     * chunk in the bitmap to look for.
     */
    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
                                        current_index, chunk);

    send_wr.opcode = IBV_WR_RDMA_WRITE;
    send_wr.send_flags = IBV_SEND_SIGNALED;
    send_wr.sg_list = &sge;
    send_wr.num_sge = 1;
    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
                                (current_addr - block->offset);

    trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
                                   sge.length);

    /*
     * ibv_post_send() does not return negative error numbers,
     * per the specification they are positive - no idea why.
     */
    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);

    if (ret == ENOMEM) {
        /* Send queue full: wait for one write to complete, then start over. */
        trace_qemu_rdma_write_one_queue_full();
        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
        if (ret < 0) {
            error_report("rdma migration: failed to make "
                         "room in full send queue! %d", ret);
            return ret;
        }

        goto retry;

    } else if (ret > 0) {
        perror("rdma migration: post rdma write failed");
        return -ret;
    }

    /* Mark the chunk as in flight until its completion is polled. */
    set_bit(chunk, block->transit_bitmap);
    acct_update_position(f, sge.length, false);
    rdma->total_writes++;

    return 0;
}
2082
2083 /*
2084 * Push out any unwritten RDMA operations.
2085 *
2086 * We support sending out multiple chunks at the same time.
2087 * Not all of them need to get signaled in the completion queue.
2088 */
2089 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2090 {
2091 int ret;
2092
2093 if (!rdma->current_length) {
2094 return 0;
2095 }
2096
2097 ret = qemu_rdma_write_one(f, rdma,
2098 rdma->current_index, rdma->current_addr, rdma->current_length);
2099
2100 if (ret < 0) {
2101 return ret;
2102 }
2103
2104 if (ret == 0) {
2105 rdma->nb_sent++;
2106 trace_qemu_rdma_write_flush(rdma->nb_sent);
2107 }
2108
2109 rdma->current_length = 0;
2110 rdma->current_addr = 0;
2111
2112 return 0;
2113 }
2114
2115 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2116 uint64_t offset, uint64_t len)
2117 {
2118 RDMALocalBlock *block;
2119 uint8_t *host_addr;
2120 uint8_t *chunk_end;
2121
2122 if (rdma->current_index < 0) {
2123 return 0;
2124 }
2125
2126 if (rdma->current_chunk < 0) {
2127 return 0;
2128 }
2129
2130 block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2131 host_addr = block->local_host_addr + (offset - block->offset);
2132 chunk_end = ram_chunk_end(block, rdma->current_chunk);
2133
2134 if (rdma->current_length == 0) {
2135 return 0;
2136 }
2137
2138 /*
2139 * Only merge into chunk sequentially.
2140 */
2141 if (offset != (rdma->current_addr + rdma->current_length)) {
2142 return 0;
2143 }
2144
2145 if (offset < block->offset) {
2146 return 0;
2147 }
2148
2149 if ((offset + len) > (block->offset + block->length)) {
2150 return 0;
2151 }
2152
2153 if ((host_addr + len) > chunk_end) {
2154 return 0;
2155 }
2156
2157 return 1;
2158 }
2159
2160 /*
2161 * We're not actually writing here, but doing three things:
2162 *
2163 * 1. Identify the chunk the buffer belongs to.
2164 * 2. If the chunk is full or the buffer doesn't belong to the current
2165 * chunk, then start a new chunk and flush() the old chunk.
2166 * 3. To keep the hardware busy, we also group chunks into batches
2167 * and only require that a batch gets acknowledged in the completion
2168 * qeueue instead of each individual chunk.
2169 */
2170 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2171 uint64_t block_offset, uint64_t offset,
2172 uint64_t len)
2173 {
2174 uint64_t current_addr = block_offset + offset;
2175 uint64_t index = rdma->current_index;
2176 uint64_t chunk = rdma->current_chunk;
2177 int ret;
2178
2179 /* If we cannot merge it, we flush the current buffer first. */
2180 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2181 ret = qemu_rdma_write_flush(f, rdma);
2182 if (ret) {
2183 return ret;
2184 }
2185 rdma->current_length = 0;
2186 rdma->current_addr = current_addr;
2187
2188 ret = qemu_rdma_search_ram_block(rdma, block_offset,
2189 offset, len, &index, &chunk);
2190 if (ret) {
2191 error_report("ram block search failed");
2192 return ret;
2193 }
2194 rdma->current_index = index;
2195 rdma->current_chunk = chunk;
2196 }
2197
2198 /* merge it */
2199 rdma->current_length += len;
2200
2201 /* flush it if buffer is too large */
2202 if (rdma->current_length >= RDMA_MERGE_MAX) {
2203 return qemu_rdma_write_flush(f, rdma);
2204 }
2205
2206 return 0;
2207 }
2208
/*
 * Tear down everything hanging off @rdma: the connection, control-buffer
 * registrations, local RAM blocks, and the verbs / connection-manager
 * objects.  Each pointer is cleared after its object is released, and each
 * release is guarded by a non-NULL check (or is a g_free()).
 */
static void qemu_rdma_cleanup(RDMAContext *rdma)
{
    struct rdma_cm_event *cm_event;
    int ret, idx;

    if (rdma->cm_id && rdma->connected) {
        /*
         * error_state is set but received_error is not: send an
         * RDMA_CONTROL_ERROR control message before disconnecting.
         * The result of the send is not checked.
         */
        if (rdma->error_state && !rdma->received_error) {
            RDMAControlHeader head = { .len = 0,
                                       .type = RDMA_CONTROL_ERROR,
                                       .repeat = 1,
                                     };
            error_report("Early error. Sending error.");
            qemu_rdma_post_send_control(rdma, NULL, &head);
        }

        ret = rdma_disconnect(rdma->cm_id);
        if (!ret) {
            /* Fetch one connection-manager event and acknowledge it. */
            trace_qemu_rdma_cleanup_waiting_for_disconnect();
            ret = rdma_get_cm_event(rdma->channel, &cm_event);
            if (!ret) {
                rdma_ack_cm_event(cm_event);
            }
        }
        trace_qemu_rdma_cleanup_disconnect();
        rdma->connected = false;
    }

    g_free(rdma->dest_blocks);
    rdma->dest_blocks = NULL;

    /* Deregister every control buffer that has a memory region. */
    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
        if (rdma->wr_data[idx].control_mr) {
            rdma->total_registrations--;
            ibv_dereg_mr(rdma->wr_data[idx].control_mr);
        }
        rdma->wr_data[idx].control_mr = NULL;
    }

    /*
     * Repeatedly delete the first block until nb_blocks reaches zero.
     * NOTE(review): this relies on rdma_delete_block() decrementing
     * nb_blocks; that function is not visible here - confirm.
     */
    if (rdma->local_ram_blocks.block) {
        while (rdma->local_ram_blocks.nb_blocks) {
            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
        }
    }

    /* Release the remaining objects in the order listed below. */
    if (rdma->qp) {
        rdma_destroy_qp(rdma->cm_id);
        rdma->qp = NULL;
    }
    if (rdma->cq) {
        ibv_destroy_cq(rdma->cq);
        rdma->cq = NULL;
    }
    if (rdma->comp_channel) {
        ibv_destroy_comp_channel(rdma->comp_channel);
        rdma->comp_channel = NULL;
    }
    if (rdma->pd) {
        ibv_dealloc_pd(rdma->pd);
        rdma->pd = NULL;
    }
    if (rdma->cm_id) {
        rdma_destroy_id(rdma->cm_id);
        rdma->cm_id = NULL;
    }
    if (rdma->listen_id) {
        rdma_destroy_id(rdma->listen_id);
        rdma->listen_id = NULL;
    }
    if (rdma->channel) {
        rdma_destroy_event_channel(rdma->channel);
        rdma->channel = NULL;
    }
    g_free(rdma->host);
    rdma->host = NULL;
}
2284
2285
2286 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2287 {
2288 int ret, idx;
2289 Error *local_err = NULL, **temp = &local_err;
2290
2291 /*
2292 * Will be validated against destination's actual capabilities
2293 * after the connect() completes.
2294 */
2295 rdma->pin_all = pin_all;
2296
2297 ret = qemu_rdma_resolve_host(rdma, temp);
2298 if (ret) {
2299 goto err_rdma_source_init;
2300 }
2301
2302 ret = qemu_rdma_alloc_pd_cq(rdma);
2303 if (ret) {
2304 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2305 " limits may be too low. Please check $ ulimit -a # and "
2306 "search for 'ulimit -l' in the output");
2307 goto err_rdma_source_init;
2308 }
2309
2310 ret = qemu_rdma_alloc_qp(rdma);
2311 if (ret) {
2312 ERROR(temp, "rdma migration: error allocating qp!");
2313 goto err_rdma_source_init;
2314 }
2315
2316 ret = qemu_rdma_init_ram_blocks(rdma);
2317 if (ret) {
2318 ERROR(temp, "rdma migration: error initializing ram blocks!");
2319 goto err_rdma_source_init;
2320 }
2321
2322 /* Build the hash that maps from offset to RAMBlock */
2323 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2324 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2325 g_hash_table_insert(rdma->blockmap,
2326 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2327 &rdma->local_ram_blocks.block[idx]);
2328 }
2329
2330 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2331 ret = qemu_rdma_reg_control(rdma, idx);
2332 if (ret) {
2333 ERROR(temp, "rdma migration: error registering %d control!",
2334 idx);
2335 goto err_rdma_source_init;
2336 }
2337 }
2338
2339 return 0;
2340
2341 err_rdma_source_init:
2342 error_propagate(errp, local_err);
2343 qemu_rdma_cleanup(rdma);
2344 return -1;
2345 }
2346
2347 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2348 {
2349 RDMACapabilities cap = {
2350 .version = RDMA_CONTROL_VERSION_CURRENT,
2351 .flags = 0,
2352 };
2353 struct rdma_conn_param conn_param = { .initiator_depth = 2,
2354 .retry_count = 5,
2355 .private_data = &cap,
2356 .private_data_len = sizeof(cap),
2357 };
2358 struct rdma_cm_event *cm_event;
2359 int ret;
2360
2361 /*
2362 * Only negotiate the capability with destination if the user
2363 * on the source first requested the capability.
2364 */
2365 if (rdma->pin_all) {
2366 trace_qemu_rdma_connect_pin_all_requested();
2367 cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2368 }
2369
2370 caps_to_network(&cap);
2371
2372 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2373 if (ret) {
2374 ERROR(errp, "posting second control recv");
2375 goto err_rdma_source_connect;
2376 }
2377
2378 ret = rdma_connect(rdma->cm_id, &conn_param);
2379 if (ret) {
2380 perror("rdma_connect");
2381 ERROR(errp, "connecting to destination!");
2382 goto err_rdma_source_connect;
2383 }
2384
2385 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2386 if (ret) {
2387 perror("rdma_get_cm_event after rdma_connect");
2388 ERROR(errp, "connecting to destination!");
2389 rdma_ack_cm_event(cm_event);
2390 goto err_rdma_source_connect;
2391 }
2392
2393 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2394 perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2395 ERROR(errp, "connecting to destination!");
2396 rdma_ack_cm_event(cm_event);
2397 goto err_rdma_source_connect;
2398 }
2399 rdma->connected = true;
2400
2401 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2402 network_to_caps(&cap);
2403
2404 /*
2405 * Verify that the *requested* capabilities are supported by the destination
2406 * and disable them otherwise.
2407 */
2408 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2409 ERROR(errp, "Server cannot support pinning all memory. "
2410 "Will register memory dynamically.");
2411 rdma->pin_all = false;
2412 }
2413
2414 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2415
2416 rdma_ack_cm_event(cm_event);
2417
2418 rdma->control_ready_expected = 1;
2419 rdma->nb_sent = 0;
2420 return 0;
2421
2422 err_rdma_source_connect:
2423 qemu_rdma_cleanup(rdma);
2424 return -1;
2425 }
2426
2427 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2428 {
2429 int ret, idx;
2430 struct rdma_cm_id *listen_id;
2431 char ip[40] = "unknown";
2432 struct rdma_addrinfo *res, *e;
2433 char port_str[16];
2434
2435 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2436 rdma->wr_data[idx].control_len = 0;
2437 rdma->wr_data[idx].control_curr = NULL;
2438 }
2439
2440 if (!rdma->host || !rdma->host[0]) {
2441 ERROR(errp, "RDMA host is not set!");
2442 rdma->error_state = -EINVAL;
2443 return -1;
2444 }
2445 /* create CM channel */
2446 rdma->channel = rdma_create_event_channel();
2447 if (!rdma->channel) {
2448 ERROR(errp, "could not create rdma event channel");
2449 rdma->error_state = -EINVAL;
2450 return -1;
2451 }
2452
2453 /* create CM id */
2454 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2455 if (ret) {
2456 ERROR(errp, "could not create cm_id!");
2457 goto err_dest_init_create_listen_id;
2458 }
2459
2460 snprintf(port_str, 16, "%d", rdma->port);
2461 port_str[15] = '\0';
2462
2463 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2464 if (ret < 0) {
2465 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2466 goto err_dest_init_bind_addr;
2467 }
2468
2469 for (e = res; e != NULL; e = e->ai_next) {
2470 inet_ntop(e->ai_family,
2471 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2472 trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2473 ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2474 if (ret) {
2475 continue;
2476 }
2477 if (e->ai_family == AF_INET6) {
2478 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
2479 if (ret) {
2480 continue;
2481 }
2482 }
2483 break;
2484 }
2485
2486 if (!e) {
2487 ERROR(errp, "Error: could not rdma_bind_addr!");
2488 goto err_dest_init_bind_addr;
2489 }
2490
2491 rdma->listen_id = listen_id;
2492 qemu_rdma_dump_gid("dest_init", listen_id);
2493 return 0;
2494
2495 err_dest_init_bind_addr:
2496 rdma_destroy_id(listen_id);
2497 err_dest_init_create_listen_id:
2498 rdma_destroy_event_channel(rdma->channel);
2499 rdma->channel = NULL;
2500 rdma->error_state = ret;
2501 return ret;
2502
2503 }
2504
2505 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2506 {
2507 RDMAContext *rdma = NULL;
2508 InetSocketAddress *addr;
2509
2510 if (host_port) {
2511 rdma = g_new0(RDMAContext, 1);
2512 rdma->current_index = -1;
2513 rdma->current_chunk = -1;
2514
2515 addr = g_new(InetSocketAddress, 1);
2516 if (!inet_parse(addr, host_port, NULL)) {
2517 rdma->port = atoi(addr->port);
2518 rdma->host = g_strdup(addr->host);
2519 } else {
2520 ERROR(errp, "bad RDMA migration address '%s'", host_port);
2521 g_free(rdma);
2522 rdma = NULL;
2523 }
2524
2525 qapi_free_InetSocketAddress(addr);
2526 }
2527
2528 return rdma;
2529 }
2530
2531 /*
2532 * QEMUFile interface to the control channel.
2533 * SEND messages for control only.
2534 * VM's ram is handled with regular RDMA messages.
2535 */
2536 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2537 const struct iovec *iov,
2538 size_t niov,
2539 int *fds,
2540 size_t nfds,
2541 Error **errp)
2542 {
2543 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2544 QEMUFile *f = rioc->file;
2545 RDMAContext *rdma = rioc->rdma;
2546 int ret;
2547 ssize_t done = 0;
2548 size_t i;
2549
2550 CHECK_ERROR_STATE();
2551
2552 /*
2553 * Push out any writes that
2554 * we're queued up for VM's ram.
2555 */
2556 ret = qemu_rdma_write_flush(f, rdma);
2557 if (ret < 0) {
2558 rdma->error_state = ret;
2559 return ret;
2560 }
2561
2562 for (i = 0; i < niov; i++) {
2563 size_t remaining = iov[i].iov_len;
2564 uint8_t * data = (void *)iov[i].iov_base;
2565 while (remaining) {
2566 RDMAControlHeader head;
2567
2568 rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
2569 remaining -= rioc->len;
2570
2571 head.len = rioc->len;
2572 head.type = RDMA_CONTROL_QEMU_FILE;
2573
2574 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2575
2576 if (ret < 0) {
2577 rdma->error_state = ret;
2578 return ret;
2579 }
2580
2581 data += rioc->len;
2582 done += rioc->len;
2583 }
2584 }
2585
2586 return done;
2587 }
2588
2589 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2590 size_t size, int idx)
2591 {
2592 size_t len = 0;
2593
2594 if (rdma->wr_data[idx].control_len) {
2595 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2596
2597 len = MIN(size, rdma->wr_data[idx].control_len);
2598 memcpy(buf, rdma->wr_data[idx].control_curr, len);
2599 rdma->wr_data[idx].control_curr += len;
2600 rdma->wr_data[idx].control_len -= len;
2601 }
2602
2603 return len;
2604 }
2605
2606 /*
2607 * QEMUFile interface to the control channel.
2608 * RDMA links don't use bytestreams, so we have to
2609 * return bytes to QEMUFile opportunistically.
2610 */
2611 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2612 const struct iovec *iov,
2613 size_t niov,
2614 int **fds,
2615 size_t *nfds,
2616 Error **errp)
2617 {
2618 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2619 RDMAContext *rdma = rioc->rdma;
2620 RDMAControlHeader head;
2621 int ret = 0;
2622 ssize_t i;
2623 size_t done = 0;
2624
2625 CHECK_ERROR_STATE();
2626
2627 for (i = 0; i < niov; i++) {
2628 size_t want = iov[i].iov_len;
2629 uint8_t *data = (void *)iov[i].iov_base;
2630
2631 /*
2632 * First, we hold on to the last SEND message we
2633 * were given and dish out the bytes until we run
2634 * out of bytes.
2635 */
2636 ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2637 done += ret;
2638 want -= ret;
2639 /* Got what we needed, so go to next iovec */
2640 if (want == 0) {
2641 continue;
2642 }
2643
2644 /* If we got any data so far, then don't wait
2645 * for more, just return what we have */
2646 if (done > 0) {
2647 break;
2648 }
2649
2650
2651 /* We've got nothing at all, so lets wait for
2652 * more to arrive
2653 */
2654 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2655
2656 if (ret < 0) {
2657 rdma->error_state = ret;
2658 return ret;
2659 }
2660
2661 /*
2662 * SEND was received with new bytes, now try again.
2663 */
2664 ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2665 done += ret;
2666 want -= ret;
2667
2668 /* Still didn't get enough, so lets just return */
2669 if (want) {
2670 if (done == 0) {
2671 return QIO_CHANNEL_ERR_BLOCK;
2672 } else {
2673 break;
2674 }
2675 }
2676 }
2677 rioc->len = done;
2678 return rioc->len;
2679 }
2680
2681 /*
2682 * Block until all the outstanding chunks have been delivered by the hardware.
2683 */
2684 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2685 {
2686 int ret;
2687
2688 if (qemu_rdma_write_flush(f, rdma) < 0) {
2689 return -EIO;
2690 }
2691
2692 while (rdma->nb_sent) {
2693 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2694 if (ret < 0) {
2695 error_report("rdma migration: complete polling error!");
2696 return -EIO;
2697 }
2698 }
2699
2700 qemu_rdma_unregister_waiting(rdma);
2701
2702 return 0;
2703 }
2704
2705
2706 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2707 bool blocking,
2708 Error **errp)
2709 {
2710 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2711 /* XXX we should make readv/writev actually honour this :-) */
2712 rioc->blocking = blocking;
2713 return 0;
2714 }
2715
2716
2717 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2718 struct QIOChannelRDMASource {
2719 GSource parent;
2720 QIOChannelRDMA *rioc;
2721 GIOCondition condition;
2722 };
2723
2724 static gboolean
2725 qio_channel_rdma_source_prepare(GSource *source,
2726 gint *timeout)
2727 {
2728 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2729 RDMAContext *rdma = rsource->rioc->rdma;
2730 GIOCondition cond = 0;
2731 *timeout = -1;
2732
2733 if (rdma->wr_data[0].control_len) {
2734 cond |= G_IO_IN;
2735 }
2736 cond |= G_IO_OUT;
2737
2738 return cond & rsource->condition;
2739 }
2740
2741 static gboolean
2742 qio_channel_rdma_source_check(GSource *source)
2743 {
2744 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2745 RDMAContext *rdma = rsource->rioc->rdma;
2746 GIOCondition cond = 0;
2747
2748 if (rdma->wr_data[0].control_len) {
2749 cond |= G_IO_IN;
2750 }
2751 cond |= G_IO_OUT;
2752
2753 return cond & rsource->condition;
2754 }
2755
2756 static gboolean
2757 qio_channel_rdma_source_dispatch(GSource *source,
2758 GSourceFunc callback,
2759 gpointer user_data)
2760 {
2761 QIOChannelFunc func = (QIOChannelFunc)callback;
2762 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2763 RDMAContext *rdma = rsource->rioc->rdma;
2764 GIOCondition cond = 0;
2765
2766 if (rdma->wr_data[0].control_len) {
2767 cond |= G_IO_IN;
2768 }
2769 cond |= G_IO_OUT;
2770
2771 return (*func)(QIO_CHANNEL(rsource->rioc),
2772 (cond & rsource->condition),
2773 user_data);
2774 }
2775
2776 static void
2777 qio_channel_rdma_source_finalize(GSource *source)
2778 {
2779 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2780
2781 object_unref(OBJECT(ssource->rioc));
2782 }
2783
2784 GSourceFuncs qio_channel_rdma_source_funcs = {
2785 qio_channel_rdma_source_prepare,
2786 qio_channel_rdma_source_check,
2787 qio_channel_rdma_source_dispatch,
2788 qio_channel_rdma_source_finalize
2789 };
2790
2791 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2792 GIOCondition condition)
2793 {
2794 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2795 QIOChannelRDMASource *ssource;
2796 GSource *source;
2797
2798 source = g_source_new(&qio_channel_rdma_source_funcs,
2799 sizeof(QIOChannelRDMASource));
2800 ssource = (QIOChannelRDMASource *)source;
2801
2802 ssource->rioc = rioc;
2803 object_ref(OBJECT(rioc));
2804
2805 ssource->condition = condition;
2806
2807 return source;
2808 }
2809
2810
2811 static int qio_channel_rdma_close(QIOChannel *ioc,
2812 Error **errp)
2813 {
2814 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2815 trace_qemu_rdma_close();
2816 if (rioc->rdma) {
2817 if (!rioc->rdma->error_state) {
2818 rioc->rdma->error_state = qemu_file_get_error(rioc->file);
2819 }
2820 qemu_rdma_cleanup(rioc->rdma);
2821 g_free(rioc->rdma);
2822 rioc->rdma = NULL;
2823 }
2824 return 0;
2825 }
2826
2827 /*
2828 * Parameters:
2829 * @offset == 0 :
2830 * This means that 'block_offset' is a full virtual address that does not
2831 * belong to a RAMBlock of the virtual machine and instead
2832 * represents a private malloc'd memory area that the caller wishes to
2833 * transfer.
2834 *
2835 * @offset != 0 :
2836 * Offset is an offset to be added to block_offset and used
2837 * to also lookup the corresponding RAMBlock.
2838 *
2839 * @size > 0 :
2840 * Initiate an transfer this size.
2841 *
2842 * @size == 0 :
2843 * A 'hint' or 'advice' that means that we wish to speculatively
2844 * and asynchronously unregister this memory. In this case, there is no
2845 * guarantee that the unregister will actually happen, for example,
2846 * if the memory is being actively transmitted. Additionally, the memory
2847 * may be re-registered at any future time if a write within the same
2848 * chunk was requested again, even if you attempted to unregister it
2849 * here.
2850 *
2851 * @size < 0 : TODO, not yet supported
2852 * Unregister the memory NOW. This means that the caller does not
2853 * expect there to be any future RDMA transfers and we just want to clean
2854 * things up. This is used in case the upper layer owns the memory and
2855 * cannot wait for qemu_fclose() to occur.
2856 *
2857 * @bytes_sent : User-specificed pointer to indicate how many bytes were
2858 * sent. Usually, this will not be more than a few bytes of
2859 * the protocol because most transfers are sent asynchronously.
2860 */
2861 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2862 ram_addr_t block_offset, ram_addr_t offset,
2863 size_t size, uint64_t *bytes_sent)
2864 {
2865 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
2866 RDMAContext *rdma = rioc->rdma;
2867 int ret;
2868
2869 CHECK_ERROR_STATE();
2870
2871 qemu_fflush(f);
2872
2873 if (size > 0) {
2874 /*
2875 * Add this page to the current 'chunk'. If the chunk
2876 * is full, or the page doen't belong to the current chunk,
2877 * an actual RDMA write will occur and a new chunk will be formed.
2878 */
2879 ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2880 if (ret < 0) {
2881 error_report("rdma migration: write error! %d", ret);
2882 goto err;
2883 }
2884
2885 /*
2886 * We always return 1 bytes because the RDMA
2887 * protocol is completely asynchronous. We do not yet know
2888 * whether an identified chunk is zero or not because we're
2889 * waiting for other pages to potentially be merged with
2890 * the current chunk. So, we have to call qemu_update_position()
2891 * later on when the actual write occurs.
2892 */
2893 if (bytes_sent) {
2894 *bytes_sent = 1;
2895 }
2896 } else {
2897 uint64_t index, chunk;
2898
2899 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
2900 if (size < 0) {
2901 ret = qemu_rdma_drain_cq(f, rdma);
2902 if (ret < 0) {
2903 fprintf(stderr, "rdma: failed to synchronously drain"
2904 " completion queue before unregistration.\n");
2905 goto err;
2906 }
2907 }
2908 */
2909
2910 ret = qemu_rdma_search_ram_block(rdma, block_offset,
2911 offset, size, &index, &chunk);
2912
2913 if (ret) {
2914 error_report("ram block search failed");
2915 goto err;
2916 }
2917
2918 qemu_rdma_signal_unregister(rdma, index, chunk, 0);
2919
2920 /*
2921 * TODO: Synchronous, guaranteed unregistration (should not occur during
2922 * fast-path). Otherwise, unregisters will process on the next call to
2923 * qemu_rdma_drain_cq()
2924 if (size < 0) {
2925 qemu_rdma_unregister_waiting(rdma);
2926 }
2927 */
2928 }
2929
2930 /*
2931 * Drain the Completion Queue if possible, but do not block,
2932 * just poll.
2933 *
2934 * If nothing to poll, the end of the iteration will do this
2935 * again to make sure we don't overflow the request queue.
2936 */
2937 while (1) {
2938 uint64_t wr_id, wr_id_in;
2939 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
2940 if (ret < 0) {
2941 error_report("rdma migration: polling error! %d", ret);
2942 goto err;
2943 }
2944
2945 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
2946
2947 if (wr_id == RDMA_WRID_NONE) {
2948 break;
2949 }
2950 }
2951
2952 return RAM_SAVE_CONTROL_DELAYED;
2953 err:
2954 rdma->error_state = ret;
2955 return ret;
2956 }
2957
2958 static int qemu_rdma_accept(RDMAContext *rdma)
2959 {
2960 RDMACapabilities cap;
2961 struct rdma_conn_param conn_param = {
2962 .responder_resources = 2,
2963 .private_data = &cap,
2964 .private_data_len = sizeof(cap),
2965 };
2966 struct rdma_cm_event *cm_event;
2967 struct ibv_context *verbs;
2968 int ret = -EINVAL;
2969 int idx;
2970
2971 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2972 if (ret) {
2973 goto err_rdma_dest_wait;
2974 }
2975
2976 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
2977 rdma_ack_cm_event(cm_event);
2978 goto err_rdma_dest_wait;
2979 }
2980
2981 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2982
2983 network_to_caps(&cap);
2984
2985 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
2986 error_report("Unknown source RDMA version: %d, bailing...",
2987 cap.version);
2988 rdma_ack_cm_event(cm_event);
2989 goto err_rdma_dest_wait;
2990 }
2991
2992 /*
2993 * Respond with only the capabilities this version of QEMU knows about.
2994 */
2995 cap.flags &= known_capabilities;
2996
2997 /*
2998 * Enable the ones that we do know about.
2999 * Add other checks here as new ones are introduced.
3000 */
3001 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3002 rdma->pin_all = true;
3003 }
3004
3005 rdma->cm_id = cm_event->id;
3006 verbs = cm_event->id->verbs;
3007
3008 rdma_ack_cm_event(cm_event);
3009
3010 trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3011
3012 caps_to_network(&cap);
3013
3014 trace_qemu_rdma_accept_pin_verbsc(verbs);
3015
3016 if (!rdma->verbs) {
3017 rdma->verbs = verbs;
3018 } else if (rdma->verbs != verbs) {
3019 error_report("ibv context not matching %p, %p!", rdma->verbs,
3020 verbs);
3021 goto err_rdma_dest_wait;
3022 }
3023
3024 qemu_rdma_dump_id("dest_init", verbs);
3025
3026 ret = qemu_rdma_alloc_pd_cq(rdma);
3027 if (ret) {
3028 error_report("rdma migration: error allocating pd and cq!");
3029 goto err_rdma_dest_wait;
3030 }
3031
3032 ret = qemu_rdma_alloc_qp(rdma);
3033 if (ret) {
3034 error_report("rdma migration: error allocating qp!");
3035 goto err_rdma_dest_wait;
3036 }
3037
3038 ret = qemu_rdma_init_ram_blocks(rdma);
3039 if (ret) {
3040 error_report("rdma migration: error initializing ram blocks!");
3041 goto err_rdma_dest_wait;
3042 }
3043
3044 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3045 ret = qemu_rdma_reg_control(rdma, idx);
3046 if (ret) {
3047 error_report("rdma: error registering %d control", idx);
3048 goto err_rdma_dest_wait;
3049 }
3050 }
3051
3052 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
3053
3054 ret = rdma_accept(rdma->cm_id, &conn_param);
3055 if (ret) {
3056 error_report("rdma_accept returns %d", ret);
3057 goto err_rdma_dest_wait;
3058 }
3059
3060 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3061 if (ret) {
3062 error_report("rdma_accept get_cm_event failed %d", ret);
3063 goto err_rdma_dest_wait;
3064 }
3065
3066 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3067 error_report("rdma_accept not event established");
3068 rdma_ack_cm_event(cm_event);
3069 goto err_rdma_dest_wait;
3070 }
3071
3072 rdma_ack_cm_event(cm_event);
3073 rdma->connected = true;
3074
3075 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3076 if (ret) {
3077 error_report("rdma migration: error posting second control recv");
3078 goto err_rdma_dest_wait;
3079 }
3080
3081 qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3082
3083 return 0;
3084
3085 err_rdma_dest_wait:
3086 rdma->error_state = ret;
3087 qemu_rdma_cleanup(rdma);
3088 return ret;
3089 }
3090
3091 static int dest_ram_sort_func(const void *a, const void *b)
3092 {
3093 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3094 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3095
3096 return (a_index < b_index) ? -1 : (a_index != b_index);
3097 }
3098
3099 /*
3100 * During each iteration of the migration, we listen for instructions
3101 * by the source VM to perform dynamic page registrations before they
3102 * can perform RDMA operations.
3103 *
3104 * We respond with the 'rkey'.
3105 *
3106 * Keep doing this until the source tells us to stop.
3107 */
3108 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3109 {
3110 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3111 .type = RDMA_CONTROL_REGISTER_RESULT,
3112 .repeat = 0,
3113 };
3114 RDMAControlHeader unreg_resp = { .len = 0,
3115 .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3116 .repeat = 0,
3117 };
3118 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3119 .repeat = 1 };
3120 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3121 RDMAContext *rdma = rioc->rdma;
3122 RDMALocalBlocks *local = &rdma->local_ram_blocks;
3123 RDMAControlHeader head;
3124 RDMARegister *reg, *registers;
3125 RDMACompress *comp;
3126 RDMARegisterResult *reg_result;
3127 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3128 RDMALocalBlock *block;
3129 void *host_addr;
3130 int ret = 0;
3131 int idx = 0;
3132 int count = 0;
3133 int i = 0;
3134
3135 CHECK_ERROR_STATE();
3136
3137 do {
3138 trace_qemu_rdma_registration_handle_wait();
3139
3140 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3141
3142 if (ret < 0) {
3143 break;
3144 }
3145
3146 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3147 error_report("rdma: Too many requests in this message (%d)."
3148 "Bailing.", head.repeat);
3149 ret = -EIO;
3150 break;
3151 }
3152
3153 switch (head.type) {
3154 case RDMA_CONTROL_COMPRESS:
3155 comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3156 network_to_compress(comp);
3157
3158 trace_qemu_rdma_registration_handle_compress(comp->length,
3159 comp->block_idx,
3160 comp->offset);
3161 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3162 error_report("rdma: 'compress' bad block index %u (vs %d)",
3163 (unsigned int)comp->block_idx,
3164 rdma->local_ram_blocks.nb_blocks);
3165 ret = -EIO;
3166 goto out;
3167 }
3168 block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3169
3170 host_addr = block->local_host_addr +
3171 (comp->offset - block->offset);
3172
3173 ram_handle_compressed(host_addr, comp->value, comp->length);
3174 break;
3175
3176 case RDMA_CONTROL_REGISTER_FINISHED:
3177 trace_qemu_rdma_registration_handle_finished();
3178 goto out;
3179
3180 case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3181 trace_qemu_rdma_registration_handle_ram_blocks();
3182
3183 /* Sort our local RAM Block list so it's the same as the source,
3184 * we can do this since we've filled in a src_index in the list
3185 * as we received the RAMBlock list earlier.
3186 */
3187 qsort(rdma->local_ram_blocks.block,
3188 rdma->local_ram_blocks.nb_blocks,
3189 sizeof(RDMALocalBlock), dest_ram_sort_func);
3190 if (rdma->pin_all) {
3191 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3192 if (ret) {
3193 error_report("rdma migration: error dest "
3194 "registering ram blocks");
3195 goto out;
3196 }
3197 }
3198
3199 /*
3200 * Dest uses this to prepare to transmit the RAMBlock descriptions
3201 * to the source VM after connection setup.
3202 * Both sides use the "remote" structure to communicate and update
3203 * their "local" descriptions with what was sent.
3204 */
3205 for (i = 0; i < local->nb_blocks; i++) {
3206 rdma->dest_blocks[i].remote_host_addr =
3207 (uintptr_t)(local->block[i].local_host_addr);
3208
3209 if (rdma->pin_all) {
3210 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3211 }
3212
3213 rdma->dest_blocks[i].offset = local->block[i].offset;
3214 rdma->dest_blocks[i].length = local->block[i].length;
3215
3216 dest_block_to_network(&rdma->dest_blocks[i]);
3217 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3218 local->block[i].block_name,
3219 local->block[i].offset,
3220 local->block[i].length,
3221 local->block[i].local_host_addr,
3222 local->block[i].src_index);
3223 }
3224
3225 blocks.len = rdma->local_ram_blocks.nb_blocks
3226 * sizeof(RDMADestBlock);
3227
3228
3229 ret = qemu_rdma_post_send_control(rdma,
3230 (uint8_t *) rdma->dest_blocks, &blocks);
3231
3232 if (ret < 0) {
3233 error_report("rdma migration: error sending remote info");
3234 goto out;
3235 }
3236
3237 break;
3238 case RDMA_CONTROL_REGISTER_REQUEST:
3239 trace_qemu_rdma_registration_handle_register(head.repeat);
3240
3241 reg_resp.repeat = head.repeat;
3242 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3243
3244 for (count = 0; count < head.repeat; count++) {
3245 uint64_t chunk;
3246 uint8_t *chunk_start, *chunk_end;
3247
3248 reg = &registers[count];
3249 network_to_register(reg);
3250
3251 reg_result = &results[count];
3252
3253 trace_qemu_rdma_registration_handle_register_loop(count,
3254 reg->current_index, reg->key.current_addr, reg->chunks);
3255
3256 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3257 error_report("rdma: 'register' bad block index %u (vs %d)",
3258 (unsigned int)reg->current_index,
3259 rdma->local_ram_blocks.nb_blocks);
3260 ret = -ENOENT;
3261 goto out;
3262 }
3263 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3264 if (block->is_ram_block) {
3265 if (block->offset > reg->key.current_addr) {
3266 error_report("rdma: bad register address for block %s"
3267 " offset: %" PRIx64 " current_addr: %" PRIx64,
3268 block->block_name, block->offset,
3269 reg->key.current_addr);
3270 ret = -ERANGE;
3271 goto out;
3272 }
3273 host_addr = (block->local_host_addr +
3274 (reg->key.current_addr - block->offset));
3275 chunk = ram_chunk_index(block->local_host_addr,
3276 (uint8_t *) host_addr);
3277 } else {
3278 chunk = reg->key.chunk;
3279 host_addr = block->local_host_addr +
3280 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3281 /* Check for particularly bad chunk value */
3282 if (host_addr < (void *)block->local_host_addr) {
3283 error_report("rdma: bad chunk for block %s"
3284 " chunk: %" PRIx64,
3285 block->block_name, reg->key.chunk);
3286 ret = -ERANGE;
3287 goto out;
3288 }
3289 }
3290 chunk_start = ram_chunk_start(block, chunk);
3291 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3292 if (qemu_rdma_register_and_get_keys(rdma, block,
3293 (uintptr_t)host_addr, NULL, &reg_result->rkey,
3294 chunk, chunk_start, chunk_end)) {
3295 error_report("cannot get rkey");
3296 ret = -EINVAL;
3297 goto out;
3298 }
3299
3300 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3301
3302 trace_qemu_rdma_registration_handle_register_rkey(
3303 reg_result->rkey);
3304
3305 result_to_network(reg_result);
3306 }
3307
3308 ret = qemu_rdma_post_send_control(rdma,
3309 (uint8_t *) results, &reg_resp);
3310
3311 if (ret < 0) {
3312 error_report("Failed to send control buffer");
3313 goto out;
3314 }
3315 break;
3316 case RDMA_CONTROL_UNREGISTER_REQUEST:
3317 trace_qemu_rdma_registration_handle_unregister(head.repeat);
3318 unreg_resp.repeat = head.repeat;
3319 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3320
3321 for (count = 0; count < head.repeat; count++) {
3322 reg = &registers[count];
3323 network_to_register(reg);
3324
3325 trace_qemu_rdma_registration_handle_unregister_loop(count,
3326 reg->current_index, reg->key.chunk);
3327
3328 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3329
3330 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3331 block->pmr[reg->key.chunk] = NULL;
3332
3333 if (ret != 0) {
3334 perror("rdma unregistration chunk failed");
3335 ret = -ret;
3336 goto out;
3337 }
3338
3339 rdma->total_registrations--;
3340
3341 trace_qemu_rdma_registration_handle_unregister_success(
3342 reg->key.chunk);
3343 }
3344
3345 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3346
3347 if (ret < 0) {
3348 error_report("Failed to send control buffer");
3349 goto out;
3350 }
3351 break;
3352 case RDMA_CONTROL_REGISTER_RESULT:
3353 error_report("Invalid RESULT message at dest.");
3354 ret = -EIO;
3355 goto out;
3356 default:
3357 error_report("Unknown control message %s", control_desc[head.type]);
3358 ret = -EIO;
3359 goto out;
3360 }
3361 } while (1);
3362 out:
3363 if (ret < 0) {
3364 rdma->error_state = ret;
3365 }
3366 return ret;
3367 }
3368
3369 /* Destination:
3370 * Called via a ram_control_load_hook during the initial RAM load section which
3371 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks
3372 * on the source.
3373 * We've already built our local RAMBlock list, but not yet sent the list to
3374 * the source.
3375 */
3376 static int
3377 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3378 {
3379 RDMAContext *rdma = rioc->rdma;
3380 int curr;
3381 int found = -1;
3382
3383 /* Find the matching RAMBlock in our local list */
3384 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3385 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3386 found = curr;
3387 break;
3388 }
3389 }
3390
3391 if (found == -1) {
3392 error_report("RAMBlock '%s' not found on destination", name);
3393 return -ENOENT;
3394 }
3395
3396 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3397 trace_rdma_block_notification_handle(name, rdma->next_src_index);
3398 rdma->next_src_index++;
3399
3400 return 0;
3401 }
3402
3403 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3404 {
3405 switch (flags) {
3406 case RAM_CONTROL_BLOCK_REG:
3407 return rdma_block_notification_handle(opaque, data);
3408
3409 case RAM_CONTROL_HOOK:
3410 return qemu_rdma_registration_handle(f, opaque);
3411
3412 default:
3413 /* Shouldn't be called with any other values */
3414 abort();
3415 }
3416 }
3417
3418 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3419 uint64_t flags, void *data)
3420 {
3421 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3422 RDMAContext *rdma = rioc->rdma;
3423
3424 CHECK_ERROR_STATE();
3425
3426 trace_qemu_rdma_registration_start(flags);
3427 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3428 qemu_fflush(f);
3429
3430 return 0;
3431 }
3432
3433 /*
3434 * Inform dest that dynamic registrations are done for now.
3435 * First, flush writes, if any.
3436 */
3437 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3438 uint64_t flags, void *data)
3439 {
3440 Error *local_err = NULL, **errp = &local_err;
3441 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3442 RDMAContext *rdma = rioc->rdma;
3443 RDMAControlHeader head = { .len = 0, .repeat = 1 };
3444 int ret = 0;
3445
3446 CHECK_ERROR_STATE();
3447
3448 qemu_fflush(f);
3449 ret = qemu_rdma_drain_cq(f, rdma);
3450
3451 if (ret < 0) {
3452 goto err;
3453 }
3454
3455 if (flags == RAM_CONTROL_SETUP) {
3456 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3457 RDMALocalBlocks *local = &rdma->local_ram_blocks;
3458 int reg_result_idx, i, nb_dest_blocks;
3459
3460 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3461 trace_qemu_rdma_registration_stop_ram();
3462
3463 /*
3464 * Make sure that we parallelize the pinning on both sides.
3465 * For very large guests, doing this serially takes a really
3466 * long time, so we have to 'interleave' the pinning locally
3467 * with the control messages by performing the pinning on this
3468 * side before we receive the control response from the other
3469 * side that the pinning has completed.
3470 */
3471 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3472 &reg_result_idx, rdma->pin_all ?
3473 qemu_rdma_reg_whole_ram_blocks : NULL);
3474 if (ret < 0) {
3475 ERROR(errp, "receiving remote info!");
3476 return ret;
3477 }
3478
3479 nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3480
3481 /*
3482 * The protocol uses two different sets of rkeys (mutually exclusive):
3483 * 1. One key to represent the virtual address of the entire ram block.
3484 * (dynamic chunk registration disabled - pin everything with one rkey.)
3485 * 2. One to represent individual chunks within a ram block.
3486 * (dynamic chunk registration enabled - pin individual chunks.)
3487 *
3488 * Once the capability is successfully negotiated, the destination transmits
3489 * the keys to use (or sends them later) including the virtual addresses
3490 * and then propagates the remote ram block descriptions to his local copy.
3491 */
3492
3493 if (local->nb_blocks != nb_dest_blocks) {
3494 ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) "
3495 "Your QEMU command line parameters are probably "
3496 "not identical on both the source and destination.",
3497 local->nb_blocks, nb_dest_blocks);
3498 rdma->error_state = -EINVAL;
3499 return -EINVAL;
3500 }
3501
3502 qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3503 memcpy(rdma->dest_blocks,
3504 rdma->wr_data[reg_result_idx].control_curr, resp.len);
3505 for (i = 0; i < nb_dest_blocks; i++) {
3506 network_to_dest_block(&rdma->dest_blocks[i]);
3507
3508 /* We require that the blocks are in the same order */
3509 if (rdma->dest_blocks[i].length != local->block[i].length) {
3510 ERROR(errp, "Block %s/%d has a different length %" PRIu64
3511 "vs %" PRIu64, local->block[i].block_name, i,
3512 local->block[i].length,
3513 rdma->dest_blocks[i].length);
3514 rdma->error_state = -EINVAL;
3515 return -EINVAL;
3516 }
3517 local->block[i].remote_host_addr =
3518 rdma->dest_blocks[i].remote_host_addr;
3519 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3520 }
3521 }
3522
3523 trace_qemu_rdma_registration_stop(flags);
3524
3525 head.type = RDMA_CONTROL_REGISTER_FINISHED;
3526 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3527
3528 if (ret < 0) {
3529 goto err;
3530 }
3531
3532 return 0;
3533 err:
3534 rdma->error_state = ret;
3535 return ret;
3536 }
3537
3538 static const QEMUFileHooks rdma_read_hooks = {
3539 .hook_ram_load = rdma_load_hook,
3540 };
3541
3542 static const QEMUFileHooks rdma_write_hooks = {
3543 .before_ram_iterate = qemu_rdma_registration_start,
3544 .after_ram_iterate = qemu_rdma_registration_stop,
3545 .save_page = qemu_rdma_save_page,
3546 };
3547
3548
3549 static void qio_channel_rdma_finalize(Object *obj)
3550 {
3551 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3552 if (rioc->rdma) {
3553 qemu_rdma_cleanup(rioc->rdma);
3554 g_free(rioc->rdma);
3555 rioc->rdma = NULL;
3556 }
3557 }
3558
3559 static void qio_channel_rdma_class_init(ObjectClass *klass,
3560 void *class_data G_GNUC_UNUSED)
3561 {
3562 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3563
3564 ioc_klass->io_writev = qio_channel_rdma_writev;
3565 ioc_klass->io_readv = qio_channel_rdma_readv;
3566 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3567 ioc_klass->io_close = qio_channel_rdma_close;
3568 ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3569 }
3570
3571 static const TypeInfo qio_channel_rdma_info = {
3572 .parent = TYPE_QIO_CHANNEL,
3573 .name = TYPE_QIO_CHANNEL_RDMA,
3574 .instance_size = sizeof(QIOChannelRDMA),
3575 .instance_finalize = qio_channel_rdma_finalize,
3576 .class_init = qio_channel_rdma_class_init,
3577 };
3578
3579 static void qio_channel_rdma_register_types(void)
3580 {
3581 type_register_static(&qio_channel_rdma_info);
3582 }
3583
3584 type_init(qio_channel_rdma_register_types);
3585
3586 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3587 {
3588 QIOChannelRDMA *rioc;
3589
3590 if (qemu_file_mode_is_not_valid(mode)) {
3591 return NULL;
3592 }
3593
3594 rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3595 rioc->rdma = rdma;
3596
3597 if (mode[0] == 'w') {
3598 rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
3599 qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
3600 } else {
3601 rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
3602 qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
3603 }
3604
3605 return rioc->file;
3606 }
3607
3608 static void rdma_accept_incoming_migration(void *opaque)
3609 {
3610 RDMAContext *rdma = opaque;
3611 int ret;
3612 QEMUFile *f;
3613 Error *local_err = NULL, **errp = &local_err;
3614
3615 trace_qemu_rdma_accept_incoming_migration();
3616 ret = qemu_rdma_accept(rdma);
3617
3618 if (ret) {
3619 ERROR(errp, "RDMA Migration initialization failed!");
3620 return;
3621 }
3622
3623 trace_qemu_rdma_accept_incoming_migration_accepted();
3624
3625 f = qemu_fopen_rdma(rdma, "rb");
3626 if (f == NULL) {
3627 ERROR(errp, "could not qemu_fopen_rdma!");
3628 qemu_rdma_cleanup(rdma);
3629 return;
3630 }
3631
3632 rdma->migration_started_on_destination = 1;
3633 migration_fd_process_incoming(f);
3634 }
3635
3636 void rdma_start_incoming_migration(const char *host_port, Error **errp)
3637 {
3638 int ret;
3639 RDMAContext *rdma;
3640 Error *local_err = NULL;
3641
3642 trace_rdma_start_incoming_migration();
3643 rdma = qemu_rdma_data_init(host_port, &local_err);
3644
3645 if (rdma == NULL) {
3646 goto err;
3647 }
3648
3649 ret = qemu_rdma_dest_init(rdma, &local_err);
3650
3651 if (ret) {
3652 goto err;
3653 }
3654
3655 trace_rdma_start_incoming_migration_after_dest_init();
3656
3657 ret = rdma_listen(rdma->listen_id, 5);
3658
3659 if (ret) {
3660 ERROR(errp, "listening on socket!");
3661 goto err;
3662 }
3663
3664 trace_rdma_start_incoming_migration_after_rdma_listen();
3665
3666 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3667 NULL, (void *)(intptr_t)rdma);
3668 return;
3669 err:
3670 error_propagate(errp, local_err);
3671 g_free(rdma);
3672 }
3673
3674 void rdma_start_outgoing_migration(void *opaque,
3675 const char *host_port, Error **errp)
3676 {
3677 MigrationState *s = opaque;
3678 RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
3679 int ret = 0;
3680
3681 if (rdma == NULL) {
3682 goto err;
3683 }
3684
3685 ret = qemu_rdma_source_init(rdma,
3686 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
3687
3688 if (ret) {
3689 goto err;
3690 }
3691
3692 trace_rdma_start_outgoing_migration_after_rdma_source_init();
3693 ret = qemu_rdma_connect(rdma, errp);
3694
3695 if (ret) {
3696 goto err;
3697 }
3698
3699 trace_rdma_start_outgoing_migration_after_rdma_connect();
3700
3701 s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
3702 migrate_fd_connect(s);
3703 return;
3704 err:
3705 g_free(rdma);
3706 }