]> git.proxmox.com Git - mirror_qemu.git/blob - migration/rdma.c
Merge remote-tracking branch 'remotes/borntraeger/tags/s390x-20170608' into staging
[mirror_qemu.git] / migration / rdma.c
1 /*
2 * RDMA protocol and interfaces
3 *
4 * Copyright IBM, Corp. 2010-2013
5 * Copyright Red Hat, Inc. 2015-2016
6 *
7 * Authors:
8 * Michael R. Hines <mrhines@us.ibm.com>
9 * Jiuxing Liu <jl@us.ibm.com>
10 * Daniel P. Berrange <berrange@redhat.com>
11 *
12 * This work is licensed under the terms of the GNU GPL, version 2 or
13 * later. See the COPYING file in the top-level directory.
14 *
15 */
16 #include "qemu/osdep.h"
17 #include "qapi/error.h"
18 #include "qemu-common.h"
19 #include "qemu/cutils.h"
20 #include "rdma.h"
21 #include "migration/migration.h"
22 #include "qemu-file.h"
23 #include "ram.h"
24 #include "qemu-file-channel.h"
25 #include "qemu/error-report.h"
26 #include "qemu/main-loop.h"
27 #include "qemu/sockets.h"
28 #include "qemu/bitmap.h"
29 #include "qemu/coroutine.h"
30 #include <sys/socket.h>
31 #include <netdb.h>
32 #include <arpa/inet.h>
33 #include <rdma/rdma_cma.h>
34 #include "trace.h"
35
36 /*
37 * Print and error on both the Monitor and the Log file.
38 */
39 #define ERROR(errp, fmt, ...) \
40 do { \
41 fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
42 if (errp && (*(errp) == NULL)) { \
43 error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
44 } \
45 } while (0)
46
47 #define RDMA_RESOLVE_TIMEOUT_MS 10000
48
49 /* Do not merge data if larger than this. */
50 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
51 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
52
53 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
54
55 /*
56 * This is only for non-live state being migrated.
57 * Instead of RDMA_WRITE messages, we use RDMA_SEND
58 * messages for that state, which requires a different
59 * delivery design than main memory.
60 */
61 #define RDMA_SEND_INCREMENT 32768
62
63 /*
64 * Maximum size infiniband SEND message
65 */
66 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
67 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
68
69 #define RDMA_CONTROL_VERSION_CURRENT 1
70 /*
71 * Capabilities for negotiation.
72 */
73 #define RDMA_CAPABILITY_PIN_ALL 0x01
74
75 /*
76 * Add the other flags above to this list of known capabilities
77 * as they are introduced.
78 */
79 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
80
81 #define CHECK_ERROR_STATE() \
82 do { \
83 if (rdma->error_state) { \
84 if (!rdma->error_reported) { \
85 error_report("RDMA is in an error state waiting migration" \
86 " to abort!"); \
87 rdma->error_reported = 1; \
88 } \
89 return rdma->error_state; \
90 } \
91 } while (0);
92
93 /*
94 * A work request ID is 64-bits and we split up these bits
95 * into 3 parts:
96 *
97 * bits 0-15 : type of control message, 2^16
98 * bits 16-29: ram block index, 2^14
99 * bits 30-63: ram block chunk number, 2^34
100 *
101 * The last two bit ranges are only used for RDMA writes,
102 * in order to track their completion and potentially
103 * also track unregistration status of the message.
104 */
105 #define RDMA_WRID_TYPE_SHIFT 0UL
106 #define RDMA_WRID_BLOCK_SHIFT 16UL
107 #define RDMA_WRID_CHUNK_SHIFT 30UL
108
109 #define RDMA_WRID_TYPE_MASK \
110 ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
111
112 #define RDMA_WRID_BLOCK_MASK \
113 (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
114
115 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
116
117 /*
118 * RDMA migration protocol:
119 * 1. RDMA Writes (data messages, i.e. RAM)
120 * 2. IB Send/Recv (control channel messages)
121 */
122 enum {
123 RDMA_WRID_NONE = 0,
124 RDMA_WRID_RDMA_WRITE = 1,
125 RDMA_WRID_SEND_CONTROL = 2000,
126 RDMA_WRID_RECV_CONTROL = 4000,
127 };
128
129 static const char *wrid_desc[] = {
130 [RDMA_WRID_NONE] = "NONE",
131 [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
132 [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
133 [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
134 };
135
136 /*
137 * Work request IDs for IB SEND messages only (not RDMA writes).
138 * This is used by the migration protocol to transmit
139 * control messages (such as device state and registration commands)
140 *
141 * We could use more WRs, but we have enough for now.
142 */
143 enum {
144 RDMA_WRID_READY = 0,
145 RDMA_WRID_DATA,
146 RDMA_WRID_CONTROL,
147 RDMA_WRID_MAX,
148 };
149
150 /*
151 * SEND/RECV IB Control Messages.
152 */
153 enum {
154 RDMA_CONTROL_NONE = 0,
155 RDMA_CONTROL_ERROR,
156 RDMA_CONTROL_READY, /* ready to receive */
157 RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */
158 RDMA_CONTROL_RAM_BLOCKS_REQUEST, /* RAMBlock synchronization */
159 RDMA_CONTROL_RAM_BLOCKS_RESULT, /* RAMBlock synchronization */
160 RDMA_CONTROL_COMPRESS, /* page contains repeat values */
161 RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */
162 RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */
163 RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */
164 RDMA_CONTROL_UNREGISTER_REQUEST, /* dynamic UN-registration */
165 RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
166 };
167
168 static const char *control_desc[] = {
169 [RDMA_CONTROL_NONE] = "NONE",
170 [RDMA_CONTROL_ERROR] = "ERROR",
171 [RDMA_CONTROL_READY] = "READY",
172 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
173 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
174 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
175 [RDMA_CONTROL_COMPRESS] = "COMPRESS",
176 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
177 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
178 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
179 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
180 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
181 };
182
183 /*
184 * Memory and MR structures used to represent an IB Send/Recv work request.
185 * This is *not* used for RDMA writes, only IB Send/Recv.
186 */
187 typedef struct {
188 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
189 struct ibv_mr *control_mr; /* registration metadata */
190 size_t control_len; /* length of the message */
191 uint8_t *control_curr; /* start of unconsumed bytes */
192 } RDMAWorkRequestData;
193
194 /*
195 * Negotiate RDMA capabilities during connection-setup time.
196 */
197 typedef struct {
198 uint32_t version;
199 uint32_t flags;
200 } RDMACapabilities;
201
202 static void caps_to_network(RDMACapabilities *cap)
203 {
204 cap->version = htonl(cap->version);
205 cap->flags = htonl(cap->flags);
206 }
207
208 static void network_to_caps(RDMACapabilities *cap)
209 {
210 cap->version = ntohl(cap->version);
211 cap->flags = ntohl(cap->flags);
212 }
213
214 /*
215 * Representation of a RAMBlock from an RDMA perspective.
216 * This is not transmitted, only local.
217 * This and subsequent structures cannot be linked lists
218 * because we're using a single IB message to transmit
219 * the information. It's small anyway, so a list is overkill.
220 */
221 typedef struct RDMALocalBlock {
222 char *block_name;
223 uint8_t *local_host_addr; /* local virtual address */
224 uint64_t remote_host_addr; /* remote virtual address */
225 uint64_t offset;
226 uint64_t length;
227 struct ibv_mr **pmr; /* MRs for chunk-level registration */
228 struct ibv_mr *mr; /* MR for non-chunk-level registration */
229 uint32_t *remote_keys; /* rkeys for chunk-level registration */
230 uint32_t remote_rkey; /* rkeys for non-chunk-level registration */
231 int index; /* which block are we */
232 unsigned int src_index; /* (Only used on dest) */
233 bool is_ram_block;
234 int nb_chunks;
235 unsigned long *transit_bitmap;
236 unsigned long *unregister_bitmap;
237 } RDMALocalBlock;
238
239 /*
240 * Also represents a RAMblock, but only on the dest.
241 * This gets transmitted by the dest during connection-time
242 * to the source VM and then is used to populate the
243 * corresponding RDMALocalBlock with
244 * the information needed to perform the actual RDMA.
245 */
246 typedef struct QEMU_PACKED RDMADestBlock {
247 uint64_t remote_host_addr;
248 uint64_t offset;
249 uint64_t length;
250 uint32_t remote_rkey;
251 uint32_t padding;
252 } RDMADestBlock;
253
254 static uint64_t htonll(uint64_t v)
255 {
256 union { uint32_t lv[2]; uint64_t llv; } u;
257 u.lv[0] = htonl(v >> 32);
258 u.lv[1] = htonl(v & 0xFFFFFFFFULL);
259 return u.llv;
260 }
261
262 static uint64_t ntohll(uint64_t v) {
263 union { uint32_t lv[2]; uint64_t llv; } u;
264 u.llv = v;
265 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
266 }
267
268 static void dest_block_to_network(RDMADestBlock *db)
269 {
270 db->remote_host_addr = htonll(db->remote_host_addr);
271 db->offset = htonll(db->offset);
272 db->length = htonll(db->length);
273 db->remote_rkey = htonl(db->remote_rkey);
274 }
275
276 static void network_to_dest_block(RDMADestBlock *db)
277 {
278 db->remote_host_addr = ntohll(db->remote_host_addr);
279 db->offset = ntohll(db->offset);
280 db->length = ntohll(db->length);
281 db->remote_rkey = ntohl(db->remote_rkey);
282 }
283
284 /*
285 * Virtual address of the above structures used for transmitting
286 * the RAMBlock descriptions at connection-time.
287 * This structure is *not* transmitted.
288 */
289 typedef struct RDMALocalBlocks {
290 int nb_blocks;
291 bool init; /* main memory init complete */
292 RDMALocalBlock *block;
293 } RDMALocalBlocks;
294
295 /*
296 * Main data structure for RDMA state.
297 * While there is only one copy of this structure being allocated right now,
298 * this is the place where one would start if you wanted to consider
299 * having more than one RDMA connection open at the same time.
300 */
301 typedef struct RDMAContext {
302 char *host;
303 int port;
304
305 RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
306
307 /*
308 * This is used by *_exchange_send() to figure out whether or not
309 * the initial "READY" message has already been received or not.
310 * This is because other functions may potentially poll() and detect
311 * the READY message before send() does, in which case we need to
312 * know if it completed.
313 */
314 int control_ready_expected;
315
316 /* number of outstanding writes */
317 int nb_sent;
318
319 /* store info about current buffer so that we can
320 merge it with future sends */
321 uint64_t current_addr;
322 uint64_t current_length;
323 /* index of ram block the current buffer belongs to */
324 int current_index;
325 /* index of the chunk in the current ram block */
326 int current_chunk;
327
328 bool pin_all;
329
330 /*
331 * infiniband-specific variables for opening the device
332 * and maintaining connection state and so forth.
333 *
334 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
335 * cm_id->verbs, cm_id->channel, and cm_id->qp.
336 */
337 struct rdma_cm_id *cm_id; /* connection manager ID */
338 struct rdma_cm_id *listen_id;
339 bool connected;
340
341 struct ibv_context *verbs;
342 struct rdma_event_channel *channel;
343 struct ibv_qp *qp; /* queue pair */
344 struct ibv_comp_channel *comp_channel; /* completion channel */
345 struct ibv_pd *pd; /* protection domain */
346 struct ibv_cq *cq; /* completion queue */
347
348 /*
349 * If a previous write failed (perhaps because of a failed
350 * memory registration, then do not attempt any future work
351 * and remember the error state.
352 */
353 int error_state;
354 int error_reported;
355 int received_error;
356
357 /*
358 * Description of ram blocks used throughout the code.
359 */
360 RDMALocalBlocks local_ram_blocks;
361 RDMADestBlock *dest_blocks;
362
363 /* Index of the next RAMBlock received during block registration */
364 unsigned int next_src_index;
365
366 /*
367 * Migration on *destination* started.
368 * Then use coroutine yield function.
369 * Source runs in a thread, so we don't care.
370 */
371 int migration_started_on_destination;
372
373 int total_registrations;
374 int total_writes;
375
376 int unregister_current, unregister_next;
377 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
378
379 GHashTable *blockmap;
380 } RDMAContext;
381
382 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
383 #define QIO_CHANNEL_RDMA(obj) \
384 OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
385
386 typedef struct QIOChannelRDMA QIOChannelRDMA;
387
388
389 struct QIOChannelRDMA {
390 QIOChannel parent;
391 RDMAContext *rdma;
392 QEMUFile *file;
393 size_t len;
394 bool blocking; /* XXX we don't actually honour this yet */
395 };
396
397 /*
398 * Main structure for IB Send/Recv control messages.
399 * This gets prepended at the beginning of every Send/Recv.
400 */
401 typedef struct QEMU_PACKED {
402 uint32_t len; /* Total length of data portion */
403 uint32_t type; /* which control command to perform */
404 uint32_t repeat; /* number of commands in data portion of same type */
405 uint32_t padding;
406 } RDMAControlHeader;
407
408 static void control_to_network(RDMAControlHeader *control)
409 {
410 control->type = htonl(control->type);
411 control->len = htonl(control->len);
412 control->repeat = htonl(control->repeat);
413 }
414
415 static void network_to_control(RDMAControlHeader *control)
416 {
417 control->type = ntohl(control->type);
418 control->len = ntohl(control->len);
419 control->repeat = ntohl(control->repeat);
420 }
421
422 /*
423 * Register a single Chunk.
424 * Information sent by the source VM to inform the dest
425 * to register an single chunk of memory before we can perform
426 * the actual RDMA operation.
427 */
428 typedef struct QEMU_PACKED {
429 union QEMU_PACKED {
430 uint64_t current_addr; /* offset into the ram_addr_t space */
431 uint64_t chunk; /* chunk to lookup if unregistering */
432 } key;
433 uint32_t current_index; /* which ramblock the chunk belongs to */
434 uint32_t padding;
435 uint64_t chunks; /* how many sequential chunks to register */
436 } RDMARegister;
437
438 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
439 {
440 RDMALocalBlock *local_block;
441 local_block = &rdma->local_ram_blocks.block[reg->current_index];
442
443 if (local_block->is_ram_block) {
444 /*
445 * current_addr as passed in is an address in the local ram_addr_t
446 * space, we need to translate this for the destination
447 */
448 reg->key.current_addr -= local_block->offset;
449 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
450 }
451 reg->key.current_addr = htonll(reg->key.current_addr);
452 reg->current_index = htonl(reg->current_index);
453 reg->chunks = htonll(reg->chunks);
454 }
455
456 static void network_to_register(RDMARegister *reg)
457 {
458 reg->key.current_addr = ntohll(reg->key.current_addr);
459 reg->current_index = ntohl(reg->current_index);
460 reg->chunks = ntohll(reg->chunks);
461 }
462
463 typedef struct QEMU_PACKED {
464 uint32_t value; /* if zero, we will madvise() */
465 uint32_t block_idx; /* which ram block index */
466 uint64_t offset; /* Address in remote ram_addr_t space */
467 uint64_t length; /* length of the chunk */
468 } RDMACompress;
469
470 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
471 {
472 comp->value = htonl(comp->value);
473 /*
474 * comp->offset as passed in is an address in the local ram_addr_t
475 * space, we need to translate this for the destination
476 */
477 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
478 comp->offset += rdma->dest_blocks[comp->block_idx].offset;
479 comp->block_idx = htonl(comp->block_idx);
480 comp->offset = htonll(comp->offset);
481 comp->length = htonll(comp->length);
482 }
483
484 static void network_to_compress(RDMACompress *comp)
485 {
486 comp->value = ntohl(comp->value);
487 comp->block_idx = ntohl(comp->block_idx);
488 comp->offset = ntohll(comp->offset);
489 comp->length = ntohll(comp->length);
490 }
491
492 /*
493 * The result of the dest's memory registration produces an "rkey"
494 * which the source VM must reference in order to perform
495 * the RDMA operation.
496 */
497 typedef struct QEMU_PACKED {
498 uint32_t rkey;
499 uint32_t padding;
500 uint64_t host_addr;
501 } RDMARegisterResult;
502
503 static void result_to_network(RDMARegisterResult *result)
504 {
505 result->rkey = htonl(result->rkey);
506 result->host_addr = htonll(result->host_addr);
507 };
508
509 static void network_to_result(RDMARegisterResult *result)
510 {
511 result->rkey = ntohl(result->rkey);
512 result->host_addr = ntohll(result->host_addr);
513 };
514
515 const char *print_wrid(int wrid);
516 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
517 uint8_t *data, RDMAControlHeader *resp,
518 int *resp_idx,
519 int (*callback)(RDMAContext *rdma));
520
521 static inline uint64_t ram_chunk_index(const uint8_t *start,
522 const uint8_t *host)
523 {
524 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
525 }
526
527 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
528 uint64_t i)
529 {
530 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
531 (i << RDMA_REG_CHUNK_SHIFT));
532 }
533
534 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
535 uint64_t i)
536 {
537 uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
538 (1UL << RDMA_REG_CHUNK_SHIFT);
539
540 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
541 result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
542 }
543
544 return result;
545 }
546
547 static int rdma_add_block(RDMAContext *rdma, const char *block_name,
548 void *host_addr,
549 ram_addr_t block_offset, uint64_t length)
550 {
551 RDMALocalBlocks *local = &rdma->local_ram_blocks;
552 RDMALocalBlock *block;
553 RDMALocalBlock *old = local->block;
554
555 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
556
557 if (local->nb_blocks) {
558 int x;
559
560 if (rdma->blockmap) {
561 for (x = 0; x < local->nb_blocks; x++) {
562 g_hash_table_remove(rdma->blockmap,
563 (void *)(uintptr_t)old[x].offset);
564 g_hash_table_insert(rdma->blockmap,
565 (void *)(uintptr_t)old[x].offset,
566 &local->block[x]);
567 }
568 }
569 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
570 g_free(old);
571 }
572
573 block = &local->block[local->nb_blocks];
574
575 block->block_name = g_strdup(block_name);
576 block->local_host_addr = host_addr;
577 block->offset = block_offset;
578 block->length = length;
579 block->index = local->nb_blocks;
580 block->src_index = ~0U; /* Filled in by the receipt of the block list */
581 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
582 block->transit_bitmap = bitmap_new(block->nb_chunks);
583 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
584 block->unregister_bitmap = bitmap_new(block->nb_chunks);
585 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
586 block->remote_keys = g_new0(uint32_t, block->nb_chunks);
587
588 block->is_ram_block = local->init ? false : true;
589
590 if (rdma->blockmap) {
591 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
592 }
593
594 trace_rdma_add_block(block_name, local->nb_blocks,
595 (uintptr_t) block->local_host_addr,
596 block->offset, block->length,
597 (uintptr_t) (block->local_host_addr + block->length),
598 BITS_TO_LONGS(block->nb_chunks) *
599 sizeof(unsigned long) * 8,
600 block->nb_chunks);
601
602 local->nb_blocks++;
603
604 return 0;
605 }
606
607 /*
608 * Memory regions need to be registered with the device and queue pairs setup
609 * in advanced before the migration starts. This tells us where the RAM blocks
610 * are so that we can register them individually.
611 */
612 static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
613 ram_addr_t block_offset, ram_addr_t length, void *opaque)
614 {
615 return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
616 }
617
618 /*
619 * Identify the RAMBlocks and their quantity. They will be references to
620 * identify chunk boundaries inside each RAMBlock and also be referenced
621 * during dynamic page registration.
622 */
623 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
624 {
625 RDMALocalBlocks *local = &rdma->local_ram_blocks;
626
627 assert(rdma->blockmap == NULL);
628 memset(local, 0, sizeof *local);
629 qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
630 trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
631 rdma->dest_blocks = g_new0(RDMADestBlock,
632 rdma->local_ram_blocks.nb_blocks);
633 local->init = true;
634 return 0;
635 }
636
637 /*
638 * Note: If used outside of cleanup, the caller must ensure that the destination
639 * block structures are also updated
640 */
641 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
642 {
643 RDMALocalBlocks *local = &rdma->local_ram_blocks;
644 RDMALocalBlock *old = local->block;
645 int x;
646
647 if (rdma->blockmap) {
648 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
649 }
650 if (block->pmr) {
651 int j;
652
653 for (j = 0; j < block->nb_chunks; j++) {
654 if (!block->pmr[j]) {
655 continue;
656 }
657 ibv_dereg_mr(block->pmr[j]);
658 rdma->total_registrations--;
659 }
660 g_free(block->pmr);
661 block->pmr = NULL;
662 }
663
664 if (block->mr) {
665 ibv_dereg_mr(block->mr);
666 rdma->total_registrations--;
667 block->mr = NULL;
668 }
669
670 g_free(block->transit_bitmap);
671 block->transit_bitmap = NULL;
672
673 g_free(block->unregister_bitmap);
674 block->unregister_bitmap = NULL;
675
676 g_free(block->remote_keys);
677 block->remote_keys = NULL;
678
679 g_free(block->block_name);
680 block->block_name = NULL;
681
682 if (rdma->blockmap) {
683 for (x = 0; x < local->nb_blocks; x++) {
684 g_hash_table_remove(rdma->blockmap,
685 (void *)(uintptr_t)old[x].offset);
686 }
687 }
688
689 if (local->nb_blocks > 1) {
690
691 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
692
693 if (block->index) {
694 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
695 }
696
697 if (block->index < (local->nb_blocks - 1)) {
698 memcpy(local->block + block->index, old + (block->index + 1),
699 sizeof(RDMALocalBlock) *
700 (local->nb_blocks - (block->index + 1)));
701 }
702 } else {
703 assert(block == local->block);
704 local->block = NULL;
705 }
706
707 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
708 block->offset, block->length,
709 (uintptr_t)(block->local_host_addr + block->length),
710 BITS_TO_LONGS(block->nb_chunks) *
711 sizeof(unsigned long) * 8, block->nb_chunks);
712
713 g_free(old);
714
715 local->nb_blocks--;
716
717 if (local->nb_blocks && rdma->blockmap) {
718 for (x = 0; x < local->nb_blocks; x++) {
719 g_hash_table_insert(rdma->blockmap,
720 (void *)(uintptr_t)local->block[x].offset,
721 &local->block[x]);
722 }
723 }
724
725 return 0;
726 }
727
728 /*
729 * Put in the log file which RDMA device was opened and the details
730 * associated with that device.
731 */
732 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
733 {
734 struct ibv_port_attr port;
735
736 if (ibv_query_port(verbs, 1, &port)) {
737 error_report("Failed to query port information");
738 return;
739 }
740
741 printf("%s RDMA Device opened: kernel name %s "
742 "uverbs device name %s, "
743 "infiniband_verbs class device path %s, "
744 "infiniband class device path %s, "
745 "transport: (%d) %s\n",
746 who,
747 verbs->device->name,
748 verbs->device->dev_name,
749 verbs->device->dev_path,
750 verbs->device->ibdev_path,
751 port.link_layer,
752 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
753 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
754 ? "Ethernet" : "Unknown"));
755 }
756
757 /*
758 * Put in the log file the RDMA gid addressing information,
759 * useful for folks who have trouble understanding the
760 * RDMA device hierarchy in the kernel.
761 */
762 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
763 {
764 char sgid[33];
765 char dgid[33];
766 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
767 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
768 trace_qemu_rdma_dump_gid(who, sgid, dgid);
769 }
770
771 /*
772 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
773 * We will try the next addrinfo struct, and fail if there are
774 * no other valid addresses to bind against.
775 *
776 * If user is listening on '[::]', then we will not have a opened a device
777 * yet and have no way of verifying if the device is RoCE or not.
778 *
779 * In this case, the source VM will throw an error for ALL types of
780 * connections (both IPv4 and IPv6) if the destination machine does not have
781 * a regular infiniband network available for use.
782 *
783 * The only way to guarantee that an error is thrown for broken kernels is
784 * for the management software to choose a *specific* interface at bind time
785 * and validate what time of hardware it is.
786 *
787 * Unfortunately, this puts the user in a fix:
788 *
789 * If the source VM connects with an IPv4 address without knowing that the
790 * destination has bound to '[::]' the migration will unconditionally fail
791 * unless the management software is explicitly listening on the IPv4
792 * address while using a RoCE-based device.
793 *
794 * If the source VM connects with an IPv6 address, then we're OK because we can
795 * throw an error on the source (and similarly on the destination).
796 *
797 * But in mixed environments, this will be broken for a while until it is fixed
798 * inside linux.
799 *
800 * We do provide a *tiny* bit of help in this function: We can list all of the
801 * devices in the system and check to see if all the devices are RoCE or
802 * Infiniband.
803 *
804 * If we detect that we have a *pure* RoCE environment, then we can safely
805 * thrown an error even if the management software has specified '[::]' as the
806 * bind address.
807 *
808 * However, if there is are multiple hetergeneous devices, then we cannot make
809 * this assumption and the user just has to be sure they know what they are
810 * doing.
811 *
812 * Patches are being reviewed on linux-rdma.
813 */
814 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
815 {
816 struct ibv_port_attr port_attr;
817
818 /* This bug only exists in linux, to our knowledge. */
819 #ifdef CONFIG_LINUX
820
821 /*
822 * Verbs are only NULL if management has bound to '[::]'.
823 *
824 * Let's iterate through all the devices and see if there any pure IB
825 * devices (non-ethernet).
826 *
827 * If not, then we can safely proceed with the migration.
828 * Otherwise, there are no guarantees until the bug is fixed in linux.
829 */
830 if (!verbs) {
831 int num_devices, x;
832 struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
833 bool roce_found = false;
834 bool ib_found = false;
835
836 for (x = 0; x < num_devices; x++) {
837 verbs = ibv_open_device(dev_list[x]);
838 if (!verbs) {
839 if (errno == EPERM) {
840 continue;
841 } else {
842 return -EINVAL;
843 }
844 }
845
846 if (ibv_query_port(verbs, 1, &port_attr)) {
847 ibv_close_device(verbs);
848 ERROR(errp, "Could not query initial IB port");
849 return -EINVAL;
850 }
851
852 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
853 ib_found = true;
854 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
855 roce_found = true;
856 }
857
858 ibv_close_device(verbs);
859
860 }
861
862 if (roce_found) {
863 if (ib_found) {
864 fprintf(stderr, "WARN: migrations may fail:"
865 " IPv6 over RoCE / iWARP in linux"
866 " is broken. But since you appear to have a"
867 " mixed RoCE / IB environment, be sure to only"
868 " migrate over the IB fabric until the kernel "
869 " fixes the bug.\n");
870 } else {
871 ERROR(errp, "You only have RoCE / iWARP devices in your systems"
872 " and your management software has specified '[::]'"
873 ", but IPv6 over RoCE / iWARP is not supported in Linux.");
874 return -ENONET;
875 }
876 }
877
878 return 0;
879 }
880
881 /*
882 * If we have a verbs context, that means that some other than '[::]' was
883 * used by the management software for binding. In which case we can
884 * actually warn the user about a potentially broken kernel.
885 */
886
887 /* IB ports start with 1, not 0 */
888 if (ibv_query_port(verbs, 1, &port_attr)) {
889 ERROR(errp, "Could not query initial IB port");
890 return -EINVAL;
891 }
892
893 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
894 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
895 "(but patches on linux-rdma in progress)");
896 return -ENONET;
897 }
898
899 #endif
900
901 return 0;
902 }
903
904 /*
905 * Figure out which RDMA device corresponds to the requested IP hostname
906 * Also create the initial connection manager identifiers for opening
907 * the connection.
908 */
909 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
910 {
911 int ret;
912 struct rdma_addrinfo *res;
913 char port_str[16];
914 struct rdma_cm_event *cm_event;
915 char ip[40] = "unknown";
916 struct rdma_addrinfo *e;
917
918 if (rdma->host == NULL || !strcmp(rdma->host, "")) {
919 ERROR(errp, "RDMA hostname has not been set");
920 return -EINVAL;
921 }
922
923 /* create CM channel */
924 rdma->channel = rdma_create_event_channel();
925 if (!rdma->channel) {
926 ERROR(errp, "could not create CM channel");
927 return -EINVAL;
928 }
929
930 /* create CM id */
931 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
932 if (ret) {
933 ERROR(errp, "could not create channel id");
934 goto err_resolve_create_id;
935 }
936
937 snprintf(port_str, 16, "%d", rdma->port);
938 port_str[15] = '\0';
939
940 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
941 if (ret < 0) {
942 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
943 goto err_resolve_get_addr;
944 }
945
946 for (e = res; e != NULL; e = e->ai_next) {
947 inet_ntop(e->ai_family,
948 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
949 trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
950
951 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
952 RDMA_RESOLVE_TIMEOUT_MS);
953 if (!ret) {
954 if (e->ai_family == AF_INET6) {
955 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
956 if (ret) {
957 continue;
958 }
959 }
960 goto route;
961 }
962 }
963
964 ERROR(errp, "could not resolve address %s", rdma->host);
965 goto err_resolve_get_addr;
966
967 route:
968 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
969
970 ret = rdma_get_cm_event(rdma->channel, &cm_event);
971 if (ret) {
972 ERROR(errp, "could not perform event_addr_resolved");
973 goto err_resolve_get_addr;
974 }
975
976 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
977 ERROR(errp, "result not equal to event_addr_resolved %s",
978 rdma_event_str(cm_event->event));
979 perror("rdma_resolve_addr");
980 rdma_ack_cm_event(cm_event);
981 ret = -EINVAL;
982 goto err_resolve_get_addr;
983 }
984 rdma_ack_cm_event(cm_event);
985
986 /* resolve route */
987 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
988 if (ret) {
989 ERROR(errp, "could not resolve rdma route");
990 goto err_resolve_get_addr;
991 }
992
993 ret = rdma_get_cm_event(rdma->channel, &cm_event);
994 if (ret) {
995 ERROR(errp, "could not perform event_route_resolved");
996 goto err_resolve_get_addr;
997 }
998 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
999 ERROR(errp, "result not equal to event_route_resolved: %s",
1000 rdma_event_str(cm_event->event));
1001 rdma_ack_cm_event(cm_event);
1002 ret = -EINVAL;
1003 goto err_resolve_get_addr;
1004 }
1005 rdma_ack_cm_event(cm_event);
1006 rdma->verbs = rdma->cm_id->verbs;
1007 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1008 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1009 return 0;
1010
1011 err_resolve_get_addr:
1012 rdma_destroy_id(rdma->cm_id);
1013 rdma->cm_id = NULL;
1014 err_resolve_create_id:
1015 rdma_destroy_event_channel(rdma->channel);
1016 rdma->channel = NULL;
1017 return ret;
1018 }
1019
1020 /*
1021 * Create protection domain and completion queues
1022 */
1023 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1024 {
1025 /* allocate pd */
1026 rdma->pd = ibv_alloc_pd(rdma->verbs);
1027 if (!rdma->pd) {
1028 error_report("failed to allocate protection domain");
1029 return -1;
1030 }
1031
1032 /* create completion channel */
1033 rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1034 if (!rdma->comp_channel) {
1035 error_report("failed to allocate completion channel");
1036 goto err_alloc_pd_cq;
1037 }
1038
1039 /*
1040 * Completion queue can be filled by both read and write work requests,
1041 * so must reflect the sum of both possible queue sizes.
1042 */
1043 rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1044 NULL, rdma->comp_channel, 0);
1045 if (!rdma->cq) {
1046 error_report("failed to allocate completion queue");
1047 goto err_alloc_pd_cq;
1048 }
1049
1050 return 0;
1051
1052 err_alloc_pd_cq:
1053 if (rdma->pd) {
1054 ibv_dealloc_pd(rdma->pd);
1055 }
1056 if (rdma->comp_channel) {
1057 ibv_destroy_comp_channel(rdma->comp_channel);
1058 }
1059 rdma->pd = NULL;
1060 rdma->comp_channel = NULL;
1061 return -1;
1062
1063 }
1064
1065 /*
1066 * Create queue pairs.
1067 */
1068 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1069 {
1070 struct ibv_qp_init_attr attr = { 0 };
1071 int ret;
1072
1073 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1074 attr.cap.max_recv_wr = 3;
1075 attr.cap.max_send_sge = 1;
1076 attr.cap.max_recv_sge = 1;
1077 attr.send_cq = rdma->cq;
1078 attr.recv_cq = rdma->cq;
1079 attr.qp_type = IBV_QPT_RC;
1080
1081 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1082 if (ret) {
1083 return -1;
1084 }
1085
1086 rdma->qp = rdma->cm_id->qp;
1087 return 0;
1088 }
1089
1090 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1091 {
1092 int i;
1093 RDMALocalBlocks *local = &rdma->local_ram_blocks;
1094
1095 for (i = 0; i < local->nb_blocks; i++) {
1096 local->block[i].mr =
1097 ibv_reg_mr(rdma->pd,
1098 local->block[i].local_host_addr,
1099 local->block[i].length,
1100 IBV_ACCESS_LOCAL_WRITE |
1101 IBV_ACCESS_REMOTE_WRITE
1102 );
1103 if (!local->block[i].mr) {
1104 perror("Failed to register local dest ram block!\n");
1105 break;
1106 }
1107 rdma->total_registrations++;
1108 }
1109
1110 if (i >= local->nb_blocks) {
1111 return 0;
1112 }
1113
1114 for (i--; i >= 0; i--) {
1115 ibv_dereg_mr(local->block[i].mr);
1116 rdma->total_registrations--;
1117 }
1118
1119 return -1;
1120
1121 }
1122
1123 /*
1124 * Find the ram block that corresponds to the page requested to be
1125 * transmitted by QEMU.
1126 *
1127 * Once the block is found, also identify which 'chunk' within that
1128 * block that the page belongs to.
1129 *
1130 * This search cannot fail or the migration will fail.
1131 */
1132 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1133 uintptr_t block_offset,
1134 uint64_t offset,
1135 uint64_t length,
1136 uint64_t *block_index,
1137 uint64_t *chunk_index)
1138 {
1139 uint64_t current_addr = block_offset + offset;
1140 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1141 (void *) block_offset);
1142 assert(block);
1143 assert(current_addr >= block->offset);
1144 assert((current_addr + length) <= (block->offset + block->length));
1145
1146 *block_index = block->index;
1147 *chunk_index = ram_chunk_index(block->local_host_addr,
1148 block->local_host_addr + (current_addr - block->offset));
1149
1150 return 0;
1151 }
1152
1153 /*
1154 * Register a chunk with IB. If the chunk was already registered
1155 * previously, then skip.
1156 *
1157 * Also return the keys associated with the registration needed
1158 * to perform the actual RDMA operation.
1159 */
1160 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1161 RDMALocalBlock *block, uintptr_t host_addr,
1162 uint32_t *lkey, uint32_t *rkey, int chunk,
1163 uint8_t *chunk_start, uint8_t *chunk_end)
1164 {
1165 if (block->mr) {
1166 if (lkey) {
1167 *lkey = block->mr->lkey;
1168 }
1169 if (rkey) {
1170 *rkey = block->mr->rkey;
1171 }
1172 return 0;
1173 }
1174
1175 /* allocate memory to store chunk MRs */
1176 if (!block->pmr) {
1177 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1178 }
1179
1180 /*
1181 * If 'rkey', then we're the destination, so grant access to the source.
1182 *
1183 * If 'lkey', then we're the source VM, so grant access only to ourselves.
1184 */
1185 if (!block->pmr[chunk]) {
1186 uint64_t len = chunk_end - chunk_start;
1187
1188 trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1189
1190 block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1191 chunk_start, len,
1192 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1193 IBV_ACCESS_REMOTE_WRITE) : 0));
1194
1195 if (!block->pmr[chunk]) {
1196 perror("Failed to register chunk!");
1197 fprintf(stderr, "Chunk details: block: %d chunk index %d"
1198 " start %" PRIuPTR " end %" PRIuPTR
1199 " host %" PRIuPTR
1200 " local %" PRIuPTR " registrations: %d\n",
1201 block->index, chunk, (uintptr_t)chunk_start,
1202 (uintptr_t)chunk_end, host_addr,
1203 (uintptr_t)block->local_host_addr,
1204 rdma->total_registrations);
1205 return -1;
1206 }
1207 rdma->total_registrations++;
1208 }
1209
1210 if (lkey) {
1211 *lkey = block->pmr[chunk]->lkey;
1212 }
1213 if (rkey) {
1214 *rkey = block->pmr[chunk]->rkey;
1215 }
1216 return 0;
1217 }
1218
1219 /*
1220 * Register (at connection time) the memory used for control
1221 * channel messages.
1222 */
1223 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1224 {
1225 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1226 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1227 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1228 if (rdma->wr_data[idx].control_mr) {
1229 rdma->total_registrations++;
1230 return 0;
1231 }
1232 error_report("qemu_rdma_reg_control failed");
1233 return -1;
1234 }
1235
1236 const char *print_wrid(int wrid)
1237 {
1238 if (wrid >= RDMA_WRID_RECV_CONTROL) {
1239 return wrid_desc[RDMA_WRID_RECV_CONTROL];
1240 }
1241 return wrid_desc[wrid];
1242 }
1243
1244 /*
1245 * RDMA requires memory registration (mlock/pinning), but this is not good for
1246 * overcommitment.
1247 *
1248 * In preparation for the future where LRU information or workload-specific
1249 * writable writable working set memory access behavior is available to QEMU
1250 * it would be nice to have in place the ability to UN-register/UN-pin
1251 * particular memory regions from the RDMA hardware when it is determine that
1252 * those regions of memory will likely not be accessed again in the near future.
1253 *
1254 * While we do not yet have such information right now, the following
1255 * compile-time option allows us to perform a non-optimized version of this
1256 * behavior.
1257 *
1258 * By uncommenting this option, you will cause *all* RDMA transfers to be
1259 * unregistered immediately after the transfer completes on both sides of the
1260 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1261 *
1262 * This will have a terrible impact on migration performance, so until future
1263 * workload information or LRU information is available, do not attempt to use
1264 * this feature except for basic testing.
1265 */
1266 //#define RDMA_UNREGISTRATION_EXAMPLE
1267
1268 /*
1269 * Perform a non-optimized memory unregistration after every transfer
1270 * for demonstration purposes, only if pin-all is not requested.
1271 *
1272 * Potential optimizations:
1273 * 1. Start a new thread to run this function continuously
1274 - for bit clearing
1275 - and for receipt of unregister messages
1276 * 2. Use an LRU.
1277 * 3. Use workload hints.
1278 */
1279 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1280 {
1281 while (rdma->unregistrations[rdma->unregister_current]) {
1282 int ret;
1283 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1284 uint64_t chunk =
1285 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1286 uint64_t index =
1287 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1288 RDMALocalBlock *block =
1289 &(rdma->local_ram_blocks.block[index]);
1290 RDMARegister reg = { .current_index = index };
1291 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1292 };
1293 RDMAControlHeader head = { .len = sizeof(RDMARegister),
1294 .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1295 .repeat = 1,
1296 };
1297
1298 trace_qemu_rdma_unregister_waiting_proc(chunk,
1299 rdma->unregister_current);
1300
1301 rdma->unregistrations[rdma->unregister_current] = 0;
1302 rdma->unregister_current++;
1303
1304 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1305 rdma->unregister_current = 0;
1306 }
1307
1308
1309 /*
1310 * Unregistration is speculative (because migration is single-threaded
1311 * and we cannot break the protocol's inifinband message ordering).
1312 * Thus, if the memory is currently being used for transmission,
1313 * then abort the attempt to unregister and try again
1314 * later the next time a completion is received for this memory.
1315 */
1316 clear_bit(chunk, block->unregister_bitmap);
1317
1318 if (test_bit(chunk, block->transit_bitmap)) {
1319 trace_qemu_rdma_unregister_waiting_inflight(chunk);
1320 continue;
1321 }
1322
1323 trace_qemu_rdma_unregister_waiting_send(chunk);
1324
1325 ret = ibv_dereg_mr(block->pmr[chunk]);
1326 block->pmr[chunk] = NULL;
1327 block->remote_keys[chunk] = 0;
1328
1329 if (ret != 0) {
1330 perror("unregistration chunk failed");
1331 return -ret;
1332 }
1333 rdma->total_registrations--;
1334
1335 reg.key.chunk = chunk;
1336 register_to_network(rdma, &reg);
1337 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1338 &resp, NULL, NULL);
1339 if (ret < 0) {
1340 return ret;
1341 }
1342
1343 trace_qemu_rdma_unregister_waiting_complete(chunk);
1344 }
1345
1346 return 0;
1347 }
1348
1349 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1350 uint64_t chunk)
1351 {
1352 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1353
1354 result |= (index << RDMA_WRID_BLOCK_SHIFT);
1355 result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1356
1357 return result;
1358 }
1359
1360 /*
1361 * Set bit for unregistration in the next iteration.
1362 * We cannot transmit right here, but will unpin later.
1363 */
1364 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1365 uint64_t chunk, uint64_t wr_id)
1366 {
1367 if (rdma->unregistrations[rdma->unregister_next] != 0) {
1368 error_report("rdma migration: queue is full");
1369 } else {
1370 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1371
1372 if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1373 trace_qemu_rdma_signal_unregister_append(chunk,
1374 rdma->unregister_next);
1375
1376 rdma->unregistrations[rdma->unregister_next++] =
1377 qemu_rdma_make_wrid(wr_id, index, chunk);
1378
1379 if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1380 rdma->unregister_next = 0;
1381 }
1382 } else {
1383 trace_qemu_rdma_signal_unregister_already(chunk);
1384 }
1385 }
1386 }
1387
1388 /*
1389 * Consult the connection manager to see a work request
1390 * (of any kind) has completed.
1391 * Return the work request ID that completed.
1392 */
1393 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1394 uint32_t *byte_len)
1395 {
1396 int ret;
1397 struct ibv_wc wc;
1398 uint64_t wr_id;
1399
1400 ret = ibv_poll_cq(rdma->cq, 1, &wc);
1401
1402 if (!ret) {
1403 *wr_id_out = RDMA_WRID_NONE;
1404 return 0;
1405 }
1406
1407 if (ret < 0) {
1408 error_report("ibv_poll_cq return %d", ret);
1409 return ret;
1410 }
1411
1412 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1413
1414 if (wc.status != IBV_WC_SUCCESS) {
1415 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1416 wc.status, ibv_wc_status_str(wc.status));
1417 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1418
1419 return -1;
1420 }
1421
1422 if (rdma->control_ready_expected &&
1423 (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1424 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1425 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1426 rdma->control_ready_expected = 0;
1427 }
1428
1429 if (wr_id == RDMA_WRID_RDMA_WRITE) {
1430 uint64_t chunk =
1431 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1432 uint64_t index =
1433 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1434 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1435
1436 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1437 index, chunk, block->local_host_addr,
1438 (void *)(uintptr_t)block->remote_host_addr);
1439
1440 clear_bit(chunk, block->transit_bitmap);
1441
1442 if (rdma->nb_sent > 0) {
1443 rdma->nb_sent--;
1444 }
1445
1446 if (!rdma->pin_all) {
1447 /*
1448 * FYI: If one wanted to signal a specific chunk to be unregistered
1449 * using LRU or workload-specific information, this is the function
1450 * you would call to do so. That chunk would then get asynchronously
1451 * unregistered later.
1452 */
1453 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1454 qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1455 #endif
1456 }
1457 } else {
1458 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1459 }
1460
1461 *wr_id_out = wc.wr_id;
1462 if (byte_len) {
1463 *byte_len = wc.byte_len;
1464 }
1465
1466 return 0;
1467 }
1468
1469 /*
1470 * Block until the next work request has completed.
1471 *
1472 * First poll to see if a work request has already completed,
1473 * otherwise block.
1474 *
1475 * If we encounter completed work requests for IDs other than
1476 * the one we're interested in, then that's generally an error.
1477 *
1478 * The only exception is actual RDMA Write completions. These
1479 * completions only need to be recorded, but do not actually
1480 * need further processing.
1481 */
1482 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1483 uint32_t *byte_len)
1484 {
1485 int num_cq_events = 0, ret = 0;
1486 struct ibv_cq *cq;
1487 void *cq_ctx;
1488 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1489
1490 if (ibv_req_notify_cq(rdma->cq, 0)) {
1491 return -1;
1492 }
1493 /* poll cq first */
1494 while (wr_id != wrid_requested) {
1495 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1496 if (ret < 0) {
1497 return ret;
1498 }
1499
1500 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1501
1502 if (wr_id == RDMA_WRID_NONE) {
1503 break;
1504 }
1505 if (wr_id != wrid_requested) {
1506 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1507 wrid_requested, print_wrid(wr_id), wr_id);
1508 }
1509 }
1510
1511 if (wr_id == wrid_requested) {
1512 return 0;
1513 }
1514
1515 while (1) {
1516 /*
1517 * Coroutine doesn't start until migration_fd_process_incoming()
1518 * so don't yield unless we know we're running inside of a coroutine.
1519 */
1520 if (rdma->migration_started_on_destination) {
1521 yield_until_fd_readable(rdma->comp_channel->fd);
1522 }
1523
1524 if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
1525 perror("ibv_get_cq_event");
1526 goto err_block_for_wrid;
1527 }
1528
1529 num_cq_events++;
1530
1531 if (ibv_req_notify_cq(cq, 0)) {
1532 goto err_block_for_wrid;
1533 }
1534
1535 while (wr_id != wrid_requested) {
1536 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1537 if (ret < 0) {
1538 goto err_block_for_wrid;
1539 }
1540
1541 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1542
1543 if (wr_id == RDMA_WRID_NONE) {
1544 break;
1545 }
1546 if (wr_id != wrid_requested) {
1547 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1548 wrid_requested, print_wrid(wr_id), wr_id);
1549 }
1550 }
1551
1552 if (wr_id == wrid_requested) {
1553 goto success_block_for_wrid;
1554 }
1555 }
1556
1557 success_block_for_wrid:
1558 if (num_cq_events) {
1559 ibv_ack_cq_events(cq, num_cq_events);
1560 }
1561 return 0;
1562
1563 err_block_for_wrid:
1564 if (num_cq_events) {
1565 ibv_ack_cq_events(cq, num_cq_events);
1566 }
1567 return ret;
1568 }
1569
1570 /*
1571 * Post a SEND message work request for the control channel
1572 * containing some data and block until the post completes.
1573 */
1574 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1575 RDMAControlHeader *head)
1576 {
1577 int ret = 0;
1578 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1579 struct ibv_send_wr *bad_wr;
1580 struct ibv_sge sge = {
1581 .addr = (uintptr_t)(wr->control),
1582 .length = head->len + sizeof(RDMAControlHeader),
1583 .lkey = wr->control_mr->lkey,
1584 };
1585 struct ibv_send_wr send_wr = {
1586 .wr_id = RDMA_WRID_SEND_CONTROL,
1587 .opcode = IBV_WR_SEND,
1588 .send_flags = IBV_SEND_SIGNALED,
1589 .sg_list = &sge,
1590 .num_sge = 1,
1591 };
1592
1593 trace_qemu_rdma_post_send_control(control_desc[head->type]);
1594
1595 /*
1596 * We don't actually need to do a memcpy() in here if we used
1597 * the "sge" properly, but since we're only sending control messages
1598 * (not RAM in a performance-critical path), then its OK for now.
1599 *
1600 * The copy makes the RDMAControlHeader simpler to manipulate
1601 * for the time being.
1602 */
1603 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1604 memcpy(wr->control, head, sizeof(RDMAControlHeader));
1605 control_to_network((void *) wr->control);
1606
1607 if (buf) {
1608 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1609 }
1610
1611
1612 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1613
1614 if (ret > 0) {
1615 error_report("Failed to use post IB SEND for control");
1616 return -ret;
1617 }
1618
1619 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1620 if (ret < 0) {
1621 error_report("rdma migration: send polling control error");
1622 }
1623
1624 return ret;
1625 }
1626
1627 /*
1628 * Post a RECV work request in anticipation of some future receipt
1629 * of data on the control channel.
1630 */
1631 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1632 {
1633 struct ibv_recv_wr *bad_wr;
1634 struct ibv_sge sge = {
1635 .addr = (uintptr_t)(rdma->wr_data[idx].control),
1636 .length = RDMA_CONTROL_MAX_BUFFER,
1637 .lkey = rdma->wr_data[idx].control_mr->lkey,
1638 };
1639
1640 struct ibv_recv_wr recv_wr = {
1641 .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1642 .sg_list = &sge,
1643 .num_sge = 1,
1644 };
1645
1646
1647 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1648 return -1;
1649 }
1650
1651 return 0;
1652 }
1653
1654 /*
1655 * Block and wait for a RECV control channel message to arrive.
1656 */
1657 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1658 RDMAControlHeader *head, int expecting, int idx)
1659 {
1660 uint32_t byte_len;
1661 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1662 &byte_len);
1663
1664 if (ret < 0) {
1665 error_report("rdma migration: recv polling control error!");
1666 return ret;
1667 }
1668
1669 network_to_control((void *) rdma->wr_data[idx].control);
1670 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1671
1672 trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
1673
1674 if (expecting == RDMA_CONTROL_NONE) {
1675 trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
1676 head->type);
1677 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1678 error_report("Was expecting a %s (%d) control message"
1679 ", but got: %s (%d), length: %d",
1680 control_desc[expecting], expecting,
1681 control_desc[head->type], head->type, head->len);
1682 if (head->type == RDMA_CONTROL_ERROR) {
1683 rdma->received_error = true;
1684 }
1685 return -EIO;
1686 }
1687 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1688 error_report("too long length: %d", head->len);
1689 return -EINVAL;
1690 }
1691 if (sizeof(*head) + head->len != byte_len) {
1692 error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1693 return -EINVAL;
1694 }
1695
1696 return 0;
1697 }
1698
1699 /*
1700 * When a RECV work request has completed, the work request's
1701 * buffer is pointed at the header.
1702 *
1703 * This will advance the pointer to the data portion
1704 * of the control message of the work request's buffer that
1705 * was populated after the work request finished.
1706 */
1707 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1708 RDMAControlHeader *head)
1709 {
1710 rdma->wr_data[idx].control_len = head->len;
1711 rdma->wr_data[idx].control_curr =
1712 rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1713 }
1714
1715 /*
1716 * This is an 'atomic' high-level operation to deliver a single, unified
1717 * control-channel message.
1718 *
1719 * Additionally, if the user is expecting some kind of reply to this message,
1720 * they can request a 'resp' response message be filled in by posting an
1721 * additional work request on behalf of the user and waiting for an additional
1722 * completion.
1723 *
1724 * The extra (optional) response is used during registration to us from having
1725 * to perform an *additional* exchange of message just to provide a response by
1726 * instead piggy-backing on the acknowledgement.
1727 */
1728 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1729 uint8_t *data, RDMAControlHeader *resp,
1730 int *resp_idx,
1731 int (*callback)(RDMAContext *rdma))
1732 {
1733 int ret = 0;
1734
1735 /*
1736 * Wait until the dest is ready before attempting to deliver the message
1737 * by waiting for a READY message.
1738 */
1739 if (rdma->control_ready_expected) {
1740 RDMAControlHeader resp;
1741 ret = qemu_rdma_exchange_get_response(rdma,
1742 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1743 if (ret < 0) {
1744 return ret;
1745 }
1746 }
1747
1748 /*
1749 * If the user is expecting a response, post a WR in anticipation of it.
1750 */
1751 if (resp) {
1752 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1753 if (ret) {
1754 error_report("rdma migration: error posting"
1755 " extra control recv for anticipated result!");
1756 return ret;
1757 }
1758 }
1759
1760 /*
1761 * Post a WR to replace the one we just consumed for the READY message.
1762 */
1763 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1764 if (ret) {
1765 error_report("rdma migration: error posting first control recv!");
1766 return ret;
1767 }
1768
1769 /*
1770 * Deliver the control message that was requested.
1771 */
1772 ret = qemu_rdma_post_send_control(rdma, data, head);
1773
1774 if (ret < 0) {
1775 error_report("Failed to send control buffer!");
1776 return ret;
1777 }
1778
1779 /*
1780 * If we're expecting a response, block and wait for it.
1781 */
1782 if (resp) {
1783 if (callback) {
1784 trace_qemu_rdma_exchange_send_issue_callback();
1785 ret = callback(rdma);
1786 if (ret < 0) {
1787 return ret;
1788 }
1789 }
1790
1791 trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
1792 ret = qemu_rdma_exchange_get_response(rdma, resp,
1793 resp->type, RDMA_WRID_DATA);
1794
1795 if (ret < 0) {
1796 return ret;
1797 }
1798
1799 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1800 if (resp_idx) {
1801 *resp_idx = RDMA_WRID_DATA;
1802 }
1803 trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
1804 }
1805
1806 rdma->control_ready_expected = 1;
1807
1808 return 0;
1809 }
1810
1811 /*
1812 * This is an 'atomic' high-level operation to receive a single, unified
1813 * control-channel message.
1814 */
1815 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1816 int expecting)
1817 {
1818 RDMAControlHeader ready = {
1819 .len = 0,
1820 .type = RDMA_CONTROL_READY,
1821 .repeat = 1,
1822 };
1823 int ret;
1824
1825 /*
1826 * Inform the source that we're ready to receive a message.
1827 */
1828 ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1829
1830 if (ret < 0) {
1831 error_report("Failed to send control buffer!");
1832 return ret;
1833 }
1834
1835 /*
1836 * Block and wait for the message.
1837 */
1838 ret = qemu_rdma_exchange_get_response(rdma, head,
1839 expecting, RDMA_WRID_READY);
1840
1841 if (ret < 0) {
1842 return ret;
1843 }
1844
1845 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1846
1847 /*
1848 * Post a new RECV work request to replace the one we just consumed.
1849 */
1850 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1851 if (ret) {
1852 error_report("rdma migration: error posting second control recv!");
1853 return ret;
1854 }
1855
1856 return 0;
1857 }
1858
1859 /*
1860 * Write an actual chunk of memory using RDMA.
1861 *
1862 * If we're using dynamic registration on the dest-side, we have to
1863 * send a registration command first.
1864 */
1865 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1866 int current_index, uint64_t current_addr,
1867 uint64_t length)
1868 {
1869 struct ibv_sge sge;
1870 struct ibv_send_wr send_wr = { 0 };
1871 struct ibv_send_wr *bad_wr;
1872 int reg_result_idx, ret, count = 0;
1873 uint64_t chunk, chunks;
1874 uint8_t *chunk_start, *chunk_end;
1875 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1876 RDMARegister reg;
1877 RDMARegisterResult *reg_result;
1878 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1879 RDMAControlHeader head = { .len = sizeof(RDMARegister),
1880 .type = RDMA_CONTROL_REGISTER_REQUEST,
1881 .repeat = 1,
1882 };
1883
1884 retry:
1885 sge.addr = (uintptr_t)(block->local_host_addr +
1886 (current_addr - block->offset));
1887 sge.length = length;
1888
1889 chunk = ram_chunk_index(block->local_host_addr,
1890 (uint8_t *)(uintptr_t)sge.addr);
1891 chunk_start = ram_chunk_start(block, chunk);
1892
1893 if (block->is_ram_block) {
1894 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1895
1896 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1897 chunks--;
1898 }
1899 } else {
1900 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1901
1902 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1903 chunks--;
1904 }
1905 }
1906
1907 trace_qemu_rdma_write_one_top(chunks + 1,
1908 (chunks + 1) *
1909 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
1910
1911 chunk_end = ram_chunk_end(block, chunk + chunks);
1912
1913 if (!rdma->pin_all) {
1914 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1915 qemu_rdma_unregister_waiting(rdma);
1916 #endif
1917 }
1918
1919 while (test_bit(chunk, block->transit_bitmap)) {
1920 (void)count;
1921 trace_qemu_rdma_write_one_block(count++, current_index, chunk,
1922 sge.addr, length, rdma->nb_sent, block->nb_chunks);
1923
1924 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1925
1926 if (ret < 0) {
1927 error_report("Failed to Wait for previous write to complete "
1928 "block %d chunk %" PRIu64
1929 " current %" PRIu64 " len %" PRIu64 " %d",
1930 current_index, chunk, sge.addr, length, rdma->nb_sent);
1931 return ret;
1932 }
1933 }
1934
1935 if (!rdma->pin_all || !block->is_ram_block) {
1936 if (!block->remote_keys[chunk]) {
1937 /*
1938 * This chunk has not yet been registered, so first check to see
1939 * if the entire chunk is zero. If so, tell the other size to
1940 * memset() + madvise() the entire chunk without RDMA.
1941 */
1942
1943 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
1944 RDMACompress comp = {
1945 .offset = current_addr,
1946 .value = 0,
1947 .block_idx = current_index,
1948 .length = length,
1949 };
1950
1951 head.len = sizeof(comp);
1952 head.type = RDMA_CONTROL_COMPRESS;
1953
1954 trace_qemu_rdma_write_one_zero(chunk, sge.length,
1955 current_index, current_addr);
1956
1957 compress_to_network(rdma, &comp);
1958 ret = qemu_rdma_exchange_send(rdma, &head,
1959 (uint8_t *) &comp, NULL, NULL, NULL);
1960
1961 if (ret < 0) {
1962 return -EIO;
1963 }
1964
1965 acct_update_position(f, sge.length, true);
1966
1967 return 1;
1968 }
1969
1970 /*
1971 * Otherwise, tell other side to register.
1972 */
1973 reg.current_index = current_index;
1974 if (block->is_ram_block) {
1975 reg.key.current_addr = current_addr;
1976 } else {
1977 reg.key.chunk = chunk;
1978 }
1979 reg.chunks = chunks;
1980
1981 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
1982 current_addr);
1983
1984 register_to_network(rdma, &reg);
1985 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1986 &resp, &reg_result_idx, NULL);
1987 if (ret < 0) {
1988 return ret;
1989 }
1990
1991 /* try to overlap this single registration with the one we sent. */
1992 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1993 &sge.lkey, NULL, chunk,
1994 chunk_start, chunk_end)) {
1995 error_report("cannot get lkey");
1996 return -EINVAL;
1997 }
1998
1999 reg_result = (RDMARegisterResult *)
2000 rdma->wr_data[reg_result_idx].control_curr;
2001
2002 network_to_result(reg_result);
2003
2004 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2005 reg_result->rkey, chunk);
2006
2007 block->remote_keys[chunk] = reg_result->rkey;
2008 block->remote_host_addr = reg_result->host_addr;
2009 } else {
2010 /* already registered before */
2011 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2012 &sge.lkey, NULL, chunk,
2013 chunk_start, chunk_end)) {
2014 error_report("cannot get lkey!");
2015 return -EINVAL;
2016 }
2017 }
2018
2019 send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2020 } else {
2021 send_wr.wr.rdma.rkey = block->remote_rkey;
2022
2023 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2024 &sge.lkey, NULL, chunk,
2025 chunk_start, chunk_end)) {
2026 error_report("cannot get lkey!");
2027 return -EINVAL;
2028 }
2029 }
2030
2031 /*
2032 * Encode the ram block index and chunk within this wrid.
2033 * We will use this information at the time of completion
2034 * to figure out which bitmap to check against and then which
2035 * chunk in the bitmap to look for.
2036 */
2037 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2038 current_index, chunk);
2039
2040 send_wr.opcode = IBV_WR_RDMA_WRITE;
2041 send_wr.send_flags = IBV_SEND_SIGNALED;
2042 send_wr.sg_list = &sge;
2043 send_wr.num_sge = 1;
2044 send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2045 (current_addr - block->offset);
2046
2047 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2048 sge.length);
2049
2050 /*
2051 * ibv_post_send() does not return negative error numbers,
2052 * per the specification they are positive - no idea why.
2053 */
2054 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2055
2056 if (ret == ENOMEM) {
2057 trace_qemu_rdma_write_one_queue_full();
2058 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2059 if (ret < 0) {
2060 error_report("rdma migration: failed to make "
2061 "room in full send queue! %d", ret);
2062 return ret;
2063 }
2064
2065 goto retry;
2066
2067 } else if (ret > 0) {
2068 perror("rdma migration: post rdma write failed");
2069 return -ret;
2070 }
2071
2072 set_bit(chunk, block->transit_bitmap);
2073 acct_update_position(f, sge.length, false);
2074 rdma->total_writes++;
2075
2076 return 0;
2077 }
2078
2079 /*
2080 * Push out any unwritten RDMA operations.
2081 *
2082 * We support sending out multiple chunks at the same time.
2083 * Not all of them need to get signaled in the completion queue.
2084 */
2085 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2086 {
2087 int ret;
2088
2089 if (!rdma->current_length) {
2090 return 0;
2091 }
2092
2093 ret = qemu_rdma_write_one(f, rdma,
2094 rdma->current_index, rdma->current_addr, rdma->current_length);
2095
2096 if (ret < 0) {
2097 return ret;
2098 }
2099
2100 if (ret == 0) {
2101 rdma->nb_sent++;
2102 trace_qemu_rdma_write_flush(rdma->nb_sent);
2103 }
2104
2105 rdma->current_length = 0;
2106 rdma->current_addr = 0;
2107
2108 return 0;
2109 }
2110
2111 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2112 uint64_t offset, uint64_t len)
2113 {
2114 RDMALocalBlock *block;
2115 uint8_t *host_addr;
2116 uint8_t *chunk_end;
2117
2118 if (rdma->current_index < 0) {
2119 return 0;
2120 }
2121
2122 if (rdma->current_chunk < 0) {
2123 return 0;
2124 }
2125
2126 block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2127 host_addr = block->local_host_addr + (offset - block->offset);
2128 chunk_end = ram_chunk_end(block, rdma->current_chunk);
2129
2130 if (rdma->current_length == 0) {
2131 return 0;
2132 }
2133
2134 /*
2135 * Only merge into chunk sequentially.
2136 */
2137 if (offset != (rdma->current_addr + rdma->current_length)) {
2138 return 0;
2139 }
2140
2141 if (offset < block->offset) {
2142 return 0;
2143 }
2144
2145 if ((offset + len) > (block->offset + block->length)) {
2146 return 0;
2147 }
2148
2149 if ((host_addr + len) > chunk_end) {
2150 return 0;
2151 }
2152
2153 return 1;
2154 }
2155
2156 /*
2157 * We're not actually writing here, but doing three things:
2158 *
2159 * 1. Identify the chunk the buffer belongs to.
2160 * 2. If the chunk is full or the buffer doesn't belong to the current
2161 * chunk, then start a new chunk and flush() the old chunk.
2162 * 3. To keep the hardware busy, we also group chunks into batches
2163 * and only require that a batch gets acknowledged in the completion
2164 * qeueue instead of each individual chunk.
2165 */
2166 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2167 uint64_t block_offset, uint64_t offset,
2168 uint64_t len)
2169 {
2170 uint64_t current_addr = block_offset + offset;
2171 uint64_t index = rdma->current_index;
2172 uint64_t chunk = rdma->current_chunk;
2173 int ret;
2174
2175 /* If we cannot merge it, we flush the current buffer first. */
2176 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2177 ret = qemu_rdma_write_flush(f, rdma);
2178 if (ret) {
2179 return ret;
2180 }
2181 rdma->current_length = 0;
2182 rdma->current_addr = current_addr;
2183
2184 ret = qemu_rdma_search_ram_block(rdma, block_offset,
2185 offset, len, &index, &chunk);
2186 if (ret) {
2187 error_report("ram block search failed");
2188 return ret;
2189 }
2190 rdma->current_index = index;
2191 rdma->current_chunk = chunk;
2192 }
2193
2194 /* merge it */
2195 rdma->current_length += len;
2196
2197 /* flush it if buffer is too large */
2198 if (rdma->current_length >= RDMA_MERGE_MAX) {
2199 return qemu_rdma_write_flush(f, rdma);
2200 }
2201
2202 return 0;
2203 }
2204
2205 static void qemu_rdma_cleanup(RDMAContext *rdma)
2206 {
2207 struct rdma_cm_event *cm_event;
2208 int ret, idx;
2209
2210 if (rdma->cm_id && rdma->connected) {
2211 if (rdma->error_state && !rdma->received_error) {
2212 RDMAControlHeader head = { .len = 0,
2213 .type = RDMA_CONTROL_ERROR,
2214 .repeat = 1,
2215 };
2216 error_report("Early error. Sending error.");
2217 qemu_rdma_post_send_control(rdma, NULL, &head);
2218 }
2219
2220 ret = rdma_disconnect(rdma->cm_id);
2221 if (!ret) {
2222 trace_qemu_rdma_cleanup_waiting_for_disconnect();
2223 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2224 if (!ret) {
2225 rdma_ack_cm_event(cm_event);
2226 }
2227 }
2228 trace_qemu_rdma_cleanup_disconnect();
2229 rdma->connected = false;
2230 }
2231
2232 g_free(rdma->dest_blocks);
2233 rdma->dest_blocks = NULL;
2234
2235 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2236 if (rdma->wr_data[idx].control_mr) {
2237 rdma->total_registrations--;
2238 ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2239 }
2240 rdma->wr_data[idx].control_mr = NULL;
2241 }
2242
2243 if (rdma->local_ram_blocks.block) {
2244 while (rdma->local_ram_blocks.nb_blocks) {
2245 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2246 }
2247 }
2248
2249 if (rdma->qp) {
2250 rdma_destroy_qp(rdma->cm_id);
2251 rdma->qp = NULL;
2252 }
2253 if (rdma->cq) {
2254 ibv_destroy_cq(rdma->cq);
2255 rdma->cq = NULL;
2256 }
2257 if (rdma->comp_channel) {
2258 ibv_destroy_comp_channel(rdma->comp_channel);
2259 rdma->comp_channel = NULL;
2260 }
2261 if (rdma->pd) {
2262 ibv_dealloc_pd(rdma->pd);
2263 rdma->pd = NULL;
2264 }
2265 if (rdma->cm_id) {
2266 rdma_destroy_id(rdma->cm_id);
2267 rdma->cm_id = NULL;
2268 }
2269 if (rdma->listen_id) {
2270 rdma_destroy_id(rdma->listen_id);
2271 rdma->listen_id = NULL;
2272 }
2273 if (rdma->channel) {
2274 rdma_destroy_event_channel(rdma->channel);
2275 rdma->channel = NULL;
2276 }
2277 g_free(rdma->host);
2278 rdma->host = NULL;
2279 }
2280
2281
2282 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2283 {
2284 int ret, idx;
2285 Error *local_err = NULL, **temp = &local_err;
2286
2287 /*
2288 * Will be validated against destination's actual capabilities
2289 * after the connect() completes.
2290 */
2291 rdma->pin_all = pin_all;
2292
2293 ret = qemu_rdma_resolve_host(rdma, temp);
2294 if (ret) {
2295 goto err_rdma_source_init;
2296 }
2297
2298 ret = qemu_rdma_alloc_pd_cq(rdma);
2299 if (ret) {
2300 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2301 " limits may be too low. Please check $ ulimit -a # and "
2302 "search for 'ulimit -l' in the output");
2303 goto err_rdma_source_init;
2304 }
2305
2306 ret = qemu_rdma_alloc_qp(rdma);
2307 if (ret) {
2308 ERROR(temp, "rdma migration: error allocating qp!");
2309 goto err_rdma_source_init;
2310 }
2311
2312 ret = qemu_rdma_init_ram_blocks(rdma);
2313 if (ret) {
2314 ERROR(temp, "rdma migration: error initializing ram blocks!");
2315 goto err_rdma_source_init;
2316 }
2317
2318 /* Build the hash that maps from offset to RAMBlock */
2319 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2320 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2321 g_hash_table_insert(rdma->blockmap,
2322 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2323 &rdma->local_ram_blocks.block[idx]);
2324 }
2325
2326 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2327 ret = qemu_rdma_reg_control(rdma, idx);
2328 if (ret) {
2329 ERROR(temp, "rdma migration: error registering %d control!",
2330 idx);
2331 goto err_rdma_source_init;
2332 }
2333 }
2334
2335 return 0;
2336
2337 err_rdma_source_init:
2338 error_propagate(errp, local_err);
2339 qemu_rdma_cleanup(rdma);
2340 return -1;
2341 }
2342
2343 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2344 {
2345 RDMACapabilities cap = {
2346 .version = RDMA_CONTROL_VERSION_CURRENT,
2347 .flags = 0,
2348 };
2349 struct rdma_conn_param conn_param = { .initiator_depth = 2,
2350 .retry_count = 5,
2351 .private_data = &cap,
2352 .private_data_len = sizeof(cap),
2353 };
2354 struct rdma_cm_event *cm_event;
2355 int ret;
2356
2357 /*
2358 * Only negotiate the capability with destination if the user
2359 * on the source first requested the capability.
2360 */
2361 if (rdma->pin_all) {
2362 trace_qemu_rdma_connect_pin_all_requested();
2363 cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2364 }
2365
2366 caps_to_network(&cap);
2367
2368 ret = rdma_connect(rdma->cm_id, &conn_param);
2369 if (ret) {
2370 perror("rdma_connect");
2371 ERROR(errp, "connecting to destination!");
2372 goto err_rdma_source_connect;
2373 }
2374
2375 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2376 if (ret) {
2377 perror("rdma_get_cm_event after rdma_connect");
2378 ERROR(errp, "connecting to destination!");
2379 rdma_ack_cm_event(cm_event);
2380 goto err_rdma_source_connect;
2381 }
2382
2383 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2384 perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2385 ERROR(errp, "connecting to destination!");
2386 rdma_ack_cm_event(cm_event);
2387 goto err_rdma_source_connect;
2388 }
2389 rdma->connected = true;
2390
2391 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2392 network_to_caps(&cap);
2393
2394 /*
2395 * Verify that the *requested* capabilities are supported by the destination
2396 * and disable them otherwise.
2397 */
2398 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2399 ERROR(errp, "Server cannot support pinning all memory. "
2400 "Will register memory dynamically.");
2401 rdma->pin_all = false;
2402 }
2403
2404 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2405
2406 rdma_ack_cm_event(cm_event);
2407
2408 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2409 if (ret) {
2410 ERROR(errp, "posting second control recv!");
2411 goto err_rdma_source_connect;
2412 }
2413
2414 rdma->control_ready_expected = 1;
2415 rdma->nb_sent = 0;
2416 return 0;
2417
2418 err_rdma_source_connect:
2419 qemu_rdma_cleanup(rdma);
2420 return -1;
2421 }
2422
2423 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2424 {
2425 int ret, idx;
2426 struct rdma_cm_id *listen_id;
2427 char ip[40] = "unknown";
2428 struct rdma_addrinfo *res, *e;
2429 char port_str[16];
2430
2431 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2432 rdma->wr_data[idx].control_len = 0;
2433 rdma->wr_data[idx].control_curr = NULL;
2434 }
2435
2436 if (!rdma->host || !rdma->host[0]) {
2437 ERROR(errp, "RDMA host is not set!");
2438 rdma->error_state = -EINVAL;
2439 return -1;
2440 }
2441 /* create CM channel */
2442 rdma->channel = rdma_create_event_channel();
2443 if (!rdma->channel) {
2444 ERROR(errp, "could not create rdma event channel");
2445 rdma->error_state = -EINVAL;
2446 return -1;
2447 }
2448
2449 /* create CM id */
2450 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2451 if (ret) {
2452 ERROR(errp, "could not create cm_id!");
2453 goto err_dest_init_create_listen_id;
2454 }
2455
2456 snprintf(port_str, 16, "%d", rdma->port);
2457 port_str[15] = '\0';
2458
2459 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2460 if (ret < 0) {
2461 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2462 goto err_dest_init_bind_addr;
2463 }
2464
2465 for (e = res; e != NULL; e = e->ai_next) {
2466 inet_ntop(e->ai_family,
2467 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2468 trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2469 ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2470 if (ret) {
2471 continue;
2472 }
2473 if (e->ai_family == AF_INET6) {
2474 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
2475 if (ret) {
2476 continue;
2477 }
2478 }
2479 break;
2480 }
2481
2482 if (!e) {
2483 ERROR(errp, "Error: could not rdma_bind_addr!");
2484 goto err_dest_init_bind_addr;
2485 }
2486
2487 rdma->listen_id = listen_id;
2488 qemu_rdma_dump_gid("dest_init", listen_id);
2489 return 0;
2490
2491 err_dest_init_bind_addr:
2492 rdma_destroy_id(listen_id);
2493 err_dest_init_create_listen_id:
2494 rdma_destroy_event_channel(rdma->channel);
2495 rdma->channel = NULL;
2496 rdma->error_state = ret;
2497 return ret;
2498
2499 }
2500
2501 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2502 {
2503 RDMAContext *rdma = NULL;
2504 InetSocketAddress *addr;
2505
2506 if (host_port) {
2507 rdma = g_new0(RDMAContext, 1);
2508 rdma->current_index = -1;
2509 rdma->current_chunk = -1;
2510
2511 addr = g_new(InetSocketAddress, 1);
2512 if (!inet_parse(addr, host_port, NULL)) {
2513 rdma->port = atoi(addr->port);
2514 rdma->host = g_strdup(addr->host);
2515 } else {
2516 ERROR(errp, "bad RDMA migration address '%s'", host_port);
2517 g_free(rdma);
2518 rdma = NULL;
2519 }
2520
2521 qapi_free_InetSocketAddress(addr);
2522 }
2523
2524 return rdma;
2525 }
2526
2527 /*
2528 * QEMUFile interface to the control channel.
2529 * SEND messages for control only.
2530 * VM's ram is handled with regular RDMA messages.
2531 */
2532 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2533 const struct iovec *iov,
2534 size_t niov,
2535 int *fds,
2536 size_t nfds,
2537 Error **errp)
2538 {
2539 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2540 QEMUFile *f = rioc->file;
2541 RDMAContext *rdma = rioc->rdma;
2542 int ret;
2543 ssize_t done = 0;
2544 size_t i;
2545
2546 CHECK_ERROR_STATE();
2547
2548 /*
2549 * Push out any writes that
2550 * we're queued up for VM's ram.
2551 */
2552 ret = qemu_rdma_write_flush(f, rdma);
2553 if (ret < 0) {
2554 rdma->error_state = ret;
2555 return ret;
2556 }
2557
2558 for (i = 0; i < niov; i++) {
2559 size_t remaining = iov[i].iov_len;
2560 uint8_t * data = (void *)iov[i].iov_base;
2561 while (remaining) {
2562 RDMAControlHeader head;
2563
2564 rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
2565 remaining -= rioc->len;
2566
2567 head.len = rioc->len;
2568 head.type = RDMA_CONTROL_QEMU_FILE;
2569
2570 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2571
2572 if (ret < 0) {
2573 rdma->error_state = ret;
2574 return ret;
2575 }
2576
2577 data += rioc->len;
2578 done += rioc->len;
2579 }
2580 }
2581
2582 return done;
2583 }
2584
2585 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2586 size_t size, int idx)
2587 {
2588 size_t len = 0;
2589
2590 if (rdma->wr_data[idx].control_len) {
2591 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2592
2593 len = MIN(size, rdma->wr_data[idx].control_len);
2594 memcpy(buf, rdma->wr_data[idx].control_curr, len);
2595 rdma->wr_data[idx].control_curr += len;
2596 rdma->wr_data[idx].control_len -= len;
2597 }
2598
2599 return len;
2600 }
2601
2602 /*
2603 * QEMUFile interface to the control channel.
2604 * RDMA links don't use bytestreams, so we have to
2605 * return bytes to QEMUFile opportunistically.
2606 */
2607 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2608 const struct iovec *iov,
2609 size_t niov,
2610 int **fds,
2611 size_t *nfds,
2612 Error **errp)
2613 {
2614 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2615 RDMAContext *rdma = rioc->rdma;
2616 RDMAControlHeader head;
2617 int ret = 0;
2618 ssize_t i;
2619 size_t done = 0;
2620
2621 CHECK_ERROR_STATE();
2622
2623 for (i = 0; i < niov; i++) {
2624 size_t want = iov[i].iov_len;
2625 uint8_t *data = (void *)iov[i].iov_base;
2626
2627 /*
2628 * First, we hold on to the last SEND message we
2629 * were given and dish out the bytes until we run
2630 * out of bytes.
2631 */
2632 ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2633 done += ret;
2634 want -= ret;
2635 /* Got what we needed, so go to next iovec */
2636 if (want == 0) {
2637 continue;
2638 }
2639
2640 /* If we got any data so far, then don't wait
2641 * for more, just return what we have */
2642 if (done > 0) {
2643 break;
2644 }
2645
2646
2647 /* We've got nothing at all, so lets wait for
2648 * more to arrive
2649 */
2650 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2651
2652 if (ret < 0) {
2653 rdma->error_state = ret;
2654 return ret;
2655 }
2656
2657 /*
2658 * SEND was received with new bytes, now try again.
2659 */
2660 ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2661 done += ret;
2662 want -= ret;
2663
2664 /* Still didn't get enough, so lets just return */
2665 if (want) {
2666 if (done == 0) {
2667 return QIO_CHANNEL_ERR_BLOCK;
2668 } else {
2669 break;
2670 }
2671 }
2672 }
2673 rioc->len = done;
2674 return rioc->len;
2675 }
2676
2677 /*
2678 * Block until all the outstanding chunks have been delivered by the hardware.
2679 */
2680 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2681 {
2682 int ret;
2683
2684 if (qemu_rdma_write_flush(f, rdma) < 0) {
2685 return -EIO;
2686 }
2687
2688 while (rdma->nb_sent) {
2689 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2690 if (ret < 0) {
2691 error_report("rdma migration: complete polling error!");
2692 return -EIO;
2693 }
2694 }
2695
2696 qemu_rdma_unregister_waiting(rdma);
2697
2698 return 0;
2699 }
2700
2701
2702 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2703 bool blocking,
2704 Error **errp)
2705 {
2706 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2707 /* XXX we should make readv/writev actually honour this :-) */
2708 rioc->blocking = blocking;
2709 return 0;
2710 }
2711
2712
2713 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2714 struct QIOChannelRDMASource {
2715 GSource parent;
2716 QIOChannelRDMA *rioc;
2717 GIOCondition condition;
2718 };
2719
2720 static gboolean
2721 qio_channel_rdma_source_prepare(GSource *source,
2722 gint *timeout)
2723 {
2724 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2725 RDMAContext *rdma = rsource->rioc->rdma;
2726 GIOCondition cond = 0;
2727 *timeout = -1;
2728
2729 if (rdma->wr_data[0].control_len) {
2730 cond |= G_IO_IN;
2731 }
2732 cond |= G_IO_OUT;
2733
2734 return cond & rsource->condition;
2735 }
2736
2737 static gboolean
2738 qio_channel_rdma_source_check(GSource *source)
2739 {
2740 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2741 RDMAContext *rdma = rsource->rioc->rdma;
2742 GIOCondition cond = 0;
2743
2744 if (rdma->wr_data[0].control_len) {
2745 cond |= G_IO_IN;
2746 }
2747 cond |= G_IO_OUT;
2748
2749 return cond & rsource->condition;
2750 }
2751
2752 static gboolean
2753 qio_channel_rdma_source_dispatch(GSource *source,
2754 GSourceFunc callback,
2755 gpointer user_data)
2756 {
2757 QIOChannelFunc func = (QIOChannelFunc)callback;
2758 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2759 RDMAContext *rdma = rsource->rioc->rdma;
2760 GIOCondition cond = 0;
2761
2762 if (rdma->wr_data[0].control_len) {
2763 cond |= G_IO_IN;
2764 }
2765 cond |= G_IO_OUT;
2766
2767 return (*func)(QIO_CHANNEL(rsource->rioc),
2768 (cond & rsource->condition),
2769 user_data);
2770 }
2771
2772 static void
2773 qio_channel_rdma_source_finalize(GSource *source)
2774 {
2775 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2776
2777 object_unref(OBJECT(ssource->rioc));
2778 }
2779
2780 GSourceFuncs qio_channel_rdma_source_funcs = {
2781 qio_channel_rdma_source_prepare,
2782 qio_channel_rdma_source_check,
2783 qio_channel_rdma_source_dispatch,
2784 qio_channel_rdma_source_finalize
2785 };
2786
2787 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2788 GIOCondition condition)
2789 {
2790 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2791 QIOChannelRDMASource *ssource;
2792 GSource *source;
2793
2794 source = g_source_new(&qio_channel_rdma_source_funcs,
2795 sizeof(QIOChannelRDMASource));
2796 ssource = (QIOChannelRDMASource *)source;
2797
2798 ssource->rioc = rioc;
2799 object_ref(OBJECT(rioc));
2800
2801 ssource->condition = condition;
2802
2803 return source;
2804 }
2805
2806
2807 static int qio_channel_rdma_close(QIOChannel *ioc,
2808 Error **errp)
2809 {
2810 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2811 trace_qemu_rdma_close();
2812 if (rioc->rdma) {
2813 if (!rioc->rdma->error_state) {
2814 rioc->rdma->error_state = qemu_file_get_error(rioc->file);
2815 }
2816 qemu_rdma_cleanup(rioc->rdma);
2817 g_free(rioc->rdma);
2818 rioc->rdma = NULL;
2819 }
2820 return 0;
2821 }
2822
2823 /*
2824 * Parameters:
2825 * @offset == 0 :
2826 * This means that 'block_offset' is a full virtual address that does not
2827 * belong to a RAMBlock of the virtual machine and instead
2828 * represents a private malloc'd memory area that the caller wishes to
2829 * transfer.
2830 *
2831 * @offset != 0 :
2832 * Offset is an offset to be added to block_offset and used
2833 * to also lookup the corresponding RAMBlock.
2834 *
2835 * @size > 0 :
2836 * Initiate an transfer this size.
2837 *
2838 * @size == 0 :
2839 * A 'hint' or 'advice' that means that we wish to speculatively
2840 * and asynchronously unregister this memory. In this case, there is no
2841 * guarantee that the unregister will actually happen, for example,
2842 * if the memory is being actively transmitted. Additionally, the memory
2843 * may be re-registered at any future time if a write within the same
2844 * chunk was requested again, even if you attempted to unregister it
2845 * here.
2846 *
2847 * @size < 0 : TODO, not yet supported
2848 * Unregister the memory NOW. This means that the caller does not
2849 * expect there to be any future RDMA transfers and we just want to clean
2850 * things up. This is used in case the upper layer owns the memory and
2851 * cannot wait for qemu_fclose() to occur.
2852 *
2853 * @bytes_sent : User-specificed pointer to indicate how many bytes were
2854 * sent. Usually, this will not be more than a few bytes of
2855 * the protocol because most transfers are sent asynchronously.
2856 */
2857 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2858 ram_addr_t block_offset, ram_addr_t offset,
2859 size_t size, uint64_t *bytes_sent)
2860 {
2861 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
2862 RDMAContext *rdma = rioc->rdma;
2863 int ret;
2864
2865 CHECK_ERROR_STATE();
2866
2867 qemu_fflush(f);
2868
2869 if (size > 0) {
2870 /*
2871 * Add this page to the current 'chunk'. If the chunk
2872 * is full, or the page doen't belong to the current chunk,
2873 * an actual RDMA write will occur and a new chunk will be formed.
2874 */
2875 ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2876 if (ret < 0) {
2877 error_report("rdma migration: write error! %d", ret);
2878 goto err;
2879 }
2880
2881 /*
2882 * We always return 1 bytes because the RDMA
2883 * protocol is completely asynchronous. We do not yet know
2884 * whether an identified chunk is zero or not because we're
2885 * waiting for other pages to potentially be merged with
2886 * the current chunk. So, we have to call qemu_update_position()
2887 * later on when the actual write occurs.
2888 */
2889 if (bytes_sent) {
2890 *bytes_sent = 1;
2891 }
2892 } else {
2893 uint64_t index, chunk;
2894
2895 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
2896 if (size < 0) {
2897 ret = qemu_rdma_drain_cq(f, rdma);
2898 if (ret < 0) {
2899 fprintf(stderr, "rdma: failed to synchronously drain"
2900 " completion queue before unregistration.\n");
2901 goto err;
2902 }
2903 }
2904 */
2905
2906 ret = qemu_rdma_search_ram_block(rdma, block_offset,
2907 offset, size, &index, &chunk);
2908
2909 if (ret) {
2910 error_report("ram block search failed");
2911 goto err;
2912 }
2913
2914 qemu_rdma_signal_unregister(rdma, index, chunk, 0);
2915
2916 /*
2917 * TODO: Synchronous, guaranteed unregistration (should not occur during
2918 * fast-path). Otherwise, unregisters will process on the next call to
2919 * qemu_rdma_drain_cq()
2920 if (size < 0) {
2921 qemu_rdma_unregister_waiting(rdma);
2922 }
2923 */
2924 }
2925
2926 /*
2927 * Drain the Completion Queue if possible, but do not block,
2928 * just poll.
2929 *
2930 * If nothing to poll, the end of the iteration will do this
2931 * again to make sure we don't overflow the request queue.
2932 */
2933 while (1) {
2934 uint64_t wr_id, wr_id_in;
2935 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
2936 if (ret < 0) {
2937 error_report("rdma migration: polling error! %d", ret);
2938 goto err;
2939 }
2940
2941 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
2942
2943 if (wr_id == RDMA_WRID_NONE) {
2944 break;
2945 }
2946 }
2947
2948 return RAM_SAVE_CONTROL_DELAYED;
2949 err:
2950 rdma->error_state = ret;
2951 return ret;
2952 }
2953
2954 static int qemu_rdma_accept(RDMAContext *rdma)
2955 {
2956 RDMACapabilities cap;
2957 struct rdma_conn_param conn_param = {
2958 .responder_resources = 2,
2959 .private_data = &cap,
2960 .private_data_len = sizeof(cap),
2961 };
2962 struct rdma_cm_event *cm_event;
2963 struct ibv_context *verbs;
2964 int ret = -EINVAL;
2965 int idx;
2966
2967 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2968 if (ret) {
2969 goto err_rdma_dest_wait;
2970 }
2971
2972 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
2973 rdma_ack_cm_event(cm_event);
2974 goto err_rdma_dest_wait;
2975 }
2976
2977 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2978
2979 network_to_caps(&cap);
2980
2981 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
2982 error_report("Unknown source RDMA version: %d, bailing...",
2983 cap.version);
2984 rdma_ack_cm_event(cm_event);
2985 goto err_rdma_dest_wait;
2986 }
2987
2988 /*
2989 * Respond with only the capabilities this version of QEMU knows about.
2990 */
2991 cap.flags &= known_capabilities;
2992
2993 /*
2994 * Enable the ones that we do know about.
2995 * Add other checks here as new ones are introduced.
2996 */
2997 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
2998 rdma->pin_all = true;
2999 }
3000
3001 rdma->cm_id = cm_event->id;
3002 verbs = cm_event->id->verbs;
3003
3004 rdma_ack_cm_event(cm_event);
3005
3006 trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3007
3008 caps_to_network(&cap);
3009
3010 trace_qemu_rdma_accept_pin_verbsc(verbs);
3011
3012 if (!rdma->verbs) {
3013 rdma->verbs = verbs;
3014 } else if (rdma->verbs != verbs) {
3015 error_report("ibv context not matching %p, %p!", rdma->verbs,
3016 verbs);
3017 goto err_rdma_dest_wait;
3018 }
3019
3020 qemu_rdma_dump_id("dest_init", verbs);
3021
3022 ret = qemu_rdma_alloc_pd_cq(rdma);
3023 if (ret) {
3024 error_report("rdma migration: error allocating pd and cq!");
3025 goto err_rdma_dest_wait;
3026 }
3027
3028 ret = qemu_rdma_alloc_qp(rdma);
3029 if (ret) {
3030 error_report("rdma migration: error allocating qp!");
3031 goto err_rdma_dest_wait;
3032 }
3033
3034 ret = qemu_rdma_init_ram_blocks(rdma);
3035 if (ret) {
3036 error_report("rdma migration: error initializing ram blocks!");
3037 goto err_rdma_dest_wait;
3038 }
3039
3040 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3041 ret = qemu_rdma_reg_control(rdma, idx);
3042 if (ret) {
3043 error_report("rdma: error registering %d control", idx);
3044 goto err_rdma_dest_wait;
3045 }
3046 }
3047
3048 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
3049
3050 ret = rdma_accept(rdma->cm_id, &conn_param);
3051 if (ret) {
3052 error_report("rdma_accept returns %d", ret);
3053 goto err_rdma_dest_wait;
3054 }
3055
3056 ret = rdma_get_cm_event(rdma->channel, &cm_event);
3057 if (ret) {
3058 error_report("rdma_accept get_cm_event failed %d", ret);
3059 goto err_rdma_dest_wait;
3060 }
3061
3062 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3063 error_report("rdma_accept not event established");
3064 rdma_ack_cm_event(cm_event);
3065 goto err_rdma_dest_wait;
3066 }
3067
3068 rdma_ack_cm_event(cm_event);
3069 rdma->connected = true;
3070
3071 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3072 if (ret) {
3073 error_report("rdma migration: error posting second control recv");
3074 goto err_rdma_dest_wait;
3075 }
3076
3077 qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3078
3079 return 0;
3080
3081 err_rdma_dest_wait:
3082 rdma->error_state = ret;
3083 qemu_rdma_cleanup(rdma);
3084 return ret;
3085 }
3086
3087 static int dest_ram_sort_func(const void *a, const void *b)
3088 {
3089 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3090 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3091
3092 return (a_index < b_index) ? -1 : (a_index != b_index);
3093 }
3094
3095 /*
3096 * During each iteration of the migration, we listen for instructions
3097 * by the source VM to perform dynamic page registrations before they
3098 * can perform RDMA operations.
3099 *
3100 * We respond with the 'rkey'.
3101 *
3102 * Keep doing this until the source tells us to stop.
3103 */
3104 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3105 {
3106 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3107 .type = RDMA_CONTROL_REGISTER_RESULT,
3108 .repeat = 0,
3109 };
3110 RDMAControlHeader unreg_resp = { .len = 0,
3111 .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3112 .repeat = 0,
3113 };
3114 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3115 .repeat = 1 };
3116 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3117 RDMAContext *rdma = rioc->rdma;
3118 RDMALocalBlocks *local = &rdma->local_ram_blocks;
3119 RDMAControlHeader head;
3120 RDMARegister *reg, *registers;
3121 RDMACompress *comp;
3122 RDMARegisterResult *reg_result;
3123 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3124 RDMALocalBlock *block;
3125 void *host_addr;
3126 int ret = 0;
3127 int idx = 0;
3128 int count = 0;
3129 int i = 0;
3130
3131 CHECK_ERROR_STATE();
3132
3133 do {
3134 trace_qemu_rdma_registration_handle_wait();
3135
3136 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3137
3138 if (ret < 0) {
3139 break;
3140 }
3141
3142 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3143 error_report("rdma: Too many requests in this message (%d)."
3144 "Bailing.", head.repeat);
3145 ret = -EIO;
3146 break;
3147 }
3148
3149 switch (head.type) {
3150 case RDMA_CONTROL_COMPRESS:
3151 comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3152 network_to_compress(comp);
3153
3154 trace_qemu_rdma_registration_handle_compress(comp->length,
3155 comp->block_idx,
3156 comp->offset);
3157 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3158 error_report("rdma: 'compress' bad block index %u (vs %d)",
3159 (unsigned int)comp->block_idx,
3160 rdma->local_ram_blocks.nb_blocks);
3161 ret = -EIO;
3162 goto out;
3163 }
3164 block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3165
3166 host_addr = block->local_host_addr +
3167 (comp->offset - block->offset);
3168
3169 ram_handle_compressed(host_addr, comp->value, comp->length);
3170 break;
3171
3172 case RDMA_CONTROL_REGISTER_FINISHED:
3173 trace_qemu_rdma_registration_handle_finished();
3174 goto out;
3175
3176 case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3177 trace_qemu_rdma_registration_handle_ram_blocks();
3178
3179 /* Sort our local RAM Block list so it's the same as the source,
3180 * we can do this since we've filled in a src_index in the list
3181 * as we received the RAMBlock list earlier.
3182 */
3183 qsort(rdma->local_ram_blocks.block,
3184 rdma->local_ram_blocks.nb_blocks,
3185 sizeof(RDMALocalBlock), dest_ram_sort_func);
3186 if (rdma->pin_all) {
3187 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3188 if (ret) {
3189 error_report("rdma migration: error dest "
3190 "registering ram blocks");
3191 goto out;
3192 }
3193 }
3194
3195 /*
3196 * Dest uses this to prepare to transmit the RAMBlock descriptions
3197 * to the source VM after connection setup.
3198 * Both sides use the "remote" structure to communicate and update
3199 * their "local" descriptions with what was sent.
3200 */
3201 for (i = 0; i < local->nb_blocks; i++) {
3202 rdma->dest_blocks[i].remote_host_addr =
3203 (uintptr_t)(local->block[i].local_host_addr);
3204
3205 if (rdma->pin_all) {
3206 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3207 }
3208
3209 rdma->dest_blocks[i].offset = local->block[i].offset;
3210 rdma->dest_blocks[i].length = local->block[i].length;
3211
3212 dest_block_to_network(&rdma->dest_blocks[i]);
3213 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3214 local->block[i].block_name,
3215 local->block[i].offset,
3216 local->block[i].length,
3217 local->block[i].local_host_addr,
3218 local->block[i].src_index);
3219 }
3220
3221 blocks.len = rdma->local_ram_blocks.nb_blocks
3222 * sizeof(RDMADestBlock);
3223
3224
3225 ret = qemu_rdma_post_send_control(rdma,
3226 (uint8_t *) rdma->dest_blocks, &blocks);
3227
3228 if (ret < 0) {
3229 error_report("rdma migration: error sending remote info");
3230 goto out;
3231 }
3232
3233 break;
3234 case RDMA_CONTROL_REGISTER_REQUEST:
3235 trace_qemu_rdma_registration_handle_register(head.repeat);
3236
3237 reg_resp.repeat = head.repeat;
3238 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3239
3240 for (count = 0; count < head.repeat; count++) {
3241 uint64_t chunk;
3242 uint8_t *chunk_start, *chunk_end;
3243
3244 reg = &registers[count];
3245 network_to_register(reg);
3246
3247 reg_result = &results[count];
3248
3249 trace_qemu_rdma_registration_handle_register_loop(count,
3250 reg->current_index, reg->key.current_addr, reg->chunks);
3251
3252 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3253 error_report("rdma: 'register' bad block index %u (vs %d)",
3254 (unsigned int)reg->current_index,
3255 rdma->local_ram_blocks.nb_blocks);
3256 ret = -ENOENT;
3257 goto out;
3258 }
3259 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3260 if (block->is_ram_block) {
3261 if (block->offset > reg->key.current_addr) {
3262 error_report("rdma: bad register address for block %s"
3263 " offset: %" PRIx64 " current_addr: %" PRIx64,
3264 block->block_name, block->offset,
3265 reg->key.current_addr);
3266 ret = -ERANGE;
3267 goto out;
3268 }
3269 host_addr = (block->local_host_addr +
3270 (reg->key.current_addr - block->offset));
3271 chunk = ram_chunk_index(block->local_host_addr,
3272 (uint8_t *) host_addr);
3273 } else {
3274 chunk = reg->key.chunk;
3275 host_addr = block->local_host_addr +
3276 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3277 /* Check for particularly bad chunk value */
3278 if (host_addr < (void *)block->local_host_addr) {
3279 error_report("rdma: bad chunk for block %s"
3280 " chunk: %" PRIx64,
3281 block->block_name, reg->key.chunk);
3282 ret = -ERANGE;
3283 goto out;
3284 }
3285 }
3286 chunk_start = ram_chunk_start(block, chunk);
3287 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3288 if (qemu_rdma_register_and_get_keys(rdma, block,
3289 (uintptr_t)host_addr, NULL, &reg_result->rkey,
3290 chunk, chunk_start, chunk_end)) {
3291 error_report("cannot get rkey");
3292 ret = -EINVAL;
3293 goto out;
3294 }
3295
3296 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3297
3298 trace_qemu_rdma_registration_handle_register_rkey(
3299 reg_result->rkey);
3300
3301 result_to_network(reg_result);
3302 }
3303
3304 ret = qemu_rdma_post_send_control(rdma,
3305 (uint8_t *) results, &reg_resp);
3306
3307 if (ret < 0) {
3308 error_report("Failed to send control buffer");
3309 goto out;
3310 }
3311 break;
3312 case RDMA_CONTROL_UNREGISTER_REQUEST:
3313 trace_qemu_rdma_registration_handle_unregister(head.repeat);
3314 unreg_resp.repeat = head.repeat;
3315 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3316
3317 for (count = 0; count < head.repeat; count++) {
3318 reg = &registers[count];
3319 network_to_register(reg);
3320
3321 trace_qemu_rdma_registration_handle_unregister_loop(count,
3322 reg->current_index, reg->key.chunk);
3323
3324 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3325
3326 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3327 block->pmr[reg->key.chunk] = NULL;
3328
3329 if (ret != 0) {
3330 perror("rdma unregistration chunk failed");
3331 ret = -ret;
3332 goto out;
3333 }
3334
3335 rdma->total_registrations--;
3336
3337 trace_qemu_rdma_registration_handle_unregister_success(
3338 reg->key.chunk);
3339 }
3340
3341 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3342
3343 if (ret < 0) {
3344 error_report("Failed to send control buffer");
3345 goto out;
3346 }
3347 break;
3348 case RDMA_CONTROL_REGISTER_RESULT:
3349 error_report("Invalid RESULT message at dest.");
3350 ret = -EIO;
3351 goto out;
3352 default:
3353 error_report("Unknown control message %s", control_desc[head.type]);
3354 ret = -EIO;
3355 goto out;
3356 }
3357 } while (1);
3358 out:
3359 if (ret < 0) {
3360 rdma->error_state = ret;
3361 }
3362 return ret;
3363 }
3364
3365 /* Destination:
3366 * Called via a ram_control_load_hook during the initial RAM load section which
3367 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks
3368 * on the source.
3369 * We've already built our local RAMBlock list, but not yet sent the list to
3370 * the source.
3371 */
3372 static int
3373 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3374 {
3375 RDMAContext *rdma = rioc->rdma;
3376 int curr;
3377 int found = -1;
3378
3379 /* Find the matching RAMBlock in our local list */
3380 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3381 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3382 found = curr;
3383 break;
3384 }
3385 }
3386
3387 if (found == -1) {
3388 error_report("RAMBlock '%s' not found on destination", name);
3389 return -ENOENT;
3390 }
3391
3392 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3393 trace_rdma_block_notification_handle(name, rdma->next_src_index);
3394 rdma->next_src_index++;
3395
3396 return 0;
3397 }
3398
3399 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3400 {
3401 switch (flags) {
3402 case RAM_CONTROL_BLOCK_REG:
3403 return rdma_block_notification_handle(opaque, data);
3404
3405 case RAM_CONTROL_HOOK:
3406 return qemu_rdma_registration_handle(f, opaque);
3407
3408 default:
3409 /* Shouldn't be called with any other values */
3410 abort();
3411 }
3412 }
3413
3414 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3415 uint64_t flags, void *data)
3416 {
3417 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3418 RDMAContext *rdma = rioc->rdma;
3419
3420 CHECK_ERROR_STATE();
3421
3422 trace_qemu_rdma_registration_start(flags);
3423 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3424 qemu_fflush(f);
3425
3426 return 0;
3427 }
3428
3429 /*
3430 * Inform dest that dynamic registrations are done for now.
3431 * First, flush writes, if any.
3432 */
3433 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3434 uint64_t flags, void *data)
3435 {
3436 Error *local_err = NULL, **errp = &local_err;
3437 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3438 RDMAContext *rdma = rioc->rdma;
3439 RDMAControlHeader head = { .len = 0, .repeat = 1 };
3440 int ret = 0;
3441
3442 CHECK_ERROR_STATE();
3443
3444 qemu_fflush(f);
3445 ret = qemu_rdma_drain_cq(f, rdma);
3446
3447 if (ret < 0) {
3448 goto err;
3449 }
3450
3451 if (flags == RAM_CONTROL_SETUP) {
3452 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3453 RDMALocalBlocks *local = &rdma->local_ram_blocks;
3454 int reg_result_idx, i, nb_dest_blocks;
3455
3456 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3457 trace_qemu_rdma_registration_stop_ram();
3458
3459 /*
3460 * Make sure that we parallelize the pinning on both sides.
3461 * For very large guests, doing this serially takes a really
3462 * long time, so we have to 'interleave' the pinning locally
3463 * with the control messages by performing the pinning on this
3464 * side before we receive the control response from the other
3465 * side that the pinning has completed.
3466 */
3467 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3468 &reg_result_idx, rdma->pin_all ?
3469 qemu_rdma_reg_whole_ram_blocks : NULL);
3470 if (ret < 0) {
3471 ERROR(errp, "receiving remote info!");
3472 return ret;
3473 }
3474
3475 nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3476
3477 /*
3478 * The protocol uses two different sets of rkeys (mutually exclusive):
3479 * 1. One key to represent the virtual address of the entire ram block.
3480 * (dynamic chunk registration disabled - pin everything with one rkey.)
3481 * 2. One to represent individual chunks within a ram block.
3482 * (dynamic chunk registration enabled - pin individual chunks.)
3483 *
3484 * Once the capability is successfully negotiated, the destination transmits
3485 * the keys to use (or sends them later) including the virtual addresses
3486 * and then propagates the remote ram block descriptions to his local copy.
3487 */
3488
3489 if (local->nb_blocks != nb_dest_blocks) {
3490 ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) "
3491 "Your QEMU command line parameters are probably "
3492 "not identical on both the source and destination.",
3493 local->nb_blocks, nb_dest_blocks);
3494 rdma->error_state = -EINVAL;
3495 return -EINVAL;
3496 }
3497
3498 qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3499 memcpy(rdma->dest_blocks,
3500 rdma->wr_data[reg_result_idx].control_curr, resp.len);
3501 for (i = 0; i < nb_dest_blocks; i++) {
3502 network_to_dest_block(&rdma->dest_blocks[i]);
3503
3504 /* We require that the blocks are in the same order */
3505 if (rdma->dest_blocks[i].length != local->block[i].length) {
3506 ERROR(errp, "Block %s/%d has a different length %" PRIu64
3507 "vs %" PRIu64, local->block[i].block_name, i,
3508 local->block[i].length,
3509 rdma->dest_blocks[i].length);
3510 rdma->error_state = -EINVAL;
3511 return -EINVAL;
3512 }
3513 local->block[i].remote_host_addr =
3514 rdma->dest_blocks[i].remote_host_addr;
3515 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3516 }
3517 }
3518
3519 trace_qemu_rdma_registration_stop(flags);
3520
3521 head.type = RDMA_CONTROL_REGISTER_FINISHED;
3522 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3523
3524 if (ret < 0) {
3525 goto err;
3526 }
3527
3528 return 0;
3529 err:
3530 rdma->error_state = ret;
3531 return ret;
3532 }
3533
3534 static const QEMUFileHooks rdma_read_hooks = {
3535 .hook_ram_load = rdma_load_hook,
3536 };
3537
3538 static const QEMUFileHooks rdma_write_hooks = {
3539 .before_ram_iterate = qemu_rdma_registration_start,
3540 .after_ram_iterate = qemu_rdma_registration_stop,
3541 .save_page = qemu_rdma_save_page,
3542 };
3543
3544
3545 static void qio_channel_rdma_finalize(Object *obj)
3546 {
3547 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3548 if (rioc->rdma) {
3549 qemu_rdma_cleanup(rioc->rdma);
3550 g_free(rioc->rdma);
3551 rioc->rdma = NULL;
3552 }
3553 }
3554
3555 static void qio_channel_rdma_class_init(ObjectClass *klass,
3556 void *class_data G_GNUC_UNUSED)
3557 {
3558 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3559
3560 ioc_klass->io_writev = qio_channel_rdma_writev;
3561 ioc_klass->io_readv = qio_channel_rdma_readv;
3562 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3563 ioc_klass->io_close = qio_channel_rdma_close;
3564 ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3565 }
3566
3567 static const TypeInfo qio_channel_rdma_info = {
3568 .parent = TYPE_QIO_CHANNEL,
3569 .name = TYPE_QIO_CHANNEL_RDMA,
3570 .instance_size = sizeof(QIOChannelRDMA),
3571 .instance_finalize = qio_channel_rdma_finalize,
3572 .class_init = qio_channel_rdma_class_init,
3573 };
3574
3575 static void qio_channel_rdma_register_types(void)
3576 {
3577 type_register_static(&qio_channel_rdma_info);
3578 }
3579
3580 type_init(qio_channel_rdma_register_types);
3581
3582 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3583 {
3584 QIOChannelRDMA *rioc;
3585
3586 if (qemu_file_mode_is_not_valid(mode)) {
3587 return NULL;
3588 }
3589
3590 rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3591 rioc->rdma = rdma;
3592
3593 if (mode[0] == 'w') {
3594 rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
3595 qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
3596 } else {
3597 rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
3598 qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
3599 }
3600
3601 return rioc->file;
3602 }
3603
3604 static void rdma_accept_incoming_migration(void *opaque)
3605 {
3606 RDMAContext *rdma = opaque;
3607 int ret;
3608 QEMUFile *f;
3609 Error *local_err = NULL, **errp = &local_err;
3610
3611 trace_qemu_rdma_accept_incoming_migration();
3612 ret = qemu_rdma_accept(rdma);
3613
3614 if (ret) {
3615 ERROR(errp, "RDMA Migration initialization failed!");
3616 return;
3617 }
3618
3619 trace_qemu_rdma_accept_incoming_migration_accepted();
3620
3621 f = qemu_fopen_rdma(rdma, "rb");
3622 if (f == NULL) {
3623 ERROR(errp, "could not qemu_fopen_rdma!");
3624 qemu_rdma_cleanup(rdma);
3625 return;
3626 }
3627
3628 rdma->migration_started_on_destination = 1;
3629 migration_fd_process_incoming(f);
3630 }
3631
3632 void rdma_start_incoming_migration(const char *host_port, Error **errp)
3633 {
3634 int ret;
3635 RDMAContext *rdma;
3636 Error *local_err = NULL;
3637
3638 trace_rdma_start_incoming_migration();
3639 rdma = qemu_rdma_data_init(host_port, &local_err);
3640
3641 if (rdma == NULL) {
3642 goto err;
3643 }
3644
3645 ret = qemu_rdma_dest_init(rdma, &local_err);
3646
3647 if (ret) {
3648 goto err;
3649 }
3650
3651 trace_rdma_start_incoming_migration_after_dest_init();
3652
3653 ret = rdma_listen(rdma->listen_id, 5);
3654
3655 if (ret) {
3656 ERROR(errp, "listening on socket!");
3657 goto err;
3658 }
3659
3660 trace_rdma_start_incoming_migration_after_rdma_listen();
3661
3662 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3663 NULL, (void *)(intptr_t)rdma);
3664 return;
3665 err:
3666 error_propagate(errp, local_err);
3667 g_free(rdma);
3668 }
3669
3670 void rdma_start_outgoing_migration(void *opaque,
3671 const char *host_port, Error **errp)
3672 {
3673 MigrationState *s = opaque;
3674 RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
3675 int ret = 0;
3676
3677 if (rdma == NULL) {
3678 goto err;
3679 }
3680
3681 ret = qemu_rdma_source_init(rdma,
3682 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
3683
3684 if (ret) {
3685 goto err;
3686 }
3687
3688 trace_rdma_start_outgoing_migration_after_rdma_source_init();
3689 ret = qemu_rdma_connect(rdma, errp);
3690
3691 if (ret) {
3692 goto err;
3693 }
3694
3695 trace_rdma_start_outgoing_migration_after_rdma_connect();
3696
3697 s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
3698 migrate_fd_connect(s);
3699 return;
3700 err:
3701 g_free(rdma);
3702 }