Remove db_state DB_NOFILL checks from syncing context

diff --git a/module/zfs/dmu_recv.c b/module/zfs/dmu_recv.c
index 61cfe36515a309c9e4c3d6f2aed925a26458773d..680aed4513bc2cc587e1c7c2e8d4a7159e3283d7 100644
--- a/module/zfs/dmu_recv.c
+++ b/module/zfs/dmu_recv.c
@@ -76,6 +76,12 @@ static int zfs_recv_best_effort_corrective = 0;
 static const void *const dmu_recv_tag = "dmu_recv_tag";
 const char *const recv_clone_name = "%recv";
 
+typedef enum {
+       ORNS_NO,
+       ORNS_YES,
+       ORNS_MAYBE
+} or_need_sync_t;
+
 static int receive_read_payload_and_next_header(dmu_recv_cookie_t *ra, int len,
     void *buf);
 
@@ -129,6 +135,9 @@ struct receive_writer_arg {
        uint8_t or_mac[ZIO_DATA_MAC_LEN];
        boolean_t or_byteorder;
        zio_t *heal_pio;
+
+       /* Keep track of DRR_FREEOBJECTS right after DRR_OBJECT_RANGE */
+       or_need_sync_t or_need_sync;
 };
 
 typedef struct dmu_recv_begin_arg {
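
Taken together with the later hunks in receive_object_range(), receive_freeobjects() and receive_object(), the new field implements a small three-state tracker. A minimal standalone sketch of those transitions, using simplified callback names that are illustrative only (they are not the real entry points):

#include <stdio.h>

/* Illustrative sketch of the or_need_sync transitions; not kernel code. */
typedef enum { ORNS_NO, ORNS_YES, ORNS_MAYBE } or_need_sync_t;

static or_need_sync_t or_need_sync = ORNS_NO;

/* DRR_OBJECT_RANGE starts a new range: a txg sync *might* be needed. */
static void
on_object_range(void)
{
	or_need_sync = ORNS_MAYBE;
}

/* A DRR_FREEOBJECTS that actually freed something right after the
 * range record means the range may now hold freshly created holes. */
static void
on_freeobjects(int freed_something)
{
	if (freed_something && or_need_sync == ORNS_MAYBE)
		or_need_sync = ORNS_YES;
}

/* The first DRR_OBJECT in the range waits for the txg if required
 * (the real code calls txg_wait_synced()), then resets the tracker. */
static int
on_object(void)
{
	int must_wait = (or_need_sync == ORNS_YES);
	or_need_sync = ORNS_NO;
	return (must_wait);
}

int
main(void)
{
	on_object_range();
	on_freeobjects(1);
	printf("wait for txg: %d\n", on_object());	/* prints 1 */
	return (0);
}
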
@@ -1208,13 +1217,14 @@ dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
  * succeeds; otherwise we will leak the holds on the datasets.
  */
 int
-dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
-    boolean_t force, boolean_t heal, boolean_t resumable, nvlist_t *localprops,
-    nvlist_t *hidden_args, char *origin, dmu_recv_cookie_t *drc,
-    zfs_file_t *fp, offset_t *voffp)
+dmu_recv_begin(const char *tofs, const char *tosnap,
+    dmu_replay_record_t *drr_begin, boolean_t force, boolean_t heal,
+    boolean_t resumable, nvlist_t *localprops, nvlist_t *hidden_args,
+    const char *origin, dmu_recv_cookie_t *drc, zfs_file_t *fp,
+    offset_t *voffp)
 {
        dmu_recv_begin_arg_t drba = { 0 };
-       int err;
+       int err = 0;
 
        memset(drc, 0, sizeof (dmu_recv_cookie_t));
        drc->drc_drr_begin = drr_begin;
@@ -1246,7 +1256,6 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
            DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
 
        uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
-       void *payload = NULL;
 
        /*
         * Since OpenZFS 2.0.0, we have enforced a 64MB limit in userspace
@@ -1257,16 +1266,23 @@ dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
        if (payloadlen > (MIN((1U << 28), arc_all_memory() / 4)))
                return (E2BIG);
 
-       if (payloadlen != 0)
-               payload = vmem_alloc(payloadlen, KM_SLEEP);
 
-       err = receive_read_payload_and_next_header(drc, payloadlen,
-           payload);
-       if (err != 0) {
-               vmem_free(payload, payloadlen);
-               return (err);
-       }
        if (payloadlen != 0) {
+               void *payload = vmem_alloc(payloadlen, KM_SLEEP);
+               /*
+                * For compatibility with recursive send streams, we only read
+                * the next header here when there is a payload to consume;
+                * otherwise the stream could be part of a package, and reading
+                * the END record too early would break the `recv_skip` logic,
+                * so dmu_recv_stream does that read instead.
+                */
+
+               err = receive_read_payload_and_next_header(drc, payloadlen,
+                   payload);
+               if (err != 0) {
+                       vmem_free(payload, payloadlen);
+                       return (err);
+               }
                err = nvlist_unpack(payload, payloadlen, &drc->drc_begin_nvl,
                    KM_SLEEP);
                vmem_free(payload, payloadlen);
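
The size guard near the top of this hunk is the only hard limit on the begin payload once it reaches the kernel: 256 MiB, or a quarter of ARC-visible memory on smaller systems. A self-contained sketch of that arithmetic, with arc_total standing in for arc_all_memory() (which is not reproduced here):

#include <stdint.h>
#include <stdio.h>

/* Sketch of the payload size guard; returns nonzero when the caller
 * should reject the begin payload with E2BIG. */
static int
begin_payload_too_big(uint32_t payloadlen, uint64_t arc_total)
{
	uint64_t cap = 1ULL << 28;		/* 256 MiB hard ceiling */
	if (arc_total / 4 < cap)
		cap = arc_total / 4;		/* ...or 1/4 of memory */
	return (payloadlen > cap);
}

int
main(void)
{
	/* With 4 GiB visible to the ARC, the 256 MiB ceiling applies. */
	printf("%d\n", begin_payload_too_big(64U << 20, 4ULL << 30));	/* 0 */
	printf("%d\n", begin_payload_too_big(300U << 20, 4ULL << 30));	/* 1 */
	return (0);
}
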
@@ -1336,8 +1352,10 @@ corrective_read_done(zio_t *zio)
 {
        cr_cb_data_t *data = zio->io_private;
        /* Corruption corrected; update error log if needed */
-       if (zio->io_error == 0)
-               spa_remove_error(data->spa, &data->zb);
+       if (zio->io_error == 0) {
+               spa_remove_error(data->spa, &data->zb,
+                   BP_GET_LOGICAL_BIRTH(zio->io_bp));
+       }
        kmem_free(data, sizeof (cr_cb_data_t));
        abd_free(zio->io_abd);
 }
@@ -1355,8 +1373,8 @@ do_corrective_recv(struct receive_writer_arg *rwa, struct drr_write *drrw,
        dnode_t *dn;
        abd_t *abd = rrd->abd;
        zio_cksum_t bp_cksum = bp->blk_cksum;
-       zio_flag_t flags = ZIO_FLAG_SPECULATIVE |
-           ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_RETRY | ZIO_FLAG_CANFAIL;
+       zio_flag_t flags = ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_RETRY |
+           ZIO_FLAG_CANFAIL;
 
        if (rwa->raw)
                flags |= ZIO_FLAG_RAW;
@@ -1389,8 +1407,9 @@ do_corrective_recv(struct receive_writer_arg *rwa, struct drr_write *drrw,
                /* Recompress the data */
                abd_t *cabd = abd_alloc_linear(BP_GET_PSIZE(bp),
                    B_FALSE);
+               void *buf = abd_to_buf(cabd);
                uint64_t csize = zio_compress_data(BP_GET_COMPRESS(bp),
-                   abd, abd_to_buf(cabd), abd_get_size(abd),
+                   abd, &buf, abd_get_size(abd),
                    rwa->os->os_complevel);
                abd_zero_off(cabd, csize, BP_GET_PSIZE(bp) - csize);
                /* Swap in newly compressed data into the abd */
@@ -1463,8 +1482,9 @@ do_corrective_recv(struct receive_writer_arg *rwa, struct drr_write *drrw,
        }
        rrd->abd = abd;
 
-       io = zio_rewrite(NULL, rwa->os->os_spa, bp->blk_birth, bp, abd,
-           BP_GET_PSIZE(bp), NULL, NULL, ZIO_PRIORITY_SYNC_WRITE, flags, &zb);
+       io = zio_rewrite(NULL, rwa->os->os_spa, BP_GET_LOGICAL_BIRTH(bp), bp,
+           abd, BP_GET_PSIZE(bp), NULL, NULL, ZIO_PRIORITY_SYNC_WRITE, flags,
+           &zb);
 
        ASSERT(abd_get_size(abd) == BP_GET_LSIZE(bp) ||
            abd_get_size(abd) == BP_GET_PSIZE(bp));
@@ -1778,17 +1798,19 @@ receive_handle_existing_object(const struct receive_writer_arg *rwa,
        }
 
        /*
-        * The dmu does not currently support decreasing nlevels
-        * or changing the number of dnode slots on an object. For
-        * non-raw sends, this does not matter and the new object
-        * can just use the previous one's nlevels. For raw sends,
-        * however, the structure of the received dnode (including
-        * nlevels and dnode slots) must match that of the send
-        * side. Therefore, instead of using dmu_object_reclaim(),
-        * we must free the object completely and call
-        * dmu_object_claim_dnsize() instead.
+        * The dmu does not currently support decreasing nlevels, changing
+        * the indirect block size once one exists, or changing the number
+        * of dnode slots on an object.  For non-raw sends this
+        * does not matter and the new object can just use the previous one's
+        * parameters.  For raw sends, however, the structure of the received
+        * dnode (including indirects and dnode slots) must match that of the
+        * send side.  Therefore, instead of using dmu_object_reclaim(), we
+        * must free the object completely and call dmu_object_claim_dnsize()
+        * instead.
         */
-       if ((rwa->raw && drro->drr_nlevels < doi->doi_indirection) ||
+       if ((rwa->raw && ((doi->doi_indirection > 1 &&
+           indblksz != doi->doi_metadata_block_size) ||
+           drro->drr_nlevels < doi->doi_indirection)) ||
            dn_slots != doi->doi_dnodesize >> DNODE_SHIFT) {
                err = dmu_free_long_object(rwa->os, drro->drr_object);
                if (err != 0)
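
The widened condition can be read as a single predicate: a raw receive must free and re-claim the dnode whenever its on-disk shape cannot be changed in place, i.e. fewer indirection levels, a different indirect block size once indirects exist, or a different dnode slot count. A hedged restatement with simplified field names (not the real dmu_object_info_t layout):

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical, simplified shape description; not dmu_object_info_t. */
struct obj_shape {
	bool	raw;			/* raw (encrypted) receive? */
	int	nlevels_recv, nlevels_cur;
	int	indblksz_recv, indblksz_cur;
	int	dn_slots_recv, dn_slots_cur;
};

static bool
must_free_and_reclaim(const struct obj_shape *o)
{
	if (o->raw && ((o->nlevels_cur > 1 &&
	    o->indblksz_recv != o->indblksz_cur) ||
	    o->nlevels_recv < o->nlevels_cur))
		return (true);
	return (o->dn_slots_recv != o->dn_slots_cur);
}

int
main(void)
{
	struct obj_shape o = { .raw = true, .nlevels_recv = 1,
	    .nlevels_cur = 2, .indblksz_recv = 131072,
	    .indblksz_cur = 131072, .dn_slots_recv = 1, .dn_slots_cur = 1 };
	printf("%d\n", must_free_and_reclaim(&o));	/* 1: fewer levels */
	return (0);
}
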
@@ -1914,10 +1936,22 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
                /* object was freed and we are about to allocate a new one */
                object_to_hold = DMU_NEW_OBJECT;
        } else {
+               /*
+                * If the only record in this range so far was DRR_FREEOBJECTS
+                * with at least one actually freed object, it's possible that
+                * the block will now be converted to a hole. We need to wait
+                * for the txg to sync to prevent races.
+                */
+               if (rwa->or_need_sync == ORNS_YES)
+                       txg_wait_synced(dmu_objset_pool(rwa->os), 0);
+
                /* object is free and we are about to allocate a new one */
                object_to_hold = DMU_NEW_OBJECT;
        }
 
+       /* Only relevant for the first object in the range */
+       rwa->or_need_sync = ORNS_NO;
+
        /*
         * If this is a multi-slot dnode there is a chance that this
         * object will expand into a slot that is already used by
@@ -2079,6 +2113,16 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
                dmu_buf_rele(db, FTAG);
                dnode_rele(dn, FTAG);
        }
+
+       /*
+        * If the receive fails, we want the resume stream to start with the
+        * same record that we last successfully received. There is no way
+        * to request resume from the object record, but we can benefit from
+        * the fact that the sender always sends the object record first,
+        * after which it will "resend" data at offset 0 and resume normally.
+        */
+       save_resume_state(rwa, drro->drr_object, 0, tx);
+
        dmu_tx_commit(tx);
 
        return (0);
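
The comment above leans on an ordering guarantee of the send stream: the object record always precedes that object's data, so recording it as position (object, offset 0) is enough for a resumed receive to pick up safely. A toy cursor showing the bookkeeping idea (the real save_resume_state() records this in the dataset so it ends up in the resume token; the struct below is purely illustrative):

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Toy resume cursor; not the real resume-state representation. */
struct resume_cursor {
	uint64_t object;
	uint64_t offset;
};

/* An applied object record counts as offset 0 of that object... */
static void
saw_object_record(struct resume_cursor *rc, uint64_t obj)
{
	rc->object = obj;
	rc->offset = 0;
}

/* ...and each applied write record moves the cursor to its offset. */
static void
saw_write_record(struct resume_cursor *rc, uint64_t obj, uint64_t off)
{
	rc->object = obj;
	rc->offset = off;
}

int
main(void)
{
	struct resume_cursor rc = { 0, 0 };
	saw_object_record(&rc, 7);
	saw_write_record(&rc, 7, 131072);
	printf("resume from object %" PRIu64 ", offset %" PRIu64 "\n",
	    rc.object, rc.offset);
	return (0);
}
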
@@ -2111,6 +2155,9 @@ receive_freeobjects(struct receive_writer_arg *rwa,
 
                if (err != 0)
                        return (err);
+
+               if (rwa->or_need_sync == ORNS_MAYBE)
+                       rwa->or_need_sync = ORNS_YES;
        }
        if (next_err != ESRCH)
                return (next_err);
@@ -2194,7 +2241,7 @@ flush_write_batch_impl(struct receive_writer_arg *rwa)
                        if (err == 0)
                                abd_free(abd);
                } else {
-                       zio_prop_t zp;
+                       zio_prop_t zp = {0};
                        dmu_write_policy(rwa->os, dn, 0, 0, &zp);
 
                        zio_flag_t zio_flags = 0;
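
Zero-initializing zp with {0} means any field dmu_write_policy() leaves untouched is still well defined rather than stack garbage. A trivial reminder of what the initializer guarantees, using a made-up stand-in struct:

#include <assert.h>

/* Hypothetical stand-in for zio_prop_t; only the initializer matters. */
struct write_props { int checksum; int compress; int copies; };

int
main(void)
{
	struct write_props zp = {0};	/* every member is zeroed */
	assert(zp.checksum == 0 && zp.compress == 0 && zp.copies == 0);
	return (0);
}
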
@@ -2309,7 +2356,6 @@ receive_process_write_record(struct receive_writer_arg *rwa,
        if (rwa->heal) {
                blkptr_t *bp;
                dmu_buf_t *dbp;
-               dnode_t *dn;
                int flags = DB_RF_CANFAIL;
 
                if (rwa->raw)
@@ -2341,19 +2387,15 @@ receive_process_write_record(struct receive_writer_arg *rwa,
                        dmu_buf_rele(dbp, FTAG);
                        return (err);
                }
-               dn = dmu_buf_dnode_enter(dbp);
                /* Make sure the on-disk block and recv record sizes match */
-               if (drrw->drr_logical_size !=
-                   dn->dn_datablkszsec << SPA_MINBLOCKSHIFT) {
+               if (drrw->drr_logical_size != dbp->db_size) {
                        err = ENOTSUP;
-                       dmu_buf_dnode_exit(dbp);
                        dmu_buf_rele(dbp, FTAG);
                        return (err);
                }
                /* Get the block pointer for the corrupted block */
                bp = dmu_buf_get_blkptr(dbp);
                err = do_corrective_recv(rwa, drrw, rrd, bp);
-               dmu_buf_dnode_exit(dbp);
                dmu_buf_rele(dbp, FTAG);
                return (err);
        }
@@ -2498,7 +2540,7 @@ receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
         * size of the provided arc_buf_t.
         */
        if (db_spill->db_size != drrs->drr_length) {
-               dmu_buf_will_fill(db_spill, tx);
+               dmu_buf_will_fill(db_spill, tx, B_FALSE);
                VERIFY0(dbuf_spill_set_blksz(db_spill,
                    drrs->drr_length, tx));
        }
@@ -2604,6 +2646,8 @@ receive_object_range(struct receive_writer_arg *rwa,
        memcpy(rwa->or_mac, drror->drr_mac, ZIO_DATA_MAC_LEN);
        rwa->or_byteorder = byteorder;
 
+       rwa->or_need_sync = ORNS_MAYBE;
+
        return (0);
 }
 
@@ -3288,6 +3332,17 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, offset_t *voffp)
                        goto out;
        }
 
+       /*
+        * For compatibility with recursive send streams, we do this here,
+        * rather than in dmu_recv_begin. If we pull the next header too
+        * early, and it's the END record, we break the `recv_skip` logic.
+        */
+       if (drc->drc_drr_begin->drr_payloadlen == 0) {
+               err = receive_read_payload_and_next_header(drc, 0, NULL);
+               if (err != 0)
+                       goto out;
+       }
+
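
This block is the other half of the change in dmu_recv_begin() above: exactly one of the two functions issues the first receive_read_payload_and_next_header() call, chosen by whether a begin payload exists. A compact sketch of that split (illustrative control flow only, not the real signatures):

#include <stdio.h>

/* Illustrative only.  With a payload, dmu_recv_begin must read it (and
 * the following header) to unpack the begin nvlist; without one,
 * reading the next header that early could swallow an END record of a
 * recursive "package" stream and break recv_skip, so the read is
 * deferred to dmu_recv_stream. */
static const char *
first_header_read_site(unsigned int payloadlen)
{
	return (payloadlen != 0 ? "dmu_recv_begin" : "dmu_recv_stream");
}

int
main(void)
{
	printf("%s\n", first_header_read_site(512));	/* dmu_recv_begin */
	printf("%s\n", first_header_read_site(0));	/* dmu_recv_stream */
	return (0);
}
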
        /*
         * If we failed before this point we will clean up any new resume
         * state that was created. Now that we've gotten past the initial