1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
24 * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
25 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
26 * Copyright 2014 HybridCluster. All rights reserved.
27 * Copyright 2016 RackTop Systems.
28 * Copyright (c) 2016 Actifio, Inc. All rights reserved.
29 * Copyright (c) 2019, Klara Inc.
30 * Copyright (c) 2019, Allan Jude
31 */
32
33 #include <sys/dmu.h>
34 #include <sys/dmu_impl.h>
35 #include <sys/dmu_tx.h>
36 #include <sys/dbuf.h>
37 #include <sys/dnode.h>
38 #include <sys/zfs_context.h>
39 #include <sys/dmu_objset.h>
40 #include <sys/dmu_traverse.h>
41 #include <sys/dsl_dataset.h>
42 #include <sys/dsl_dir.h>
43 #include <sys/dsl_prop.h>
44 #include <sys/dsl_pool.h>
45 #include <sys/dsl_synctask.h>
46 #include <sys/spa_impl.h>
47 #include <sys/zfs_ioctl.h>
48 #include <sys/zap.h>
49 #include <sys/zio_checksum.h>
50 #include <sys/zfs_znode.h>
51 #include <zfs_fletcher.h>
52 #include <sys/avl.h>
53 #include <sys/ddt.h>
54 #include <sys/zfs_onexit.h>
55 #include <sys/dmu_send.h>
56 #include <sys/dmu_recv.h>
57 #include <sys/dsl_destroy.h>
58 #include <sys/blkptr.h>
59 #include <sys/dsl_bookmark.h>
60 #include <sys/zfeature.h>
61 #include <sys/bqueue.h>
62 #include <sys/zvol.h>
63 #include <sys/policy.h>
64 #include <sys/objlist.h>
65 #ifdef _KERNEL
66 #include <sys/zfs_vfsops.h>
67 #endif
68
69 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
70 int zfs_send_corrupt_data = B_FALSE;
71 /*
72 * This tunable controls the amount of data (measured in bytes) that will be
73 * prefetched by zfs send. If the main thread is blocking on reads that haven't
74 * completed, this variable might need to be increased. If instead the main
75 * thread is issuing new reads because the prefetches have fallen out of the
76 * cache, this may need to be decreased.
77 */
78 int zfs_send_queue_length = SPA_MAXBLOCKSIZE;
79 /*
80 * This tunable controls the length of the queues that zfs send worker threads
81 * use to communicate. If the send_main_thread is blocking on these queues,
82 * this variable may need to be increased. If there is a significant slowdown
83 * at the start of a send as these threads consume all the available IO
84 * resources, this variable may need to be decreased.
85 */
86 int zfs_send_no_prefetch_queue_length = 1024 * 1024;
87 /*
88 * These tunables control the fill fraction of the queues by zfs send. The fill
89 * fraction controls the frequency with which threads have to be cv_signaled.
90 * If a lot of cpu time is being spent on cv_signal, then these should be tuned
91 * down. If the queues empty before the signalled thread can catch up, then
92 * these should be tuned up.
93 */
94 int zfs_send_queue_ff = 20;
95 int zfs_send_no_prefetch_queue_ff = 20;
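/*
 * On Linux builds these tunables are typically exposed as zfs module
 * parameters.  A rough example of adjusting one at runtime, assuming the
 * usual sysfs layout for module parameters:
 *
 *   echo 16777216 > /sys/module/zfs/parameters/zfs_send_queue_length
 */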
96
97 /*
98 * Use this to override the recordsize calculation for fast zfs send estimates.
99 */
100 int zfs_override_estimate_recordsize = 0;
101
102 /* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
103 int zfs_send_set_freerecords_bit = B_TRUE;
104
105 /* Set this tunable to FALSE to disable sending unmodified spill blocks. */
106 int zfs_send_unmodified_spill_blocks = B_TRUE;
107
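/*
 * Multiply a by b into *c, detecting unsigned 64-bit overflow.  Returns
 * B_FALSE (leaving *c untouched) if the product overflows; otherwise stores
 * the product in *c and returns B_TRUE.  For example,
 * overflow_multiply(1ULL << 32, 1ULL << 33, &c) returns B_FALSE.
 */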
108 static inline boolean_t
109 overflow_multiply(uint64_t a, uint64_t b, uint64_t *c)
110 {
111 uint64_t temp = a * b;
112 if (b != 0 && temp / b != a)
113 return (B_FALSE);
114 *c = temp;
115 return (B_TRUE);
116 }
117
118 struct send_thread_arg {
119 bqueue_t q;
120 objset_t *os; /* Objset to traverse */
121 uint64_t fromtxg; /* Traverse from this txg */
122 int flags; /* flags to pass to traverse_dataset */
123 int error_code;
124 boolean_t cancel;
125 zbookmark_phys_t resume;
126 uint64_t *num_blocks_visited;
127 };
128
129 struct redact_list_thread_arg {
130 boolean_t cancel;
131 bqueue_t q;
132 zbookmark_phys_t resume;
133 redaction_list_t *rl;
134 boolean_t mark_redact;
135 int error_code;
136 uint64_t *num_blocks_visited;
137 };
138
139 struct send_merge_thread_arg {
140 bqueue_t q;
141 objset_t *os;
142 struct redact_list_thread_arg *from_arg;
143 struct send_thread_arg *to_arg;
144 struct redact_list_thread_arg *redact_arg;
145 int error;
146 boolean_t cancel;
147 };
148
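/*
 * A send_range describes one unit of work (an object, a block of data, a
 * hole, a redacted region, or an object-range of dnodes) as it flows through
 * the bqueues connecting the traversal, redaction, merge, and reader threads
 * to the main thread that emits the stream.  The sru union carries the
 * type-specific payload.
 */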
149 struct send_range {
150 boolean_t eos_marker; /* Marks the end of the stream */
151 uint64_t object;
152 uint64_t start_blkid;
153 uint64_t end_blkid;
154 bqueue_node_t ln;
155 enum type {DATA, HOLE, OBJECT, OBJECT_RANGE, REDACT,
156 PREVIOUSLY_REDACTED} type;
157 union {
158 struct srd {
159 dmu_object_type_t obj_type;
160 uint32_t datablksz; // logical size
161 uint32_t datasz; // payload size
162 blkptr_t bp;
163 arc_buf_t *abuf;
164 abd_t *abd;
165 kmutex_t lock;
166 kcondvar_t cv;
167 boolean_t io_outstanding;
168 int io_err;
169 } data;
170 struct srh {
171 uint32_t datablksz;
172 } hole;
173 struct sro {
174 /*
175 * This is a pointer because embedding it in the
176 * struct causes these structures to be massively larger
177 * for all range types; this makes the code much less
178 * memory efficient.
179 */
180 dnode_phys_t *dnp;
181 blkptr_t bp;
182 } object;
183 struct srr {
184 uint32_t datablksz;
185 } redact;
186 struct sror {
187 blkptr_t bp;
188 } object_range;
189 } sru;
190 };
191
192 /*
193 * The list of data whose inclusion in a send stream can be pending from
194 * one call to backup_cb to another. Multiple calls to dump_free(),
195 * dump_freeobjects(), and dump_redact() can be aggregated into a single
196 * DRR_FREE, DRR_FREEOBJECTS, or DRR_REDACT replay record.
197 */
198 typedef enum {
199 PENDING_NONE,
200 PENDING_FREE,
201 PENDING_FREEOBJECTS,
202 PENDING_REDACT
203 } dmu_pendop_t;
204
205 typedef struct dmu_send_cookie {
206 dmu_replay_record_t *dsc_drr;
207 dmu_send_outparams_t *dsc_dso;
208 offset_t *dsc_off;
209 objset_t *dsc_os;
210 zio_cksum_t dsc_zc;
211 uint64_t dsc_toguid;
212 uint64_t dsc_fromtxg;
213 int dsc_err;
214 dmu_pendop_t dsc_pending_op;
215 uint64_t dsc_featureflags;
216 uint64_t dsc_last_data_object;
217 uint64_t dsc_last_data_offset;
218 uint64_t dsc_resume_object;
219 uint64_t dsc_resume_offset;
220 boolean_t dsc_sent_begin;
221 boolean_t dsc_sent_end;
222 } dmu_send_cookie_t;
223
224 static int do_dump(dmu_send_cookie_t *dscp, struct send_range *range);
225
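/*
 * Release a send_range.  OBJECT ranges free their copied dnode_phys_t; DATA
 * ranges first wait for any read issued by issue_data_read() to complete,
 * then free the ARC buffer or ABD holding the block's contents.
 */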
226 static void
227 range_free(struct send_range *range)
228 {
229 if (range->type == OBJECT) {
230 size_t size = sizeof (dnode_phys_t) *
231 (range->sru.object.dnp->dn_extra_slots + 1);
232 kmem_free(range->sru.object.dnp, size);
233 } else if (range->type == DATA) {
234 mutex_enter(&range->sru.data.lock);
235 while (range->sru.data.io_outstanding)
236 cv_wait(&range->sru.data.cv, &range->sru.data.lock);
237 if (range->sru.data.abd != NULL)
238 abd_free(range->sru.data.abd);
239 if (range->sru.data.abuf != NULL) {
240 arc_buf_destroy(range->sru.data.abuf,
241 &range->sru.data.abuf);
242 }
243 mutex_exit(&range->sru.data.lock);
244
245 cv_destroy(&range->sru.data.cv);
246 mutex_destroy(&range->sru.data.lock);
247 }
248 kmem_free(range, sizeof (*range));
249 }
250
251 /*
252 * For all record types except BEGIN, fill in the checksum (overlaid in
253 * drr_u.drr_checksum.drr_checksum). The checksum verifies everything
254 * up to the start of the checksum itself.
255 */
256 static int
257 dump_record(dmu_send_cookie_t *dscp, void *payload, int payload_len)
258 {
259 dmu_send_outparams_t *dso = dscp->dsc_dso;
260 ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
261 ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
262 (void) fletcher_4_incremental_native(dscp->dsc_drr,
263 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
264 &dscp->dsc_zc);
265 if (dscp->dsc_drr->drr_type == DRR_BEGIN) {
266 dscp->dsc_sent_begin = B_TRUE;
267 } else {
268 ASSERT(ZIO_CHECKSUM_IS_ZERO(&dscp->dsc_drr->drr_u.
269 drr_checksum.drr_checksum));
270 dscp->dsc_drr->drr_u.drr_checksum.drr_checksum = dscp->dsc_zc;
271 }
272 if (dscp->dsc_drr->drr_type == DRR_END) {
273 dscp->dsc_sent_end = B_TRUE;
274 }
275 (void) fletcher_4_incremental_native(&dscp->dsc_drr->
276 drr_u.drr_checksum.drr_checksum,
277 sizeof (zio_cksum_t), &dscp->dsc_zc);
278 *dscp->dsc_off += sizeof (dmu_replay_record_t);
279 dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, dscp->dsc_drr,
280 sizeof (dmu_replay_record_t), dso->dso_arg);
281 if (dscp->dsc_err != 0)
282 return (SET_ERROR(EINTR));
283 if (payload_len != 0) {
284 *dscp->dsc_off += payload_len;
285 /*
286 * payload is null when dso_dryrun == B_TRUE (i.e. when we're
287 * doing a send size calculation)
288 */
289 if (payload != NULL) {
290 (void) fletcher_4_incremental_native(
291 payload, payload_len, &dscp->dsc_zc);
292 }
293
294 /*
295 * The code does not rely on this (len being a multiple of 8).
296 * We keep this assertion because of the corresponding assertion
297 * in receive_read(). Keeping this assertion ensures that we do
298 * not inadvertently break backwards compatibility (causing the
299 * assertion in receive_read() to trigger on old software).
300 *
301 * Raw sends cannot be received on old software, and so can
302 * bypass this assertion.
303 */
304
305 ASSERT((payload_len % 8 == 0) ||
306 (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW));
307
308 dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, payload,
309 payload_len, dso->dso_arg);
310 if (dscp->dsc_err != 0)
311 return (SET_ERROR(EINTR));
312 }
313 return (0);
314 }
315
316 /*
317 * Fill in the drr_free struct, or perform aggregation if the previous record is
318 * also a free record, and the two are adjacent.
319 *
320 * Note that we send free records even for a full send, because we want to be
321 * able to receive a full send as a clone, which requires a list of all the free
322 * and freeobject records that were generated on the source.
323 */
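/*
 * For example, two adjacent calls dump_free(dscp, obj, 0, 4096) and
 * dump_free(dscp, obj, 4096, 4096) are coalesced into a single pending
 * DRR_FREE record covering bytes [0, 8192) of the object.
 */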
324 static int
325 dump_free(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
326 uint64_t length)
327 {
328 struct drr_free *drrf = &(dscp->dsc_drr->drr_u.drr_free);
329
330 /*
331 * When we receive a free record, dbuf_free_range() assumes
332 * that the receiving system doesn't have any dbufs in the range
333 * being freed. This is always true because there is a one-record
334 * constraint: we only send one WRITE record for any given
335 * object,offset. We know that the one-record constraint is
336 * true because we always send data in increasing order by
337 * object,offset.
338 *
339 * If the increasing-order constraint ever changes, we should find
340 * another way to assert that the one-record constraint is still
341 * satisfied.
342 */
343 ASSERT(object > dscp->dsc_last_data_object ||
344 (object == dscp->dsc_last_data_object &&
345 offset > dscp->dsc_last_data_offset));
346
347 /*
348 * If there is a pending op, but it's not PENDING_FREE, push it out,
349 * since free block aggregation can only be done for blocks of the
350 * same type (i.e., DRR_FREE records can only be aggregated with
351 * other DRR_FREE records. DRR_FREEOBJECTS records can only be
352 * aggregated with other DRR_FREEOBJECTS records).
353 */
354 if (dscp->dsc_pending_op != PENDING_NONE &&
355 dscp->dsc_pending_op != PENDING_FREE) {
356 if (dump_record(dscp, NULL, 0) != 0)
357 return (SET_ERROR(EINTR));
358 dscp->dsc_pending_op = PENDING_NONE;
359 }
360
361 if (dscp->dsc_pending_op == PENDING_FREE) {
362 /*
363 * Check to see whether this free block can be aggregated
364 * with the pending one.
365 */
366 if (drrf->drr_object == object && drrf->drr_offset +
367 drrf->drr_length == offset) {
368 if (offset + length < offset || length == UINT64_MAX)
369 drrf->drr_length = UINT64_MAX;
370 else
371 drrf->drr_length += length;
372 return (0);
373 } else {
374 /* not a continuation. Push out pending record */
375 if (dump_record(dscp, NULL, 0) != 0)
376 return (SET_ERROR(EINTR));
377 dscp->dsc_pending_op = PENDING_NONE;
378 }
379 }
380 /* create a FREE record and make it pending */
381 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
382 dscp->dsc_drr->drr_type = DRR_FREE;
383 drrf->drr_object = object;
384 drrf->drr_offset = offset;
385 if (offset + length < offset)
386 drrf->drr_length = DMU_OBJECT_END;
387 else
388 drrf->drr_length = length;
389 drrf->drr_toguid = dscp->dsc_toguid;
390 if (length == DMU_OBJECT_END) {
391 if (dump_record(dscp, NULL, 0) != 0)
392 return (SET_ERROR(EINTR));
393 } else {
394 dscp->dsc_pending_op = PENDING_FREE;
395 }
396
397 return (0);
398 }
399
400 /*
401 * Fill in the drr_redact struct, or perform aggregation if the previous record
402 * is also a redaction record, and the two are adjacent.
403 */
404 static int
405 dump_redact(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
406 uint64_t length)
407 {
408 struct drr_redact *drrr = &dscp->dsc_drr->drr_u.drr_redact;
409
410 /*
411 * If there is a pending op, but it's not PENDING_REDACT, push it out,
412 * since free block aggregation can only be done for blocks of the
413 * same type (i.e., DRR_REDACT records can only be aggregated with
414 * other DRR_REDACT records).
415 */
416 if (dscp->dsc_pending_op != PENDING_NONE &&
417 dscp->dsc_pending_op != PENDING_REDACT) {
418 if (dump_record(dscp, NULL, 0) != 0)
419 return (SET_ERROR(EINTR));
420 dscp->dsc_pending_op = PENDING_NONE;
421 }
422
423 if (dscp->dsc_pending_op == PENDING_REDACT) {
424 /*
425 * Check to see whether this redacted block can be aggregated
426 * with the pending one.
427 */
428 if (drrr->drr_object == object && drrr->drr_offset +
429 drrr->drr_length == offset) {
430 drrr->drr_length += length;
431 return (0);
432 } else {
433 /* not a continuation. Push out pending record */
434 if (dump_record(dscp, NULL, 0) != 0)
435 return (SET_ERROR(EINTR));
436 dscp->dsc_pending_op = PENDING_NONE;
437 }
438 }
439 /* create a REDACT record and make it pending */
440 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
441 dscp->dsc_drr->drr_type = DRR_REDACT;
442 drrr->drr_object = object;
443 drrr->drr_offset = offset;
444 drrr->drr_length = length;
445 drrr->drr_toguid = dscp->dsc_toguid;
446 dscp->dsc_pending_op = PENDING_REDACT;
447
448 return (0);
449 }
450
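/*
 * Emit a DRR_WRITE record for a single block.  lsize is the logical size of
 * the block; psize is the size of the payload actually sent, which differs
 * from lsize for compressed and raw sends.  data may be NULL during a
 * dry-run send size estimate.
 */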
451 static int
452 dmu_dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
453 uint64_t offset, int lsize, int psize, const blkptr_t *bp, void *data)
454 {
455 uint64_t payload_size;
456 boolean_t raw = (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
457 struct drr_write *drrw = &(dscp->dsc_drr->drr_u.drr_write);
458
459 /*
460 * We send data in increasing object, offset order.
461 * See comment in dump_free() for details.
462 */
463 ASSERT(object > dscp->dsc_last_data_object ||
464 (object == dscp->dsc_last_data_object &&
465 offset > dscp->dsc_last_data_offset));
466 dscp->dsc_last_data_object = object;
467 dscp->dsc_last_data_offset = offset + lsize - 1;
468
469 /*
470 * If there is any kind of pending aggregation (currently either
471 * a grouping of free objects or free blocks), push it out to
472 * the stream, since aggregation can't be done across operations
473 * of different types.
474 */
475 if (dscp->dsc_pending_op != PENDING_NONE) {
476 if (dump_record(dscp, NULL, 0) != 0)
477 return (SET_ERROR(EINTR));
478 dscp->dsc_pending_op = PENDING_NONE;
479 }
480 /* write a WRITE record */
481 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
482 dscp->dsc_drr->drr_type = DRR_WRITE;
483 drrw->drr_object = object;
484 drrw->drr_type = type;
485 drrw->drr_offset = offset;
486 drrw->drr_toguid = dscp->dsc_toguid;
487 drrw->drr_logical_size = lsize;
488
489 /* only set the compression fields if the buf is compressed or raw */
490 if (raw || lsize != psize) {
491 ASSERT(raw || dscp->dsc_featureflags &
492 DMU_BACKUP_FEATURE_COMPRESSED);
493 ASSERT(!BP_IS_EMBEDDED(bp));
494 ASSERT3S(psize, >, 0);
495
496 if (raw) {
497 ASSERT(BP_IS_PROTECTED(bp));
498
499 /*
500 * This is a raw protected block so we need to pass
501 * along everything the receiving side will need to
502 * interpret this block, including the byteswap, salt,
503 * IV, and MAC.
504 */
505 if (BP_SHOULD_BYTESWAP(bp))
506 drrw->drr_flags |= DRR_RAW_BYTESWAP;
507 zio_crypt_decode_params_bp(bp, drrw->drr_salt,
508 drrw->drr_iv);
509 zio_crypt_decode_mac_bp(bp, drrw->drr_mac);
510 } else {
511 /* this is a compressed block */
512 ASSERT(dscp->dsc_featureflags &
513 DMU_BACKUP_FEATURE_COMPRESSED);
514 ASSERT(!BP_SHOULD_BYTESWAP(bp));
515 ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));
516 ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF);
517 ASSERT3S(lsize, >=, psize);
518 }
519
520 /* set fields common to compressed and raw sends */
521 drrw->drr_compressiontype = BP_GET_COMPRESS(bp);
522 drrw->drr_compressed_size = psize;
523 payload_size = drrw->drr_compressed_size;
524 } else {
525 payload_size = drrw->drr_logical_size;
526 }
527
528 if (bp == NULL || BP_IS_EMBEDDED(bp) || (BP_IS_PROTECTED(bp) && !raw)) {
529 /*
530 * There's no pre-computed checksum for partial-block writes,
531 * embedded BP's, or encrypted BP's that are being sent as
532 * plaintext, so (like fletcher4-checksummed blocks) userland
533 * will have to compute a dedup-capable checksum itself.
534 */
535 drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
536 } else {
537 drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
538 if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
539 ZCHECKSUM_FLAG_DEDUP)
540 drrw->drr_flags |= DRR_CHECKSUM_DEDUP;
541 DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
542 DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
543 DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
544 DDK_SET_CRYPT(&drrw->drr_key, BP_IS_PROTECTED(bp));
545 drrw->drr_key.ddk_cksum = bp->blk_cksum;
546 }
547
548 if (dump_record(dscp, data, payload_size) != 0)
549 return (SET_ERROR(EINTR));
550 return (0);
551 }
552
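/*
 * Emit a DRR_WRITE_EMBEDDED record for a block whose (compressed) payload is
 * stored entirely inside the block pointer itself (BP_IS_EMBEDDED).
 */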
553 static int
554 dump_write_embedded(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
555 int blksz, const blkptr_t *bp)
556 {
557 char buf[BPE_PAYLOAD_SIZE];
558 struct drr_write_embedded *drrw =
559 &(dscp->dsc_drr->drr_u.drr_write_embedded);
560
561 if (dscp->dsc_pending_op != PENDING_NONE) {
562 if (dump_record(dscp, NULL, 0) != 0)
563 return (SET_ERROR(EINTR));
564 dscp->dsc_pending_op = PENDING_NONE;
565 }
566
567 ASSERT(BP_IS_EMBEDDED(bp));
568
569 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
570 dscp->dsc_drr->drr_type = DRR_WRITE_EMBEDDED;
571 drrw->drr_object = object;
572 drrw->drr_offset = offset;
573 drrw->drr_length = blksz;
574 drrw->drr_toguid = dscp->dsc_toguid;
575 drrw->drr_compression = BP_GET_COMPRESS(bp);
576 drrw->drr_etype = BPE_GET_ETYPE(bp);
577 drrw->drr_lsize = BPE_GET_LSIZE(bp);
578 drrw->drr_psize = BPE_GET_PSIZE(bp);
579
580 decode_embedded_bp_compressed(bp, buf);
581
582 if (dump_record(dscp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
583 return (SET_ERROR(EINTR));
584 return (0);
585 }
586
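/*
 * Emit a DRR_SPILL record for an object's spill block (the overflow area for
 * system attributes that don't fit in the bonus buffer).
 */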
587 static int
588 dump_spill(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
589 void *data)
590 {
591 struct drr_spill *drrs = &(dscp->dsc_drr->drr_u.drr_spill);
592 uint64_t blksz = BP_GET_LSIZE(bp);
593 uint64_t payload_size = blksz;
594
595 if (dscp->dsc_pending_op != PENDING_NONE) {
596 if (dump_record(dscp, NULL, 0) != 0)
597 return (SET_ERROR(EINTR));
598 dscp->dsc_pending_op = PENDING_NONE;
599 }
600
601 /* write a SPILL record */
602 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
603 dscp->dsc_drr->drr_type = DRR_SPILL;
604 drrs->drr_object = object;
605 drrs->drr_length = blksz;
606 drrs->drr_toguid = dscp->dsc_toguid;
607
608 /* See comment in dump_dnode() for full details */
609 if (zfs_send_unmodified_spill_blocks &&
610 (bp->blk_birth <= dscp->dsc_fromtxg)) {
611 drrs->drr_flags |= DRR_SPILL_UNMODIFIED;
612 }
613
614 /* handle raw send fields */
615 if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
616 ASSERT(BP_IS_PROTECTED(bp));
617
618 if (BP_SHOULD_BYTESWAP(bp))
619 drrs->drr_flags |= DRR_RAW_BYTESWAP;
620 drrs->drr_compressiontype = BP_GET_COMPRESS(bp);
621 drrs->drr_compressed_size = BP_GET_PSIZE(bp);
622 zio_crypt_decode_params_bp(bp, drrs->drr_salt, drrs->drr_iv);
623 zio_crypt_decode_mac_bp(bp, drrs->drr_mac);
624 payload_size = drrs->drr_compressed_size;
625 }
626
627 if (dump_record(dscp, data, payload_size) != 0)
628 return (SET_ERROR(EINTR));
629 return (0);
630 }
631
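/*
 * Emit (or aggregate into a pending) DRR_FREEOBJECTS record covering numobjs
 * objects starting at firstobj, clamping the range to object IDs that can
 * exist on the receiving side.
 */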
632 static int
633 dump_freeobjects(dmu_send_cookie_t *dscp, uint64_t firstobj, uint64_t numobjs)
634 {
635 struct drr_freeobjects *drrfo = &(dscp->dsc_drr->drr_u.drr_freeobjects);
636 uint64_t maxobj = DNODES_PER_BLOCK *
637 (DMU_META_DNODE(dscp->dsc_os)->dn_maxblkid + 1);
638
639 /*
640 * ZoL < 0.7 does not handle large FREEOBJECTS records correctly,
641 * leading to zfs recv never completing. To avoid this issue, don't
642 * send FREEOBJECTS records for object IDs which cannot exist on the
643 * receiving side.
644 */
645 if (maxobj > 0) {
646 if (maxobj < firstobj)
647 return (0);
648
649 if (maxobj < firstobj + numobjs)
650 numobjs = maxobj - firstobj;
651 }
652
653 /*
654 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
655 * push it out, since free block aggregation can only be done for
656 * blocks of the same type (i.e., DRR_FREE records can only be
657 * aggregated with other DRR_FREE records. DRR_FREEOBJECTS records
658 * can only be aggregated with other DRR_FREEOBJECTS records).
659 */
660 if (dscp->dsc_pending_op != PENDING_NONE &&
661 dscp->dsc_pending_op != PENDING_FREEOBJECTS) {
662 if (dump_record(dscp, NULL, 0) != 0)
663 return (SET_ERROR(EINTR));
664 dscp->dsc_pending_op = PENDING_NONE;
665 }
666 if (numobjs == 0)
667 numobjs = UINT64_MAX - firstobj;
668
669 if (dscp->dsc_pending_op == PENDING_FREEOBJECTS) {
670 /*
671 * See whether this free object array can be aggregated
672 * with the pending one
673 */
674 if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
675 drrfo->drr_numobjs += numobjs;
676 return (0);
677 } else {
678 /* can't be aggregated. Push out pending record */
679 if (dump_record(dscp, NULL, 0) != 0)
680 return (SET_ERROR(EINTR));
681 dscp->dsc_pending_op = PENDING_NONE;
682 }
683 }
684
685 /* write a FREEOBJECTS record */
686 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
687 dscp->dsc_drr->drr_type = DRR_FREEOBJECTS;
688 drrfo->drr_firstobj = firstobj;
689 drrfo->drr_numobjs = numobjs;
690 drrfo->drr_toguid = dscp->dsc_toguid;
691
692 dscp->dsc_pending_op = PENDING_FREEOBJECTS;
693
694 return (0);
695 }
696
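/*
 * Emit a DRR_OBJECT record describing this dnode (type, block size, bonus
 * buffer, raw-send crypt fields), followed by a DRR_FREE for everything past
 * the end of the file and, when enabled, a DRR_SPILL for an unmodified
 * spill block.
 */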
697 static int
698 dump_dnode(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
699 dnode_phys_t *dnp)
700 {
701 struct drr_object *drro = &(dscp->dsc_drr->drr_u.drr_object);
702 int bonuslen;
703
704 if (object < dscp->dsc_resume_object) {
705 /*
706 * Note: when resuming, we will visit all the dnodes in
707 * the block of dnodes that we are resuming from. In
708 * this case it's unnecessary to send the dnodes prior to
709 * the one we are resuming from. We should be at most one
710 * block's worth of dnodes behind the resume point.
711 */
712 ASSERT3U(dscp->dsc_resume_object - object, <,
713 1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
714 return (0);
715 }
716
717 if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
718 return (dump_freeobjects(dscp, object, 1));
719
720 if (dscp->dsc_pending_op != PENDING_NONE) {
721 if (dump_record(dscp, NULL, 0) != 0)
722 return (SET_ERROR(EINTR));
723 dscp->dsc_pending_op = PENDING_NONE;
724 }
725
726 /* write an OBJECT record */
727 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
728 dscp->dsc_drr->drr_type = DRR_OBJECT;
729 drro->drr_object = object;
730 drro->drr_type = dnp->dn_type;
731 drro->drr_bonustype = dnp->dn_bonustype;
732 drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
733 drro->drr_bonuslen = dnp->dn_bonuslen;
734 drro->drr_dn_slots = dnp->dn_extra_slots + 1;
735 drro->drr_checksumtype = dnp->dn_checksum;
736 drro->drr_compress = dnp->dn_compress;
737 drro->drr_toguid = dscp->dsc_toguid;
738
739 if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
740 drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
741 drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
742
743 bonuslen = P2ROUNDUP(dnp->dn_bonuslen, 8);
744
745 if ((dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {
746 ASSERT(BP_IS_ENCRYPTED(bp));
747
748 if (BP_SHOULD_BYTESWAP(bp))
749 drro->drr_flags |= DRR_RAW_BYTESWAP;
750
751 /* needed for reconstructing dnp on recv side */
752 drro->drr_maxblkid = dnp->dn_maxblkid;
753 drro->drr_indblkshift = dnp->dn_indblkshift;
754 drro->drr_nlevels = dnp->dn_nlevels;
755 drro->drr_nblkptr = dnp->dn_nblkptr;
756
757 /*
758 * Since we encrypt the entire bonus area, the (raw) part
759 * beyond the bonuslen is actually nonzero, so we need
760 * to send it.
761 */
762 if (bonuslen != 0) {
763 drro->drr_raw_bonuslen = DN_MAX_BONUS_LEN(dnp);
764 bonuslen = drro->drr_raw_bonuslen;
765 }
766 }
767
768 /*
769 * DRR_OBJECT_SPILL is set for every dnode which references a
770 * spill block. This allows the receiving pool to definitively
771 * determine when a spill block should be kept or freed.
772 */
773 if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR)
774 drro->drr_flags |= DRR_OBJECT_SPILL;
775
776 if (dump_record(dscp, DN_BONUS(dnp), bonuslen) != 0)
777 return (SET_ERROR(EINTR));
778
779 /* Free anything past the end of the file. */
780 if (dump_free(dscp, object, (dnp->dn_maxblkid + 1) *
781 (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), DMU_OBJECT_END) != 0)
782 return (SET_ERROR(EINTR));
783
784 /*
785 * Send DRR_SPILL records for unmodified spill blocks. This is useful
786 * because changing certain attributes of the object (e.g. blocksize)
787 * can cause old versions of ZFS to incorrectly remove a spill block.
788 * Including these records in the stream forces an up-to-date version
789 * to always be written, ensuring they're never lost. Current versions
790 * of the code which understand the DRR_FLAG_SPILL_BLOCK feature can
791 * ignore these unmodified spill blocks.
792 */
793 if (zfs_send_unmodified_spill_blocks &&
794 (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) &&
795 (DN_SPILL_BLKPTR(dnp)->blk_birth <= dscp->dsc_fromtxg)) {
796 struct send_range record;
797 blkptr_t *bp = DN_SPILL_BLKPTR(dnp);
798
799 bzero(&record, sizeof (struct send_range));
800 record.type = DATA;
801 record.object = object;
802 record.eos_marker = B_FALSE;
803 record.start_blkid = DMU_SPILL_BLKID;
804 record.end_blkid = record.start_blkid + 1;
805 record.sru.data.bp = *bp;
806 record.sru.data.obj_type = dnp->dn_type;
807 record.sru.data.datablksz = BP_GET_LSIZE(bp);
808
809 if (do_dump(dscp, &record) != 0)
810 return (SET_ERROR(EINTR));
811 }
812
813 if (dscp->dsc_err != 0)
814 return (SET_ERROR(EINTR));
815
816 return (0);
817 }
818
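/*
 * Emit a DRR_OBJECT_RANGE record.  Only used for raw sends, where the
 * receiver needs the byteswap flag, salt, IV, and MAC of each dnode block
 * to reconstruct it.
 */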
819 static int
820 dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp,
821 uint64_t firstobj, uint64_t numslots)
822 {
823 struct drr_object_range *drror =
824 &(dscp->dsc_drr->drr_u.drr_object_range);
825
826 /* we only use this record type for raw sends */
827 ASSERT(BP_IS_PROTECTED(bp));
828 ASSERT(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
829 ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
830 ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_DNODE);
831 ASSERT0(BP_GET_LEVEL(bp));
832
833 if (dscp->dsc_pending_op != PENDING_NONE) {
834 if (dump_record(dscp, NULL, 0) != 0)
835 return (SET_ERROR(EINTR));
836 dscp->dsc_pending_op = PENDING_NONE;
837 }
838
839 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
840 dscp->dsc_drr->drr_type = DRR_OBJECT_RANGE;
841 drror->drr_firstobj = firstobj;
842 drror->drr_numslots = numslots;
843 drror->drr_toguid = dscp->dsc_toguid;
844 if (BP_SHOULD_BYTESWAP(bp))
845 drror->drr_flags |= DRR_RAW_BYTESWAP;
846 zio_crypt_decode_params_bp(bp, drror->drr_salt, drror->drr_iv);
847 zio_crypt_decode_mac_bp(bp, drror->drr_mac);
848
849 if (dump_record(dscp, NULL, 0) != 0)
850 return (SET_ERROR(EINTR));
851 return (0);
852 }
853
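/*
 * Decide whether a block can be sent as a DRR_WRITE_EMBEDDED record: it must
 * be an embedded BP whose compression algorithm and embed type the stream's
 * feature flags say the receiver can handle.
 */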
854 static boolean_t
855 send_do_embed(const blkptr_t *bp, uint64_t featureflags)
856 {
857 if (!BP_IS_EMBEDDED(bp))
858 return (B_FALSE);
859
860 /*
861 * Compression function must be legacy, or explicitly enabled.
862 */
863 if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
864 !(featureflags & DMU_BACKUP_FEATURE_LZ4)))
865 return (B_FALSE);
866
867 /*
868 * If we have not set the ZSTD feature flag, we can't send ZSTD
869 * compressed embedded blocks, as the receiver may not support them.
870 */
871 if ((BP_GET_COMPRESS(bp) == ZIO_COMPRESS_ZSTD &&
872 !(featureflags & DMU_BACKUP_FEATURE_ZSTD)))
873 return (B_FALSE);
874
875 /*
876 * Embed type must be explicitly enabled.
877 */
878 switch (BPE_GET_ETYPE(bp)) {
879 case BP_EMBEDDED_TYPE_DATA:
880 if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
881 return (B_TRUE);
882 break;
883 default:
884 return (B_FALSE);
885 }
886 return (B_FALSE);
887 }
888
889 /*
890 * This function actually handles figuring out what kind of record needs to be
891 * dumped, and calling the appropriate helper function. In most cases,
892 * the data has already been read by send_reader_thread().
893 */
894 static int
895 do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
896 {
897 int err = 0;
898 switch (range->type) {
899 case OBJECT:
900 err = dump_dnode(dscp, &range->sru.object.bp, range->object,
901 range->sru.object.dnp);
902 return (err);
903 case OBJECT_RANGE: {
904 ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
905 if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {
906 return (0);
907 }
908 uint64_t epb = BP_GET_LSIZE(&range->sru.object_range.bp) >>
909 DNODE_SHIFT;
910 uint64_t firstobj = range->start_blkid * epb;
911 err = dump_object_range(dscp, &range->sru.object_range.bp,
912 firstobj, epb);
913 break;
914 }
915 case REDACT: {
916 struct srr *srrp = &range->sru.redact;
917 err = dump_redact(dscp, range->object, range->start_blkid *
918 srrp->datablksz, (range->end_blkid - range->start_blkid) *
919 srrp->datablksz);
920 return (err);
921 }
922 case DATA: {
923 struct srd *srdp = &range->sru.data;
924 blkptr_t *bp = &srdp->bp;
925 spa_t *spa =
926 dmu_objset_spa(dscp->dsc_os);
927
928 ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
929 ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
930 if (BP_GET_TYPE(bp) == DMU_OT_SA) {
931 arc_flags_t aflags = ARC_FLAG_WAIT;
932 enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
933
934 if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
935 ASSERT(BP_IS_PROTECTED(bp));
936 zioflags |= ZIO_FLAG_RAW;
937 }
938
939 zbookmark_phys_t zb;
940 ASSERT3U(range->start_blkid, ==, DMU_SPILL_BLKID);
941 zb.zb_objset = dmu_objset_id(dscp->dsc_os);
942 zb.zb_object = range->object;
943 zb.zb_level = 0;
944 zb.zb_blkid = range->start_blkid;
945
946 arc_buf_t *abuf = NULL;
947 if (!dscp->dsc_dso->dso_dryrun && arc_read(NULL, spa,
948 bp, arc_getbuf_func, &abuf, ZIO_PRIORITY_ASYNC_READ,
949 zioflags, &aflags, &zb) != 0)
950 return (SET_ERROR(EIO));
951
952 err = dump_spill(dscp, bp, zb.zb_object,
953 (abuf == NULL ? NULL : abuf->b_data));
954 if (abuf != NULL)
955 arc_buf_destroy(abuf, &abuf);
956 return (err);
957 }
958 if (send_do_embed(bp, dscp->dsc_featureflags)) {
959 err = dump_write_embedded(dscp, range->object,
960 range->start_blkid * srdp->datablksz,
961 srdp->datablksz, bp);
962 return (err);
963 }
964 ASSERT(range->object > dscp->dsc_resume_object ||
965 (range->object == dscp->dsc_resume_object &&
966 range->start_blkid * srdp->datablksz >=
967 dscp->dsc_resume_offset));
968 /* it's a level-0 block of a regular object */
969
970 mutex_enter(&srdp->lock);
971 while (srdp->io_outstanding)
972 cv_wait(&srdp->cv, &srdp->lock);
973 err = srdp->io_err;
974 mutex_exit(&srdp->lock);
975
976 if (err != 0) {
977 if (zfs_send_corrupt_data &&
978 !dscp->dsc_dso->dso_dryrun) {
979 /*
980 * Send a block filled with 0x"zfs badd bloc"
981 */
982 srdp->abuf = arc_alloc_buf(spa, &srdp->abuf,
983 ARC_BUFC_DATA, srdp->datablksz);
984 uint64_t *ptr;
985 for (ptr = srdp->abuf->b_data;
986 (char *)ptr < (char *)srdp->abuf->b_data +
987 srdp->datablksz; ptr++)
988 *ptr = 0x2f5baddb10cULL;
989 } else {
990 return (SET_ERROR(EIO));
991 }
992 }
993
994 ASSERT(dscp->dsc_dso->dso_dryrun ||
995 srdp->abuf != NULL || srdp->abd != NULL);
996
997 uint64_t offset = range->start_blkid * srdp->datablksz;
998
999 char *data = NULL;
1000 if (srdp->abd != NULL) {
1001 data = abd_to_buf(srdp->abd);
1002 ASSERT3P(srdp->abuf, ==, NULL);
1003 } else if (srdp->abuf != NULL) {
1004 data = srdp->abuf->b_data;
1005 }
1006
1007 /*
1008 * If we have large blocks stored on disk but the send flags
1009 * don't allow us to send large blocks, we split the data from
1010 * the arc buf into chunks.
1011 */
1012 if (srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
1013 !(dscp->dsc_featureflags &
1014 DMU_BACKUP_FEATURE_LARGE_BLOCKS)) {
1015 while (srdp->datablksz > 0 && err == 0) {
1016 int n = MIN(srdp->datablksz,
1017 SPA_OLD_MAXBLOCKSIZE);
1018 err = dmu_dump_write(dscp, srdp->obj_type,
1019 range->object, offset, n, n, NULL, data);
1020 offset += n;
1021 /*
1022 * When doing dry run, data==NULL is used as a
1023 * sentinel value by
1024 * dmu_dump_write()->dump_record().
1025 */
1026 if (data != NULL)
1027 data += n;
1028 srdp->datablksz -= n;
1029 }
1030 } else {
1031 err = dmu_dump_write(dscp, srdp->obj_type,
1032 range->object, offset,
1033 srdp->datablksz, srdp->datasz, bp, data);
1034 }
1035 return (err);
1036 }
1037 case HOLE: {
1038 struct srh *srhp = &range->sru.hole;
1039 if (range->object == DMU_META_DNODE_OBJECT) {
1040 uint32_t span = srhp->datablksz >> DNODE_SHIFT;
1041 uint64_t first_obj = range->start_blkid * span;
1042 uint64_t numobj = range->end_blkid * span - first_obj;
1043 return (dump_freeobjects(dscp, first_obj, numobj));
1044 }
1045 uint64_t offset = 0;
1046
1047 /*
1048 * If this multiply overflows, we don't need to send this block.
1049 * Even if it has a birth time, it can never not be a hole, so
1050 * we don't need to send records for it.
1051 */
1052 if (!overflow_multiply(range->start_blkid, srhp->datablksz,
1053 &offset)) {
1054 return (0);
1055 }
1056 uint64_t len = 0;
1057
1058 if (!overflow_multiply(range->end_blkid, srhp->datablksz, &len))
1059 len = UINT64_MAX;
1060 len = len - offset;
1061 return (dump_free(dscp, range->object, offset, len));
1062 }
1063 default:
1064 panic("Invalid range type in do_dump: %d", range->type);
1065 }
1066 return (err);
1067 }
1068
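/*
 * Allocate a send_range of the given type covering [start_blkid, end_blkid).
 * DATA ranges also get the lock, cv, and I/O-tracking fields used by
 * issue_data_read() and range_free().
 */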
1069 static struct send_range *
1070 range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
1071 uint64_t end_blkid, boolean_t eos)
1072 {
1073 struct send_range *range = kmem_alloc(sizeof (*range), KM_SLEEP);
1074 range->type = type;
1075 range->object = object;
1076 range->start_blkid = start_blkid;
1077 range->end_blkid = end_blkid;
1078 range->eos_marker = eos;
1079 if (type == DATA) {
1080 range->sru.data.abd = NULL;
1081 range->sru.data.abuf = NULL;
1082 mutex_init(&range->sru.data.lock, NULL, MUTEX_DEFAULT, NULL);
1083 cv_init(&range->sru.data.cv, NULL, CV_DEFAULT, NULL);
1084 range->sru.data.io_outstanding = 0;
1085 range->sru.data.io_err = 0;
1086 }
1087 return (range);
1088 }
1089
1090 /*
1091 * This is the callback function to traverse_dataset that acts as a worker
1092 * thread for dmu_send_impl.
1093 */
1094 /*ARGSUSED*/
1095 static int
1096 send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
1097 const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
1098 {
1099 struct send_thread_arg *sta = arg;
1100 struct send_range *record;
1101
1102 ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
1103 zb->zb_object >= sta->resume.zb_object);
1104
1105 /*
1106 * All bps of an encrypted os should have the encryption bit set.
1107 * If this is not true it indicates tampering and we report an error.
1108 */
1109 if (sta->os->os_encrypted &&
1110 !BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {
1111 spa_log_error(spa, zb);
1112 zfs_panic_recover("unencrypted block in encrypted "
1113 "object set %llu", dmu_objset_id(sta->os));
1114 return (SET_ERROR(EIO));
1115 }
1116
1117 if (sta->cancel)
1118 return (SET_ERROR(EINTR));
1119 if (zb->zb_object != DMU_META_DNODE_OBJECT &&
1120 DMU_OBJECT_IS_SPECIAL(zb->zb_object))
1121 return (0);
1122 atomic_inc_64(sta->num_blocks_visited);
1123
1124 if (zb->zb_level == ZB_DNODE_LEVEL) {
1125 if (zb->zb_object == DMU_META_DNODE_OBJECT)
1126 return (0);
1127 record = range_alloc(OBJECT, zb->zb_object, 0, 0, B_FALSE);
1128 record->sru.object.bp = *bp;
1129 size_t size = sizeof (*dnp) * (dnp->dn_extra_slots + 1);
1130 record->sru.object.dnp = kmem_alloc(size, KM_SLEEP);
1131 bcopy(dnp, record->sru.object.dnp, size);
1132 bqueue_enqueue(&sta->q, record, sizeof (*record));
1133 return (0);
1134 }
1135 if (zb->zb_level == 0 && zb->zb_object == DMU_META_DNODE_OBJECT &&
1136 !BP_IS_HOLE(bp)) {
1137 record = range_alloc(OBJECT_RANGE, 0, zb->zb_blkid,
1138 zb->zb_blkid + 1, B_FALSE);
1139 record->sru.object_range.bp = *bp;
1140 bqueue_enqueue(&sta->q, record, sizeof (*record));
1141 return (0);
1142 }
1143 if (zb->zb_level < 0 || (zb->zb_level > 0 && !BP_IS_HOLE(bp)))
1144 return (0);
1145 if (zb->zb_object == DMU_META_DNODE_OBJECT && !BP_IS_HOLE(bp))
1146 return (0);
1147
1148 uint64_t span = bp_span_in_blocks(dnp->dn_indblkshift, zb->zb_level);
1149 uint64_t start;
1150
1151 /*
1152 * If this multiply overflows, we don't need to send this block.
1153 * Even if it has a birth time, it can never not be a hole, so
1154 * we don't need to send records for it.
1155 */
1156 if (!overflow_multiply(span, zb->zb_blkid, &start) || (!(zb->zb_blkid ==
1157 DMU_SPILL_BLKID || DMU_OT_IS_METADATA(dnp->dn_type)) &&
1158 span * zb->zb_blkid > dnp->dn_maxblkid)) {
1159 ASSERT(BP_IS_HOLE(bp));
1160 return (0);
1161 }
1162
1163 if (zb->zb_blkid == DMU_SPILL_BLKID)
1164 ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
1165
1166 enum type record_type = DATA;
1167 if (BP_IS_HOLE(bp))
1168 record_type = HOLE;
1169 else if (BP_IS_REDACTED(bp))
1170 record_type = REDACT;
1171 else
1172 record_type = DATA;
1173
1174 record = range_alloc(record_type, zb->zb_object, start,
1175 (start + span < start ? 0 : start + span), B_FALSE);
1176
1177 uint64_t datablksz = (zb->zb_blkid == DMU_SPILL_BLKID ?
1178 BP_GET_LSIZE(bp) : dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT);
1179
1180 if (BP_IS_HOLE(bp)) {
1181 record->sru.hole.datablksz = datablksz;
1182 } else if (BP_IS_REDACTED(bp)) {
1183 record->sru.redact.datablksz = datablksz;
1184 } else {
1185 record->sru.data.datablksz = datablksz;
1186 record->sru.data.obj_type = dnp->dn_type;
1187 record->sru.data.bp = *bp;
1188 }
1189
1190 bqueue_enqueue(&sta->q, record, sizeof (*record));
1191 return (0);
1192 }
1193
1194 struct redact_list_cb_arg {
1195 uint64_t *num_blocks_visited;
1196 bqueue_t *q;
1197 boolean_t *cancel;
1198 boolean_t mark_redact;
1199 };
1200
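/*
 * Callback for dsl_redaction_list_traverse(); turns each redact_block_phys_t
 * into a REDACT (or PREVIOUSLY_REDACTED) send_range and enqueues it for the
 * merge thread.
 */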
1201 static int
1202 redact_list_cb(redact_block_phys_t *rb, void *arg)
1203 {
1204 struct redact_list_cb_arg *rlcap = arg;
1205
1206 atomic_inc_64(rlcap->num_blocks_visited);
1207 if (*rlcap->cancel)
1208 return (-1);
1209
1210 struct send_range *data = range_alloc(REDACT, rb->rbp_object,
1211 rb->rbp_blkid, rb->rbp_blkid + redact_block_get_count(rb), B_FALSE);
1212 ASSERT3U(data->end_blkid, >, rb->rbp_blkid);
1213 if (rlcap->mark_redact) {
1214 data->type = REDACT;
1215 data->sru.redact.datablksz = redact_block_get_size(rb);
1216 } else {
1217 data->type = PREVIOUSLY_REDACTED;
1218 }
1219 bqueue_enqueue(rlcap->q, data, sizeof (*data));
1220
1221 return (0);
1222 }
1223
1224 /*
1225 * This function kicks off the traverse_dataset. It also handles setting the
1226 * error code of the thread in case something goes wrong, and pushes the End of
1227 * Stream record when the traverse_dataset call has finished.
1228 */
1229 static void
1230 send_traverse_thread(void *arg)
1231 {
1232 struct send_thread_arg *st_arg = arg;
1233 int err = 0;
1234 struct send_range *data;
1235 fstrans_cookie_t cookie = spl_fstrans_mark();
1236
1237 err = traverse_dataset_resume(st_arg->os->os_dsl_dataset,
1238 st_arg->fromtxg, &st_arg->resume,
1239 st_arg->flags, send_cb, st_arg);
1240
1241 if (err != EINTR)
1242 st_arg->error_code = err;
1243 data = range_alloc(DATA, 0, 0, 0, B_TRUE);
1244 bqueue_enqueue_flush(&st_arg->q, data, sizeof (*data));
1245 spl_fstrans_unmark(cookie);
1246 thread_exit();
1247 }
1248
1249 /*
1250 * Utility function that causes End of Stream records to compare after all
1251 * others, so that other threads' comparison logic can stay simple.
1252 */
1253 static int __attribute__((unused))
1254 send_range_after(const struct send_range *from, const struct send_range *to)
1255 {
1256 if (from->eos_marker == B_TRUE)
1257 return (1);
1258 if (to->eos_marker == B_TRUE)
1259 return (-1);
1260
1261 uint64_t from_obj = from->object;
1262 uint64_t from_end_obj = from->object + 1;
1263 uint64_t to_obj = to->object;
1264 uint64_t to_end_obj = to->object + 1;
1265 if (from_obj == 0) {
1266 ASSERT(from->type == HOLE || from->type == OBJECT_RANGE);
1267 from_obj = from->start_blkid << DNODES_PER_BLOCK_SHIFT;
1268 from_end_obj = from->end_blkid << DNODES_PER_BLOCK_SHIFT;
1269 }
1270 if (to_obj == 0) {
1271 ASSERT(to->type == HOLE || to->type == OBJECT_RANGE);
1272 to_obj = to->start_blkid << DNODES_PER_BLOCK_SHIFT;
1273 to_end_obj = to->end_blkid << DNODES_PER_BLOCK_SHIFT;
1274 }
1275
1276 if (from_end_obj <= to_obj)
1277 return (-1);
1278 if (from_obj >= to_end_obj)
1279 return (1);
1280 int64_t cmp = TREE_CMP(to->type == OBJECT_RANGE, from->type ==
1281 OBJECT_RANGE);
1282 if (unlikely(cmp))
1283 return (cmp);
1284 cmp = TREE_CMP(to->type == OBJECT, from->type == OBJECT);
1285 if (unlikely(cmp))
1286 return (cmp);
1287 if (from->end_blkid <= to->start_blkid)
1288 return (-1);
1289 if (from->start_blkid >= to->end_blkid)
1290 return (1);
1291 return (0);
1292 }
1293
1294 /*
1295 * Pop the new data off the queue, check that the records we receive are in
1296 * the right order, but do not free the old data. This is used so that the
1297 * records can be sent on to the main thread without copying the data.
1298 */
1299 static struct send_range *
1300 get_next_range_nofree(bqueue_t *bq, struct send_range *prev)
1301 {
1302 struct send_range *next = bqueue_dequeue(bq);
1303 ASSERT3S(send_range_after(prev, next), ==, -1);
1304 return (next);
1305 }
1306
1307 /*
1308 * Pop the new data off the queue, check that the records we receive are in
1309 * the right order, and free the old data.
1310 */
1311 static struct send_range *
1312 get_next_range(bqueue_t *bq, struct send_range *prev)
1313 {
1314 struct send_range *next = get_next_range_nofree(bq, prev);
1315 range_free(prev);
1316 return (next);
1317 }
1318
1319 static void
1320 redact_list_thread(void *arg)
1321 {
1322 struct redact_list_thread_arg *rlt_arg = arg;
1323 struct send_range *record;
1324 fstrans_cookie_t cookie = spl_fstrans_mark();
1325 if (rlt_arg->rl != NULL) {
1326 struct redact_list_cb_arg rlcba = {0};
1327 rlcba.cancel = &rlt_arg->cancel;
1328 rlcba.q = &rlt_arg->q;
1329 rlcba.num_blocks_visited = rlt_arg->num_blocks_visited;
1330 rlcba.mark_redact = rlt_arg->mark_redact;
1331 int err = dsl_redaction_list_traverse(rlt_arg->rl,
1332 &rlt_arg->resume, redact_list_cb, &rlcba);
1333 if (err != EINTR)
1334 rlt_arg->error_code = err;
1335 }
1336 record = range_alloc(DATA, 0, 0, 0, B_TRUE);
1337 bqueue_enqueue_flush(&rlt_arg->q, record, sizeof (*record));
1338 spl_fstrans_unmark(cookie);
1339
1340 thread_exit();
1341 }
1342
1343 /*
1344 * Compare the start point of the two provided ranges. End of stream ranges
1345 * compare last, objects compare before any data or hole inside that object and
1346 * multi-object holes that start at the same object.
1347 */
1348 static int
1349 send_range_start_compare(struct send_range *r1, struct send_range *r2)
1350 {
1351 uint64_t r1_objequiv = r1->object;
1352 uint64_t r1_l0equiv = r1->start_blkid;
1353 uint64_t r2_objequiv = r2->object;
1354 uint64_t r2_l0equiv = r2->start_blkid;
1355 int64_t cmp = TREE_CMP(r1->eos_marker, r2->eos_marker);
1356 if (unlikely(cmp))
1357 return (cmp);
1358 if (r1->object == 0) {
1359 r1_objequiv = r1->start_blkid * DNODES_PER_BLOCK;
1360 r1_l0equiv = 0;
1361 }
1362 if (r2->object == 0) {
1363 r2_objequiv = r2->start_blkid * DNODES_PER_BLOCK;
1364 r2_l0equiv = 0;
1365 }
1366
1367 cmp = TREE_CMP(r1_objequiv, r2_objequiv);
1368 if (likely(cmp))
1369 return (cmp);
1370 cmp = TREE_CMP(r2->type == OBJECT_RANGE, r1->type == OBJECT_RANGE);
1371 if (unlikely(cmp))
1372 return (cmp);
1373 cmp = TREE_CMP(r2->type == OBJECT, r1->type == OBJECT);
1374 if (unlikely(cmp))
1375 return (cmp);
1376
1377 return (TREE_CMP(r1_l0equiv, r2_l0equiv));
1378 }
1379
1380 enum q_idx {
1381 REDACT_IDX = 0,
1382 TO_IDX,
1383 FROM_IDX,
1384 NUM_THREADS
1385 };
1386
1387 /*
1388 * This function returns the next range the send_merge_thread should operate on.
1389 * The inputs are two arrays; the first one stores the range at the front of the
1390 * queues stored in the second one. The ranges are sorted in descending
1391 * priority order; the metadata from earlier ranges overrules metadata from
1392 * later ranges. out_mask is used to return which threads the ranges came from;
1393 * bit i is set if ranges[i] started at the same place as the returned range.
1394 *
1395 * This code is not hardcoded to compare a specific number of threads; it could
1396 * be used with any number, just by changing the q_idx enum.
1397 *
1398 * The "next range" is the one with the earliest start; if two starts are equal,
1399 * the highest-priority range is the next to operate on. If a higher-priority
1400 * range starts in the middle of the first range, then the first range will be
1401 * truncated to end where the higher-priority range starts, and we will operate
1402 * on that one next time. In this way, we make sure that each block covered by
1403 * some range gets covered by a returned range, and each block covered is
1404 * returned using the metadata of the highest-priority range it appears in.
1405 *
1406 * For example, if the three ranges at the front of the queues were [2,4),
1407 * [3,5), and [1,3), then the ranges returned would be [1,2) with the metadata
1408 * from the third range, [2,4) with the metadata from the first range, and then
1409 * [4,5) with the metadata from the second.
1410 */
1411 static struct send_range *
1412 find_next_range(struct send_range **ranges, bqueue_t **qs, uint64_t *out_mask)
1413 {
1414 int idx = 0; // index of the range with the earliest start
1415 int i;
1416 uint64_t bmask = 0;
1417 for (i = 1; i < NUM_THREADS; i++) {
1418 if (send_range_start_compare(ranges[i], ranges[idx]) < 0)
1419 idx = i;
1420 }
1421 if (ranges[idx]->eos_marker) {
1422 struct send_range *ret = range_alloc(DATA, 0, 0, 0, B_TRUE);
1423 *out_mask = 0;
1424 return (ret);
1425 }
1426 /*
1427 * Find all the ranges that start at that same point.
1428 */
1429 for (i = 0; i < NUM_THREADS; i++) {
1430 if (send_range_start_compare(ranges[i], ranges[idx]) == 0)
1431 bmask |= 1 << i;
1432 }
1433 *out_mask = bmask;
1434 /*
1435 * OBJECT_RANGE records only come from the TO thread, and should always
1436 * be treated as overlapping with nothing and sent on immediately. They
1437 * are only used in raw sends, and are never redacted.
1438 */
1439 if (ranges[idx]->type == OBJECT_RANGE) {
1440 ASSERT3U(idx, ==, TO_IDX);
1441 ASSERT3U(*out_mask, ==, 1 << TO_IDX);
1442 struct send_range *ret = ranges[idx];
1443 ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);
1444 return (ret);
1445 }
1446 /*
1447 * Find the first start or end point after the start of the first range.
1448 */
1449 uint64_t first_change = ranges[idx]->end_blkid;
1450 for (i = 0; i < NUM_THREADS; i++) {
1451 if (i == idx || ranges[i]->eos_marker ||
1452 ranges[i]->object > ranges[idx]->object ||
1453 ranges[i]->object == DMU_META_DNODE_OBJECT)
1454 continue;
1455 ASSERT3U(ranges[i]->object, ==, ranges[idx]->object);
1456 if (first_change > ranges[i]->start_blkid &&
1457 (bmask & (1 << i)) == 0)
1458 first_change = ranges[i]->start_blkid;
1459 else if (first_change > ranges[i]->end_blkid)
1460 first_change = ranges[i]->end_blkid;
1461 }
1462 /*
1463 * Update all ranges to no longer overlap with the range we're
1464 * returning. All such ranges must start at the same place as the range
1465 * being returned, and end at or after first_change. Thus we update
1466 * their start to first_change. If that makes them size 0, then free
1467 * them and pull a new range from that thread.
1468 */
1469 for (i = 0; i < NUM_THREADS; i++) {
1470 if (i == idx || (bmask & (1 << i)) == 0)
1471 continue;
1472 ASSERT3U(first_change, >, ranges[i]->start_blkid);
1473 ranges[i]->start_blkid = first_change;
1474 ASSERT3U(ranges[i]->start_blkid, <=, ranges[i]->end_blkid);
1475 if (ranges[i]->start_blkid == ranges[i]->end_blkid)
1476 ranges[i] = get_next_range(qs[i], ranges[i]);
1477 }
1478 /*
1479 * Short-circuit the simple case; if the range doesn't overlap with
1480 * anything else, or it only overlaps with things that start at the same
1481 * place and are longer, send it on.
1482 */
1483 if (first_change == ranges[idx]->end_blkid) {
1484 struct send_range *ret = ranges[idx];
1485 ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);
1486 return (ret);
1487 }
1488
1489 /*
1490 * Otherwise, return a truncated copy of ranges[idx] and move the start
1491 * of ranges[idx] back to first_change.
1492 */
1493 struct send_range *ret = kmem_alloc(sizeof (*ret), KM_SLEEP);
1494 *ret = *ranges[idx];
1495 ret->end_blkid = first_change;
1496 ranges[idx]->start_blkid = first_change;
1497 return (ret);
1498 }
1499
1500 #define FROM_AND_REDACT_BITS ((1 << REDACT_IDX) | (1 << FROM_IDX))
1501
1502 /*
1503 * Merge the results from the from thread and the to thread, and then hand the
1504 * records off to send_reader_thread to prefetch them. If this is not a
1505 * send from a redaction bookmark, the from thread will push an end of stream
1506 * record and stop, and we'll just send everything that was changed in the
1507 * to_ds since the ancestor's creation txg. If it is, then since
1508 * traverse_dataset has a canonical order, we can compare each change as
1509 * they're pulled off the queues. That will give us a stream that is
1510 * appropriately sorted, and covers all records. In addition, we pull the
1511 * data from the redact_list_thread and use that to determine which blocks
1512 * should be redacted.
1513 */
1514 static void
1515 send_merge_thread(void *arg)
1516 {
1517 struct send_merge_thread_arg *smt_arg = arg;
1518 struct send_range *front_ranges[NUM_THREADS];
1519 bqueue_t *queues[NUM_THREADS];
1520 int err = 0;
1521 fstrans_cookie_t cookie = spl_fstrans_mark();
1522
1523 if (smt_arg->redact_arg == NULL) {
1524 front_ranges[REDACT_IDX] =
1525 kmem_zalloc(sizeof (struct send_range), KM_SLEEP);
1526 front_ranges[REDACT_IDX]->eos_marker = B_TRUE;
1527 front_ranges[REDACT_IDX]->type = REDACT;
1528 queues[REDACT_IDX] = NULL;
1529 } else {
1530 front_ranges[REDACT_IDX] =
1531 bqueue_dequeue(&smt_arg->redact_arg->q);
1532 queues[REDACT_IDX] = &smt_arg->redact_arg->q;
1533 }
1534 front_ranges[TO_IDX] = bqueue_dequeue(&smt_arg->to_arg->q);
1535 queues[TO_IDX] = &smt_arg->to_arg->q;
1536 front_ranges[FROM_IDX] = bqueue_dequeue(&smt_arg->from_arg->q);
1537 queues[FROM_IDX] = &smt_arg->from_arg->q;
1538 uint64_t mask = 0;
1539 struct send_range *range;
1540 for (range = find_next_range(front_ranges, queues, &mask);
1541 !range->eos_marker && err == 0 && !smt_arg->cancel;
1542 range = find_next_range(front_ranges, queues, &mask)) {
1543 /*
1544 * If the range in question was in both the from redact bookmark
1545 * and the bookmark we're using to redact, then don't send it.
1546 * It's already redacted on the receiving system, so a redaction
1547 * record would be redundant.
1548 */
1549 if ((mask & FROM_AND_REDACT_BITS) == FROM_AND_REDACT_BITS) {
1550 ASSERT3U(range->type, ==, REDACT);
1551 range_free(range);
1552 continue;
1553 }
1554 bqueue_enqueue(&smt_arg->q, range, sizeof (*range));
1555
1556 if (smt_arg->to_arg->error_code != 0) {
1557 err = smt_arg->to_arg->error_code;
1558 } else if (smt_arg->from_arg->error_code != 0) {
1559 err = smt_arg->from_arg->error_code;
1560 } else if (smt_arg->redact_arg != NULL &&
1561 smt_arg->redact_arg->error_code != 0) {
1562 err = smt_arg->redact_arg->error_code;
1563 }
1564 }
1565 if (smt_arg->cancel && err == 0)
1566 err = SET_ERROR(EINTR);
1567 smt_arg->error = err;
1568 if (smt_arg->error != 0) {
1569 smt_arg->to_arg->cancel = B_TRUE;
1570 smt_arg->from_arg->cancel = B_TRUE;
1571 if (smt_arg->redact_arg != NULL)
1572 smt_arg->redact_arg->cancel = B_TRUE;
1573 }
1574 for (int i = 0; i < NUM_THREADS; i++) {
1575 while (!front_ranges[i]->eos_marker) {
1576 front_ranges[i] = get_next_range(queues[i],
1577 front_ranges[i]);
1578 }
1579 range_free(front_ranges[i]);
1580 }
1581 if (range == NULL)
1582 range = kmem_zalloc(sizeof (*range), KM_SLEEP);
1583 range->eos_marker = B_TRUE;
1584 bqueue_enqueue_flush(&smt_arg->q, range, 1);
1585 spl_fstrans_unmark(cookie);
1586 thread_exit();
1587 }
1588
1589 struct send_reader_thread_arg {
1590 struct send_merge_thread_arg *smta;
1591 bqueue_t q;
1592 boolean_t cancel;
1593 boolean_t issue_reads;
1594 uint64_t featureflags;
1595 int error;
1596 };
1597
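/*
 * Completion callback for the zio reads issued by issue_data_read(); records
 * any error, marks the I/O as no longer outstanding, and wakes any thread
 * waiting in do_dump() or range_free().
 */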
1598 static void
1599 dmu_send_read_done(zio_t *zio)
1600 {
1601 struct send_range *range = zio->io_private;
1602
1603 mutex_enter(&range->sru.data.lock);
1604 if (zio->io_error != 0) {
1605 abd_free(range->sru.data.abd);
1606 range->sru.data.abd = NULL;
1607 range->sru.data.io_err = zio->io_error;
1608 }
1609
1610 ASSERT(range->sru.data.io_outstanding);
1611 range->sru.data.io_outstanding = B_FALSE;
1612 cv_broadcast(&range->sru.data.cv);
1613 mutex_exit(&range->sru.data.lock);
1614 }
1615
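/*
 * Compute the on-wire payload size for a DATA range and, if issue_reads is
 * set, kick off the read of its block.  We prefer a (possibly compressed or
 * raw) buffer already cached in the ARC; if the block isn't cached, fall
 * back to an asynchronous zio_read() into a linear ABD that completes in
 * dmu_send_read_done().
 */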
1616 static void
1617 issue_data_read(struct send_reader_thread_arg *srta, struct send_range *range)
1618 {
1619 struct srd *srdp = &range->sru.data;
1620 blkptr_t *bp = &srdp->bp;
1621 objset_t *os = srta->smta->os;
1622
1623 ASSERT3U(range->type, ==, DATA);
1624 ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
1625 /*
1626 * If we have large blocks stored on disk but
1627 * the send flags don't allow us to send large
1628 * blocks, we split the data from the arc buf
1629 * into chunks.
1630 */
1631 boolean_t split_large_blocks =
1632 srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
1633 !(srta->featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
1634 /*
1635 * We should only request compressed data from the ARC if all
1636 * the following are true:
1637 * - stream compression was requested
1638 * - we aren't splitting large blocks into smaller chunks
1639 * - the data won't need to be byteswapped before sending
1640 * - this isn't an embedded block
1641 * - this isn't metadata (if receiving on a different endian
1642 * system it can be byteswapped more easily)
1643 */
1644 boolean_t request_compressed =
1645 (srta->featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
1646 !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
1647 !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
1648
1649 enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
1650
1651 if (srta->featureflags & DMU_BACKUP_FEATURE_RAW)
1652 zioflags |= ZIO_FLAG_RAW;
1653 else if (request_compressed)
1654 zioflags |= ZIO_FLAG_RAW_COMPRESS;
1655
1656 srdp->datasz = (zioflags & ZIO_FLAG_RAW_COMPRESS) ?
1657 BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp);
1658
1659 if (!srta->issue_reads)
1660 return;
1661 if (BP_IS_REDACTED(bp))
1662 return;
1663 if (send_do_embed(bp, srta->featureflags))
1664 return;
1665
1666 zbookmark_phys_t zb = {
1667 .zb_objset = dmu_objset_id(os),
1668 .zb_object = range->object,
1669 .zb_level = 0,
1670 .zb_blkid = range->start_blkid,
1671 };
1672
1673 arc_flags_t aflags = ARC_FLAG_CACHED_ONLY;
1674
1675 int arc_err = arc_read(NULL, os->os_spa, bp,
1676 arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ,
1677 zioflags, &aflags, &zb);
1678 /*
1679 * If the data is not already cached in the ARC, we read it directly
1680 * with a zio. This avoids the performance overhead of adding a new
1681 * entry to the ARC, and we also avoid polluting the ARC cache with
1682 * data that is not likely to be used in the future.
1683 */
1684 if (arc_err != 0) {
1685 srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE);
1686 srdp->io_outstanding = B_TRUE;
1687 zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd,
1688 srdp->datasz, dmu_send_read_done, range,
1689 ZIO_PRIORITY_ASYNC_READ, zioflags, &zb));
1690 }
1691 }
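
/*
 * A worked example of how the flags and datasz computed above interact
 * (the block sizes are made up for illustration):
 *
 *   - 128K logical block, lz4-compressed to 40K on disk:
 *       compressed send, not split, no byteswap, not embedded, not
 *       metadata   -> ZIO_FLAG_RAW_COMPRESS, datasz = BP_GET_PSIZE = 40K
 *       plain send -> no raw flags, datasz = BP_GET_LSIZE = 128K
 *   - raw send of an encrypted dataset -> ZIO_FLAG_RAW, which implies
 *     the RAW_COMPRESS bit, so the physical (on-disk) size is used.
 *
 * The arc_read() is issued with ARC_FLAG_CACHED_ONLY, so it only
 * succeeds when the block is already resident; otherwise we fall back
 * to the uncached zio_read() at the end of the function.
 */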
1692
1693 /*
1694 * Create a new record with the given values.
1695 */
1696 static void
1697 enqueue_range(struct send_reader_thread_arg *srta, bqueue_t *q, dnode_t *dn,
1698 uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz)
1699 {
1700 enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE :
1701 (BP_IS_REDACTED(bp) ? REDACT : DATA));
1702
1703 struct send_range *range = range_alloc(range_type, dn->dn_object,
1704 blkid, blkid + count, B_FALSE);
1705
1706 if (blkid == DMU_SPILL_BLKID)
1707 ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
1708
1709 switch (range_type) {
1710 case HOLE:
1711 range->sru.hole.datablksz = datablksz;
1712 break;
1713 case DATA:
1714 ASSERT3U(count, ==, 1);
1715 range->sru.data.datablksz = datablksz;
1716 range->sru.data.obj_type = dn->dn_type;
1717 range->sru.data.bp = *bp;
1718 issue_data_read(srta, range);
1719 break;
1720 case REDACT:
1721 range->sru.redact.datablksz = datablksz;
1722 break;
1723 default:
1724 break;
1725 }
1726 bqueue_enqueue(q, range, datablksz);
1727 }
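
/*
 * Note that the third argument to bqueue_enqueue() is the weight charged
 * against the queue's capacity, not a number of bytes to copy. Ranges
 * are charged their datablksz here so that the reader output queue,
 * which is sized from zfs_send_queue_length in setup_reader_thread(),
 * throttles on the amount of data outstanding rather than on the record
 * count.
 */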
1728
1729 /*
1730 * This thread is responsible for two things: First, it retrieves the correct
1731 * blkptr in the to ds if we need to send the data because of something from
1732 * the from thread. As a result of this, we're the first ones to discover that
1733 * some indirect blocks can be discarded because they're not holes. Second,
1734 * it issues prefetches for the data we need to send.
1735 */
1736 static void
1737 send_reader_thread(void *arg)
1738 {
1739 struct send_reader_thread_arg *srta = arg;
1740 struct send_merge_thread_arg *smta = srta->smta;
1741 bqueue_t *inq = &smta->q;
1742 bqueue_t *outq = &srta->q;
1743 objset_t *os = smta->os;
1744 fstrans_cookie_t cookie = spl_fstrans_mark();
1745 struct send_range *range = bqueue_dequeue(inq);
1746 int err = 0;
1747
1748 /*
1749 * If the record we're analyzing is from a redaction bookmark from the
1750 * fromds, then we need to know whether or not it exists in the tods so
1751 * we know whether to create records for it or not. If it does, we need
1752 * the datablksz so we can generate an appropriate record for it.
1753 * Finally, if it isn't redacted, we need the blkptr so that we can send
1754 * a WRITE record containing the actual data.
1755 */
1756 uint64_t last_obj = UINT64_MAX;
1757 uint64_t last_obj_exists = B_TRUE;
1758 while (!range->eos_marker && !srta->cancel && smta->error == 0 &&
1759 err == 0) {
1760 switch (range->type) {
1761 case DATA:
1762 issue_data_read(srta, range);
1763 bqueue_enqueue(outq, range, range->sru.data.datablksz);
1764 range = get_next_range_nofree(inq, range);
1765 break;
1766 case HOLE:
1767 case OBJECT:
1768 case OBJECT_RANGE:
1769 case REDACT: // Redacted blocks must exist
1770 bqueue_enqueue(outq, range, sizeof (*range));
1771 range = get_next_range_nofree(inq, range);
1772 break;
1773 case PREVIOUSLY_REDACTED: {
1774 /*
1775 * This entry came from the "from bookmark" when
1776 * sending from a bookmark that has a redaction
1777 * list. We need to check if this object/blkid
1778 * exists in the target ("to") dataset, and if
1779 * not then we drop this entry. We also need
1780 * to fill in the block pointer so that we know
1781 * what to prefetch.
1782 *
1783 * To accomplish the above, we first cache whether or
1784 * not the last object we examined exists. If it
1785 * doesn't, we can drop this record. If it does, we hold
1786 * the dnode and use it to call dbuf_dnode_findbp. We do
1787 * this instead of dbuf_bookmark_findbp because we will
1788 * often operate on large ranges, and holding the dnode
1789 * once is more efficient.
1790 */
1791 boolean_t object_exists = B_TRUE;
1792 /*
1793 * If the data is redacted, we only care if it exists,
1794 * so that we don't send records for objects that have
1795 * been deleted.
1796 */
1797 dnode_t *dn;
1798 if (range->object == last_obj && !last_obj_exists) {
1799 /*
1800 * If we're still examining the same object as
1801 * previously, and it doesn't exist, we don't
1802 * need to call dbuf_bookmark_findbp.
1803 */
1804 object_exists = B_FALSE;
1805 } else {
1806 err = dnode_hold(os, range->object, FTAG, &dn);
1807 if (err == ENOENT) {
1808 object_exists = B_FALSE;
1809 err = 0;
1810 }
1811 last_obj = range->object;
1812 last_obj_exists = object_exists;
1813 }
1814
1815 if (err != 0) {
1816 break;
1817 } else if (!object_exists) {
1818 /*
1819 * The block was modified, but doesn't
1820 * exist in the to dataset; if it was
1821 * deleted in the to dataset, then we'll
1822 * visit the hole bp for it at some point.
1823 */
1824 range = get_next_range(inq, range);
1825 continue;
1826 }
1827 uint64_t file_max =
1828 (dn->dn_maxblkid < range->end_blkid ?
1829 dn->dn_maxblkid : range->end_blkid);
1830 /*
1831 * The object exists, so we need to try to find the
1832 * blkptr for each block in the range we're processing.
1833 */
1834 rw_enter(&dn->dn_struct_rwlock, RW_READER);
1835 for (uint64_t blkid = range->start_blkid;
1836 blkid < file_max; blkid++) {
1837 blkptr_t bp;
1838 uint32_t datablksz =
1839 dn->dn_phys->dn_datablkszsec <<
1840 SPA_MINBLOCKSHIFT;
1841 uint64_t offset = blkid * datablksz;
1842 /*
1843 * This call finds the next non-hole block in
1844 * the object. This is to prevent a
1845 * performance problem where we're unredacting
1846 * a large hole. Using dnode_next_offset to
1847 * skip over the large hole avoids iterating
1848 * over every block in it.
1849 */
1850 err = dnode_next_offset(dn, DNODE_FIND_HAVELOCK,
1851 &offset, 1, 1, 0);
1852 if (err == ESRCH) {
1853 offset = UINT64_MAX;
1854 err = 0;
1855 } else if (err != 0) {
1856 break;
1857 }
1858 if (offset != blkid * datablksz) {
1859 /*
1860 * There is a hole from here (blkid) up to
1861 * offset; enqueue it as one HOLE range.
1862 */
1863 offset = MIN(offset, file_max *
1864 datablksz);
1865 uint64_t nblks = (offset / datablksz) -
1866 blkid;
1867 enqueue_range(srta, outq, dn, blkid,
1868 nblks, NULL, datablksz);
1869 blkid += nblks;
1870 }
1871 if (blkid >= file_max)
1872 break;
1873 err = dbuf_dnode_findbp(dn, 0, blkid, &bp,
1874 NULL, NULL);
1875 if (err != 0)
1876 break;
1877 ASSERT(!BP_IS_HOLE(&bp));
1878 enqueue_range(srta, outq, dn, blkid, 1, &bp,
1879 datablksz);
1880 }
1881 rw_exit(&dn->dn_struct_rwlock);
1882 dnode_rele(dn, FTAG);
1883 range = get_next_range(inq, range);
1884 }
1885 }
1886 }
1887 if (srta->cancel || err != 0) {
1888 smta->cancel = B_TRUE;
1889 srta->error = err;
1890 } else if (smta->error != 0) {
1891 srta->error = smta->error;
1892 }
1893 while (!range->eos_marker)
1894 range = get_next_range(inq, range);
1895
1896 bqueue_enqueue_flush(outq, range, 1);
1897 spl_fstrans_unmark(cookie);
1898 thread_exit();
1899 }
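
/*
 * A worked example of the hole-skipping arithmetic in the
 * PREVIOUSLY_REDACTED case above (all numbers are made up):
 *
 *   datablksz = 128K, the range covers blkids [10, 100), and
 *   dn_maxblkid = 60, so file_max = 60. Suppose blocks 10-39 are a hole
 *   and block 40 is the next allocated block. Then dnode_next_offset()
 *   starting at 10 * 128K returns 40 * 128K; since that is not
 *   blkid * datablksz, a single HOLE range covering
 *   nblks = (offset / datablksz) - blkid = 40 - 10 = 30 blocks is
 *   enqueued and blkid jumps to 40. dbuf_dnode_findbp() then yields the
 *   bp for block 40, which is enqueued as a one-block DATA range.
 *   Blocks 60-99 are never visited because the loop stops at file_max,
 *   the dn_maxblkid clamp computed before the loop.
 */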
1900
1901 #define NUM_SNAPS_NOT_REDACTED UINT64_MAX
1902
1903 struct dmu_send_params {
1904 /* Pool args */
1905 void *tag; // Tag that dp was held with, will be used to release dp.
1906 dsl_pool_t *dp;
1907 /* To snapshot args */
1908 const char *tosnap;
1909 dsl_dataset_t *to_ds;
1910 /* From snapshot args */
1911 zfs_bookmark_phys_t ancestor_zb;
1912 uint64_t *fromredactsnaps;
1913 /* NUM_SNAPS_NOT_REDACTED if not sending from redaction bookmark */
1914 uint64_t numfromredactsnaps;
1915 /* Stream params */
1916 boolean_t is_clone;
1917 boolean_t embedok;
1918 boolean_t large_block_ok;
1919 boolean_t compressok;
1920 boolean_t rawok;
1921 boolean_t savedok;
1922 uint64_t resumeobj;
1923 uint64_t resumeoff;
1924 uint64_t saved_guid;
1925 zfs_bookmark_phys_t *redactbook;
1926 /* Stream output params */
1927 dmu_send_outparams_t *dso;
1928
1929 /* Stream progress params */
1930 offset_t *off;
1931 int outfd;
1932 char saved_toname[MAXNAMELEN];
1933 };
1934
1935 static int
1936 setup_featureflags(struct dmu_send_params *dspp, objset_t *os,
1937 uint64_t *featureflags)
1938 {
1939 dsl_dataset_t *to_ds = dspp->to_ds;
1940 dsl_pool_t *dp = dspp->dp;
1941 #ifdef _KERNEL
1942 if (dmu_objset_type(os) == DMU_OST_ZFS) {
1943 uint64_t version;
1944 if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0)
1945 return (SET_ERROR(EINVAL));
1946
1947 if (version >= ZPL_VERSION_SA)
1948 *featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
1949 }
1950 #endif
1951
1952 /* raw sends imply large_block_ok */
1953 if ((dspp->rawok || dspp->large_block_ok) &&
1954 dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_BLOCKS)) {
1955 *featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
1956 }
1957
1958 /* encrypted datasets will not have embedded blocks */
1959 if ((dspp->embedok || dspp->rawok) && !os->os_encrypted &&
1960 spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
1961 *featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
1962 }
1963
1964 /* raw send implies compressok */
1965 if (dspp->compressok || dspp->rawok)
1966 *featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
1967
1968 if (dspp->rawok && os->os_encrypted)
1969 *featureflags |= DMU_BACKUP_FEATURE_RAW;
1970
1971 if ((*featureflags &
1972 (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED |
1973 DMU_BACKUP_FEATURE_RAW)) != 0 &&
1974 spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
1975 *featureflags |= DMU_BACKUP_FEATURE_LZ4;
1976 }
1977
1978 /*
1979 * We specifically do not include DMU_BACKUP_FEATURE_EMBED_DATA here to
1980 * allow sending ZSTD compressed datasets to a receiver that does not
1981 * support ZSTD
1982 */
1983 if ((*featureflags &
1984 (DMU_BACKUP_FEATURE_COMPRESSED | DMU_BACKUP_FEATURE_RAW)) != 0 &&
1985 dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_ZSTD_COMPRESS)) {
1986 *featureflags |= DMU_BACKUP_FEATURE_ZSTD;
1987 }
1988
1989 if (dspp->resumeobj != 0 || dspp->resumeoff != 0) {
1990 *featureflags |= DMU_BACKUP_FEATURE_RESUMING;
1991 }
1992
1993 if (dspp->redactbook != NULL) {
1994 *featureflags |= DMU_BACKUP_FEATURE_REDACTED;
1995 }
1996
1997 if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_DNODE)) {
1998 *featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;
1999 }
2000 return (0);
2001 }
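
/*
 * The flag word built by setup_featureflags() is purely additive: the
 * result is the OR of whichever clauses matched. For example, assuming a
 * hypothetical "zfs send -Lce" of an unencrypted, zstd-compressed
 * dataset whose pool and dataset features (large_blocks, embedded_data,
 * lz4_compress, zstd_compress) are all active, the function would leave
 *
 *   DMU_BACKUP_FEATURE_LARGE_BLOCKS | DMU_BACKUP_FEATURE_EMBED_DATA |
 *   DMU_BACKUP_FEATURE_COMPRESSED | DMU_BACKUP_FEATURE_LZ4 |
 *   DMU_BACKUP_FEATURE_ZSTD
 *
 * in *featureflags (plus SA_SPILL on a ZPL dataset of version >=
 * ZPL_VERSION_SA). Unlike LZ4, ZSTD is deliberately not implied by
 * EMBED_DATA alone, as noted in the comment above the ZSTD clause.
 */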
2002
2003 static dmu_replay_record_t *
2004 create_begin_record(struct dmu_send_params *dspp, objset_t *os,
2005 uint64_t featureflags)
2006 {
2007 dmu_replay_record_t *drr = kmem_zalloc(sizeof (dmu_replay_record_t),
2008 KM_SLEEP);
2009 drr->drr_type = DRR_BEGIN;
2010
2011 struct drr_begin *drrb = &drr->drr_u.drr_begin;
2012 dsl_dataset_t *to_ds = dspp->to_ds;
2013
2014 drrb->drr_magic = DMU_BACKUP_MAGIC;
2015 drrb->drr_creation_time = dsl_dataset_phys(to_ds)->ds_creation_time;
2016 drrb->drr_type = dmu_objset_type(os);
2017 drrb->drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
2018 drrb->drr_fromguid = dspp->ancestor_zb.zbm_guid;
2019
2020 DMU_SET_STREAM_HDRTYPE(drrb->drr_versioninfo, DMU_SUBSTREAM);
2021 DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, featureflags);
2022
2023 if (dspp->is_clone)
2024 drrb->drr_flags |= DRR_FLAG_CLONE;
2025 if (dsl_dataset_phys(dspp->to_ds)->ds_flags & DS_FLAG_CI_DATASET)
2026 drrb->drr_flags |= DRR_FLAG_CI_DATA;
2027 if (zfs_send_set_freerecords_bit)
2028 drrb->drr_flags |= DRR_FLAG_FREERECORDS;
2029 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;
2030
2031 if (dspp->savedok) {
2032 drrb->drr_toguid = dspp->saved_guid;
2033 strlcpy(drrb->drr_toname, dspp->saved_toname,
2034 sizeof (drrb->drr_toname));
2035 } else {
2036 dsl_dataset_name(to_ds, drrb->drr_toname);
2037 if (!to_ds->ds_is_snapshot) {
2038 (void) strlcat(drrb->drr_toname, "@--head--",
2039 sizeof (drrb->drr_toname));
2040 }
2041 }
2042 return (drr);
2043 }
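
/*
 * DMU_SET_STREAM_HDRTYPE() and DMU_SET_FEATUREFLAGS() pack the substream
 * header type and the feature flag word into the single 64-bit
 * drr_versioninfo field; the receiving side extracts them with the
 * corresponding DMU_GET_* macros. The "@--head--" suffix appended above
 * is only a placeholder name, used when sending from a filesystem or
 * volume head rather than from a real snapshot.
 */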
2044
2045 static void
2046 setup_to_thread(struct send_thread_arg *to_arg, objset_t *to_os,
2047 dmu_sendstatus_t *dssp, uint64_t fromtxg, boolean_t rawok)
2048 {
2049 VERIFY0(bqueue_init(&to_arg->q, zfs_send_no_prefetch_queue_ff,
2050 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2051 offsetof(struct send_range, ln)));
2052 to_arg->error_code = 0;
2053 to_arg->cancel = B_FALSE;
2054 to_arg->os = to_os;
2055 to_arg->fromtxg = fromtxg;
2056 to_arg->flags = TRAVERSE_PRE | TRAVERSE_PREFETCH_METADATA;
2057 if (rawok)
2058 to_arg->flags |= TRAVERSE_NO_DECRYPT;
2059 to_arg->num_blocks_visited = &dssp->dss_blocks;
2060 (void) thread_create(NULL, 0, send_traverse_thread, to_arg, 0,
2061 curproc, TS_RUN, minclsyspri);
2062 }
2063
2064 static void
2065 setup_from_thread(struct redact_list_thread_arg *from_arg,
2066 redaction_list_t *from_rl, dmu_sendstatus_t *dssp)
2067 {
2068 VERIFY0(bqueue_init(&from_arg->q, zfs_send_no_prefetch_queue_ff,
2069 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2070 offsetof(struct send_range, ln)));
2071 from_arg->error_code = 0;
2072 from_arg->cancel = B_FALSE;
2073 from_arg->rl = from_rl;
2074 from_arg->mark_redact = B_FALSE;
2075 from_arg->num_blocks_visited = &dssp->dss_blocks;
2076 /*
2077 * If from_rl is NULL, redact_list_thread has no redaction list to
2078 * iterate over, so it just enqueues an eos marker and returns.
2079 */
2080 (void) thread_create(NULL, 0, redact_list_thread, from_arg, 0,
2081 curproc, TS_RUN, minclsyspri);
2082 }
2083
2084 static void
2085 setup_redact_list_thread(struct redact_list_thread_arg *rlt_arg,
2086 struct dmu_send_params *dspp, redaction_list_t *rl, dmu_sendstatus_t *dssp)
2087 {
2088 if (dspp->redactbook == NULL)
2089 return;
2090
2091 rlt_arg->cancel = B_FALSE;
2092 VERIFY0(bqueue_init(&rlt_arg->q, zfs_send_no_prefetch_queue_ff,
2093 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2094 offsetof(struct send_range, ln)));
2095 rlt_arg->error_code = 0;
2096 rlt_arg->mark_redact = B_TRUE;
2097 rlt_arg->rl = rl;
2098 rlt_arg->num_blocks_visited = &dssp->dss_blocks;
2099
2100 (void) thread_create(NULL, 0, redact_list_thread, rlt_arg, 0,
2101 curproc, TS_RUN, minclsyspri);
2102 }
2103
2104 static void
2105 setup_merge_thread(struct send_merge_thread_arg *smt_arg,
2106 struct dmu_send_params *dspp, struct redact_list_thread_arg *from_arg,
2107 struct send_thread_arg *to_arg, struct redact_list_thread_arg *rlt_arg,
2108 objset_t *os)
2109 {
2110 VERIFY0(bqueue_init(&smt_arg->q, zfs_send_no_prefetch_queue_ff,
2111 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2112 offsetof(struct send_range, ln)));
2113 smt_arg->cancel = B_FALSE;
2114 smt_arg->error = 0;
2115 smt_arg->from_arg = from_arg;
2116 smt_arg->to_arg = to_arg;
2117 if (dspp->redactbook != NULL)
2118 smt_arg->redact_arg = rlt_arg;
2119
2120 smt_arg->os = os;
2121 (void) thread_create(NULL, 0, send_merge_thread, smt_arg, 0, curproc,
2122 TS_RUN, minclsyspri);
2123 }
2124
2125 static void
2126 setup_reader_thread(struct send_reader_thread_arg *srt_arg,
2127 struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg,
2128 uint64_t featureflags)
2129 {
2130 VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff,
2131 MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
2132 offsetof(struct send_range, ln)));
2133 srt_arg->smta = smt_arg;
2134 srt_arg->issue_reads = !dspp->dso->dso_dryrun;
2135 srt_arg->featureflags = featureflags;
2136 (void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0,
2137 curproc, TS_RUN, minclsyspri);
2138 }
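
/*
 * Each queue created by the setup_*_thread() helpers above is capped at
 * MAX(<tunable>, 2 * zfs_max_recordsize). The second term acts as a
 * floor, presumably so that a single maximum-sized record can never
 * exceed the configured capacity on its own. Only the reader queue uses
 * zfs_send_queue_length (its data entries are charged per-block
 * datablksz weights); the other queues use
 * zfs_send_no_prefetch_queue_length.
 */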
2139
2140 static int
2141 setup_resume_points(struct dmu_send_params *dspp,
2142 struct send_thread_arg *to_arg, struct redact_list_thread_arg *from_arg,
2143 struct redact_list_thread_arg *rlt_arg,
2144 struct send_merge_thread_arg *smt_arg, boolean_t resuming, objset_t *os,
2145 redaction_list_t *redact_rl, nvlist_t *nvl)
2146 {
2147 dsl_dataset_t *to_ds = dspp->to_ds;
2148 int err = 0;
2149
2150 uint64_t obj = 0;
2151 uint64_t blkid = 0;
2152 if (resuming) {
2153 obj = dspp->resumeobj;
2154 dmu_object_info_t to_doi;
2155 err = dmu_object_info(os, obj, &to_doi);
2156 if (err != 0)
2157 return (err);
2158
2159 blkid = dspp->resumeoff / to_doi.doi_data_block_size;
2160 }
2161 /*
2162 * If we're resuming a redacted send, we can skip to the appropriate
2163 * point in the redaction bookmark by binary searching through it.
2164 */
2165 if (redact_rl != NULL) {
2166 SET_BOOKMARK(&rlt_arg->resume, to_ds->ds_object, obj, 0, blkid);
2167 }
2168
2169 SET_BOOKMARK(&to_arg->resume, to_ds->ds_object, obj, 0, blkid);
2170 if (nvlist_exists(nvl, BEGINNV_REDACT_FROM_SNAPS)) {
2171 uint64_t objset = dspp->ancestor_zb.zbm_redaction_obj;
2172 /*
2173 * Note: If the resume point is in an object whose
2174 * blocksize is different in the from vs to snapshots,
2175 * we will have divided by the "wrong" blocksize.
2176 * However, in this case fromsnap's send_cb() will
2177 * detect that the blocksize has changed and therefore
2178 * ignore this object.
2179 *
2180 * If we're resuming a send from a redaction bookmark,
2181 * we still cannot accidentally suggest blocks behind
2182 * the to_ds. In addition, we know that any blocks in
2183 * the object in the to_ds will have to be sent, since
2184 * the size changed. Therefore, we can't cause any harm
2185 * this way either.
2186 */
2187 SET_BOOKMARK(&from_arg->resume, objset, obj, 0, blkid);
2188 }
2189 if (resuming) {
2190 fnvlist_add_uint64(nvl, BEGINNV_RESUME_OBJECT, dspp->resumeobj);
2191 fnvlist_add_uint64(nvl, BEGINNV_RESUME_OFFSET, dspp->resumeoff);
2192 }
2193 return (0);
2194 }
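
/*
 * The resume block id above is derived from the byte offset stored in
 * the resume token. For example (illustrative numbers), resumeoff =
 * 786432 on an object with a 131072-byte data block size resumes at
 * blkid = 786432 / 131072 = 6. The "wrong blocksize" caveat in the
 * comment inside setup_resume_points() only matters when the object's
 * block size differs between the from and to snapshots.
 */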
2195
2196 static dmu_sendstatus_t *
2197 setup_send_progress(struct dmu_send_params *dspp)
2198 {
2199 dmu_sendstatus_t *dssp = kmem_zalloc(sizeof (*dssp), KM_SLEEP);
2200 dssp->dss_outfd = dspp->outfd;
2201 dssp->dss_off = dspp->off;
2202 dssp->dss_proc = curproc;
2203 mutex_enter(&dspp->to_ds->ds_sendstream_lock);
2204 list_insert_head(&dspp->to_ds->ds_sendstreams, dssp);
2205 mutex_exit(&dspp->to_ds->ds_sendstream_lock);
2206 return (dssp);
2207 }
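
/*
 * The dmu_sendstatus_t registered here is what makes an in-flight send
 * observable: it is linked onto the dataset's ds_sendstreams list and
 * carries the output fd, the caller's offset pointer, and the
 * dss_blocks counter that the traversal threads increment (they are
 * handed &dssp->dss_blocks as num_blocks_visited above). The
 * send-progress ioctl path looks streams up on this list by fd.
 */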
2208
2209 /*
2210 * Actually do the bulk of the work in a zfs send.
2211 *
2212 * The idea is that we want to do a send from ancestor_zb to to_ds. We also
2213 * want to avoid sending any data that has been redacted with respect to the
2214 * snapshots recorded in the redaction bookmark redactbook (created earlier on
2215 * the to_ds, e.g. by "zfs redact"). We do this by creating several worker
2216 * threads, whose function is described below.
2217 *
2218 * There are three cases.
2219 * The first case is a redacted zfs send. In this case there are 5 threads.
2220 * The first thread is the to_ds traversal thread: it calls dataset_traverse on
2221 * the to_ds and finds all the blocks that have changed since ancestor_zb (if
2222 * it's a full send, that's all blocks in the dataset). It then sends those
2223 * blocks on to the send merge thread. The redact list thread takes the data
2224 * from the redaction bookmark and sends those blocks on to the send merge
2225 * thread. The send merge thread takes the data from the to_ds traversal
2226 * thread, and combines it with the redaction records from the redact list
2227 * thread. If a block appears in both the to_ds's data and the redaction data,
2228 * the send merge thread will mark it as redacted and send it on to the prefetch
2229 * thread. Otherwise, the send merge thread will send the block on to the
2230 * prefetch thread unchanged. The prefetch thread will issue prefetch reads for
2231 * any data that isn't redacted, and then send the data on to the main thread.
2232 * The main thread behaves the same as in a normal send case, issuing demand
2233 * reads for data blocks and sending out records over the network.
2234 *
2235 * The graphic below diagrams the flow of data in the case of a redacted zfs
2236 * send. Each box represents a thread, and each line represents the flow of
2237 * data.
2238 *
2239 * Records from the |
2240 * redaction bookmark |
2241 * +--------------------+ | +---------------------------+
2242 * | | v | Send Merge Thread |
2243 * | Redact List Thread +----------> Apply redaction marks to |
2244 * | | | records as specified by |
2245 * +--------------------+ | redaction ranges |
2246 * +----^---------------+------+
2247 * | | Merged data
2248 * | |
2249 * | +------------v--------+
2250 * | | Prefetch Thread |
2251 * +--------------------+ | | Issues prefetch |
2252 * | to_ds Traversal | | | reads of data blocks|
2253 * | Thread (finds +---------------+ +------------+--------+
2254 * | candidate blocks) | Blocks modified | Prefetched data
2255 * +--------------------+ by to_ds since |
2256 * ancestor_zb +------------v----+
2257 * | Main Thread | File Descriptor
2258 * | Sends data over +->(to zfs receive)
2259 * | wire |
2260 * +-----------------+
2261 *
2262 * The second case is an incremental send from a redaction bookmark. The to_ds
2263 * traversal thread and the main thread behave the same as in the redacted
2264 * send case. The new thread is the from bookmark traversal thread. It
2265 * iterates over the redaction list in the redaction bookmark, and enqueues
2266 * records for each block that was redacted in the original send. The send
2267 * merge thread now has to merge the data from the two threads. For details
2268 * about that process, see the header comment of send_merge_thread(). Any data
2269 * it decides to send on will be prefetched by the prefetch thread. Note that
2270 * you can perform a redacted send from a redaction bookmark; in that case,
2271 * the data flow behaves very similarly to the flow in the redacted send case,
2272 * except with the addition of the bookmark traversal thread iterating over the
2273 * redaction bookmark. The send_merge_thread also has to take on the
2274 * responsibility of merging the redact list thread's records, the bookmark
2275 * traversal thread's records, and the to_ds records.
2276 *
2277 * +---------------------+
2278 * | |
2279 * | Redact List Thread +--------------+
2280 * | | |
2281 * +---------------------+ |
2282 * Blocks in redaction list | Ranges modified by every secure snap
2283 * of from bookmark | (or EOS if not redacted)
2284 * |
2285 * +---------------------+ | +----v----------------------+
2286 * | bookmark Traversal | v | Send Merge Thread |
2287 * | Thread (finds +---------> Merges bookmark, rlt, and |
2288 * | candidate blocks) | | to_ds send records |
2289 * +---------------------+ +----^---------------+------+
2290 * | | Merged data
2291 * | +------------v--------+
2292 * | | Prefetch Thread |
2293 * +--------------------+ | | Issues prefetch |
2294 * | to_ds Traversal | | | reads of data blocks|
2295 * | Thread (finds +---------------+ +------------+--------+
2296 * | candidate blocks) | Blocks modified | Prefetched data
2297 * +--------------------+ by to_ds since +------------v----+
2298 * ancestor_zb | Main Thread | File Descriptor
2299 * | Sends data over +->(to zfs receive)
2300 * | wire |
2301 * +-----------------+
2302 *
2303 * The final case is a simple zfs full or incremental send. The to_ds traversal
2304 * thread behaves the same as always. The redact list thread is never started.
2305 * The send merge thread takes all the blocks that the to_ds traversal thread
2306 * sends it, prefetches the data, and sends the blocks on to the main thread.
2307 * The main thread sends the data over the wire.
2308 *
2309 * To keep performance acceptable, we want to prefetch the data in the worker
2310 * threads. While the to_ds thread could simply use the TRAVERSE_PREFETCH
2311 * feature built into traverse_dataset, the combining and deletion of records
2312 * due to redaction and sends from redaction bookmarks mean that we could
2313 * issue many unnecessary prefetches. As a result, we only prefetch data
2314 * after we've determined that the record is not going to be redacted. To
2315 * prevent the prefetching from getting too far ahead of the main thread, the
2316 * blocking queues that are used for communication are capped not by the
2317 * number of entries in the queue, but by the sum of the size of the
2318 * prefetches associated with them. The limit on the amount of data that the
2319 * thread can prefetch beyond what the main thread has reached is controlled
2320 * by the global variable zfs_send_queue_length. In addition, to prevent poor
2321 * performance in the beginning of a send, we also limit the distance ahead
2322 * that the traversal threads can be. That distance is controlled by the
2323 * zfs_send_no_prefetch_queue_length tunable.
2324 *
2325 * Note: Releases dp using the specified tag.
2326 */
2327 static int
2328 dmu_send_impl(struct dmu_send_params *dspp)
2329 {
2330 objset_t *os;
2331 dmu_replay_record_t *drr;
2332 dmu_sendstatus_t *dssp;
2333 dmu_send_cookie_t dsc = {0};
2334 int err;
2335 uint64_t fromtxg = dspp->ancestor_zb.zbm_creation_txg;
2336 uint64_t featureflags = 0;
2337 struct redact_list_thread_arg *from_arg;
2338 struct send_thread_arg *to_arg;
2339 struct redact_list_thread_arg *rlt_arg;
2340 struct send_merge_thread_arg *smt_arg;
2341 struct send_reader_thread_arg *srt_arg;
2342 struct send_range *range;
2343 redaction_list_t *from_rl = NULL;
2344 redaction_list_t *redact_rl = NULL;
2345 boolean_t resuming = (dspp->resumeobj != 0 || dspp->resumeoff != 0);
2346 boolean_t book_resuming = resuming;
2347
2348 dsl_dataset_t *to_ds = dspp->to_ds;
2349 zfs_bookmark_phys_t *ancestor_zb = &dspp->ancestor_zb;
2350 dsl_pool_t *dp = dspp->dp;
2351 void *tag = dspp->tag;
2352
2353 err = dmu_objset_from_ds(to_ds, &os);
2354 if (err != 0) {
2355 dsl_pool_rele(dp, tag);
2356 return (err);
2357 }
2358
2359 /*
2360 * If this is a non-raw send of an encrypted ds, we can ensure that
2361 * the objset_phys_t is authenticated. This is safe because this is
2362 * either a snapshot or we have owned the dataset, ensuring that
2363 * it can't be modified.
2364 */
2365 if (!dspp->rawok && os->os_encrypted &&
2366 arc_is_unauthenticated(os->os_phys_buf)) {
2367 zbookmark_phys_t zb;
2368
2369 SET_BOOKMARK(&zb, to_ds->ds_object, ZB_ROOT_OBJECT,
2370 ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
2371 err = arc_untransform(os->os_phys_buf, os->os_spa,
2372 &zb, B_FALSE);
2373 if (err != 0) {
2374 dsl_pool_rele(dp, tag);
2375 return (err);
2376 }
2377
2378 ASSERT0(arc_is_unauthenticated(os->os_phys_buf));
2379 }
2380
2381 if ((err = setup_featureflags(dspp, os, &featureflags)) != 0) {
2382 dsl_pool_rele(dp, tag);
2383 return (err);
2384 }
2385
2386 /*
2387 * If we're doing a redacted send, hold the bookmark's redaction list.
2388 */
2389 if (dspp->redactbook != NULL) {
2390 err = dsl_redaction_list_hold_obj(dp,
2391 dspp->redactbook->zbm_redaction_obj, FTAG,
2392 &redact_rl);
2393 if (err != 0) {
2394 dsl_pool_rele(dp, tag);
2395 return (SET_ERROR(EINVAL));
2396 }
2397 dsl_redaction_list_long_hold(dp, redact_rl, FTAG);
2398 }
2399
2400 /*
2401 * If we're sending from a redaction bookmark, hold the redaction list
2402 * so that we can consider sending the redacted blocks.
2403 */
2404 if (ancestor_zb->zbm_redaction_obj != 0) {
2405 err = dsl_redaction_list_hold_obj(dp,
2406 ancestor_zb->zbm_redaction_obj, FTAG, &from_rl);
2407 if (err != 0) {
2408 if (redact_rl != NULL) {
2409 dsl_redaction_list_long_rele(redact_rl, FTAG);
2410 dsl_redaction_list_rele(redact_rl, FTAG);
2411 }
2412 dsl_pool_rele(dp, tag);
2413 return (SET_ERROR(EINVAL));
2414 }
2415 dsl_redaction_list_long_hold(dp, from_rl, FTAG);
2416 }
2417
2418 dsl_dataset_long_hold(to_ds, FTAG);
2419
2420 from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP);
2421 to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
2422 rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
2423 smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
2424 srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP);
2425
2426 drr = create_begin_record(dspp, os, featureflags);
2427 dssp = setup_send_progress(dspp);
2428
2429 dsc.dsc_drr = drr;
2430 dsc.dsc_dso = dspp->dso;
2431 dsc.dsc_os = os;
2432 dsc.dsc_off = dspp->off;
2433 dsc.dsc_toguid = dsl_dataset_phys(to_ds)->ds_guid;
2434 dsc.dsc_fromtxg = fromtxg;
2435 dsc.dsc_pending_op = PENDING_NONE;
2436 dsc.dsc_featureflags = featureflags;
2437 dsc.dsc_resume_object = dspp->resumeobj;
2438 dsc.dsc_resume_offset = dspp->resumeoff;
2439
2440 dsl_pool_rele(dp, tag);
2441
2442 void *payload = NULL;
2443 size_t payload_len = 0;
2444 nvlist_t *nvl = fnvlist_alloc();
2445
2446 /*
2447 * If we're doing a redacted send, we include the snapshots we're
2448 * redacted with respect to so that the target system knows what send
2449 * streams can be correctly received on top of this dataset. If we're
2450 * instead sending a redacted dataset, we include the snapshots that the
2451 * dataset was created with respect to.
2452 */
2453 if (dspp->redactbook != NULL) {
2454 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS,
2455 redact_rl->rl_phys->rlp_snaps,
2456 redact_rl->rl_phys->rlp_num_snaps);
2457 } else if (dsl_dataset_feature_is_active(to_ds,
2458 SPA_FEATURE_REDACTED_DATASETS)) {
2459 uint64_t *tods_guids;
2460 uint64_t length;
2461 VERIFY(dsl_dataset_get_uint64_array_feature(to_ds,
2462 SPA_FEATURE_REDACTED_DATASETS, &length, &tods_guids));
2463 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS, tods_guids,
2464 length);
2465 }
2466
2467 /*
2468 * If we're sending from a redaction bookmark, then we should retrieve
2469 * the guids of that bookmark so we can send them over the wire.
2470 */
2471 if (from_rl != NULL) {
2472 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
2473 from_rl->rl_phys->rlp_snaps,
2474 from_rl->rl_phys->rlp_num_snaps);
2475 }
2476
2477 /*
2478 * If the snapshot we're sending from is redacted, include the redaction
2479 * list in the stream.
2480 */
2481 if (dspp->numfromredactsnaps != NUM_SNAPS_NOT_REDACTED) {
2482 ASSERT3P(from_rl, ==, NULL);
2483 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
2484 dspp->fromredactsnaps, (uint_t)dspp->numfromredactsnaps);
2485 if (dspp->numfromredactsnaps > 0) {
2486 kmem_free(dspp->fromredactsnaps,
2487 dspp->numfromredactsnaps * sizeof (uint64_t));
2488 dspp->fromredactsnaps = NULL;
2489 }
2490 }
2491
2492 if (resuming || book_resuming) {
2493 err = setup_resume_points(dspp, to_arg, from_arg,
2494 rlt_arg, smt_arg, resuming, os, redact_rl, nvl);
2495 if (err != 0)
2496 goto out;
2497 }
2498
2499 if (featureflags & DMU_BACKUP_FEATURE_RAW) {
2500 uint64_t ivset_guid = (ancestor_zb != NULL) ?
2501 ancestor_zb->zbm_ivset_guid : 0;
2502 nvlist_t *keynvl = NULL;
2503 ASSERT(os->os_encrypted);
2504
2505 err = dsl_crypto_populate_key_nvlist(os, ivset_guid,
2506 &keynvl);
2507 if (err != 0) {
2508 fnvlist_free(nvl);
2509 goto out;
2510 }
2511
2512 fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl);
2513 fnvlist_free(keynvl);
2514 }
2515
2516 if (!nvlist_empty(nvl)) {
2517 payload = fnvlist_pack(nvl, &payload_len);
2518 drr->drr_payloadlen = payload_len;
2519 }
2520
2521 fnvlist_free(nvl);
2522 err = dump_record(&dsc, payload, payload_len);
2523 fnvlist_pack_free(payload, payload_len);
2524 if (err != 0) {
2525 err = dsc.dsc_err;
2526 goto out;
2527 }
2528
2529 setup_to_thread(to_arg, os, dssp, fromtxg, dspp->rawok);
2530 setup_from_thread(from_arg, from_rl, dssp);
2531 setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);
2532 setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);
2533 setup_reader_thread(srt_arg, dspp, smt_arg, featureflags);
2534
2535 range = bqueue_dequeue(&srt_arg->q);
2536 while (err == 0 && !range->eos_marker) {
2537 err = do_dump(&dsc, range);
2538 range = get_next_range(&srt_arg->q, range);
2539 if (issig(JUSTLOOKING) && issig(FORREAL))
2540 err = SET_ERROR(EINTR);
2541 }
2542
2543 /*
2544 * If we hit an error or are interrupted, cancel our worker threads and
2545 * clear the queue of any pending records. The threads will pass the
2546 * cancel up the tree of worker threads, and each one will clean up any
2547 * pending records before exiting.
2548 */
2549 if (err != 0) {
2550 srt_arg->cancel = B_TRUE;
2551 while (!range->eos_marker) {
2552 range = get_next_range(&srt_arg->q, range);
2553 }
2554 }
2555 range_free(range);
2556
2557 bqueue_destroy(&srt_arg->q);
2558 bqueue_destroy(&smt_arg->q);
2559 if (dspp->redactbook != NULL)
2560 bqueue_destroy(&rlt_arg->q);
2561 bqueue_destroy(&to_arg->q);
2562 bqueue_destroy(&from_arg->q);
2563
2564 if (err == 0 && srt_arg->error != 0)
2565 err = srt_arg->error;
2566
2567 if (err != 0)
2568 goto out;
2569
2570 if (dsc.dsc_pending_op != PENDING_NONE)
2571 if (dump_record(&dsc, NULL, 0) != 0)
2572 err = SET_ERROR(EINTR);
2573
2574 if (err != 0) {
2575 if (err == EINTR && dsc.dsc_err != 0)
2576 err = dsc.dsc_err;
2577 goto out;
2578 }
2579
2580 /*
2581 * Send the DRR_END record if this is not a saved stream.
2582 * Otherwise, the omitted DRR_END record will signal to
2583 * the receive side that the stream is incomplete.
2584 */
2585 if (!dspp->savedok) {
2586 bzero(drr, sizeof (dmu_replay_record_t));
2587 drr->drr_type = DRR_END;
2588 drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc;
2589 drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid;
2590
2591 if (dump_record(&dsc, NULL, 0) != 0)
2592 err = dsc.dsc_err;
2593 }
2594 out:
2595 mutex_enter(&to_ds->ds_sendstream_lock);
2596 list_remove(&to_ds->ds_sendstreams, dssp);
2597 mutex_exit(&to_ds->ds_sendstream_lock);
2598
2599 VERIFY(err != 0 || (dsc.dsc_sent_begin &&
2600 (dsc.dsc_sent_end || dspp->savedok)));
2601
2602 kmem_free(drr, sizeof (dmu_replay_record_t));
2603 kmem_free(dssp, sizeof (dmu_sendstatus_t));
2604 kmem_free(from_arg, sizeof (*from_arg));
2605 kmem_free(to_arg, sizeof (*to_arg));
2606 kmem_free(rlt_arg, sizeof (*rlt_arg));
2607 kmem_free(smt_arg, sizeof (*smt_arg));
2608 kmem_free(srt_arg, sizeof (*srt_arg));
2609
2610 dsl_dataset_long_rele(to_ds, FTAG);
2611 if (from_rl != NULL) {
2612 dsl_redaction_list_long_rele(from_rl, FTAG);
2613 dsl_redaction_list_rele(from_rl, FTAG);
2614 }
2615 if (redact_rl != NULL) {
2616 dsl_redaction_list_long_rele(redact_rl, FTAG);
2617 dsl_redaction_list_rele(redact_rl, FTAG);
2618 }
2619
2620 return (err);
2621 }
2622
2623 int
2624 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
2625 boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
2626 boolean_t rawok, boolean_t savedok, int outfd, offset_t *off,
2627 dmu_send_outparams_t *dsop)
2628 {
2629 int err;
2630 dsl_dataset_t *fromds;
2631 ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
2632 struct dmu_send_params dspp = {0};
2633 dspp.embedok = embedok;
2634 dspp.large_block_ok = large_block_ok;
2635 dspp.compressok = compressok;
2636 dspp.outfd = outfd;
2637 dspp.off = off;
2638 dspp.dso = dsop;
2639 dspp.tag = FTAG;
2640 dspp.rawok = rawok;
2641 dspp.savedok = savedok;
2642
2643 err = dsl_pool_hold(pool, FTAG, &dspp.dp);
2644 if (err != 0)
2645 return (err);
2646
2647 err = dsl_dataset_hold_obj_flags(dspp.dp, tosnap, dsflags, FTAG,
2648 &dspp.to_ds);
2649 if (err != 0) {
2650 dsl_pool_rele(dspp.dp, FTAG);
2651 return (err);
2652 }
2653
2654 if (fromsnap != 0) {
2655 err = dsl_dataset_hold_obj_flags(dspp.dp, fromsnap, dsflags,
2656 FTAG, &fromds);
2657 if (err != 0) {
2658 dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
2659 dsl_pool_rele(dspp.dp, FTAG);
2660 return (err);
2661 }
2662 dspp.ancestor_zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
2663 dspp.ancestor_zb.zbm_creation_txg =
2664 dsl_dataset_phys(fromds)->ds_creation_txg;
2665 dspp.ancestor_zb.zbm_creation_time =
2666 dsl_dataset_phys(fromds)->ds_creation_time;
2667
2668 if (dsl_dataset_is_zapified(fromds)) {
2669 (void) zap_lookup(dspp.dp->dp_meta_objset,
2670 fromds->ds_object, DS_FIELD_IVSET_GUID, 8, 1,
2671 &dspp.ancestor_zb.zbm_ivset_guid);
2672 }
2673
2674 /* See dmu_send for the reasons behind this. */
2675 uint64_t *fromredact;
2676
2677 if (!dsl_dataset_get_uint64_array_feature(fromds,
2678 SPA_FEATURE_REDACTED_DATASETS,
2679 &dspp.numfromredactsnaps,
2680 &fromredact)) {
2681 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2682 } else if (dspp.numfromredactsnaps > 0) {
2683 uint64_t size = dspp.numfromredactsnaps *
2684 sizeof (uint64_t);
2685 dspp.fromredactsnaps = kmem_zalloc(size, KM_SLEEP);
2686 bcopy(fromredact, dspp.fromredactsnaps, size);
2687 }
2688
2689 if (!dsl_dataset_is_before(dspp.to_ds, fromds, 0)) {
2690 err = SET_ERROR(EXDEV);
2691 } else {
2692 dspp.is_clone = (dspp.to_ds->ds_dir !=
2693 fromds->ds_dir);
2694 dsl_dataset_rele(fromds, FTAG);
2695 err = dmu_send_impl(&dspp);
2696 }
2697 } else {
2698 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2699 err = dmu_send_impl(&dspp);
2700 }
2701 dsl_dataset_rele(dspp.to_ds, FTAG);
2702 return (err);
2703 }
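
/*
 * dmu_send_obj() addresses the pool, tosnap, and fromsnap by object
 * number and, as its parameter list shows, supports neither resume
 * tokens nor redaction bookmarks. The full-featured name-based path
 * (incrementals from "#bookmark" sources, saved/resumable sends, and
 * redacted sends) is dmu_send() below.
 */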
2704
2705 int
2706 dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
2707 boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,
2708 boolean_t savedok, uint64_t resumeobj, uint64_t resumeoff,
2709 const char *redactbook, int outfd, offset_t *off,
2710 dmu_send_outparams_t *dsop)
2711 {
2712 int err = 0;
2713 ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
2714 boolean_t owned = B_FALSE;
2715 dsl_dataset_t *fromds = NULL;
2716 zfs_bookmark_phys_t book = {0};
2717 struct dmu_send_params dspp = {0};
2718
2719 dspp.tosnap = tosnap;
2720 dspp.embedok = embedok;
2721 dspp.large_block_ok = large_block_ok;
2722 dspp.compressok = compressok;
2723 dspp.outfd = outfd;
2724 dspp.off = off;
2725 dspp.dso = dsop;
2726 dspp.tag = FTAG;
2727 dspp.resumeobj = resumeobj;
2728 dspp.resumeoff = resumeoff;
2729 dspp.rawok = rawok;
2730 dspp.savedok = savedok;
2731
2732 if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
2733 return (SET_ERROR(EINVAL));
2734
2735 err = dsl_pool_hold(tosnap, FTAG, &dspp.dp);
2736 if (err != 0)
2737 return (err);
2738
2739 if (strchr(tosnap, '@') == NULL && spa_writeable(dspp.dp->dp_spa)) {
2740 /*
2741 * We are sending a filesystem or volume. Ensure
2742 * that it doesn't change by owning the dataset.
2743 */
2744
2745 if (savedok) {
2746 /*
2747 * We are looking for the dataset that represents the
2748 * partially received send stream. If this stream was
2749 * received as a new snapshot of an existing dataset,
2750 * this will be saved in a hidden clone named
2751 * "<pool>/<dataset>/%recv". Otherwise, the stream
2752 * will be saved in the live dataset itself. In
2753 * either case we need to use dsl_dataset_own_force()
2754 * because the stream is marked as inconsistent,
2755 * which would normally make it unavailable to be
2756 * owned.
2757 */
2758 char *name = kmem_asprintf("%s/%s", tosnap,
2759 recv_clone_name);
2760 err = dsl_dataset_own_force(dspp.dp, name, dsflags,
2761 FTAG, &dspp.to_ds);
2762 if (err == ENOENT) {
2763 err = dsl_dataset_own_force(dspp.dp, tosnap,
2764 dsflags, FTAG, &dspp.to_ds);
2765 }
2766
2767 if (err == 0) {
2768 err = zap_lookup(dspp.dp->dp_meta_objset,
2769 dspp.to_ds->ds_object,
2770 DS_FIELD_RESUME_TOGUID, 8, 1,
2771 &dspp.saved_guid);
2772 }
2773
2774 if (err == 0) {
2775 err = zap_lookup(dspp.dp->dp_meta_objset,
2776 dspp.to_ds->ds_object,
2777 DS_FIELD_RESUME_TONAME, 1,
2778 sizeof (dspp.saved_toname),
2779 dspp.saved_toname);
2780 }
2781 if (err != 0)
2782 dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
2783
2784 kmem_strfree(name);
2785 } else {
2786 err = dsl_dataset_own(dspp.dp, tosnap, dsflags,
2787 FTAG, &dspp.to_ds);
2788 }
2789 owned = B_TRUE;
2790 } else {
2791 err = dsl_dataset_hold_flags(dspp.dp, tosnap, dsflags, FTAG,
2792 &dspp.to_ds);
2793 }
2794
2795 if (err != 0) {
2796 dsl_pool_rele(dspp.dp, FTAG);
2797 return (err);
2798 }
2799
2800 if (redactbook != NULL) {
2801 char path[ZFS_MAX_DATASET_NAME_LEN];
2802 (void) strlcpy(path, tosnap, sizeof (path));
2803 char *at = strchr(path, '@');
2804 if (at == NULL) {
2805 err = EINVAL;
2806 } else {
2807 (void) snprintf(at, sizeof (path) - (at - path), "#%s",
2808 redactbook);
2809 err = dsl_bookmark_lookup(dspp.dp, path,
2810 NULL, &book);
2811 dspp.redactbook = &book;
2812 }
2813 }
2814
2815 if (err != 0) {
2816 dsl_pool_rele(dspp.dp, FTAG);
2817 if (owned)
2818 dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
2819 else
2820 dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
2821 return (err);
2822 }
2823
2824 if (fromsnap != NULL) {
2825 zfs_bookmark_phys_t *zb = &dspp.ancestor_zb;
2826 int fsnamelen;
2827 if (strpbrk(tosnap, "@#") != NULL)
2828 fsnamelen = strpbrk(tosnap, "@#") - tosnap;
2829 else
2830 fsnamelen = strlen(tosnap);
2831
2832 /*
2833 * If the fromsnap is in a different filesystem, then
2834 * mark the send stream as a clone.
2835 */
2836 if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
2837 (fromsnap[fsnamelen] != '@' &&
2838 fromsnap[fsnamelen] != '#')) {
2839 dspp.is_clone = B_TRUE;
2840 }
2841
2842 if (strchr(fromsnap, '@') != NULL) {
2843 err = dsl_dataset_hold(dspp.dp, fromsnap, FTAG,
2844 &fromds);
2845
2846 if (err != 0) {
2847 ASSERT3P(fromds, ==, NULL);
2848 } else {
2849 /*
2850 * We need to make a deep copy of the redact
2851 * snapshots of the from snapshot, because the
2852 * array will be freed when we evict from_ds.
2853 */
2854 uint64_t *fromredact;
2855 if (!dsl_dataset_get_uint64_array_feature(
2856 fromds, SPA_FEATURE_REDACTED_DATASETS,
2857 &dspp.numfromredactsnaps,
2858 &fromredact)) {
2859 dspp.numfromredactsnaps =
2860 NUM_SNAPS_NOT_REDACTED;
2861 } else if (dspp.numfromredactsnaps > 0) {
2862 uint64_t size =
2863 dspp.numfromredactsnaps *
2864 sizeof (uint64_t);
2865 dspp.fromredactsnaps = kmem_zalloc(size,
2866 KM_SLEEP);
2867 bcopy(fromredact, dspp.fromredactsnaps,
2868 size);
2869 }
2870 if (!dsl_dataset_is_before(dspp.to_ds, fromds,
2871 0)) {
2872 err = SET_ERROR(EXDEV);
2873 } else {
2874 zb->zbm_creation_txg =
2875 dsl_dataset_phys(fromds)->
2876 ds_creation_txg;
2877 zb->zbm_creation_time =
2878 dsl_dataset_phys(fromds)->
2879 ds_creation_time;
2880 zb->zbm_guid =
2881 dsl_dataset_phys(fromds)->ds_guid;
2882 zb->zbm_redaction_obj = 0;
2883
2884 if (dsl_dataset_is_zapified(fromds)) {
2885 (void) zap_lookup(
2886 dspp.dp->dp_meta_objset,
2887 fromds->ds_object,
2888 DS_FIELD_IVSET_GUID, 8, 1,
2889 &zb->zbm_ivset_guid);
2890 }
2891 }
2892 dsl_dataset_rele(fromds, FTAG);
2893 }
2894 } else {
2895 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2896 err = dsl_bookmark_lookup(dspp.dp, fromsnap, dspp.to_ds,
2897 zb);
2898 if (err == EXDEV && zb->zbm_redaction_obj != 0 &&
2899 zb->zbm_guid ==
2900 dsl_dataset_phys(dspp.to_ds)->ds_guid)
2901 err = 0;
2902 }
2903
2904 if (err == 0) {
2905 /* dmu_send_impl will call dsl_pool_rele for us. */
2906 err = dmu_send_impl(&dspp);
2907 } else {
2908 dsl_pool_rele(dspp.dp, FTAG);
2909 }
2910 } else {
2911 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2912 err = dmu_send_impl(&dspp);
2913 }
2914 if (owned)
2915 dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
2916 else
2917 dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
2918 return (err);
2919 }
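
/*
 * The redaction bookmark lookup in dmu_send() rewrites the snapshot name
 * into a bookmark name in place. For example (made-up names), tosnap
 * "tank/data@snap1" with redactbook "rb" becomes "tank/data#rb", which
 * is the form dsl_bookmark_lookup() expects; a tosnap with no '@' (that
 * is, not a snapshot) is rejected with EINVAL.
 */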
2920
2921 static int
2922 dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
2923 uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
2924 {
2925 int err = 0;
2926 uint64_t size;
2927 /*
2928 * Assume that space (both on-disk and in-stream) is dominated by
2929 * data. We will adjust for indirect blocks and the copies property,
2930 * but ignore per-object space used (e.g., dnodes and DRR_OBJECT records).
2931 */
2932
2933 uint64_t recordsize;
2934 uint64_t record_count;
2935 objset_t *os;
2936 VERIFY0(dmu_objset_from_ds(ds, &os));
2937
2938 /* Assume all (uncompressed) blocks are recordsize. */
2939 if (zfs_override_estimate_recordsize != 0) {
2940 recordsize = zfs_override_estimate_recordsize;
2941 } else if (os->os_phys->os_type == DMU_OST_ZVOL) {
2942 err = dsl_prop_get_int_ds(ds,
2943 zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);
2944 } else {
2945 err = dsl_prop_get_int_ds(ds,
2946 zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);
2947 }
2948 if (err != 0)
2949 return (err);
2950 record_count = uncompressed / recordsize;
2951
2952 /*
2953 * If we're estimating a send size for a compressed stream, use the
2954 * compressed data size to estimate the stream size. Otherwise, use the
2955 * uncompressed data size.
2956 */
2957 size = stream_compressed ? compressed : uncompressed;
2958
2959 /*
2960 * Subtract out approximate space used by indirect blocks.
2961 * Assume most space is used by data blocks (non-indirect, non-dnode).
2962 * Assume no ditto blocks or internal fragmentation.
2963 *
2964 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
2965 * block.
2966 */
2967 size -= record_count * sizeof (blkptr_t);
2968
2969 /* Add in the space for the record associated with each block. */
2970 size += record_count * sizeof (dmu_replay_record_t);
2971
2972 *sizep = size;
2973
2974 return (0);
2975 }
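
/*
 * A worked example of the adjustment above (illustrative numbers):
 * 1 GiB of uncompressed data at recordsize = 128K gives record_count =
 * 8192. For an uncompressed stream the estimate is
 *
 *   1 GiB - 8192 * sizeof (blkptr_t)             (~1 MiB of indirect
 *                                                  block overhead)
 *         + 8192 * sizeof (dmu_replay_record_t)  (per-record headers,
 *                                                  a few hundred bytes
 *                                                  each)
 *
 * so the net adjustment is on the order of a megabyte or two for a
 * gigabyte of data. For a compressed stream the same arithmetic starts
 * from the compressed byte count instead, and the caller adds the BEGIN
 * and END records on top.
 */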
2976
2977 int
2978 dmu_send_estimate_fast(dsl_dataset_t *origds, dsl_dataset_t *fromds,
2979 zfs_bookmark_phys_t *frombook, boolean_t stream_compressed,
2980 boolean_t saved, uint64_t *sizep)
2981 {
2982 int err;
2983 dsl_dataset_t *ds = origds;
2984 uint64_t uncomp, comp;
2985
2986 ASSERT(dsl_pool_config_held(origds->ds_dir->dd_pool));
2987 ASSERT(fromds == NULL || frombook == NULL);
2988
2989 /*
2990 * If this is a saved send we may actually be sending
2991 * from the %recv clone used for resuming.
2992 */
2993 if (saved) {
2994 objset_t *mos = origds->ds_dir->dd_pool->dp_meta_objset;
2995 uint64_t guid;
2996 char dsname[ZFS_MAX_DATASET_NAME_LEN + 6];
2997
2998 dsl_dataset_name(origds, dsname);
2999 (void) strcat(dsname, "/");
3000 (void) strcat(dsname, recv_clone_name);
3001
3002 err = dsl_dataset_hold(origds->ds_dir->dd_pool,
3003 dsname, FTAG, &ds);
3004 if (err != ENOENT && err != 0) {
3005 return (err);
3006 } else if (err == ENOENT) {
3007 ds = origds;
3008 }
3009
3010 /* check that this dataset has partially received data */
3011 err = zap_lookup(mos, ds->ds_object,
3012 DS_FIELD_RESUME_TOGUID, 8, 1, &guid);
3013 if (err != 0) {
3014 err = SET_ERROR(err == ENOENT ? EINVAL : err);
3015 goto out;
3016 }
3017
3018 err = zap_lookup(mos, ds->ds_object,
3019 DS_FIELD_RESUME_TONAME, 1, sizeof (dsname), dsname);
3020 if (err != 0) {
3021 err = SET_ERROR(err == ENOENT ? EINVAL : err);
3022 goto out;
3023 }
3024 }
3025
3026 /* tosnap must be a snapshot or the target of a saved send */
3027 if (!ds->ds_is_snapshot && ds == origds)
3028 return (SET_ERROR(EINVAL));
3029
3030 if (fromds != NULL) {
3031 uint64_t used;
3032 if (!fromds->ds_is_snapshot) {
3033 err = SET_ERROR(EINVAL);
3034 goto out;
3035 }
3036
3037 if (!dsl_dataset_is_before(ds, fromds, 0)) {
3038 err = SET_ERROR(EXDEV);
3039 goto out;
3040 }
3041
3042 err = dsl_dataset_space_written(fromds, ds, &used, &comp,
3043 &uncomp);
3044 if (err != 0)
3045 goto out;
3046 } else if (frombook != NULL) {
3047 uint64_t used;
3048 err = dsl_dataset_space_written_bookmark(frombook, ds, &used,
3049 &comp, &uncomp);
3050 if (err != 0)
3051 goto out;
3052 } else {
3053 uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
3054 comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
3055 }
3056
3057 err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
3058 stream_compressed, sizep);
3059 /*
3060 * Add the size of the BEGIN and END records to the estimate.
3061 */
3062 *sizep += 2 * sizeof (dmu_replay_record_t);
3063
3064 out:
3065 if (ds != origds)
3066 dsl_dataset_rele(ds, FTAG);
3067 return (err);
3068 }
3069
3070 /* BEGIN CSTYLED */
3071 ZFS_MODULE_PARAM(zfs_send, zfs_send_, corrupt_data, INT, ZMOD_RW,
3072 "Allow sending corrupt data");
3073
3074 ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_length, INT, ZMOD_RW,
3075 "Maximum send queue length");
3076
3077 ZFS_MODULE_PARAM(zfs_send, zfs_send_, unmodified_spill_blocks, INT, ZMOD_RW,
3078 "Send unmodified spill blocks");
3079
3080 ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_length, INT, ZMOD_RW,
3081 "Maximum send queue length for non-prefetch queues");
3082
3083 ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_ff, INT, ZMOD_RW,
3084 "Send queue fill fraction");
3085
3086 ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_ff, INT, ZMOD_RW,
3087 "Send queue fill fraction for non-prefetch queues");
3088
3089 ZFS_MODULE_PARAM(zfs_send, zfs_, override_estimate_recordsize, INT, ZMOD_RW,
3090 "Override block size estimate with fixed size");
3091 /* END CSTYLED */