34dc7c2f
BB
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
428870ff 22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
8d35c149 23 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
30af21b0 24 * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
788eb90c 25 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
47dfff3b 26 * Copyright 2014 HybridCluster. All rights reserved.
b607405f 27 * Copyright 2016 RackTop Systems.
a0bd735a 28 * Copyright (c) 2016 Actifio, Inc. All rights reserved.
8d35c149 29 */
34dc7c2f 30
34dc7c2f
BB
31#include <sys/dmu.h>
32#include <sys/dmu_impl.h>
33#include <sys/dmu_tx.h>
34#include <sys/dbuf.h>
35#include <sys/dnode.h>
36#include <sys/zfs_context.h>
37#include <sys/dmu_objset.h>
38#include <sys/dmu_traverse.h>
39#include <sys/dsl_dataset.h>
40#include <sys/dsl_dir.h>
428870ff 41#include <sys/dsl_prop.h>
34dc7c2f
BB
42#include <sys/dsl_pool.h>
43#include <sys/dsl_synctask.h>
044baf00 44#include <sys/spa_impl.h>
34dc7c2f
BB
45#include <sys/zfs_ioctl.h>
46#include <sys/zap.h>
47#include <sys/zio_checksum.h>
428870ff
BB
48#include <sys/zfs_znode.h>
49#include <zfs_fletcher.h>
50#include <sys/avl.h>
51#include <sys/ddt.h>
572e2857 52#include <sys/zfs_onexit.h>
13fe0198
MA
53#include <sys/dmu_send.h>
54#include <sys/dsl_destroy.h>
9b67f605 55#include <sys/blkptr.h>
da536844 56#include <sys/dsl_bookmark.h>
9b67f605 57#include <sys/zfeature.h>
fcff0f35 58#include <sys/bqueue.h>
a0bd735a 59#include <sys/zvol.h>
f74b821a 60#include <sys/policy.h>
30af21b0
PD
61#include <sys/objlist.h>
62#ifdef _KERNEL
63#include <sys/zfs_vfsops.h>
64#endif
34dc7c2f 65
330d06f9
MA
66/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
67int zfs_send_corrupt_data = B_FALSE;
30af21b0
PD
68/*
69 * This tunable controls the amount of data (measured in bytes) that will be
70 * prefetched by zfs send. If the main thread is blocking on reads that haven't
71 * completed, this variable might need to be increased. If instead the main
72 * thread is issuing new reads because the prefetches have fallen out of the
73 * cache, this may need to be decreased.
74 */
3b0d9928 75int zfs_send_queue_length = SPA_MAXBLOCKSIZE;
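/*
 * For reference (an assumption about the macro's value, which is not stated
 * here): SPA_MAXBLOCKSIZE is 16 MiB, so by default roughly 16 MiB of data may
 * sit in the prefetch queue before the traversal thread blocks.
 */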
30af21b0
PD
76/*
77 * This tunable controls the length of the queues that zfs send worker threads
78 * use to communicate. If the send_main_thread is blocking on these queues,
79 * this variable may need to be increased. If there is a significant slowdown
80 * at the start of a send as these threads consume all the available IO
81 * resources, this variable may need to be decreased.
82 */
83int zfs_send_no_prefetch_queue_length = 1024 * 1024;
84/*
85 * These tunables control the fill fraction of the queues used by zfs send. The
86 * fill fraction controls the frequency with which threads have to be cv_signaled.
87 * If a lot of cpu time is being spent on cv_signal, then these should be tuned
88 * down. If the queues empty before the signalled thread can catch up, then
89 * these should be tuned up.
90 */
91int zfs_send_queue_ff = 20;
92int zfs_send_no_prefetch_queue_ff = 20;
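/*
 * Rough illustration (an assumption about the queue semantics, not from the
 * original source): with the defaults above, a 1 MiB no-prefetch queue and a
 * fill fraction of 20 implies wakeups in roughly 1048576 / 20 ~= 52 KiB
 * increments; tuning the fill fraction up makes signalling finer-grained
 * (more cv_signal calls), tuning it down makes the chunks coarser.
 */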
93
94/*
95 * Use this to override the recordsize calculation for fast zfs send estimates.
96 */
97int zfs_override_estimate_recordsize = 0;
98
b607405f
AS
99/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
100int zfs_send_set_freerecords_bit = B_TRUE;
30af21b0 101
caf9dd20
BB
102/* Set this tunable to FALSE to disable sending unmodified spill blocks. */
103int zfs_send_unmodified_spill_blocks = B_TRUE;
330d06f9 104
30af21b0
PD
105static inline boolean_t
106overflow_multiply(uint64_t a, uint64_t b, uint64_t *c)
107{
108 uint64_t temp = a * b;
109 if (b != 0 && temp / b != a)
110 return (B_FALSE);
111 *c = temp;
112 return (B_TRUE);
113}
114
ca0845d5 115/*
30af21b0
PD
116 * Returns B_TRUE and sets *out to the span if the span is less than 2^64;
117 * returns B_FALSE otherwise.
ca0845d5 118 */
30af21b0
PD
119static inline boolean_t
120bp_span(uint32_t datablksz, uint8_t indblkshift, uint64_t level, uint64_t *out)
121{
122 uint64_t spanb = bp_span_in_blocks(indblkshift, level);
123 return (overflow_multiply(spanb, datablksz, out));
124}
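/*
 * Illustrative worked example (not part of the original source): assuming
 * bp_span_in_blocks(indblkshift, level) evaluates to
 * 1 << (level * (indblkshift - SPA_BLKPTRSHIFT)), a level-1 indirect block
 * with indblkshift = 17 covers 1 << 10 = 1024 data blocks; with a 128 KiB
 * datablksz the span is 1024 * 131072 = 128 MiB, far below 2^64, so bp_span()
 * returns B_TRUE.  overflow_multiply() only reports B_FALSE when the product
 * wraps, i.e. for extremely deep trees or huge block sizes.
 */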
fcff0f35
PD
125
126struct send_thread_arg {
127 bqueue_t q;
128 dsl_dataset_t *ds; /* Dataset to traverse */
30af21b0
PD
129 redaction_list_t *redaction_list;
130 struct send_redact_record *current_record;
fcff0f35
PD
131 uint64_t fromtxg; /* Traverse from this txg */
132 int flags; /* flags to pass to traverse_dataset */
133 int error_code;
134 boolean_t cancel;
47dfff3b 135 zbookmark_phys_t resume;
30af21b0
PD
136 objlist_t *deleted_objs;
137 uint64_t *num_blocks_visited;
fcff0f35
PD
138};
139
30af21b0
PD
140struct redact_list_thread_arg {
141 boolean_t cancel;
142 bqueue_t q;
143 zbookmark_phys_t resume;
144 redaction_list_t *rl;
145 boolean_t mark_redact;
146 int error_code;
147 uint64_t *num_blocks_visited;
fcff0f35
PD
148};
149
30af21b0
PD
150/*
151 * A wrapper around struct redact_block so it can be stored in a list_t.
152 */
153struct redact_block_list_node {
154 redact_block_phys_t block;
155 list_node_t node;
156};
caf9dd20 157
30af21b0
PD
158struct redact_bookmark_info {
159 redact_block_phys_t rbi_furthest[TXG_SIZE];
160 /* Lists of struct redact_block_list_node. */
161 list_t rbi_blocks[TXG_SIZE];
162 boolean_t rbi_synctasc_txg[TXG_SIZE];
163 uint64_t rbi_latest_synctask_txg;
164 redaction_list_t *rbi_redaction_list;
165};
f8866f8a 166
30af21b0
PD
167struct send_merge_thread_arg {
168 bqueue_t q;
169 objset_t *os;
170 struct redact_list_thread_arg *from_arg;
171 struct send_thread_arg *to_arg;
172 struct redact_list_thread_arg *redact_arg;
173 int error;
174 boolean_t cancel;
175 struct redact_bookmark_info rbi;
f8866f8a 176 /*
30af21b0
PD
177 * If we're resuming a redacted send, then the object/offset from the
178 * resume token may be different from the object/offset that we have
179 * updated the bookmark to. resume_redact_zb will store the earlier of
180 * the two object/offset pairs, and bookmark_before will be B_TRUE if
181 * resume_redact_zb has the object/offset for resuming the redaction
182 * bookmark, and B_FALSE if resume_redact_zb is storing the
183 * object/offset from the resume token.
f8866f8a 184 */
30af21b0
PD
185 zbookmark_phys_t resume_redact_zb;
186 boolean_t bookmark_before;
187};
f8866f8a 188
30af21b0
PD
189struct send_range {
190 boolean_t eos_marker; /* Marks the end of the stream */
191 uint64_t object;
192 uint64_t start_blkid;
193 uint64_t end_blkid;
194 bqueue_node_t ln;
195 enum type {DATA, HOLE, OBJECT, OBJECT_RANGE, REDACT,
196 PREVIOUSLY_REDACTED} type;
197 union {
198 struct srd {
199 dmu_object_type_t obj_type;
200 uint32_t datablksz;
201 blkptr_t bp;
202 } data;
203 struct srh {
204 uint32_t datablksz;
205 } hole;
206 struct sro {
207 /*
208 * This is a pointer because embedding it in the
209 * struct causes these structures to be massively larger
210 * for all range types; this makes the code much less
211 * memory efficient.
212 */
213 dnode_phys_t *dnp;
214 blkptr_t bp;
215 } object;
216 struct srr {
217 uint32_t datablksz;
218 } redact;
219 struct sror {
220 blkptr_t bp;
221 } object_range;
222 } sru;
223};
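/*
 * Illustrative example (not from the original source): a DATA range with
 * object = 5, start_blkid = 16, end_blkid = 17 and sru.data.datablksz =
 * 131072 describes the single 128 KiB block covering byte offsets
 * [16 * 131072, 17 * 131072) of object 5; do_dump() below converts the blkid
 * bounds back into byte offsets the same way.
 */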
37abac6d 224
30af21b0
PD
225/*
226 * The list of data whose inclusion in a send stream can be pending from
227 * one call to backup_cb to another. Multiple calls to dump_free(),
228 * dump_freeobjects(), and dump_redact() can be aggregated into a single
229 * DRR_FREE, DRR_FREEOBJECTS, or DRR_REDACT replay record.
230 */
231typedef enum {
232 PENDING_NONE,
233 PENDING_FREE,
234 PENDING_FREEOBJECTS,
235 PENDING_REDACT
236} dmu_pendop_t;
237
238typedef struct dmu_send_cookie {
239 dmu_replay_record_t *dsc_drr;
240 dmu_send_outparams_t *dsc_dso;
241 offset_t *dsc_off;
242 objset_t *dsc_os;
243 zio_cksum_t dsc_zc;
244 uint64_t dsc_toguid;
245 uint64_t dsc_fromtxg;
246 int dsc_err;
247 dmu_pendop_t dsc_pending_op;
248 uint64_t dsc_featureflags;
249 uint64_t dsc_last_data_object;
250 uint64_t dsc_last_data_offset;
251 uint64_t dsc_resume_object;
252 uint64_t dsc_resume_offset;
253 boolean_t dsc_sent_begin;
254 boolean_t dsc_sent_end;
255} dmu_send_cookie_t;
256
257static int do_dump(dmu_send_cookie_t *dscp, struct send_range *range);
044baf00 258
30af21b0
PD
259static void
260range_free(struct send_range *range)
044baf00 261{
30af21b0
PD
262 if (range->type == OBJECT) {
263 size_t size = sizeof (dnode_phys_t) *
264 (range->sru.object.dnp->dn_extra_slots + 1);
265 kmem_free(range->sru.object.dnp, size);
266 }
267 kmem_free(range, sizeof (*range));
34dc7c2f
BB
268}
269
37f8a883
MA
270/*
271 * For all record types except BEGIN, fill in the checksum (overlaid in
272 * drr_u.drr_checksum.drr_checksum). The checksum verifies everything
273 * up to the start of the checksum itself.
274 */
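/*
 * Illustrative sketch (restating what dump_record() below computes, not a
 * claim about the receive path): the stream checksum is cumulative, so for a
 * non-BEGIN record the sender effectively does
 *
 *	fletcher_4_incremental_native(drr,
 *	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
 *	    &dsc_zc);
 *	drr->drr_u.drr_checksum.drr_checksum = dsc_zc;
 *	fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
 *	    sizeof (zio_cksum_t), &dsc_zc);
 *	// payload bytes, if any, are folded in after the record is emitted
 *
 * so drr_checksum covers every byte of the stream before the checksum field
 * itself.
 */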
275static int
30af21b0 276dump_record(dmu_send_cookie_t *dscp, void *payload, int payload_len)
37f8a883 277{
30af21b0 278 dmu_send_outparams_t *dso = dscp->dsc_dso;
37f8a883
MA
279 ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
280 ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
30af21b0 281 (void) fletcher_4_incremental_native(dscp->dsc_drr,
37f8a883 282 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
30af21b0
PD
283 &dscp->dsc_zc);
284 if (dscp->dsc_drr->drr_type == DRR_BEGIN) {
285 dscp->dsc_sent_begin = B_TRUE;
51907a31 286 } else {
30af21b0 287 ASSERT(ZIO_CHECKSUM_IS_ZERO(&dscp->dsc_drr->drr_u.
37f8a883 288 drr_checksum.drr_checksum));
30af21b0 289 dscp->dsc_drr->drr_u.drr_checksum.drr_checksum = dscp->dsc_zc;
37f8a883 290 }
30af21b0
PD
291 if (dscp->dsc_drr->drr_type == DRR_END) {
292 dscp->dsc_sent_end = B_TRUE;
51907a31 293 }
30af21b0 294 (void) fletcher_4_incremental_native(&dscp->dsc_drr->
37f8a883 295 drr_u.drr_checksum.drr_checksum,
30af21b0
PD
296 sizeof (zio_cksum_t), &dscp->dsc_zc);
297 *dscp->dsc_off += sizeof (dmu_replay_record_t);
298 dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, dscp->dsc_drr,
299 sizeof (dmu_replay_record_t), dso->dso_arg);
300 if (dscp->dsc_err != 0)
37f8a883
MA
301 return (SET_ERROR(EINTR));
302 if (payload_len != 0) {
30af21b0
PD
303 *dscp->dsc_off += payload_len;
304 /*
305 * payload is null when dso->dryrun == B_TRUE (i.e. when we're
306 * doing a send size calculation)
307 */
308 if (payload != NULL) {
309 (void) fletcher_4_incremental_native(
310 payload, payload_len, &dscp->dsc_zc);
311 }
312
313 /*
314 * The code does not rely on this (len being a multiple of 8).
315 * We keep this assertion because of the corresponding assertion
316 * in receive_read(). Keeping this assertion ensures that we do
317 * not inadvertently break backwards compatibility (causing the
318 * assertion in receive_read() to trigger on old software).
319 *
320 * Raw sends cannot be received on old software, and so can
321 * bypass this assertion.
322 */
323
324 ASSERT((payload_len % 8 == 0) ||
325 (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW));
326
327 dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, payload,
328 payload_len, dso->dso_arg);
329 if (dscp->dsc_err != 0)
37f8a883
MA
330 return (SET_ERROR(EINTR));
331 }
332 return (0);
333}
334
e6d3a843
PD
335/*
336 * Fill in the drr_free struct, or perform aggregation if the previous record is
337 * also a free record, and the two are adjacent.
338 *
339 * Note that we send free records even for a full send, because we want to be
340 * able to receive a full send as a clone, which requires a list of all the free
341 * and freeobject records that were generated on the source.
342 */
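/*
 * Worked example (illustrative, not from the original source): two calls
 * dump_free(dscp, 5, 0, 131072) and dump_free(dscp, 5, 131072, 131072) arrive
 * back to back.  The first leaves a pending DRR_FREE covering object 5,
 * offset 0, length 128 KiB; the second sees that the pending record ends
 * exactly where it starts and simply extends drr_length to 256 KiB, so only
 * one DRR_FREE record reaches the stream.
 */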
34dc7c2f 343static int
30af21b0 344dump_free(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
34dc7c2f
BB
345 uint64_t length)
346{
30af21b0 347 struct drr_free *drrf = &(dscp->dsc_drr->drr_u.drr_free);
428870ff 348
ea97f8ce
MA
349 /*
350 * When we receive a free record, dbuf_free_range() assumes
351 * that the receiving system doesn't have any dbufs in the range
352 * being freed. This is always true because there is a one-record
353 * constraint: we only send one WRITE record for any given
47dfff3b 354 * object,offset. We know that the one-record constraint is
ea97f8ce
MA
355 * true because we always send data in increasing order by
356 * object,offset.
357 *
358 * If the increasing-order constraint ever changes, we should find
359 * another way to assert that the one-record constraint is still
360 * satisfied.
361 */
30af21b0
PD
362 ASSERT(object > dscp->dsc_last_data_object ||
363 (object == dscp->dsc_last_data_object &&
364 offset > dscp->dsc_last_data_offset));
ea97f8ce 365
428870ff
BB
366 /*
367 * If there is a pending op, but it's not PENDING_FREE, push it out,
368 * since free block aggregation can only be done for blocks of the
369 * same type (i.e., DRR_FREE records can only be aggregated with
370 * other DRR_FREE records. DRR_FREEOBJECTS records can only be
30af21b0 371 * aggregated with other DRR_FREEOBJECTS records).
428870ff 372 */
30af21b0
PD
373 if (dscp->dsc_pending_op != PENDING_NONE &&
374 dscp->dsc_pending_op != PENDING_FREE) {
375 if (dump_record(dscp, NULL, 0) != 0)
2e528b49 376 return (SET_ERROR(EINTR));
30af21b0 377 dscp->dsc_pending_op = PENDING_NONE;
428870ff
BB
378 }
379
30af21b0 380 if (dscp->dsc_pending_op == PENDING_FREE) {
428870ff
BB
381 /*
382 * Check to see whether this free block can be aggregated
383 * with pending one.
384 */
385 if (drrf->drr_object == object && drrf->drr_offset +
386 drrf->drr_length == offset) {
30af21b0
PD
387 if (offset + length < offset || length == UINT64_MAX)
388 drrf->drr_length = UINT64_MAX;
ee45fbd8 389 else
390 drrf->drr_length += length;
428870ff
BB
391 return (0);
392 } else {
393 /* not a continuation. Push out pending record */
30af21b0 394 if (dump_record(dscp, NULL, 0) != 0)
2e528b49 395 return (SET_ERROR(EINTR));
30af21b0 396 dscp->dsc_pending_op = PENDING_NONE;
428870ff
BB
397 }
398 }
399 /* create a FREE record and make it pending */
30af21b0
PD
400 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
401 dscp->dsc_drr->drr_type = DRR_FREE;
428870ff
BB
402 drrf->drr_object = object;
403 drrf->drr_offset = offset;
ee45fbd8 404 if (offset + length < offset)
405 drrf->drr_length = DMU_OBJECT_END;
406 else
407 drrf->drr_length = length;
30af21b0 408 drrf->drr_toguid = dscp->dsc_toguid;
ee45fbd8 409 if (length == DMU_OBJECT_END) {
30af21b0 410 if (dump_record(dscp, NULL, 0) != 0)
2e528b49 411 return (SET_ERROR(EINTR));
428870ff 412 } else {
30af21b0
PD
413 dscp->dsc_pending_op = PENDING_FREE;
414 }
415
416 return (0);
417}
418
419/*
420 * Fill in the drr_redact struct, or perform aggregation if the previous record
421 * is also a redaction record, and the two are adjacent.
422 */
423static int
424dump_redact(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
425 uint64_t length)
426{
427 struct drr_redact *drrr = &dscp->dsc_drr->drr_u.drr_redact;
428
429 /*
430 * If there is a pending op, but it's not PENDING_REDACT, push it out,
431 * since free block aggregation can only be done for blocks of the
432 * same type (i.e., DRR_REDACT records can only be aggregated with
433 * other DRR_REDACT records).
434 */
435 if (dscp->dsc_pending_op != PENDING_NONE &&
436 dscp->dsc_pending_op != PENDING_REDACT) {
437 if (dump_record(dscp, NULL, 0) != 0)
438 return (SET_ERROR(EINTR));
439 dscp->dsc_pending_op = PENDING_NONE;
440 }
441
442 if (dscp->dsc_pending_op == PENDING_REDACT) {
443 /*
444 * Check to see whether this redacted block can be aggregated
445 * with pending one.
446 */
447 if (drrr->drr_object == object && drrr->drr_offset +
448 drrr->drr_length == offset) {
449 drrr->drr_length += length;
450 return (0);
451 } else {
452 /* not a continuation. Push out pending record */
453 if (dump_record(dscp, NULL, 0) != 0)
454 return (SET_ERROR(EINTR));
455 dscp->dsc_pending_op = PENDING_NONE;
456 }
428870ff 457 }
30af21b0
PD
458 /* create a REDACT record and make it pending */
459 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
460 dscp->dsc_drr->drr_type = DRR_REDACT;
461 drrr->drr_object = object;
462 drrr->drr_offset = offset;
463 drrr->drr_length = length;
464 drrr->drr_toguid = dscp->dsc_toguid;
465 dscp->dsc_pending_op = PENDING_REDACT;
34dc7c2f 466
34dc7c2f
BB
467 return (0);
468}
469
470static int
30af21b0 471dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
b5256303 472 uint64_t offset, int lsize, int psize, const blkptr_t *bp, void *data)
34dc7c2f 473{
2aa34383 474 uint64_t payload_size;
30af21b0
PD
475 boolean_t raw = (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
476 struct drr_write *drrw = &(dscp->dsc_drr->drr_u.drr_write);
428870ff 477
ea97f8ce
MA
478 /*
479 * We send data in increasing object, offset order.
480 * See comment in dump_free() for details.
481 */
30af21b0
PD
482 ASSERT(object > dscp->dsc_last_data_object ||
483 (object == dscp->dsc_last_data_object &&
484 offset > dscp->dsc_last_data_offset));
485 dscp->dsc_last_data_object = object;
486 dscp->dsc_last_data_offset = offset + lsize - 1;
428870ff
BB
487
488 /*
489 * If there is any kind of pending aggregation (currently either
490 * a grouping of free objects or free blocks), push it out to
491 * the stream, since aggregation can't be done across operations
492 * of different types.
493 */
30af21b0
PD
494 if (dscp->dsc_pending_op != PENDING_NONE) {
495 if (dump_record(dscp, NULL, 0) != 0)
2e528b49 496 return (SET_ERROR(EINTR));
30af21b0 497 dscp->dsc_pending_op = PENDING_NONE;
428870ff 498 }
37f8a883 499 /* write a WRITE record */
30af21b0
PD
500 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
501 dscp->dsc_drr->drr_type = DRR_WRITE;
428870ff
BB
502 drrw->drr_object = object;
503 drrw->drr_type = type;
504 drrw->drr_offset = offset;
30af21b0 505 drrw->drr_toguid = dscp->dsc_toguid;
2aa34383
DK
506 drrw->drr_logical_size = lsize;
507
b5256303
TC
508 /* only set the compression fields if the buf is compressed or raw */
509 if (raw || lsize != psize) {
30af21b0
PD
510 ASSERT(raw || dscp->dsc_featureflags &
511 DMU_BACKUP_FEATURE_COMPRESSED);
2aa34383 512 ASSERT(!BP_IS_EMBEDDED(bp));
2aa34383 513 ASSERT3S(psize, >, 0);
2aa34383 514
b5256303
TC
515 if (raw) {
516 ASSERT(BP_IS_PROTECTED(bp));
517
518 /*
9b840763
TC
519 * This is a raw protected block so we need to pass
520 * along everything the receiving side will need to
521 * interpret this block, including the byteswap, salt,
522 * IV, and MAC.
b5256303 523 */
b5256303
TC
524 if (BP_SHOULD_BYTESWAP(bp))
525 drrw->drr_flags |= DRR_RAW_BYTESWAP;
526 zio_crypt_decode_params_bp(bp, drrw->drr_salt,
527 drrw->drr_iv);
528 zio_crypt_decode_mac_bp(bp, drrw->drr_mac);
529 } else {
530 /* this is a compressed block */
30af21b0 531 ASSERT(dscp->dsc_featureflags &
b5256303
TC
532 DMU_BACKUP_FEATURE_COMPRESSED);
533 ASSERT(!BP_SHOULD_BYTESWAP(bp));
534 ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));
535 ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF);
536 ASSERT3S(lsize, >=, psize);
537 }
538
539 /* set fields common to compressed and raw sends */
2aa34383
DK
540 drrw->drr_compressiontype = BP_GET_COMPRESS(bp);
541 drrw->drr_compressed_size = psize;
542 payload_size = drrw->drr_compressed_size;
543 } else {
544 payload_size = drrw->drr_logical_size;
545 }
546
b5256303 547 if (bp == NULL || BP_IS_EMBEDDED(bp) || (BP_IS_PROTECTED(bp) && !raw)) {
9b67f605 548 /*
b5256303
TC
549 * There's no pre-computed checksum for partial-block writes,
550 * embedded BP's, or encrypted BP's that are being sent as
e1cfd73f 551 * plaintext, so (like fletcher4-checksummed blocks) userland
b5256303 552 * will have to compute a dedup-capable checksum itself.
9b67f605
MA
553 */
554 drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
555 } else {
556 drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
3c67d83a
TH
557 if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
558 ZCHECKSUM_FLAG_DEDUP)
b5256303 559 drrw->drr_flags |= DRR_CHECKSUM_DEDUP;
9b67f605
MA
560 DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
561 DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
562 DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
b5256303 563 DDK_SET_CRYPT(&drrw->drr_key, BP_IS_PROTECTED(bp));
9b67f605
MA
564 drrw->drr_key.ddk_cksum = bp->blk_cksum;
565 }
428870ff 566
30af21b0 567 if (dump_record(dscp, data, payload_size) != 0)
2e528b49 568 return (SET_ERROR(EINTR));
428870ff
BB
569 return (0);
570}
571
9b67f605 572static int
30af21b0 573dump_write_embedded(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
9b67f605
MA
574 int blksz, const blkptr_t *bp)
575{
576 char buf[BPE_PAYLOAD_SIZE];
577 struct drr_write_embedded *drrw =
30af21b0 578 &(dscp->dsc_drr->drr_u.drr_write_embedded);
9b67f605 579
30af21b0
PD
580 if (dscp->dsc_pending_op != PENDING_NONE) {
581 if (dump_record(dscp, NULL, 0) != 0)
ecb2b7dc 582 return (SET_ERROR(EINTR));
30af21b0 583 dscp->dsc_pending_op = PENDING_NONE;
9b67f605
MA
584 }
585
586 ASSERT(BP_IS_EMBEDDED(bp));
587
30af21b0
PD
588 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
589 dscp->dsc_drr->drr_type = DRR_WRITE_EMBEDDED;
9b67f605
MA
590 drrw->drr_object = object;
591 drrw->drr_offset = offset;
592 drrw->drr_length = blksz;
30af21b0 593 drrw->drr_toguid = dscp->dsc_toguid;
9b67f605
MA
594 drrw->drr_compression = BP_GET_COMPRESS(bp);
595 drrw->drr_etype = BPE_GET_ETYPE(bp);
596 drrw->drr_lsize = BPE_GET_LSIZE(bp);
597 drrw->drr_psize = BPE_GET_PSIZE(bp);
598
599 decode_embedded_bp_compressed(bp, buf);
600
30af21b0 601 if (dump_record(dscp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
ecb2b7dc 602 return (SET_ERROR(EINTR));
9b67f605
MA
603 return (0);
604}
605
428870ff 606static int
30af21b0
PD
607dump_spill(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
608 void *data)
428870ff 609{
30af21b0 610 struct drr_spill *drrs = &(dscp->dsc_drr->drr_u.drr_spill);
b5256303 611 uint64_t blksz = BP_GET_LSIZE(bp);
b0ee5946 612 uint64_t payload_size = blksz;
428870ff 613
30af21b0
PD
614 if (dscp->dsc_pending_op != PENDING_NONE) {
615 if (dump_record(dscp, NULL, 0) != 0)
2e528b49 616 return (SET_ERROR(EINTR));
30af21b0 617 dscp->dsc_pending_op = PENDING_NONE;
428870ff
BB
618 }
619
620 /* write a SPILL record */
30af21b0
PD
621 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
622 dscp->dsc_drr->drr_type = DRR_SPILL;
428870ff
BB
623 drrs->drr_object = object;
624 drrs->drr_length = blksz;
30af21b0 625 drrs->drr_toguid = dscp->dsc_toguid;
34dc7c2f 626
caf9dd20
BB
627 /* See comment in dump_dnode() for full details */
628 if (zfs_send_unmodified_spill_blocks &&
30af21b0 629 (bp->blk_birth <= dscp->dsc_fromtxg)) {
caf9dd20
BB
630 drrs->drr_flags |= DRR_SPILL_UNMODIFIED;
631 }
632
b5256303 633 /* handle raw send fields */
30af21b0 634 if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
9b840763
TC
635 ASSERT(BP_IS_PROTECTED(bp));
636
b5256303
TC
637 if (BP_SHOULD_BYTESWAP(bp))
638 drrs->drr_flags |= DRR_RAW_BYTESWAP;
639 drrs->drr_compressiontype = BP_GET_COMPRESS(bp);
640 drrs->drr_compressed_size = BP_GET_PSIZE(bp);
641 zio_crypt_decode_params_bp(bp, drrs->drr_salt, drrs->drr_iv);
642 zio_crypt_decode_mac_bp(bp, drrs->drr_mac);
b0ee5946 643 payload_size = drrs->drr_compressed_size;
b5256303
TC
644 }
645
30af21b0 646 if (dump_record(dscp, data, payload_size) != 0)
2e528b49 647 return (SET_ERROR(EINTR));
34dc7c2f
BB
648 return (0);
649}
650
651static int
30af21b0 652dump_freeobjects(dmu_send_cookie_t *dscp, uint64_t firstobj, uint64_t numobjs)
34dc7c2f 653{
30af21b0 654 struct drr_freeobjects *drrfo = &(dscp->dsc_drr->drr_u.drr_freeobjects);
829e95c4 655 uint64_t maxobj = DNODES_PER_BLOCK *
30af21b0 656 (DMU_META_DNODE(dscp->dsc_os)->dn_maxblkid + 1);
829e95c4
FG
657
658 /*
659 * ZoL < 0.7 does not handle large FREEOBJECTS records correctly,
660 * leading to zfs recv never completing. To avoid this issue, don't
661 * send FREEOBJECTS records for object IDs which cannot exist on the
662 * receiving side.
663 */
664 if (maxobj > 0) {
665 if (maxobj < firstobj)
666 return (0);
667
668 if (maxobj < firstobj + numobjs)
669 numobjs = maxobj - firstobj;
670 }
428870ff
BB
671
672 /*
673 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
674 * push it out, since free block aggregation can only be done for
675 * blocks of the same type (i.e., DRR_FREE records can only be
676 * aggregated with other DRR_FREE records. DRR_FREEOBJECTS records
30af21b0 677 * can only be aggregated with other DRR_FREEOBJECTS records).
428870ff 678 */
30af21b0
PD
679 if (dscp->dsc_pending_op != PENDING_NONE &&
680 dscp->dsc_pending_op != PENDING_FREEOBJECTS) {
681 if (dump_record(dscp, NULL, 0) != 0)
2e528b49 682 return (SET_ERROR(EINTR));
30af21b0 683 dscp->dsc_pending_op = PENDING_NONE;
428870ff 684 }
30af21b0
PD
685 if (numobjs == 0)
686 numobjs = UINT64_MAX - firstobj;
687
688 if (dscp->dsc_pending_op == PENDING_FREEOBJECTS) {
428870ff
BB
689 /*
690 * See whether this free object array can be aggregated
691 * with pending one
692 */
693 if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
694 drrfo->drr_numobjs += numobjs;
695 return (0);
696 } else {
697 /* can't be aggregated. Push out pending record */
30af21b0 698 if (dump_record(dscp, NULL, 0) != 0)
2e528b49 699 return (SET_ERROR(EINTR));
30af21b0 700 dscp->dsc_pending_op = PENDING_NONE;
428870ff
BB
701 }
702 }
703
34dc7c2f 704 /* write a FREEOBJECTS record */
30af21b0
PD
705 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
706 dscp->dsc_drr->drr_type = DRR_FREEOBJECTS;
428870ff
BB
707 drrfo->drr_firstobj = firstobj;
708 drrfo->drr_numobjs = numobjs;
30af21b0 709 drrfo->drr_toguid = dscp->dsc_toguid;
428870ff 710
30af21b0 711 dscp->dsc_pending_op = PENDING_FREEOBJECTS;
34dc7c2f 712
34dc7c2f
BB
713 return (0);
714}
715
716static int
30af21b0 717dump_dnode(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
b5256303 718 dnode_phys_t *dnp)
34dc7c2f 719{
30af21b0 720 struct drr_object *drro = &(dscp->dsc_drr->drr_u.drr_object);
4807c0ba 721 int bonuslen;
428870ff 722
30af21b0 723 if (object < dscp->dsc_resume_object) {
47dfff3b
MA
724 /*
725 * Note: when resuming, we will visit all the dnodes in
726 * the block of dnodes that we are resuming from. In
727 * this case it's unnecessary to send the dnodes prior to
728 * the one we are resuming from. We should be at most one
729 * block's worth of dnodes behind the resume point.
730 */
30af21b0 731 ASSERT3U(dscp->dsc_resume_object - object, <,
47dfff3b
MA
732 1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
733 return (0);
734 }
735
34dc7c2f 736 if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
30af21b0 737 return (dump_freeobjects(dscp, object, 1));
34dc7c2f 738
30af21b0
PD
739 if (dscp->dsc_pending_op != PENDING_NONE) {
740 if (dump_record(dscp, NULL, 0) != 0)
2e528b49 741 return (SET_ERROR(EINTR));
30af21b0 742 dscp->dsc_pending_op = PENDING_NONE;
428870ff
BB
743 }
744
34dc7c2f 745 /* write an OBJECT record */
30af21b0
PD
746 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
747 dscp->dsc_drr->drr_type = DRR_OBJECT;
428870ff
BB
748 drro->drr_object = object;
749 drro->drr_type = dnp->dn_type;
750 drro->drr_bonustype = dnp->dn_bonustype;
751 drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
752 drro->drr_bonuslen = dnp->dn_bonuslen;
50c957f7 753 drro->drr_dn_slots = dnp->dn_extra_slots + 1;
428870ff
BB
754 drro->drr_checksumtype = dnp->dn_checksum;
755 drro->drr_compress = dnp->dn_compress;
30af21b0 756 drro->drr_toguid = dscp->dsc_toguid;
428870ff 757
30af21b0 758 if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
f1512ee6
MA
759 drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
760 drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
761
4807c0ba
TC
762 bonuslen = P2ROUNDUP(dnp->dn_bonuslen, 8);
763
30af21b0 764 if ((dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {
9b840763
TC
765 ASSERT(BP_IS_ENCRYPTED(bp));
766
b5256303
TC
767 if (BP_SHOULD_BYTESWAP(bp))
768 drro->drr_flags |= DRR_RAW_BYTESWAP;
769
770 /* needed for reconstructing dnp on recv side */
ae76f45c 771 drro->drr_maxblkid = dnp->dn_maxblkid;
b5256303
TC
772 drro->drr_indblkshift = dnp->dn_indblkshift;
773 drro->drr_nlevels = dnp->dn_nlevels;
774 drro->drr_nblkptr = dnp->dn_nblkptr;
775
776 /*
777 * Since we encrypt the entire bonus area, the (raw) part
4807c0ba 778 * beyond the bonuslen is actually nonzero, so we need
b5256303
TC
779 * to send it.
780 */
781 if (bonuslen != 0) {
782 drro->drr_raw_bonuslen = DN_MAX_BONUS_LEN(dnp);
783 bonuslen = drro->drr_raw_bonuslen;
784 }
37f8a883 785 }
34dc7c2f 786
caf9dd20
BB
787 /*
788 * DRR_OBJECT_SPILL is set for every dnode which references a
30af21b0 789 * spill block. This allows the receiving pool to definitively
caf9dd20
BB
790 * determine when a spill block should be kept or freed.
791 */
792 if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR)
793 drro->drr_flags |= DRR_OBJECT_SPILL;
794
30af21b0 795 if (dump_record(dscp, DN_BONUS(dnp), bonuslen) != 0)
b5256303
TC
796 return (SET_ERROR(EINTR));
797
ea97f8ce 798 /* Free anything past the end of the file. */
30af21b0 799 if (dump_free(dscp, object, (dnp->dn_maxblkid + 1) *
ee45fbd8 800 (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), DMU_OBJECT_END) != 0)
2e528b49 801 return (SET_ERROR(EINTR));
caf9dd20
BB
802
803 /*
30af21b0 804 * Send DRR_SPILL records for unmodified spill blocks. This is useful
caf9dd20
BB
805 * because changing certain attributes of the object (e.g. blocksize)
806 * can cause old versions of ZFS to incorrectly remove a spill block.
807 * Including these records in the stream forces an up to date version
808 * to always be written ensuring they're never lost. Current versions
809 * of the code which understand the DRR_FLAG_SPILL_BLOCK feature can
810 * ignore these unmodified spill blocks.
811 */
812 if (zfs_send_unmodified_spill_blocks &&
813 (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) &&
30af21b0
PD
814 (DN_SPILL_BLKPTR(dnp)->blk_birth <= dscp->dsc_fromtxg)) {
815 struct send_range record;
816 blkptr_t *bp = DN_SPILL_BLKPTR(dnp);
caf9dd20 817
30af21b0
PD
818 bzero(&record, sizeof (struct send_range));
819 record.type = DATA;
820 record.object = object;
caf9dd20 821 record.eos_marker = B_FALSE;
30af21b0
PD
822 record.start_blkid = DMU_SPILL_BLKID;
823 record.end_blkid = record.start_blkid + 1;
824 record.sru.data.bp = *bp;
825 record.sru.data.obj_type = dnp->dn_type;
826 record.sru.data.datablksz = BP_GET_LSIZE(bp);
caf9dd20 827
30af21b0 828 if (do_dump(dscp, &record) != 0)
caf9dd20
BB
829 return (SET_ERROR(EINTR));
830 }
831
30af21b0 832 if (dscp->dsc_err != 0)
2e528b49 833 return (SET_ERROR(EINTR));
caf9dd20 834
34dc7c2f
BB
835 return (0);
836}
837
b5256303 838static int
30af21b0
PD
839dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp,
840 uint64_t firstobj, uint64_t numslots)
b5256303
TC
841{
842 struct drr_object_range *drror =
30af21b0 843 &(dscp->dsc_drr->drr_u.drr_object_range);
b5256303
TC
844
845 /* we only use this record type for raw sends */
846 ASSERT(BP_IS_PROTECTED(bp));
30af21b0 847 ASSERT(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
b5256303
TC
848 ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
849 ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_DNODE);
850 ASSERT0(BP_GET_LEVEL(bp));
851
30af21b0
PD
852 if (dscp->dsc_pending_op != PENDING_NONE) {
853 if (dump_record(dscp, NULL, 0) != 0)
b5256303 854 return (SET_ERROR(EINTR));
30af21b0 855 dscp->dsc_pending_op = PENDING_NONE;
b5256303
TC
856 }
857
30af21b0
PD
858 bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
859 dscp->dsc_drr->drr_type = DRR_OBJECT_RANGE;
b5256303
TC
860 drror->drr_firstobj = firstobj;
861 drror->drr_numslots = numslots;
30af21b0 862 drror->drr_toguid = dscp->dsc_toguid;
b5256303
TC
863 if (BP_SHOULD_BYTESWAP(bp))
864 drror->drr_flags |= DRR_RAW_BYTESWAP;
865 zio_crypt_decode_params_bp(bp, drror->drr_salt, drror->drr_iv);
866 zio_crypt_decode_mac_bp(bp, drror->drr_mac);
867
30af21b0 868 if (dump_record(dscp, NULL, 0) != 0)
b5256303
TC
869 return (SET_ERROR(EINTR));
870 return (0);
871}
872
9b67f605 873static boolean_t
30af21b0 874send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
9b67f605
MA
875{
876 if (!BP_IS_EMBEDDED(bp))
877 return (B_FALSE);
878
879 /*
880 * Compression function must be legacy, or explicitly enabled.
881 */
882 if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
30af21b0 883 !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LZ4)))
9b67f605
MA
884 return (B_FALSE);
885
886 /*
887 * Embed type must be explicitly enabled.
888 */
889 switch (BPE_GET_ETYPE(bp)) {
890 case BP_EMBEDDED_TYPE_DATA:
30af21b0 891 if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
9b67f605
MA
892 return (B_TRUE);
893 break;
894 default:
895 return (B_FALSE);
896 }
897 return (B_FALSE);
898}
899
fcff0f35
PD
900/*
901 * This function actually handles figuring out what kind of record needs to be
902 * dumped, reading the data (which has hopefully been prefetched), and calling
903 * the appropriate helper function.
904 */
34dc7c2f 905static int
30af21b0 906do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
34dc7c2f 907{
34dc7c2f 908 int err = 0;
30af21b0
PD
909 switch (range->type) {
910 case OBJECT:
911 err = dump_dnode(dscp, &range->sru.object.bp, range->object,
912 range->sru.object.dnp);
913 return (err);
914 case OBJECT_RANGE: {
915 ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
916 if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {
917 return (0);
b5256303 918 }
30af21b0
PD
919 uint64_t epb = BP_GET_LSIZE(&range->sru.object_range.bp) >>
920 DNODE_SHIFT;
921 uint64_t firstobj = range->start_blkid * epb;
922 err = dump_object_range(dscp, &range->sru.object_range.bp,
923 firstobj, epb);
924 break;
925 }
926 case REDACT: {
927 struct srr *srrp = &range->sru.redact;
928 err = dump_redact(dscp, range->object, range->start_blkid *
929 srrp->datablksz, (range->end_blkid - range->start_blkid) *
930 srrp->datablksz);
931 return (err);
932 }
933 case DATA: {
934 struct srd *srdp = &range->sru.data;
935 blkptr_t *bp = &srdp->bp;
936 spa_t *spa =
937 dmu_objset_spa(dscp->dsc_os);
938
939 ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
940 ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
941 if (BP_GET_TYPE(bp) == DMU_OT_SA) {
942 arc_flags_t aflags = ARC_FLAG_WAIT;
943 enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
944
945 if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
946 ASSERT(BP_IS_PROTECTED(bp));
947 zioflags |= ZIO_FLAG_RAW;
948 }
b5256303 949
30af21b0
PD
950 arc_buf_t *abuf;
951 zbookmark_phys_t zb;
952 ASSERT3U(range->start_blkid, ==, DMU_SPILL_BLKID);
953 zb.zb_objset = dmu_objset_id(dscp->dsc_os);
954 zb.zb_object = range->object;
955 zb.zb_level = 0;
956 zb.zb_blkid = range->start_blkid;
957
958 if (!dscp->dsc_dso->dso_dryrun && arc_read(NULL, spa,
959 bp, arc_getbuf_func, &abuf, ZIO_PRIORITY_ASYNC_READ,
960 zioflags, &aflags, &zb) != 0)
961 return (SET_ERROR(EIO));
b5256303 962
30af21b0
PD
963 err = dump_spill(dscp, bp, zb.zb_object, abuf->b_data);
964 arc_buf_destroy(abuf, &abuf);
965 return (err);
34dc7c2f 966 }
30af21b0
PD
967 if (send_do_embed(dscp, bp)) {
968 err = dump_write_embedded(dscp, range->object,
969 range->start_blkid * srdp->datablksz,
970 srdp->datablksz, bp);
971 return (err);
b5256303 972 }
30af21b0
PD
973 ASSERT(range->object > dscp->dsc_resume_object ||
974 (range->object == dscp->dsc_resume_object &&
975 range->start_blkid * srdp->datablksz >=
976 dscp->dsc_resume_offset));
fcff0f35 977 /* it's a level-0 block of a regular object */
2a432414 978 arc_flags_t aflags = ARC_FLAG_WAIT;
30af21b0 979 arc_buf_t *abuf = NULL;
fcff0f35 980 uint64_t offset;
2aa34383
DK
981
982 /*
983 * If we have large blocks stored on disk but the send flags
984 * don't allow us to send large blocks, we split the data from
985 * the arc buf into chunks.
986 */
30af21b0
PD
987 boolean_t split_large_blocks =
988 srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
989 !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
b5256303
TC
990
991 /*
992 * Raw sends require that we always get raw data as it exists
993 * on disk, so we assert that we are not splitting blocks here.
994 */
995 boolean_t request_raw =
30af21b0 996 (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
b5256303 997
2aa34383
DK
998 /*
999 * We should only request compressed data from the ARC if all
1000 * the following are true:
1001 * - stream compression was requested
1002 * - we aren't splitting large blocks into smaller chunks
1003 * - the data won't need to be byteswapped before sending
1004 * - this isn't an embedded block
1005 * - this isn't metadata (if receiving on a different endian
1006 * system it can be byteswapped more easily)
1007 */
1008 boolean_t request_compressed =
30af21b0 1009 (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
2aa34383
DK
1010 !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
1011 !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
428870ff 1012
b5256303
TC
1013 IMPLY(request_raw, !split_large_blocks);
1014 IMPLY(request_raw, BP_IS_PROTECTED(bp));
30af21b0
PD
1015 if (!dscp->dsc_dso->dso_dryrun) {
1016 enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
1017
1018 ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
1019
1020 if (request_raw)
1021 zioflags |= ZIO_FLAG_RAW;
1022 else if (request_compressed)
1023 zioflags |= ZIO_FLAG_RAW_COMPRESS;
1024 zbookmark_phys_t zb;
1025 zb.zb_objset = dmu_objset_id(dscp->dsc_os);
1026 zb.zb_object = range->object;
1027 zb.zb_level = 0;
1028 zb.zb_blkid = range->start_blkid;
1029
1030 err = arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
1031 ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, &zb);
1032 }
1033
1034 if (err != 0) {
1035 if (zfs_send_corrupt_data &&
1036 !dscp->dsc_dso->dso_dryrun) {
330d06f9 1037 /* Send a block filled with 0x"zfs badd bloc" */
2aa34383 1038 abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
30af21b0 1039 srdp->datablksz);
a7004725 1040 uint64_t *ptr;
330d06f9 1041 for (ptr = abuf->b_data;
30af21b0
PD
1042 (char *)ptr < (char *)abuf->b_data +
1043 srdp->datablksz; ptr++)
dd26aa53 1044 *ptr = 0x2f5baddb10cULL;
330d06f9 1045 } else {
2e528b49 1046 return (SET_ERROR(EIO));
330d06f9
MA
1047 }
1048 }
428870ff 1049
30af21b0 1050 offset = range->start_blkid * srdp->datablksz;
f1512ee6 1051
2aa34383 1052 if (split_large_blocks) {
b5256303 1053 ASSERT0(arc_is_encrypted(abuf));
2aa34383
DK
1054 ASSERT3U(arc_get_compression(abuf), ==,
1055 ZIO_COMPRESS_OFF);
a7004725 1056 char *buf = abuf->b_data;
30af21b0
PD
1057 while (srdp->datablksz > 0 && err == 0) {
1058 int n = MIN(srdp->datablksz,
1059 SPA_OLD_MAXBLOCKSIZE);
1060 err = dump_write(dscp, srdp->obj_type,
1061 range->object, offset, n, n, NULL, buf);
f1512ee6
MA
1062 offset += n;
1063 buf += n;
30af21b0 1064 srdp->datablksz -= n;
f1512ee6
MA
1065 }
1066 } else {
30af21b0
PD
1067 int psize;
1068 if (abuf != NULL) {
1069 psize = arc_buf_size(abuf);
1070 if (arc_get_compression(abuf) !=
1071 ZIO_COMPRESS_OFF) {
1072 ASSERT3S(psize, ==, BP_GET_PSIZE(bp));
1073 }
1074 } else if (!request_compressed) {
1075 psize = srdp->datablksz;
1076 } else {
1077 psize = BP_GET_PSIZE(bp);
1078 }
1079 err = dump_write(dscp, srdp->obj_type, range->object,
1080 offset, srdp->datablksz, psize, bp,
1081 (abuf == NULL ? NULL : abuf->b_data));
f1512ee6 1082 }
30af21b0
PD
1083 if (abuf != NULL)
1084 arc_buf_destroy(abuf, &abuf);
1085 return (err);
34dc7c2f 1086 }
30af21b0
PD
1087 case HOLE: {
1088 struct srh *srhp = &range->sru.hole;
1089 if (range->object == DMU_META_DNODE_OBJECT) {
1090 uint32_t span = srhp->datablksz >> DNODE_SHIFT;
1091 uint64_t first_obj = range->start_blkid * span;
1092 uint64_t numobj = range->end_blkid * span - first_obj;
1093 return (dump_freeobjects(dscp, first_obj, numobj));
1094 }
1095 uint64_t offset = 0;
1096
1097 /*
1098 * If this multiply overflows, we don't need to send this block.
1099 * Even if it has a birth time, it can never not be a hole, so
1100 * we don't need to send records for it.
1101 */
1102 if (!overflow_multiply(range->start_blkid, srhp->datablksz,
1103 &offset)) {
1104 return (0);
1105 }
1106 uint64_t len = 0;
34dc7c2f 1107
30af21b0
PD
1108 if (!overflow_multiply(range->end_blkid, srhp->datablksz, &len))
1109 len = UINT64_MAX;
1110 len = len - offset;
1111 return (dump_free(dscp, range->object, offset, len));
1112 }
1113 default:
1114 panic("Invalid range type in do_dump: %d", range->type);
1115 }
34dc7c2f
BB
1116 return (err);
1117}
1118
30af21b0
PD
1119struct send_range *
1120range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
1121 uint64_t end_blkid, boolean_t eos)
fcff0f35 1122{
30af21b0
PD
1123 struct send_range *range = kmem_alloc(sizeof (*range), KM_SLEEP);
1124 range->type = type;
1125 range->object = object;
1126 range->start_blkid = start_blkid;
1127 range->end_blkid = end_blkid;
1128 range->eos_marker = eos;
1129 return (range);
fcff0f35
PD
1130}
1131
1132/*
30af21b0
PD
1133 * This is the callback function to traverse_dataset that acts as a worker
1134 * thread for dmu_send_impl.
6f1ffb06 1135 */
30af21b0 1136/*ARGSUSED*/
13fe0198 1137static int
30af21b0
PD
1138send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
1139 const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
34dc7c2f 1140{
30af21b0
PD
1141 struct send_thread_arg *sta = arg;
1142 struct send_range *record;
34dc7c2f 1143
30af21b0
PD
1144 ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
1145 zb->zb_object >= sta->resume.zb_object);
1146 ASSERT3P(sta->ds, !=, NULL);
34dc7c2f 1147
b5256303 1148 /*
30af21b0
PD
1149 * All bps of an encrypted os should have the encryption bit set.
1150 * If this is not true it indicates tampering and we report an error.
b5256303 1151 */
30af21b0
PD
1152 objset_t *os;
1153 VERIFY0(dmu_objset_from_ds(sta->ds, &os));
1154 if (os->os_encrypted &&
1155 !BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {
1156 spa_log_error(spa, zb);
1157 zfs_panic_recover("unencrypted block in encrypted "
1158 "object set %llu", sta->ds->ds_object);
1159 return (SET_ERROR(EIO));
1160 }
a2c2ed1b 1161
30af21b0
PD
1162 if (sta->cancel)
1163 return (SET_ERROR(EINTR));
1164 if (zb->zb_object != DMU_META_DNODE_OBJECT &&
1165 DMU_OBJECT_IS_SPECIAL(zb->zb_object))
1166 return (0);
1167 atomic_inc_64(sta->num_blocks_visited);
b5256303 1168
30af21b0
PD
1169 if (zb->zb_level == ZB_DNODE_LEVEL) {
1170 if (zb->zb_object == DMU_META_DNODE_OBJECT)
1171 return (0);
1172 record = range_alloc(OBJECT, zb->zb_object, 0, 0, B_FALSE);
1173 record->sru.object.bp = *bp;
1174 size_t size = sizeof (*dnp) * (dnp->dn_extra_slots + 1);
1175 record->sru.object.dnp = kmem_alloc(size, KM_SLEEP);
1176 bcopy(dnp, record->sru.object.dnp, size);
1177 bqueue_enqueue(&sta->q, record, sizeof (*record));
1178 return (0);
b5256303 1179 }
30af21b0
PD
1180 if (zb->zb_level == 0 && zb->zb_object == DMU_META_DNODE_OBJECT &&
1181 !BP_IS_HOLE(bp)) {
1182 record = range_alloc(OBJECT_RANGE, 0, zb->zb_blkid,
1183 zb->zb_blkid + 1, B_FALSE);
1184 record->sru.object_range.bp = *bp;
1185 bqueue_enqueue(&sta->q, record, sizeof (*record));
1186 return (0);
1187 }
1188 if (zb->zb_level < 0 || (zb->zb_level > 0 && !BP_IS_HOLE(bp)))
1189 return (0);
1190 if (zb->zb_object == DMU_META_DNODE_OBJECT && !BP_IS_HOLE(bp))
1191 return (0);
b5256303 1192
30af21b0
PD
1193 uint64_t span = bp_span_in_blocks(dnp->dn_indblkshift, zb->zb_level);
1194 uint64_t start;
47dfff3b 1195
30af21b0
PD
1196 /*
1197 * If this multiply overflows, we don't need to send this block.
1198 * Even if it has a birth time, it can never not be a hole, so
1199 * we don't need to send records for it.
1200 */
1201 if (!overflow_multiply(span, zb->zb_blkid, &start) || (!(zb->zb_blkid ==
1202 DMU_SPILL_BLKID || DMU_OT_IS_METADATA(dnp->dn_type)) &&
1203 span * zb->zb_blkid > dnp->dn_maxblkid)) {
1204 ASSERT(BP_IS_HOLE(bp));
1205 return (0);
1206 }
1207
1208 if (zb->zb_blkid == DMU_SPILL_BLKID)
1209 ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
1210
1211 record = range_alloc(DATA, zb->zb_object, start, (start + span < start ?
1212 0 : start + span), B_FALSE);
1213
1214 uint64_t datablksz = (zb->zb_blkid == DMU_SPILL_BLKID ?
1215 BP_GET_LSIZE(bp) : dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT);
1216 if (BP_IS_HOLE(bp)) {
1217 record->type = HOLE;
1218 record->sru.hole.datablksz = datablksz;
1219 } else if (BP_IS_REDACTED(bp)) {
1220 record->type = REDACT;
1221 record->sru.redact.datablksz = datablksz;
1222 } else {
1223 record->type = DATA;
1224 record->sru.data.datablksz = datablksz;
1225 record->sru.data.obj_type = dnp->dn_type;
1226 record->sru.data.bp = *bp;
1227 }
1228 bqueue_enqueue(&sta->q, record, sizeof (*record));
1229 return (0);
1230}
1231
1232struct redact_list_cb_arg {
1233 uint64_t *num_blocks_visited;
1234 bqueue_t *q;
1235 boolean_t *cancel;
1236 boolean_t mark_redact;
1237};
1238
1239static int
1240redact_list_cb(redact_block_phys_t *rb, void *arg)
1241{
1242 struct redact_list_cb_arg *rlcap = arg;
1243
1244 atomic_inc_64(rlcap->num_blocks_visited);
1245 if (*rlcap->cancel)
1246 return (-1);
1247
1248 struct send_range *data = range_alloc(REDACT, rb->rbp_object,
1249 rb->rbp_blkid, rb->rbp_blkid + redact_block_get_count(rb), B_FALSE);
1250 ASSERT3U(data->end_blkid, >, rb->rbp_blkid);
1251 if (rlcap->mark_redact) {
1252 data->type = REDACT;
1253 data->sru.redact.datablksz = redact_block_get_size(rb);
1254 } else {
1255 data->type = PREVIOUSLY_REDACTED;
1256 }
1257 bqueue_enqueue(rlcap->q, data, sizeof (*data));
1258
1259 return (0);
1260}
1261
1262/*
1263 * This function kicks off the traverse_dataset. It also handles setting the
1264 * error code of the thread in case something goes wrong, and pushes the End of
1265 * Stream record when the traverse_dataset call has finished. If there is no
1266 * dataset to traverse, then we traverse the redaction list provided and enqueue
1267 * records for that. If neither is provided, the thread immediately pushes an
1268 * End of Stream marker.
1269 */
1270static void
1271send_traverse_thread(void *arg)
1272{
1273 struct send_thread_arg *st_arg = arg;
1274 int err = 0;
1275 struct send_range *data;
1276 fstrans_cookie_t cookie = spl_fstrans_mark();
1277
1278 if (st_arg->ds != NULL) {
1279 ASSERT3P(st_arg->redaction_list, ==, NULL);
1280 err = traverse_dataset_resume(st_arg->ds,
1281 st_arg->fromtxg, &st_arg->resume,
1282 st_arg->flags, send_cb, st_arg);
1283 } else if (st_arg->redaction_list != NULL) {
1284 struct redact_list_cb_arg rlcba = {0};
1285 rlcba.cancel = &st_arg->cancel;
1286 rlcba.num_blocks_visited = st_arg->num_blocks_visited;
1287 rlcba.q = &st_arg->q;
1288 rlcba.mark_redact = B_FALSE;
1289 err = dsl_redaction_list_traverse(st_arg->redaction_list,
1290 &st_arg->resume, redact_list_cb, &rlcba);
1291 }
1292
1293 if (err != EINTR)
1294 st_arg->error_code = err;
1295 data = range_alloc(DATA, 0, 0, 0, B_TRUE);
1296 bqueue_enqueue_flush(&st_arg->q, data, sizeof (*data));
1297 spl_fstrans_unmark(cookie);
1298 thread_exit();
1299}
1300
1301/*
1302 * Utility function that causes End of Stream records to compare after all
1303 * others, so that other threads' comparison logic can stay simple.
1304 */
1305static int __attribute__((unused))
1306send_range_after(const struct send_range *from, const struct send_range *to)
1307{
1308 if (from->eos_marker == B_TRUE)
1309 return (1);
1310 if (to->eos_marker == B_TRUE)
1311 return (-1);
1312
1313 uint64_t from_obj = from->object;
1314 uint64_t from_end_obj = from->object + 1;
1315 uint64_t to_obj = to->object;
1316 uint64_t to_end_obj = to->object + 1;
1317 if (from_obj == 0) {
1318 ASSERT(from->type == HOLE || from->type == OBJECT_RANGE);
1319 from_obj = from->start_blkid << DNODES_PER_BLOCK_SHIFT;
1320 from_end_obj = from->end_blkid << DNODES_PER_BLOCK_SHIFT;
1321 }
1322 if (to_obj == 0) {
1323 ASSERT(to->type == HOLE || to->type == OBJECT_RANGE);
1324 to_obj = to->start_blkid << DNODES_PER_BLOCK_SHIFT;
1325 to_end_obj = to->end_blkid << DNODES_PER_BLOCK_SHIFT;
1326 }
1327
1328 if (from_end_obj <= to_obj)
1329 return (-1);
1330 if (from_obj >= to_end_obj)
1331 return (1);
1332 int64_t cmp = AVL_CMP(to->type == OBJECT_RANGE, from->type ==
1333 OBJECT_RANGE);
1334 if (unlikely(cmp))
1335 return (cmp);
1336 cmp = AVL_CMP(to->type == OBJECT, from->type == OBJECT);
1337 if (unlikely(cmp))
1338 return (cmp);
1339 if (from->end_blkid <= to->start_blkid)
1340 return (-1);
1341 if (from->start_blkid >= to->end_blkid)
1342 return (1);
1343 return (0);
1344}
1345
1346/*
1347 * Pop the new data off the queue, check that the records we receive are in
1348 * the right order, but do not free the old data. This is used so that the
1349 * records can be sent on to the main thread without copying the data.
1350 */
1351static struct send_range *
1352get_next_range_nofree(bqueue_t *bq, struct send_range *prev)
1353{
1354 struct send_range *next = bqueue_dequeue(bq);
1355 ASSERT3S(send_range_after(prev, next), ==, -1);
1356 return (next);
1357}
1358
1359/*
1360 * Pop the new data off the queue, check that the records we receive are in
1361 * the right order, and free the old data.
1362 */
1363static struct send_range *
1364get_next_range(bqueue_t *bq, struct send_range *prev)
1365{
1366 struct send_range *next = get_next_range_nofree(bq, prev);
1367 range_free(prev);
1368 return (next);
1369}
1370
1371static void
1372redact_list_thread(void *arg)
1373{
1374 struct redact_list_thread_arg *rlt_arg = arg;
1375 struct send_range *record;
1376 fstrans_cookie_t cookie = spl_fstrans_mark();
1377 if (rlt_arg->rl != NULL) {
1378 struct redact_list_cb_arg rlcba = {0};
1379 rlcba.cancel = &rlt_arg->cancel;
1380 rlcba.q = &rlt_arg->q;
1381 rlcba.num_blocks_visited = rlt_arg->num_blocks_visited;
1382 rlcba.mark_redact = rlt_arg->mark_redact;
1383 int err = dsl_redaction_list_traverse(rlt_arg->rl,
1384 &rlt_arg->resume, redact_list_cb, &rlcba);
1385 if (err != EINTR)
1386 rlt_arg->error_code = err;
1387 }
1388 record = range_alloc(DATA, 0, 0, 0, B_TRUE);
1389 bqueue_enqueue_flush(&rlt_arg->q, record, sizeof (*record));
1390 spl_fstrans_unmark(cookie);
1391}
1392
1393/*
1394 * Compare the start point of the two provided ranges. End of stream ranges
1395 * compare last; objects compare before any data or hole inside that object, and
1396 * before multi-object holes that start at the same object.
1397 */
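/*
 * Illustrative ordering (an assumption spelled out from the comparator below):
 * at the same starting object, an OBJECT_RANGE record sorts first, then an
 * OBJECT record, then DATA/HOLE ranges inside that object.  A meta-dnode
 * range (object == 0) is compared as if its object were
 * start_blkid * DNODES_PER_BLOCK, i.e. the first dnode it covers; e.g. with
 * 32 dnodes per block, a dnode-block hole starting at blkid 1 ties with
 * object 32 and is then ordered by the type rules above.
 */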
1398static int
1399send_range_start_compare(struct send_range *r1, struct send_range *r2)
1400{
1401 uint64_t r1_objequiv = r1->object;
1402 uint64_t r1_l0equiv = r1->start_blkid;
1403 uint64_t r2_objequiv = r2->object;
1404 uint64_t r2_l0equiv = r2->start_blkid;
1405 int64_t cmp = AVL_CMP(r1->eos_marker, r2->eos_marker);
1406 if (unlikely(cmp))
1407 return (cmp);
1408 if (r1->object == 0) {
1409 r1_objequiv = r1->start_blkid * DNODES_PER_BLOCK;
1410 r1_l0equiv = 0;
1411 }
1412 if (r2->object == 0) {
1413 r2_objequiv = r2->start_blkid * DNODES_PER_BLOCK;
1414 r2_l0equiv = 0;
1415 }
1416
1417 cmp = AVL_CMP(r1_objequiv, r2_objequiv);
1418 if (likely(cmp))
1419 return (cmp);
1420 cmp = AVL_CMP(r2->type == OBJECT_RANGE, r1->type == OBJECT_RANGE);
1421 if (unlikely(cmp))
1422 return (cmp);
1423 cmp = AVL_CMP(r2->type == OBJECT, r1->type == OBJECT);
1424 if (unlikely(cmp))
1425 return (cmp);
1426
1427 return (AVL_CMP(r1_l0equiv, r2_l0equiv));
1428}
1429
1430enum q_idx {
1431 REDACT_IDX = 0,
1432 TO_IDX,
1433 FROM_IDX,
1434 NUM_THREADS
1435};
1436
1437/*
1438 * This function returns the next range the send_merge_thread should operate on.
1439 * The inputs are two arrays; the first one stores the range at the front of the
1440 * queues stored in the second one. The ranges are sorted in descending
1441 * priority order; the metadata from earlier ranges overrules metadata from
1442 * later ranges. out_mask is used to return which threads the ranges came from;
1443 * bit i is set if ranges[i] started at the same place as the returned range.
1444 *
1445 * This code is not hardcoded to compare a specific number of threads; it could
1446 * be used with any number, just by changing the q_idx enum.
1447 *
1448 * The "next range" is the one with the earliest start; if two starts are equal,
1449 * the highest-priority range is the next to operate on. If a higher-priority
1450 * range starts in the middle of the first range, then the first range will be
1451 * truncated to end where the higher-priority range starts, and we will operate
1452 * on that one next time. In this way, we make sure that each block covered by
1453 * some range gets covered by a returned range, and each block covered is
1454 * returned using the metadata of the highest-priority range it appears in.
1455 *
1456 * For example, if the three ranges at the front of the queues were [2,4),
1457 * [3,5), and [1,3), then the ranges returned would be [1,2) with the metadata
1458 * from the third range, [2,4) with the metadata from the first range, and then
1459 * [4,5) with the metadata from the second.
1460 */
1461static struct send_range *
1462find_next_range(struct send_range **ranges, bqueue_t **qs, uint64_t *out_mask)
1463{
1464 int idx = 0; // index of the range with the earliest start
1465 int i;
1466 uint64_t bmask = 0;
1467 for (i = 1; i < NUM_THREADS; i++) {
1468 if (send_range_start_compare(ranges[i], ranges[idx]) < 0)
1469 idx = i;
1470 }
1471 if (ranges[idx]->eos_marker) {
1472 struct send_range *ret = range_alloc(DATA, 0, 0, 0, B_TRUE);
1473 *out_mask = 0;
1474 return (ret);
1475 }
1476 /*
1477 * Find all the ranges that start at that same point.
1478 */
1479 for (i = 0; i < NUM_THREADS; i++) {
1480 if (send_range_start_compare(ranges[i], ranges[idx]) == 0)
1481 bmask |= 1 << i;
1482 }
1483 *out_mask = bmask;
1484 /*
1485 * OBJECT_RANGE records only come from the TO thread, and should always
1486 * be treated as overlapping with nothing and sent on immediately. They
1487 * are only used in raw sends, and are never redacted.
1488 */
1489 if (ranges[idx]->type == OBJECT_RANGE) {
1490 ASSERT3U(idx, ==, TO_IDX);
1491 ASSERT3U(*out_mask, ==, 1 << TO_IDX);
1492 struct send_range *ret = ranges[idx];
1493 ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);
1494 return (ret);
1495 }
1496 /*
1497 * Find the first start or end point after the start of the first range.
1498 */
1499 uint64_t first_change = ranges[idx]->end_blkid;
1500 for (i = 0; i < NUM_THREADS; i++) {
1501 if (i == idx || ranges[i]->eos_marker ||
1502 ranges[i]->object > ranges[idx]->object ||
1503 ranges[i]->object == DMU_META_DNODE_OBJECT)
1504 continue;
1505 ASSERT3U(ranges[i]->object, ==, ranges[idx]->object);
1506 if (first_change > ranges[i]->start_blkid &&
1507 (bmask & (1 << i)) == 0)
1508 first_change = ranges[i]->start_blkid;
1509 else if (first_change > ranges[i]->end_blkid)
1510 first_change = ranges[i]->end_blkid;
1511 }
1512 /*
1513 * Update all ranges to no longer overlap with the range we're
1514 * returning. All such ranges must start at the same place as the range
1515 * being returned, and end at or after first_change. Thus we update
1516 * their start to first_change. If that makes them size 0, then free
1517 * them and pull a new range from that thread.
1518 */
1519 for (i = 0; i < NUM_THREADS; i++) {
1520 if (i == idx || (bmask & (1 << i)) == 0)
1521 continue;
1522 ASSERT3U(first_change, >, ranges[i]->start_blkid);
1523 ranges[i]->start_blkid = first_change;
1524 ASSERT3U(ranges[i]->start_blkid, <=, ranges[i]->end_blkid);
1525 if (ranges[i]->start_blkid == ranges[i]->end_blkid)
1526 ranges[i] = get_next_range(qs[i], ranges[i]);
1527 }
1528 /*
1529 * Short-circuit the simple case; if the range doesn't overlap with
1530 * anything else, or it only overlaps with things that start at the same
1531 * place and are longer, send it on.
1532 */
1533 if (first_change == ranges[idx]->end_blkid) {
1534 struct send_range *ret = ranges[idx];
1535 ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);
1536 return (ret);
1537 }
1538
1539 /*
1540 * Otherwise, return a truncated copy of ranges[idx] and move the start
1541 * of ranges[idx] back to first_change.
1542 */
1543 struct send_range *ret = kmem_alloc(sizeof (*ret), KM_SLEEP);
1544 *ret = *ranges[idx];
1545 ret->end_blkid = first_change;
1546 ranges[idx]->start_blkid = first_change;
1547 return (ret);
1548}
1549
1550#define FROM_AND_REDACT_BITS ((1 << REDACT_IDX) | (1 << FROM_IDX))
1551
1552/*
1553 * Merge the results from the from thread and the to thread, and then hand the
1554 * records off to send_prefetch_thread to prefetch them. If this is not a
1555 * send from a redaction bookmark, the from thread will push an end of stream
1556 * record and stop, and we'll just send everything that was changed in the
1557 * to_ds since the ancestor's creation txg. If it is, then since
1558 * traverse_dataset has a canonical order, we can compare each change as
1559 * they're pulled off the queues. That will give us a stream that is
1560 * appropriately sorted, and covers all records. In addition, we pull the
1561 * data from the redact_list_thread and use that to determine which blocks
1562 * should be redacted.
1563 */
1564static void
1565send_merge_thread(void *arg)
1566{
1567 struct send_merge_thread_arg *smt_arg = arg;
1568 struct send_range *front_ranges[NUM_THREADS];
1569 bqueue_t *queues[NUM_THREADS];
1570 int err = 0;
1571 fstrans_cookie_t cookie = spl_fstrans_mark();
1572
1573 if (smt_arg->redact_arg == NULL) {
1574 front_ranges[REDACT_IDX] =
1575 kmem_zalloc(sizeof (struct send_range), KM_SLEEP);
1576 front_ranges[REDACT_IDX]->eos_marker = B_TRUE;
1577 front_ranges[REDACT_IDX]->type = REDACT;
1578 queues[REDACT_IDX] = NULL;
1579 } else {
1580 front_ranges[REDACT_IDX] =
1581 bqueue_dequeue(&smt_arg->redact_arg->q);
1582 queues[REDACT_IDX] = &smt_arg->redact_arg->q;
1583 }
1584 front_ranges[TO_IDX] = bqueue_dequeue(&smt_arg->to_arg->q);
1585 queues[TO_IDX] = &smt_arg->to_arg->q;
1586 front_ranges[FROM_IDX] = bqueue_dequeue(&smt_arg->from_arg->q);
1587 queues[FROM_IDX] = &smt_arg->from_arg->q;
1588 uint64_t mask = 0;
1589 struct send_range *range;
1590 for (range = find_next_range(front_ranges, queues, &mask);
1591 !range->eos_marker && err == 0 && !smt_arg->cancel;
1592 range = find_next_range(front_ranges, queues, &mask)) {
1593 /*
1594 * If the range in question was in both the from redact bookmark
1595 * and the bookmark we're using to redact, then don't send it.
1596 * It's already redacted on the receiving system, so a redaction
1597 * record would be redundant.
1598 */
1599 if ((mask & FROM_AND_REDACT_BITS) == FROM_AND_REDACT_BITS) {
1600 ASSERT3U(range->type, ==, REDACT);
1601 range_free(range);
1602 continue;
1603 }
1604 bqueue_enqueue(&smt_arg->q, range, sizeof (*range));
1605
1606 if (smt_arg->to_arg->error_code != 0) {
1607 err = smt_arg->to_arg->error_code;
1608 } else if (smt_arg->from_arg->error_code != 0) {
1609 err = smt_arg->from_arg->error_code;
1610 } else if (smt_arg->redact_arg != NULL &&
1611 smt_arg->redact_arg->error_code != 0) {
1612 err = smt_arg->redact_arg->error_code;
1613 }
1614 }
1615 if (smt_arg->cancel && err == 0)
1616 err = SET_ERROR(EINTR);
1617 smt_arg->error = err;
1618 if (smt_arg->error != 0) {
1619 smt_arg->to_arg->cancel = B_TRUE;
1620 smt_arg->from_arg->cancel = B_TRUE;
1621 if (smt_arg->redact_arg != NULL)
1622 smt_arg->redact_arg->cancel = B_TRUE;
1623 }
1624 for (int i = 0; i < NUM_THREADS; i++) {
1625 while (!front_ranges[i]->eos_marker) {
1626 front_ranges[i] = get_next_range(queues[i],
1627 front_ranges[i]);
1628 }
1629 range_free(front_ranges[i]);
1630 }
1631 if (range == NULL)
1632 range = kmem_zalloc(sizeof (*range), KM_SLEEP);
1633 range->eos_marker = B_TRUE;
1634 bqueue_enqueue_flush(&smt_arg->q, range, 1);
1635 spl_fstrans_unmark(cookie);
1636 thread_exit();
1637}
1638
 1639struct send_prefetch_thread_arg {
 1640	struct send_merge_thread_arg *smta; /* merge thread; smta->q feeds us */
 1641	bqueue_t q; /* output queue, drained by the main send thread */
 1642	boolean_t cancel; /* set by the consumer to abort the pipeline */
 1643	boolean_t issue_prefetches; /* B_FALSE for dry-run sends */
 1644	int error; /* error from this thread, read by dmu_send_impl() */
 1645};
1646
1647/*
1648 * Create a new record with the given values.
1649 */
1650static void
1651enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
1652 uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz)
1653{
1654 enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE :
1655 (BP_IS_REDACTED(bp) ? REDACT : DATA));
1656
1657 struct send_range *range = range_alloc(range_type, dn->dn_object,
1658 blkid, blkid + count, B_FALSE);
1659
1660 if (blkid == DMU_SPILL_BLKID)
1661 ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
1662
1663 switch (range_type) {
1664 case HOLE:
1665 range->sru.hole.datablksz = datablksz;
1666 break;
1667 case DATA:
1668 ASSERT3U(count, ==, 1);
1669 range->sru.data.datablksz = datablksz;
1670 range->sru.data.obj_type = dn->dn_type;
1671 range->sru.data.bp = *bp;
1672 if (spta->issue_prefetches) {
1673 zbookmark_phys_t zb = {0};
1674 zb.zb_objset = dmu_objset_id(dn->dn_objset);
1675 zb.zb_object = dn->dn_object;
1676 zb.zb_level = 0;
1677 zb.zb_blkid = blkid;
1678 arc_flags_t aflags = ARC_FLAG_NOWAIT |
1679 ARC_FLAG_PREFETCH;
1680 (void) arc_read(NULL, dn->dn_objset->os_spa, bp, NULL,
1681 NULL, ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
1682 ZIO_FLAG_SPECULATIVE, &aflags, &zb);
1683 }
1684 break;
1685 case REDACT:
1686 range->sru.redact.datablksz = datablksz;
1687 break;
1688 default:
1689 break;
1690 }
1691 bqueue_enqueue(q, range, datablksz);
1692}
1693
1694/*
1695 * This thread is responsible for two things: First, it retrieves the correct
1696 * blkptr in the to ds if we need to send the data because of something from
1697 * the from thread. As a result of this, we're the first ones to discover that
1698 * some indirect blocks can be discarded because they're not holes. Second,
1699 * it issues prefetches for the data we need to send.
1700 */
1701static void
1702send_prefetch_thread(void *arg)
1703{
1704 struct send_prefetch_thread_arg *spta = arg;
1705 struct send_merge_thread_arg *smta = spta->smta;
1706 bqueue_t *inq = &smta->q;
1707 bqueue_t *outq = &spta->q;
1708 objset_t *os = smta->os;
1709 fstrans_cookie_t cookie = spl_fstrans_mark();
1710 struct send_range *range = bqueue_dequeue(inq);
1711 int err = 0;
1712
1713 /*
1714 * If the record we're analyzing is from a redaction bookmark from the
1715 * fromds, then we need to know whether or not it exists in the tods so
1716 * we know whether to create records for it or not. If it does, we need
1717 * the datablksz so we can generate an appropriate record for it.
1718 * Finally, if it isn't redacted, we need the blkptr so that we can send
1719 * a WRITE record containing the actual data.
1720 */
1721 uint64_t last_obj = UINT64_MAX;
1722 uint64_t last_obj_exists = B_TRUE;
1723 while (!range->eos_marker && !spta->cancel && smta->error == 0 &&
1724 err == 0) {
1725 switch (range->type) {
1726 case DATA: {
1727 zbookmark_phys_t zb;
1728 zb.zb_objset = dmu_objset_id(os);
1729 zb.zb_object = range->object;
1730 zb.zb_level = 0;
1731 zb.zb_blkid = range->start_blkid;
1732 ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
1733 if (!BP_IS_REDACTED(&range->sru.data.bp) &&
1734 spta->issue_prefetches &&
1735 !BP_IS_EMBEDDED(&range->sru.data.bp)) {
1736 arc_flags_t aflags = ARC_FLAG_NOWAIT |
1737 ARC_FLAG_PREFETCH;
1738 (void) arc_read(NULL, os->os_spa,
1739 &range->sru.data.bp, NULL, NULL,
1740 ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
1741 ZIO_FLAG_SPECULATIVE, &aflags, &zb);
1742 }
1743 bqueue_enqueue(outq, range, range->sru.data.datablksz);
1744 range = get_next_range_nofree(inq, range);
1745 break;
1746 }
1747 case HOLE:
1748 case OBJECT:
1749 case OBJECT_RANGE:
1750 case REDACT: // Redacted blocks must exist
1751 bqueue_enqueue(outq, range, sizeof (*range));
1752 range = get_next_range_nofree(inq, range);
1753 break;
1754 case PREVIOUSLY_REDACTED: {
1755 /*
1756 * This entry came from the "from bookmark" when
1757 * sending from a bookmark that has a redaction
1758 * list. We need to check if this object/blkid
1759 * exists in the target ("to") dataset, and if
1760 * not then we drop this entry. We also need
1761 * to fill in the block pointer so that we know
1762 * what to prefetch.
1763 *
1764 * To accomplish the above, we first cache whether or
1765 * not the last object we examined exists. If it
1766 * doesn't, we can drop this record. If it does, we hold
1767 * the dnode and use it to call dbuf_dnode_findbp. We do
1768 * this instead of dbuf_bookmark_findbp because we will
1769 * often operate on large ranges, and holding the dnode
1770 * once is more efficient.
1771 */
1772 boolean_t object_exists = B_TRUE;
1773 /*
1774 * If the data is redacted, we only care if it exists,
1775 * so that we don't send records for objects that have
1776 * been deleted.
1777 */
1778 dnode_t *dn;
1779 if (range->object == last_obj && !last_obj_exists) {
1780 /*
1781 * If we're still examining the same object as
1782 * previously, and it doesn't exist, we don't
1783 * need to call dbuf_bookmark_findbp.
1784 */
1785 object_exists = B_FALSE;
1786 } else {
1787 err = dnode_hold(os, range->object, FTAG, &dn);
1788 if (err == ENOENT) {
1789 object_exists = B_FALSE;
1790 err = 0;
1791 }
1792 last_obj = range->object;
1793 last_obj_exists = object_exists;
1794 }
1795
1796 if (err != 0) {
1797 break;
1798 } else if (!object_exists) {
1799 /*
1800 * The block was modified, but doesn't
1801 * exist in the to dataset; if it was
1802 * deleted in the to dataset, then we'll
1803 * visit the hole bp for it at some point.
1804 */
1805 range = get_next_range(inq, range);
1806 continue;
1807 }
1808 uint64_t file_max =
1809 (dn->dn_maxblkid < range->end_blkid ?
1810 dn->dn_maxblkid : range->end_blkid);
1811 /*
1812 * The object exists, so we need to try to find the
1813 * blkptr for each block in the range we're processing.
1814 */
1815 rw_enter(&dn->dn_struct_rwlock, RW_READER);
1816 for (uint64_t blkid = range->start_blkid;
1817 blkid < file_max; blkid++) {
1818 blkptr_t bp;
1819 uint32_t datablksz =
1820 dn->dn_phys->dn_datablkszsec <<
1821 SPA_MINBLOCKSHIFT;
1822 uint64_t offset = blkid * datablksz;
1823 /*
1824 * This call finds the next non-hole block in
1825 * the object. This is to prevent a
1826 * performance problem where we're unredacting
1827 * a large hole. Using dnode_next_offset to
1828 * skip over the large hole avoids iterating
1829 * over every block in it.
1830 */
1831 err = dnode_next_offset(dn, DNODE_FIND_HAVELOCK,
1832 &offset, 1, 1, 0);
1833 if (err == ESRCH) {
1834 offset = UINT64_MAX;
1835 err = 0;
1836 } else if (err != 0) {
1837 break;
1838 }
1839 if (offset != blkid * datablksz) {
1840 /*
1841 * if there is a hole from here
1842 * (blkid) to offset
1843 */
1844 offset = MIN(offset, file_max *
1845 datablksz);
1846 uint64_t nblks = (offset / datablksz) -
1847 blkid;
1848 enqueue_range(spta, outq, dn, blkid,
1849 nblks, NULL, datablksz);
1850 blkid += nblks;
1851 }
1852 if (blkid >= file_max)
1853 break;
1854 err = dbuf_dnode_findbp(dn, 0, blkid, &bp,
1855 NULL, NULL);
1856 if (err != 0)
1857 break;
1858 ASSERT(!BP_IS_HOLE(&bp));
1859 enqueue_range(spta, outq, dn, blkid, 1, &bp,
1860 datablksz);
1861 }
1862 rw_exit(&dn->dn_struct_rwlock);
1863 dnode_rele(dn, FTAG);
1864 range = get_next_range(inq, range);
37abac6d 1865 }
428870ff
BB
1866 }
1867 }
30af21b0
PD
1868 if (spta->cancel || err != 0) {
1869 smta->cancel = B_TRUE;
1870 spta->error = err;
1871 } else if (smta->error != 0) {
1872 spta->error = smta->error;
1873 }
1874 while (!range->eos_marker)
1875 range = get_next_range(inq, range);
1876
1877 bqueue_enqueue_flush(outq, range, 1);
1878 spl_fstrans_unmark(cookie);
1879 thread_exit();
1880}
1881
1882#define NUM_SNAPS_NOT_REDACTED UINT64_MAX
1883
1884struct dmu_send_params {
1885 /* Pool args */
1886 void *tag; // Tag that dp was held with, will be used to release dp.
1887 dsl_pool_t *dp;
1888 /* To snapshot args */
1889 const char *tosnap;
1890 dsl_dataset_t *to_ds;
1891 /* From snapshot args */
1892 zfs_bookmark_phys_t ancestor_zb;
1893 uint64_t *fromredactsnaps;
1894 /* NUM_SNAPS_NOT_REDACTED if not sending from redaction bookmark */
1895 uint64_t numfromredactsnaps;
1896 /* Stream params */
1897 boolean_t is_clone;
1898 boolean_t embedok;
1899 boolean_t large_block_ok;
1900 boolean_t compressok;
1901 uint64_t resumeobj;
1902 uint64_t resumeoff;
1903 zfs_bookmark_phys_t *redactbook;
1904 /* Stream output params */
1905 dmu_send_outparams_t *dso;
1906
1907 /* Stream progress params */
1908 offset_t *off;
1909 int outfd;
1910 boolean_t rawok;
1911};
1912
1913static int
1914setup_featureflags(struct dmu_send_params *dspp, objset_t *os,
1915 uint64_t *featureflags)
1916{
1917 dsl_dataset_t *to_ds = dspp->to_ds;
1918 dsl_pool_t *dp = dspp->dp;
1919#ifdef _KERNEL
1920 if (dmu_objset_type(os) == DMU_OST_ZFS) {
1921 uint64_t version;
1922 if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0)
1923 return (SET_ERROR(EINVAL));
1924
1925 if (version >= ZPL_VERSION_SA)
1926 *featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
1927 }
428870ff
BB
1928#endif
1929
b5256303 1930 /* raw sends imply large_block_ok */
30af21b0
PD
1931 if ((dspp->rawok || dspp->large_block_ok) &&
1932 dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_BLOCKS)) {
1933 *featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
1934 }
b5256303
TC
1935
1936 /* encrypted datasets will not have embedded blocks */
30af21b0 1937 if ((dspp->embedok || dspp->rawok) && !os->os_encrypted &&
9b67f605 1938 spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
30af21b0 1939 *featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
2aa34383 1940 }
b5256303
TC
1941
1942 /* raw send implies compressok */
30af21b0
PD
1943 if (dspp->compressok || dspp->rawok)
1944 *featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
1945 if (dspp->rawok && os->os_encrypted)
1946 *featureflags |= DMU_BACKUP_FEATURE_RAW;
b5256303 1947
30af21b0 1948 if ((*featureflags &
b5256303
TC
1949 (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED |
1950 DMU_BACKUP_FEATURE_RAW)) != 0 &&
1951 spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
30af21b0 1952 *featureflags |= DMU_BACKUP_FEATURE_LZ4;
9b67f605
MA
1953 }
1954
30af21b0
PD
1955 if (dspp->resumeobj != 0 || dspp->resumeoff != 0) {
1956 *featureflags |= DMU_BACKUP_FEATURE_RESUMING;
47dfff3b
MA
1957 }
1958
30af21b0
PD
1959 if (dspp->redactbook != NULL) {
1960 *featureflags |= DMU_BACKUP_FEATURE_REDACTED;
1961 }
9b67f605 1962
30af21b0
PD
1963 if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_DNODE)) {
1964 *featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;
1965 }
1966 return (0);
1967}
34dc7c2f 1968
30af21b0
PD
1969static dmu_replay_record_t *
1970create_begin_record(struct dmu_send_params *dspp, objset_t *os,
1971 uint64_t featureflags)
1972{
1973 dmu_replay_record_t *drr = kmem_zalloc(sizeof (dmu_replay_record_t),
1974 KM_SLEEP);
1975 drr->drr_type = DRR_BEGIN;
1976
1977 struct drr_begin *drrb = &drr->drr_u.drr_begin;
1978 dsl_dataset_t *to_ds = dspp->to_ds;
1979
1980 drrb->drr_magic = DMU_BACKUP_MAGIC;
1981 drrb->drr_creation_time = dsl_dataset_phys(to_ds)->ds_creation_time;
1982 drrb->drr_type = dmu_objset_type(os);
1983 drrb->drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
1984 drrb->drr_fromguid = dspp->ancestor_zb.zbm_guid;
1985
1986 DMU_SET_STREAM_HDRTYPE(drrb->drr_versioninfo, DMU_SUBSTREAM);
1987 DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, featureflags);
1988
1989 if (dspp->is_clone)
1990 drrb->drr_flags |= DRR_FLAG_CLONE;
1991 if (dsl_dataset_phys(dspp->to_ds)->ds_flags & DS_FLAG_CI_DATASET)
1992 drrb->drr_flags |= DRR_FLAG_CI_DATA;
1993 if (zfs_send_set_freerecords_bit)
1994 drrb->drr_flags |= DRR_FLAG_FREERECORDS;
caf9dd20
BB
1995 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;
1996
30af21b0 1997 dsl_dataset_name(to_ds, drrb->drr_toname);
fcff0f35 1998 if (!to_ds->ds_is_snapshot) {
30af21b0
PD
1999 (void) strlcat(drrb->drr_toname, "@--head--",
2000 sizeof (drrb->drr_toname));
13fe0198 2001 }
30af21b0
PD
2002 return (drr);
2003}
34dc7c2f 2004
30af21b0
PD
2005static void
2006setup_to_thread(struct send_thread_arg *to_arg, dsl_dataset_t *to_ds,
2007 dmu_sendstatus_t *dssp, uint64_t fromtxg, boolean_t rawok)
2008{
2009 VERIFY0(bqueue_init(&to_arg->q, zfs_send_no_prefetch_queue_ff,
2010 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2011 offsetof(struct send_range, ln)));
2012 to_arg->error_code = 0;
2013 to_arg->cancel = B_FALSE;
2014 to_arg->ds = to_ds;
2015 to_arg->fromtxg = fromtxg;
2016 to_arg->flags = TRAVERSE_PRE | TRAVERSE_PREFETCH_METADATA;
2017 if (rawok)
2018 to_arg->flags |= TRAVERSE_NO_DECRYPT;
2019 to_arg->redaction_list = NULL;
2020 to_arg->num_blocks_visited = &dssp->dss_blocks;
2021 (void) thread_create(NULL, 0, send_traverse_thread, to_arg, 0,
2022 curproc, TS_RUN, minclsyspri);
2023}
37abac6d 2024
30af21b0
PD
2025static void
2026setup_from_thread(struct redact_list_thread_arg *from_arg,
2027 redaction_list_t *from_rl, dmu_sendstatus_t *dssp)
2028{
2029 VERIFY0(bqueue_init(&from_arg->q, zfs_send_no_prefetch_queue_ff,
2030 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2031 offsetof(struct send_range, ln)));
2032 from_arg->error_code = 0;
2033 from_arg->cancel = B_FALSE;
2034 from_arg->rl = from_rl;
2035 from_arg->mark_redact = B_FALSE;
2036 from_arg->num_blocks_visited = &dssp->dss_blocks;
2037 /*
 2038	 * If from_rl is NULL, redact_list_thread just returns success and
2039 * enqueues an eos marker.
2040 */
2041 (void) thread_create(NULL, 0, redact_list_thread, from_arg, 0,
2042 curproc, TS_RUN, minclsyspri);
2043}
37abac6d 2044
30af21b0
PD
2045static void
2046setup_redact_list_thread(struct redact_list_thread_arg *rlt_arg,
2047 struct dmu_send_params *dspp, redaction_list_t *rl, dmu_sendstatus_t *dssp)
2048{
2049 if (dspp->redactbook == NULL)
2050 return;
2051
2052 rlt_arg->cancel = B_FALSE;
2053 VERIFY0(bqueue_init(&rlt_arg->q, zfs_send_no_prefetch_queue_ff,
2054 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2055 offsetof(struct send_range, ln)));
2056 rlt_arg->error_code = 0;
2057 rlt_arg->mark_redact = B_TRUE;
2058 rlt_arg->rl = rl;
2059 rlt_arg->num_blocks_visited = &dssp->dss_blocks;
2060
2061 (void) thread_create(NULL, 0, redact_list_thread, rlt_arg, 0,
2062 curproc, TS_RUN, minclsyspri);
2063}
37abac6d 2064
30af21b0
PD
2065static void
2066setup_merge_thread(struct send_merge_thread_arg *smt_arg,
2067 struct dmu_send_params *dspp, struct redact_list_thread_arg *from_arg,
2068 struct send_thread_arg *to_arg, struct redact_list_thread_arg *rlt_arg,
2069 objset_t *os)
2070{
2071 VERIFY0(bqueue_init(&smt_arg->q, zfs_send_no_prefetch_queue_ff,
2072 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2073 offsetof(struct send_range, ln)));
2074 smt_arg->cancel = B_FALSE;
2075 smt_arg->error = 0;
2076 smt_arg->from_arg = from_arg;
2077 smt_arg->to_arg = to_arg;
2078 if (dspp->redactbook != NULL)
2079 smt_arg->redact_arg = rlt_arg;
2080
2081 smt_arg->os = os;
2082 (void) thread_create(NULL, 0, send_merge_thread, smt_arg, 0, curproc,
2083 TS_RUN, minclsyspri);
2084}
7ec09286 2085
30af21b0
PD
2086static void
2087setup_prefetch_thread(struct send_prefetch_thread_arg *spt_arg,
2088 struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg)
2089{
2090 VERIFY0(bqueue_init(&spt_arg->q, zfs_send_queue_ff,
2091 MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
2092 offsetof(struct send_range, ln)));
2093 spt_arg->smta = smt_arg;
2094 spt_arg->issue_prefetches = !dspp->dso->dso_dryrun;
2095 (void) thread_create(NULL, 0, send_prefetch_thread, spt_arg, 0,
2096 curproc, TS_RUN, minclsyspri);
2097}
b5256303 2098
30af21b0
PD
2099static int
2100setup_resume_points(struct dmu_send_params *dspp,
2101 struct send_thread_arg *to_arg, struct redact_list_thread_arg *from_arg,
2102 struct redact_list_thread_arg *rlt_arg,
2103 struct send_merge_thread_arg *smt_arg, boolean_t resuming, objset_t *os,
2104 redaction_list_t *redact_rl, nvlist_t *nvl)
2105{
2106 dsl_dataset_t *to_ds = dspp->to_ds;
2107 int err = 0;
2108
2109 uint64_t obj = 0;
2110 uint64_t blkid = 0;
2111 if (resuming) {
2112 obj = dspp->resumeobj;
2113 dmu_object_info_t to_doi;
2114 err = dmu_object_info(os, obj, &to_doi);
2115 if (err != 0)
2116 return (err);
2117
2118 blkid = dspp->resumeoff / to_doi.doi_data_block_size;
2119 }
2120 /*
2121 * If we're resuming a redacted send, we can skip to the appropriate
2122 * point in the redaction bookmark by binary searching through it.
2123 */
2124 smt_arg->bookmark_before = B_FALSE;
2125 if (redact_rl != NULL) {
2126 SET_BOOKMARK(&rlt_arg->resume, to_ds->ds_object, obj, 0, blkid);
2127 }
2128
2129 SET_BOOKMARK(&to_arg->resume, to_ds->ds_object, obj, 0, blkid);
2130 if (nvlist_exists(nvl, BEGINNV_REDACT_FROM_SNAPS)) {
2131 uint64_t objset = dspp->ancestor_zb.zbm_redaction_obj;
2132 /*
2133 * Note: If the resume point is in an object whose
2134 * blocksize is different in the from vs to snapshots,
2135 * we will have divided by the "wrong" blocksize.
2136 * However, in this case fromsnap's send_cb() will
2137 * detect that the blocksize has changed and therefore
2138 * ignore this object.
2139 *
2140 * If we're resuming a send from a redaction bookmark,
2141 * we still cannot accidentally suggest blocks behind
2142 * the to_ds. In addition, we know that any blocks in
2143 * the object in the to_ds will have to be sent, since
2144 * the size changed. Therefore, we can't cause any harm
2145 * this way either.
2146 */
2147 SET_BOOKMARK(&from_arg->resume, objset, obj, 0, blkid);
2148 }
2149 if (resuming) {
2150 fnvlist_add_uint64(nvl, BEGINNV_RESUME_OBJECT, dspp->resumeobj);
2151 fnvlist_add_uint64(nvl, BEGINNV_RESUME_OFFSET, dspp->resumeoff);
2152 }
2153 return (0);
2154}
b5256303 2155
30af21b0
PD
2156static dmu_sendstatus_t *
2157setup_send_progress(struct dmu_send_params *dspp)
2158{
2159 dmu_sendstatus_t *dssp = kmem_zalloc(sizeof (*dssp), KM_SLEEP);
2160 dssp->dss_outfd = dspp->outfd;
2161 dssp->dss_off = dspp->off;
2162 dssp->dss_proc = curproc;
2163 mutex_enter(&dspp->to_ds->ds_sendstream_lock);
2164 list_insert_head(&dspp->to_ds->ds_sendstreams, dssp);
2165 mutex_exit(&dspp->to_ds->ds_sendstream_lock);
2166 return (dssp);
2167}
b5256303 2168
30af21b0
PD
2169/*
2170 * Actually do the bulk of the work in a zfs send.
2171 *
2172 * The idea is that we want to do a send from ancestor_zb to to_ds. We also
2173 * want to not send any data that has been modified by all the datasets in
2174 * redactsnaparr, and store the list of blocks that are redacted in this way in
2175 * a bookmark named redactbook, created on the to_ds. We do this by creating
2176 * several worker threads, whose function is described below.
2177 *
2178 * There are three cases.
2179 * The first case is a redacted zfs send. In this case there are 5 threads.
 2180 * The first thread is the to_ds traversal thread: it calls traverse_dataset on
2181 * the to_ds and finds all the blocks that have changed since ancestor_zb (if
2182 * it's a full send, that's all blocks in the dataset). It then sends those
2183 * blocks on to the send merge thread. The redact list thread takes the data
2184 * from the redaction bookmark and sends those blocks on to the send merge
2185 * thread. The send merge thread takes the data from the to_ds traversal
2186 * thread, and combines it with the redaction records from the redact list
2187 * thread. If a block appears in both the to_ds's data and the redaction data,
2188 * the send merge thread will mark it as redacted and send it on to the prefetch
2189 * thread. Otherwise, the send merge thread will send the block on to the
2190 * prefetch thread unchanged. The prefetch thread will issue prefetch reads for
2191 * any data that isn't redacted, and then send the data on to the main thread.
2192 * The main thread behaves the same as in a normal send case, issuing demand
 2193 * reads for data blocks and sending out records over the network.
2194 *
2195 * The graphic below diagrams the flow of data in the case of a redacted zfs
2196 * send. Each box represents a thread, and each line represents the flow of
2197 * data.
2198 *
2199 * Records from the |
2200 * redaction bookmark |
2201 * +--------------------+ | +---------------------------+
2202 * | | v | Send Merge Thread |
2203 * | Redact List Thread +----------> Apply redaction marks to |
2204 * | | | records as specified by |
2205 * +--------------------+ | redaction ranges |
2206 * +----^---------------+------+
2207 * | | Merged data
2208 * | |
2209 * | +------------v--------+
2210 * | | Prefetch Thread |
2211 * +--------------------+ | | Issues prefetch |
2212 * | to_ds Traversal | | | reads of data blocks|
2213 * | Thread (finds +---------------+ +------------+--------+
2214 * | candidate blocks) | Blocks modified | Prefetched data
2215 * +--------------------+ by to_ds since |
2216 * ancestor_zb +------------v----+
2217 * | Main Thread | File Descriptor
2218 * | Sends data over +->(to zfs receive)
2219 * | wire |
2220 * +-----------------+
2221 *
2222 * The second case is an incremental send from a redaction bookmark. The to_ds
2223 * traversal thread and the main thread behave the same as in the redacted
2224 * send case. The new thread is the from bookmark traversal thread. It
2225 * iterates over the redaction list in the redaction bookmark, and enqueues
2226 * records for each block that was redacted in the original send. The send
2227 * merge thread now has to merge the data from the two threads. For details
2228 * about that process, see the header comment of send_merge_thread(). Any data
2229 * it decides to send on will be prefetched by the prefetch thread. Note that
2230 * you can perform a redacted send from a redaction bookmark; in that case,
2231 * the data flow behaves very similarly to the flow in the redacted send case,
2232 * except with the addition of the bookmark traversal thread iterating over the
2233 * redaction bookmark. The send_merge_thread also has to take on the
2234 * responsibility of merging the redact list thread's records, the bookmark
2235 * traversal thread's records, and the to_ds records.
2236 *
2237 * +---------------------+
2238 * | |
2239 * | Redact List Thread +--------------+
2240 * | | |
2241 * +---------------------+ |
2242 * Blocks in redaction list | Ranges modified by every secure snap
 2243 * of from bookmark | (or EOS if not redacted)
2244 * |
2245 * +---------------------+ | +----v----------------------+
2246 * | bookmark Traversal | v | Send Merge Thread |
2247 * | Thread (finds +---------> Merges bookmark, rlt, and |
2248 * | candidate blocks) | | to_ds send records |
2249 * +---------------------+ +----^---------------+------+
2250 * | | Merged data
2251 * | +------------v--------+
2252 * | | Prefetch Thread |
2253 * +--------------------+ | | Issues prefetch |
2254 * | to_ds Traversal | | | reads of data blocks|
2255 * | Thread (finds +---------------+ +------------+--------+
2256 * | candidate blocks) | Blocks modified | Prefetched data
2257 * +--------------------+ by to_ds since +------------v----+
2258 * ancestor_zb | Main Thread | File Descriptor
2259 * | Sends data over +->(to zfs receive)
2260 * | wire |
2261 * +-----------------+
2262 *
2263 * The final case is a simple zfs full or incremental send. The to_ds traversal
2264 * thread behaves the same as always. The redact list thread is never started.
e1cfd73f 2265 * The send merge thread takes all the blocks that the to_ds traversal thread
30af21b0
PD
2266 * sends it, prefetches the data, and sends the blocks on to the main thread.
2267 * The main thread sends the data over the wire.
2268 *
2269 * To keep performance acceptable, we want to prefetch the data in the worker
2270 * threads. While the to_ds thread could simply use the TRAVERSE_PREFETCH
2271 * feature built into traverse_dataset, the combining and deletion of records
2272 * due to redaction and sends from redaction bookmarks mean that we could
2273 * issue many unnecessary prefetches. As a result, we only prefetch data
2274 * after we've determined that the record is not going to be redacted. To
2275 * prevent the prefetching from getting too far ahead of the main thread, the
2276 * blocking queues that are used for communication are capped not by the
2277 * number of entries in the queue, but by the sum of the size of the
2278 * prefetches associated with them. The limit on the amount of data that the
2279 * thread can prefetch beyond what the main thread has reached is controlled
2280 * by the global variable zfs_send_queue_length. In addition, to prevent poor
2281 * performance in the beginning of a send, we also limit the distance ahead
2282 * that the traversal threads can be. That distance is controlled by the
2283 * zfs_send_no_prefetch_queue_length tunable.
2284 *
2285 * Note: Releases dp using the specified tag.
2286 */
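/*
 * Editorial note (not part of the original source): both tunables named above
 * are declared as module parameters at the bottom of this file, so on Linux
 * they can typically be inspected and adjusted at runtime through
 * /sys/module/zfs/parameters/zfs_send_queue_length and
 * /sys/module/zfs/parameters/zfs_send_no_prefetch_queue_length; a new value
 * only affects sends whose queues are initialized after the change.
 */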
2287static int
2288dmu_send_impl(struct dmu_send_params *dspp)
2289{
2290 objset_t *os;
2291 dmu_replay_record_t *drr;
2292 dmu_sendstatus_t *dssp;
2293 dmu_send_cookie_t dsc = {0};
2294 int err;
2295 uint64_t fromtxg = dspp->ancestor_zb.zbm_creation_txg;
2296 uint64_t featureflags = 0;
2297 struct redact_list_thread_arg *from_arg;
2298 struct send_thread_arg *to_arg;
2299 struct redact_list_thread_arg *rlt_arg;
2300 struct send_merge_thread_arg *smt_arg;
2301 struct send_prefetch_thread_arg *spt_arg;
2302 struct send_range *range;
2303 redaction_list_t *from_rl = NULL;
2304 redaction_list_t *redact_rl = NULL;
2305 boolean_t resuming = (dspp->resumeobj != 0 || dspp->resumeoff != 0);
2306 boolean_t book_resuming = resuming;
2307
2308 dsl_dataset_t *to_ds = dspp->to_ds;
2309 zfs_bookmark_phys_t *ancestor_zb = &dspp->ancestor_zb;
2310 dsl_pool_t *dp = dspp->dp;
2311 void *tag = dspp->tag;
2312
2313 err = dmu_objset_from_ds(to_ds, &os);
2314 if (err != 0) {
2315 dsl_pool_rele(dp, tag);
2316 return (err);
2317 }
2318 /*
2319 * If this is a non-raw send of an encrypted ds, we can ensure that
2320 * the objset_phys_t is authenticated. This is safe because this is
2321 * either a snapshot or we have owned the dataset, ensuring that
2322 * it can't be modified.
2323 */
2324 if (!dspp->rawok && os->os_encrypted &&
2325 arc_is_unauthenticated(os->os_phys_buf)) {
2326 zbookmark_phys_t zb;
2327
2328 SET_BOOKMARK(&zb, to_ds->ds_object, ZB_ROOT_OBJECT,
2329 ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
2330 err = arc_untransform(os->os_phys_buf, os->os_spa,
2331 &zb, B_FALSE);
2332 if (err != 0) {
2333 dsl_pool_rele(dp, tag);
2334 return (err);
b5256303
TC
2335 }
2336
30af21b0
PD
2337 ASSERT0(arc_is_unauthenticated(os->os_phys_buf));
2338 }
2339
2340 if ((err = setup_featureflags(dspp, os, &featureflags)) != 0) {
2341 dsl_pool_rele(dp, tag);
2342 return (err);
2343 }
f00ab3f2 2344
30af21b0
PD
2345 from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP);
2346 to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
2347 rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
2348 smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
2349 spt_arg = kmem_zalloc(sizeof (*spt_arg), KM_SLEEP);
b5256303 2350
30af21b0
PD
2351 /*
2352 * If we're doing a redacted send, hold the bookmark's redaction list.
2353 */
2354 if (dspp->redactbook != NULL) {
2355 err = dsl_redaction_list_hold_obj(dp,
2356 dspp->redactbook->zbm_redaction_obj, FTAG,
2357 &redact_rl);
2358 if (err != 0) {
2359 dsl_pool_rele(dp, tag);
2360 return (SET_ERROR(EINVAL));
2361 }
2362 dsl_redaction_list_long_hold(dp, redact_rl, FTAG);
2363 }
2364
2365 /*
2366 * If we're sending from a redaction bookmark, hold the redaction list
2367 * so that we can consider sending the redacted blocks.
2368 */
2369 if (ancestor_zb->zbm_redaction_obj != 0) {
2370 err = dsl_redaction_list_hold_obj(dp,
2371 ancestor_zb->zbm_redaction_obj, FTAG, &from_rl);
2372 if (err != 0) {
2373 if (redact_rl != NULL) {
2374 dsl_redaction_list_long_rele(redact_rl, FTAG);
2375 dsl_redaction_list_rele(redact_rl, FTAG);
b5256303 2376 }
30af21b0
PD
2377 dsl_pool_rele(dp, tag);
2378 return (SET_ERROR(EINVAL));
2379 }
2380 dsl_redaction_list_long_hold(dp, from_rl, FTAG);
2381 }
2382
2383 dsl_dataset_long_hold(to_ds, FTAG);
2384
2385 drr = create_begin_record(dspp, os, featureflags);
2386 dssp = setup_send_progress(dspp);
2387
2388 dsc.dsc_drr = drr;
2389 dsc.dsc_dso = dspp->dso;
2390 dsc.dsc_os = os;
2391 dsc.dsc_off = dspp->off;
2392 dsc.dsc_toguid = dsl_dataset_phys(to_ds)->ds_guid;
2393 dsc.dsc_fromtxg = fromtxg;
2394 dsc.dsc_pending_op = PENDING_NONE;
2395 dsc.dsc_featureflags = featureflags;
2396 dsc.dsc_resume_object = dspp->resumeobj;
2397 dsc.dsc_resume_offset = dspp->resumeoff;
2398
2399 dsl_pool_rele(dp, tag);
2400
2401 void *payload = NULL;
2402 size_t payload_len = 0;
2403 nvlist_t *nvl = fnvlist_alloc();
2404
2405 /*
2406 * If we're doing a redacted send, we include the snapshots we're
 2407 * redacting with respect to so that the target system knows what send
2408 * streams can be correctly received on top of this dataset. If we're
2409 * instead sending a redacted dataset, we include the snapshots that the
2410 * dataset was created with respect to.
2411 */
2412 if (dspp->redactbook != NULL) {
2413 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS,
2414 redact_rl->rl_phys->rlp_snaps,
2415 redact_rl->rl_phys->rlp_num_snaps);
2416 } else if (dsl_dataset_feature_is_active(to_ds,
2417 SPA_FEATURE_REDACTED_DATASETS)) {
2418 uint64_t *tods_guids;
2419 uint64_t length;
2420 VERIFY(dsl_dataset_get_uint64_array_feature(to_ds,
2421 SPA_FEATURE_REDACTED_DATASETS, &length, &tods_guids));
2422 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS, tods_guids,
2423 length);
2424 }
2425
2426 /*
2427 * If we're sending from a redaction bookmark, then we should retrieve
2428 * the guids of that bookmark so we can send them over the wire.
2429 */
2430 if (from_rl != NULL) {
2431 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
2432 from_rl->rl_phys->rlp_snaps,
2433 from_rl->rl_phys->rlp_num_snaps);
2434 }
b5256303 2435
30af21b0
PD
2436 /*
2437 * If the snapshot we're sending from is redacted, include the redaction
2438 * list in the stream.
2439 */
2440 if (dspp->numfromredactsnaps != NUM_SNAPS_NOT_REDACTED) {
2441 ASSERT3P(from_rl, ==, NULL);
2442 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
2443 dspp->fromredactsnaps, (uint_t)dspp->numfromredactsnaps);
2444 if (dspp->numfromredactsnaps > 0) {
2445 kmem_free(dspp->fromredactsnaps,
2446 dspp->numfromredactsnaps * sizeof (uint64_t));
2447 dspp->fromredactsnaps = NULL;
b5256303 2448 }
30af21b0
PD
2449 }
2450
2451 if (resuming || book_resuming) {
2452 err = setup_resume_points(dspp, to_arg, from_arg,
2453 rlt_arg, smt_arg, resuming, os, redact_rl, nvl);
2454 if (err != 0)
2455 goto out;
2456 }
2457
2458 if (featureflags & DMU_BACKUP_FEATURE_RAW) {
2459 uint64_t ivset_guid = (ancestor_zb != NULL) ?
2460 ancestor_zb->zbm_ivset_guid : 0;
2461 nvlist_t *keynvl = NULL;
2462 ASSERT(os->os_encrypted);
47dfff3b 2463
30af21b0
PD
2464 err = dsl_crypto_populate_key_nvlist(to_ds, ivset_guid,
2465 &keynvl);
2466 if (err != 0) {
2467 fnvlist_free(nvl);
2468 goto out;
2469 }
2470
2471 fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl);
2472 fnvlist_free(keynvl);
2473 }
2474
2475 if (!nvlist_empty(nvl)) {
47dfff3b
MA
2476 payload = fnvlist_pack(nvl, &payload_len);
2477 drr->drr_payloadlen = payload_len;
47dfff3b
MA
2478 }
2479
30af21b0
PD
2480 fnvlist_free(nvl);
2481 err = dump_record(&dsc, payload, payload_len);
47dfff3b
MA
2482 fnvlist_pack_free(payload, payload_len);
2483 if (err != 0) {
30af21b0 2484 err = dsc.dsc_err;
37abac6d 2485 goto out;
34dc7c2f
BB
2486 }
2487
30af21b0
PD
2488 setup_to_thread(to_arg, to_ds, dssp, fromtxg, dspp->rawok);
2489 setup_from_thread(from_arg, from_rl, dssp);
2490 setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);
2491 setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);
2492 setup_prefetch_thread(spt_arg, dspp, smt_arg);
fcff0f35 2493
30af21b0
PD
2494 range = bqueue_dequeue(&spt_arg->q);
2495 while (err == 0 && !range->eos_marker) {
2496 err = do_dump(&dsc, range);
2497 range = get_next_range(&spt_arg->q, range);
fcff0f35
PD
2498 if (issig(JUSTLOOKING) && issig(FORREAL))
2499 err = EINTR;
2500 }
2501
30af21b0
PD
2502 /*
2503 * If we hit an error or are interrupted, cancel our worker threads and
2504 * clear the queue of any pending records. The threads will pass the
2505 * cancel up the tree of worker threads, and each one will clean up any
2506 * pending records before exiting.
2507 */
fcff0f35 2508 if (err != 0) {
30af21b0
PD
2509 spt_arg->cancel = B_TRUE;
2510 while (!range->eos_marker) {
2511 range = get_next_range(&spt_arg->q, range);
fcff0f35
PD
2512 }
2513 }
30af21b0 2514 range_free(range);
fcff0f35 2515
30af21b0
PD
2516 bqueue_destroy(&spt_arg->q);
2517 bqueue_destroy(&smt_arg->q);
2518 if (dspp->redactbook != NULL)
2519 bqueue_destroy(&rlt_arg->q);
2520 bqueue_destroy(&to_arg->q);
2521 bqueue_destroy(&from_arg->q);
fcff0f35 2522
30af21b0
PD
2523 if (err == 0 && spt_arg->error != 0)
2524 err = spt_arg->error;
fcff0f35
PD
2525
2526 if (err != 0)
2527 goto out;
34dc7c2f 2528
30af21b0
PD
2529 if (dsc.dsc_pending_op != PENDING_NONE)
2530 if (dump_record(&dsc, NULL, 0) != 0)
2e528b49 2531 err = SET_ERROR(EINTR);
428870ff 2532
13fe0198 2533 if (err != 0) {
30af21b0
PD
2534 if (err == EINTR && dsc.dsc_err != 0)
2535 err = dsc.dsc_err;
37abac6d 2536 goto out;
34dc7c2f
BB
2537 }
2538
2539 bzero(drr, sizeof (dmu_replay_record_t));
2540 drr->drr_type = DRR_END;
30af21b0
PD
2541 drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc;
2542 drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid;
34dc7c2f 2543
30af21b0
PD
2544 if (dump_record(&dsc, NULL, 0) != 0)
2545 err = dsc.dsc_err;
37abac6d 2546out:
fcff0f35 2547 mutex_enter(&to_ds->ds_sendstream_lock);
30af21b0 2548 list_remove(&to_ds->ds_sendstreams, dssp);
fcff0f35 2549 mutex_exit(&to_ds->ds_sendstream_lock);
37abac6d 2550
30af21b0 2551 VERIFY(err != 0 || (dsc.dsc_sent_begin && dsc.dsc_sent_end));
51907a31 2552
34dc7c2f 2553 kmem_free(drr, sizeof (dmu_replay_record_t));
30af21b0
PD
2554 kmem_free(dssp, sizeof (dmu_sendstatus_t));
2555 kmem_free(from_arg, sizeof (*from_arg));
2556 kmem_free(to_arg, sizeof (*to_arg));
2557 kmem_free(rlt_arg, sizeof (*rlt_arg));
2558 kmem_free(smt_arg, sizeof (*smt_arg));
2559 kmem_free(spt_arg, sizeof (*spt_arg));
34dc7c2f 2560
fcff0f35 2561 dsl_dataset_long_rele(to_ds, FTAG);
30af21b0
PD
2562 if (from_rl != NULL) {
2563 dsl_redaction_list_long_rele(from_rl, FTAG);
2564 dsl_redaction_list_rele(from_rl, FTAG);
2565 }
2566 if (redact_rl != NULL) {
2567 dsl_redaction_list_long_rele(redact_rl, FTAG);
2568 dsl_redaction_list_rele(redact_rl, FTAG);
2569 }
13fe0198 2570
37abac6d 2571 return (err);
34dc7c2f
BB
2572}
2573
330d06f9 2574int
13fe0198 2575dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
2aa34383 2576 boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
30af21b0 2577 boolean_t rawok, int outfd, offset_t *off, dmu_send_outparams_t *dsop)
13fe0198 2578{
13fe0198 2579 int err;
30af21b0
PD
2580 dsl_dataset_t *fromds;
2581 ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
2582 struct dmu_send_params dspp = {0};
2583 dspp.embedok = embedok;
2584 dspp.large_block_ok = large_block_ok;
2585 dspp.compressok = compressok;
2586 dspp.outfd = outfd;
2587 dspp.off = off;
2588 dspp.dso = dsop;
2589 dspp.tag = FTAG;
2590 dspp.rawok = rawok;
2591
2592 err = dsl_pool_hold(pool, FTAG, &dspp.dp);
13fe0198
MA
2593 if (err != 0)
2594 return (err);
2595
30af21b0
PD
2596 err = dsl_dataset_hold_obj_flags(dspp.dp, tosnap, dsflags, FTAG,
2597 &dspp.to_ds);
13fe0198 2598 if (err != 0) {
30af21b0 2599 dsl_pool_rele(dspp.dp, FTAG);
13fe0198
MA
2600 return (err);
2601 }
2602
2603 if (fromsnap != 0) {
30af21b0
PD
2604 err = dsl_dataset_hold_obj_flags(dspp.dp, fromsnap, dsflags,
2605 FTAG, &fromds);
13fe0198 2606 if (err != 0) {
30af21b0
PD
2607 dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
2608 dsl_pool_rele(dspp.dp, FTAG);
13fe0198
MA
2609 return (err);
2610 }
30af21b0
PD
2611 dspp.ancestor_zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
2612 dspp.ancestor_zb.zbm_creation_txg =
2613 dsl_dataset_phys(fromds)->ds_creation_txg;
2614 dspp.ancestor_zb.zbm_creation_time =
d683ddbb 2615 dsl_dataset_phys(fromds)->ds_creation_time;
f00ab3f2
TC
2616
2617 if (dsl_dataset_is_zapified(fromds)) {
30af21b0 2618 (void) zap_lookup(dspp.dp->dp_meta_objset,
f00ab3f2 2619 fromds->ds_object, DS_FIELD_IVSET_GUID, 8, 1,
30af21b0
PD
2620 &dspp.ancestor_zb.zbm_ivset_guid);
2621 }
2622
2623 /* See dmu_send for the reasons behind this. */
2624 uint64_t *fromredact;
2625
2626 if (!dsl_dataset_get_uint64_array_feature(fromds,
2627 SPA_FEATURE_REDACTED_DATASETS,
2628 &dspp.numfromredactsnaps,
2629 &fromredact)) {
2630 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2631 } else if (dspp.numfromredactsnaps > 0) {
2632 uint64_t size = dspp.numfromredactsnaps *
2633 sizeof (uint64_t);
2634 dspp.fromredactsnaps = kmem_zalloc(size, KM_SLEEP);
2635 bcopy(fromredact, dspp.fromredactsnaps, size);
f00ab3f2
TC
2636 }
2637
30af21b0
PD
2638 if (!dsl_dataset_is_before(dspp.to_ds, fromds, 0)) {
2639 err = SET_ERROR(EXDEV);
2640 } else {
2641 dspp.is_clone = (dspp.to_ds->ds_dir !=
2642 fromds->ds_dir);
2643 dsl_dataset_rele(fromds, FTAG);
2644 err = dmu_send_impl(&dspp);
2645 }
da536844 2646 } else {
30af21b0
PD
2647 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2648 err = dmu_send_impl(&dspp);
13fe0198 2649 }
30af21b0 2650 dsl_dataset_rele(dspp.to_ds, FTAG);
da536844 2651 return (err);
13fe0198
MA
2652}
2653
2654int
47dfff3b 2655dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
b5256303 2656 boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,
30af21b0
PD
2657 uint64_t resumeobj, uint64_t resumeoff, const char *redactbook, int outfd,
2658 offset_t *off, dmu_send_outparams_t *dsop)
13fe0198 2659{
30af21b0 2660 int err = 0;
b5256303 2661 ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
da536844 2662 boolean_t owned = B_FALSE;
30af21b0
PD
2663 dsl_dataset_t *fromds = NULL;
2664 zfs_bookmark_phys_t book = {0};
2665 struct dmu_send_params dspp = {0};
2666 dspp.tosnap = tosnap;
2667 dspp.embedok = embedok;
2668 dspp.large_block_ok = large_block_ok;
2669 dspp.compressok = compressok;
2670 dspp.outfd = outfd;
2671 dspp.off = off;
2672 dspp.dso = dsop;
2673 dspp.tag = FTAG;
2674 dspp.resumeobj = resumeobj;
2675 dspp.resumeoff = resumeoff;
2676 dspp.rawok = rawok;
13fe0198 2677
da536844 2678 if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
2e528b49 2679 return (SET_ERROR(EINVAL));
13fe0198 2680
30af21b0 2681 err = dsl_pool_hold(tosnap, FTAG, &dspp.dp);
13fe0198
MA
2682 if (err != 0)
2683 return (err);
30af21b0 2684 if (strchr(tosnap, '@') == NULL && spa_writeable(dspp.dp->dp_spa)) {
da536844
MA
2685 /*
2686 * We are sending a filesystem or volume. Ensure
2687 * that it doesn't change by owning the dataset.
2688 */
30af21b0
PD
2689 err = dsl_dataset_own(dspp.dp, tosnap, dsflags, FTAG,
2690 &dspp.to_ds);
da536844
MA
2691 owned = B_TRUE;
2692 } else {
30af21b0
PD
2693 err = dsl_dataset_hold_flags(dspp.dp, tosnap, dsflags, FTAG,
2694 &dspp.to_ds);
2695 }
2696
2697 if (err != 0) {
2698 dsl_pool_rele(dspp.dp, FTAG);
2699 return (err);
2700 }
2701
2702 if (redactbook != NULL) {
2703 char path[ZFS_MAX_DATASET_NAME_LEN];
2704 (void) strlcpy(path, tosnap, sizeof (path));
2705 char *at = strchr(path, '@');
2706 if (at == NULL) {
2707 err = EINVAL;
2708 } else {
2709 (void) snprintf(at, sizeof (path) - (at - path), "#%s",
2710 redactbook);
2711 err = dsl_bookmark_lookup(dspp.dp, path,
2712 NULL, &book);
2713 dspp.redactbook = &book;
2714 }
da536844 2715 }
30af21b0 2716
13fe0198 2717 if (err != 0) {
30af21b0
PD
2718 dsl_pool_rele(dspp.dp, FTAG);
2719 if (owned)
2720 dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
2721 else
2722 dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
13fe0198
MA
2723 return (err);
2724 }
2725
2726 if (fromsnap != NULL) {
30af21b0
PD
2727 zfs_bookmark_phys_t *zb = &dspp.ancestor_zb;
2728 int fsnamelen;
2729 if (strpbrk(tosnap, "@#") != NULL)
2730 fsnamelen = strpbrk(tosnap, "@#") - tosnap;
2731 else
2732 fsnamelen = strlen(tosnap);
da536844
MA
2733
2734 /*
2735 * If the fromsnap is in a different filesystem, then
2736 * mark the send stream as a clone.
2737 */
2738 if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
2739 (fromsnap[fsnamelen] != '@' &&
2740 fromsnap[fsnamelen] != '#')) {
30af21b0 2741 dspp.is_clone = B_TRUE;
da536844
MA
2742 }
2743
30af21b0
PD
2744 if (strchr(fromsnap, '@') != NULL) {
2745 err = dsl_dataset_hold(dspp.dp, fromsnap, FTAG,
2746 &fromds);
2747
2748 if (err != 0) {
2749 ASSERT3P(fromds, ==, NULL);
2750 } else {
2751 /*
2752 * We need to make a deep copy of the redact
2753 * snapshots of the from snapshot, because the
2754 * array will be freed when we evict from_ds.
2755 */
2756 uint64_t *fromredact;
2757 if (!dsl_dataset_get_uint64_array_feature(
2758 fromds, SPA_FEATURE_REDACTED_DATASETS,
2759 &dspp.numfromredactsnaps,
2760 &fromredact)) {
2761 dspp.numfromredactsnaps =
2762 NUM_SNAPS_NOT_REDACTED;
2763 } else if (dspp.numfromredactsnaps > 0) {
2764 uint64_t size =
2765 dspp.numfromredactsnaps *
2766 sizeof (uint64_t);
2767 dspp.fromredactsnaps = kmem_zalloc(size,
2768 KM_SLEEP);
2769 bcopy(fromredact, dspp.fromredactsnaps,
2770 size);
2771 }
2772 if (!dsl_dataset_is_before(dspp.to_ds, fromds,
2773 0)) {
da536844 2774 err = SET_ERROR(EXDEV);
30af21b0
PD
2775 } else {
2776 ASSERT3U(dspp.is_clone, ==,
2777 (dspp.to_ds->ds_dir !=
2778 fromds->ds_dir));
2779 zb->zbm_creation_txg =
2780 dsl_dataset_phys(fromds)->
2781 ds_creation_txg;
2782 zb->zbm_creation_time =
2783 dsl_dataset_phys(fromds)->
2784 ds_creation_time;
2785 zb->zbm_guid =
2786 dsl_dataset_phys(fromds)->ds_guid;
2787 zb->zbm_redaction_obj = 0;
2788
2789 if (dsl_dataset_is_zapified(fromds)) {
2790 (void) zap_lookup(
2791 dspp.dp->dp_meta_objset,
2792 fromds->ds_object,
2793 DS_FIELD_IVSET_GUID, 8, 1,
2794 &zb->zbm_ivset_guid);
2795 }
f00ab3f2 2796 }
da536844
MA
2797 dsl_dataset_rele(fromds, FTAG);
2798 }
2799 } else {
30af21b0
PD
2800 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2801 err = dsl_bookmark_lookup(dspp.dp, fromsnap, dspp.to_ds,
2802 zb);
2803 if (err == EXDEV && zb->zbm_redaction_obj != 0 &&
2804 zb->zbm_guid ==
2805 dsl_dataset_phys(dspp.to_ds)->ds_guid)
2806 err = 0;
da536844 2807 }
b5256303 2808
30af21b0
PD
2809 if (err == 0) {
2810 /* dmu_send_impl will call dsl_pool_rele for us. */
2811 err = dmu_send_impl(&dspp);
2812 } else {
2813 dsl_pool_rele(dspp.dp, FTAG);
13fe0198 2814 }
da536844 2815 } else {
30af21b0
PD
2816 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2817 err = dmu_send_impl(&dspp);
13fe0198 2818 }
da536844 2819 if (owned)
30af21b0 2820 dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
da536844 2821 else
30af21b0 2822 dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
da536844 2823 return (err);
13fe0198
MA
2824}
2825
5dc8b736 2826static int
2aa34383
DK
2827dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
2828 uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
5dc8b736 2829{
ca0845d5 2830 int err = 0;
2aa34383 2831 uint64_t size;
5dc8b736
MG
2832 /*
2833 * Assume that space (both on-disk and in-stream) is dominated by
2834 * data. We will adjust for indirect blocks and the copies property,
2835 * but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
2836 */
2837
2aa34383
DK
2838 uint64_t recordsize;
2839 uint64_t record_count;
dd429b46
PD
2840 objset_t *os;
2841 VERIFY0(dmu_objset_from_ds(ds, &os));
2aa34383
DK
2842
2843 /* Assume all (uncompressed) blocks are recordsize. */
ca0845d5
PD
2844 if (zfs_override_estimate_recordsize != 0) {
2845 recordsize = zfs_override_estimate_recordsize;
2846 } else if (os->os_phys->os_type == DMU_OST_ZVOL) {
dd429b46
PD
2847 err = dsl_prop_get_int_ds(ds,
2848 zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);
2849 } else {
2850 err = dsl_prop_get_int_ds(ds,
2851 zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);
2852 }
2aa34383
DK
2853 if (err != 0)
2854 return (err);
2855 record_count = uncompressed / recordsize;
2856
2857 /*
2858 * If we're estimating a send size for a compressed stream, use the
2859 * compressed data size to estimate the stream size. Otherwise, use the
2860 * uncompressed data size.
2861 */
2862 size = stream_compressed ? compressed : uncompressed;
2863
5dc8b736
MG
2864 /*
2865 * Subtract out approximate space used by indirect blocks.
2866 * Assume most space is used by data blocks (non-indirect, non-dnode).
2aa34383 2867 * Assume no ditto blocks or internal fragmentation.
5dc8b736
MG
2868 *
2869 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
2aa34383 2870 * block.
5dc8b736 2871 */
2aa34383 2872 size -= record_count * sizeof (blkptr_t);
5dc8b736
MG
2873
2874 /* Add in the space for the record associated with each block. */
2aa34383 2875 size += record_count * sizeof (dmu_replay_record_t);
5dc8b736
MG
2876
2877 *sizep = size;
2878
2879 return (0);
2880}
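/*
 * Worked example (editorial sketch, not from the original source): with a
 * 128 KiB recordsize and 1 GiB of uncompressed data in an uncompressed
 * stream, record_count = 2^30 / 2^17 = 8192, so the estimate subtracts
 * 8192 * sizeof (blkptr_t) (1 MiB of indirect-block overhead) and adds
 * 8192 * sizeof (dmu_replay_record_t) for per-record stream headers, leaving
 * the estimate within a few MiB of the raw data size.
 */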
2881
13fe0198 2882int
30af21b0
PD
2883dmu_send_estimate_fast(dsl_dataset_t *ds, dsl_dataset_t *fromds,
2884 zfs_bookmark_phys_t *frombook, boolean_t stream_compressed, uint64_t *sizep)
330d06f9 2885{
330d06f9 2886 int err;
2aa34383 2887 uint64_t uncomp, comp;
13fe0198 2888
fd0fd646 2889 ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool));
30af21b0 2890 ASSERT(fromds == NULL || frombook == NULL);
330d06f9
MA
2891
2892 /* tosnap must be a snapshot */
0c66c32d 2893 if (!ds->ds_is_snapshot)
2e528b49 2894 return (SET_ERROR(EINVAL));
330d06f9 2895
30af21b0
PD
2896 if (fromds != NULL) {
2897 uint64_t used;
2898 if (!fromds->ds_is_snapshot)
2899 return (SET_ERROR(EINVAL));
71e2fe41 2900
30af21b0
PD
2901 if (!dsl_dataset_is_before(ds, fromds, 0))
2902 return (SET_ERROR(EXDEV));
330d06f9 2903
30af21b0
PD
2904 err = dsl_dataset_space_written(fromds, ds, &used, &comp,
2905 &uncomp);
2906 if (err != 0)
2907 return (err);
2908 } else if (frombook != NULL) {
2aa34383 2909 uint64_t used;
30af21b0
PD
2910 err = dsl_dataset_space_written_bookmark(frombook, ds, &used,
2911 &comp, &uncomp);
13fe0198 2912 if (err != 0)
330d06f9 2913 return (err);
30af21b0
PD
2914 } else {
2915 uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
2916 comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
330d06f9
MA
2917 }
2918
2aa34383
DK
2919 err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
2920 stream_compressed, sizep);
dd429b46
PD
2921 /*
2922 * Add the size of the BEGIN and END records to the estimate.
2923 */
2924 *sizep += 2 * sizeof (dmu_replay_record_t);
5dc8b736
MG
2925 return (err);
2926}
330d06f9 2927
03916905 2928#if defined(_KERNEL)
03916905
PD
2929module_param(zfs_send_corrupt_data, int, 0644);
2930MODULE_PARM_DESC(zfs_send_corrupt_data, "Allow sending corrupt data");
3b0d9928
BB
2931
2932module_param(zfs_send_queue_length, int, 0644);
2933MODULE_PARM_DESC(zfs_send_queue_length, "Maximum send queue length");
caf9dd20
BB
2934
2935module_param(zfs_send_unmodified_spill_blocks, int, 0644);
2936MODULE_PARM_DESC(zfs_send_unmodified_spill_blocks,
2937 "Send unmodified spill blocks");
30af21b0
PD
2938
2939module_param(zfs_send_no_prefetch_queue_length, int, 0644);
2940MODULE_PARM_DESC(zfs_send_no_prefetch_queue_length,
2941 "Maximum send queue length for non-prefetch queues");
2942
2943module_param(zfs_send_queue_ff, int, 0644);
2944MODULE_PARM_DESC(zfs_send_queue_ff, "Send queue fill fraction");
2945
2946module_param(zfs_send_no_prefetch_queue_ff, int, 0644);
2947MODULE_PARM_DESC(zfs_send_no_prefetch_queue_ff,
2948 "Send queue fill fraction for non-prefetch queues");
2949
2950module_param(zfs_override_estimate_recordsize, int, 0644);
2951MODULE_PARM_DESC(zfs_override_estimate_recordsize,
2952 "Override block size estimate with fixed size");
fd8febbd 2953#endif