]> git.proxmox.com Git - mirror_zfs.git/blobdiff - module/zfs/dmu_send.c
Don't panic on unencrypted block in encrypted dataset
[mirror_zfs.git] / module / zfs / dmu_send.c
index 884be31bd2260e8d905c3fcaaddb80cb28d344bc..37c68528bf95d18a8719cbc1f6f05837ba9caa03 100644 (file)
@@ -6,7 +6,7 @@
  * You may not use this file except in compliance with the License.
  *
  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
- * or http://www.opensolaris.org/os/licensing.
+ * or https://opensource.org/licenses/CDDL-1.0.
  * See the License for the specific language governing permissions
  * and limitations under the License.
  *
@@ -26,6 +26,8 @@
  * Copyright 2014 HybridCluster. All rights reserved.
  * Copyright 2016 RackTop Systems.
  * Copyright (c) 2016 Actifio, Inc. All rights reserved.
+ * Copyright (c) 2019, Klara Inc.
+ * Copyright (c) 2019, Allan Jude
  */
 
 #include <sys/dmu.h>
@@ -51,6 +53,7 @@
 #include <sys/ddt.h>
 #include <sys/zfs_onexit.h>
 #include <sys/dmu_send.h>
+#include <sys/dmu_recv.h>
 #include <sys/dsl_destroy.h>
 #include <sys/blkptr.h>
 #include <sys/dsl_bookmark.h>
@@ -64,7 +67,7 @@
 #endif
 
 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
-int zfs_send_corrupt_data = B_FALSE;
+static int zfs_send_corrupt_data = B_FALSE;
 /*
  * This tunable controls the amount of data (measured in bytes) that will be
  * prefetched by zfs send.  If the main thread is blocking on reads that haven't
@@ -72,7 +75,7 @@ int zfs_send_corrupt_data = B_FALSE;
  * thread is issuing new reads because the prefetches have fallen out of the
  * cache, this may need to be decreased.
  */
-int zfs_send_queue_length = SPA_MAXBLOCKSIZE;
+static uint_t zfs_send_queue_length = SPA_MAXBLOCKSIZE;
 /*
  * This tunable controls the length of the queues that zfs send worker threads
  * use to communicate.  If the send_main_thread is blocking on these queues,
@@ -80,7 +83,7 @@ int zfs_send_queue_length = SPA_MAXBLOCKSIZE;
  * at the start of a send as these threads consume all the available IO
  * resources, this variable may need to be decreased.
  */
-int zfs_send_no_prefetch_queue_length = 1024 * 1024;
+static uint_t zfs_send_no_prefetch_queue_length = 1024 * 1024;
 /*
  * These tunables control the fill fraction of the queues by zfs send.  The fill
  * fraction controls the frequency with which threads have to be cv_signaled.
@@ -88,19 +91,19 @@ int zfs_send_no_prefetch_queue_length = 1024 * 1024;
  * down.  If the queues empty before the signalled thread can catch up, then
  * these should be tuned up.
  */
-int zfs_send_queue_ff = 20;
-int zfs_send_no_prefetch_queue_ff = 20;
+static uint_t zfs_send_queue_ff = 20;
+static uint_t zfs_send_no_prefetch_queue_ff = 20;
 
 /*
  * Use this to override the recordsize calculation for fast zfs send estimates.
  */
-int zfs_override_estimate_recordsize = 0;
+static uint_t zfs_override_estimate_recordsize = 0;
 
 /* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
-int zfs_send_set_freerecords_bit = B_TRUE;
+static const boolean_t zfs_send_set_freerecords_bit = B_TRUE;
 
 /* Set this tunable to FALSE is disable sending unmodified spill blocks. */
-int zfs_send_unmodified_spill_blocks = B_TRUE;
+static int zfs_send_unmodified_spill_blocks = B_TRUE;
 
 static inline boolean_t
 overflow_multiply(uint64_t a, uint64_t b, uint64_t *c)
@@ -112,28 +115,14 @@ overflow_multiply(uint64_t a, uint64_t b, uint64_t *c)
        return (B_TRUE);
 }
 
-/*
- * Return B_TRUE and modifies *out to the span if the span is less than 2^64,
- * returns B_FALSE otherwise.
- */
-static inline boolean_t
-bp_span(uint32_t datablksz, uint8_t indblkshift, uint64_t level, uint64_t *out)
-{
-       uint64_t spanb = bp_span_in_blocks(indblkshift, level);
-       return (overflow_multiply(spanb, datablksz, out));
-}
-
 struct send_thread_arg {
        bqueue_t        q;
-       dsl_dataset_t   *ds;            /* Dataset to traverse */
-       redaction_list_t *redaction_list;
-       struct send_redact_record *current_record;
+       objset_t        *os;            /* Objset to traverse */
        uint64_t        fromtxg;        /* Traverse from this txg */
        int             flags;          /* flags to pass to traverse_dataset */
        int             error_code;
        boolean_t       cancel;
        zbookmark_phys_t resume;
-       objlist_t       *deleted_objs;
        uint64_t        *num_blocks_visited;
 };
 
@@ -147,23 +136,6 @@ struct redact_list_thread_arg {
        uint64_t                *num_blocks_visited;
 };
 
-/*
- * A wrapper around struct redact_block so it can be stored in a list_t.
- */
-struct redact_block_list_node {
-       redact_block_phys_t     block;
-       list_node_t             node;
-};
-
-struct redact_bookmark_info {
-       redact_block_phys_t     rbi_furthest[TXG_SIZE];
-       /* Lists of struct redact_block_list_node. */
-       list_t                  rbi_blocks[TXG_SIZE];
-       boolean_t               rbi_synctasc_txg[TXG_SIZE];
-       uint64_t                rbi_latest_synctask_txg;
-       redaction_list_t        *rbi_redaction_list;
-};
-
 struct send_merge_thread_arg {
        bqueue_t                        q;
        objset_t                        *os;
@@ -172,18 +144,6 @@ struct send_merge_thread_arg {
        struct redact_list_thread_arg   *redact_arg;
        int                             error;
        boolean_t                       cancel;
-       struct redact_bookmark_info     rbi;
-       /*
-        * If we're resuming a redacted send, then the object/offset from the
-        * resume token may be different from the object/offset that we have
-        * updated the bookmark to.  resume_redact_zb will store the earlier of
-        * the two object/offset pairs, and bookmark_before will be B_TRUE if
-        * resume_redact_zb has the object/offset for resuming the redaction
-        * bookmark, and B_FALSE if resume_redact_zb is storing the
-        * object/offset from the resume token.
-        */
-       zbookmark_phys_t                resume_redact_zb;
-       boolean_t                       bookmark_before;
 };
 
 struct send_range {
@@ -197,8 +157,16 @@ struct send_range {
        union {
                struct srd {
                        dmu_object_type_t       obj_type;
-                       uint32_t                datablksz;
+                       uint32_t                datablksz; // logical size
+                       uint32_t                datasz; // payload size
                        blkptr_t                bp;
+                       arc_buf_t               *abuf;
+                       abd_t                   *abd;
+                       kmutex_t                lock;
+                       kcondvar_t              cv;
+                       boolean_t               io_outstanding;
+                       boolean_t               io_compressed;
+                       int                     io_err;
                } data;
                struct srh {
                        uint32_t                datablksz;
@@ -263,6 +231,20 @@ range_free(struct send_range *range)
                size_t size = sizeof (dnode_phys_t) *
                    (range->sru.object.dnp->dn_extra_slots + 1);
                kmem_free(range->sru.object.dnp, size);
+       } else if (range->type == DATA) {
+               mutex_enter(&range->sru.data.lock);
+               while (range->sru.data.io_outstanding)
+                       cv_wait(&range->sru.data.cv, &range->sru.data.lock);
+               if (range->sru.data.abd != NULL)
+                       abd_free(range->sru.data.abd);
+               if (range->sru.data.abuf != NULL) {
+                       arc_buf_destroy(range->sru.data.abuf,
+                           &range->sru.data.abuf);
+               }
+               mutex_exit(&range->sru.data.lock);
+
+               cv_destroy(&range->sru.data.cv);
+               mutex_destroy(&range->sru.data.lock);
        }
        kmem_free(range, sizeof (*range));
 }
@@ -302,7 +284,7 @@ dump_record(dmu_send_cookie_t *dscp, void *payload, int payload_len)
        if (payload_len != 0) {
                *dscp->dsc_off += payload_len;
                /*
-                * payload is null when dso->ryrun == B_TRUE (i.e. when we're
+                * payload is null when dso_dryrun == B_TRUE (i.e. when we're
                 * doing a send size calculation)
                 */
                if (payload != NULL) {
@@ -397,7 +379,7 @@ dump_free(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
                }
        }
        /* create a FREE record and make it pending */
-       bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+       memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
        dscp->dsc_drr->drr_type = DRR_FREE;
        drrf->drr_object = object;
        drrf->drr_offset = offset;
@@ -456,7 +438,7 @@ dump_redact(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
                }
        }
        /* create a REDACT record and make it pending */
-       bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+       memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
        dscp->dsc_drr->drr_type = DRR_REDACT;
        drrr->drr_object = object;
        drrr->drr_offset = offset;
@@ -468,8 +450,9 @@ dump_redact(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
 }
 
 static int
-dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
-    uint64_t offset, int lsize, int psize, const blkptr_t *bp, void *data)
+dmu_dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
+    uint64_t offset, int lsize, int psize, const blkptr_t *bp,
+    boolean_t io_compressed, void *data)
 {
        uint64_t payload_size;
        boolean_t raw = (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
@@ -497,7 +480,7 @@ dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
                dscp->dsc_pending_op = PENDING_NONE;
        }
        /* write a WRITE record */
-       bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+       memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
        dscp->dsc_drr->drr_type = DRR_WRITE;
        drrw->drr_object = object;
        drrw->drr_type = type;
@@ -506,7 +489,11 @@ dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
        drrw->drr_logical_size = lsize;
 
        /* only set the compression fields if the buf is compressed or raw */
-       if (raw || lsize != psize) {
+       boolean_t compressed =
+           (bp != NULL ? BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
+           io_compressed : lsize != psize);
+       if (raw || compressed) {
+               ASSERT(bp != NULL);
                ASSERT(raw || dscp->dsc_featureflags &
                    DMU_BACKUP_FEATURE_COMPRESSED);
                ASSERT(!BP_IS_EMBEDDED(bp));
@@ -548,7 +535,7 @@ dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
                /*
                 * There's no pre-computed checksum for partial-block writes,
                 * embedded BP's, or encrypted BP's that are being sent as
-                * plaintext, so (like fletcher4-checkummed blocks) userland
+                * plaintext, so (like fletcher4-checksummed blocks) userland
                 * will have to compute a dedup-capable checksum itself.
                 */
                drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
@@ -585,7 +572,7 @@ dump_write_embedded(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
 
        ASSERT(BP_IS_EMBEDDED(bp));
 
-       bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+       memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
        dscp->dsc_drr->drr_type = DRR_WRITE_EMBEDDED;
        drrw->drr_object = object;
        drrw->drr_offset = offset;
@@ -598,7 +585,13 @@ dump_write_embedded(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
 
        decode_embedded_bp_compressed(bp, buf);
 
-       if (dump_record(dscp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
+       uint32_t psize = drrw->drr_psize;
+       uint32_t rsize = P2ROUNDUP(psize, 8);
+
+       if (psize != rsize)
+               memset(buf + psize, 0, rsize - psize);
+
+       if (dump_record(dscp, buf, rsize) != 0)
                return (SET_ERROR(EINTR));
        return (0);
 }
@@ -618,7 +611,7 @@ dump_spill(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
        }
 
        /* write a SPILL record */
-       bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+       memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
        dscp->dsc_drr->drr_type = DRR_SPILL;
        drrs->drr_object = object;
        drrs->drr_length = blksz;
@@ -662,7 +655,7 @@ dump_freeobjects(dmu_send_cookie_t *dscp, uint64_t firstobj, uint64_t numobjs)
         * receiving side.
         */
        if (maxobj > 0) {
-               if (maxobj < firstobj)
+               if (maxobj <= firstobj)
                        return (0);
 
                if (maxobj < firstobj + numobjs)
@@ -682,8 +675,6 @@ dump_freeobjects(dmu_send_cookie_t *dscp, uint64_t firstobj, uint64_t numobjs)
                        return (SET_ERROR(EINTR));
                dscp->dsc_pending_op = PENDING_NONE;
        }
-       if (numobjs == 0)
-               numobjs = UINT64_MAX - firstobj;
 
        if (dscp->dsc_pending_op == PENDING_FREEOBJECTS) {
                /*
@@ -702,7 +693,7 @@ dump_freeobjects(dmu_send_cookie_t *dscp, uint64_t firstobj, uint64_t numobjs)
        }
 
        /* write a FREEOBJECTS record */
-       bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+       memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
        dscp->dsc_drr->drr_type = DRR_FREEOBJECTS;
        drrfo->drr_firstobj = firstobj;
        drrfo->drr_numobjs = numobjs;
@@ -743,7 +734,7 @@ dump_dnode(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
        }
 
        /* write an OBJECT record */
-       bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+       memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
        dscp->dsc_drr->drr_type = DRR_OBJECT;
        drro->drr_object = object;
        drro->drr_type = dnp->dn_type;
@@ -779,6 +770,8 @@ dump_dnode(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
                 * to send it.
                 */
                if (bonuslen != 0) {
+                       if (drro->drr_bonuslen > DN_MAX_BONUS_LEN(dnp))
+                               return (SET_ERROR(EINVAL));
                        drro->drr_raw_bonuslen = DN_MAX_BONUS_LEN(dnp);
                        bonuslen = drro->drr_raw_bonuslen;
                }
@@ -815,7 +808,7 @@ dump_dnode(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
                struct send_range record;
                blkptr_t *bp = DN_SPILL_BLKPTR(dnp);
 
-               bzero(&record, sizeof (struct send_range));
+               memset(&record, 0, sizeof (struct send_range));
                record.type = DATA;
                record.object = object;
                record.eos_marker = B_FALSE;
@@ -855,7 +848,7 @@ dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp,
                dscp->dsc_pending_op = PENDING_NONE;
        }
 
-       bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+       memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
        dscp->dsc_drr->drr_type = DRR_OBJECT_RANGE;
        drror->drr_firstobj = firstobj;
        drror->drr_numslots = numslots;
@@ -871,7 +864,7 @@ dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp,
 }
 
 static boolean_t
-send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
+send_do_embed(const blkptr_t *bp, uint64_t featureflags)
 {
        if (!BP_IS_EMBEDDED(bp))
                return (B_FALSE);
@@ -880,7 +873,15 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
         * Compression function must be legacy, or explicitly enabled.
         */
        if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
-           !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LZ4)))
+           !(featureflags & DMU_BACKUP_FEATURE_LZ4)))
+               return (B_FALSE);
+
+       /*
+        * If we have not set the ZSTD feature flag, we can't send ZSTD
+        * compressed embedded blocks, as the receiver may not support them.
+        */
+       if ((BP_GET_COMPRESS(bp) == ZIO_COMPRESS_ZSTD &&
+           !(featureflags & DMU_BACKUP_FEATURE_ZSTD)))
                return (B_FALSE);
 
        /*
@@ -888,7 +889,7 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
         */
        switch (BPE_GET_ETYPE(bp)) {
        case BP_EMBEDDED_TYPE_DATA:
-               if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
+               if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
                        return (B_TRUE);
                break;
        default:
@@ -899,8 +900,8 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
 
 /*
  * This function actually handles figuring out what kind of record needs to be
- * dumped, reading the data (which has hopefully been prefetched), and calling
- * the appropriate helper function.
+ * dumped, and calling the appropriate helper function.  In most cases,
+ * the data has already been read by send_reader_thread().
  */
 static int
 do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
@@ -940,14 +941,13 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
                ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
                if (BP_GET_TYPE(bp) == DMU_OT_SA) {
                        arc_flags_t aflags = ARC_FLAG_WAIT;
-                       enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
+                       zio_flag_t zioflags = ZIO_FLAG_CANFAIL;
 
                        if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
                                ASSERT(BP_IS_PROTECTED(bp));
                                zioflags |= ZIO_FLAG_RAW;
                        }
 
-                       arc_buf_t *abuf;
                        zbookmark_phys_t zb;
                        ASSERT3U(range->start_blkid, ==, DMU_SPILL_BLKID);
                        zb.zb_objset = dmu_objset_id(dscp->dsc_os);
@@ -955,16 +955,19 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
                        zb.zb_level = 0;
                        zb.zb_blkid = range->start_blkid;
 
+                       arc_buf_t *abuf = NULL;
                        if (!dscp->dsc_dso->dso_dryrun && arc_read(NULL, spa,
                            bp, arc_getbuf_func, &abuf, ZIO_PRIORITY_ASYNC_READ,
                            zioflags, &aflags, &zb) != 0)
                                return (SET_ERROR(EIO));
 
-                       err = dump_spill(dscp, bp, zb.zb_object, abuf->b_data);
-                       arc_buf_destroy(abuf, &abuf);
+                       err = dump_spill(dscp, bp, zb.zb_object,
+                           (abuf == NULL ? NULL : abuf->b_data));
+                       if (abuf != NULL)
+                               arc_buf_destroy(abuf, &abuf);
                        return (err);
                }
-               if (send_do_embed(dscp, bp)) {
+               if (send_do_embed(bp, dscp->dsc_featureflags)) {
                        err = dump_write_embedded(dscp, range->object,
                            range->start_blkid * srdp->datablksz,
                            srdp->datablksz, bp);
@@ -975,71 +978,24 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
                    range->start_blkid * srdp->datablksz >=
                    dscp->dsc_resume_offset));
                /* it's a level-0 block of a regular object */
-               arc_flags_t aflags = ARC_FLAG_WAIT;
-               arc_buf_t *abuf = NULL;
-               uint64_t offset;
 
-               /*
-                * If we have large blocks stored on disk but the send flags
-                * don't allow us to send large blocks, we split the data from
-                * the arc buf into chunks.
-                */
-               boolean_t split_large_blocks =
-                   srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
-                   !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
-
-               /*
-                * Raw sends require that we always get raw data as it exists
-                * on disk, so we assert that we are not splitting blocks here.
-                */
-               boolean_t request_raw =
-                   (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
-
-               /*
-                * We should only request compressed data from the ARC if all
-                * the following are true:
-                *  - stream compression was requested
-                *  - we aren't splitting large blocks into smaller chunks
-                *  - the data won't need to be byteswapped before sending
-                *  - this isn't an embedded block
-                *  - this isn't metadata (if receiving on a different endian
-                *    system it can be byteswapped more easily)
-                */
-               boolean_t request_compressed =
-                   (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
-                   !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
-                   !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
-
-               IMPLY(request_raw, !split_large_blocks);
-               IMPLY(request_raw, BP_IS_PROTECTED(bp));
-               if (!dscp->dsc_dso->dso_dryrun) {
-                       enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
-
-                       ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
-
-                       if (request_raw)
-                               zioflags |= ZIO_FLAG_RAW;
-                       else if (request_compressed)
-                               zioflags |= ZIO_FLAG_RAW_COMPRESS;
-                       zbookmark_phys_t zb;
-                       zb.zb_objset = dmu_objset_id(dscp->dsc_os);
-                       zb.zb_object = range->object;
-                       zb.zb_level = 0;
-                       zb.zb_blkid = range->start_blkid;
-
-                       err = arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
-                           ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, &zb);
-               }
+               mutex_enter(&srdp->lock);
+               while (srdp->io_outstanding)
+                       cv_wait(&srdp->cv, &srdp->lock);
+               err = srdp->io_err;
+               mutex_exit(&srdp->lock);
 
                if (err != 0) {
                        if (zfs_send_corrupt_data &&
                            !dscp->dsc_dso->dso_dryrun) {
-                               /* Send a block filled with 0x"zfs badd bloc" */
-                               abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
-                                   srdp->datablksz);
+                               /*
+                                * Send a block filled with 0x"zfs badd bloc"
+                                */
+                               srdp->abuf = arc_alloc_buf(spa, &srdp->abuf,
+                                   ARC_BUFC_DATA, srdp->datablksz);
                                uint64_t *ptr;
-                               for (ptr = abuf->b_data;
-                                   (char *)ptr < (char *)abuf->b_data +
+                               for (ptr = srdp->abuf->b_data;
+                                   (char *)ptr < (char *)srdp->abuf->b_data +
                                    srdp->datablksz; ptr++)
                                        *ptr = 0x2f5baddb10cULL;
                        } else {
@@ -1047,41 +1003,49 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
                        }
                }
 
-               offset = range->start_blkid * srdp->datablksz;
+               ASSERT(dscp->dsc_dso->dso_dryrun ||
+                   srdp->abuf != NULL || srdp->abd != NULL);
 
-               if (split_large_blocks) {
-                       ASSERT0(arc_is_encrypted(abuf));
-                       ASSERT3U(arc_get_compression(abuf), ==,
-                           ZIO_COMPRESS_OFF);
-                       char *buf = abuf->b_data;
+               uint64_t offset = range->start_blkid * srdp->datablksz;
+
+               char *data = NULL;
+               if (srdp->abd != NULL) {
+                       data = abd_to_buf(srdp->abd);
+                       ASSERT3P(srdp->abuf, ==, NULL);
+               } else if (srdp->abuf != NULL) {
+                       data = srdp->abuf->b_data;
+               }
+
+               /*
+                * If we have large blocks stored on disk but the send flags
+                * don't allow us to send large blocks, we split the data from
+                * the arc buf into chunks.
+                */
+               if (srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
+                   !(dscp->dsc_featureflags &
+                   DMU_BACKUP_FEATURE_LARGE_BLOCKS)) {
                        while (srdp->datablksz > 0 && err == 0) {
                                int n = MIN(srdp->datablksz,
                                    SPA_OLD_MAXBLOCKSIZE);
-                               err = dump_write(dscp, srdp->obj_type,
-                                   range->object, offset, n, n, NULL, buf);
+                               err = dmu_dump_write(dscp, srdp->obj_type,
+                                   range->object, offset, n, n, NULL, B_FALSE,
+                                   data);
                                offset += n;
-                               buf += n;
+                               /*
+                                * When doing dry run, data==NULL is used as a
+                                * sentinel value by
+                                * dmu_dump_write()->dump_record().
+                                */
+                               if (data != NULL)
+                                       data += n;
                                srdp->datablksz -= n;
                        }
                } else {
-                       int psize;
-                       if (abuf != NULL) {
-                               psize = arc_buf_size(abuf);
-                               if (arc_get_compression(abuf) !=
-                                   ZIO_COMPRESS_OFF) {
-                                       ASSERT3S(psize, ==, BP_GET_PSIZE(bp));
-                               }
-                       } else if (!request_compressed) {
-                               psize = srdp->datablksz;
-                       } else {
-                               psize = BP_GET_PSIZE(bp);
-                       }
-                       err = dump_write(dscp, srdp->obj_type, range->object,
-                           offset, srdp->datablksz, psize, bp,
-                           (abuf == NULL ? NULL : abuf->b_data));
+                       err = dmu_dump_write(dscp, srdp->obj_type,
+                           range->object, offset,
+                           srdp->datablksz, srdp->datasz, bp,
+                           srdp->io_compressed, data);
                }
-               if (abuf != NULL)
-                       arc_buf_destroy(abuf, &abuf);
                return (err);
        }
        case HOLE: {
@@ -1116,7 +1080,7 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
        return (err);
 }
 
-struct send_range *
+static struct send_range *
 range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
     uint64_t end_blkid, boolean_t eos)
 {
@@ -1126,6 +1090,15 @@ range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
        range->start_blkid = start_blkid;
        range->end_blkid = end_blkid;
        range->eos_marker = eos;
+       if (type == DATA) {
+               range->sru.data.abd = NULL;
+               range->sru.data.abuf = NULL;
+               mutex_init(&range->sru.data.lock, NULL, MUTEX_DEFAULT, NULL);
+               cv_init(&range->sru.data.cv, NULL, CV_DEFAULT, NULL);
+               range->sru.data.io_outstanding = 0;
+               range->sru.data.io_err = 0;
+               range->sru.data.io_compressed = B_FALSE;
+       }
        return (range);
 }
 
@@ -1133,29 +1106,24 @@ range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
  * This is the callback function to traverse_dataset that acts as a worker
  * thread for dmu_send_impl.
  */
-/*ARGSUSED*/
 static int
 send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
     const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
 {
+       (void) zilog;
        struct send_thread_arg *sta = arg;
        struct send_range *record;
 
        ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
            zb->zb_object >= sta->resume.zb_object);
-       ASSERT3P(sta->ds, !=, NULL);
 
        /*
         * All bps of an encrypted os should have the encryption bit set.
         * If this is not true it indicates tampering and we report an error.
         */
-       objset_t *os;
-       VERIFY0(dmu_objset_from_ds(sta->ds, &os));
-       if (os->os_encrypted &&
+       if (sta->os->os_encrypted &&
            !BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {
-               spa_log_error(spa, zb);
-               zfs_panic_recover("unencrypted block in encrypted "
-                   "object set %llu", sta->ds->ds_object);
+               spa_log_error(spa, zb, &bp->blk_birth);
                return (SET_ERROR(EIO));
        }
 
@@ -1173,7 +1141,7 @@ send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
                record->sru.object.bp = *bp;
                size_t size  = sizeof (*dnp) * (dnp->dn_extra_slots + 1);
                record->sru.object.dnp = kmem_alloc(size, KM_SLEEP);
-               bcopy(dnp, record->sru.object.dnp, size);
+               memcpy(record->sru.object.dnp, dnp, size);
                bqueue_enqueue(&sta->q, record, sizeof (*record));
                return (0);
        }
@@ -1208,23 +1176,30 @@ send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
        if (zb->zb_blkid == DMU_SPILL_BLKID)
                ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
 
-       record = range_alloc(DATA, zb->zb_object, start, (start + span < start ?
-           0 : start + span), B_FALSE);
+       enum type record_type = DATA;
+       if (BP_IS_HOLE(bp))
+               record_type = HOLE;
+       else if (BP_IS_REDACTED(bp))
+               record_type = REDACT;
+       else
+               record_type = DATA;
+
+       record = range_alloc(record_type, zb->zb_object, start,
+           (start + span < start ? 0 : start + span), B_FALSE);
 
        uint64_t datablksz = (zb->zb_blkid == DMU_SPILL_BLKID ?
            BP_GET_LSIZE(bp) : dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT);
+
        if (BP_IS_HOLE(bp)) {
-               record->type = HOLE;
                record->sru.hole.datablksz = datablksz;
        } else if (BP_IS_REDACTED(bp)) {
-               record->type = REDACT;
                record->sru.redact.datablksz = datablksz;
        } else {
-               record->type = DATA;
                record->sru.data.datablksz = datablksz;
                record->sru.data.obj_type = dnp->dn_type;
                record->sru.data.bp = *bp;
        }
+
        bqueue_enqueue(&sta->q, record, sizeof (*record));
        return (0);
 }
@@ -1262,12 +1237,9 @@ redact_list_cb(redact_block_phys_t *rb, void *arg)
 /*
  * This function kicks off the traverse_dataset.  It also handles setting the
  * error code of the thread in case something goes wrong, and pushes the End of
- * Stream record when the traverse_dataset call has finished.  If there is no
- * dataset to traverse, then we traverse the redaction list provided and enqueue
- * records for that.  If neither is provided, the thread immediately pushes an
- * End of Stream marker.
+ * Stream record when the traverse_dataset call has finished.
  */
-static void
+static __attribute__((noreturn)) void
 send_traverse_thread(void *arg)
 {
        struct send_thread_arg *st_arg = arg;
@@ -1275,20 +1247,9 @@ send_traverse_thread(void *arg)
        struct send_range *data;
        fstrans_cookie_t cookie = spl_fstrans_mark();
 
-       if (st_arg->ds != NULL) {
-               ASSERT3P(st_arg->redaction_list, ==, NULL);
-               err = traverse_dataset_resume(st_arg->ds,
-                   st_arg->fromtxg, &st_arg->resume,
-                   st_arg->flags, send_cb, st_arg);
-       } else if (st_arg->redaction_list != NULL) {
-               struct redact_list_cb_arg rlcba = {0};
-               rlcba.cancel = &st_arg->cancel;
-               rlcba.num_blocks_visited = st_arg->num_blocks_visited;
-               rlcba.q = &st_arg->q;
-               rlcba.mark_redact = B_FALSE;
-               err = dsl_redaction_list_traverse(st_arg->redaction_list,
-                   &st_arg->resume, redact_list_cb, &rlcba);
-       }
+       err = traverse_dataset_resume(st_arg->os->os_dsl_dataset,
+           st_arg->fromtxg, &st_arg->resume,
+           st_arg->flags, send_cb, st_arg);
 
        if (err != EINTR)
                st_arg->error_code = err;
@@ -1329,11 +1290,11 @@ send_range_after(const struct send_range *from, const struct send_range *to)
                return (-1);
        if (from_obj >= to_end_obj)
                return (1);
-       int64_t cmp = AVL_CMP(to->type == OBJECT_RANGE, from->type ==
+       int64_t cmp = TREE_CMP(to->type == OBJECT_RANGE, from->type ==
            OBJECT_RANGE);
        if (unlikely(cmp))
                return (cmp);
-       cmp = AVL_CMP(to->type == OBJECT, from->type == OBJECT);
+       cmp = TREE_CMP(to->type == OBJECT, from->type == OBJECT);
        if (unlikely(cmp))
                return (cmp);
        if (from->end_blkid <= to->start_blkid)
@@ -1368,7 +1329,7 @@ get_next_range(bqueue_t *bq, struct send_range *prev)
        return (next);
 }
 
-static void
+static __attribute__((noreturn)) void
 redact_list_thread(void *arg)
 {
        struct redact_list_thread_arg *rlt_arg = arg;
@@ -1388,6 +1349,8 @@ redact_list_thread(void *arg)
        record = range_alloc(DATA, 0, 0, 0, B_TRUE);
        bqueue_enqueue_flush(&rlt_arg->q, record, sizeof (*record));
        spl_fstrans_unmark(cookie);
+
+       thread_exit();
 }
 
 /*
@@ -1402,7 +1365,7 @@ send_range_start_compare(struct send_range *r1, struct send_range *r2)
        uint64_t r1_l0equiv = r1->start_blkid;
        uint64_t r2_objequiv = r2->object;
        uint64_t r2_l0equiv = r2->start_blkid;
-       int64_t cmp = AVL_CMP(r1->eos_marker, r2->eos_marker);
+       int64_t cmp = TREE_CMP(r1->eos_marker, r2->eos_marker);
        if (unlikely(cmp))
                return (cmp);
        if (r1->object == 0) {
@@ -1414,17 +1377,17 @@ send_range_start_compare(struct send_range *r1, struct send_range *r2)
                r2_l0equiv = 0;
        }
 
-       cmp = AVL_CMP(r1_objequiv, r2_objequiv);
+       cmp = TREE_CMP(r1_objequiv, r2_objequiv);
        if (likely(cmp))
                return (cmp);
-       cmp = AVL_CMP(r2->type == OBJECT_RANGE, r1->type == OBJECT_RANGE);
+       cmp = TREE_CMP(r2->type == OBJECT_RANGE, r1->type == OBJECT_RANGE);
        if (unlikely(cmp))
                return (cmp);
-       cmp = AVL_CMP(r2->type == OBJECT, r1->type == OBJECT);
+       cmp = TREE_CMP(r2->type == OBJECT, r1->type == OBJECT);
        if (unlikely(cmp))
                return (cmp);
 
-       return (AVL_CMP(r1_l0equiv, r2_l0equiv));
+       return (TREE_CMP(r1_l0equiv, r2_l0equiv));
 }
 
 enum q_idx {
@@ -1561,7 +1524,7 @@ find_next_range(struct send_range **ranges, bqueue_t **qs, uint64_t *out_mask)
  * data from the redact_list_thread and use that to determine which blocks
  * should be redacted.
  */
-static void
+static __attribute__((noreturn)) void
 send_merge_thread(void *arg)
 {
        struct send_merge_thread_arg *smt_arg = arg;
@@ -1628,27 +1591,124 @@ send_merge_thread(void *arg)
                }
                range_free(front_ranges[i]);
        }
-       if (range == NULL)
-               range = kmem_zalloc(sizeof (*range), KM_SLEEP);
        range->eos_marker = B_TRUE;
        bqueue_enqueue_flush(&smt_arg->q, range, 1);
        spl_fstrans_unmark(cookie);
        thread_exit();
 }
 
-struct send_prefetch_thread_arg {
+struct send_reader_thread_arg {
        struct send_merge_thread_arg *smta;
        bqueue_t q;
        boolean_t cancel;
-       boolean_t issue_prefetches;
+       boolean_t issue_reads;
+       uint64_t featureflags;
        int error;
 };
 
+static void
+dmu_send_read_done(zio_t *zio)
+{
+       struct send_range *range = zio->io_private;
+
+       mutex_enter(&range->sru.data.lock);
+       if (zio->io_error != 0) {
+               abd_free(range->sru.data.abd);
+               range->sru.data.abd = NULL;
+               range->sru.data.io_err = zio->io_error;
+       }
+
+       ASSERT(range->sru.data.io_outstanding);
+       range->sru.data.io_outstanding = B_FALSE;
+       cv_broadcast(&range->sru.data.cv);
+       mutex_exit(&range->sru.data.lock);
+}
+
+static void
+issue_data_read(struct send_reader_thread_arg *srta, struct send_range *range)
+{
+       struct srd *srdp = &range->sru.data;
+       blkptr_t *bp = &srdp->bp;
+       objset_t *os = srta->smta->os;
+
+       ASSERT3U(range->type, ==, DATA);
+       ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
+       /*
+        * If we have large blocks stored on disk but
+        * the send flags don't allow us to send large
+        * blocks, we split the data from the arc buf
+        * into chunks.
+        */
+       boolean_t split_large_blocks =
+           srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
+           !(srta->featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
+       /*
+        * We should only request compressed data from the ARC if all
+        * the following are true:
+        *  - stream compression was requested
+        *  - we aren't splitting large blocks into smaller chunks
+        *  - the data won't need to be byteswapped before sending
+        *  - this isn't an embedded block
+        *  - this isn't metadata (if receiving on a different endian
+        *    system it can be byteswapped more easily)
+        */
+       boolean_t request_compressed =
+           (srta->featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
+           !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
+           !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
+
+       zio_flag_t zioflags = ZIO_FLAG_CANFAIL;
+
+       if (srta->featureflags & DMU_BACKUP_FEATURE_RAW) {
+               zioflags |= ZIO_FLAG_RAW;
+               srdp->io_compressed = B_TRUE;
+       } else if (request_compressed) {
+               zioflags |= ZIO_FLAG_RAW_COMPRESS;
+               srdp->io_compressed = B_TRUE;
+       }
+
+       srdp->datasz = (zioflags & ZIO_FLAG_RAW_COMPRESS) ?
+           BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp);
+
+       if (!srta->issue_reads)
+               return;
+       if (BP_IS_REDACTED(bp))
+               return;
+       if (send_do_embed(bp, srta->featureflags))
+               return;
+
+       zbookmark_phys_t zb = {
+           .zb_objset = dmu_objset_id(os),
+           .zb_object = range->object,
+           .zb_level = 0,
+           .zb_blkid = range->start_blkid,
+       };
+
+       arc_flags_t aflags = ARC_FLAG_CACHED_ONLY;
+
+       int arc_err = arc_read(NULL, os->os_spa, bp,
+           arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ,
+           zioflags, &aflags, &zb);
+       /*
+        * If the data is not already cached in the ARC, we read directly
+        * from zio.  This avoids the performance overhead of adding a new
+        * entry to the ARC, and we also avoid polluting the ARC cache with
+        * data that is not likely to be used in the future.
+        */
+       if (arc_err != 0) {
+               srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE);
+               srdp->io_outstanding = B_TRUE;
+               zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd,
+                   srdp->datasz, dmu_send_read_done, range,
+                   ZIO_PRIORITY_ASYNC_READ, zioflags, &zb));
+       }
+}
+
 /*
  * Create a new record with the given values.
  */
 static void
-enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
+enqueue_range(struct send_reader_thread_arg *srta, bqueue_t *q, dnode_t *dn,
     uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz)
 {
        enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE :
@@ -1657,8 +1717,10 @@ enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
        struct send_range *range = range_alloc(range_type, dn->dn_object,
            blkid, blkid + count, B_FALSE);
 
-       if (blkid == DMU_SPILL_BLKID)
+       if (blkid == DMU_SPILL_BLKID) {
+               ASSERT3P(bp, !=, NULL);
                ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
+       }
 
        switch (range_type) {
        case HOLE:
@@ -1669,18 +1731,7 @@ enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
                range->sru.data.datablksz = datablksz;
                range->sru.data.obj_type = dn->dn_type;
                range->sru.data.bp = *bp;
-               if (spta->issue_prefetches) {
-                       zbookmark_phys_t zb = {0};
-                       zb.zb_objset = dmu_objset_id(dn->dn_objset);
-                       zb.zb_object = dn->dn_object;
-                       zb.zb_level = 0;
-                       zb.zb_blkid = blkid;
-                       arc_flags_t aflags = ARC_FLAG_NOWAIT |
-                           ARC_FLAG_PREFETCH;
-                       (void) arc_read(NULL, dn->dn_objset->os_spa, bp, NULL,
-                           NULL, ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
-                           ZIO_FLAG_SPECULATIVE, &aflags, &zb);
-               }
+               issue_data_read(srta, range);
                break;
        case REDACT:
                range->sru.redact.datablksz = datablksz;
@@ -1698,13 +1749,13 @@ enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
  * some indirect blocks can be discarded because they're not holes. Second,
  * it issues prefetches for the data we need to send.
  */
-static void
-send_prefetch_thread(void *arg)
+static __attribute__((noreturn)) void
+send_reader_thread(void *arg)
 {
-       struct send_prefetch_thread_arg *spta = arg;
-       struct send_merge_thread_arg *smta = spta->smta;
+       struct send_reader_thread_arg *srta = arg;
+       struct send_merge_thread_arg *smta = srta->smta;
        bqueue_t *inq = &smta->q;
-       bqueue_t *outq = &spta->q;
+       bqueue_t *outq = &srta->q;
        objset_t *os = smta->os;
        fstrans_cookie_t cookie = spl_fstrans_mark();
        struct send_range *range = bqueue_dequeue(inq);
@@ -1720,30 +1771,14 @@ send_prefetch_thread(void *arg)
         */
        uint64_t last_obj = UINT64_MAX;
        uint64_t last_obj_exists = B_TRUE;
-       while (!range->eos_marker && !spta->cancel && smta->error == 0 &&
+       while (!range->eos_marker && !srta->cancel && smta->error == 0 &&
            err == 0) {
                switch (range->type) {
-               case DATA: {
-                       zbookmark_phys_t zb;
-                       zb.zb_objset = dmu_objset_id(os);
-                       zb.zb_object = range->object;
-                       zb.zb_level = 0;
-                       zb.zb_blkid = range->start_blkid;
-                       ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
-                       if (!BP_IS_REDACTED(&range->sru.data.bp) &&
-                           spta->issue_prefetches &&
-                           !BP_IS_EMBEDDED(&range->sru.data.bp)) {
-                               arc_flags_t aflags = ARC_FLAG_NOWAIT |
-                                   ARC_FLAG_PREFETCH;
-                               (void) arc_read(NULL, os->os_spa,
-                                   &range->sru.data.bp, NULL, NULL,
-                                   ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
-                                   ZIO_FLAG_SPECULATIVE, &aflags, &zb);
-                       }
+               case DATA:
+                       issue_data_read(srta, range);
                        bqueue_enqueue(outq, range, range->sru.data.datablksz);
                        range = get_next_range_nofree(inq, range);
                        break;
-               }
                case HOLE:
                case OBJECT:
                case OBJECT_RANGE:
@@ -1806,8 +1841,7 @@ send_prefetch_thread(void *arg)
                                continue;
                        }
                        uint64_t file_max =
-                           (dn->dn_maxblkid < range->end_blkid ?
-                           dn->dn_maxblkid : range->end_blkid);
+                           MIN(dn->dn_maxblkid, range->end_blkid);
                        /*
                         * The object exists, so we need to try to find the
                         * blkptr for each block in the range we're processing.
@@ -1845,7 +1879,7 @@ send_prefetch_thread(void *arg)
                                            datablksz);
                                        uint64_t nblks = (offset / datablksz) -
                                            blkid;
-                                       enqueue_range(spta, outq, dn, blkid,
+                                       enqueue_range(srta, outq, dn, blkid,
                                            nblks, NULL, datablksz);
                                        blkid += nblks;
                                }
@@ -1856,7 +1890,7 @@ send_prefetch_thread(void *arg)
                                if (err != 0)
                                        break;
                                ASSERT(!BP_IS_HOLE(&bp));
-                               enqueue_range(spta, outq, dn, blkid, 1, &bp,
+                               enqueue_range(srta, outq, dn, blkid, 1, &bp,
                                    datablksz);
                        }
                        rw_exit(&dn->dn_struct_rwlock);
@@ -1865,11 +1899,11 @@ send_prefetch_thread(void *arg)
                }
                }
        }
-       if (spta->cancel || err != 0) {
+       if (srta->cancel || err != 0) {
                smta->cancel = B_TRUE;
-               spta->error = err;
+               srta->error = err;
        } else if (smta->error != 0) {
-               spta->error = smta->error;
+               srta->error = smta->error;
        }
        while (!range->eos_marker)
                range = get_next_range(inq, range);
@@ -1883,7 +1917,7 @@ send_prefetch_thread(void *arg)
 
 struct dmu_send_params {
        /* Pool args */
-       void *tag; // Tag that dp was held with, will be used to release dp.
+       const void *tag; // Tag dp was held with, will be used to release dp.
        dsl_pool_t *dp;
        /* To snapshot args */
        const char *tosnap;
@@ -1898,8 +1932,11 @@ struct dmu_send_params {
        boolean_t embedok;
        boolean_t large_block_ok;
        boolean_t compressok;
+       boolean_t rawok;
+       boolean_t savedok;
        uint64_t resumeobj;
        uint64_t resumeoff;
+       uint64_t saved_guid;
        zfs_bookmark_phys_t *redactbook;
        /* Stream output params */
        dmu_send_outparams_t *dso;
@@ -1907,7 +1944,7 @@ struct dmu_send_params {
        /* Stream progress params */
        offset_t *off;
        int outfd;
-       boolean_t rawok;
+       char saved_toname[MAXNAMELEN];
 };
 
 static int
@@ -1916,7 +1953,7 @@ setup_featureflags(struct dmu_send_params *dspp, objset_t *os,
 {
        dsl_dataset_t *to_ds = dspp->to_ds;
        dsl_pool_t *dp = dspp->dp;
-#ifdef _KERNEL
+
        if (dmu_objset_type(os) == DMU_OST_ZFS) {
                uint64_t version;
                if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0)
@@ -1925,7 +1962,6 @@ setup_featureflags(struct dmu_send_params *dspp, objset_t *os,
                if (version >= ZPL_VERSION_SA)
                        *featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
        }
-#endif
 
        /* raw sends imply large_block_ok */
        if ((dspp->rawok || dspp->large_block_ok) &&
@@ -1942,6 +1978,7 @@ setup_featureflags(struct dmu_send_params *dspp, objset_t *os,
        /* raw send implies compressok */
        if (dspp->compressok || dspp->rawok)
                *featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
+
        if (dspp->rawok && os->os_encrypted)
                *featureflags |= DMU_BACKUP_FEATURE_RAW;
 
@@ -1952,6 +1989,17 @@ setup_featureflags(struct dmu_send_params *dspp, objset_t *os,
                *featureflags |= DMU_BACKUP_FEATURE_LZ4;
        }
 
+       /*
+        * We specifically do not include DMU_BACKUP_FEATURE_EMBED_DATA here to
+        * allow sending ZSTD compressed datasets to a receiver that does not
+        * support ZSTD
+        */
+       if ((*featureflags &
+           (DMU_BACKUP_FEATURE_COMPRESSED | DMU_BACKUP_FEATURE_RAW)) != 0 &&
+           dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_ZSTD_COMPRESS)) {
+               *featureflags |= DMU_BACKUP_FEATURE_ZSTD;
+       }
+
        if (dspp->resumeobj != 0 || dspp->resumeoff != 0) {
                *featureflags |= DMU_BACKUP_FEATURE_RESUMING;
        }
@@ -1994,16 +2042,22 @@ create_begin_record(struct dmu_send_params *dspp, objset_t *os,
                drrb->drr_flags |= DRR_FLAG_FREERECORDS;
        drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;
 
-       dsl_dataset_name(to_ds, drrb->drr_toname);
-       if (!to_ds->ds_is_snapshot) {
-               (void) strlcat(drrb->drr_toname, "@--head--",
+       if (dspp->savedok) {
+               drrb->drr_toguid = dspp->saved_guid;
+               strlcpy(drrb->drr_toname, dspp->saved_toname,
                    sizeof (drrb->drr_toname));
+       } else {
+               dsl_dataset_name(to_ds, drrb->drr_toname);
+               if (!to_ds->ds_is_snapshot) {
+                       (void) strlcat(drrb->drr_toname, "@--head--",
+                           sizeof (drrb->drr_toname));
+               }
        }
        return (drr);
 }
 
 static void
-setup_to_thread(struct send_thread_arg *to_arg, dsl_dataset_t *to_ds,
+setup_to_thread(struct send_thread_arg *to_arg, objset_t *to_os,
     dmu_sendstatus_t *dssp, uint64_t fromtxg, boolean_t rawok)
 {
        VERIFY0(bqueue_init(&to_arg->q, zfs_send_no_prefetch_queue_ff,
@@ -2011,12 +2065,13 @@ setup_to_thread(struct send_thread_arg *to_arg, dsl_dataset_t *to_ds,
            offsetof(struct send_range, ln)));
        to_arg->error_code = 0;
        to_arg->cancel = B_FALSE;
-       to_arg->ds = to_ds;
+       to_arg->os = to_os;
        to_arg->fromtxg = fromtxg;
        to_arg->flags = TRAVERSE_PRE | TRAVERSE_PREFETCH_METADATA;
        if (rawok)
                to_arg->flags |= TRAVERSE_NO_DECRYPT;
-       to_arg->redaction_list = NULL;
+       if (zfs_send_corrupt_data)
+               to_arg->flags |= TRAVERSE_HARD;
        to_arg->num_blocks_visited = &dssp->dss_blocks;
        (void) thread_create(NULL, 0, send_traverse_thread, to_arg, 0,
            curproc, TS_RUN, minclsyspri);
@@ -2084,15 +2139,17 @@ setup_merge_thread(struct send_merge_thread_arg *smt_arg,
 }
 
 static void
-setup_prefetch_thread(struct send_prefetch_thread_arg *spt_arg,
-    struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg)
+setup_reader_thread(struct send_reader_thread_arg *srt_arg,
+    struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg,
+    uint64_t featureflags)
 {
-       VERIFY0(bqueue_init(&spt_arg->q, zfs_send_queue_ff,
+       VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff,
            MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
            offsetof(struct send_range, ln)));
-       spt_arg->smta = smt_arg;
-       spt_arg->issue_prefetches = !dspp->dso->dso_dryrun;
-       (void) thread_create(NULL, 0, send_prefetch_thread, spt_arg, 0,
+       srt_arg->smta = smt_arg;
+       srt_arg->issue_reads = !dspp->dso->dso_dryrun;
+       srt_arg->featureflags = featureflags;
+       (void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0,
            curproc, TS_RUN, minclsyspri);
 }
 
@@ -2103,6 +2160,7 @@ setup_resume_points(struct dmu_send_params *dspp,
     struct send_merge_thread_arg *smt_arg, boolean_t resuming, objset_t *os,
     redaction_list_t *redact_rl, nvlist_t *nvl)
 {
+       (void) smt_arg;
        dsl_dataset_t *to_ds = dspp->to_ds;
        int err = 0;
 
@@ -2121,7 +2179,6 @@ setup_resume_points(struct dmu_send_params *dspp,
         * If we're resuming a redacted send, we can skip to the appropriate
         * point in the redaction bookmark by binary searching through it.
         */
-       smt_arg->bookmark_before = B_FALSE;
        if (redact_rl != NULL) {
                SET_BOOKMARK(&rlt_arg->resume, to_ds->ds_object, obj, 0, blkid);
        }
@@ -2262,7 +2319,7 @@ setup_send_progress(struct dmu_send_params *dspp)
  *
  * The final case is a simple zfs full or incremental send.  The to_ds traversal
  * thread behaves the same as always. The redact list thread is never started.
- * The send merge thread takes all the blocks that the to_ds traveral thread
+ * The send merge thread takes all the blocks that the to_ds traversal thread
  * sends it, prefetches the data, and sends the blocks on to the main thread.
  * The main thread sends the data over the wire.
  *
@@ -2298,7 +2355,7 @@ dmu_send_impl(struct dmu_send_params *dspp)
        struct send_thread_arg *to_arg;
        struct redact_list_thread_arg *rlt_arg;
        struct send_merge_thread_arg *smt_arg;
-       struct send_prefetch_thread_arg *spt_arg;
+       struct send_reader_thread_arg *srt_arg;
        struct send_range *range;
        redaction_list_t *from_rl = NULL;
        redaction_list_t *redact_rl = NULL;
@@ -2308,13 +2365,14 @@ dmu_send_impl(struct dmu_send_params *dspp)
        dsl_dataset_t *to_ds = dspp->to_ds;
        zfs_bookmark_phys_t *ancestor_zb = &dspp->ancestor_zb;
        dsl_pool_t *dp = dspp->dp;
-       void *tag = dspp->tag;
+       const void *tag = dspp->tag;
 
        err = dmu_objset_from_ds(to_ds, &os);
        if (err != 0) {
                dsl_pool_rele(dp, tag);
                return (err);
        }
+
        /*
         * If this is a non-raw send of an encrypted ds, we can ensure that
         * the objset_phys_t is authenticated. This is safe because this is
@@ -2342,12 +2400,6 @@ dmu_send_impl(struct dmu_send_params *dspp)
                return (err);
        }
 
-       from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP);
-       to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
-       rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
-       smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
-       spt_arg = kmem_zalloc(sizeof (*spt_arg), KM_SLEEP);
-
        /*
         * If we're doing a redacted send, hold the bookmark's redaction list.
         */
@@ -2382,6 +2434,12 @@ dmu_send_impl(struct dmu_send_params *dspp)
 
        dsl_dataset_long_hold(to_ds, FTAG);
 
+       from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP);
+       to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
+       rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
+       smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
+       srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP);
+
        drr = create_begin_record(dspp, os, featureflags);
        dssp = setup_send_progress(dspp);
 
@@ -2456,12 +2514,11 @@ dmu_send_impl(struct dmu_send_params *dspp)
        }
 
        if (featureflags & DMU_BACKUP_FEATURE_RAW) {
-               uint64_t ivset_guid = (ancestor_zb != NULL) ?
-                   ancestor_zb->zbm_ivset_guid : 0;
+               uint64_t ivset_guid = ancestor_zb->zbm_ivset_guid;
                nvlist_t *keynvl = NULL;
                ASSERT(os->os_encrypted);
 
-               err = dsl_crypto_populate_key_nvlist(to_ds, ivset_guid,
+               err = dsl_crypto_populate_key_nvlist(os, ivset_guid,
                    &keynvl);
                if (err != 0) {
                        fnvlist_free(nvl);
@@ -2485,18 +2542,18 @@ dmu_send_impl(struct dmu_send_params *dspp)
                goto out;
        }
 
-       setup_to_thread(to_arg, to_ds, dssp, fromtxg, dspp->rawok);
+       setup_to_thread(to_arg, os, dssp, fromtxg, dspp->rawok);
        setup_from_thread(from_arg, from_rl, dssp);
        setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);
        setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);
-       setup_prefetch_thread(spt_arg, dspp, smt_arg);
+       setup_reader_thread(srt_arg, dspp, smt_arg, featureflags);
 
-       range = bqueue_dequeue(&spt_arg->q);
+       range = bqueue_dequeue(&srt_arg->q);
        while (err == 0 && !range->eos_marker) {
                err = do_dump(&dsc, range);
-               range = get_next_range(&spt_arg->q, range);
+               range = get_next_range(&srt_arg->q, range);
                if (issig(JUSTLOOKING) && issig(FORREAL))
-                       err = EINTR;
+                       err = SET_ERROR(EINTR);
        }
 
        /*
@@ -2506,22 +2563,22 @@ dmu_send_impl(struct dmu_send_params *dspp)
         * pending records before exiting.
         */
        if (err != 0) {
-               spt_arg->cancel = B_TRUE;
+               srt_arg->cancel = B_TRUE;
                while (!range->eos_marker) {
-                       range = get_next_range(&spt_arg->q, range);
+                       range = get_next_range(&srt_arg->q, range);
                }
        }
        range_free(range);
 
-       bqueue_destroy(&spt_arg->q);
+       bqueue_destroy(&srt_arg->q);
        bqueue_destroy(&smt_arg->q);
        if (dspp->redactbook != NULL)
                bqueue_destroy(&rlt_arg->q);
        bqueue_destroy(&to_arg->q);
        bqueue_destroy(&from_arg->q);
 
-       if (err == 0 && spt_arg->error != 0)
-               err = spt_arg->error;
+       if (err == 0 && srt_arg->error != 0)
+               err = srt_arg->error;
 
        if (err != 0)
                goto out;
@@ -2536,19 +2593,27 @@ dmu_send_impl(struct dmu_send_params *dspp)
                goto out;
        }
 
-       bzero(drr, sizeof (dmu_replay_record_t));
-       drr->drr_type = DRR_END;
-       drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc;
-       drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid;
+       /*
+        * Send the DRR_END record if this is not a saved stream.
+        * Otherwise, the omitted DRR_END record will signal to
+        * the receive side that the stream is incomplete.
+        */
+       if (!dspp->savedok) {
+               memset(drr, 0, sizeof (dmu_replay_record_t));
+               drr->drr_type = DRR_END;
+               drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc;
+               drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid;
 
-       if (dump_record(&dsc, NULL, 0) != 0)
-               err = dsc.dsc_err;
+               if (dump_record(&dsc, NULL, 0) != 0)
+                       err = dsc.dsc_err;
+       }
 out:
        mutex_enter(&to_ds->ds_sendstream_lock);
        list_remove(&to_ds->ds_sendstreams, dssp);
        mutex_exit(&to_ds->ds_sendstream_lock);
 
-       VERIFY(err != 0 || (dsc.dsc_sent_begin && dsc.dsc_sent_end));
+       VERIFY(err != 0 || (dsc.dsc_sent_begin &&
+           (dsc.dsc_sent_end || dspp->savedok)));
 
        kmem_free(drr, sizeof (dmu_replay_record_t));
        kmem_free(dssp, sizeof (dmu_sendstatus_t));
@@ -2556,7 +2621,7 @@ out:
        kmem_free(to_arg, sizeof (*to_arg));
        kmem_free(rlt_arg, sizeof (*rlt_arg));
        kmem_free(smt_arg, sizeof (*smt_arg));
-       kmem_free(spt_arg, sizeof (*spt_arg));
+       kmem_free(srt_arg, sizeof (*srt_arg));
 
        dsl_dataset_long_rele(to_ds, FTAG);
        if (from_rl != NULL) {
@@ -2574,11 +2639,12 @@ out:
 int
 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
-    boolean_t rawok, int outfd, offset_t *off, dmu_send_outparams_t *dsop)
+    boolean_t rawok, boolean_t savedok, int outfd, offset_t *off,
+    dmu_send_outparams_t *dsop)
 {
        int err;
        dsl_dataset_t *fromds;
-       ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
+       ds_hold_flags_t dsflags;
        struct dmu_send_params dspp = {0};
        dspp.embedok = embedok;
        dspp.large_block_ok = large_block_ok;
@@ -2588,7 +2654,9 @@ dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
        dspp.dso = dsop;
        dspp.tag = FTAG;
        dspp.rawok = rawok;
+       dspp.savedok = savedok;
 
+       dsflags = (rawok) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;
        err = dsl_pool_hold(pool, FTAG, &dspp.dp);
        if (err != 0)
                return (err);
@@ -2632,21 +2700,28 @@ dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
                        uint64_t size = dspp.numfromredactsnaps *
                            sizeof (uint64_t);
                        dspp.fromredactsnaps = kmem_zalloc(size, KM_SLEEP);
-                       bcopy(fromredact, dspp.fromredactsnaps, size);
+                       memcpy(dspp.fromredactsnaps, fromredact, size);
                }
 
-               if (!dsl_dataset_is_before(dspp.to_ds, fromds, 0)) {
+               boolean_t is_before =
+                   dsl_dataset_is_before(dspp.to_ds, fromds, 0);
+               dspp.is_clone = (dspp.to_ds->ds_dir !=
+                   fromds->ds_dir);
+               dsl_dataset_rele(fromds, FTAG);
+               if (!is_before) {
+                       dsl_pool_rele(dspp.dp, FTAG);
                        err = SET_ERROR(EXDEV);
                } else {
-                       dspp.is_clone = (dspp.to_ds->ds_dir !=
-                           fromds->ds_dir);
-                       dsl_dataset_rele(fromds, FTAG);
                        err = dmu_send_impl(&dspp);
                }
        } else {
                dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
                err = dmu_send_impl(&dspp);
        }
+       if (dspp.fromredactsnaps)
+               kmem_free(dspp.fromredactsnaps,
+                   dspp.numfromredactsnaps * sizeof (uint64_t));
+
        dsl_dataset_rele(dspp.to_ds, FTAG);
        return (err);
 }
@@ -2654,15 +2729,18 @@ dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
 int
 dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
     boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,
-    uint64_t resumeobj, uint64_t resumeoff, const char *redactbook, int outfd,
-    offset_t *off, dmu_send_outparams_t *dsop)
+    boolean_t savedok, uint64_t resumeobj, uint64_t resumeoff,
+    const char *redactbook, int outfd, offset_t *off,
+    dmu_send_outparams_t *dsop)
 {
        int err = 0;
-       ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
+       ds_hold_flags_t dsflags;
        boolean_t owned = B_FALSE;
        dsl_dataset_t *fromds = NULL;
        zfs_bookmark_phys_t book = {0};
        struct dmu_send_params dspp = {0};
+
+       dsflags = (rawok) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;
        dspp.tosnap = tosnap;
        dspp.embedok = embedok;
        dspp.large_block_ok = large_block_ok;
@@ -2674,6 +2752,7 @@ dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
        dspp.resumeobj = resumeobj;
        dspp.resumeoff = resumeoff;
        dspp.rawok = rawok;
+       dspp.savedok = savedok;
 
        if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
                return (SET_ERROR(EINVAL));
@@ -2681,20 +2760,68 @@ dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
        err = dsl_pool_hold(tosnap, FTAG, &dspp.dp);
        if (err != 0)
                return (err);
+
        if (strchr(tosnap, '@') == NULL && spa_writeable(dspp.dp->dp_spa)) {
                /*
                 * We are sending a filesystem or volume.  Ensure
                 * that it doesn't change by owning the dataset.
                 */
-               err = dsl_dataset_own(dspp.dp, tosnap, dsflags, FTAG,
-                   &dspp.to_ds);
-               owned = B_TRUE;
+
+               if (savedok) {
+                       /*
+                        * We are looking for the dataset that represents the
+                        * partially received send stream. If this stream was
+                        * received as a new snapshot of an existing dataset,
+                        * this will be saved in a hidden clone named
+                        * "<pool>/<dataset>/%recv". Otherwise, the stream
+                        * will be saved in the live dataset itself. In
+                        * either case we need to use dsl_dataset_own_force()
+                        * because the stream is marked as inconsistent,
+                        * which would normally make it unavailable to be
+                        * owned.
+                        */
+                       char *name = kmem_asprintf("%s/%s", tosnap,
+                           recv_clone_name);
+                       err = dsl_dataset_own_force(dspp.dp, name, dsflags,
+                           FTAG, &dspp.to_ds);
+                       if (err == ENOENT) {
+                               err = dsl_dataset_own_force(dspp.dp, tosnap,
+                                   dsflags, FTAG, &dspp.to_ds);
+                       }
+
+                       if (err == 0) {
+                               owned = B_TRUE;
+                               err = zap_lookup(dspp.dp->dp_meta_objset,
+                                   dspp.to_ds->ds_object,
+                                   DS_FIELD_RESUME_TOGUID, 8, 1,
+                                   &dspp.saved_guid);
+                       }
+
+                       if (err == 0) {
+                               err = zap_lookup(dspp.dp->dp_meta_objset,
+                                   dspp.to_ds->ds_object,
+                                   DS_FIELD_RESUME_TONAME, 1,
+                                   sizeof (dspp.saved_toname),
+                                   dspp.saved_toname);
+                       }
+                       /* Only disown if there was an error in the lookups */
+                       if (owned && (err != 0))
+                               dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
+
+                       kmem_strfree(name);
+               } else {
+                       err = dsl_dataset_own(dspp.dp, tosnap, dsflags,
+                           FTAG, &dspp.to_ds);
+                       if (err == 0)
+                               owned = B_TRUE;
+               }
        } else {
                err = dsl_dataset_hold_flags(dspp.dp, tosnap, dsflags, FTAG,
                    &dspp.to_ds);
        }
 
        if (err != 0) {
+               /* Note: dsl dataset is not owned at this point */
                dsl_pool_rele(dspp.dp, FTAG);
                return (err);
        }
@@ -2766,16 +2893,13 @@ dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
                                            sizeof (uint64_t);
                                        dspp.fromredactsnaps = kmem_zalloc(size,
                                            KM_SLEEP);
-                                       bcopy(fromredact, dspp.fromredactsnaps,
+                                       memcpy(dspp.fromredactsnaps, fromredact,
                                            size);
                                }
                                if (!dsl_dataset_is_before(dspp.to_ds, fromds,
                                    0)) {
                                        err = SET_ERROR(EXDEV);
                                } else {
-                                       ASSERT3U(dspp.is_clone, ==,
-                                           (dspp.to_ds->ds_dir !=
-                                           fromds->ds_dir));
                                        zb->zbm_creation_txg =
                                            dsl_dataset_phys(fromds)->
                                            ds_creation_txg;
@@ -2810,6 +2934,10 @@ dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
                        /* dmu_send_impl will call dsl_pool_rele for us. */
                        err = dmu_send_impl(&dspp);
                } else {
+                       if (dspp.fromredactsnaps)
+                               kmem_free(dspp.fromredactsnaps,
+                                   dspp.numfromredactsnaps *
+                                   sizeof (uint64_t));
                        dsl_pool_rele(dspp.dp, FTAG);
                }
        } else {
@@ -2880,37 +3008,80 @@ dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
 }
 
 int
-dmu_send_estimate_fast(dsl_dataset_t *ds, dsl_dataset_t *fromds,
-    zfs_bookmark_phys_t *frombook, boolean_t stream_compressed, uint64_t *sizep)
+dmu_send_estimate_fast(dsl_dataset_t *origds, dsl_dataset_t *fromds,
+    zfs_bookmark_phys_t *frombook, boolean_t stream_compressed,
+    boolean_t saved, uint64_t *sizep)
 {
        int err;
+       dsl_dataset_t *ds = origds;
        uint64_t uncomp, comp;
 
-       ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool));
+       ASSERT(dsl_pool_config_held(origds->ds_dir->dd_pool));
        ASSERT(fromds == NULL || frombook == NULL);
 
-       /* tosnap must be a snapshot */
-       if (!ds->ds_is_snapshot)
+       /*
+        * If this is a saved send we may actually be sending
+        * from the %recv clone used for resuming.
+        */
+       if (saved) {
+               objset_t *mos = origds->ds_dir->dd_pool->dp_meta_objset;
+               uint64_t guid;
+               char dsname[ZFS_MAX_DATASET_NAME_LEN + 6];
+
+               dsl_dataset_name(origds, dsname);
+               (void) strcat(dsname, "/");
+               (void) strlcat(dsname, recv_clone_name, sizeof (dsname));
+
+               err = dsl_dataset_hold(origds->ds_dir->dd_pool,
+                   dsname, FTAG, &ds);
+               if (err != ENOENT && err != 0) {
+                       return (err);
+               } else if (err == ENOENT) {
+                       ds = origds;
+               }
+
+               /* check that this dataset has partially received data */
+               err = zap_lookup(mos, ds->ds_object,
+                   DS_FIELD_RESUME_TOGUID, 8, 1, &guid);
+               if (err != 0) {
+                       err = SET_ERROR(err == ENOENT ? EINVAL : err);
+                       goto out;
+               }
+
+               err = zap_lookup(mos, ds->ds_object,
+                   DS_FIELD_RESUME_TONAME, 1, sizeof (dsname), dsname);
+               if (err != 0) {
+                       err = SET_ERROR(err == ENOENT ? EINVAL : err);
+                       goto out;
+               }
+       }
+
+       /* tosnap must be a snapshot or the target of a saved send */
+       if (!ds->ds_is_snapshot && ds == origds)
                return (SET_ERROR(EINVAL));
 
        if (fromds != NULL) {
                uint64_t used;
-               if (!fromds->ds_is_snapshot)
-                       return (SET_ERROR(EINVAL));
+               if (!fromds->ds_is_snapshot) {
+                       err = SET_ERROR(EINVAL);
+                       goto out;
+               }
 
-               if (!dsl_dataset_is_before(ds, fromds, 0))
-                       return (SET_ERROR(EXDEV));
+               if (!dsl_dataset_is_before(ds, fromds, 0)) {
+                       err = SET_ERROR(EXDEV);
+                       goto out;
+               }
 
                err = dsl_dataset_space_written(fromds, ds, &used, &comp,
                    &uncomp);
                if (err != 0)
-                       return (err);
+                       goto out;
        } else if (frombook != NULL) {
                uint64_t used;
                err = dsl_dataset_space_written_bookmark(frombook, ds, &used,
                    &comp, &uncomp);
                if (err != 0)
-                       return (err);
+                       goto out;
        } else {
                uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
                comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
@@ -2922,32 +3093,30 @@ dmu_send_estimate_fast(dsl_dataset_t *ds, dsl_dataset_t *fromds,
         * Add the size of the BEGIN and END records to the estimate.
         */
        *sizep += 2 * sizeof (dmu_replay_record_t);
+
+out:
+       if (ds != origds)
+               dsl_dataset_rele(ds, FTAG);
        return (err);
 }
 
-#if defined(_KERNEL)
-module_param(zfs_send_corrupt_data, int, 0644);
-MODULE_PARM_DESC(zfs_send_corrupt_data, "Allow sending corrupt data");
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, corrupt_data, INT, ZMOD_RW,
+       "Allow sending corrupt data");
 
-module_param(zfs_send_queue_length, int, 0644);
-MODULE_PARM_DESC(zfs_send_queue_length, "Maximum send queue length");
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_length, UINT, ZMOD_RW,
+       "Maximum send queue length");
 
-module_param(zfs_send_unmodified_spill_blocks, int, 0644);
-MODULE_PARM_DESC(zfs_send_unmodified_spill_blocks,
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, unmodified_spill_blocks, INT, ZMOD_RW,
        "Send unmodified spill blocks");
 
-module_param(zfs_send_no_prefetch_queue_length, int, 0644);
-MODULE_PARM_DESC(zfs_send_no_prefetch_queue_length,
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_length, UINT, ZMOD_RW,
        "Maximum send queue length for non-prefetch queues");
 
-module_param(zfs_send_queue_ff, int, 0644);
-MODULE_PARM_DESC(zfs_send_queue_ff, "Send queue fill fraction");
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_ff, UINT, ZMOD_RW,
+       "Send queue fill fraction");
 
-module_param(zfs_send_no_prefetch_queue_ff, int, 0644);
-MODULE_PARM_DESC(zfs_send_no_prefetch_queue_ff,
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_ff, UINT, ZMOD_RW,
        "Send queue fill fraction for non-prefetch queues");
 
-module_param(zfs_override_estimate_recordsize, int, 0644);
-MODULE_PARM_DESC(zfs_override_estimate_recordsize,
+ZFS_MODULE_PARAM(zfs_send, zfs_, override_estimate_recordsize, UINT, ZMOD_RW,
        "Override block size estimate with fixed size");
-#endif