* You may not use this file except in compliance with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
- * or http://www.opensolaris.org/os/licensing.
+ * or https://opensource.org/licenses/CDDL-1.0.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* Copyright 2014 HybridCluster. All rights reserved.
* Copyright 2016 RackTop Systems.
* Copyright (c) 2016 Actifio, Inc. All rights reserved.
+ * Copyright (c) 2019, Klara Inc.
+ * Copyright (c) 2019, Allan Jude
*/
#include <sys/dmu.h>
#include <sys/ddt.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
+#include <sys/dmu_recv.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#endif
/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
-int zfs_send_corrupt_data = B_FALSE;
+static int zfs_send_corrupt_data = B_FALSE;
/*
* This tunable controls the amount of data (measured in bytes) that will be
* prefetched by zfs send. If the main thread is blocking on reads that haven't
* thread is issuing new reads because the prefetches have fallen out of the
* cache, this may need to be decreased.
*/
-int zfs_send_queue_length = SPA_MAXBLOCKSIZE;
+static uint_t zfs_send_queue_length = SPA_MAXBLOCKSIZE;
/*
* This tunable controls the length of the queues that zfs send worker threads
* use to communicate. If the send_main_thread is blocking on these queues,
* at the start of a send as these threads consume all the available IO
* resources, this variable may need to be decreased.
*/
-int zfs_send_no_prefetch_queue_length = 1024 * 1024;
+static uint_t zfs_send_no_prefetch_queue_length = 1024 * 1024;
/*
* These tunables control the fill fraction of the queues by zfs send. The fill
* fraction controls the frequency with which threads have to be cv_signaled.
* down. If the queues empty before the signalled thread can catch up, then
* these should be tuned up.
*/
-int zfs_send_queue_ff = 20;
-int zfs_send_no_prefetch_queue_ff = 20;
+static uint_t zfs_send_queue_ff = 20;
+static uint_t zfs_send_no_prefetch_queue_ff = 20;
/*
* Use this to override the recordsize calculation for fast zfs send estimates.
*/
-int zfs_override_estimate_recordsize = 0;
+static uint_t zfs_override_estimate_recordsize = 0;
/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
-int zfs_send_set_freerecords_bit = B_TRUE;
+static const boolean_t zfs_send_set_freerecords_bit = B_TRUE;
/* Set this tunable to FALSE is disable sending unmodified spill blocks. */
-int zfs_send_unmodified_spill_blocks = B_TRUE;
+static int zfs_send_unmodified_spill_blocks = B_TRUE;
static inline boolean_t
overflow_multiply(uint64_t a, uint64_t b, uint64_t *c)
return (B_TRUE);
}
-/*
- * Return B_TRUE and modifies *out to the span if the span is less than 2^64,
- * returns B_FALSE otherwise.
- */
-static inline boolean_t
-bp_span(uint32_t datablksz, uint8_t indblkshift, uint64_t level, uint64_t *out)
-{
- uint64_t spanb = bp_span_in_blocks(indblkshift, level);
- return (overflow_multiply(spanb, datablksz, out));
-}
-
struct send_thread_arg {
bqueue_t q;
- dsl_dataset_t *ds; /* Dataset to traverse */
- redaction_list_t *redaction_list;
- struct send_redact_record *current_record;
+ objset_t *os; /* Objset to traverse */
uint64_t fromtxg; /* Traverse from this txg */
int flags; /* flags to pass to traverse_dataset */
int error_code;
boolean_t cancel;
zbookmark_phys_t resume;
- objlist_t *deleted_objs;
uint64_t *num_blocks_visited;
};
uint64_t *num_blocks_visited;
};
-/*
- * A wrapper around struct redact_block so it can be stored in a list_t.
- */
-struct redact_block_list_node {
- redact_block_phys_t block;
- list_node_t node;
-};
-
-struct redact_bookmark_info {
- redact_block_phys_t rbi_furthest[TXG_SIZE];
- /* Lists of struct redact_block_list_node. */
- list_t rbi_blocks[TXG_SIZE];
- boolean_t rbi_synctasc_txg[TXG_SIZE];
- uint64_t rbi_latest_synctask_txg;
- redaction_list_t *rbi_redaction_list;
-};
-
struct send_merge_thread_arg {
bqueue_t q;
objset_t *os;
struct redact_list_thread_arg *redact_arg;
int error;
boolean_t cancel;
- struct redact_bookmark_info rbi;
- /*
- * If we're resuming a redacted send, then the object/offset from the
- * resume token may be different from the object/offset that we have
- * updated the bookmark to. resume_redact_zb will store the earlier of
- * the two object/offset pairs, and bookmark_before will be B_TRUE if
- * resume_redact_zb has the object/offset for resuming the redaction
- * bookmark, and B_FALSE if resume_redact_zb is storing the
- * object/offset from the resume token.
- */
- zbookmark_phys_t resume_redact_zb;
- boolean_t bookmark_before;
};
struct send_range {
union {
struct srd {
dmu_object_type_t obj_type;
- uint32_t datablksz;
+ uint32_t datablksz; // logical size
+ uint32_t datasz; // payload size
blkptr_t bp;
+ arc_buf_t *abuf;
+ abd_t *abd;
+ kmutex_t lock;
+ kcondvar_t cv;
+ boolean_t io_outstanding;
+ boolean_t io_compressed;
+ int io_err;
} data;
struct srh {
uint32_t datablksz;
size_t size = sizeof (dnode_phys_t) *
(range->sru.object.dnp->dn_extra_slots + 1);
kmem_free(range->sru.object.dnp, size);
+ } else if (range->type == DATA) {
+ mutex_enter(&range->sru.data.lock);
+ while (range->sru.data.io_outstanding)
+ cv_wait(&range->sru.data.cv, &range->sru.data.lock);
+ if (range->sru.data.abd != NULL)
+ abd_free(range->sru.data.abd);
+ if (range->sru.data.abuf != NULL) {
+ arc_buf_destroy(range->sru.data.abuf,
+ &range->sru.data.abuf);
+ }
+ mutex_exit(&range->sru.data.lock);
+
+ cv_destroy(&range->sru.data.cv);
+ mutex_destroy(&range->sru.data.lock);
}
kmem_free(range, sizeof (*range));
}
if (payload_len != 0) {
*dscp->dsc_off += payload_len;
/*
- * payload is null when dso->ryrun == B_TRUE (i.e. when we're
+ * payload is null when dso_dryrun == B_TRUE (i.e. when we're
* doing a send size calculation)
*/
if (payload != NULL) {
}
}
/* create a FREE record and make it pending */
- bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
dscp->dsc_drr->drr_type = DRR_FREE;
drrf->drr_object = object;
drrf->drr_offset = offset;
}
}
/* create a REDACT record and make it pending */
- bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
dscp->dsc_drr->drr_type = DRR_REDACT;
drrr->drr_object = object;
drrr->drr_offset = offset;
}
static int
-dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
- uint64_t offset, int lsize, int psize, const blkptr_t *bp, void *data)
+dmu_dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
+ uint64_t offset, int lsize, int psize, const blkptr_t *bp,
+ boolean_t io_compressed, void *data)
{
uint64_t payload_size;
boolean_t raw = (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
dscp->dsc_pending_op = PENDING_NONE;
}
/* write a WRITE record */
- bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
dscp->dsc_drr->drr_type = DRR_WRITE;
drrw->drr_object = object;
drrw->drr_type = type;
drrw->drr_logical_size = lsize;
/* only set the compression fields if the buf is compressed or raw */
- if (raw || lsize != psize) {
+ boolean_t compressed =
+ (bp != NULL ? BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
+ io_compressed : lsize != psize);
+ if (raw || compressed) {
+ ASSERT(bp != NULL);
ASSERT(raw || dscp->dsc_featureflags &
DMU_BACKUP_FEATURE_COMPRESSED);
ASSERT(!BP_IS_EMBEDDED(bp));
/*
* There's no pre-computed checksum for partial-block writes,
* embedded BP's, or encrypted BP's that are being sent as
- * plaintext, so (like fletcher4-checkummed blocks) userland
+ * plaintext, so (like fletcher4-checksummed blocks) userland
* will have to compute a dedup-capable checksum itself.
*/
drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
ASSERT(BP_IS_EMBEDDED(bp));
- bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
dscp->dsc_drr->drr_type = DRR_WRITE_EMBEDDED;
drrw->drr_object = object;
drrw->drr_offset = offset;
decode_embedded_bp_compressed(bp, buf);
- if (dump_record(dscp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
+ uint32_t psize = drrw->drr_psize;
+ uint32_t rsize = P2ROUNDUP(psize, 8);
+
+ if (psize != rsize)
+ memset(buf + psize, 0, rsize - psize);
+
+ if (dump_record(dscp, buf, rsize) != 0)
return (SET_ERROR(EINTR));
return (0);
}
}
/* write a SPILL record */
- bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
dscp->dsc_drr->drr_type = DRR_SPILL;
drrs->drr_object = object;
drrs->drr_length = blksz;
* receiving side.
*/
if (maxobj > 0) {
- if (maxobj < firstobj)
+ if (maxobj <= firstobj)
return (0);
if (maxobj < firstobj + numobjs)
return (SET_ERROR(EINTR));
dscp->dsc_pending_op = PENDING_NONE;
}
- if (numobjs == 0)
- numobjs = UINT64_MAX - firstobj;
if (dscp->dsc_pending_op == PENDING_FREEOBJECTS) {
/*
}
/* write a FREEOBJECTS record */
- bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
dscp->dsc_drr->drr_type = DRR_FREEOBJECTS;
drrfo->drr_firstobj = firstobj;
drrfo->drr_numobjs = numobjs;
}
/* write an OBJECT record */
- bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
dscp->dsc_drr->drr_type = DRR_OBJECT;
drro->drr_object = object;
drro->drr_type = dnp->dn_type;
* to send it.
*/
if (bonuslen != 0) {
+ if (drro->drr_bonuslen > DN_MAX_BONUS_LEN(dnp))
+ return (SET_ERROR(EINVAL));
drro->drr_raw_bonuslen = DN_MAX_BONUS_LEN(dnp);
bonuslen = drro->drr_raw_bonuslen;
}
struct send_range record;
blkptr_t *bp = DN_SPILL_BLKPTR(dnp);
- bzero(&record, sizeof (struct send_range));
+ memset(&record, 0, sizeof (struct send_range));
record.type = DATA;
record.object = object;
record.eos_marker = B_FALSE;
dscp->dsc_pending_op = PENDING_NONE;
}
- bzero(dscp->dsc_drr, sizeof (dmu_replay_record_t));
+ memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
dscp->dsc_drr->drr_type = DRR_OBJECT_RANGE;
drror->drr_firstobj = firstobj;
drror->drr_numslots = numslots;
}
static boolean_t
-send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
+send_do_embed(const blkptr_t *bp, uint64_t featureflags)
{
if (!BP_IS_EMBEDDED(bp))
return (B_FALSE);
* Compression function must be legacy, or explicitly enabled.
*/
if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
- !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LZ4)))
+ !(featureflags & DMU_BACKUP_FEATURE_LZ4)))
+ return (B_FALSE);
+
+ /*
+ * If we have not set the ZSTD feature flag, we can't send ZSTD
+ * compressed embedded blocks, as the receiver may not support them.
+ */
+ if ((BP_GET_COMPRESS(bp) == ZIO_COMPRESS_ZSTD &&
+ !(featureflags & DMU_BACKUP_FEATURE_ZSTD)))
return (B_FALSE);
/*
*/
switch (BPE_GET_ETYPE(bp)) {
case BP_EMBEDDED_TYPE_DATA:
- if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
+ if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
return (B_TRUE);
break;
default:
/*
* This function actually handles figuring out what kind of record needs to be
- * dumped, reading the data (which has hopefully been prefetched), and calling
- * the appropriate helper function.
+ * dumped, and calling the appropriate helper function. In most cases,
+ * the data has already been read by send_reader_thread().
*/
static int
do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
if (BP_GET_TYPE(bp) == DMU_OT_SA) {
arc_flags_t aflags = ARC_FLAG_WAIT;
- enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
+ zio_flag_t zioflags = ZIO_FLAG_CANFAIL;
if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
ASSERT(BP_IS_PROTECTED(bp));
zioflags |= ZIO_FLAG_RAW;
}
- arc_buf_t *abuf;
zbookmark_phys_t zb;
ASSERT3U(range->start_blkid, ==, DMU_SPILL_BLKID);
zb.zb_objset = dmu_objset_id(dscp->dsc_os);
zb.zb_level = 0;
zb.zb_blkid = range->start_blkid;
+ arc_buf_t *abuf = NULL;
if (!dscp->dsc_dso->dso_dryrun && arc_read(NULL, spa,
bp, arc_getbuf_func, &abuf, ZIO_PRIORITY_ASYNC_READ,
zioflags, &aflags, &zb) != 0)
return (SET_ERROR(EIO));
- err = dump_spill(dscp, bp, zb.zb_object, abuf->b_data);
- arc_buf_destroy(abuf, &abuf);
+ err = dump_spill(dscp, bp, zb.zb_object,
+ (abuf == NULL ? NULL : abuf->b_data));
+ if (abuf != NULL)
+ arc_buf_destroy(abuf, &abuf);
return (err);
}
- if (send_do_embed(dscp, bp)) {
+ if (send_do_embed(bp, dscp->dsc_featureflags)) {
err = dump_write_embedded(dscp, range->object,
range->start_blkid * srdp->datablksz,
srdp->datablksz, bp);
range->start_blkid * srdp->datablksz >=
dscp->dsc_resume_offset));
/* it's a level-0 block of a regular object */
- arc_flags_t aflags = ARC_FLAG_WAIT;
- arc_buf_t *abuf = NULL;
- uint64_t offset;
- /*
- * If we have large blocks stored on disk but the send flags
- * don't allow us to send large blocks, we split the data from
- * the arc buf into chunks.
- */
- boolean_t split_large_blocks =
- srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
- !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
-
- /*
- * Raw sends require that we always get raw data as it exists
- * on disk, so we assert that we are not splitting blocks here.
- */
- boolean_t request_raw =
- (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
-
- /*
- * We should only request compressed data from the ARC if all
- * the following are true:
- * - stream compression was requested
- * - we aren't splitting large blocks into smaller chunks
- * - the data won't need to be byteswapped before sending
- * - this isn't an embedded block
- * - this isn't metadata (if receiving on a different endian
- * system it can be byteswapped more easily)
- */
- boolean_t request_compressed =
- (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
- !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
- !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
-
- IMPLY(request_raw, !split_large_blocks);
- IMPLY(request_raw, BP_IS_PROTECTED(bp));
- if (!dscp->dsc_dso->dso_dryrun) {
- enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
-
- ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
-
- if (request_raw)
- zioflags |= ZIO_FLAG_RAW;
- else if (request_compressed)
- zioflags |= ZIO_FLAG_RAW_COMPRESS;
- zbookmark_phys_t zb;
- zb.zb_objset = dmu_objset_id(dscp->dsc_os);
- zb.zb_object = range->object;
- zb.zb_level = 0;
- zb.zb_blkid = range->start_blkid;
-
- err = arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
- ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, &zb);
- }
+ mutex_enter(&srdp->lock);
+ while (srdp->io_outstanding)
+ cv_wait(&srdp->cv, &srdp->lock);
+ err = srdp->io_err;
+ mutex_exit(&srdp->lock);
if (err != 0) {
if (zfs_send_corrupt_data &&
!dscp->dsc_dso->dso_dryrun) {
- /* Send a block filled with 0x"zfs badd bloc" */
- abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
- srdp->datablksz);
+ /*
+ * Send a block filled with 0x"zfs badd bloc"
+ */
+ srdp->abuf = arc_alloc_buf(spa, &srdp->abuf,
+ ARC_BUFC_DATA, srdp->datablksz);
uint64_t *ptr;
- for (ptr = abuf->b_data;
- (char *)ptr < (char *)abuf->b_data +
+ for (ptr = srdp->abuf->b_data;
+ (char *)ptr < (char *)srdp->abuf->b_data +
srdp->datablksz; ptr++)
*ptr = 0x2f5baddb10cULL;
} else {
}
}
- offset = range->start_blkid * srdp->datablksz;
+ ASSERT(dscp->dsc_dso->dso_dryrun ||
+ srdp->abuf != NULL || srdp->abd != NULL);
- if (split_large_blocks) {
- ASSERT0(arc_is_encrypted(abuf));
- ASSERT3U(arc_get_compression(abuf), ==,
- ZIO_COMPRESS_OFF);
- char *buf = abuf->b_data;
+ uint64_t offset = range->start_blkid * srdp->datablksz;
+
+ char *data = NULL;
+ if (srdp->abd != NULL) {
+ data = abd_to_buf(srdp->abd);
+ ASSERT3P(srdp->abuf, ==, NULL);
+ } else if (srdp->abuf != NULL) {
+ data = srdp->abuf->b_data;
+ }
+
+ /*
+ * If we have large blocks stored on disk but the send flags
+ * don't allow us to send large blocks, we split the data from
+ * the arc buf into chunks.
+ */
+ if (srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
+ !(dscp->dsc_featureflags &
+ DMU_BACKUP_FEATURE_LARGE_BLOCKS)) {
while (srdp->datablksz > 0 && err == 0) {
int n = MIN(srdp->datablksz,
SPA_OLD_MAXBLOCKSIZE);
- err = dump_write(dscp, srdp->obj_type,
- range->object, offset, n, n, NULL, buf);
+ err = dmu_dump_write(dscp, srdp->obj_type,
+ range->object, offset, n, n, NULL, B_FALSE,
+ data);
offset += n;
- buf += n;
+ /*
+ * When doing dry run, data==NULL is used as a
+ * sentinel value by
+ * dmu_dump_write()->dump_record().
+ */
+ if (data != NULL)
+ data += n;
srdp->datablksz -= n;
}
} else {
- int psize;
- if (abuf != NULL) {
- psize = arc_buf_size(abuf);
- if (arc_get_compression(abuf) !=
- ZIO_COMPRESS_OFF) {
- ASSERT3S(psize, ==, BP_GET_PSIZE(bp));
- }
- } else if (!request_compressed) {
- psize = srdp->datablksz;
- } else {
- psize = BP_GET_PSIZE(bp);
- }
- err = dump_write(dscp, srdp->obj_type, range->object,
- offset, srdp->datablksz, psize, bp,
- (abuf == NULL ? NULL : abuf->b_data));
+ err = dmu_dump_write(dscp, srdp->obj_type,
+ range->object, offset,
+ srdp->datablksz, srdp->datasz, bp,
+ srdp->io_compressed, data);
}
- if (abuf != NULL)
- arc_buf_destroy(abuf, &abuf);
return (err);
}
case HOLE: {
return (err);
}
-struct send_range *
+static struct send_range *
range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
uint64_t end_blkid, boolean_t eos)
{
range->start_blkid = start_blkid;
range->end_blkid = end_blkid;
range->eos_marker = eos;
+ if (type == DATA) {
+ range->sru.data.abd = NULL;
+ range->sru.data.abuf = NULL;
+ mutex_init(&range->sru.data.lock, NULL, MUTEX_DEFAULT, NULL);
+ cv_init(&range->sru.data.cv, NULL, CV_DEFAULT, NULL);
+ range->sru.data.io_outstanding = 0;
+ range->sru.data.io_err = 0;
+ range->sru.data.io_compressed = B_FALSE;
+ }
return (range);
}
* This is the callback function to traverse_dataset that acts as a worker
* thread for dmu_send_impl.
*/
-/*ARGSUSED*/
static int
send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
{
+ (void) zilog;
struct send_thread_arg *sta = arg;
struct send_range *record;
ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
zb->zb_object >= sta->resume.zb_object);
- ASSERT3P(sta->ds, !=, NULL);
/*
* All bps of an encrypted os should have the encryption bit set.
* If this is not true it indicates tampering and we report an error.
*/
- objset_t *os;
- VERIFY0(dmu_objset_from_ds(sta->ds, &os));
- if (os->os_encrypted &&
+ if (sta->os->os_encrypted &&
!BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {
- spa_log_error(spa, zb);
- zfs_panic_recover("unencrypted block in encrypted "
- "object set %llu", sta->ds->ds_object);
+ spa_log_error(spa, zb, &bp->blk_birth);
return (SET_ERROR(EIO));
}
record->sru.object.bp = *bp;
size_t size = sizeof (*dnp) * (dnp->dn_extra_slots + 1);
record->sru.object.dnp = kmem_alloc(size, KM_SLEEP);
- bcopy(dnp, record->sru.object.dnp, size);
+ memcpy(record->sru.object.dnp, dnp, size);
bqueue_enqueue(&sta->q, record, sizeof (*record));
return (0);
}
if (zb->zb_blkid == DMU_SPILL_BLKID)
ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
- record = range_alloc(DATA, zb->zb_object, start, (start + span < start ?
- 0 : start + span), B_FALSE);
+ enum type record_type = DATA;
+ if (BP_IS_HOLE(bp))
+ record_type = HOLE;
+ else if (BP_IS_REDACTED(bp))
+ record_type = REDACT;
+ else
+ record_type = DATA;
+
+ record = range_alloc(record_type, zb->zb_object, start,
+ (start + span < start ? 0 : start + span), B_FALSE);
uint64_t datablksz = (zb->zb_blkid == DMU_SPILL_BLKID ?
BP_GET_LSIZE(bp) : dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT);
+
if (BP_IS_HOLE(bp)) {
- record->type = HOLE;
record->sru.hole.datablksz = datablksz;
} else if (BP_IS_REDACTED(bp)) {
- record->type = REDACT;
record->sru.redact.datablksz = datablksz;
} else {
- record->type = DATA;
record->sru.data.datablksz = datablksz;
record->sru.data.obj_type = dnp->dn_type;
record->sru.data.bp = *bp;
}
+
bqueue_enqueue(&sta->q, record, sizeof (*record));
return (0);
}
/*
* This function kicks off the traverse_dataset. It also handles setting the
* error code of the thread in case something goes wrong, and pushes the End of
- * Stream record when the traverse_dataset call has finished. If there is no
- * dataset to traverse, then we traverse the redaction list provided and enqueue
- * records for that. If neither is provided, the thread immediately pushes an
- * End of Stream marker.
+ * Stream record when the traverse_dataset call has finished.
*/
-static void
+static __attribute__((noreturn)) void
send_traverse_thread(void *arg)
{
struct send_thread_arg *st_arg = arg;
struct send_range *data;
fstrans_cookie_t cookie = spl_fstrans_mark();
- if (st_arg->ds != NULL) {
- ASSERT3P(st_arg->redaction_list, ==, NULL);
- err = traverse_dataset_resume(st_arg->ds,
- st_arg->fromtxg, &st_arg->resume,
- st_arg->flags, send_cb, st_arg);
- } else if (st_arg->redaction_list != NULL) {
- struct redact_list_cb_arg rlcba = {0};
- rlcba.cancel = &st_arg->cancel;
- rlcba.num_blocks_visited = st_arg->num_blocks_visited;
- rlcba.q = &st_arg->q;
- rlcba.mark_redact = B_FALSE;
- err = dsl_redaction_list_traverse(st_arg->redaction_list,
- &st_arg->resume, redact_list_cb, &rlcba);
- }
+ err = traverse_dataset_resume(st_arg->os->os_dsl_dataset,
+ st_arg->fromtxg, &st_arg->resume,
+ st_arg->flags, send_cb, st_arg);
if (err != EINTR)
st_arg->error_code = err;
return (-1);
if (from_obj >= to_end_obj)
return (1);
- int64_t cmp = AVL_CMP(to->type == OBJECT_RANGE, from->type ==
+ int64_t cmp = TREE_CMP(to->type == OBJECT_RANGE, from->type ==
OBJECT_RANGE);
if (unlikely(cmp))
return (cmp);
- cmp = AVL_CMP(to->type == OBJECT, from->type == OBJECT);
+ cmp = TREE_CMP(to->type == OBJECT, from->type == OBJECT);
if (unlikely(cmp))
return (cmp);
if (from->end_blkid <= to->start_blkid)
return (next);
}
-static void
+static __attribute__((noreturn)) void
redact_list_thread(void *arg)
{
struct redact_list_thread_arg *rlt_arg = arg;
record = range_alloc(DATA, 0, 0, 0, B_TRUE);
bqueue_enqueue_flush(&rlt_arg->q, record, sizeof (*record));
spl_fstrans_unmark(cookie);
+
+ thread_exit();
}
/*
uint64_t r1_l0equiv = r1->start_blkid;
uint64_t r2_objequiv = r2->object;
uint64_t r2_l0equiv = r2->start_blkid;
- int64_t cmp = AVL_CMP(r1->eos_marker, r2->eos_marker);
+ int64_t cmp = TREE_CMP(r1->eos_marker, r2->eos_marker);
if (unlikely(cmp))
return (cmp);
if (r1->object == 0) {
r2_l0equiv = 0;
}
- cmp = AVL_CMP(r1_objequiv, r2_objequiv);
+ cmp = TREE_CMP(r1_objequiv, r2_objequiv);
if (likely(cmp))
return (cmp);
- cmp = AVL_CMP(r2->type == OBJECT_RANGE, r1->type == OBJECT_RANGE);
+ cmp = TREE_CMP(r2->type == OBJECT_RANGE, r1->type == OBJECT_RANGE);
if (unlikely(cmp))
return (cmp);
- cmp = AVL_CMP(r2->type == OBJECT, r1->type == OBJECT);
+ cmp = TREE_CMP(r2->type == OBJECT, r1->type == OBJECT);
if (unlikely(cmp))
return (cmp);
- return (AVL_CMP(r1_l0equiv, r2_l0equiv));
+ return (TREE_CMP(r1_l0equiv, r2_l0equiv));
}
enum q_idx {
* data from the redact_list_thread and use that to determine which blocks
* should be redacted.
*/
-static void
+static __attribute__((noreturn)) void
send_merge_thread(void *arg)
{
struct send_merge_thread_arg *smt_arg = arg;
}
range_free(front_ranges[i]);
}
- if (range == NULL)
- range = kmem_zalloc(sizeof (*range), KM_SLEEP);
range->eos_marker = B_TRUE;
bqueue_enqueue_flush(&smt_arg->q, range, 1);
spl_fstrans_unmark(cookie);
thread_exit();
}
-struct send_prefetch_thread_arg {
+struct send_reader_thread_arg {
struct send_merge_thread_arg *smta;
bqueue_t q;
boolean_t cancel;
- boolean_t issue_prefetches;
+ boolean_t issue_reads;
+ uint64_t featureflags;
int error;
};
+/*
+ * Completion callback for the zio_read() issued by issue_data_read() for
+ * blocks that were not already cached in the ARC.  The send_range being
+ * filled in is passed through zio->io_private.
+ *
+ * On error, the direct-read buffer is released and the error code is
+ * stashed in the range so its consumer can report it.  In all cases
+ * io_outstanding is cleared and any waiters sleeping on the range's cv
+ * (do_dump() and range_free() both wait for io_outstanding to drop) are
+ * woken.
+ */
+static void
+dmu_send_read_done(zio_t *zio)
+{
+ struct send_range *range = zio->io_private;
+
+ mutex_enter(&range->sru.data.lock);
+ if (zio->io_error != 0) {
+ /* Drop the failed read's buffer; consumers look at io_err. */
+ abd_free(range->sru.data.abd);
+ range->sru.data.abd = NULL;
+ range->sru.data.io_err = zio->io_error;
+ }
+
+ ASSERT(range->sru.data.io_outstanding);
+ range->sru.data.io_outstanding = B_FALSE;
+ cv_broadcast(&range->sru.data.cv);
+ mutex_exit(&range->sru.data.lock);
+}
+
+/*
+ * Decide how the payload of a DATA range will be read (raw, compressed,
+ * or plain), record the resulting payload size in srdp->datasz, and —
+ * when srta->issue_reads is set (i.e. not a dry run) — start the actual
+ * read.  Blocks already cached in the ARC are fetched synchronously via
+ * arc_read(); uncached blocks are read asynchronously with zio_read()
+ * and completed by dmu_send_read_done(), with do_dump() later waiting on
+ * the range's cv for the I/O to finish.
+ */
+static void
+issue_data_read(struct send_reader_thread_arg *srta, struct send_range *range)
+{
+ struct srd *srdp = &range->sru.data;
+ blkptr_t *bp = &srdp->bp;
+ objset_t *os = srta->smta->os;
+
+ ASSERT3U(range->type, ==, DATA);
+ /* DATA ranges produced by send_cb()/enqueue_range() span one block. */
+ ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
+ /*
+ * If we have large blocks stored on disk but
+ * the send flags don't allow us to send large
+ * blocks, we split the data from the arc buf
+ * into chunks.
+ */
+ boolean_t split_large_blocks =
+ srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
+ !(srta->featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
+ /*
+ * We should only request compressed data from the ARC if all
+ * the following are true:
+ * - stream compression was requested
+ * - we aren't splitting large blocks into smaller chunks
+ * - the data won't need to be byteswapped before sending
+ * - this isn't an embedded block
+ * - this isn't metadata (if receiving on a different endian
+ * system it can be byteswapped more easily)
+ */
+ boolean_t request_compressed =
+ (srta->featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
+ !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
+ !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
+
+ zio_flag_t zioflags = ZIO_FLAG_CANFAIL;
+
+ /*
+ * Raw sends get the on-disk (possibly encrypted) bytes; compressed
+ * sends get the still-compressed bytes.  Either way the payload is
+ * not the logical plaintext, which dmu_dump_write() is told about
+ * via io_compressed.
+ */
+ if (srta->featureflags & DMU_BACKUP_FEATURE_RAW) {
+ zioflags |= ZIO_FLAG_RAW;
+ srdp->io_compressed = B_TRUE;
+ } else if (request_compressed) {
+ zioflags |= ZIO_FLAG_RAW_COMPRESS;
+ srdp->io_compressed = B_TRUE;
+ }
+
+ /* Payload size: physical size if sent compressed/raw, else logical. */
+ srdp->datasz = (zioflags & ZIO_FLAG_RAW_COMPRESS) ?
+ BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp);
+
+ /* Dry runs only need datasz/io_compressed computed above. */
+ if (!srta->issue_reads)
+ return;
+ /* Redacted blocks carry no payload. */
+ if (BP_IS_REDACTED(bp))
+ return;
+ /* Embedded blocks are sent from the bp itself; nothing to read. */
+ if (send_do_embed(bp, srta->featureflags))
+ return;
+
+ zbookmark_phys_t zb = {
+ .zb_objset = dmu_objset_id(os),
+ .zb_object = range->object,
+ .zb_level = 0,
+ .zb_blkid = range->start_blkid,
+ };
+
+ /* Only consult the ARC; a miss returns an error instead of reading. */
+ arc_flags_t aflags = ARC_FLAG_CACHED_ONLY;
+
+ int arc_err = arc_read(NULL, os->os_spa, bp,
+ arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ,
+ zioflags, &aflags, &zb);
+ /*
+ * If the data is not already cached in the ARC, we read directly
+ * from zio. This avoids the performance overhead of adding a new
+ * entry to the ARC, and we also avoid polluting the ARC cache with
+ * data that is not likely to be used in the future.
+ */
+ if (arc_err != 0) {
+ srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE);
+ srdp->io_outstanding = B_TRUE;
+ zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd,
+ srdp->datasz, dmu_send_read_done, range,
+ ZIO_PRIORITY_ASYNC_READ, zioflags, &zb));
+ }
+}
+
/*
* Create a new record with the given values.
*/
static void
-enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
+enqueue_range(struct send_reader_thread_arg *srta, bqueue_t *q, dnode_t *dn,
uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz)
{
enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE :
struct send_range *range = range_alloc(range_type, dn->dn_object,
blkid, blkid + count, B_FALSE);
- if (blkid == DMU_SPILL_BLKID)
+ if (blkid == DMU_SPILL_BLKID) {
+ ASSERT3P(bp, !=, NULL);
ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
+ }
switch (range_type) {
case HOLE:
range->sru.data.datablksz = datablksz;
range->sru.data.obj_type = dn->dn_type;
range->sru.data.bp = *bp;
- if (spta->issue_prefetches) {
- zbookmark_phys_t zb = {0};
- zb.zb_objset = dmu_objset_id(dn->dn_objset);
- zb.zb_object = dn->dn_object;
- zb.zb_level = 0;
- zb.zb_blkid = blkid;
- arc_flags_t aflags = ARC_FLAG_NOWAIT |
- ARC_FLAG_PREFETCH;
- (void) arc_read(NULL, dn->dn_objset->os_spa, bp, NULL,
- NULL, ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
- ZIO_FLAG_SPECULATIVE, &aflags, &zb);
- }
+ issue_data_read(srta, range);
break;
case REDACT:
range->sru.redact.datablksz = datablksz;
* some indirect blocks can be discarded because they're not holes. Second,
* it issues prefetches for the data we need to send.
*/
-static void
-send_prefetch_thread(void *arg)
+static __attribute__((noreturn)) void
+send_reader_thread(void *arg)
{
- struct send_prefetch_thread_arg *spta = arg;
- struct send_merge_thread_arg *smta = spta->smta;
+ struct send_reader_thread_arg *srta = arg;
+ struct send_merge_thread_arg *smta = srta->smta;
bqueue_t *inq = &smta->q;
- bqueue_t *outq = &spta->q;
+ bqueue_t *outq = &srta->q;
objset_t *os = smta->os;
fstrans_cookie_t cookie = spl_fstrans_mark();
struct send_range *range = bqueue_dequeue(inq);
*/
uint64_t last_obj = UINT64_MAX;
uint64_t last_obj_exists = B_TRUE;
- while (!range->eos_marker && !spta->cancel && smta->error == 0 &&
+ while (!range->eos_marker && !srta->cancel && smta->error == 0 &&
err == 0) {
switch (range->type) {
- case DATA: {
- zbookmark_phys_t zb;
- zb.zb_objset = dmu_objset_id(os);
- zb.zb_object = range->object;
- zb.zb_level = 0;
- zb.zb_blkid = range->start_blkid;
- ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
- if (!BP_IS_REDACTED(&range->sru.data.bp) &&
- spta->issue_prefetches &&
- !BP_IS_EMBEDDED(&range->sru.data.bp)) {
- arc_flags_t aflags = ARC_FLAG_NOWAIT |
- ARC_FLAG_PREFETCH;
- (void) arc_read(NULL, os->os_spa,
- &range->sru.data.bp, NULL, NULL,
- ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
- ZIO_FLAG_SPECULATIVE, &aflags, &zb);
- }
+ case DATA:
+ issue_data_read(srta, range);
bqueue_enqueue(outq, range, range->sru.data.datablksz);
range = get_next_range_nofree(inq, range);
break;
- }
case HOLE:
case OBJECT:
case OBJECT_RANGE:
continue;
}
uint64_t file_max =
- (dn->dn_maxblkid < range->end_blkid ?
- dn->dn_maxblkid : range->end_blkid);
+ MIN(dn->dn_maxblkid, range->end_blkid);
/*
* The object exists, so we need to try to find the
* blkptr for each block in the range we're processing.
datablksz);
uint64_t nblks = (offset / datablksz) -
blkid;
- enqueue_range(spta, outq, dn, blkid,
+ enqueue_range(srta, outq, dn, blkid,
nblks, NULL, datablksz);
blkid += nblks;
}
if (err != 0)
break;
ASSERT(!BP_IS_HOLE(&bp));
- enqueue_range(spta, outq, dn, blkid, 1, &bp,
+ enqueue_range(srta, outq, dn, blkid, 1, &bp,
datablksz);
}
rw_exit(&dn->dn_struct_rwlock);
}
}
}
- if (spta->cancel || err != 0) {
+ if (srta->cancel || err != 0) {
smta->cancel = B_TRUE;
- spta->error = err;
+ srta->error = err;
} else if (smta->error != 0) {
- spta->error = smta->error;
+ srta->error = smta->error;
}
while (!range->eos_marker)
range = get_next_range(inq, range);
struct dmu_send_params {
/* Pool args */
- void *tag; // Tag that dp was held with, will be used to release dp.
+ const void *tag; // Tag dp was held with, will be used to release dp.
dsl_pool_t *dp;
/* To snapshot args */
const char *tosnap;
boolean_t embedok;
boolean_t large_block_ok;
boolean_t compressok;
+ boolean_t rawok;
+ boolean_t savedok;
uint64_t resumeobj;
uint64_t resumeoff;
+ uint64_t saved_guid;
zfs_bookmark_phys_t *redactbook;
/* Stream output params */
dmu_send_outparams_t *dso;
/* Stream progress params */
offset_t *off;
int outfd;
- boolean_t rawok;
+ char saved_toname[MAXNAMELEN];
};
static int
{
dsl_dataset_t *to_ds = dspp->to_ds;
dsl_pool_t *dp = dspp->dp;
-#ifdef _KERNEL
+
if (dmu_objset_type(os) == DMU_OST_ZFS) {
uint64_t version;
if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0)
if (version >= ZPL_VERSION_SA)
*featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
}
-#endif
/* raw sends imply large_block_ok */
if ((dspp->rawok || dspp->large_block_ok) &&
/* raw send implies compressok */
if (dspp->compressok || dspp->rawok)
*featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
+
if (dspp->rawok && os->os_encrypted)
*featureflags |= DMU_BACKUP_FEATURE_RAW;
*featureflags |= DMU_BACKUP_FEATURE_LZ4;
}
+ /*
+ * We specifically do not include DMU_BACKUP_FEATURE_EMBED_DATA here to
+ * allow sending ZSTD compressed datasets to a receiver that does not
+ * support ZSTD
+ */
+ if ((*featureflags &
+ (DMU_BACKUP_FEATURE_COMPRESSED | DMU_BACKUP_FEATURE_RAW)) != 0 &&
+ dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_ZSTD_COMPRESS)) {
+ *featureflags |= DMU_BACKUP_FEATURE_ZSTD;
+ }
+
if (dspp->resumeobj != 0 || dspp->resumeoff != 0) {
*featureflags |= DMU_BACKUP_FEATURE_RESUMING;
}
drrb->drr_flags |= DRR_FLAG_FREERECORDS;
drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;
- dsl_dataset_name(to_ds, drrb->drr_toname);
- if (!to_ds->ds_is_snapshot) {
- (void) strlcat(drrb->drr_toname, "@--head--",
+ if (dspp->savedok) {
+ drrb->drr_toguid = dspp->saved_guid;
+ strlcpy(drrb->drr_toname, dspp->saved_toname,
sizeof (drrb->drr_toname));
+ } else {
+ dsl_dataset_name(to_ds, drrb->drr_toname);
+ if (!to_ds->ds_is_snapshot) {
+ (void) strlcat(drrb->drr_toname, "@--head--",
+ sizeof (drrb->drr_toname));
+ }
}
return (drr);
}
static void
-setup_to_thread(struct send_thread_arg *to_arg, dsl_dataset_t *to_ds,
+setup_to_thread(struct send_thread_arg *to_arg, objset_t *to_os,
dmu_sendstatus_t *dssp, uint64_t fromtxg, boolean_t rawok)
{
VERIFY0(bqueue_init(&to_arg->q, zfs_send_no_prefetch_queue_ff,
offsetof(struct send_range, ln)));
to_arg->error_code = 0;
to_arg->cancel = B_FALSE;
- to_arg->ds = to_ds;
+ to_arg->os = to_os;
to_arg->fromtxg = fromtxg;
to_arg->flags = TRAVERSE_PRE | TRAVERSE_PREFETCH_METADATA;
if (rawok)
to_arg->flags |= TRAVERSE_NO_DECRYPT;
- to_arg->redaction_list = NULL;
+ if (zfs_send_corrupt_data)
+ to_arg->flags |= TRAVERSE_HARD;
to_arg->num_blocks_visited = &dssp->dss_blocks;
(void) thread_create(NULL, 0, send_traverse_thread, to_arg, 0,
curproc, TS_RUN, minclsyspri);
}
static void
-setup_prefetch_thread(struct send_prefetch_thread_arg *spt_arg,
- struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg)
+setup_reader_thread(struct send_reader_thread_arg *srt_arg,
+ struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg,
+ uint64_t featureflags)
{
- VERIFY0(bqueue_init(&spt_arg->q, zfs_send_queue_ff,
+ VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff,
MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
offsetof(struct send_range, ln)));
- spt_arg->smta = smt_arg;
- spt_arg->issue_prefetches = !dspp->dso->dso_dryrun;
- (void) thread_create(NULL, 0, send_prefetch_thread, spt_arg, 0,
+ srt_arg->smta = smt_arg;
+ srt_arg->issue_reads = !dspp->dso->dso_dryrun;
+ srt_arg->featureflags = featureflags;
+ (void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0,
curproc, TS_RUN, minclsyspri);
}
struct send_merge_thread_arg *smt_arg, boolean_t resuming, objset_t *os,
redaction_list_t *redact_rl, nvlist_t *nvl)
{
+ (void) smt_arg;
dsl_dataset_t *to_ds = dspp->to_ds;
int err = 0;
* If we're resuming a redacted send, we can skip to the appropriate
* point in the redaction bookmark by binary searching through it.
*/
- smt_arg->bookmark_before = B_FALSE;
if (redact_rl != NULL) {
SET_BOOKMARK(&rlt_arg->resume, to_ds->ds_object, obj, 0, blkid);
}
*
* The final case is a simple zfs full or incremental send. The to_ds traversal
* thread behaves the same as always. The redact list thread is never started.
- * The send merge thread takes all the blocks that the to_ds traveral thread
+ * The send merge thread takes all the blocks that the to_ds traversal thread
* sends it, prefetches the data, and sends the blocks on to the main thread.
* The main thread sends the data over the wire.
*
struct send_thread_arg *to_arg;
struct redact_list_thread_arg *rlt_arg;
struct send_merge_thread_arg *smt_arg;
- struct send_prefetch_thread_arg *spt_arg;
+ struct send_reader_thread_arg *srt_arg;
struct send_range *range;
redaction_list_t *from_rl = NULL;
redaction_list_t *redact_rl = NULL;
dsl_dataset_t *to_ds = dspp->to_ds;
zfs_bookmark_phys_t *ancestor_zb = &dspp->ancestor_zb;
dsl_pool_t *dp = dspp->dp;
- void *tag = dspp->tag;
+ const void *tag = dspp->tag;
err = dmu_objset_from_ds(to_ds, &os);
if (err != 0) {
dsl_pool_rele(dp, tag);
return (err);
}
+
/*
* If this is a non-raw send of an encrypted ds, we can ensure that
* the objset_phys_t is authenticated. This is safe because this is
return (err);
}
- from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP);
- to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
- rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
- smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
- spt_arg = kmem_zalloc(sizeof (*spt_arg), KM_SLEEP);
-
/*
* If we're doing a redacted send, hold the bookmark's redaction list.
*/
dsl_dataset_long_hold(to_ds, FTAG);
+ from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP);
+ to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
+ rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
+ smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
+ srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP);
+
drr = create_begin_record(dspp, os, featureflags);
dssp = setup_send_progress(dspp);
}
if (featureflags & DMU_BACKUP_FEATURE_RAW) {
- uint64_t ivset_guid = (ancestor_zb != NULL) ?
- ancestor_zb->zbm_ivset_guid : 0;
+ uint64_t ivset_guid = ancestor_zb->zbm_ivset_guid;
nvlist_t *keynvl = NULL;
ASSERT(os->os_encrypted);
- err = dsl_crypto_populate_key_nvlist(to_ds, ivset_guid,
+ err = dsl_crypto_populate_key_nvlist(os, ivset_guid,
&keynvl);
if (err != 0) {
fnvlist_free(nvl);
goto out;
}
- setup_to_thread(to_arg, to_ds, dssp, fromtxg, dspp->rawok);
+ setup_to_thread(to_arg, os, dssp, fromtxg, dspp->rawok);
setup_from_thread(from_arg, from_rl, dssp);
setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);
setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);
- setup_prefetch_thread(spt_arg, dspp, smt_arg);
+ setup_reader_thread(srt_arg, dspp, smt_arg, featureflags);
- range = bqueue_dequeue(&spt_arg->q);
+ range = bqueue_dequeue(&srt_arg->q);
while (err == 0 && !range->eos_marker) {
err = do_dump(&dsc, range);
- range = get_next_range(&spt_arg->q, range);
+ range = get_next_range(&srt_arg->q, range);
if (issig(JUSTLOOKING) && issig(FORREAL))
- err = EINTR;
+ err = SET_ERROR(EINTR);
}
/*
* pending records before exiting.
*/
if (err != 0) {
- spt_arg->cancel = B_TRUE;
+ srt_arg->cancel = B_TRUE;
while (!range->eos_marker) {
- range = get_next_range(&spt_arg->q, range);
+ range = get_next_range(&srt_arg->q, range);
}
}
range_free(range);
- bqueue_destroy(&spt_arg->q);
+ bqueue_destroy(&srt_arg->q);
bqueue_destroy(&smt_arg->q);
if (dspp->redactbook != NULL)
bqueue_destroy(&rlt_arg->q);
bqueue_destroy(&to_arg->q);
bqueue_destroy(&from_arg->q);
- if (err == 0 && spt_arg->error != 0)
- err = spt_arg->error;
+ if (err == 0 && srt_arg->error != 0)
+ err = srt_arg->error;
if (err != 0)
goto out;
goto out;
}
- bzero(drr, sizeof (dmu_replay_record_t));
- drr->drr_type = DRR_END;
- drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc;
- drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid;
+ /*
+ * Send the DRR_END record if this is not a saved stream.
+ * Otherwise, the omitted DRR_END record will signal to
+ * the receive side that the stream is incomplete.
+ */
+ if (!dspp->savedok) {
+ memset(drr, 0, sizeof (dmu_replay_record_t));
+ drr->drr_type = DRR_END;
+ drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc;
+ drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid;
- if (dump_record(&dsc, NULL, 0) != 0)
- err = dsc.dsc_err;
+ if (dump_record(&dsc, NULL, 0) != 0)
+ err = dsc.dsc_err;
+ }
out:
mutex_enter(&to_ds->ds_sendstream_lock);
list_remove(&to_ds->ds_sendstreams, dssp);
mutex_exit(&to_ds->ds_sendstream_lock);
- VERIFY(err != 0 || (dsc.dsc_sent_begin && dsc.dsc_sent_end));
+ VERIFY(err != 0 || (dsc.dsc_sent_begin &&
+ (dsc.dsc_sent_end || dspp->savedok)));
kmem_free(drr, sizeof (dmu_replay_record_t));
kmem_free(dssp, sizeof (dmu_sendstatus_t));
kmem_free(to_arg, sizeof (*to_arg));
kmem_free(rlt_arg, sizeof (*rlt_arg));
kmem_free(smt_arg, sizeof (*smt_arg));
- kmem_free(spt_arg, sizeof (*spt_arg));
+ kmem_free(srt_arg, sizeof (*srt_arg));
dsl_dataset_long_rele(to_ds, FTAG);
if (from_rl != NULL) {
int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
- boolean_t rawok, int outfd, offset_t *off, dmu_send_outparams_t *dsop)
+ boolean_t rawok, boolean_t savedok, int outfd, offset_t *off,
+ dmu_send_outparams_t *dsop)
{
int err;
dsl_dataset_t *fromds;
- ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
+ ds_hold_flags_t dsflags;
struct dmu_send_params dspp = {0};
dspp.embedok = embedok;
dspp.large_block_ok = large_block_ok;
dspp.dso = dsop;
dspp.tag = FTAG;
dspp.rawok = rawok;
+ dspp.savedok = savedok;
+ dsflags = (rawok) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;
err = dsl_pool_hold(pool, FTAG, &dspp.dp);
if (err != 0)
return (err);
uint64_t size = dspp.numfromredactsnaps *
sizeof (uint64_t);
dspp.fromredactsnaps = kmem_zalloc(size, KM_SLEEP);
- bcopy(fromredact, dspp.fromredactsnaps, size);
+ memcpy(dspp.fromredactsnaps, fromredact, size);
}
- if (!dsl_dataset_is_before(dspp.to_ds, fromds, 0)) {
+ boolean_t is_before =
+ dsl_dataset_is_before(dspp.to_ds, fromds, 0);
+ dspp.is_clone = (dspp.to_ds->ds_dir !=
+ fromds->ds_dir);
+ dsl_dataset_rele(fromds, FTAG);
+ if (!is_before) {
+ dsl_pool_rele(dspp.dp, FTAG);
err = SET_ERROR(EXDEV);
} else {
- dspp.is_clone = (dspp.to_ds->ds_dir !=
- fromds->ds_dir);
- dsl_dataset_rele(fromds, FTAG);
err = dmu_send_impl(&dspp);
}
} else {
dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
err = dmu_send_impl(&dspp);
}
+ if (dspp.fromredactsnaps)
+ kmem_free(dspp.fromredactsnaps,
+ dspp.numfromredactsnaps * sizeof (uint64_t));
+
dsl_dataset_rele(dspp.to_ds, FTAG);
return (err);
}
int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,
- uint64_t resumeobj, uint64_t resumeoff, const char *redactbook, int outfd,
- offset_t *off, dmu_send_outparams_t *dsop)
+ boolean_t savedok, uint64_t resumeobj, uint64_t resumeoff,
+ const char *redactbook, int outfd, offset_t *off,
+ dmu_send_outparams_t *dsop)
{
int err = 0;
- ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
+ ds_hold_flags_t dsflags;
boolean_t owned = B_FALSE;
dsl_dataset_t *fromds = NULL;
zfs_bookmark_phys_t book = {0};
struct dmu_send_params dspp = {0};
+
+ dsflags = (rawok) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;
dspp.tosnap = tosnap;
dspp.embedok = embedok;
dspp.large_block_ok = large_block_ok;
dspp.resumeobj = resumeobj;
dspp.resumeoff = resumeoff;
dspp.rawok = rawok;
+ dspp.savedok = savedok;
if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
return (SET_ERROR(EINVAL));
err = dsl_pool_hold(tosnap, FTAG, &dspp.dp);
if (err != 0)
return (err);
+
if (strchr(tosnap, '@') == NULL && spa_writeable(dspp.dp->dp_spa)) {
/*
* We are sending a filesystem or volume. Ensure
* that it doesn't change by owning the dataset.
*/
- err = dsl_dataset_own(dspp.dp, tosnap, dsflags, FTAG,
- &dspp.to_ds);
- owned = B_TRUE;
+
+ if (savedok) {
+ /*
+ * We are looking for the dataset that represents the
+ * partially received send stream. If this stream was
+ * received as a new snapshot of an existing dataset,
+ * this will be saved in a hidden clone named
+ * "<pool>/<dataset>/%recv". Otherwise, the stream
+ * will be saved in the live dataset itself. In
+ * either case we need to use dsl_dataset_own_force()
+ * because the stream is marked as inconsistent,
+ * which would normally make it unavailable to be
+ * owned.
+ */
+ char *name = kmem_asprintf("%s/%s", tosnap,
+ recv_clone_name);
+ err = dsl_dataset_own_force(dspp.dp, name, dsflags,
+ FTAG, &dspp.to_ds);
+ if (err == ENOENT) {
+ err = dsl_dataset_own_force(dspp.dp, tosnap,
+ dsflags, FTAG, &dspp.to_ds);
+ }
+
+ if (err == 0) {
+ owned = B_TRUE;
+ err = zap_lookup(dspp.dp->dp_meta_objset,
+ dspp.to_ds->ds_object,
+ DS_FIELD_RESUME_TOGUID, 8, 1,
+ &dspp.saved_guid);
+ }
+
+ if (err == 0) {
+ err = zap_lookup(dspp.dp->dp_meta_objset,
+ dspp.to_ds->ds_object,
+ DS_FIELD_RESUME_TONAME, 1,
+ sizeof (dspp.saved_toname),
+ dspp.saved_toname);
+ }
+ /* Only disown if there was an error in the lookups */
+ if (owned && (err != 0))
+ dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
+
+ kmem_strfree(name);
+ } else {
+ err = dsl_dataset_own(dspp.dp, tosnap, dsflags,
+ FTAG, &dspp.to_ds);
+ if (err == 0)
+ owned = B_TRUE;
+ }
} else {
err = dsl_dataset_hold_flags(dspp.dp, tosnap, dsflags, FTAG,
&dspp.to_ds);
}
if (err != 0) {
+ /* Note: dsl dataset is not owned at this point */
dsl_pool_rele(dspp.dp, FTAG);
return (err);
}
sizeof (uint64_t);
dspp.fromredactsnaps = kmem_zalloc(size,
KM_SLEEP);
- bcopy(fromredact, dspp.fromredactsnaps,
+ memcpy(dspp.fromredactsnaps, fromredact,
size);
}
if (!dsl_dataset_is_before(dspp.to_ds, fromds,
0)) {
err = SET_ERROR(EXDEV);
} else {
- ASSERT3U(dspp.is_clone, ==,
- (dspp.to_ds->ds_dir !=
- fromds->ds_dir));
zb->zbm_creation_txg =
dsl_dataset_phys(fromds)->
ds_creation_txg;
/* dmu_send_impl will call dsl_pool_rele for us. */
err = dmu_send_impl(&dspp);
} else {
+ if (dspp.fromredactsnaps)
+ kmem_free(dspp.fromredactsnaps,
+ dspp.numfromredactsnaps *
+ sizeof (uint64_t));
dsl_pool_rele(dspp.dp, FTAG);
}
} else {
}
int
-dmu_send_estimate_fast(dsl_dataset_t *ds, dsl_dataset_t *fromds,
- zfs_bookmark_phys_t *frombook, boolean_t stream_compressed, uint64_t *sizep)
+dmu_send_estimate_fast(dsl_dataset_t *origds, dsl_dataset_t *fromds,
+ zfs_bookmark_phys_t *frombook, boolean_t stream_compressed,
+ boolean_t saved, uint64_t *sizep)
{
int err;
+ dsl_dataset_t *ds = origds;
uint64_t uncomp, comp;
- ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool));
+ ASSERT(dsl_pool_config_held(origds->ds_dir->dd_pool));
ASSERT(fromds == NULL || frombook == NULL);
- /* tosnap must be a snapshot */
- if (!ds->ds_is_snapshot)
+ /*
+ * If this is a saved send we may actually be sending
+ * from the %recv clone used for resuming.
+ */
+ if (saved) {
+ objset_t *mos = origds->ds_dir->dd_pool->dp_meta_objset;
+ uint64_t guid;
+ char dsname[ZFS_MAX_DATASET_NAME_LEN + 6];
+
+ dsl_dataset_name(origds, dsname);
+ (void) strcat(dsname, "/");
+ (void) strlcat(dsname, recv_clone_name, sizeof (dsname));
+
+ err = dsl_dataset_hold(origds->ds_dir->dd_pool,
+ dsname, FTAG, &ds);
+ if (err != ENOENT && err != 0) {
+ return (err);
+ } else if (err == ENOENT) {
+ ds = origds;
+ }
+
+ /* check that this dataset has partially received data */
+ err = zap_lookup(mos, ds->ds_object,
+ DS_FIELD_RESUME_TOGUID, 8, 1, &guid);
+ if (err != 0) {
+ err = SET_ERROR(err == ENOENT ? EINVAL : err);
+ goto out;
+ }
+
+ err = zap_lookup(mos, ds->ds_object,
+ DS_FIELD_RESUME_TONAME, 1, sizeof (dsname), dsname);
+ if (err != 0) {
+ err = SET_ERROR(err == ENOENT ? EINVAL : err);
+ goto out;
+ }
+ }
+
+ /* tosnap must be a snapshot or the target of a saved send */
+ if (!ds->ds_is_snapshot && ds == origds)
return (SET_ERROR(EINVAL));
if (fromds != NULL) {
uint64_t used;
- if (!fromds->ds_is_snapshot)
- return (SET_ERROR(EINVAL));
+ if (!fromds->ds_is_snapshot) {
+ err = SET_ERROR(EINVAL);
+ goto out;
+ }
- if (!dsl_dataset_is_before(ds, fromds, 0))
- return (SET_ERROR(EXDEV));
+ if (!dsl_dataset_is_before(ds, fromds, 0)) {
+ err = SET_ERROR(EXDEV);
+ goto out;
+ }
err = dsl_dataset_space_written(fromds, ds, &used, &comp,
&uncomp);
if (err != 0)
- return (err);
+ goto out;
} else if (frombook != NULL) {
uint64_t used;
err = dsl_dataset_space_written_bookmark(frombook, ds, &used,
&comp, &uncomp);
if (err != 0)
- return (err);
+ goto out;
} else {
uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
* Add the size of the BEGIN and END records to the estimate.
*/
*sizep += 2 * sizeof (dmu_replay_record_t);
+
+out:
+ if (ds != origds)
+ dsl_dataset_rele(ds, FTAG);
return (err);
}
-#if defined(_KERNEL)
-module_param(zfs_send_corrupt_data, int, 0644);
-MODULE_PARM_DESC(zfs_send_corrupt_data, "Allow sending corrupt data");
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, corrupt_data, INT, ZMOD_RW,
+ "Allow sending corrupt data");
-module_param(zfs_send_queue_length, int, 0644);
-MODULE_PARM_DESC(zfs_send_queue_length, "Maximum send queue length");
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_length, UINT, ZMOD_RW,
+ "Maximum send queue length");
-module_param(zfs_send_unmodified_spill_blocks, int, 0644);
-MODULE_PARM_DESC(zfs_send_unmodified_spill_blocks,
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, unmodified_spill_blocks, INT, ZMOD_RW,
"Send unmodified spill blocks");
-module_param(zfs_send_no_prefetch_queue_length, int, 0644);
-MODULE_PARM_DESC(zfs_send_no_prefetch_queue_length,
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_length, UINT, ZMOD_RW,
"Maximum send queue length for non-prefetch queues");
-module_param(zfs_send_queue_ff, int, 0644);
-MODULE_PARM_DESC(zfs_send_queue_ff, "Send queue fill fraction");
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_ff, UINT, ZMOD_RW,
+ "Send queue fill fraction");
-module_param(zfs_send_no_prefetch_queue_ff, int, 0644);
-MODULE_PARM_DESC(zfs_send_no_prefetch_queue_ff,
+ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_ff, UINT, ZMOD_RW,
"Send queue fill fraction for non-prefetch queues");
-module_param(zfs_override_estimate_recordsize, int, 0644);
-MODULE_PARM_DESC(zfs_override_estimate_recordsize,
+ZFS_MODULE_PARAM(zfs_send, zfs_, override_estimate_recordsize, UINT, ZMOD_RW,
"Override block size estimate with fixed size");
-#endif