]> git.proxmox.com Git - mirror_zfs.git/commitdiff
Speculative prefetch for reordered requests
authorAlexander Motin <mav@FreeBSD.org>
Mon, 8 Apr 2024 22:13:27 +0000 (18:13 -0400)
committerGitHub <noreply@github.com>
Mon, 8 Apr 2024 22:13:27 +0000 (15:13 -0700)
Before this change speculative prefetcher was able to detect a stream
only if all of its accesses are perfectly sequential.  It was easy to
implement and is perfectly fine for single-threaded applications.
Unfortunately multi-threaded network servers, such as iSCSI, SMB or
NFS usually have plenty of threads and may often reorder requests,
preventing successful speculation and prefetch.

This change allows speculative prefetcher to detect streams even if
requests are reordered by introducing a list of 9 non-contiguous
ranges up to 16MB ahead of current stream position and filling the
gaps as more requests arrive.  It also allows stream to proceed
even with holes up to a certain configurable threshold (25%).

Reviewed-by: Brian Behlendorf <behlendorf1@llnl.gov>
Signed-off-by: Alexander Motin <mav@FreeBSD.org>
Sponsored by: iXsystems, Inc.
Closes #16022

cmd/arc_summary
include/sys/dmu_zfetch.h
man/man4/zfs.4
module/zfs/dmu.c
module/zfs/dmu_zfetch.c

index 9c69ec4f8cccace85ba6a80d4db0cd5629b62e05..100fb1987a8bf7e2c3a7601b1d1c7a9406a5557b 100755 (executable)
@@ -793,18 +793,27 @@ def section_dmu(kstats_dict):
 
     zfetch_stats = isolate_section('zfetchstats', kstats_dict)
 
-    zfetch_access_total = int(zfetch_stats['hits'])+int(zfetch_stats['misses'])
+    zfetch_access_total = int(zfetch_stats['hits']) +\
+        int(zfetch_stats['future']) + int(zfetch_stats['stride']) +\
+        int(zfetch_stats['past']) + int(zfetch_stats['misses'])
 
     prt_1('DMU predictive prefetcher calls:', f_hits(zfetch_access_total))
     prt_i2('Stream hits:',
            f_perc(zfetch_stats['hits'], zfetch_access_total),
            f_hits(zfetch_stats['hits']))
+    future = int(zfetch_stats['future']) + int(zfetch_stats['stride'])
+    prt_i2('Hits ahead of stream:', f_perc(future, zfetch_access_total),
+           f_hits(future))
+    prt_i2('Hits behind stream:',
+           f_perc(zfetch_stats['past'], zfetch_access_total),
+           f_hits(zfetch_stats['past']))
     prt_i2('Stream misses:',
            f_perc(zfetch_stats['misses'], zfetch_access_total),
            f_hits(zfetch_stats['misses']))
     prt_i2('Streams limit reached:',
            f_perc(zfetch_stats['max_streams'], zfetch_stats['misses']),
            f_hits(zfetch_stats['max_streams']))
+    prt_i1('Stream strides:', f_hits(zfetch_stats['stride']))
     prt_i1('Prefetches issued', f_hits(zfetch_stats['io_issued']))
     print()
 
index f00e13cf03a659e68b2664f418f1db5c8ea880ea..322472fb1ae25af96aee3cbf409f6b0dd9106459 100644 (file)
@@ -45,18 +45,24 @@ typedef struct zfetch {
        int             zf_numstreams;  /* number of zstream_t's */
 } zfetch_t;
 
+typedef struct zsrange {
+       uint16_t        start;
+       uint16_t        end;
+} zsrange_t;
+
+#define        ZFETCH_RANGES   9               /* Fits zstream_t into 128 bytes */
+
 typedef struct zstream {
+       list_node_t     zs_node;        /* link for zf_stream */
        uint64_t        zs_blkid;       /* expect next access at this blkid */
+       uint_t          zs_atime;       /* time last prefetch issued */
+       zsrange_t       zs_ranges[ZFETCH_RANGES]; /* ranges from future */
        unsigned int    zs_pf_dist;     /* data prefetch distance in bytes */
        unsigned int    zs_ipf_dist;    /* L1 prefetch distance in bytes */
        uint64_t        zs_pf_start;    /* first data block to prefetch */
        uint64_t        zs_pf_end;      /* data block to prefetch up to */
        uint64_t        zs_ipf_start;   /* first data block to prefetch L1 */
        uint64_t        zs_ipf_end;     /* data block to prefetch L1 up to */
-
-       list_node_t     zs_node;        /* link for zf_stream */
-       hrtime_t        zs_atime;       /* time last prefetch issued */
-       zfetch_t        *zs_fetch;      /* parent fetch */
        boolean_t       zs_missed;      /* stream saw cache misses */
        boolean_t       zs_more;        /* need more distant prefetch */
        zfs_refcount_t  zs_callers;     /* number of pending callers */
@@ -74,7 +80,7 @@ void          dmu_zfetch_init(zfetch_t *, struct dnode *);
 void           dmu_zfetch_fini(zfetch_t *);
 zstream_t      *dmu_zfetch_prepare(zfetch_t *, uint64_t, uint64_t, boolean_t,
     boolean_t);
-void           dmu_zfetch_run(zstream_t *, boolean_t, boolean_t);
+void           dmu_zfetch_run(zfetch_t *, zstream_t *, boolean_t, boolean_t);
 void           dmu_zfetch(zfetch_t *, uint64_t, uint64_t, boolean_t, boolean_t,
     boolean_t);
 
index c8e90968ab34bf667ee44dcd0b65435e918f8116..6088ebc7ef359301ef4ffc778f5745faf6a05902 100644 (file)
@@ -564,6 +564,10 @@ However, this is limited by
 Maximum micro ZAP size.
 A micro ZAP is upgraded to a fat ZAP, once it grows beyond the specified size.
 .
+.It Sy zfetch_hole_shift Ns = Ns Sy 2 Pq uint
+Log2 fraction of holes in speculative prefetch stream allowed for it to
+proceed.
+.
 .It Sy zfetch_min_distance Ns = Ns Sy 4194304 Ns B Po 4 MiB Pc Pq uint
 Min bytes to prefetch per stream.
 Prefetch distance starts from the demand access size and quickly grows to
@@ -578,6 +582,13 @@ Max bytes to prefetch per stream.
 .It Sy zfetch_max_idistance Ns = Ns Sy 67108864 Ns B Po 64 MiB Pc Pq uint
 Max bytes to prefetch indirects for per stream.
 .
+.It Sy zfetch_max_reorder Ns = Ns Sy 16777216 Ns B Po 16 MiB Pc Pq uint
+Requests within this byte distance from the current prefetch stream position
+are considered parts of the stream, reordered due to parallel processing.
+Such requests do not advance the stream position immediately unless
+.Sy zfetch_hole_shift
+fill threshold is reached, but saved to fill holes in the stream later.
+.
 .It Sy zfetch_max_streams Ns = Ns Sy 8 Pq uint
 Max number of streams per zfetch (prefetch streams per file).
 .
index 753dde6d520599ddd23c6c17298e40f3fdf1118b..6ef149aab9a67364a4a5f4f9b92cd33ea89d8775 100644 (file)
@@ -569,8 +569,10 @@ dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
        for (i = 0; i < nblks; i++) {
                dmu_buf_impl_t *db = dbuf_hold(dn, blkid + i, tag);
                if (db == NULL) {
-                       if (zs)
-                               dmu_zfetch_run(zs, missed, B_TRUE);
+                       if (zs) {
+                               dmu_zfetch_run(&dn->dn_zfetch, zs, missed,
+                                   B_TRUE);
+                       }
                        rw_exit(&dn->dn_struct_rwlock);
                        dmu_buf_rele_array(dbp, nblks, tag);
                        if (read)
@@ -606,7 +608,7 @@ dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
                zfs_racct_write(length, nblks);
 
        if (zs)
-               dmu_zfetch_run(zs, missed, B_TRUE);
+               dmu_zfetch_run(&dn->dn_zfetch, zs, missed, B_TRUE);
        rw_exit(&dn->dn_struct_rwlock);
 
        if (read) {
index 2b2d72671001661391c7636337c205d436297fd3..915d99916d2e84fce75490ac2b1f926b9192a18c 100644 (file)
@@ -65,9 +65,16 @@ unsigned int zfetch_max_distance = 64 * 1024 * 1024;
 #endif
 /* max bytes to prefetch indirects for per stream (default 64MB) */
 unsigned int   zfetch_max_idistance = 64 * 1024 * 1024;
+/* max request reorder distance within a stream (default 16MB) */
+unsigned int   zfetch_max_reorder = 16 * 1024 * 1024;
+/* Max log2 fraction of holes in a stream */
+unsigned int   zfetch_hole_shift = 2;
 
 typedef struct zfetch_stats {
        kstat_named_t zfetchstat_hits;
+       kstat_named_t zfetchstat_future;
+       kstat_named_t zfetchstat_stride;
+       kstat_named_t zfetchstat_past;
        kstat_named_t zfetchstat_misses;
        kstat_named_t zfetchstat_max_streams;
        kstat_named_t zfetchstat_io_issued;
@@ -76,6 +83,9 @@ typedef struct zfetch_stats {
 
 static zfetch_stats_t zfetch_stats = {
        { "hits",                       KSTAT_DATA_UINT64 },
+       { "future",                     KSTAT_DATA_UINT64 },
+       { "stride",                     KSTAT_DATA_UINT64 },
+       { "past",                       KSTAT_DATA_UINT64 },
        { "misses",                     KSTAT_DATA_UINT64 },
        { "max_streams",                KSTAT_DATA_UINT64 },
        { "io_issued",                  KSTAT_DATA_UINT64 },
@@ -84,6 +94,9 @@ static zfetch_stats_t zfetch_stats = {
 
 struct {
        wmsum_t zfetchstat_hits;
+       wmsum_t zfetchstat_future;
+       wmsum_t zfetchstat_stride;
+       wmsum_t zfetchstat_past;
        wmsum_t zfetchstat_misses;
        wmsum_t zfetchstat_max_streams;
        wmsum_t zfetchstat_io_issued;
@@ -107,6 +120,12 @@ zfetch_kstats_update(kstat_t *ksp, int rw)
                return (EACCES);
        zs->zfetchstat_hits.value.ui64 =
            wmsum_value(&zfetch_sums.zfetchstat_hits);
+       zs->zfetchstat_future.value.ui64 =
+           wmsum_value(&zfetch_sums.zfetchstat_future);
+       zs->zfetchstat_stride.value.ui64 =
+           wmsum_value(&zfetch_sums.zfetchstat_stride);
+       zs->zfetchstat_past.value.ui64 =
+           wmsum_value(&zfetch_sums.zfetchstat_past);
        zs->zfetchstat_misses.value.ui64 =
            wmsum_value(&zfetch_sums.zfetchstat_misses);
        zs->zfetchstat_max_streams.value.ui64 =
@@ -122,6 +141,9 @@ void
 zfetch_init(void)
 {
        wmsum_init(&zfetch_sums.zfetchstat_hits, 0);
+       wmsum_init(&zfetch_sums.zfetchstat_future, 0);
+       wmsum_init(&zfetch_sums.zfetchstat_stride, 0);
+       wmsum_init(&zfetch_sums.zfetchstat_past, 0);
        wmsum_init(&zfetch_sums.zfetchstat_misses, 0);
        wmsum_init(&zfetch_sums.zfetchstat_max_streams, 0);
        wmsum_init(&zfetch_sums.zfetchstat_io_issued, 0);
@@ -147,6 +169,9 @@ zfetch_fini(void)
        }
 
        wmsum_fini(&zfetch_sums.zfetchstat_hits);
+       wmsum_fini(&zfetch_sums.zfetchstat_future);
+       wmsum_fini(&zfetch_sums.zfetchstat_stride);
+       wmsum_fini(&zfetch_sums.zfetchstat_past);
        wmsum_fini(&zfetch_sums.zfetchstat_misses);
        wmsum_fini(&zfetch_sums.zfetchstat_max_streams);
        wmsum_fini(&zfetch_sums.zfetchstat_io_issued);
@@ -222,22 +247,22 @@ static void
 dmu_zfetch_stream_create(zfetch_t *zf, uint64_t blkid)
 {
        zstream_t *zs, *zs_next, *zs_old = NULL;
-       hrtime_t now = gethrtime(), t;
+       uint_t now = gethrestime_sec(), t;
 
        ASSERT(MUTEX_HELD(&zf->zf_lock));
 
        /*
         * Delete too old streams, reusing the first found one.
         */
-       t = now - SEC2NSEC(zfetch_max_sec_reap);
+       t = now - zfetch_max_sec_reap;
        for (zs = list_head(&zf->zf_stream); zs != NULL; zs = zs_next) {
                zs_next = list_next(&zf->zf_stream, zs);
                /*
                 * Skip if still active.  1 -- zf_stream reference.
                 */
-               if (zfs_refcount_count(&zs->zs_refs) != 1)
+               if ((int)(zs->zs_atime - t) >= 0)
                        continue;
-               if (zs->zs_atime > t)
+               if (zfs_refcount_count(&zs->zs_refs) != 1)
                        continue;
                if (zs_old)
                        dmu_zfetch_stream_remove(zf, zs);
@@ -246,6 +271,7 @@ dmu_zfetch_stream_create(zfetch_t *zf, uint64_t blkid)
        }
        if (zs_old) {
                zs = zs_old;
+               list_remove(&zf->zf_stream, zs);
                goto reuse;
        }
 
@@ -255,21 +281,23 @@ dmu_zfetch_stream_create(zfetch_t *zf, uint64_t blkid)
         * for all the streams to be non-overlapping.
         */
        uint32_t max_streams = MAX(1, MIN(zfetch_max_streams,
-           zf->zf_dnode->dn_maxblkid * zf->zf_dnode->dn_datablksz /
+           (zf->zf_dnode->dn_maxblkid << zf->zf_dnode->dn_datablkshift) /
            zfetch_max_distance));
        if (zf->zf_numstreams >= max_streams) {
-               t = now - SEC2NSEC(zfetch_min_sec_reap);
+               t = now - zfetch_min_sec_reap;
                for (zs = list_head(&zf->zf_stream); zs != NULL;
                    zs = list_next(&zf->zf_stream, zs)) {
-                       if (zfs_refcount_count(&zs->zs_refs) != 1)
+                       if ((int)(zs->zs_atime - t) >= 0)
                                continue;
-                       if (zs->zs_atime > t)
+                       if (zfs_refcount_count(&zs->zs_refs) != 1)
                                continue;
-                       if (zs_old == NULL || zs->zs_atime < zs_old->zs_atime)
+                       if (zs_old == NULL ||
+                           (int)(zs_old->zs_atime - zs->zs_atime) >= 0)
                                zs_old = zs;
                }
                if (zs_old) {
                        zs = zs_old;
+                       list_remove(&zf->zf_stream, zs);
                        goto reuse;
                }
                ZFETCHSTAT_BUMP(zfetchstat_max_streams);
@@ -277,24 +305,24 @@ dmu_zfetch_stream_create(zfetch_t *zf, uint64_t blkid)
        }
 
        zs = kmem_zalloc(sizeof (*zs), KM_SLEEP);
-       zs->zs_fetch = zf;
        zfs_refcount_create(&zs->zs_callers);
        zfs_refcount_create(&zs->zs_refs);
        /* One reference for zf_stream. */
        zfs_refcount_add(&zs->zs_refs, NULL);
        zf->zf_numstreams++;
-       list_insert_head(&zf->zf_stream, zs);
 
 reuse:
+       list_insert_head(&zf->zf_stream, zs);
        zs->zs_blkid = blkid;
+       /* Allow immediate stream reuse until first hit. */
+       zs->zs_atime = now - zfetch_min_sec_reap;
+       memset(zs->zs_ranges, 0, sizeof (zs->zs_ranges));
        zs->zs_pf_dist = 0;
+       zs->zs_ipf_dist = 0;
        zs->zs_pf_start = blkid;
        zs->zs_pf_end = blkid;
-       zs->zs_ipf_dist = 0;
        zs->zs_ipf_start = blkid;
        zs->zs_ipf_end = blkid;
-       /* Allow immediate stream reuse until first hit. */
-       zs->zs_atime = now - SEC2NSEC(zfetch_min_sec_reap);
        zs->zs_missed = B_FALSE;
        zs->zs_more = B_FALSE;
 }
@@ -311,6 +339,120 @@ dmu_zfetch_done(void *arg, uint64_t level, uint64_t blkid, boolean_t io_issued)
        aggsum_add(&zfetch_sums.zfetchstat_io_active, -1);
 }
 
+/*
+ * Process stream hit access for nblks blocks starting at zs_blkid.  Return
+ * number of blocks to proceed for after aggregation with future ranges.
+ */
+static uint64_t
+dmu_zfetch_hit(zstream_t *zs, uint64_t nblks)
+{
+       uint_t i, j;
+
+       /* Optimize sequential accesses (no future ranges). */
+       if (zs->zs_ranges[0].start == 0)
+               goto done;
+
+       /* Look for intersections with further ranges. */
+       for (i = 0; i < ZFETCH_RANGES; i++) {
+               zsrange_t *r = &zs->zs_ranges[i];
+               if (r->start == 0 || r->start > nblks)
+                       break;
+               if (r->end >= nblks) {
+                       nblks = r->end;
+                       i++;
+                       break;
+               }
+       }
+
+       /* Delete all found intersecting ranges, updates remaining. */
+       for (j = 0; i < ZFETCH_RANGES; i++, j++) {
+               if (zs->zs_ranges[i].start == 0)
+                       break;
+               ASSERT3U(zs->zs_ranges[i].start, >, nblks);
+               ASSERT3U(zs->zs_ranges[i].end, >, nblks);
+               zs->zs_ranges[j].start = zs->zs_ranges[i].start - nblks;
+               zs->zs_ranges[j].end = zs->zs_ranges[i].end - nblks;
+       }
+       if (j < ZFETCH_RANGES) {
+               zs->zs_ranges[j].start = 0;
+               zs->zs_ranges[j].end = 0;
+       }
+
+done:
+       zs->zs_blkid += nblks;
+       return (nblks);
+}
+
+/*
+ * Process future stream access for nblks blocks starting at blkid.  Return
+ * number of blocks to proceed for if future ranges reach fill threshold.
+ */
+static uint64_t
+dmu_zfetch_future(zstream_t *zs, uint64_t blkid, uint64_t nblks)
+{
+       ASSERT3U(blkid, >, zs->zs_blkid);
+       blkid -= zs->zs_blkid;
+       ASSERT3U(blkid + nblks, <=, UINT16_MAX);
+
+       /* Search for first and last intersection or insert point. */
+       uint_t f = ZFETCH_RANGES, l = 0, i;
+       for (i = 0; i < ZFETCH_RANGES; i++) {
+               zsrange_t *r = &zs->zs_ranges[i];
+               if (r->start == 0 || r->start > blkid + nblks)
+                       break;
+               if (r->end < blkid)
+                       continue;
+               if (f > i)
+                       f = i;
+               if (l < i)
+                       l = i;
+       }
+       if (f <= l) {
+               /* Got some intersecting range, expand it if needed. */
+               if (zs->zs_ranges[f].start > blkid)
+                       zs->zs_ranges[f].start = blkid;
+               zs->zs_ranges[f].end = MAX(zs->zs_ranges[l].end, blkid + nblks);
+               if (f < l) {
+                       /* Got more than one intersection, remove others. */
+                       for (f++, l++; l < ZFETCH_RANGES; f++, l++) {
+                               zs->zs_ranges[f].start = zs->zs_ranges[l].start;
+                               zs->zs_ranges[f].end = zs->zs_ranges[l].end;
+                       }
+                       zs->zs_ranges[ZFETCH_RANGES - 1].start = 0;
+                       zs->zs_ranges[ZFETCH_RANGES - 1].end = 0;
+               }
+       } else if (i < ZFETCH_RANGES) {
+               /* Got no intersecting ranges, insert new one. */
+               for (l = ZFETCH_RANGES - 1; l > i; l--) {
+                       zs->zs_ranges[l].start = zs->zs_ranges[l - 1].start;
+                       zs->zs_ranges[l].end = zs->zs_ranges[l - 1].end;
+               }
+               zs->zs_ranges[i].start = blkid;
+               zs->zs_ranges[i].end = blkid + nblks;
+       } else {
+               /* No space left to insert.  Drop the range. */
+               return (0);
+       }
+
+       /* Check if with the new access addition we reached fill threshold. */
+       if (zfetch_hole_shift >= 16)
+               return (0);
+       uint_t hole = 0;
+       for (i = f = l = 0; i < ZFETCH_RANGES; i++) {
+               zsrange_t *r = &zs->zs_ranges[i];
+               if (r->start == 0)
+                       break;
+               hole += r->start - f;
+               f = r->end;
+               if (hole <= r->end >> zfetch_hole_shift)
+                       l = r->end;
+       }
+       if (l > 0)
+               return (dmu_zfetch_hit(zs, l));
+
+       return (0);
+}
+
 /*
  * This is the predictive prefetch entry point.  dmu_zfetch_prepare()
  * associates dnode access specified with blkid and nblks arguments with
@@ -370,53 +512,92 @@ dmu_zfetch_prepare(zfetch_t *zf, uint64_t blkid, uint64_t nblks,
        mutex_enter(&zf->zf_lock);
 
        /*
-        * Find matching prefetch stream.  Depending on whether the accesses
+        * Find perfect prefetch stream.  Depending on whether the accesses
         * are block-aligned, first block of the new access may either follow
         * the last block of the previous access, or be equal to it.
         */
+       unsigned int dbs = zf->zf_dnode->dn_datablkshift;
+       uint64_t end_blkid = blkid + nblks;
        for (zs = list_head(&zf->zf_stream); zs != NULL;
            zs = list_next(&zf->zf_stream, zs)) {
                if (blkid == zs->zs_blkid) {
-                       break;
+                       goto hit;
                } else if (blkid + 1 == zs->zs_blkid) {
                        blkid++;
                        nblks--;
-                       break;
+                       goto hit;
                }
        }
 
        /*
-        * If the file is ending, remove the matching stream if found.
-        * If not found then it is too late to create a new one now.
+        * Find close enough prefetch stream.  Access crossing stream position
+        * is a hit in its new part.  Access ahead of stream position considered
+        * a hit for metadata prefetch, since we do not care about fill percent,
+        * or stored for future otherwise.  Access behind stream position is
+        * silently ignored, since we already skipped it reaching fill percent.
         */
-       uint64_t end_of_access_blkid = blkid + nblks;
-       if (end_of_access_blkid >= maxblkid) {
-               if (zs != NULL)
-                       dmu_zfetch_stream_remove(zf, zs);
-               mutex_exit(&zf->zf_lock);
-               if (!have_lock)
-                       rw_exit(&zf->zf_dnode->dn_struct_rwlock);
-               return (NULL);
+       uint_t max_reorder = MIN((zfetch_max_reorder >> dbs) + 1, UINT16_MAX);
+       uint_t t = gethrestime_sec() - zfetch_max_sec_reap;
+       for (zs = list_head(&zf->zf_stream); zs != NULL;
+           zs = list_next(&zf->zf_stream, zs)) {
+               if (blkid > zs->zs_blkid) {
+                       if (end_blkid <= zs->zs_blkid + max_reorder) {
+                               if (!fetch_data) {
+                                       nblks = dmu_zfetch_hit(zs,
+                                           end_blkid - zs->zs_blkid);
+                                       ZFETCHSTAT_BUMP(zfetchstat_stride);
+                                       goto future;
+                               }
+                               nblks = dmu_zfetch_future(zs, blkid, nblks);
+                               if (nblks > 0)
+                                       ZFETCHSTAT_BUMP(zfetchstat_stride);
+                               else
+                                       ZFETCHSTAT_BUMP(zfetchstat_future);
+                               goto future;
+                       }
+               } else if (end_blkid >= zs->zs_blkid) {
+                       nblks -= zs->zs_blkid - blkid;
+                       blkid += zs->zs_blkid - blkid;
+                       goto hit;
+               } else if (end_blkid + max_reorder > zs->zs_blkid &&
+                   (int)(zs->zs_atime - t) >= 0) {
+                       ZFETCHSTAT_BUMP(zfetchstat_past);
+                       zs->zs_atime = gethrestime_sec();
+                       goto out;
+               }
        }
 
-       /* Exit if we already prefetched this block before. */
-       if (nblks == 0) {
-               mutex_exit(&zf->zf_lock);
-               if (!have_lock)
-                       rw_exit(&zf->zf_dnode->dn_struct_rwlock);
-               return (NULL);
-       }
+       /*
+        * This access is not part of any existing stream.  Create a new
+        * stream for it unless we are at the end of file.
+        */
+       if (end_blkid < maxblkid)
+               dmu_zfetch_stream_create(zf, end_blkid);
+       mutex_exit(&zf->zf_lock);
+       if (!have_lock)
+               rw_exit(&zf->zf_dnode->dn_struct_rwlock);
+       ZFETCHSTAT_BUMP(zfetchstat_misses);
+       return (NULL);
 
-       if (zs == NULL) {
-               /*
-                * This access is not part of any existing stream.  Create
-                * a new stream for it.
-                */
-               dmu_zfetch_stream_create(zf, end_of_access_blkid);
+hit:
+       nblks = dmu_zfetch_hit(zs, nblks);
+       ZFETCHSTAT_BUMP(zfetchstat_hits);
+
+future:
+       zs->zs_atime = gethrestime_sec();
+
+       /* Exit if we already prefetched for this position before. */
+       if (nblks == 0)
+               goto out;
+
+       /* If the file is ending, remove the stream. */
+       end_blkid = zs->zs_blkid;
+       if (end_blkid >= maxblkid) {
+               dmu_zfetch_stream_remove(zf, zs);
+out:
                mutex_exit(&zf->zf_lock);
                if (!have_lock)
                        rw_exit(&zf->zf_dnode->dn_struct_rwlock);
-               ZFETCHSTAT_BUMP(zfetchstat_misses);
                return (NULL);
        }
 
@@ -432,7 +613,6 @@ dmu_zfetch_prepare(zfetch_t *zf, uint64_t blkid, uint64_t nblks,
         * than ~6% of ARC held by active prefetches.  It should help with
         * getting out of RAM on some badly mispredicted read patterns.
         */
-       unsigned int dbs = zf->zf_dnode->dn_datablkshift;
        unsigned int nbytes = nblks << dbs;
        unsigned int pf_nblks;
        if (fetch_data) {
@@ -452,10 +632,10 @@ dmu_zfetch_prepare(zfetch_t *zf, uint64_t blkid, uint64_t nblks,
        } else {
                pf_nblks = 0;
        }
-       if (zs->zs_pf_start < end_of_access_blkid)
-               zs->zs_pf_start = end_of_access_blkid;
-       if (zs->zs_pf_end < end_of_access_blkid + pf_nblks)
-               zs->zs_pf_end = end_of_access_blkid + pf_nblks;
+       if (zs->zs_pf_start < end_blkid)
+               zs->zs_pf_start = end_blkid;
+       if (zs->zs_pf_end < end_blkid + pf_nblks)
+               zs->zs_pf_end = end_blkid + pf_nblks;
 
        /*
         * Do the same for indirects, starting where we will stop reading
@@ -473,9 +653,6 @@ dmu_zfetch_prepare(zfetch_t *zf, uint64_t blkid, uint64_t nblks,
        if (zs->zs_ipf_end < zs->zs_pf_end + pf_nblks)
                zs->zs_ipf_end = zs->zs_pf_end + pf_nblks;
 
-       zs->zs_blkid = end_of_access_blkid;
-       /* Protect the stream from reclamation. */
-       zs->zs_atime = gethrtime();
        zfs_refcount_add(&zs->zs_refs, NULL);
        /* Count concurrent callers. */
        zfs_refcount_add(&zs->zs_callers, NULL);
@@ -483,15 +660,13 @@ dmu_zfetch_prepare(zfetch_t *zf, uint64_t blkid, uint64_t nblks,
 
        if (!have_lock)
                rw_exit(&zf->zf_dnode->dn_struct_rwlock);
-
-       ZFETCHSTAT_BUMP(zfetchstat_hits);
        return (zs);
 }
 
 void
-dmu_zfetch_run(zstream_t *zs, boolean_t missed, boolean_t have_lock)
+dmu_zfetch_run(zfetch_t *zf, zstream_t *zs, boolean_t missed,
+    boolean_t have_lock)
 {
-       zfetch_t *zf = zs->zs_fetch;
        int64_t pf_start, pf_end, ipf_start, ipf_end;
        int epbs, issued;
 
@@ -567,7 +742,7 @@ dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data,
 
        zs = dmu_zfetch_prepare(zf, blkid, nblks, fetch_data, have_lock);
        if (zs)
-               dmu_zfetch_run(zs, missed, have_lock);
+               dmu_zfetch_run(zf, zs, missed, have_lock);
 }
 
 ZFS_MODULE_PARAM(zfs_prefetch, zfs_prefetch_, disable, INT, ZMOD_RW,
@@ -590,3 +765,9 @@ ZFS_MODULE_PARAM(zfs_prefetch, zfetch_, max_distance, UINT, ZMOD_RW,
 
 ZFS_MODULE_PARAM(zfs_prefetch, zfetch_, max_idistance, UINT, ZMOD_RW,
        "Max bytes to prefetch indirects for per stream");
+
+ZFS_MODULE_PARAM(zfs_prefetch, zfetch_, max_reorder, UINT, ZMOD_RW,
+       "Max request reorder distance within a stream");
+
+ZFS_MODULE_PARAM(zfs_prefetch, zfetch_, hole_shift, UINT, ZMOD_RW,
+       "Max log2 fraction of holes in a stream");