Improve zfs send performance by bypassing the ARC
author    Matthew Ahrens <mahrens@delphix.com>
          Tue, 10 Mar 2020 17:51:04 +0000 (10:51 -0700)
committer GitHub <noreply@github.com>
          Tue, 10 Mar 2020 17:51:04 +0000 (10:51 -0700)
When doing a zfs send on a dataset with small recordsize (e.g. 8K),
performance is dominated by the per-block overheads.  This is especially
true with `zfs send --compressed`, which further reduces the amount of
data sent, for the same number of blocks.  Several threads are involved,
but the limiting factor is the `send_prefetch` thread, which is pinned
at 100% CPU.

The main job of the `send_prefetch` thread is to issue zio's for the
data that will be needed by the main thread.  It does this by calling
`arc_read(ARC_FLAG_PREFETCH)`.  This has an immediate cost of creating
an arc_hdr, which takes around 14% of one CPU.  It also induces later
costs by other threads:

 * Since the data was only prefetched, dmu_send()->dmu_dump_write() will
   need to call arc_read() again to get the data.  This will have to
   look up the arc_hdr in the hash table and copy the data from the
   scatter ABD in the arc_hdr to a linear ABD in an arc_buf.  This takes
   27% of one CPU.

 * dmu_dump_write() needs to call arc_buf_destroy() on that buffer.
   This takes 11% of one CPU.

 * arc_adjust() will need to evict this arc_hdr, taking about 50% of one
   CPU.
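
For reference, the prefetch path being replaced boils down to the
following sketch, distilled from the code removed from
module/zfs/dmu_send.c in the diff below.  (`issue_prefetch` is a
hypothetical wrapper name used here for illustration, not a function
in the tree.)

    static void
    issue_prefetch(dnode_t *dn, const blkptr_t *bp, uint64_t blkid)
    {
            /* Bookmark describing the block to be prefetched. */
            zbookmark_phys_t zb = { 0 };
            zb.zb_objset = dmu_objset_id(dn->dn_objset);
            zb.zb_object = dn->dn_object;
            zb.zb_level = 0;
            zb.zb_blkid = blkid;

            /*
             * Speculative, non-blocking ARC read.  This creates an
             * arc_hdr (and, when the main thread re-reads the data,
             * an arc_buf) even for data that will be used exactly
             * once, incurring the costs listed above.
             */
            arc_flags_t aflags = ARC_FLAG_NOWAIT | ARC_FLAG_PREFETCH;
            (void) arc_read(NULL, dn->dn_objset->os_spa, bp, NULL, NULL,
                ZIO_PRIORITY_ASYNC_READ,
                ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE, &aflags, &zb);
    }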

All of these costs can be avoided by bypassing the ARC when the data
is not already cached.  This commit changes `zfs send` to check for
the data in the ARC; if it is not found, we call `zio_read()`
directly, reading the data into a linear ABD which dmu_dump_write()
then uses as-is.
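
A minimal sketch of the new read path, lifted nearly verbatim from
issue_data_read() and its completion callback in the diff below
(declarations of `os`, `bp`, `range`, `srdp`, `zioflags`, and `zb`
elided for brevity):

    /* Ask the ARC for the data only if it is already cached. */
    arc_flags_t aflags = ARC_FLAG_CACHED_ONLY;
    int arc_err = arc_read(NULL, os->os_spa, bp,
        arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ,
        zioflags, &aflags, &zb);
    if (arc_err != 0) {
            /*
             * Not cached (ENOENT): bypass the ARC and read into a
             * linear ABD that dmu_dump_write() can consume without
             * the copy, destroy, and eviction costs listed above.
             */
            srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE);
            srdp->io_outstanding = B_TRUE;
            zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd,
                srdp->datasz, dmu_send_read_done, range,
                ZIO_PRIORITY_ASYNC_READ, zioflags, &zb));
    }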

The performance improvement is best expressed in terms of how many
blocks can be processed by `zfs send` in one second.  This change
increases the metric by 50%, from ~100,000 to ~150,000.  When the amount
of data per block is small (e.g. 2KB), there is a corresponding
reduction in the elapsed time of `zfs send >/dev/null` (from 86 minutes
to 58 minutes in this test case).

In addition to improving the performance of `zfs send`, this change
keeps `zfs send` from polluting the ARC.  In most cases the data will
not be reused, so this lets us keep caching useful data in the MRU
(hit-once) portion of the ARC.

Reviewed-by: Paul Dagnelie <pcd@delphix.com>
Reviewed-by: Serapheim Dimitropoulos <serapheim@delphix.com>
Reviewed-by: Brian Behlendorf <behlendorf1@llnl.gov>
Signed-off-by: Matthew Ahrens <mahrens@delphix.com>
Closes #10067

include/sys/arc.h
include/sys/arc_impl.h
module/zfs/arc.c
module/zfs/dmu_send.c

diff --git a/include/sys/arc.h b/include/sys/arc.h
index f6dea3fbd045558320b5d0dfca2636931fa56c19..6f56f732ad87796f82000dc9f84026dba402cef3 100644
@@ -146,6 +146,12 @@ typedef enum arc_flags
        ARC_FLAG_COMPRESSED_ARC         = 1 << 20,
        ARC_FLAG_SHARED_DATA            = 1 << 21,
 
+       /*
+        * Fail this arc_read() (with ENOENT) if the data is not already present
+        * in cache.
+        */
+       ARC_FLAG_CACHED_ONLY            = 1 << 22,
+
        /*
         * The arc buffer's compression mode is stored in the top 7 bits of the
         * flags field, so these dummy flags are included so that MDB can
diff --git a/include/sys/arc_impl.h b/include/sys/arc_impl.h
index 8eeee54c90557948f675361b746d9268074217d9..c55640f8ba4c17d83af8e7ddf04aa4d06ba67423 100644
@@ -554,6 +554,7 @@ typedef struct arc_stats {
        kstat_named_t arcstat_need_free;
        kstat_named_t arcstat_sys_free;
        kstat_named_t arcstat_raw_size;
+       kstat_named_t arcstat_cached_only_in_progress;
 } arc_stats_t;
 
 typedef enum free_memory_reason_t {
diff --git a/module/zfs/arc.c b/module/zfs/arc.c
index 3df53d2db21a6e14a20398e35651e5a351beb27d..d49d85db0383f9d666142912488607a49d061ef8 100644
@@ -548,7 +548,8 @@ arc_stats_t arc_stats = {
        { "demand_hit_prescient_prefetch", KSTAT_DATA_UINT64 },
        { "arc_need_free",              KSTAT_DATA_UINT64 },
        { "arc_sys_free",               KSTAT_DATA_UINT64 },
-       { "arc_raw_size",               KSTAT_DATA_UINT64 }
+       { "arc_raw_size",               KSTAT_DATA_UINT64 },
+       { "cached_only_in_progress",    KSTAT_DATA_UINT64 },
 };
 
 #define        ARCSTAT_MAX(stat, val) {                                        \
@@ -5563,6 +5564,13 @@ top:
                if (HDR_IO_IN_PROGRESS(hdr)) {
                        zio_t *head_zio = hdr->b_l1hdr.b_acb->acb_zio_head;
 
+                       if (*arc_flags & ARC_FLAG_CACHED_ONLY) {
+                               mutex_exit(hash_lock);
+                               ARCSTAT_BUMP(arcstat_cached_only_in_progress);
+                               rc = SET_ERROR(ENOENT);
+                               goto out;
+                       }
+
                        ASSERT3P(head_zio, !=, NULL);
                        if ((hdr->b_flags & ARC_FLAG_PRIO_ASYNC_READ) &&
                            priority == ZIO_PRIORITY_SYNC_READ) {
@@ -5698,12 +5706,21 @@ top:
                uint64_t size;
                abd_t *hdr_abd;
 
+               if (*arc_flags & ARC_FLAG_CACHED_ONLY) {
+                       rc = SET_ERROR(ENOENT);
+                       if (hash_lock != NULL)
+                               mutex_exit(hash_lock);
+                       goto out;
+               }
+
                /*
                 * Gracefully handle a damaged logical block size as a
                 * checksum error.
                 */
                if (lsize > spa_maxblocksize(spa)) {
                        rc = SET_ERROR(ECKSUM);
+                       if (hash_lock != NULL)
+                               mutex_exit(hash_lock);
                        goto out;
                }
 
diff --git a/module/zfs/dmu_send.c b/module/zfs/dmu_send.c
index f3630a0cf133730ae0a0ac1e299248c8f2b8bab6..2c7dca23edd9147dc9fa2c3c8154345e981efef3 100644
@@ -156,8 +156,15 @@ struct send_range {
        union {
                struct srd {
                        dmu_object_type_t       obj_type;
-                       uint32_t                datablksz;
+                       uint32_t                datablksz; // logical size
+                       uint32_t                datasz; // payload size
                        blkptr_t                bp;
+                       arc_buf_t               *abuf;
+                       abd_t                   *abd;
+                       kmutex_t                lock;
+                       kcondvar_t              cv;
+                       boolean_t               io_outstanding;
+                       int                     io_err;
                } data;
                struct srh {
                        uint32_t                datablksz;
@@ -222,6 +229,20 @@ range_free(struct send_range *range)
                size_t size = sizeof (dnode_phys_t) *
                    (range->sru.object.dnp->dn_extra_slots + 1);
                kmem_free(range->sru.object.dnp, size);
+       } else if (range->type == DATA) {
+               mutex_enter(&range->sru.data.lock);
+               while (range->sru.data.io_outstanding)
+                       cv_wait(&range->sru.data.cv, &range->sru.data.lock);
+               if (range->sru.data.abd != NULL)
+                       abd_free(range->sru.data.abd);
+               if (range->sru.data.abuf != NULL) {
+                       arc_buf_destroy(range->sru.data.abuf,
+                           &range->sru.data.abuf);
+               }
+               mutex_exit(&range->sru.data.lock);
+
+               cv_destroy(&range->sru.data.cv);
+               mutex_destroy(&range->sru.data.lock);
        }
        kmem_free(range, sizeof (*range));
 }
@@ -830,7 +851,7 @@ dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp,
 }
 
 static boolean_t
-send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
+send_do_embed(const blkptr_t *bp, uint64_t featureflags)
 {
        if (!BP_IS_EMBEDDED(bp))
                return (B_FALSE);
@@ -839,7 +860,7 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
         * Compression function must be legacy, or explicitly enabled.
         */
        if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
-           !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LZ4)))
+           !(featureflags & DMU_BACKUP_FEATURE_LZ4)))
                return (B_FALSE);
 
        /*
@@ -847,7 +868,7 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
         */
        switch (BPE_GET_ETYPE(bp)) {
        case BP_EMBEDDED_TYPE_DATA:
-               if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
+               if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
                        return (B_TRUE);
                break;
        default:
@@ -858,8 +879,8 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
 
 /*
  * This function actually handles figuring out what kind of record needs to be
- * dumped, reading the data (which has hopefully been prefetched), and calling
- * the appropriate helper function.
+ * dumped, and calling the appropriate helper function.  In most cases,
+ * the data has already been read by send_reader_thread().
  */
 static int
 do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
@@ -894,7 +915,6 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
                blkptr_t *bp = &srdp->bp;
                spa_t *spa =
                    dmu_objset_spa(dscp->dsc_os);
-               arc_buf_t *abuf = NULL;
 
                ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
                ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
@@ -914,6 +934,7 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
                        zb.zb_level = 0;
                        zb.zb_blkid = range->start_blkid;
 
+                       arc_buf_t *abuf = NULL;
                        if (!dscp->dsc_dso->dso_dryrun && arc_read(NULL, spa,
                            bp, arc_getbuf_func, &abuf, ZIO_PRIORITY_ASYNC_READ,
                            zioflags, &aflags, &zb) != 0)
@@ -925,7 +946,7 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
                                arc_buf_destroy(abuf, &abuf);
                        return (err);
                }
-               if (send_do_embed(dscp, bp)) {
+               if (send_do_embed(bp, dscp->dsc_featureflags)) {
                        err = dump_write_embedded(dscp, range->object,
                            range->start_blkid * srdp->datablksz,
                            srdp->datablksz, bp);
@@ -936,70 +957,24 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
                    range->start_blkid * srdp->datablksz >=
                    dscp->dsc_resume_offset));
                /* it's a level-0 block of a regular object */
-               arc_flags_t aflags = ARC_FLAG_WAIT;
-               uint64_t offset;
-
-               /*
-                * If we have large blocks stored on disk but the send flags
-                * don't allow us to send large blocks, we split the data from
-                * the arc buf into chunks.
-                */
-               boolean_t split_large_blocks =
-                   srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
-                   !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
-
-               /*
-                * Raw sends require that we always get raw data as it exists
-                * on disk, so we assert that we are not splitting blocks here.
-                */
-               boolean_t request_raw =
-                   (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
-
-               /*
-                * We should only request compressed data from the ARC if all
-                * the following are true:
-                *  - stream compression was requested
-                *  - we aren't splitting large blocks into smaller chunks
-                *  - the data won't need to be byteswapped before sending
-                *  - this isn't an embedded block
-                *  - this isn't metadata (if receiving on a different endian
-                *    system it can be byteswapped more easily)
-                */
-               boolean_t request_compressed =
-                   (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
-                   !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
-                   !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
-
-               IMPLY(request_raw, !split_large_blocks);
-               IMPLY(request_raw, BP_IS_PROTECTED(bp));
-               if (!dscp->dsc_dso->dso_dryrun) {
-                       enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
-
-                       ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
-
-                       if (request_raw)
-                               zioflags |= ZIO_FLAG_RAW;
-                       else if (request_compressed)
-                               zioflags |= ZIO_FLAG_RAW_COMPRESS;
-                       zbookmark_phys_t zb;
-                       zb.zb_objset = dmu_objset_id(dscp->dsc_os);
-                       zb.zb_object = range->object;
-                       zb.zb_level = 0;
-                       zb.zb_blkid = range->start_blkid;
 
-                       err = arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
-                           ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, &zb);
-               }
+               mutex_enter(&srdp->lock);
+               while (srdp->io_outstanding)
+                       cv_wait(&srdp->cv, &srdp->lock);
+               err = srdp->io_err;
+               mutex_exit(&srdp->lock);
 
                if (err != 0) {
                        if (zfs_send_corrupt_data &&
                            !dscp->dsc_dso->dso_dryrun) {
-                               /* Send a block filled with 0x"zfs badd bloc" */
-                               abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
-                                   srdp->datablksz);
+                               /*
+                                * Send a block filled with 0x"zfs badd bloc"
+                                */
+                               srdp->abuf = arc_alloc_buf(spa, &srdp->abuf,
+                                   ARC_BUFC_DATA, srdp->datablksz);
                                uint64_t *ptr;
-                               for (ptr = abuf->b_data;
-                                   (char *)ptr < (char *)abuf->b_data +
+                               for (ptr = srdp->abuf->b_data;
+                                   (char *)ptr < (char *)srdp->abuf->b_data +
                                    srdp->datablksz; ptr++)
                                        *ptr = 0x2f5baddb10cULL;
                        } else {
@@ -1007,41 +982,47 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
                        }
                }
 
-               offset = range->start_blkid * srdp->datablksz;
+               ASSERT(dscp->dsc_dso->dso_dryrun ||
+                   srdp->abuf != NULL || srdp->abd != NULL);
+
+               uint64_t offset = range->start_blkid * srdp->datablksz;
+
+               char *data = NULL;
+               if (srdp->abd != NULL) {
+                       data = abd_to_buf(srdp->abd);
+                       ASSERT3P(srdp->abuf, ==, NULL);
+               } else if (srdp->abuf != NULL) {
+                       data = srdp->abuf->b_data;
+               }
 
-               if (split_large_blocks) {
-                       ASSERT0(arc_is_encrypted(abuf));
-                       ASSERT3U(arc_get_compression(abuf), ==,
-                           ZIO_COMPRESS_OFF);
-                       char *buf = abuf->b_data;
+               /*
+                * If we have large blocks stored on disk but the send flags
+                * don't allow us to send large blocks, we split the data from
+                * the arc buf into chunks.
+                */
+               if (srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
+                   !(dscp->dsc_featureflags &
+                   DMU_BACKUP_FEATURE_LARGE_BLOCKS)) {
                        while (srdp->datablksz > 0 && err == 0) {
                                int n = MIN(srdp->datablksz,
                                    SPA_OLD_MAXBLOCKSIZE);
                                err = dmu_dump_write(dscp, srdp->obj_type,
-                                   range->object, offset, n, n, NULL, buf);
+                                   range->object, offset, n, n, NULL, data);
                                offset += n;
-                               buf += n;
+                               /*
+                                * When doing dry run, data==NULL is used as a
+                                * sentinel value by
+                                * dmu_dump_write()->dump_record().
+                                */
+                               if (data != NULL)
+                                       data += n;
                                srdp->datablksz -= n;
                        }
                } else {
-                       int psize;
-                       if (abuf != NULL) {
-                               psize = arc_buf_size(abuf);
-                               if (arc_get_compression(abuf) !=
-                                   ZIO_COMPRESS_OFF) {
-                                       ASSERT3S(psize, ==, BP_GET_PSIZE(bp));
-                               }
-                       } else if (!request_compressed) {
-                               psize = srdp->datablksz;
-                       } else {
-                               psize = BP_GET_PSIZE(bp);
-                       }
                        err = dmu_dump_write(dscp, srdp->obj_type,
-                           range->object, offset, srdp->datablksz, psize, bp,
-                           (abuf == NULL ? NULL : abuf->b_data));
+                           range->object, offset,
+                           srdp->datablksz, srdp->datasz, bp, data);
                }
-               if (abuf != NULL)
-                       arc_buf_destroy(abuf, &abuf);
                return (err);
        }
        case HOLE: {
@@ -1086,6 +1067,14 @@ range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
        range->start_blkid = start_blkid;
        range->end_blkid = end_blkid;
        range->eos_marker = eos;
+       if (type == DATA) {
+               range->sru.data.abd = NULL;
+               range->sru.data.abuf = NULL;
+               mutex_init(&range->sru.data.lock, NULL, MUTEX_DEFAULT, NULL);
+               cv_init(&range->sru.data.cv, NULL, CV_DEFAULT, NULL);
+               range->sru.data.io_outstanding = 0;
+               range->sru.data.io_err = 0;
+       }
        return (range);
 }
 
@@ -1596,19 +1585,115 @@ send_merge_thread(void *arg)
        thread_exit();
 }
 
-struct send_prefetch_thread_arg {
+struct send_reader_thread_arg {
        struct send_merge_thread_arg *smta;
        bqueue_t q;
        boolean_t cancel;
-       boolean_t issue_prefetches;
+       boolean_t issue_reads;
+       uint64_t featureflags;
        int error;
 };
 
+static void
+dmu_send_read_done(zio_t *zio)
+{
+       struct send_range *range = zio->io_private;
+
+       mutex_enter(&range->sru.data.lock);
+       if (zio->io_error != 0) {
+               abd_free(range->sru.data.abd);
+               range->sru.data.abd = NULL;
+               range->sru.data.io_err = zio->io_error;
+       }
+
+       ASSERT(range->sru.data.io_outstanding);
+       range->sru.data.io_outstanding = B_FALSE;
+       cv_broadcast(&range->sru.data.cv);
+       mutex_exit(&range->sru.data.lock);
+}
+
+static void
+issue_data_read(struct send_reader_thread_arg *srta, struct send_range *range)
+{
+       struct srd *srdp = &range->sru.data;
+       blkptr_t *bp = &srdp->bp;
+       objset_t *os = srta->smta->os;
+
+       ASSERT3U(range->type, ==, DATA);
+       ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
+       /*
+        * If we have large blocks stored on disk but
+        * the send flags don't allow us to send large
+        * blocks, we split the data from the arc buf
+        * into chunks.
+        */
+       boolean_t split_large_blocks =
+           srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
+           !(srta->featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
+       /*
+        * We should only request compressed data from the ARC if all
+        * the following are true:
+        *  - stream compression was requested
+        *  - we aren't splitting large blocks into smaller chunks
+        *  - the data won't need to be byteswapped before sending
+        *  - this isn't an embedded block
+        *  - this isn't metadata (if receiving on a different endian
+        *    system it can be byteswapped more easily)
+        */
+       boolean_t request_compressed =
+           (srta->featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
+           !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
+           !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
+
+       enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
+
+       if (srta->featureflags & DMU_BACKUP_FEATURE_RAW)
+               zioflags |= ZIO_FLAG_RAW;
+       else if (request_compressed)
+               zioflags |= ZIO_FLAG_RAW_COMPRESS;
+
+       srdp->datasz = (zioflags & ZIO_FLAG_RAW_COMPRESS) ?
+           BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp);
+
+       if (!srta->issue_reads)
+               return;
+       if (BP_IS_REDACTED(bp))
+               return;
+       if (send_do_embed(bp, srta->featureflags))
+               return;
+
+       zbookmark_phys_t zb = {
+           .zb_objset = dmu_objset_id(os),
+           .zb_object = range->object,
+           .zb_level = 0,
+           .zb_blkid = range->start_blkid,
+       };
+
+       arc_flags_t aflags = ARC_FLAG_CACHED_ONLY;
+
+       int arc_err = arc_read(NULL, os->os_spa, bp,
+           arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ,
+           zioflags, &aflags, &zb);
+       /*
+        * If the data is not already cached in the ARC, we read directly
+        * from zio.  This avoids the performance overhead of adding a new
+        * entry to the ARC, and we also avoid polluting the ARC cache with
+        * data that is not likely to be used in the future.
+        */
+       if (arc_err != 0) {
+               srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE);
+               srdp->io_outstanding = B_TRUE;
+               zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd,
+                   srdp->datasz, dmu_send_read_done, range,
+                   ZIO_PRIORITY_ASYNC_READ, zioflags, &zb));
+       }
+}
+
 /*
  * Create a new record with the given values.
  */
 static void
-enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
+enqueue_range(struct send_reader_thread_arg *srta, bqueue_t *q, dnode_t *dn,
     uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz)
 {
        enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE :
@@ -1629,18 +1714,7 @@ enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
                range->sru.data.datablksz = datablksz;
                range->sru.data.obj_type = dn->dn_type;
                range->sru.data.bp = *bp;
-               if (spta->issue_prefetches) {
-                       zbookmark_phys_t zb = {0};
-                       zb.zb_objset = dmu_objset_id(dn->dn_objset);
-                       zb.zb_object = dn->dn_object;
-                       zb.zb_level = 0;
-                       zb.zb_blkid = blkid;
-                       arc_flags_t aflags = ARC_FLAG_NOWAIT |
-                           ARC_FLAG_PREFETCH;
-                       (void) arc_read(NULL, dn->dn_objset->os_spa, bp, NULL,
-                           NULL, ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
-                           ZIO_FLAG_SPECULATIVE, &aflags, &zb);
-               }
+               issue_data_read(srta, range);
                break;
        case REDACT:
                range->sru.redact.datablksz = datablksz;
@@ -1659,12 +1733,12 @@ enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
  * it issues prefetches for the data we need to send.
  */
 static void
-send_prefetch_thread(void *arg)
+send_reader_thread(void *arg)
 {
-       struct send_prefetch_thread_arg *spta = arg;
-       struct send_merge_thread_arg *smta = spta->smta;
+       struct send_reader_thread_arg *srta = arg;
+       struct send_merge_thread_arg *smta = srta->smta;
        bqueue_t *inq = &smta->q;
-       bqueue_t *outq = &spta->q;
+       bqueue_t *outq = &srta->q;
        objset_t *os = smta->os;
        fstrans_cookie_t cookie = spl_fstrans_mark();
        struct send_range *range = bqueue_dequeue(inq);
@@ -1680,30 +1754,14 @@ send_prefetch_thread(void *arg)
         */
        uint64_t last_obj = UINT64_MAX;
        uint64_t last_obj_exists = B_TRUE;
-       while (!range->eos_marker && !spta->cancel && smta->error == 0 &&
+       while (!range->eos_marker && !srta->cancel && smta->error == 0 &&
            err == 0) {
                switch (range->type) {
-               case DATA: {
-                       zbookmark_phys_t zb;
-                       zb.zb_objset = dmu_objset_id(os);
-                       zb.zb_object = range->object;
-                       zb.zb_level = 0;
-                       zb.zb_blkid = range->start_blkid;
-                       ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
-                       if (!BP_IS_REDACTED(&range->sru.data.bp) &&
-                           spta->issue_prefetches &&
-                           !BP_IS_EMBEDDED(&range->sru.data.bp)) {
-                               arc_flags_t aflags = ARC_FLAG_NOWAIT |
-                                   ARC_FLAG_PREFETCH;
-                               (void) arc_read(NULL, os->os_spa,
-                                   &range->sru.data.bp, NULL, NULL,
-                                   ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
-                                   ZIO_FLAG_SPECULATIVE, &aflags, &zb);
-                       }
+               case DATA:
+                       issue_data_read(srta, range);
                        bqueue_enqueue(outq, range, range->sru.data.datablksz);
                        range = get_next_range_nofree(inq, range);
                        break;
-               }
                case HOLE:
                case OBJECT:
                case OBJECT_RANGE:
@@ -1805,7 +1863,7 @@ send_prefetch_thread(void *arg)
                                            datablksz);
                                        uint64_t nblks = (offset / datablksz) -
                                            blkid;
-                                       enqueue_range(spta, outq, dn, blkid,
+                                       enqueue_range(srta, outq, dn, blkid,
                                            nblks, NULL, datablksz);
                                        blkid += nblks;
                                }
@@ -1816,7 +1874,7 @@ send_prefetch_thread(void *arg)
                                if (err != 0)
                                        break;
                                ASSERT(!BP_IS_HOLE(&bp));
-                               enqueue_range(spta, outq, dn, blkid, 1, &bp,
+                               enqueue_range(srta, outq, dn, blkid, 1, &bp,
                                    datablksz);
                        }
                        rw_exit(&dn->dn_struct_rwlock);
@@ -1825,11 +1883,11 @@ send_prefetch_thread(void *arg)
                }
                }
        }
-       if (spta->cancel || err != 0) {
+       if (srta->cancel || err != 0) {
                smta->cancel = B_TRUE;
-               spta->error = err;
+               srta->error = err;
        } else if (smta->error != 0) {
-               spta->error = smta->error;
+               srta->error = smta->error;
        }
        while (!range->eos_marker)
                range = get_next_range(inq, range);
@@ -2052,15 +2110,17 @@ setup_merge_thread(struct send_merge_thread_arg *smt_arg,
 }
 
 static void
-setup_prefetch_thread(struct send_prefetch_thread_arg *spt_arg,
-    struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg)
+setup_reader_thread(struct send_reader_thread_arg *srt_arg,
+    struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg,
+    uint64_t featureflags)
 {
-       VERIFY0(bqueue_init(&spt_arg->q, zfs_send_queue_ff,
+       VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff,
            MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
            offsetof(struct send_range, ln)));
-       spt_arg->smta = smt_arg;
-       spt_arg->issue_prefetches = !dspp->dso->dso_dryrun;
-       (void) thread_create(NULL, 0, send_prefetch_thread, spt_arg, 0,
+       srt_arg->smta = smt_arg;
+       srt_arg->issue_reads = !dspp->dso->dso_dryrun;
+       srt_arg->featureflags = featureflags;
+       (void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0,
            curproc, TS_RUN, minclsyspri);
 }
 
@@ -2265,7 +2325,7 @@ dmu_send_impl(struct dmu_send_params *dspp)
        struct send_thread_arg *to_arg;
        struct redact_list_thread_arg *rlt_arg;
        struct send_merge_thread_arg *smt_arg;
-       struct send_prefetch_thread_arg *spt_arg;
+       struct send_reader_thread_arg *srt_arg;
        struct send_range *range;
        redaction_list_t *from_rl = NULL;
        redaction_list_t *redact_rl = NULL;
@@ -2348,7 +2408,7 @@ dmu_send_impl(struct dmu_send_params *dspp)
        to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
        rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
        smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
-       spt_arg = kmem_zalloc(sizeof (*spt_arg), KM_SLEEP);
+       srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP);
 
        drr = create_begin_record(dspp, os, featureflags);
        dssp = setup_send_progress(dspp);
@@ -2457,12 +2517,12 @@ dmu_send_impl(struct dmu_send_params *dspp)
        setup_from_thread(from_arg, from_rl, dssp);
        setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);
        setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);
-       setup_prefetch_thread(spt_arg, dspp, smt_arg);
+       setup_reader_thread(srt_arg, dspp, smt_arg, featureflags);
 
-       range = bqueue_dequeue(&spt_arg->q);
+       range = bqueue_dequeue(&srt_arg->q);
        while (err == 0 && !range->eos_marker) {
                err = do_dump(&dsc, range);
-               range = get_next_range(&spt_arg->q, range);
+               range = get_next_range(&srt_arg->q, range);
                if (issig(JUSTLOOKING) && issig(FORREAL))
                        err = SET_ERROR(EINTR);
        }
@@ -2474,22 +2534,22 @@ dmu_send_impl(struct dmu_send_params *dspp)
         * pending records before exiting.
         */
        if (err != 0) {
-               spt_arg->cancel = B_TRUE;
+               srt_arg->cancel = B_TRUE;
                while (!range->eos_marker) {
-                       range = get_next_range(&spt_arg->q, range);
+                       range = get_next_range(&srt_arg->q, range);
                }
        }
        range_free(range);
 
-       bqueue_destroy(&spt_arg->q);
+       bqueue_destroy(&srt_arg->q);
        bqueue_destroy(&smt_arg->q);
        if (dspp->redactbook != NULL)
                bqueue_destroy(&rlt_arg->q);
        bqueue_destroy(&to_arg->q);
        bqueue_destroy(&from_arg->q);
 
-       if (err == 0 && spt_arg->error != 0)
-               err = spt_arg->error;
+       if (err == 0 && srt_arg->error != 0)
+               err = srt_arg->error;
 
        if (err != 0)
                goto out;
@@ -2532,7 +2592,7 @@ out:
        kmem_free(to_arg, sizeof (*to_arg));
        kmem_free(rlt_arg, sizeof (*rlt_arg));
        kmem_free(smt_arg, sizeof (*smt_arg));
-       kmem_free(spt_arg, sizeof (*spt_arg));
+       kmem_free(srt_arg, sizeof (*srt_arg));
 
        dsl_dataset_long_rele(to_ds, FTAG);
        if (from_rl != NULL) {