/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
 * Copyright 2014 HybridCluster. All rights reserved.
 * Copyright 2016 RackTop Systems.
 * Copyright (c) 2016 Actifio, Inc. All rights reserved.
 * Copyright (c) 2019, Klara Inc.
 * Copyright (c) 2019, Allan Jude
 */
#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dbuf.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/spa_impl.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/avl.h>
#include <sys/ddt.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dmu_recv.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
#include <sys/bqueue.h>
#include <sys/policy.h>
#include <sys/objlist.h>
#ifdef _KERNEL
#include <sys/zfs_vfsops.h>
#endif
/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
int zfs_send_corrupt_data = B_FALSE;

/*
 * This tunable controls the amount of data (measured in bytes) that will be
 * prefetched by zfs send. If the main thread is blocking on reads that haven't
 * completed, this variable might need to be increased. If instead the main
 * thread is issuing new reads because the prefetches have fallen out of the
 * cache, this may need to be decreased.
 */
int zfs_send_queue_length = SPA_MAXBLOCKSIZE;

/*
 * This tunable controls the length of the queues that zfs send worker threads
 * use to communicate. If the send_main_thread is blocking on these queues,
 * this variable may need to be increased. If there is a significant slowdown
 * at the start of a send as these threads consume all the available IO
 * resources, this variable may need to be decreased.
 */
int zfs_send_no_prefetch_queue_length = 1024 * 1024;

/*
 * These tunables control the fill fraction of the queues by zfs send. The fill
 * fraction controls the frequency with which threads have to be cv_signaled.
 * If a lot of cpu time is being spent on cv_signal, then these should be tuned
 * down. If the queues empty before the signalled thread can catch up, then
 * these should be tuned up.
 */
int zfs_send_queue_ff = 20;
int zfs_send_no_prefetch_queue_ff = 20;

/*
 * Use this to override the recordsize calculation for fast zfs send estimates.
 */
int zfs_override_estimate_recordsize = 0;

/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
int zfs_send_set_freerecords_bit = B_TRUE;

/* Set this tunable to FALSE to disable sending unmodified spill blocks. */
int zfs_send_unmodified_spill_blocks = B_TRUE;
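
/*
 * Roughly speaking, a fill fraction of N means a queue's consumer is
 * signalled once about 1/N of the queue's byte capacity is pending, so
 * raising these values wakes threads more often in smaller batches, and
 * lowering them batches more data per cv_signal. See bqueue.c for the
 * precise semantics.
 */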
static inline boolean_t
overflow_multiply(uint64_t a, uint64_t b, uint64_t *c)
{
	uint64_t temp = a * b;

	if (b != 0 && temp / b != a)
		return (B_FALSE);

	*c = temp;
	return (B_TRUE);
}
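
/*
 * Example: overflow_multiply(1ULL << 40, 1ULL << 30, &c) returns B_FALSE,
 * since 2^70 does not fit in 64 bits, while
 * overflow_multiply(1ULL << 20, 1ULL << 30, &c) returns B_TRUE and sets
 * c to 2^50.
 */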
struct send_thread_arg {
	bqueue_t	q;
	objset_t	*os;		/* Objset to traverse */
	uint64_t	fromtxg;	/* Traverse from this txg */
	int		flags;		/* flags to pass to traverse_dataset */
	int		error_code;
	boolean_t	cancel;
	zbookmark_phys_t resume;
	uint64_t	*num_blocks_visited;
};
struct redact_list_thread_arg {
	boolean_t		cancel;
	bqueue_t		q;
	zbookmark_phys_t	resume;
	redaction_list_t	*rl;
	boolean_t		mark_redact;
	int			error_code;
	uint64_t		*num_blocks_visited;
};
struct send_merge_thread_arg {
	bqueue_t			q;
	objset_t			*os;
	struct redact_list_thread_arg	*from_arg;
	struct send_thread_arg		*to_arg;
	struct redact_list_thread_arg	*redact_arg;
	int				error;
	boolean_t			cancel;
};
struct send_range {
	boolean_t		eos_marker; /* Marks the end of the stream */
	uint64_t		object;
	uint64_t		start_blkid;
	uint64_t		end_blkid;
	bqueue_node_t		ln;
	enum type {DATA, HOLE, OBJECT, OBJECT_RANGE, REDACT,
	    PREVIOUSLY_REDACTED} type;
	union {
		struct srd {
			dmu_object_type_t	obj_type;
			uint32_t		datablksz; // logical size
			uint32_t		datasz; // payload size
			blkptr_t		bp;
			arc_buf_t		*abuf;
			abd_t			*abd;
			kmutex_t		lock;
			kcondvar_t		cv;
			boolean_t		io_outstanding;
			int			io_err;
		} data;
		struct srh {
			uint32_t		datablksz;
		} hole;
		struct sro {
			/*
			 * This is a pointer because embedding it in the
			 * struct causes these structures to be massively larger
			 * for all range types; this makes the code much less
			 * memory efficient.
			 */
			dnode_phys_t		*dnp;
			blkptr_t		bp;
		} object;
		struct srr {
			uint32_t		datablksz;
		} redact;
		struct sror {
			blkptr_t		bp;
		} object_range;
	} sru;
};
/*
 * The list of data whose inclusion in a send stream can be pending from
 * one call to backup_cb to another. Multiple calls to dump_free(),
 * dump_freeobjects(), and dump_redact() can be aggregated into a single
 * DRR_FREE, DRR_FREEOBJECTS, or DRR_REDACT replay record.
 */
typedef enum {
	PENDING_NONE,
	PENDING_FREE,
	PENDING_FREEOBJECTS,
	PENDING_REDACT
} dmu_pendop_t;
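
/*
 * For example, freeing object 5's range [0, 100) and then the adjacent
 * range [100, 300) leaves a single pending DRR_FREE covering [0, 300);
 * the record is only pushed to the stream once a non-adjacent or
 * differently-typed operation arrives.
 */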
typedef struct dmu_send_cookie {
	dmu_replay_record_t *dsc_drr;
	dmu_send_outparams_t *dsc_dso;
	offset_t *dsc_off;
	objset_t *dsc_os;
	zio_cksum_t dsc_zc;
	uint64_t dsc_toguid;
	uint64_t dsc_fromtxg;
	int dsc_err;
	dmu_pendop_t dsc_pending_op;
	uint64_t dsc_featureflags;
	uint64_t dsc_last_data_object;
	uint64_t dsc_last_data_offset;
	uint64_t dsc_resume_object;
	uint64_t dsc_resume_offset;
	boolean_t dsc_sent_begin;
	boolean_t dsc_sent_end;
} dmu_send_cookie_t;
static int do_dump(dmu_send_cookie_t *dscp, struct send_range *range);
static void
range_free(struct send_range *range)
{
	if (range->type == OBJECT) {
		size_t size = sizeof (dnode_phys_t) *
		    (range->sru.object.dnp->dn_extra_slots + 1);
		kmem_free(range->sru.object.dnp, size);
	} else if (range->type == DATA) {
		mutex_enter(&range->sru.data.lock);
		while (range->sru.data.io_outstanding)
			cv_wait(&range->sru.data.cv, &range->sru.data.lock);
		if (range->sru.data.abd != NULL)
			abd_free(range->sru.data.abd);
		if (range->sru.data.abuf != NULL) {
			arc_buf_destroy(range->sru.data.abuf,
			    &range->sru.data.abuf);
		}
		mutex_exit(&range->sru.data.lock);

		cv_destroy(&range->sru.data.cv);
		mutex_destroy(&range->sru.data.lock);
	}
	kmem_free(range, sizeof (*range));
}
/*
 * For all record types except BEGIN, fill in the checksum (overlaid in
 * drr_u.drr_checksum.drr_checksum). The checksum verifies everything
 * up to the start of the checksum itself.
 */
static int
dump_record(dmu_send_cookie_t *dscp, void *payload, int payload_len)
{
	dmu_send_outparams_t *dso = dscp->dsc_dso;
	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	(void) fletcher_4_incremental_native(dscp->dsc_drr,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &dscp->dsc_zc);
	if (dscp->dsc_drr->drr_type == DRR_BEGIN) {
		dscp->dsc_sent_begin = B_TRUE;
	} else {
		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dscp->dsc_drr->drr_u.
		    drr_checksum.drr_checksum));
		dscp->dsc_drr->drr_u.drr_checksum.drr_checksum = dscp->dsc_zc;
	}
	if (dscp->dsc_drr->drr_type == DRR_END) {
		dscp->dsc_sent_end = B_TRUE;
	}
	(void) fletcher_4_incremental_native(&dscp->dsc_drr->
	    drr_u.drr_checksum.drr_checksum,
	    sizeof (zio_cksum_t), &dscp->dsc_zc);
	*dscp->dsc_off += sizeof (dmu_replay_record_t);
	dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, dscp->dsc_drr,
	    sizeof (dmu_replay_record_t), dso->dso_arg);
	if (dscp->dsc_err != 0)
		return (SET_ERROR(EINTR));
	if (payload_len != 0) {
		*dscp->dsc_off += payload_len;
		/*
		 * payload is null when dso_dryrun == B_TRUE (i.e. when we're
		 * doing a send size calculation)
		 */
		if (payload != NULL) {
			(void) fletcher_4_incremental_native(
			    payload, payload_len, &dscp->dsc_zc);
		}

		/*
		 * The code does not rely on this (len being a multiple of 8).
		 * We keep this assertion because of the corresponding assertion
		 * in receive_read(). Keeping this assertion ensures that we do
		 * not inadvertently break backwards compatibility (causing the
		 * assertion in receive_read() to trigger on old software).
		 *
		 * Raw sends cannot be received on old software, and so can
		 * bypass this assertion.
		 */
		ASSERT((payload_len % 8 == 0) ||
		    (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW));

		dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, payload,
		    payload_len, dso->dso_arg);
		if (dscp->dsc_err != 0)
			return (SET_ERROR(EINTR));
	}
	return (0);
}
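
/*
 * Note that dsc_zc accumulates across the whole stream: the checksum
 * embedded in each record covers every byte sent up to (but not
 * including) the checksum field itself, which lets the receiver verify
 * the stream incrementally as it reads each record.
 */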
/*
 * Fill in the drr_free struct, or perform aggregation if the previous record is
 * also a free record, and the two are adjacent.
 *
 * Note that we send free records even for a full send, because we want to be
 * able to receive a full send as a clone, which requires a list of all the free
 * and freeobject records that were generated on the source.
 */
static int
dump_free(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_free *drrf = &(dscp->dsc_drr->drr_u.drr_free);

	/*
	 * When we receive a free record, dbuf_free_range() assumes
	 * that the receiving system doesn't have any dbufs in the range
	 * being freed. This is always true because there is a one-record
	 * constraint: we only send one WRITE record for any given
	 * object,offset. We know that the one-record constraint is
	 * true because we always send data in increasing order by
	 * object,offset.
	 *
	 * If the increasing-order constraint ever changes, we should find
	 * another way to assert that the one-record constraint is still
	 * satisfied.
	 */
	ASSERT(object > dscp->dsc_last_data_object ||
	    (object == dscp->dsc_last_data_object &&
	    offset > dscp->dsc_last_data_offset));

	/*
	 * If there is a pending op, but it's not PENDING_FREE, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_FREE records can only be aggregated with
	 * other DRR_FREE records. DRR_FREEOBJECTS records can only be
	 * aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dscp->dsc_pending_op != PENDING_NONE &&
	    dscp->dsc_pending_op != PENDING_FREE) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dscp->dsc_pending_op = PENDING_NONE;
	}

	if (dscp->dsc_pending_op == PENDING_FREE) {
		/*
		 * Check to see whether this free block can be aggregated
		 * with the pending one.
		 */
		if (drrf->drr_object == object && drrf->drr_offset +
		    drrf->drr_length == offset) {
			if (offset + length < offset || length == UINT64_MAX)
				drrf->drr_length = UINT64_MAX;
			else
				drrf->drr_length += length;
			return (0);
		} else {
			/* not a continuation. Push out pending record */
			if (dump_record(dscp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dscp->dsc_pending_op = PENDING_NONE;
		}
	}
	/* create a FREE record and make it pending */
	bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
	dscp->dsc_drr->drr_type = DRR_FREE;
	drrf->drr_object = object;
	drrf->drr_offset = offset;
	if (offset + length < offset)
		drrf->drr_length = DMU_OBJECT_END;
	else
		drrf->drr_length = length;
	drrf->drr_toguid = dscp->dsc_toguid;
	if (length == DMU_OBJECT_END) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
	} else {
		dscp->dsc_pending_op = PENDING_FREE;
	}

	return (0);
}
/*
 * Fill in the drr_redact struct, or perform aggregation if the previous record
 * is also a redaction record, and the two are adjacent.
 */
static int
dump_redact(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_redact *drrr = &dscp->dsc_drr->drr_u.drr_redact;

	/*
	 * If there is a pending op, but it's not PENDING_REDACT, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_REDACT records can only be aggregated with
	 * other DRR_REDACT records).
	 */
	if (dscp->dsc_pending_op != PENDING_NONE &&
	    dscp->dsc_pending_op != PENDING_REDACT) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dscp->dsc_pending_op = PENDING_NONE;
	}

	if (dscp->dsc_pending_op == PENDING_REDACT) {
		/*
		 * Check to see whether this redacted block can be aggregated
		 * with the pending one.
		 */
		if (drrr->drr_object == object && drrr->drr_offset +
		    drrr->drr_length == offset) {
			drrr->drr_length += length;
			return (0);
		} else {
			/* not a continuation. Push out pending record */
			if (dump_record(dscp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dscp->dsc_pending_op = PENDING_NONE;
		}
	}
	/* create a REDACT record and make it pending */
	bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
	dscp->dsc_drr->drr_type = DRR_REDACT;
	drrr->drr_object = object;
	drrr->drr_offset = offset;
	drrr->drr_length = length;
	drrr->drr_toguid = dscp->dsc_toguid;
	dscp->dsc_pending_op = PENDING_REDACT;

	return (0);
}
static int
dmu_dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
    uint64_t offset, int lsize, int psize, const blkptr_t *bp, void *data)
{
	uint64_t payload_size;
	boolean_t raw = (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
	struct drr_write *drrw = &(dscp->dsc_drr->drr_u.drr_write);

	/*
	 * We send data in increasing object, offset order.
	 * See comment in dump_free() for details.
	 */
	ASSERT(object > dscp->dsc_last_data_object ||
	    (object == dscp->dsc_last_data_object &&
	    offset > dscp->dsc_last_data_offset));
	dscp->dsc_last_data_object = object;
	dscp->dsc_last_data_offset = offset + lsize - 1;

	/*
	 * If there is any kind of pending aggregation (currently either
	 * a grouping of free objects or free blocks), push it out to
	 * the stream, since aggregation can't be done across operations
	 * of different types.
	 */
	if (dscp->dsc_pending_op != PENDING_NONE) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dscp->dsc_pending_op = PENDING_NONE;
	}
	/* write a WRITE record */
	bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
	dscp->dsc_drr->drr_type = DRR_WRITE;
	drrw->drr_object = object;
	drrw->drr_type = type;
	drrw->drr_offset = offset;
	drrw->drr_toguid = dscp->dsc_toguid;
	drrw->drr_logical_size = lsize;

	/* only set the compression fields if the buf is compressed or raw */
	if (raw || lsize != psize) {
		ASSERT(raw || dscp->dsc_featureflags &
		    DMU_BACKUP_FEATURE_COMPRESSED);
		ASSERT(!BP_IS_EMBEDDED(bp));
		ASSERT3S(psize, >, 0);

		if (raw) {
			ASSERT(BP_IS_PROTECTED(bp));

			/*
			 * This is a raw protected block so we need to pass
			 * along everything the receiving side will need to
			 * interpret this block, including the byteswap, salt,
			 * IV, and MAC.
			 */
			if (BP_SHOULD_BYTESWAP(bp))
				drrw->drr_flags |= DRR_RAW_BYTESWAP;
			zio_crypt_decode_params_bp(bp, drrw->drr_salt,
			    drrw->drr_iv);
			zio_crypt_decode_mac_bp(bp, drrw->drr_mac);
		} else {
			/* this is a compressed block */
			ASSERT(dscp->dsc_featureflags &
			    DMU_BACKUP_FEATURE_COMPRESSED);
			ASSERT(!BP_SHOULD_BYTESWAP(bp));
			ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));
			ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF);
			ASSERT3S(lsize, >=, psize);
		}

		/* set fields common to compressed and raw sends */
		drrw->drr_compressiontype = BP_GET_COMPRESS(bp);
		drrw->drr_compressed_size = psize;
		payload_size = drrw->drr_compressed_size;
	} else {
		payload_size = drrw->drr_logical_size;
	}

	if (bp == NULL || BP_IS_EMBEDDED(bp) || (BP_IS_PROTECTED(bp) && !raw)) {
		/*
		 * There's no pre-computed checksum for partial-block writes,
		 * embedded BP's, or encrypted BP's that are being sent as
		 * plaintext, so (like fletcher4-checksummed blocks) userland
		 * will have to compute a dedup-capable checksum itself.
		 */
		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
	} else {
		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
		if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
		    ZCHECKSUM_FLAG_DEDUP)
			drrw->drr_flags |= DRR_CHECKSUM_DEDUP;
		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
		DDK_SET_CRYPT(&drrw->drr_key, BP_IS_PROTECTED(bp));
		drrw->drr_key.ddk_cksum = bp->blk_cksum;
	}

	if (dump_record(dscp, data, payload_size) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}
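
/*
 * Example: in a compressed (non-raw) send, a 128K logical block that
 * compressed to 8K on disk is emitted as a DRR_WRITE with
 * drr_logical_size == 128K, drr_compressed_size == 8K, and an 8K payload.
 */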
static int
dump_write_embedded(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
    int blksz, const blkptr_t *bp)
{
	char buf[BPE_PAYLOAD_SIZE];
	struct drr_write_embedded *drrw =
	    &(dscp->dsc_drr->drr_u.drr_write_embedded);

	if (dscp->dsc_pending_op != PENDING_NONE) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dscp->dsc_pending_op = PENDING_NONE;
	}

	ASSERT(BP_IS_EMBEDDED(bp));

	bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
	dscp->dsc_drr->drr_type = DRR_WRITE_EMBEDDED;
	drrw->drr_object = object;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dscp->dsc_toguid;
	drrw->drr_compression = BP_GET_COMPRESS(bp);
	drrw->drr_etype = BPE_GET_ETYPE(bp);
	drrw->drr_lsize = BPE_GET_LSIZE(bp);
	drrw->drr_psize = BPE_GET_PSIZE(bp);

	decode_embedded_bp_compressed(bp, buf);

	if (dump_record(dscp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}
static int
dump_spill(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
    void *data)
{
	struct drr_spill *drrs = &(dscp->dsc_drr->drr_u.drr_spill);
	uint64_t blksz = BP_GET_LSIZE(bp);
	uint64_t payload_size = blksz;

	if (dscp->dsc_pending_op != PENDING_NONE) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dscp->dsc_pending_op = PENDING_NONE;
	}

	/* write a SPILL record */
	bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
	dscp->dsc_drr->drr_type = DRR_SPILL;
	drrs->drr_object = object;
	drrs->drr_length = blksz;
	drrs->drr_toguid = dscp->dsc_toguid;

	/* See comment in dump_dnode() for full details */
	if (zfs_send_unmodified_spill_blocks &&
	    (bp->blk_birth <= dscp->dsc_fromtxg)) {
		drrs->drr_flags |= DRR_SPILL_UNMODIFIED;
	}

	/* handle raw send fields */
	if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
		ASSERT(BP_IS_PROTECTED(bp));

		if (BP_SHOULD_BYTESWAP(bp))
			drrs->drr_flags |= DRR_RAW_BYTESWAP;
		drrs->drr_compressiontype = BP_GET_COMPRESS(bp);
		drrs->drr_compressed_size = BP_GET_PSIZE(bp);
		zio_crypt_decode_params_bp(bp, drrs->drr_salt, drrs->drr_iv);
		zio_crypt_decode_mac_bp(bp, drrs->drr_mac);
		payload_size = drrs->drr_compressed_size;
	}

	if (dump_record(dscp, data, payload_size) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}
static int
dump_freeobjects(dmu_send_cookie_t *dscp, uint64_t firstobj, uint64_t numobjs)
{
	struct drr_freeobjects *drrfo = &(dscp->dsc_drr->drr_u.drr_freeobjects);
	uint64_t maxobj = DNODES_PER_BLOCK *
	    (DMU_META_DNODE(dscp->dsc_os)->dn_maxblkid + 1);

	/*
	 * ZoL < 0.7 does not handle large FREEOBJECTS records correctly,
	 * leading to zfs recv never completing. To avoid this issue, don't
	 * send FREEOBJECTS records for object IDs which cannot exist on the
	 * receiving side.
	 */
	if (maxobj < firstobj)
		return (0);

	if (maxobj < firstobj + numobjs)
		numobjs = maxobj - firstobj;

	/*
	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
	 * push it out, since free block aggregation can only be done for
	 * blocks of the same type (i.e., DRR_FREE records can only be
	 * aggregated with other DRR_FREE records. DRR_FREEOBJECTS records
	 * can only be aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dscp->dsc_pending_op != PENDING_NONE &&
	    dscp->dsc_pending_op != PENDING_FREEOBJECTS) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dscp->dsc_pending_op = PENDING_NONE;
	}

	if (numobjs == 0)
		numobjs = UINT64_MAX - firstobj;

	if (dscp->dsc_pending_op == PENDING_FREEOBJECTS) {
		/*
		 * See whether this free object array can be aggregated
		 * with the pending one.
		 */
		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
			drrfo->drr_numobjs += numobjs;
			return (0);
		} else {
			/* can't be aggregated. Push out pending record */
			if (dump_record(dscp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dscp->dsc_pending_op = PENDING_NONE;
		}
	}

	/* write a FREEOBJECTS record */
	bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
	dscp->dsc_drr->drr_type = DRR_FREEOBJECTS;
	drrfo->drr_firstobj = firstobj;
	drrfo->drr_numobjs = numobjs;
	drrfo->drr_toguid = dscp->dsc_toguid;

	dscp->dsc_pending_op = PENDING_FREEOBJECTS;

	return (0);
}
static int
dump_dnode(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
    dnode_phys_t *dnp)
{
	struct drr_object *drro = &(dscp->dsc_drr->drr_u.drr_object);
	int bonuslen;

	if (object < dscp->dsc_resume_object) {
		/*
		 * Note: when resuming, we will visit all the dnodes in
		 * the block of dnodes that we are resuming from. In
		 * this case it's unnecessary to send the dnodes prior to
		 * the one we are resuming from. We should be at most one
		 * block's worth of dnodes behind the resume point.
		 */
		ASSERT3U(dscp->dsc_resume_object - object, <,
		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
		return (0);
	}

	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
		return (dump_freeobjects(dscp, object, 1));

	if (dscp->dsc_pending_op != PENDING_NONE) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dscp->dsc_pending_op = PENDING_NONE;
	}

	/* write an OBJECT record */
	bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
	dscp->dsc_drr->drr_type = DRR_OBJECT;
	drro->drr_object = object;
	drro->drr_type = dnp->dn_type;
	drro->drr_bonustype = dnp->dn_bonustype;
	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	drro->drr_bonuslen = dnp->dn_bonuslen;
	drro->drr_dn_slots = dnp->dn_extra_slots + 1;
	drro->drr_checksumtype = dnp->dn_checksum;
	drro->drr_compress = dnp->dn_compress;
	drro->drr_toguid = dscp->dsc_toguid;

	if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;

	bonuslen = P2ROUNDUP(dnp->dn_bonuslen, 8);

	if ((dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {
		ASSERT(BP_IS_ENCRYPTED(bp));

		if (BP_SHOULD_BYTESWAP(bp))
			drro->drr_flags |= DRR_RAW_BYTESWAP;

		/* needed for reconstructing dnp on recv side */
		drro->drr_maxblkid = dnp->dn_maxblkid;
		drro->drr_indblkshift = dnp->dn_indblkshift;
		drro->drr_nlevels = dnp->dn_nlevels;
		drro->drr_nblkptr = dnp->dn_nblkptr;

		/*
		 * Since we encrypt the entire bonus area, the (raw) part
		 * beyond the bonuslen is actually nonzero, so we need
		 * to send it.
		 */
		if (bonuslen != 0) {
			drro->drr_raw_bonuslen = DN_MAX_BONUS_LEN(dnp);
			bonuslen = drro->drr_raw_bonuslen;
		}
	}

	/*
	 * DRR_OBJECT_SPILL is set for every dnode which references a
	 * spill block. This allows the receiving pool to definitively
	 * determine when a spill block should be kept or freed.
	 */
	if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR)
		drro->drr_flags |= DRR_OBJECT_SPILL;

	if (dump_record(dscp, DN_BONUS(dnp), bonuslen) != 0)
		return (SET_ERROR(EINTR));

	/* Free anything past the end of the file. */
	if (dump_free(dscp, object, (dnp->dn_maxblkid + 1) *
	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), DMU_OBJECT_END) != 0)
		return (SET_ERROR(EINTR));

	/*
	 * Send DRR_SPILL records for unmodified spill blocks. This is useful
	 * because changing certain attributes of the object (e.g. blocksize)
	 * can cause old versions of ZFS to incorrectly remove a spill block.
	 * Including these records in the stream forces an up to date version
	 * to always be written ensuring they're never lost. Current versions
	 * of the code which understand the DRR_FLAG_SPILL_BLOCK feature can
	 * ignore these unmodified spill blocks.
	 */
	if (zfs_send_unmodified_spill_blocks &&
	    (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) &&
	    (DN_SPILL_BLKPTR(dnp)->blk_birth <= dscp->dsc_fromtxg)) {
		struct send_range record;
		blkptr_t *bp = DN_SPILL_BLKPTR(dnp);

		bzero(&record, sizeof (struct send_range));
		record.type = DATA;
		record.object = object;
		record.eos_marker = B_FALSE;
		record.start_blkid = DMU_SPILL_BLKID;
		record.end_blkid = record.start_blkid + 1;
		record.sru.data.bp = *bp;
		record.sru.data.obj_type = dnp->dn_type;
		record.sru.data.datablksz = BP_GET_LSIZE(bp);

		if (do_dump(dscp, &record) != 0)
			return (SET_ERROR(EINTR));
	}

	if (dscp->dsc_err != 0)
		return (SET_ERROR(EINTR));

	return (0);
}
static int
dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp,
    uint64_t firstobj, uint64_t numslots)
{
	struct drr_object_range *drror =
	    &(dscp->dsc_drr->drr_u.drr_object_range);

	/* we only use this record type for raw sends */
	ASSERT(BP_IS_PROTECTED(bp));
	ASSERT(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
	ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
	ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_DNODE);
	ASSERT0(BP_GET_LEVEL(bp));

	if (dscp->dsc_pending_op != PENDING_NONE) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dscp->dsc_pending_op = PENDING_NONE;
	}

	bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
	dscp->dsc_drr->drr_type = DRR_OBJECT_RANGE;
	drror->drr_firstobj = firstobj;
	drror->drr_numslots = numslots;
	drror->drr_toguid = dscp->dsc_toguid;
	if (BP_SHOULD_BYTESWAP(bp))
		drror->drr_flags |= DRR_RAW_BYTESWAP;
	zio_crypt_decode_params_bp(bp, drror->drr_salt, drror->drr_iv);
	zio_crypt_decode_mac_bp(bp, drror->drr_mac);

	if (dump_record(dscp, NULL, 0) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}
static boolean_t
send_do_embed(const blkptr_t *bp, uint64_t featureflags)
{
	if (!BP_IS_EMBEDDED(bp))
		return (B_FALSE);

	/*
	 * Compression function must be legacy, or explicitly enabled.
	 */
	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
	    !(featureflags & DMU_BACKUP_FEATURE_LZ4)))
		return (B_FALSE);

	/*
	 * If we have not set the ZSTD feature flag, we can't send ZSTD
	 * compressed embedded blocks, as the receiver may not support them.
	 */
	if ((BP_GET_COMPRESS(bp) == ZIO_COMPRESS_ZSTD &&
	    !(featureflags & DMU_BACKUP_FEATURE_ZSTD)))
		return (B_FALSE);

	/*
	 * Embed type must be explicitly enabled.
	 */
	switch (BPE_GET_ETYPE(bp)) {
	case BP_EMBEDDED_TYPE_DATA:
		if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
			return (B_TRUE);
		break;
	default:
		return (B_FALSE);
	}
	return (B_FALSE);
}
/*
 * This function actually handles figuring out what kind of record needs to be
 * dumped, and calling the appropriate helper function. In most cases,
 * the data has already been read by send_reader_thread().
 */
static int
do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
{
	int err = 0;
	switch (range->type) {
	case OBJECT:
		err = dump_dnode(dscp, &range->sru.object.bp, range->object,
		    range->sru.object.dnp);
		return (err);
	case OBJECT_RANGE: {
		ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
		if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {
			return (0);
		}
		uint64_t epb = BP_GET_LSIZE(&range->sru.object_range.bp) >>
		    DNODE_SHIFT;
		uint64_t firstobj = range->start_blkid * epb;
		err = dump_object_range(dscp, &range->sru.object_range.bp,
		    firstobj, epb);
		return (err);
	}
	case REDACT: {
		struct srr *srrp = &range->sru.redact;
		err = dump_redact(dscp, range->object, range->start_blkid *
		    srrp->datablksz, (range->end_blkid - range->start_blkid) *
		    srrp->datablksz);
		return (err);
	}
	case DATA: {
		struct srd *srdp = &range->sru.data;
		blkptr_t *bp = &srdp->bp;
		spa_t *spa =
		    dmu_objset_spa(dscp->dsc_os);

		ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
		ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
		if (BP_GET_TYPE(bp) == DMU_OT_SA) {
			arc_flags_t aflags = ARC_FLAG_WAIT;
			enum zio_flag zioflags = ZIO_FLAG_CANFAIL;

			if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
				ASSERT(BP_IS_PROTECTED(bp));
				zioflags |= ZIO_FLAG_RAW;
			}

			zbookmark_phys_t zb;
			ASSERT3U(range->start_blkid, ==, DMU_SPILL_BLKID);
			zb.zb_objset = dmu_objset_id(dscp->dsc_os);
			zb.zb_object = range->object;
			zb.zb_level = 0;
			zb.zb_blkid = range->start_blkid;

			arc_buf_t *abuf = NULL;
			if (!dscp->dsc_dso->dso_dryrun && arc_read(NULL, spa,
			    bp, arc_getbuf_func, &abuf, ZIO_PRIORITY_ASYNC_READ,
			    zioflags, &aflags, &zb) != 0)
				return (SET_ERROR(EIO));

			err = dump_spill(dscp, bp, zb.zb_object,
			    (abuf == NULL ? NULL : abuf->b_data));
			if (abuf != NULL)
				arc_buf_destroy(abuf, &abuf);
			return (err);
		}
		if (send_do_embed(bp, dscp->dsc_featureflags)) {
			err = dump_write_embedded(dscp, range->object,
			    range->start_blkid * srdp->datablksz,
			    srdp->datablksz, bp);
			return (err);
		}
		ASSERT(range->object > dscp->dsc_resume_object ||
		    (range->object == dscp->dsc_resume_object &&
		    range->start_blkid * srdp->datablksz >=
		    dscp->dsc_resume_offset));
		/* it's a level-0 block of a regular object */

		mutex_enter(&srdp->lock);
		while (srdp->io_outstanding)
			cv_wait(&srdp->cv, &srdp->lock);
		err = srdp->io_err;
		mutex_exit(&srdp->lock);

		if (err != 0) {
			if (zfs_send_corrupt_data &&
			    !dscp->dsc_dso->dso_dryrun) {
				/*
				 * Send a block filled with 0x"zfs badd bloc"
				 */
				srdp->abuf = arc_alloc_buf(spa, &srdp->abuf,
				    ARC_BUFC_DATA, srdp->datablksz);
				uint64_t *ptr;
				for (ptr = srdp->abuf->b_data;
				    (char *)ptr < (char *)srdp->abuf->b_data +
				    srdp->datablksz; ptr++)
					*ptr = 0x2f5baddb10cULL;
			} else {
				return (SET_ERROR(EIO));
			}
		}

		ASSERT(dscp->dsc_dso->dso_dryrun ||
		    srdp->abuf != NULL || srdp->abd != NULL);

		uint64_t offset = range->start_blkid * srdp->datablksz;

		char *data = NULL;
		if (srdp->abd != NULL) {
			data = abd_to_buf(srdp->abd);
			ASSERT3P(srdp->abuf, ==, NULL);
		} else if (srdp->abuf != NULL) {
			data = srdp->abuf->b_data;
		}

		/*
		 * If we have large blocks stored on disk but the send flags
		 * don't allow us to send large blocks, we split the data from
		 * the arc buf into chunks.
		 */
		if (srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
		    !(dscp->dsc_featureflags &
		    DMU_BACKUP_FEATURE_LARGE_BLOCKS)) {
			while (srdp->datablksz > 0 && err == 0) {
				int n = MIN(srdp->datablksz,
				    SPA_OLD_MAXBLOCKSIZE);
				err = dmu_dump_write(dscp, srdp->obj_type,
				    range->object, offset, n, n, NULL, data);
				offset += n;
				/*
				 * When doing dry run, data==NULL is used as a
				 * sentinel value by
				 * dmu_dump_write()->dump_record().
				 */
				if (data != NULL)
					data += n;
				srdp->datablksz -= n;
			}
		} else {
			err = dmu_dump_write(dscp, srdp->obj_type,
			    range->object, offset,
			    srdp->datablksz, srdp->datasz, bp, data);
		}
		return (err);
	}
	case HOLE: {
		struct srh *srhp = &range->sru.hole;
		if (range->object == DMU_META_DNODE_OBJECT) {
			uint32_t span = srhp->datablksz >> DNODE_SHIFT;
			uint64_t first_obj = range->start_blkid * span;
			uint64_t numobj = range->end_blkid * span - first_obj;
			return (dump_freeobjects(dscp, first_obj, numobj));
		}
		uint64_t offset = 0;

		/*
		 * If this multiply overflows, we don't need to send this block.
		 * Even if it has a birth time, it can never not be a hole, so
		 * we don't need to send records for it.
		 */
		if (!overflow_multiply(range->start_blkid, srhp->datablksz,
		    &offset)) {
			return (0);
		}
		uint64_t len = 0;

		if (!overflow_multiply(range->end_blkid, srhp->datablksz, &len))
			len = UINT64_MAX;
		len = len - offset;
		return (dump_free(dscp, range->object, offset, len));
	}
	default:
		panic("Invalid range type in do_dump: %d", range->type);
	}
	return (err);
}
static struct send_range *
range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
    uint64_t end_blkid, boolean_t eos)
{
	struct send_range *range = kmem_alloc(sizeof (*range), KM_SLEEP);
	range->type = type;
	range->object = object;
	range->start_blkid = start_blkid;
	range->end_blkid = end_blkid;
	range->eos_marker = eos;
	if (type == DATA) {
		range->sru.data.abd = NULL;
		range->sru.data.abuf = NULL;
		mutex_init(&range->sru.data.lock, NULL, MUTEX_DEFAULT, NULL);
		cv_init(&range->sru.data.cv, NULL, CV_DEFAULT, NULL);
		range->sru.data.io_outstanding = 0;
		range->sru.data.io_err = 0;
	}
	return (range);
}
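
/*
 * Note that only DATA ranges get their lock, cv, and buffer pointers
 * initialized here; range_free() mirrors this by waiting out outstanding
 * I/O and destroying the lock and cv for DATA ranges only.
 */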
/*
 * This is the callback function to traverse_dataset that acts as a worker
 * thread for dmu_send_impl.
 */
static int
send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
{
	struct send_thread_arg *sta = arg;
	struct send_range *record;

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= sta->resume.zb_object);

	/*
	 * All bps of an encrypted os should have the encryption bit set.
	 * If this is not true it indicates tampering and we report an error.
	 */
	if (sta->os->os_encrypted &&
	    !BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {
		spa_log_error(spa, zb);
		zfs_panic_recover("unencrypted block in encrypted "
		    "object set %llu", dmu_objset_id(sta->os));
		return (SET_ERROR(EIO));
	}

	if (sta->cancel)
		return (SET_ERROR(EINTR));
	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
	    DMU_OBJECT_IS_SPECIAL(zb->zb_object))
		return (0);
	atomic_inc_64(sta->num_blocks_visited);

	if (zb->zb_level == ZB_DNODE_LEVEL) {
		if (zb->zb_object == DMU_META_DNODE_OBJECT)
			return (0);
		record = range_alloc(OBJECT, zb->zb_object, 0, 0, B_FALSE);
		record->sru.object.bp = *bp;
		size_t size = sizeof (*dnp) * (dnp->dn_extra_slots + 1);
		record->sru.object.dnp = kmem_alloc(size, KM_SLEEP);
		bcopy(dnp, record->sru.object.dnp, size);
		bqueue_enqueue(&sta->q, record, sizeof (*record));
		return (0);
	}
	if (zb->zb_level == 0 && zb->zb_object == DMU_META_DNODE_OBJECT &&
	    !BP_IS_HOLE(bp)) {
		record = range_alloc(OBJECT_RANGE, 0, zb->zb_blkid,
		    zb->zb_blkid + 1, B_FALSE);
		record->sru.object_range.bp = *bp;
		bqueue_enqueue(&sta->q, record, sizeof (*record));
		return (0);
	}
	if (zb->zb_level < 0 || (zb->zb_level > 0 && !BP_IS_HOLE(bp)))
		return (0);
	if (zb->zb_object == DMU_META_DNODE_OBJECT && !BP_IS_HOLE(bp))
		return (0);

	uint64_t span = bp_span_in_blocks(dnp->dn_indblkshift, zb->zb_level);
	uint64_t start;

	/*
	 * If this multiply overflows, we don't need to send this block.
	 * Even if it has a birth time, it can never not be a hole, so
	 * we don't need to send records for it.
	 */
	if (!overflow_multiply(span, zb->zb_blkid, &start) || (!(zb->zb_blkid ==
	    DMU_SPILL_BLKID || DMU_OT_IS_METADATA(dnp->dn_type)) &&
	    span * zb->zb_blkid > dnp->dn_maxblkid)) {
		ASSERT(BP_IS_HOLE(bp));
		return (0);
	}

	if (zb->zb_blkid == DMU_SPILL_BLKID)
		ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);

	enum type record_type = DATA;
	if (BP_IS_HOLE(bp))
		record_type = HOLE;
	else if (BP_IS_REDACTED(bp))
		record_type = REDACT;
	else
		record_type = DATA;

	record = range_alloc(record_type, zb->zb_object, start,
	    (start + span < start ? 0 : start + span), B_FALSE);

	uint64_t datablksz = (zb->zb_blkid == DMU_SPILL_BLKID ?
	    BP_GET_LSIZE(bp) : dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT);

	if (BP_IS_HOLE(bp)) {
		record->sru.hole.datablksz = datablksz;
	} else if (BP_IS_REDACTED(bp)) {
		record->sru.redact.datablksz = datablksz;
	} else {
		record->sru.data.datablksz = datablksz;
		record->sru.data.obj_type = dnp->dn_type;
		record->sru.data.bp = *bp;
	}

	bqueue_enqueue(&sta->q, record, sizeof (*record));
	return (0);
}
struct redact_list_cb_arg {
	uint64_t *num_blocks_visited;
	bqueue_t *q;
	boolean_t *cancel;
	boolean_t mark_redact;
};

static int
redact_list_cb(redact_block_phys_t *rb, void *arg)
{
	struct redact_list_cb_arg *rlcap = arg;

	atomic_inc_64(rlcap->num_blocks_visited);
	if (*rlcap->cancel)
		return (-1);

	struct send_range *data = range_alloc(REDACT, rb->rbp_object,
	    rb->rbp_blkid, rb->rbp_blkid + redact_block_get_count(rb), B_FALSE);
	ASSERT3U(data->end_blkid, >, rb->rbp_blkid);
	if (rlcap->mark_redact) {
		data->type = REDACT;
		data->sru.redact.datablksz = redact_block_get_size(rb);
	} else {
		data->type = PREVIOUSLY_REDACTED;
	}
	bqueue_enqueue(rlcap->q, data, sizeof (*data));

	return (0);
}
/*
 * This function kicks off the traverse_dataset. It also handles setting the
 * error code of the thread in case something goes wrong, and pushes the End of
 * Stream record when the traverse_dataset call has finished.
 */
static void
send_traverse_thread(void *arg)
{
	struct send_thread_arg *st_arg = arg;
	int err = 0;
	struct send_range *data;
	fstrans_cookie_t cookie = spl_fstrans_mark();

	err = traverse_dataset_resume(st_arg->os->os_dsl_dataset,
	    st_arg->fromtxg, &st_arg->resume,
	    st_arg->flags, send_cb, st_arg);

	if (err != EINTR)
		st_arg->error_code = err;
	data = range_alloc(DATA, 0, 0, 0, B_TRUE);
	bqueue_enqueue_flush(&st_arg->q, data, sizeof (*data));
	spl_fstrans_unmark(cookie);
	thread_exit();
}
/*
 * Utility function that causes End of Stream records to compare after all
 * others, so that other threads' comparison logic can stay simple.
 */
static int __attribute__((unused))
send_range_after(const struct send_range *from, const struct send_range *to)
{
	if (from->eos_marker == B_TRUE)
		return (1);
	if (to->eos_marker == B_TRUE)
		return (-1);

	uint64_t from_obj = from->object;
	uint64_t from_end_obj = from->object + 1;
	uint64_t to_obj = to->object;
	uint64_t to_end_obj = to->object + 1;
	if (from_obj == 0) {
		ASSERT(from->type == HOLE || from->type == OBJECT_RANGE);
		from_obj = from->start_blkid << DNODES_PER_BLOCK_SHIFT;
		from_end_obj = from->end_blkid << DNODES_PER_BLOCK_SHIFT;
	}
	if (to_obj == 0) {
		ASSERT(to->type == HOLE || to->type == OBJECT_RANGE);
		to_obj = to->start_blkid << DNODES_PER_BLOCK_SHIFT;
		to_end_obj = to->end_blkid << DNODES_PER_BLOCK_SHIFT;
	}

	if (from_end_obj <= to_obj)
		return (-1);
	if (from_obj >= to_end_obj)
		return (1);
	int64_t cmp = TREE_CMP(to->type == OBJECT_RANGE, from->type ==
	    OBJECT_RANGE);
	if (cmp != 0)
		return (cmp);
	cmp = TREE_CMP(to->type == OBJECT, from->type == OBJECT);
	if (cmp != 0)
		return (cmp);
	if (from->end_blkid <= to->start_blkid)
		return (-1);
	if (from->start_blkid >= to->end_blkid)
		return (1);
	return (0);
}
/*
 * Pop the new data off the queue, check that the records we receive are in
 * the right order, but do not free the old data. This is used so that the
 * records can be sent on to the main thread without copying the data.
 */
static struct send_range *
get_next_range_nofree(bqueue_t *bq, struct send_range *prev)
{
	struct send_range *next = bqueue_dequeue(bq);
	ASSERT3S(send_range_after(prev, next), ==, -1);
	return (next);
}

/*
 * Pop the new data off the queue, check that the records we receive are in
 * the right order, and free the old data.
 */
static struct send_range *
get_next_range(bqueue_t *bq, struct send_range *prev)
{
	struct send_range *next = get_next_range_nofree(bq, prev);
	range_free(prev);
	return (next);
}
static void
redact_list_thread(void *arg)
{
	struct redact_list_thread_arg *rlt_arg = arg;
	struct send_range *record;
	fstrans_cookie_t cookie = spl_fstrans_mark();
	if (rlt_arg->rl != NULL) {
		struct redact_list_cb_arg rlcba = {0};
		rlcba.cancel = &rlt_arg->cancel;
		rlcba.q = &rlt_arg->q;
		rlcba.num_blocks_visited = rlt_arg->num_blocks_visited;
		rlcba.mark_redact = rlt_arg->mark_redact;
		int err = dsl_redaction_list_traverse(rlt_arg->rl,
		    &rlt_arg->resume, redact_list_cb, &rlcba);
		if (err != EINTR)
			rlt_arg->error_code = err;
	}
	record = range_alloc(DATA, 0, 0, 0, B_TRUE);
	bqueue_enqueue_flush(&rlt_arg->q, record, sizeof (*record));
	spl_fstrans_unmark(cookie);
	thread_exit();
}
/*
 * Compare the start point of the two provided ranges. End of stream ranges
 * compare last, objects compare before any data or hole inside that object and
 * multi-object holes that start at the same object.
 */
static int
send_range_start_compare(struct send_range *r1, struct send_range *r2)
{
	uint64_t r1_objequiv = r1->object;
	uint64_t r1_l0equiv = r1->start_blkid;
	uint64_t r2_objequiv = r2->object;
	uint64_t r2_l0equiv = r2->start_blkid;
	int64_t cmp = TREE_CMP(r1->eos_marker, r2->eos_marker);
	if (cmp != 0)
		return (cmp);
	if (r1->object == 0) {
		r1_objequiv = r1->start_blkid * DNODES_PER_BLOCK;
		r1_l0equiv = 0;
	}
	if (r2->object == 0) {
		r2_objequiv = r2->start_blkid * DNODES_PER_BLOCK;
		r2_l0equiv = 0;
	}

	cmp = TREE_CMP(r1_objequiv, r2_objequiv);
	if (cmp != 0)
		return (cmp);
	cmp = TREE_CMP(r2->type == OBJECT_RANGE, r1->type == OBJECT_RANGE);
	if (cmp != 0)
		return (cmp);
	cmp = TREE_CMP(r2->type == OBJECT, r1->type == OBJECT);
	if (cmp != 0)
		return (cmp);

	return (TREE_CMP(r1_l0equiv, r2_l0equiv));
}
enum q_idx {
	REDACT_IDX = 0,
	TO_IDX,
	FROM_IDX,
	NUM_THREADS
};

/*
 * This function returns the next range the send_merge_thread should operate on.
 * The inputs are two arrays; the first one stores the range at the front of the
 * queues stored in the second one. The ranges are sorted in descending
 * priority order; the metadata from earlier ranges overrules metadata from
 * later ranges. out_mask is used to return which threads the ranges came from;
 * bit i is set if ranges[i] started at the same place as the returned range.
 *
 * This code is not hardcoded to compare a specific number of threads; it could
 * be used with any number, just by changing the q_idx enum.
 *
 * The "next range" is the one with the earliest start; if two starts are equal,
 * the highest-priority range is the next to operate on. If a higher-priority
 * range starts in the middle of the first range, then the first range will be
 * truncated to end where the higher-priority range starts, and we will operate
 * on that one next time. In this way, we make sure that each block covered by
 * some range gets covered by a returned range, and each block covered is
 * returned using the metadata of the highest-priority range it appears in.
 *
 * For example, if the three ranges at the front of the queues were [2,4),
 * [3,5), and [1,3), then the ranges returned would be [1,2) with the metadata
 * from the third range, [2,4) with the metadata from the first range, and then
 * [4,5) with the metadata from the second.
 */
static struct send_range *
find_next_range(struct send_range **ranges, bqueue_t **qs, uint64_t *out_mask)
{
	int idx = 0; // index of the range with the earliest start
	int i;
	uint64_t bmask = 0;
	for (i = 1; i < NUM_THREADS; i++) {
		if (send_range_start_compare(ranges[i], ranges[idx]) < 0)
			idx = i;
	}
	if (ranges[idx]->eos_marker) {
		struct send_range *ret = range_alloc(DATA, 0, 0, 0, B_TRUE);
		*out_mask = 0;
		return (ret);
	}
	/*
	 * Find all the ranges that start at that same point.
	 */
	for (i = 0; i < NUM_THREADS; i++) {
		if (send_range_start_compare(ranges[i], ranges[idx]) == 0)
			bmask |= 1 << i;
	}
	*out_mask = bmask;
	/*
	 * OBJECT_RANGE records only come from the TO thread, and should always
	 * be treated as overlapping with nothing and sent on immediately. They
	 * are only used in raw sends, and are never redacted.
	 */
	if (ranges[idx]->type == OBJECT_RANGE) {
		ASSERT3U(idx, ==, TO_IDX);
		ASSERT3U(*out_mask, ==, 1 << TO_IDX);
		struct send_range *ret = ranges[idx];
		ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);
		return (ret);
	}
	/*
	 * Find the first start or end point after the start of the first range.
	 */
	uint64_t first_change = ranges[idx]->end_blkid;
	for (i = 0; i < NUM_THREADS; i++) {
		if (i == idx || ranges[i]->eos_marker ||
		    ranges[i]->object > ranges[idx]->object ||
		    ranges[i]->object == DMU_META_DNODE_OBJECT)
			continue;
		ASSERT3U(ranges[i]->object, ==, ranges[idx]->object);
		if (first_change > ranges[i]->start_blkid &&
		    (bmask & (1 << i)) == 0)
			first_change = ranges[i]->start_blkid;
		else if (first_change > ranges[i]->end_blkid)
			first_change = ranges[i]->end_blkid;
	}
	/*
	 * Update all ranges to no longer overlap with the range we're
	 * returning. All such ranges must start at the same place as the range
	 * being returned, and end at or after first_change. Thus we update
	 * their start to first_change. If that makes them size 0, then free
	 * them and pull a new range from that thread.
	 */
	for (i = 0; i < NUM_THREADS; i++) {
		if (i == idx || (bmask & (1 << i)) == 0)
			continue;
		ASSERT3U(first_change, >, ranges[i]->start_blkid);
		ranges[i]->start_blkid = first_change;
		ASSERT3U(ranges[i]->start_blkid, <=, ranges[i]->end_blkid);
		if (ranges[i]->start_blkid == ranges[i]->end_blkid)
			ranges[i] = get_next_range(qs[i], ranges[i]);
	}
	/*
	 * Short-circuit the simple case; if the range doesn't overlap with
	 * anything else, or it only overlaps with things that start at the same
	 * place and are longer, send it on.
	 */
	if (first_change == ranges[idx]->end_blkid) {
		struct send_range *ret = ranges[idx];
		ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);
		return (ret);
	}

	/*
	 * Otherwise, return a truncated copy of ranges[idx] and move the start
	 * of ranges[idx] back to first_change.
	 */
	struct send_range *ret = kmem_alloc(sizeof (*ret), KM_SLEEP);
	*ret = *ranges[idx];
	ret->end_blkid = first_change;
	ranges[idx]->start_blkid = first_change;
	return (ret);
}
#define	FROM_AND_REDACT_BITS ((1 << REDACT_IDX) | (1 << FROM_IDX))
/*
 * Merge the results from the from thread and the to thread, and then hand the
 * records off to send_reader_thread() to prefetch them. If this is not a
 * send from a redaction bookmark, the from thread will push an end of stream
 * record and stop, and we'll just send everything that was changed in the
 * to_ds since the ancestor's creation txg. If it is, then since
 * traverse_dataset has a canonical order, we can compare each change as
 * they're pulled off the queues. That will give us a stream that is
 * appropriately sorted, and covers all records. In addition, we pull the
 * data from the redact_list_thread and use that to determine which blocks
 * should be redacted.
 */
static void
send_merge_thread(void *arg)
{
	struct send_merge_thread_arg *smt_arg = arg;
	struct send_range *front_ranges[NUM_THREADS];
	bqueue_t *queues[NUM_THREADS];
	int err = 0;
	fstrans_cookie_t cookie = spl_fstrans_mark();

	if (smt_arg->redact_arg == NULL) {
		front_ranges[REDACT_IDX] =
		    kmem_zalloc(sizeof (struct send_range), KM_SLEEP);
		front_ranges[REDACT_IDX]->eos_marker = B_TRUE;
		front_ranges[REDACT_IDX]->type = REDACT;
		queues[REDACT_IDX] = NULL;
	} else {
		front_ranges[REDACT_IDX] =
		    bqueue_dequeue(&smt_arg->redact_arg->q);
		queues[REDACT_IDX] = &smt_arg->redact_arg->q;
	}
	front_ranges[TO_IDX] = bqueue_dequeue(&smt_arg->to_arg->q);
	queues[TO_IDX] = &smt_arg->to_arg->q;
	front_ranges[FROM_IDX] = bqueue_dequeue(&smt_arg->from_arg->q);
	queues[FROM_IDX] = &smt_arg->from_arg->q;
	uint64_t mask = 0;
	struct send_range *range;
	for (range = find_next_range(front_ranges, queues, &mask);
	    !range->eos_marker && err == 0 && !smt_arg->cancel;
	    range = find_next_range(front_ranges, queues, &mask)) {
		/*
		 * If the range in question was in both the from redact bookmark
		 * and the bookmark we're using to redact, then don't send it.
		 * It's already redacted on the receiving system, so a redaction
		 * record would be redundant.
		 */
		if ((mask & FROM_AND_REDACT_BITS) == FROM_AND_REDACT_BITS) {
			ASSERT3U(range->type, ==, REDACT);
			range_free(range);
			continue;
		}
		bqueue_enqueue(&smt_arg->q, range, sizeof (*range));

		if (smt_arg->to_arg->error_code != 0) {
			err = smt_arg->to_arg->error_code;
		} else if (smt_arg->from_arg->error_code != 0) {
			err = smt_arg->from_arg->error_code;
		} else if (smt_arg->redact_arg != NULL &&
		    smt_arg->redact_arg->error_code != 0) {
			err = smt_arg->redact_arg->error_code;
		}
	}
	if (smt_arg->cancel && err == 0)
		err = SET_ERROR(EINTR);
	smt_arg->error = err;
	if (smt_arg->error != 0) {
		smt_arg->to_arg->cancel = B_TRUE;
		smt_arg->from_arg->cancel = B_TRUE;
		if (smt_arg->redact_arg != NULL)
			smt_arg->redact_arg->cancel = B_TRUE;
	}
	for (int i = 0; i < NUM_THREADS; i++) {
		while (!front_ranges[i]->eos_marker) {
			front_ranges[i] = get_next_range(queues[i],
			    front_ranges[i]);
		}
		range_free(front_ranges[i]);
	}
	range = kmem_zalloc(sizeof (*range), KM_SLEEP);
	range->eos_marker = B_TRUE;
	bqueue_enqueue_flush(&smt_arg->q, range, 1);
	spl_fstrans_unmark(cookie);
	thread_exit();
}
struct send_reader_thread_arg {
	struct send_merge_thread_arg *smta;
	bqueue_t q;
	boolean_t cancel;
	boolean_t issue_reads;
	uint64_t featureflags;
	int error;
};

static void
dmu_send_read_done(zio_t *zio)
{
	struct send_range *range = zio->io_private;

	mutex_enter(&range->sru.data.lock);
	if (zio->io_error != 0) {
		abd_free(range->sru.data.abd);
		range->sru.data.abd = NULL;
		range->sru.data.io_err = zio->io_error;
	}

	ASSERT(range->sru.data.io_outstanding);
	range->sru.data.io_outstanding = B_FALSE;
	cv_broadcast(&range->sru.data.cv);
	mutex_exit(&range->sru.data.lock);
}

static void
issue_data_read(struct send_reader_thread_arg *srta, struct send_range *range)
{
	struct srd *srdp = &range->sru.data;
	blkptr_t *bp = &srdp->bp;
	objset_t *os = srta->smta->os;

	ASSERT3U(range->type, ==, DATA);
	ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
	/*
	 * If we have large blocks stored on disk but
	 * the send flags don't allow us to send large
	 * blocks, we split the data from the arc buf
	 * into chunks.
	 */
	boolean_t split_large_blocks =
	    srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
	    !(srta->featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
	/*
	 * We should only request compressed data from the ARC if all
	 * the following are true:
	 *  - stream compression was requested
	 *  - we aren't splitting large blocks into smaller chunks
	 *  - the data won't need to be byteswapped before sending
	 *  - this isn't an embedded block
	 *  - this isn't metadata (if receiving on a different endian
	 *    system it can be byteswapped more easily)
	 */
	boolean_t request_compressed =
	    (srta->featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
	    !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
	    !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));

	enum zio_flag zioflags = ZIO_FLAG_CANFAIL;

	if (srta->featureflags & DMU_BACKUP_FEATURE_RAW)
		zioflags |= ZIO_FLAG_RAW;
	else if (request_compressed)
		zioflags |= ZIO_FLAG_RAW_COMPRESS;

	srdp->datasz = (zioflags & ZIO_FLAG_RAW_COMPRESS) ?
	    BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp);

	if (!srta->issue_reads)
		return;
	if (BP_IS_REDACTED(bp))
		return;
	if (send_do_embed(bp, srta->featureflags))
		return;

	zbookmark_phys_t zb = {
	    .zb_objset = dmu_objset_id(os),
	    .zb_object = range->object,
	    .zb_level = 0,
	    .zb_blkid = range->start_blkid,
	};

	arc_flags_t aflags = ARC_FLAG_CACHED_ONLY;

	int arc_err = arc_read(NULL, os->os_spa, bp,
	    arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ,
	    zioflags, &aflags, &zb);
	/*
	 * If the data is not already cached in the ARC, we read directly
	 * from zio. This avoids the performance overhead of adding a new
	 * entry to the ARC, and we also avoid polluting the ARC cache with
	 * data that is not likely to be used in the future.
	 */
	if (arc_err != 0) {
		srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE);
		srdp->io_outstanding = B_TRUE;
		zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd,
		    srdp->datasz, dmu_send_read_done, range,
		    ZIO_PRIORITY_ASYNC_READ, zioflags, &zb));
	}
}
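
/*
 * srdp->datasz computed above is what do_dump() later passes to
 * dmu_dump_write() as the payload size: the physical size when the block
 * is read raw or compressed, otherwise the logical size.
 */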
/*
 * Create a new record with the given values.
 */
static void
enqueue_range(struct send_reader_thread_arg *srta, bqueue_t *q, dnode_t *dn,
    uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz)
{
	enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE :
	    (BP_IS_REDACTED(bp) ? REDACT : DATA));

	struct send_range *range = range_alloc(range_type, dn->dn_object,
	    blkid, blkid + count, B_FALSE);

	if (blkid == DMU_SPILL_BLKID)
		ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);

	switch (range_type) {
	case HOLE:
		range->sru.hole.datablksz = datablksz;
		break;
	case DATA:
		ASSERT3U(count, ==, 1);
		range->sru.data.datablksz = datablksz;
		range->sru.data.obj_type = dn->dn_type;
		range->sru.data.bp = *bp;
		issue_data_read(srta, range);
		break;
	case REDACT:
		range->sru.redact.datablksz = datablksz;
		break;
	default:
		break;
	}
	bqueue_enqueue(q, range, datablksz);
}
1730 * This thread is responsible for two things: First, it retrieves the correct
1731 * blkptr in the to ds if we need to send the data because of something from
1732 * the from thread. As a result of this, we're the first ones to discover that
1733 * some indirect blocks can be discarded because they're not holes. Second,
1734 * it issues prefetches for the data we need to send.
1737 send_reader_thread(void *arg
)
1739 struct send_reader_thread_arg
*srta
= arg
;
1740 struct send_merge_thread_arg
*smta
= srta
->smta
;
1741 bqueue_t
*inq
= &smta
->q
;
1742 bqueue_t
*outq
= &srta
->q
;
1743 objset_t
*os
= smta
->os
;
1744 fstrans_cookie_t cookie
= spl_fstrans_mark();
1745 struct send_range
*range
= bqueue_dequeue(inq
);
1749 * If the record we're analyzing is from a redaction bookmark from the
1750 * fromds, then we need to know whether or not it exists in the tods so
	 * we know whether to create records for it or not. If it does, we need
	 * the datablksz so we can generate an appropriate record for it.
	 * Finally, if it isn't redacted, we need the blkptr so that we can send
	 * a WRITE record containing the actual data.
	 */
	uint64_t last_obj = UINT64_MAX;
	uint64_t last_obj_exists = B_TRUE;
	while (!range->eos_marker && !srta->cancel && smta->error == 0 &&
	    err == 0) {
		switch (range->type) {
		case DATA:
			issue_data_read(srta, range);
			bqueue_enqueue(outq, range, range->sru.data.datablksz);
			range = get_next_range_nofree(inq, range);
			break;
		case HOLE:
		case OBJECT:
		case OBJECT_RANGE:
		case REDACT: // Redacted blocks must exist
			bqueue_enqueue(outq, range, sizeof (*range));
			range = get_next_range_nofree(inq, range);
			break;
		case PREVIOUSLY_REDACTED: {
			/*
			 * This entry came from the "from bookmark" when
			 * sending from a bookmark that has a redaction
			 * list. We need to check if this object/blkid
			 * exists in the target ("to") dataset, and if
			 * not then we drop this entry. We also need
			 * to fill in the block pointer so that we know
			 * what to prefetch.
			 *
			 * To accomplish the above, we first cache whether or
			 * not the last object we examined exists. If it
			 * doesn't, we can drop this record. If it does, we hold
			 * the dnode and use it to call dbuf_dnode_findbp. We do
			 * this instead of dbuf_bookmark_findbp because we will
			 * often operate on large ranges, and holding the dnode
			 * once is more efficient.
			 */
			boolean_t object_exists = B_TRUE;
			/*
			 * If the data is redacted, we only care if it exists,
			 * so that we don't send records for objects that have
			 * been deleted.
			 */
			dnode_t *dn;
			if (range->object == last_obj && !last_obj_exists) {
				/*
				 * If we're still examining the same object as
				 * previously, and it doesn't exist, we don't
				 * need to call dbuf_bookmark_findbp.
				 */
				object_exists = B_FALSE;
			} else {
				err = dnode_hold(os, range->object, FTAG, &dn);
				if (err == ENOENT) {
					object_exists = B_FALSE;
					err = 0;
				}
				last_obj = range->object;
				last_obj_exists = object_exists;
			}

			if (err != 0) {
				break;
			} else if (!object_exists) {
				/*
				 * The block was modified, but doesn't
				 * exist in the to dataset; if it was
				 * deleted in the to dataset, then we'll
				 * visit the hole bp for it at some point.
				 */
				range = get_next_range(inq, range);
				continue;
			}
			uint64_t file_max =
			    (dn->dn_maxblkid < range->end_blkid ?
			    dn->dn_maxblkid : range->end_blkid);
			/*
			 * The object exists, so we need to try to find the
			 * blkptr for each block in the range we're processing.
			 */
			rw_enter(&dn->dn_struct_rwlock, RW_READER);
			for (uint64_t blkid = range->start_blkid;
			    blkid < file_max; blkid++) {
				blkptr_t bp;
				uint32_t datablksz =
				    dn->dn_phys->dn_datablkszsec <<
				    SPA_MINBLOCKSHIFT;
				uint64_t offset = blkid * datablksz;
				/*
				 * This call finds the next non-hole block in
				 * the object. This is to prevent a
				 * performance problem where we're unredacting
				 * a large hole. Using dnode_next_offset to
				 * skip over the large hole avoids iterating
				 * over every block in it.
				 */
				err = dnode_next_offset(dn, DNODE_FIND_HAVELOCK,
				    &offset, 1, 1, 0);
				if (err == ESRCH) {
					offset = UINT64_MAX;
					err = 0;
				} else if (err != 0) {
					break;
				}
				if (offset != blkid * datablksz) {
					/*
					 * if there is a hole from here
					 * (blkid) to offset
					 */
					offset = MIN(offset, file_max *
					    datablksz);
					uint64_t nblks = (offset / datablksz) -
					    blkid;
					enqueue_range(srta, outq, dn, blkid,
					    nblks, NULL, datablksz);
					blkid += nblks;
				}
				if (blkid >= file_max)
					break;
				err = dbuf_dnode_findbp(dn, 0, blkid, &bp,
				    NULL, NULL);
				if (err != 0)
					break;
				ASSERT(!BP_IS_HOLE(&bp));
				enqueue_range(srta, outq, dn, blkid, 1, &bp,
				    datablksz);
			}
			rw_exit(&dn->dn_struct_rwlock);
			dnode_rele(dn, FTAG);
			range = get_next_range(inq, range);
		}
		}
	}
	if (srta->cancel || err != 0) {
		smta->cancel = B_TRUE;
		srta->error = err;
	} else if (smta->error != 0) {
		srta->error = smta->error;
	}
	while (!range->eos_marker)
		range = get_next_range(inq, range);

	bqueue_enqueue_flush(outq, range, 1);
	spl_fstrans_unmark(cookie);
	thread_exit();
}
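
/*
 * A minimal sketch (not part of the send path) of the bounded-queue pattern
 * every pipeline stage above follows: dequeue from the upstream queue, process
 * until an eos marker, drain on error or cancel, and flush the eos marker
 * downstream. process_range() is a hypothetical stand-in for a stage's work;
 * the queue calls mirror the ones used in this file:
 *
 *	struct send_range *range = bqueue_dequeue(inq);
 *	int err = 0;
 *	while (!range->eos_marker && err == 0) {
 *		err = process_range(range);
 *		range = get_next_range(inq, range);
 *	}
 *	while (!range->eos_marker)	// drain remaining records
 *		range = get_next_range(inq, range);
 *	bqueue_enqueue_flush(outq, range, 1);	// pass eos downstream
 */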

#define	NUM_SNAPS_NOT_REDACTED	UINT64_MAX

struct dmu_send_params {
	/* Pool args */
	void *tag; // Tag that dp was held with, will be used to release dp.
	dsl_pool_t *dp;
	/* To snapshot args */
	const char *tosnap;
	dsl_dataset_t *to_ds;
	/* From snapshot args */
	zfs_bookmark_phys_t ancestor_zb;
	uint64_t *fromredactsnaps;
	/* NUM_SNAPS_NOT_REDACTED if not sending from redaction bookmark */
	uint64_t numfromredactsnaps;
	/* Stream params */
	boolean_t is_clone;
	boolean_t embedok;
	boolean_t large_block_ok;
	boolean_t compressok;
	boolean_t rawok;
	boolean_t savedok;
	uint64_t resumeobj;
	uint64_t resumeoff;
	uint64_t saved_guid;
	zfs_bookmark_phys_t *redactbook;
	/* Stream output params */
	dmu_send_outparams_t *dso;

	/* Stream progress params */
	offset_t *off;
	int outfd;
	char saved_toname[MAXNAMELEN];
};

static int
setup_featureflags(struct dmu_send_params *dspp, objset_t *os,
    uint64_t *featureflags)
{
	dsl_dataset_t *to_ds = dspp->to_ds;
	dsl_pool_t *dp = dspp->dp;
#ifdef _KERNEL
	if (dmu_objset_type(os) == DMU_OST_ZFS) {
		uint64_t version;
		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0)
			return (SET_ERROR(EINVAL));

		if (version >= ZPL_VERSION_SA)
			*featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
	}
#endif

	/* raw sends imply large_block_ok */
	if ((dspp->rawok || dspp->large_block_ok) &&
	    dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_BLOCKS)) {
		*featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
	}

	/* encrypted datasets will not have embedded blocks */
	if ((dspp->embedok || dspp->rawok) && !os->os_encrypted &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
		*featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
	}

	/* raw send implies compressok */
	if (dspp->compressok || dspp->rawok)
		*featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;

	if (dspp->rawok && os->os_encrypted)
		*featureflags |= DMU_BACKUP_FEATURE_RAW;

	if ((*featureflags &
	    (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED |
	    DMU_BACKUP_FEATURE_RAW)) != 0 &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
		*featureflags |= DMU_BACKUP_FEATURE_LZ4;
	}

	/*
	 * We specifically do not include DMU_BACKUP_FEATURE_EMBED_DATA here to
	 * allow sending ZSTD compressed datasets to a receiver that does not
	 * support ZSTD.
	 */
	if ((*featureflags &
	    (DMU_BACKUP_FEATURE_COMPRESSED | DMU_BACKUP_FEATURE_RAW)) != 0 &&
	    dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_ZSTD_COMPRESS)) {
		*featureflags |= DMU_BACKUP_FEATURE_ZSTD;
	}

	if (dspp->resumeobj != 0 || dspp->resumeoff != 0) {
		*featureflags |= DMU_BACKUP_FEATURE_RESUMING;
	}

	if (dspp->redactbook != NULL) {
		*featureflags |= DMU_BACKUP_FEATURE_REDACTED;
	}

	if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_DNODE)) {
		*featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;
	}
	return (0);
}
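
/*
 * Illustrative only: a caller can test the mask computed by
 * setup_featureflags() the same way dmu_send_impl() does below; e.g., to
 * detect a raw (still-encrypted) stream. This is a sketch, not an API:
 *
 *	uint64_t featureflags = 0;
 *	int err = setup_featureflags(dspp, os, &featureflags);
 *	if (err == 0 && (featureflags & DMU_BACKUP_FEATURE_RAW) != 0) {
 *		// stream will carry raw, still-encrypted records
 *	}
 */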

static dmu_replay_record_t *
create_begin_record(struct dmu_send_params *dspp, objset_t *os,
    uint64_t featureflags)
{
	dmu_replay_record_t *drr = kmem_zalloc(sizeof (dmu_replay_record_t),
	    KM_SLEEP);
	drr->drr_type = DRR_BEGIN;

	struct drr_begin *drrb = &drr->drr_u.drr_begin;
	dsl_dataset_t *to_ds = dspp->to_ds;

	drrb->drr_magic = DMU_BACKUP_MAGIC;
	drrb->drr_creation_time = dsl_dataset_phys(to_ds)->ds_creation_time;
	drrb->drr_type = dmu_objset_type(os);
	drrb->drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	drrb->drr_fromguid = dspp->ancestor_zb.zbm_guid;

	DMU_SET_STREAM_HDRTYPE(drrb->drr_versioninfo, DMU_SUBSTREAM);
	DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, featureflags);

	if (dspp->is_clone)
		drrb->drr_flags |= DRR_FLAG_CLONE;
	if (dsl_dataset_phys(dspp->to_ds)->ds_flags & DS_FLAG_CI_DATASET)
		drrb->drr_flags |= DRR_FLAG_CI_DATA;
	if (zfs_send_set_freerecords_bit)
		drrb->drr_flags |= DRR_FLAG_FREERECORDS;
	drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;

	if (dspp->savedok) {
		drrb->drr_toguid = dspp->saved_guid;
		strlcpy(drrb->drr_toname, dspp->saved_toname,
		    sizeof (drrb->drr_toname));
	} else {
		dsl_dataset_name(to_ds, drrb->drr_toname);
		if (!to_ds->ds_is_snapshot) {
			(void) strlcat(drrb->drr_toname, "@--head--",
			    sizeof (drrb->drr_toname));
		}
	}
	return (drr);
}

static void
setup_to_thread(struct send_thread_arg *to_arg, objset_t *to_os,
    dmu_sendstatus_t *dssp, uint64_t fromtxg, boolean_t rawok)
{
	VERIFY0(bqueue_init(&to_arg->q, zfs_send_no_prefetch_queue_ff,
	    MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
	    offsetof(struct send_range, ln)));
	to_arg->error_code = 0;
	to_arg->cancel = B_FALSE;
	to_arg->os = to_os;
	to_arg->fromtxg = fromtxg;
	to_arg->flags = TRAVERSE_PRE | TRAVERSE_PREFETCH_METADATA;
	if (rawok)
		to_arg->flags |= TRAVERSE_NO_DECRYPT;
	to_arg->num_blocks_visited = &dssp->dss_blocks;
	(void) thread_create(NULL, 0, send_traverse_thread, to_arg, 0,
	    curproc, TS_RUN, minclsyspri);
}

static void
setup_from_thread(struct redact_list_thread_arg *from_arg,
    redaction_list_t *from_rl, dmu_sendstatus_t *dssp)
{
	VERIFY0(bqueue_init(&from_arg->q, zfs_send_no_prefetch_queue_ff,
	    MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
	    offsetof(struct send_range, ln)));
	from_arg->error_code = 0;
	from_arg->cancel = B_FALSE;
	from_arg->rl = from_rl;
	from_arg->mark_redact = B_FALSE;
	from_arg->num_blocks_visited = &dssp->dss_blocks;
	/*
	 * If from_rl is null, redact_list_thread just returns success and
	 * enqueues an eos marker.
	 */
	(void) thread_create(NULL, 0, redact_list_thread, from_arg, 0,
	    curproc, TS_RUN, minclsyspri);
}

static void
setup_redact_list_thread(struct redact_list_thread_arg *rlt_arg,
    struct dmu_send_params *dspp, redaction_list_t *rl, dmu_sendstatus_t *dssp)
{
	if (dspp->redactbook == NULL)
		return;

	rlt_arg->cancel = B_FALSE;
	VERIFY0(bqueue_init(&rlt_arg->q, zfs_send_no_prefetch_queue_ff,
	    MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
	    offsetof(struct send_range, ln)));
	rlt_arg->error_code = 0;
	rlt_arg->mark_redact = B_TRUE;
	rlt_arg->rl = rl;
	rlt_arg->num_blocks_visited = &dssp->dss_blocks;

	(void) thread_create(NULL, 0, redact_list_thread, rlt_arg, 0,
	    curproc, TS_RUN, minclsyspri);
}

static void
setup_merge_thread(struct send_merge_thread_arg *smt_arg,
    struct dmu_send_params *dspp, struct redact_list_thread_arg *from_arg,
    struct send_thread_arg *to_arg, struct redact_list_thread_arg *rlt_arg,
    objset_t *os)
{
	VERIFY0(bqueue_init(&smt_arg->q, zfs_send_no_prefetch_queue_ff,
	    MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
	    offsetof(struct send_range, ln)));
	smt_arg->cancel = B_FALSE;
	smt_arg->error = 0;
	smt_arg->from_arg = from_arg;
	smt_arg->to_arg = to_arg;
	if (dspp->redactbook != NULL)
		smt_arg->redact_arg = rlt_arg;

	smt_arg->os = os;
	(void) thread_create(NULL, 0, send_merge_thread, smt_arg, 0, curproc,
	    TS_RUN, minclsyspri);
}

static void
setup_reader_thread(struct send_reader_thread_arg *srt_arg,
    struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg,
    uint64_t featureflags)
{
	VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff,
	    MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
	    offsetof(struct send_range, ln)));
	srt_arg->smta = smt_arg;
	srt_arg->issue_reads = !dspp->dso->dso_dryrun;
	srt_arg->featureflags = featureflags;
	(void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0,
	    curproc, TS_RUN, minclsyspri);
}
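
/*
 * Sizing note on the setup_*_thread() helpers above: each bqueue is created
 * with capacity MAX(<length tunable>, 2 * zfs_max_recordsize) so that at
 * least two maximum-sized blocks fit even if the tunable is set very low,
 * and the fill-fraction tunable only controls how often waiters are
 * cv_signal()ed, not the capacity. The recurring pattern:
 *
 *	VERIFY0(bqueue_init(&q, zfs_send_queue_ff,
 *	    MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
 *	    offsetof(struct send_range, ln)));
 */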

static int
setup_resume_points(struct dmu_send_params *dspp,
    struct send_thread_arg *to_arg, struct redact_list_thread_arg *from_arg,
    struct redact_list_thread_arg *rlt_arg,
    struct send_merge_thread_arg *smt_arg, boolean_t resuming, objset_t *os,
    redaction_list_t *redact_rl, nvlist_t *nvl)
{
	dsl_dataset_t *to_ds = dspp->to_ds;
	int err = 0;

	uint64_t obj = 0;
	uint64_t blkid = 0;
	if (resuming) {
		obj = dspp->resumeobj;
		dmu_object_info_t to_doi;
		err = dmu_object_info(os, obj, &to_doi);
		if (err != 0)
			return (err);

		blkid = dspp->resumeoff / to_doi.doi_data_block_size;
	}
	/*
	 * If we're resuming a redacted send, we can skip to the appropriate
	 * point in the redaction bookmark by binary searching through it.
	 */
	if (redact_rl != NULL) {
		SET_BOOKMARK(&rlt_arg->resume, to_ds->ds_object, obj, 0, blkid);
	}

	SET_BOOKMARK(&to_arg->resume, to_ds->ds_object, obj, 0, blkid);
	if (nvlist_exists(nvl, BEGINNV_REDACT_FROM_SNAPS)) {
		uint64_t objset = dspp->ancestor_zb.zbm_redaction_obj;
		/*
		 * Note: If the resume point is in an object whose
		 * blocksize is different in the from vs to snapshots,
		 * we will have divided by the "wrong" blocksize.
		 * However, in this case fromsnap's send_cb() will
		 * detect that the blocksize has changed and therefore
		 * ignore this object.
		 *
		 * If we're resuming a send from a redaction bookmark,
		 * we still cannot accidentally suggest blocks behind
		 * the to_ds. In addition, we know that any blocks in
		 * the object in the to_ds will have to be sent, since
		 * the size changed. Therefore, we can't cause any harm
		 * this way either.
		 */
		SET_BOOKMARK(&from_arg->resume, objset, obj, 0, blkid);
	}
	if (resuming) {
		fnvlist_add_uint64(nvl, BEGINNV_RESUME_OBJECT, dspp->resumeobj);
		fnvlist_add_uint64(nvl, BEGINNV_RESUME_OFFSET, dspp->resumeoff);
	}
	return (0);
}

static dmu_sendstatus_t *
setup_send_progress(struct dmu_send_params *dspp)
{
	dmu_sendstatus_t *dssp = kmem_zalloc(sizeof (*dssp), KM_SLEEP);
	dssp->dss_outfd = dspp->outfd;
	dssp->dss_off = dspp->off;
	dssp->dss_proc = curproc;
	mutex_enter(&dspp->to_ds->ds_sendstream_lock);
	list_insert_head(&dspp->to_ds->ds_sendstreams, dssp);
	mutex_exit(&dspp->to_ds->ds_sendstream_lock);
	return (dssp);
}
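
/*
 * The dmu_sendstatus_t registered above is what in-flight send progress
 * reporting walks. A sketch (assuming a reader elsewhere; not code from this
 * file) of summing the per-stream visited-block counters:
 *
 *	uint64_t total = 0;
 *	mutex_enter(&ds->ds_sendstream_lock);
 *	for (dmu_sendstatus_t *d = list_head(&ds->ds_sendstreams); d != NULL;
 *	    d = list_next(&ds->ds_sendstreams, d))
 *		total += d->dss_blocks;
 *	mutex_exit(&ds->ds_sendstream_lock);
 */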

/*
 * Actually do the bulk of the work in a zfs send.
 *
 * The idea is that we want to do a send from ancestor_zb to to_ds. We also
 * want to not send any data that has been modified by all the datasets in
 * redactsnaparr, and store the list of blocks that are redacted in this way in
 * a bookmark named redactbook, created on the to_ds. We do this by creating
 * several worker threads, whose function is described below.
 *
 * There are three cases.
 * The first case is a redacted zfs send. In this case there are 5 threads.
 * The first thread is the to_ds traversal thread: it calls dataset_traverse on
 * the to_ds and finds all the blocks that have changed since ancestor_zb (if
 * it's a full send, that's all blocks in the dataset). It then sends those
 * blocks on to the send merge thread. The redact list thread takes the data
 * from the redaction bookmark and sends those blocks on to the send merge
 * thread. The send merge thread takes the data from the to_ds traversal
 * thread, and combines it with the redaction records from the redact list
 * thread. If a block appears in both the to_ds's data and the redaction data,
 * the send merge thread will mark it as redacted and send it on to the prefetch
 * thread. Otherwise, the send merge thread will send the block on to the
 * prefetch thread unchanged. The prefetch thread will issue prefetch reads for
 * any data that isn't redacted, and then send the data on to the main thread.
 * The main thread behaves the same as in a normal send case, issuing demand
 * reads for data blocks and sending out records over the network.
 *
 * The graphic below diagrams the flow of data in the case of a redacted zfs
 * send. Each box represents a thread, and each line represents the flow of
 * data.
 *
 *             Records from the |
 *           redaction bookmark |
 * +--------------------+       |  +---------------------------+
 * |                    |       v  | Send Merge Thread         |
 * | Redact List Thread +----------> Apply redaction marks to  |
 * |                    |          | records as specified by   |
 * +--------------------+          | redaction ranges          |
 *                                 +----^---------------+------+
 *                                      |               | Merged data
 *                                      |               |
 *                                      |  +------------v--------+
 *                                      |  | Prefetch Thread     |
 * +--------------------+               |  | Issues prefetch     |
 * | to_ds Traversal    |               |  | reads of data blocks|
 * | Thread (finds      +---------------+  +------------+--------+
 * | candidate blocks)  |  Blocks modified              | Prefetched data
 * +--------------------+  by to_ds since               |
 *                         ancestor_zb     +------------v----+
 *                                         | Main Thread     |  File Descriptor
 *                                         | Sends data over +->(to zfs receive)
 *                                         | wire            |
 *                                         +-----------------+
 *
 * The second case is an incremental send from a redaction bookmark. The to_ds
 * traversal thread and the main thread behave the same as in the redacted
 * send case. The new thread is the from bookmark traversal thread. It
 * iterates over the redaction list in the redaction bookmark, and enqueues
 * records for each block that was redacted in the original send. The send
 * merge thread now has to merge the data from the two threads. For details
 * about that process, see the header comment of send_merge_thread(). Any data
 * it decides to send on will be prefetched by the prefetch thread. Note that
 * you can perform a redacted send from a redaction bookmark; in that case,
 * the data flow behaves very similarly to the flow in the redacted send case,
 * except with the addition of the bookmark traversal thread iterating over the
 * redaction bookmark. The send_merge_thread also has to take on the
 * responsibility of merging the redact list thread's records, the bookmark
 * traversal thread's records, and the to_ds records.
 *
 * +---------------------+
 * |                     |
 * | Redact List Thread  +--------------+
 * |                     |              |
 * +---------------------+              |
 *        Blocks in redaction list      | Ranges modified by every secure snap
 *        of from bookmark              | (or EOS if not redacted)
 *                                      |
 * +---------------------+   |     +----v----------------------+
 * | bookmark Traversal  |   v     | Send Merge Thread         |
 * | Thread (finds       +---------> Merges bookmark, rlt, and |
 * | candidate blocks)   |         | to_ds send records        |
 * +---------------------+         +----^---------------+------+
 *                                      |               | Merged data
 *                                      |  +------------v--------+
 *                                      |  | Prefetch Thread     |
 * +--------------------+               |  | Issues prefetch     |
 * | to_ds Traversal    |               |  | reads of data blocks|
 * | Thread (finds      +---------------+  +------------+--------+
 * | candidate blocks)  |  Blocks modified              | Prefetched data
 * +--------------------+  by to_ds since  +------------v----+
 *                         ancestor_zb     | Main Thread     |  File Descriptor
 *                                         | Sends data over +->(to zfs receive)
 *                                         | wire            |
 *                                         +-----------------+
 *
 * The final case is a simple zfs full or incremental send. The to_ds traversal
 * thread behaves the same as always. The redact list thread is never started.
 * The send merge thread takes all the blocks that the to_ds traversal thread
 * sends it, prefetches the data, and sends the blocks on to the main thread.
 * The main thread sends the data over the wire.
 *
 * To keep performance acceptable, we want to prefetch the data in the worker
 * threads. While the to_ds thread could simply use the TRAVERSE_PREFETCH
 * feature built into traverse_dataset, the combining and deletion of records
 * due to redaction and sends from redaction bookmarks mean that we could
 * issue many unnecessary prefetches. As a result, we only prefetch data
 * after we've determined that the record is not going to be redacted. To
 * prevent the prefetching from getting too far ahead of the main thread, the
 * blocking queues that are used for communication are capped not by the
 * number of entries in the queue, but by the sum of the size of the
 * prefetches associated with them. The limit on the amount of data that the
 * thread can prefetch beyond what the main thread has reached is controlled
 * by the global variable zfs_send_queue_length. In addition, to prevent poor
 * performance in the beginning of a send, we also limit the distance ahead
 * that the traversal threads can be. That distance is controlled by the
 * zfs_send_no_prefetch_queue_length tunable.
 *
 * Note: Releases dp using the specified tag.
 */
static int
dmu_send_impl(struct dmu_send_params *dspp)
{
	objset_t *os;
	dmu_replay_record_t *drr;
	dmu_sendstatus_t *dssp;
	dmu_send_cookie_t dsc = {0};
	int err;
	uint64_t fromtxg = dspp->ancestor_zb.zbm_creation_txg;
	uint64_t featureflags = 0;
	struct redact_list_thread_arg *from_arg;
	struct send_thread_arg *to_arg;
	struct redact_list_thread_arg *rlt_arg;
	struct send_merge_thread_arg *smt_arg;
	struct send_reader_thread_arg *srt_arg;
	struct send_range *range;
	redaction_list_t *from_rl = NULL;
	redaction_list_t *redact_rl = NULL;
	boolean_t resuming = (dspp->resumeobj != 0 || dspp->resumeoff != 0);
	boolean_t book_resuming = resuming;

	dsl_dataset_t *to_ds = dspp->to_ds;
	zfs_bookmark_phys_t *ancestor_zb = &dspp->ancestor_zb;
	dsl_pool_t *dp = dspp->dp;
	void *tag = dspp->tag;

	err = dmu_objset_from_ds(to_ds, &os);
	if (err != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	/*
	 * If this is a non-raw send of an encrypted ds, we can ensure that
	 * the objset_phys_t is authenticated. This is safe because this is
	 * either a snapshot or we have owned the dataset, ensuring that
	 * it can't be modified.
	 */
	if (!dspp->rawok && os->os_encrypted &&
	    arc_is_unauthenticated(os->os_phys_buf)) {
		zbookmark_phys_t zb;

		SET_BOOKMARK(&zb, to_ds->ds_object, ZB_ROOT_OBJECT,
		    ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
		err = arc_untransform(os->os_phys_buf, os->os_spa,
		    &zb, B_FALSE);
		if (err != 0) {
			dsl_pool_rele(dp, tag);
			return (err);
		}

		ASSERT0(arc_is_unauthenticated(os->os_phys_buf));
	}

	if ((err = setup_featureflags(dspp, os, &featureflags)) != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	/*
	 * If we're doing a redacted send, hold the bookmark's redaction list.
	 */
	if (dspp->redactbook != NULL) {
		err = dsl_redaction_list_hold_obj(dp,
		    dspp->redactbook->zbm_redaction_obj, FTAG,
		    &redact_rl);
		if (err != 0) {
			dsl_pool_rele(dp, tag);
			return (SET_ERROR(EINVAL));
		}
		dsl_redaction_list_long_hold(dp, redact_rl, FTAG);
	}

	/*
	 * If we're sending from a redaction bookmark, hold the redaction list
	 * so that we can consider sending the redacted blocks.
	 */
	if (ancestor_zb->zbm_redaction_obj != 0) {
		err = dsl_redaction_list_hold_obj(dp,
		    ancestor_zb->zbm_redaction_obj, FTAG, &from_rl);
		if (err != 0) {
			if (redact_rl != NULL) {
				dsl_redaction_list_long_rele(redact_rl, FTAG);
				dsl_redaction_list_rele(redact_rl, FTAG);
			}
			dsl_pool_rele(dp, tag);
			return (SET_ERROR(EINVAL));
		}
		dsl_redaction_list_long_hold(dp, from_rl, FTAG);
	}

	dsl_dataset_long_hold(to_ds, FTAG);

	from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP);
	to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
	rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
	smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
	srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP);

	drr = create_begin_record(dspp, os, featureflags);
	dssp = setup_send_progress(dspp);

	dsc.dsc_drr = drr;
	dsc.dsc_dso = dspp->dso;
	dsc.dsc_os = os;
	dsc.dsc_off = dspp->off;
	dsc.dsc_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	dsc.dsc_fromtxg = fromtxg;
	dsc.dsc_pending_op = PENDING_NONE;
	dsc.dsc_featureflags = featureflags;
	dsc.dsc_resume_object = dspp->resumeobj;
	dsc.dsc_resume_offset = dspp->resumeoff;

	dsl_pool_rele(dp, tag);

	void *payload = NULL;
	size_t payload_len = 0;
	nvlist_t *nvl = fnvlist_alloc();

	/*
	 * If we're doing a redacted send, we include the snapshots we're
	 * redacted with respect to so that the target system knows what send
	 * streams can be correctly received on top of this dataset. If we're
	 * instead sending a redacted dataset, we include the snapshots that the
	 * dataset was created with respect to.
	 */
	if (dspp->redactbook != NULL) {
		fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS,
		    redact_rl->rl_phys->rlp_snaps,
		    redact_rl->rl_phys->rlp_num_snaps);
	} else if (dsl_dataset_feature_is_active(to_ds,
	    SPA_FEATURE_REDACTED_DATASETS)) {
		uint64_t *tods_guids;
		uint64_t length;
		VERIFY(dsl_dataset_get_uint64_array_feature(to_ds,
		    SPA_FEATURE_REDACTED_DATASETS, &length, &tods_guids));
		fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS, tods_guids,
		    length);
	}

	/*
	 * If we're sending from a redaction bookmark, then we should retrieve
	 * the guids of that bookmark so we can send them over the wire.
	 */
	if (from_rl != NULL) {
		fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
		    from_rl->rl_phys->rlp_snaps,
		    from_rl->rl_phys->rlp_num_snaps);
	}

	/*
	 * If the snapshot we're sending from is redacted, include the redaction
	 * list in the stream.
	 */
	if (dspp->numfromredactsnaps != NUM_SNAPS_NOT_REDACTED) {
		ASSERT3P(from_rl, ==, NULL);
		fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
		    dspp->fromredactsnaps, (uint_t)dspp->numfromredactsnaps);
		if (dspp->numfromredactsnaps > 0) {
			kmem_free(dspp->fromredactsnaps,
			    dspp->numfromredactsnaps * sizeof (uint64_t));
			dspp->fromredactsnaps = NULL;
		}
	}

	if (resuming || book_resuming) {
		err = setup_resume_points(dspp, to_arg, from_arg,
		    rlt_arg, smt_arg, resuming, os, redact_rl, nvl);
		if (err != 0)
			goto out;
	}

	if (featureflags & DMU_BACKUP_FEATURE_RAW) {
		uint64_t ivset_guid = (ancestor_zb != NULL) ?
		    ancestor_zb->zbm_ivset_guid : 0;
		nvlist_t *keynvl = NULL;
		ASSERT(os->os_encrypted);

		err = dsl_crypto_populate_key_nvlist(os, ivset_guid,
		    &keynvl);
		if (err != 0) {
			fnvlist_free(nvl);
			goto out;
		}

		fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl);
		fnvlist_free(keynvl);
	}

	if (!nvlist_empty(nvl)) {
		payload = fnvlist_pack(nvl, &payload_len);
		drr->drr_payloadlen = payload_len;
	}

	fnvlist_free(nvl);
	err = dump_record(&dsc, payload, payload_len);
	fnvlist_pack_free(payload, payload_len);
	if (err != 0) {
		err = dsc.dsc_err;
		goto out;
	}

	setup_to_thread(to_arg, os, dssp, fromtxg, dspp->rawok);
	setup_from_thread(from_arg, from_rl, dssp);
	setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);
	setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);
	setup_reader_thread(srt_arg, dspp, smt_arg, featureflags);

	range = bqueue_dequeue(&srt_arg->q);
	while (err == 0 && !range->eos_marker) {
		err = do_dump(&dsc, range);
		range = get_next_range(&srt_arg->q, range);
		if (issig(JUSTLOOKING) && issig(FORREAL))
			err = SET_ERROR(EINTR);
	}

	/*
	 * If we hit an error or are interrupted, cancel our worker threads and
	 * clear the queue of any pending records. The threads will pass the
	 * cancel up the tree of worker threads, and each one will clean up any
	 * pending records before exiting.
	 */
	if (err != 0) {
		srt_arg->cancel = B_TRUE;
		while (!range->eos_marker) {
			range = get_next_range(&srt_arg->q, range);
		}
	}
	range_free(range);

	bqueue_destroy(&srt_arg->q);
	bqueue_destroy(&smt_arg->q);
	if (dspp->redactbook != NULL)
		bqueue_destroy(&rlt_arg->q);
	bqueue_destroy(&to_arg->q);
	bqueue_destroy(&from_arg->q);

	if (err == 0 && srt_arg->error != 0)
		err = srt_arg->error;

	if (err != 0)
		goto out;

	if (dsc.dsc_pending_op != PENDING_NONE)
		if (dump_record(&dsc, NULL, 0) != 0)
			err = SET_ERROR(EINTR);

	if (err != 0) {
		if (err == EINTR && dsc.dsc_err != 0)
			err = dsc.dsc_err;
		goto out;
	}

	/*
	 * Send the DRR_END record if this is not a saved stream.
	 * Otherwise, the omitted DRR_END record will signal to
	 * the receive side that the stream is incomplete.
	 */
	if (!dspp->savedok) {
		bzero(drr, sizeof (dmu_replay_record_t));
		drr->drr_type = DRR_END;
		drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc;
		drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid;

		if (dump_record(&dsc, NULL, 0) != 0)
			err = dsc.dsc_err;
	}
out:
	mutex_enter(&to_ds->ds_sendstream_lock);
	list_remove(&to_ds->ds_sendstreams, dssp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	VERIFY(err != 0 || (dsc.dsc_sent_begin &&
	    (dsc.dsc_sent_end || dspp->savedok)));

	kmem_free(drr, sizeof (dmu_replay_record_t));
	kmem_free(dssp, sizeof (dmu_sendstatus_t));
	kmem_free(from_arg, sizeof (*from_arg));
	kmem_free(to_arg, sizeof (*to_arg));
	kmem_free(rlt_arg, sizeof (*rlt_arg));
	kmem_free(smt_arg, sizeof (*smt_arg));
	kmem_free(srt_arg, sizeof (*srt_arg));

	dsl_dataset_long_rele(to_ds, FTAG);
	if (from_rl != NULL) {
		dsl_redaction_list_long_rele(from_rl, FTAG);
		dsl_redaction_list_rele(from_rl, FTAG);
	}
	if (redact_rl != NULL) {
		dsl_redaction_list_long_rele(redact_rl, FTAG);
		dsl_redaction_list_rele(redact_rl, FTAG);
	}

	return (err);
}

int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
    boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
    boolean_t rawok, boolean_t savedok, int outfd, offset_t *off,
    dmu_send_outparams_t *dsop)
{
	int err;
	dsl_dataset_t *fromds;
	ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
	struct dmu_send_params dspp = {0};
	dspp.embedok = embedok;
	dspp.large_block_ok = large_block_ok;
	dspp.compressok = compressok;
	dspp.outfd = outfd;
	dspp.off = off;
	dspp.dso = dsop;
	dspp.tag = FTAG;
	dspp.rawok = rawok;
	dspp.savedok = savedok;

	err = dsl_pool_hold(pool, FTAG, &dspp.dp);
	if (err != 0)
		return (err);

	err = dsl_dataset_hold_obj_flags(dspp.dp, tosnap, dsflags, FTAG,
	    &dspp.to_ds);
	if (err != 0) {
		dsl_pool_rele(dspp.dp, FTAG);
		return (err);
	}

	if (fromsnap != 0) {
		err = dsl_dataset_hold_obj_flags(dspp.dp, fromsnap, dsflags,
		    FTAG, &fromds);
		if (err != 0) {
			dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
			dsl_pool_rele(dspp.dp, FTAG);
			return (err);
		}
		dspp.ancestor_zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
		dspp.ancestor_zb.zbm_creation_txg =
		    dsl_dataset_phys(fromds)->ds_creation_txg;
		dspp.ancestor_zb.zbm_creation_time =
		    dsl_dataset_phys(fromds)->ds_creation_time;

		if (dsl_dataset_is_zapified(fromds)) {
			(void) zap_lookup(dspp.dp->dp_meta_objset,
			    fromds->ds_object, DS_FIELD_IVSET_GUID, 8, 1,
			    &dspp.ancestor_zb.zbm_ivset_guid);
		}

		/* See dmu_send for the reasons behind this. */
		uint64_t *fromredact;

		if (!dsl_dataset_get_uint64_array_feature(fromds,
		    SPA_FEATURE_REDACTED_DATASETS,
		    &dspp.numfromredactsnaps,
		    &fromredact)) {
			dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
		} else if (dspp.numfromredactsnaps > 0) {
			uint64_t size = dspp.numfromredactsnaps *
			    sizeof (uint64_t);
			dspp.fromredactsnaps = kmem_zalloc(size, KM_SLEEP);
			bcopy(fromredact, dspp.fromredactsnaps, size);
		}

		if (!dsl_dataset_is_before(dspp.to_ds, fromds, 0)) {
			err = SET_ERROR(EXDEV);
		} else {
			dspp.is_clone = (dspp.to_ds->ds_dir !=
			    fromds->ds_dir);
			dsl_dataset_rele(fromds, FTAG);
			err = dmu_send_impl(&dspp);
		}
	} else {
		dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
		err = dmu_send_impl(&dspp);
	}
	dsl_dataset_rele(dspp.to_ds, FTAG);
	return (err);
}
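
/*
 * Illustrative call (sketch only): a full, uncompressed send of the snapshot
 * with object number tosnap_obj on pool "tank", written through a
 * caller-supplied callback. my_dump_bytes and my_state are hypothetical:
 *
 *	dmu_send_outparams_t dso = { 0 };
 *	offset_t off = 0;
 *	dso.dso_outfunc = my_dump_bytes;
 *	dso.dso_arg = my_state;
 *	dso.dso_dryrun = B_FALSE;
 *	int err = dmu_send_obj("tank", tosnap_obj, 0, B_FALSE, B_FALSE,
 *	    B_FALSE, B_FALSE, B_FALSE, fd, &off, &dso);
 */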

int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
    boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,
    boolean_t savedok, uint64_t resumeobj, uint64_t resumeoff,
    const char *redactbook, int outfd, offset_t *off,
    dmu_send_outparams_t *dsop)
{
	int err = 0;
	ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
	boolean_t owned = B_FALSE;
	dsl_dataset_t *fromds = NULL;
	zfs_bookmark_phys_t book = {0};
	struct dmu_send_params dspp = {0};

	dspp.tosnap = tosnap;
	dspp.embedok = embedok;
	dspp.large_block_ok = large_block_ok;
	dspp.compressok = compressok;
	dspp.outfd = outfd;
	dspp.off = off;
	dspp.dso = dsop;
	dspp.tag = FTAG;
	dspp.resumeobj = resumeobj;
	dspp.resumeoff = resumeoff;
	dspp.rawok = rawok;
	dspp.savedok = savedok;

	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
		return (SET_ERROR(EINVAL));

	err = dsl_pool_hold(tosnap, FTAG, &dspp.dp);
	if (err != 0)
		return (err);

	if (strchr(tosnap, '@') == NULL && spa_writeable(dspp.dp->dp_spa)) {
		/*
		 * We are sending a filesystem or volume. Ensure
		 * that it doesn't change by owning the dataset.
		 */

		if (savedok) {
			/*
			 * We are looking for the dataset that represents the
			 * partially received send stream. If this stream was
			 * received as a new snapshot of an existing dataset,
			 * this will be saved in a hidden clone named
			 * "<pool>/<dataset>/%recv". Otherwise, the stream
			 * will be saved in the live dataset itself. In
			 * either case we need to use dsl_dataset_own_force()
			 * because the stream is marked as inconsistent,
			 * which would normally make it unavailable to be
			 * owned.
			 */
			char *name = kmem_asprintf("%s/%s", tosnap,
			    recv_clone_name);
			err = dsl_dataset_own_force(dspp.dp, name, dsflags,
			    FTAG, &dspp.to_ds);
			if (err == ENOENT) {
				err = dsl_dataset_own_force(dspp.dp, tosnap,
				    dsflags, FTAG, &dspp.to_ds);
			}

			if (err == 0) {
				err = zap_lookup(dspp.dp->dp_meta_objset,
				    dspp.to_ds->ds_object,
				    DS_FIELD_RESUME_TOGUID, 8, 1,
				    &dspp.saved_guid);
			}

			if (err == 0) {
				err = zap_lookup(dspp.dp->dp_meta_objset,
				    dspp.to_ds->ds_object,
				    DS_FIELD_RESUME_TONAME, 1,
				    sizeof (dspp.saved_toname),
				    dspp.saved_toname);
			}
			if (err != 0)
				dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);

			kmem_strfree(name);
		} else {
			err = dsl_dataset_own(dspp.dp, tosnap, dsflags,
			    FTAG, &dspp.to_ds);
		}
		owned = B_TRUE;
	} else {
		err = dsl_dataset_hold_flags(dspp.dp, tosnap, dsflags, FTAG,
		    &dspp.to_ds);
	}

	if (err != 0) {
		dsl_pool_rele(dspp.dp, FTAG);
		return (err);
	}

	if (redactbook != NULL) {
		char path[ZFS_MAX_DATASET_NAME_LEN];
		(void) strlcpy(path, tosnap, sizeof (path));
		char *at = strchr(path, '@');
		if (at == NULL) {
			err = SET_ERROR(EINVAL);
		} else {
			(void) snprintf(at, sizeof (path) - (at - path), "#%s",
			    redactbook);
			err = dsl_bookmark_lookup(dspp.dp, path,
			    NULL, &book);
			dspp.redactbook = &book;
		}
	}

	if (err != 0) {
		dsl_pool_rele(dspp.dp, FTAG);
		if (owned)
			dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
		else
			dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
		return (err);
	}

	if (fromsnap != NULL) {
		zfs_bookmark_phys_t *zb = &dspp.ancestor_zb;
		int fsnamelen;
		if (strpbrk(tosnap, "@#") != NULL)
			fsnamelen = strpbrk(tosnap, "@#") - tosnap;
		else
			fsnamelen = strlen(tosnap);

		/*
		 * If the fromsnap is in a different filesystem, then
		 * mark the send stream as a clone.
		 */
		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
		    (fromsnap[fsnamelen] != '@' &&
		    fromsnap[fsnamelen] != '#')) {
			dspp.is_clone = B_TRUE;
		}

		if (strchr(fromsnap, '@') != NULL) {
			err = dsl_dataset_hold(dspp.dp, fromsnap, FTAG,
			    &fromds);

			if (err != 0) {
				ASSERT3P(fromds, ==, NULL);
			} else {
				/*
				 * We need to make a deep copy of the redact
				 * snapshots of the from snapshot, because the
				 * array will be freed when we evict from_ds.
				 */
				uint64_t *fromredact;
				if (!dsl_dataset_get_uint64_array_feature(
				    fromds, SPA_FEATURE_REDACTED_DATASETS,
				    &dspp.numfromredactsnaps,
				    &fromredact)) {
					dspp.numfromredactsnaps =
					    NUM_SNAPS_NOT_REDACTED;
				} else if (dspp.numfromredactsnaps > 0) {
					uint64_t size =
					    dspp.numfromredactsnaps *
					    sizeof (uint64_t);
					dspp.fromredactsnaps = kmem_zalloc(size,
					    KM_SLEEP);
					bcopy(fromredact, dspp.fromredactsnaps,
					    size);
				}
				if (!dsl_dataset_is_before(dspp.to_ds, fromds,
				    0)) {
					err = SET_ERROR(EXDEV);
				} else {
					zb->zbm_creation_txg =
					    dsl_dataset_phys(fromds)->
					    ds_creation_txg;
					zb->zbm_creation_time =
					    dsl_dataset_phys(fromds)->
					    ds_creation_time;
					zb->zbm_guid =
					    dsl_dataset_phys(fromds)->ds_guid;
					zb->zbm_redaction_obj = 0;

					if (dsl_dataset_is_zapified(fromds)) {
						(void) zap_lookup(
						    dspp.dp->dp_meta_objset,
						    fromds->ds_object,
						    DS_FIELD_IVSET_GUID, 8, 1,
						    &zb->zbm_ivset_guid);
					}
				}
				dsl_dataset_rele(fromds, FTAG);
			}
		} else {
			dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
			err = dsl_bookmark_lookup(dspp.dp, fromsnap, dspp.to_ds,
			    zb);
			if (err == EXDEV && zb->zbm_redaction_obj != 0 &&
			    zb->zbm_guid ==
			    dsl_dataset_phys(dspp.to_ds)->ds_guid)
				err = 0;
		}

		if (err == 0) {
			/* dmu_send_impl will call dsl_pool_rele for us. */
			err = dmu_send_impl(&dspp);
		} else {
			dsl_pool_rele(dspp.dp, FTAG);
		}
	} else {
		dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
		err = dmu_send_impl(&dspp);
	}
	if (owned)
		dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
	else
		dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
	return (err);
}

static int
dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
    uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
{
	int err = 0;
	uint64_t size;
	/*
	 * Assume that space (both on-disk and in-stream) is dominated by
	 * data. We will adjust for indirect blocks and the copies property,
	 * but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
	 */

	uint64_t recordsize;
	uint64_t record_count;
	objset_t *os;
	VERIFY0(dmu_objset_from_ds(ds, &os));

	/* Assume all (uncompressed) blocks are recordsize. */
	if (zfs_override_estimate_recordsize != 0) {
		recordsize = zfs_override_estimate_recordsize;
	} else if (os->os_phys->os_type == DMU_OST_ZVOL) {
		err = dsl_prop_get_int_ds(ds,
		    zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);
	} else {
		err = dsl_prop_get_int_ds(ds,
		    zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);
	}
	if (err != 0)
		return (err);
	record_count = uncompressed / recordsize;

	/*
	 * If we're estimating a send size for a compressed stream, use the
	 * compressed data size to estimate the stream size. Otherwise, use the
	 * uncompressed data size.
	 */
	size = stream_compressed ? compressed : uncompressed;

	/*
	 * Subtract out approximate space used by indirect blocks.
	 * Assume most space is used by data blocks (non-indirect, non-dnode).
	 * Assume no ditto blocks or internal fragmentation.
	 *
	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
	 * block.
	 */
	size -= record_count * sizeof (blkptr_t);

	/* Add in the space for the record associated with each block. */
	size += record_count * sizeof (dmu_replay_record_t);

	*sizep = size;

	return (0);
}
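
/*
 * Worked example of the adjustment above (illustrative numbers): for 1 GiB of
 * uncompressed data at the default 128 KiB recordsize, record_count is
 * 2^30 / 2^17 = 8192. The estimate shrinks by 8192 * sizeof (blkptr_t)
 * (128 bytes each, so 1 MiB) for indirect blocks, and grows by
 * 8192 * sizeof (dmu_replay_record_t) for per-block stream headers; both
 * corrections are small relative to the data itself.
 */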

int
dmu_send_estimate_fast(dsl_dataset_t *origds, dsl_dataset_t *fromds,
    zfs_bookmark_phys_t *frombook, boolean_t stream_compressed,
    boolean_t saved, uint64_t *sizep)
{
	int err;
	dsl_dataset_t *ds = origds;
	uint64_t uncomp, comp;

	ASSERT(dsl_pool_config_held(origds->ds_dir->dd_pool));
	ASSERT(fromds == NULL || frombook == NULL);

	/*
	 * If this is a saved send we may actually be sending
	 * from the %recv clone used for resuming.
	 */
	if (saved) {
		objset_t *mos = origds->ds_dir->dd_pool->dp_meta_objset;
		uint64_t guid;
		char dsname[ZFS_MAX_DATASET_NAME_LEN + 6];

		dsl_dataset_name(origds, dsname);
		(void) strcat(dsname, "/");
		(void) strcat(dsname, recv_clone_name);

		err = dsl_dataset_hold(origds->ds_dir->dd_pool,
		    dsname, FTAG, &ds);
		if (err != ENOENT && err != 0) {
			return (err);
		} else if (err == ENOENT) {
			ds = origds;
		}

		/* check that this dataset has partially received data */
		err = zap_lookup(mos, ds->ds_object,
		    DS_FIELD_RESUME_TOGUID, 8, 1, &guid);
		if (err != 0) {
			err = SET_ERROR(err == ENOENT ? EINVAL : err);
			goto out;
		}

		err = zap_lookup(mos, ds->ds_object,
		    DS_FIELD_RESUME_TONAME, 1, sizeof (dsname), dsname);
		if (err != 0) {
			err = SET_ERROR(err == ENOENT ? EINVAL : err);
			goto out;
		}
	}

	/* tosnap must be a snapshot or the target of a saved send */
	if (!ds->ds_is_snapshot && ds == origds)
		return (SET_ERROR(EINVAL));

	if (fromds != NULL) {
		uint64_t used;
		if (!fromds->ds_is_snapshot) {
			err = SET_ERROR(EINVAL);
			goto out;
		}

		if (!dsl_dataset_is_before(ds, fromds, 0)) {
			err = SET_ERROR(EXDEV);
			goto out;
		}

		err = dsl_dataset_space_written(fromds, ds, &used, &comp,
		    &uncomp);
		if (err != 0)
			goto out;
	} else if (frombook != NULL) {
		uint64_t used;
		err = dsl_dataset_space_written_bookmark(frombook, ds, &used,
		    &comp, &uncomp);
		if (err != 0)
			goto out;
	} else {
		uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
		comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
	}

	err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
	    stream_compressed, sizep);
	/*
	 * Add the size of the BEGIN and END records to the estimate.
	 */
	*sizep += 2 * sizeof (dmu_replay_record_t);

out:
	if (ds != origds)
		dsl_dataset_rele(ds, FTAG);
	return (err);
}
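
/*
 * Sketch of a typical estimate call (full send of a snapshot, uncompressed
 * stream, not saved); the pool config lock must be held, as asserted above:
 *
 *	uint64_t size;
 *	dsl_pool_config_enter(dp, FTAG);
 *	int err = dmu_send_estimate_fast(snap_ds, NULL, NULL, B_FALSE,
 *	    B_FALSE, &size);
 *	dsl_pool_config_exit(dp, FTAG);
 */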

ZFS_MODULE_PARAM(zfs_send, zfs_send_, corrupt_data, INT, ZMOD_RW,
	"Allow sending corrupt data");

ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_length, INT, ZMOD_RW,
	"Maximum send queue length");

ZFS_MODULE_PARAM(zfs_send, zfs_send_, unmodified_spill_blocks, INT, ZMOD_RW,
	"Send unmodified spill blocks");

ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_length, INT, ZMOD_RW,
	"Maximum send queue length for non-prefetch queues");

ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_ff, INT, ZMOD_RW,
	"Send queue fill fraction");

ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_ff, INT, ZMOD_RW,
	"Send queue fill fraction for non-prefetch queues");

ZFS_MODULE_PARAM(zfs_send, zfs_, override_estimate_recordsize, INT, ZMOD_RW,
	"Override block size estimate with fixed size");
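
/*
 * On Linux, these parameters surface under /sys/module/zfs/parameters/ using
 * the prefixes above, e.g. (illustrative):
 *
 *	echo 33554432 > /sys/module/zfs/parameters/zfs_send_queue_length
 */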