Provide macros for setting and getting blkptr birth times

diff --git a/module/zfs/dmu_recv.c b/module/zfs/dmu_recv.c
index e22b6b13a40b7697f9bfd3ffdb9ec7da3d79356f..680aed4513bc2cc587e1c7c2e8d4a7159e3283d7 100644
--- a/module/zfs/dmu_recv.c
+++ b/module/zfs/dmu_recv.c
@@ -31,6 +31,7 @@
  * Copyright (c) 2022 Axcient.
  */
 
+#include <sys/arc.h>
 #include <sys/spa_impl.h>
 #include <sys/dmu.h>
 #include <sys/dmu_impl.h>
 #endif
 #include <sys/zfs_file.h>
 
-static int zfs_recv_queue_length = SPA_MAXBLOCKSIZE;
-static int zfs_recv_queue_ff = 20;
-static int zfs_recv_write_batch_size = 1024 * 1024;
+static uint_t zfs_recv_queue_length = SPA_MAXBLOCKSIZE;
+static uint_t zfs_recv_queue_ff = 20;
+static uint_t zfs_recv_write_batch_size = 1024 * 1024;
 static int zfs_recv_best_effort_corrective = 0;
 
 static const void *const dmu_recv_tag = "dmu_recv_tag";
 const char *const recv_clone_name = "%recv";
 
+typedef enum {
+       ORNS_NO,
+       ORNS_YES,
+       ORNS_MAYBE
+} or_need_sync_t;
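+
+/*
+ * or_need_sync tracks DRR_FREEOBJECTS records seen since the last
+ * DRR_OBJECT_RANGE: receive_object_range() arms it to ORNS_MAYBE,
+ * receive_freeobjects() promotes it to ORNS_YES once an object in the
+ * range is actually freed, and receive_object() then waits for the txg
+ * to sync in that case before allocating the new object and resets it
+ * to ORNS_NO.
+ */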
+
 static int receive_read_payload_and_next_header(dmu_recv_cookie_t *ra, int len,
     void *buf);
 
@@ -128,6 +135,9 @@ struct receive_writer_arg {
        uint8_t or_mac[ZIO_DATA_MAC_LEN];
        boolean_t or_byteorder;
        zio_t *heal_pio;
+
+       /* Keep track of DRR_FREEOBJECTS right after DRR_OBJECT_RANGE */
+       or_need_sync_t or_need_sync;
 };
 
 typedef struct dmu_recv_begin_arg {
@@ -646,7 +656,7 @@ dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
                 * so add the DS_HOLD_FLAG_DECRYPT flag only if we are dealing
                 * with a dataset we may encrypt.
                 */
-               if (drba->drba_dcp != NULL &&
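+               /*
+                * A NULL drba_dcp does not turn encryption off; the crypt
+                * settings may still be inherited, so hold with DECRYPT in
+                * that case too.
+                */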
+               if (drba->drba_dcp == NULL ||
                    drba->drba_dcp->cp_crypt != ZIO_CRYPT_OFF) {
                        dsflags |= DS_HOLD_FLAG_DECRYPT;
                }
@@ -1207,13 +1217,14 @@ dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
  * succeeds; otherwise we will leak the holds on the datasets.
  */
 int
-dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
-    boolean_t force, boolean_t heal, boolean_t resumable, nvlist_t *localprops,
-    nvlist_t *hidden_args, char *origin, dmu_recv_cookie_t *drc,
-    zfs_file_t *fp, offset_t *voffp)
+dmu_recv_begin(const char *tofs, const char *tosnap,
+    dmu_replay_record_t *drr_begin, boolean_t force, boolean_t heal,
+    boolean_t resumable, nvlist_t *localprops, nvlist_t *hidden_args,
+    const char *origin, dmu_recv_cookie_t *drc, zfs_file_t *fp,
+    offset_t *voffp)
 {
        dmu_recv_begin_arg_t drba = { 0 };
-       int err;
+       int err = 0;
 
        memset(drc, 0, sizeof (dmu_recv_cookie_t));
        drc->drc_drr_begin = drr_begin;
@@ -1245,20 +1256,36 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
            DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
 
        uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
-       void *payload = NULL;
-       if (payloadlen != 0)
-               payload = kmem_alloc(payloadlen, KM_SLEEP);
 
-       err = receive_read_payload_and_next_header(drc, payloadlen,
-           payload);
-       if (err != 0) {
-               kmem_free(payload, payloadlen);
-               return (err);
-       }
+       /*
+        * Since OpenZFS 2.0.0, we have enforced a 64MB limit in userspace
+        * configurable via ZFS_SENDRECV_MAX_NVLIST. We enforce 256MB as a hard
+        * upper limit. Systems with less than 1GB of RAM will see a lower
+        * limit from `arc_all_memory() / 4`.
+        */
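+       /* e.g. with 512MB of RAM: MIN(1U << 28, 512MB / 4) = 128MB. */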
+       if (payloadlen > (MIN((1U << 28), arc_all_memory() / 4)))
+               return (E2BIG);
+
        if (payloadlen != 0) {
+               void *payload = vmem_alloc(payloadlen, KM_SLEEP);
+               /*
+                * For compatibility with recursive send streams, we don't do
+                * this here if the stream could be part of a package. Instead,
+                * we'll do it in dmu_recv_stream. If we pull the next header
+                * too early, and it's the END record, we break the `recv_skip`
+                * logic.
+                */
+
+               err = receive_read_payload_and_next_header(drc, payloadlen,
+                   payload);
+               if (err != 0) {
+                       vmem_free(payload, payloadlen);
+                       return (err);
+               }
                err = nvlist_unpack(payload, payloadlen, &drc->drc_begin_nvl,
                    KM_SLEEP);
-               kmem_free(payload, payloadlen);
+               vmem_free(payload, payloadlen);
                if (err != 0) {
                        kmem_free(drc->drc_next_rrd,
                            sizeof (*drc->drc_next_rrd));
@@ -1325,8 +1352,10 @@ corrective_read_done(zio_t *zio)
 {
        cr_cb_data_t *data = zio->io_private;
        /* Corruption corrected; update error log if needed */
-       if (zio->io_error == 0)
-               spa_remove_error(data->spa, &data->zb);
+       if (zio->io_error == 0) {
+               spa_remove_error(data->spa, &data->zb,
+                   BP_GET_LOGICAL_BIRTH(zio->io_bp));
+       }
        kmem_free(data, sizeof (cr_cb_data_t));
        abd_free(zio->io_abd);
 }
@@ -1344,8 +1373,8 @@ do_corrective_recv(struct receive_writer_arg *rwa, struct drr_write *drrw,
        dnode_t *dn;
        abd_t *abd = rrd->abd;
        zio_cksum_t bp_cksum = bp->blk_cksum;
-       enum zio_flag flags = ZIO_FLAG_SPECULATIVE |
-           ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_RETRY | ZIO_FLAG_CANFAIL;
+       zio_flag_t flags = ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_RETRY |
+           ZIO_FLAG_CANFAIL;
 
        if (rwa->raw)
                flags |= ZIO_FLAG_RAW;
@@ -1378,8 +1407,9 @@ do_corrective_recv(struct receive_writer_arg *rwa, struct drr_write *drrw,
                /* Recompress the data */
                abd_t *cabd = abd_alloc_linear(BP_GET_PSIZE(bp),
                    B_FALSE);
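+               /*
+                * zio_compress_data() takes the destination buffer by
+                * reference, so hand it cabd's linear buffer and compress
+                * straight into it.
+                */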
+               void *buf = abd_to_buf(cabd);
                uint64_t csize = zio_compress_data(BP_GET_COMPRESS(bp),
-                   abd, abd_to_buf(cabd), abd_get_size(abd),
+                   abd, &buf, abd_get_size(abd),
                    rwa->os->os_complevel);
                abd_zero_off(cabd, csize, BP_GET_PSIZE(bp) - csize);
                /* Swap in newly compressed data into the abd */
@@ -1452,8 +1482,9 @@ do_corrective_recv(struct receive_writer_arg *rwa, struct drr_write *drrw,
        }
        rrd->abd = abd;
 
-       io = zio_rewrite(NULL, rwa->os->os_spa, bp->blk_birth, bp, abd,
-           BP_GET_PSIZE(bp), NULL, NULL, ZIO_PRIORITY_SYNC_WRITE, flags, &zb);
+       io = zio_rewrite(NULL, rwa->os->os_spa, BP_GET_LOGICAL_BIRTH(bp), bp,
+           abd, BP_GET_PSIZE(bp), NULL, NULL, ZIO_PRIORITY_SYNC_WRITE, flags,
+           &zb);
 
        ASSERT(abd_get_size(abd) == BP_GET_LSIZE(bp) ||
            abd_get_size(abd) == BP_GET_PSIZE(bp));
@@ -1500,11 +1531,11 @@ receive_read(dmu_recv_cookie_t *drc, int len, void *buf)
            (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0);
 
        while (done < len) {
-               ssize_t resid;
+               ssize_t resid = len - done;
                zfs_file_t *fp = drc->drc_fp;
                int err = zfs_file_read(fp, (char *)buf + done,
                    len - done, &resid);
-               if (resid == len - done) {
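+               /*
+                * resid is preset to the full request so a failed
+                * zfs_file_read() cannot leave it undefined; only treat an
+                * empty read as a truncated stream when the read itself
+                * succeeded.
+                */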
+               if (err == 0 && resid == len - done) {
                        /*
                         * Note: ECKSUM or ZFS_ERR_STREAM_TRUNCATED indicates
                         * that the receive was interrupted and can
@@ -1767,17 +1798,19 @@ receive_handle_existing_object(const struct receive_writer_arg *rwa,
        }
 
        /*
-        * The dmu does not currently support decreasing nlevels
-        * or changing the number of dnode slots on an object. For
-        * non-raw sends, this does not matter and the new object
-        * can just use the previous one's nlevels. For raw sends,
-        * however, the structure of the received dnode (including
-        * nlevels and dnode slots) must match that of the send
-        * side. Therefore, instead of using dmu_object_reclaim(),
-        * we must free the object completely and call
-        * dmu_object_claim_dnsize() instead.
+        * The dmu does not currently support decreasing nlevels, changing
+        * the indirect block size once one is set, or changing the number
+        * of dnode slots on an object.  For non-raw sends this
+        * does not matter and the new object can just use the previous one's
+        * parameters.  For raw sends, however, the structure of the received
+        * dnode (including indirects and dnode slots) must match that of the
+        * send side.  Therefore, instead of using dmu_object_reclaim(), we
+        * must free the object completely and call dmu_object_claim_dnsize()
+        * instead.
         */
-       if ((rwa->raw && drro->drr_nlevels < doi->doi_indirection) ||
+       if ((rwa->raw && ((doi->doi_indirection > 1 &&
+           indblksz != doi->doi_metadata_block_size) ||
+           drro->drr_nlevels < doi->doi_indirection)) ||
            dn_slots != doi->doi_dnodesize >> DNODE_SHIFT) {
                err = dmu_free_long_object(rwa->os, drro->drr_object);
                if (err != 0)
@@ -1903,10 +1936,22 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
                /* object was freed and we are about to allocate a new one */
                object_to_hold = DMU_NEW_OBJECT;
        } else {
+               /*
+                * If the only record in this range so far was DRR_FREEOBJECTS
+                * with at least one actually freed object, it's possible that
+                * the block will now be converted to a hole. We need to wait
+                * for the txg to sync to prevent races.
+                */
+               if (rwa->or_need_sync == ORNS_YES)
+                       txg_wait_synced(dmu_objset_pool(rwa->os), 0);
+
                /* object is free and we are about to allocate a new one */
                object_to_hold = DMU_NEW_OBJECT;
        }
 
+       /* Only relevant for the first object in the range */
+       rwa->or_need_sync = ORNS_NO;
+
        /*
         * If this is a multi-slot dnode there is a chance that this
         * object will expand into a slot that is already used by
@@ -2068,6 +2113,16 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
                dmu_buf_rele(db, FTAG);
                dnode_rele(dn, FTAG);
        }
+
+       /*
+        * If the receive fails, we want the resume stream to start with the
+        * same record that we last successfully received. There is no way to
+        * request resume from the object record, but we can benefit from the
+        * fact that the sender always sends the object record before anything
+        * else, after which it will "resend" data at offset 0 and resume
+        * normally.
+        */
+       save_resume_state(rwa, drro->drr_object, 0, tx);
+
        dmu_tx_commit(tx);
 
        return (0);
@@ -2100,6 +2155,9 @@ receive_freeobjects(struct receive_writer_arg *rwa,
 
                if (err != 0)
                        return (err);
+
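+               /*
+                * At least one object in this range was actually freed, so
+                * the next object record has to wait for this free to sync
+                * out (see receive_object()).
+                */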
+               if (rwa->or_need_sync == ORNS_MAYBE)
+                       rwa->or_need_sync = ORNS_YES;
        }
        if (next_err != ESRCH)
                return (next_err);
@@ -2183,10 +2241,10 @@ flush_write_batch_impl(struct receive_writer_arg *rwa)
                        if (err == 0)
                                abd_free(abd);
                } else {
-                       zio_prop_t zp;
+                       zio_prop_t zp = {0};
                        dmu_write_policy(rwa->os, dn, 0, 0, &zp);
 
-                       enum zio_flag zio_flags = 0;
+                       zio_flag_t zio_flags = 0;
 
                        if (rwa->raw) {
                                zp.zp_encrypt = B_TRUE;
@@ -2298,7 +2356,6 @@ receive_process_write_record(struct receive_writer_arg *rwa,
        if (rwa->heal) {
                blkptr_t *bp;
                dmu_buf_t *dbp;
-               dnode_t *dn;
                int flags = DB_RF_CANFAIL;
 
                if (rwa->raw)
@@ -2330,19 +2387,15 @@ receive_process_write_record(struct receive_writer_arg *rwa,
                        dmu_buf_rele(dbp, FTAG);
                        return (err);
                }
-               dn = dmu_buf_dnode_enter(dbp);
                /* Make sure the on-disk block and recv record sizes match */
-               if (drrw->drr_logical_size !=
-                   dn->dn_datablkszsec << SPA_MINBLOCKSHIFT) {
+               if (drrw->drr_logical_size != dbp->db_size) {
                        err = ENOTSUP;
-                       dmu_buf_dnode_exit(dbp);
                        dmu_buf_rele(dbp, FTAG);
                        return (err);
                }
                /* Get the block pointer for the corrupted block */
                bp = dmu_buf_get_blkptr(dbp);
                err = do_corrective_recv(rwa, drrw, rrd, bp);
-               dmu_buf_dnode_exit(dbp);
                dmu_buf_rele(dbp, FTAG);
                return (err);
        }
@@ -2487,7 +2540,7 @@ receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
         * size of the provided arc_buf_t.
         */
        if (db_spill->db_size != drrs->drr_length) {
-               dmu_buf_will_fill(db_spill, tx);
+               dmu_buf_will_fill(db_spill, tx, B_FALSE);
                VERIFY0(dbuf_spill_set_blksz(db_spill,
                    drrs->drr_length, tx));
        }
@@ -2593,6 +2646,8 @@ receive_object_range(struct receive_writer_arg *rwa,
        memcpy(rwa->or_mac, drror->drr_mac, ZIO_DATA_MAC_LEN);
        rwa->or_byteorder = byteorder;
 
+       rwa->or_need_sync = ORNS_MAYBE;
+
        return (0);
 }
 
@@ -3277,6 +3332,17 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, offset_t *voffp)
                        goto out;
        }
 
+       /*
+        * For compatibility with recursive send streams, we do this here,
+        * rather than in dmu_recv_begin. If we pull the next header too
+        * early, and it's the END record, we break the `recv_skip` logic.
+        */
+       if (drc->drc_drr_begin->drr_payloadlen == 0) {
+               err = receive_read_payload_and_next_header(drc, 0, NULL);
+               if (err != 0)
+                       goto out;
+       }
+
        /*
         * If we failed before this point we will clean up any new resume
         * state that was created. Now that we've gotten past the initial
@@ -3729,13 +3795,13 @@ dmu_objset_is_receiving(objset_t *os)
            os->os_dsl_dataset->ds_owner == dmu_recv_tag);
 }
 
-ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_length, INT, ZMOD_RW,
+ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_length, UINT, ZMOD_RW,
        "Maximum receive queue length");
 
-ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_ff, INT, ZMOD_RW,
+ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_ff, UINT, ZMOD_RW,
        "Receive queue fill fraction");
 
-ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, write_batch_size, INT, ZMOD_RW,
+ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, write_batch_size, UINT, ZMOD_RW,
        "Maximum amount of writes to batch into one transaction");
 
 ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, best_effort_corrective, INT, ZMOD_RW,