*/
/*
* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2013 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
+ * Copyright (c) 2014 Integros [integros.com]
*/
/* Portions Copyright 2010 Robert Milkowski */
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/metaslab.h>
+#include <sys/trace_zil.h>
+#include <sys/abd.h>
/*
 * The zfs intent log (ZIL) saves transaction records of system calls
 * that change the file system, with enough information to replay them.
*/
int zfs_nocacheflush = 0;
+/*
+ * Limit SLOG write size per commit executed with synchronous priority.
+ * Writes above that limit are issued with lower (asynchronous) priority,
+ * to limit potential SLOG device abuse by a single active ZIL writer.
+ */
+unsigned long zil_slog_bulk = 768 * 1024;
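/*
 * zil_lwb_write_init() below consults this threshold when each log block is
 * issued: writes go out at ZIO_PRIORITY_SYNC_WRITE until the current commit
 * has pushed more than zil_slog_bulk bytes to a separate log device, after
 * which ZIO_PRIORITY_ASYNC_WRITE is used. On Linux the value is exposed as
 * the zil_slog_bulk module parameter (see the module_param() declarations at
 * the end of this file).
 */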
+
static kmem_cache_t *zil_lwb_cache;
static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
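/*
 * An lwb is "empty" when its free space (lwb_sz - lwb_nused) still equals
 * the block's full record capacity, i.e. the block size less the
 * zil_chain_t header/trailer; in other words, no log records have been
 * copied into lwb_buf yet.
 */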
#define LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \
sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))
-
-/*
- * ziltest is by and large an ugly hack, but very useful in
- * checking replay without tedious work.
- * When running ziltest we want to keep all itx's and so maintain
- * a single list in the zl_itxg[] that uses a high txg: ZILTEST_TXG
- * We subtract TXG_CONCURRENT_STATES to allow for common code.
- */
-#define ZILTEST_TXG (UINT64_MAX - TXG_CONCURRENT_STATES)
-
static int
zil_bp_compare(const void *x1, const void *x2)
{
const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
- if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
- return (-1);
- if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
- return (1);
-
- if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
- return (-1);
- if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
- return (1);
+ int cmp = AVL_CMP(DVA_GET_VDEV(dva1), DVA_GET_VDEV(dva2));
+ if (likely(cmp))
+ return (cmp);
- return (0);
+ return (AVL_CMP(DVA_GET_OFFSET(dva1), DVA_GET_OFFSET(dva2)));
}
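/*
 * AVL_CMP() (sys/avl.h) folds the old three-way comparisons into a single
 * branch-friendly expression, essentially ((a > b) - (a < b)), returning
 * the -1/0/1 result that AVL comparators are expected to produce.
 */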
static int
zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
{
avl_tree_t *t = &zilog->zl_bp_tree;
- const dva_t *dva = BP_IDENTITY(bp);
+ const dva_t *dva;
zil_bp_node_t *zn;
avl_index_t where;
+ if (BP_IS_EMBEDDED(bp))
+ return (0);
+
+ dva = BP_IDENTITY(bp);
+
if (avl_find(t, dva, &where) != NULL)
return (SET_ERROR(EEXIST));
- zn = kmem_alloc(sizeof (zil_bp_node_t), KM_PUSHPAGE);
+ zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
zn->zn_dva = *dva;
avl_insert(t, zn, where);
* Read a log block and make sure it's valid.
*/
static int
-zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
- char **end)
+zil_read_log_block(zilog_t *zilog, boolean_t decrypt, const blkptr_t *bp,
+ blkptr_t *nbp, void *dst, char **end)
{
enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
- uint32_t aflags = ARC_WAIT;
+ arc_flags_t aflags = ARC_FLAG_WAIT;
arc_buf_t *abuf = NULL;
- zbookmark_t zb;
+ zbookmark_phys_t zb;
int error;
if (zilog->zl_header->zh_claim_txg == 0)
if (!(zilog->zl_header->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
zio_flags |= ZIO_FLAG_SPECULATIVE;
+ if (!decrypt)
+ zio_flags |= ZIO_FLAG_RAW;
+
SET_BOOKMARK(&zb, bp->blk_cksum.zc_word[ZIL_ZC_OBJSET],
ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
- error = arc_read(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
- ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
+ error = arc_read(NULL, zilog->zl_spa, bp, arc_getbuf_func,
+ &abuf, ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
if (error == 0) {
zio_cksum_t cksum = bp->blk_cksum;
sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk)) {
error = SET_ERROR(ECKSUM);
} else {
+ ASSERT3U(len, <=, SPA_OLD_MAXBLOCKSIZE);
bcopy(lr, dst, len);
*end = (char *)dst + len;
*nbp = zilc->zc_next_blk;
(zilc->zc_nused > (size - sizeof (*zilc)))) {
error = SET_ERROR(ECKSUM);
} else {
+ ASSERT3U(zilc->zc_nused, <=,
+ SPA_OLD_MAXBLOCKSIZE);
bcopy(lr, dst, zilc->zc_nused);
*end = (char *)dst + zilc->zc_nused;
*nbp = zilc->zc_next_blk;
}
}
- VERIFY(arc_buf_remove_ref(abuf, &abuf));
+ arc_buf_destroy(abuf, &abuf);
}
return (error);
{
enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
const blkptr_t *bp = &lr->lr_blkptr;
- uint32_t aflags = ARC_WAIT;
+ arc_flags_t aflags = ARC_FLAG_WAIT;
arc_buf_t *abuf = NULL;
- zbookmark_t zb;
+ zbookmark_phys_t zb;
int error;
if (BP_IS_HOLE(bp)) {
if (zilog->zl_header->zh_claim_txg == 0)
zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
+ /*
+ * If we are not using the resulting data, we are just checking that
+ * it hasn't been corrupted so we don't need to waste CPU time
+ * decompressing and decrypting it.
+ */
+ if (wbuf == NULL)
+ zio_flags |= ZIO_FLAG_RAW;
+
SET_BOOKMARK(&zb, dmu_objset_id(zilog->zl_os), lr->lr_foid,
ZB_ZIL_LEVEL, lr->lr_offset / BP_GET_LSIZE(bp));
if (error == 0) {
if (wbuf != NULL)
bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
- (void) arc_buf_remove_ref(abuf, &abuf);
+ arc_buf_destroy(abuf, &abuf);
}
return (error);
*/
int
zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
- zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
+ zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg,
+ boolean_t decrypt)
{
const zil_header_t *zh = zilog->zl_header;
boolean_t claimed = !!zh->zh_claim_txg;
* If the log has been claimed, stop if we encounter a sequence
* number greater than the highest claimed sequence number.
*/
- lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
+ lrbuf = zio_buf_alloc(SPA_OLD_MAXBLOCKSIZE);
zil_bp_tree_init(zilog);
for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
if (blk_seq > claim_blk_seq)
break;
- if ((error = parse_blk_func(zilog, &blk, arg, txg)) != 0)
+
+ error = parse_blk_func(zilog, &blk, arg, txg);
+ if (error != 0)
break;
ASSERT3U(max_blk_seq, <, blk_seq);
max_blk_seq = blk_seq;
if (max_lr_seq == claim_lr_seq && max_blk_seq == claim_blk_seq)
break;
- error = zil_read_log_block(zilog, &blk, &next_blk, lrbuf, &end);
+ error = zil_read_log_block(zilog, decrypt, &blk, &next_blk,
+ lrbuf, &end);
if (error != 0)
break;
ASSERT3U(reclen, >=, sizeof (lr_t));
if (lr->lrc_seq > claim_lr_seq)
goto done;
- if ((error = parse_lr_func(zilog, lr, arg, txg)) != 0)
+
+ error = parse_lr_func(zilog, lr, arg, txg);
+ if (error != 0)
goto done;
ASSERT3U(max_lr_seq, <, lr->lrc_seq);
max_lr_seq = lr->lrc_seq;
zilog->zl_parse_lr_count = lr_count;
ASSERT(!claimed || !(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID) ||
- (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));
+ (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq) ||
+ (decrypt && error == EIO));
zil_bp_tree_fini(zilog);
- zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
+ zio_buf_free(lrbuf, SPA_OLD_MAXBLOCKSIZE);
return (error);
}
* Claim log block if not already committed and not already claimed.
* If tx == NULL, just verify that the block is claimable.
*/
- if (bp->blk_birth < first_txg || zil_bp_tree_add(zilog, bp) != 0)
+ if (BP_IS_HOLE(bp) || bp->blk_birth < first_txg ||
+ zil_bp_tree_add(zilog, bp) != 0)
return (0);
return (zio_wait(zio_claim(NULL, zilog->zl_spa,
* waited for all writes to be stable first), so it is semantically
* correct to declare this the end of the log.
*/
- if (lr->lr_blkptr.blk_birth >= first_txg &&
- (error = zil_read_log_data(zilog, lr, NULL)) != 0)
- return (error);
+ if (lr->lr_blkptr.blk_birth >= first_txg) {
+ error = zil_read_log_data(zilog, lr, NULL);
+ if (error != 0)
+ return (error);
+ }
+
return (zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg));
}
* If we previously claimed it, we need to free it.
*/
if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
- bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0)
+ bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0 &&
+ !BP_IS_HOLE(bp))
zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
return (0);
}
static lwb_t *
-zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, uint64_t txg, boolean_t fastwrite)
+zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, boolean_t slog, uint64_t txg,
+ boolean_t fastwrite)
{
lwb_t *lwb;
- lwb = kmem_cache_alloc(zil_lwb_cache, KM_PUSHPAGE);
+ lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
lwb->lwb_zilog = zilog;
lwb->lwb_blk = *bp;
lwb->lwb_fastwrite = fastwrite;
+ lwb->lwb_slog = slog;
lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
lwb->lwb_max_txg = txg;
lwb->lwb_zio = NULL;
dsl_pool_t *dp = zilog->zl_dmu_pool;
dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
- if (dsl_dataset_is_snapshot(ds))
+ if (ds->ds_is_snapshot)
panic("dirtying snapshot!");
if (txg_list_add(&dp->dp_dirty_zilogs, zilog, txg)) {
}
}
+/*
+ * Determine if the zil is dirty in the specified txg. Callers wanting to
+ * ensure that the dirty state does not change must hold the itxg_lock for
+ * the specified txg. Holding the lock will ensure that the zil cannot be
+ * dirtied (zil_itx_assign) or cleaned (zil_clean) while we check its current
+ * state.
+ */
+boolean_t
+zilog_is_dirty_in_txg(zilog_t *zilog, uint64_t txg)
+{
+ dsl_pool_t *dp = zilog->zl_dmu_pool;
+
+ if (txg_list_member(&dp->dp_dirty_zilogs, zilog, txg & TXG_MASK))
+ return (B_TRUE);
+ return (B_FALSE);
+}
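/*
 * zil_get_commit_list() below relies on this check: it asserts
 * zilog_is_dirty_in_txg() while holding the itxg_lock before moving a
 * txg's synchronous itxs onto the commit list.
 */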
+
+/*
+ * Determine if the zil is dirty. The zil is considered dirty if it has
+ * any pending itx records that have not been cleaned by zil_clean().
+ */
boolean_t
zilog_is_dirty(zilog_t *zilog)
{
blkptr_t blk;
int error = 0;
boolean_t fastwrite = FALSE;
+ boolean_t slog = FALSE;
/*
* Wait for any previous destroy to complete.
/*
* Allocate an initial log block if:
* - there isn't one already
- * - the existing block is the wrong endianess
+ * - the existing block is the wrong endianness
*/
if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
tx = dmu_tx_create(zilog->zl_os);
BP_ZERO(&blk);
}
- error = zio_alloc_zil(zilog->zl_spa, txg, &blk,
- ZIL_MIN_BLKSZ, B_TRUE);
+ error = zio_alloc_zil(zilog->zl_spa, zilog->zl_os, txg, &blk,
+ ZIL_MIN_BLKSZ, &slog);
fastwrite = TRUE;
if (error == 0)
* Allocate a log write buffer (lwb) for the first log block.
*/
if (error == 0)
- lwb = zil_alloc_lwb(zilog, &blk, txg, fastwrite);
+ lwb = zil_alloc_lwb(zilog, &blk, slog, txg, fastwrite);
/*
* If we just allocated the first log block, commit our transaction
{
ASSERT(list_is_empty(&zilog->zl_lwb_list));
(void) zil_parse(zilog, zil_free_log_block,
- zil_free_log_record, tx, zilog->zl_header->zh_claim_txg);
+ zil_free_log_record, tx, zilog->zl_header->zh_claim_txg, B_FALSE);
}
int
-zil_claim(const char *osname, void *txarg)
+zil_claim(dsl_pool_t *dp, dsl_dataset_t *ds, void *txarg)
{
dmu_tx_t *tx = txarg;
uint64_t first_txg = dmu_tx_get_txg(tx);
objset_t *os;
int error;
- error = dmu_objset_own(osname, DMU_OST_ANY, B_FALSE, FTAG, &os);
+ error = dmu_objset_own_obj(dp, ds->ds_object,
+ DMU_OST_ANY, B_FALSE, B_FALSE, FTAG, &os);
if (error != 0) {
- cmn_err(CE_WARN, "can't open objset for %s", osname);
+ /*
+ * EBUSY indicates that the objset is inconsistent, in which
+ * case it cannot have a ZIL.
+ */
+ if (error != EBUSY) {
+ cmn_err(CE_WARN, "can't open objset for %llu, error %u",
+ (unsigned long long)ds->ds_object, error);
+ }
+
return (0);
}
if (!BP_IS_HOLE(&zh->zh_log))
zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
BP_ZERO(&zh->zh_log);
+ if (os->os_encrypted)
+ os->os_next_write_raw = B_TRUE;
dsl_dataset_dirty(dmu_objset_ds(os), tx);
- dmu_objset_disown(os, FTAG);
+ dmu_objset_disown(os, B_FALSE, FTAG);
return (0);
}
ASSERT3U(zh->zh_claim_txg, <=, first_txg);
if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
(void) zil_parse(zilog, zil_claim_log_block,
- zil_claim_log_record, tx, first_txg);
+ zil_claim_log_record, tx, first_txg, B_FALSE);
zh->zh_claim_txg = first_txg;
zh->zh_claim_blk_seq = zilog->zl_parse_blk_seq;
zh->zh_claim_lr_seq = zilog->zl_parse_lr_seq;
}
ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
- dmu_objset_disown(os, FTAG);
+ dmu_objset_disown(os, B_FALSE, FTAG);
return (0);
}
* Checksum errors are ok as they indicate the end of the chain.
* Any other error (no device or read failure) returns an error.
*/
+/* ARGSUSED */
int
-zil_check_log_chain(const char *osname, void *tx)
+zil_check_log_chain(dsl_pool_t *dp, dsl_dataset_t *ds, void *tx)
{
zilog_t *zilog;
objset_t *os;
ASSERT(tx == NULL);
- error = dmu_objset_hold(osname, FTAG, &os);
+ error = dmu_objset_from_ds(ds, &os);
if (error != 0) {
- cmn_err(CE_WARN, "can't open objset for %s", osname);
+ cmn_err(CE_WARN, "can't open objset %llu, error %d",
+ (unsigned long long)ds->ds_object, error);
return (0);
}
valid = vdev_log_state_valid(vd);
spa_config_exit(os->os_spa, SCL_STATE, FTAG);
- if (!valid) {
- dmu_objset_rele(os, FTAG);
+ if (!valid)
return (0);
- }
}
/*
* which will update spa_max_claim_txg. See spa_load() for details.
*/
error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
- zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));
-
- dmu_objset_rele(os, FTAG);
+ zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa),
+ B_FALSE);
return ((error == ECKSUM || error == ENOENT) ? 0 : error);
}
const uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
const uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
- if (v1 < v2)
- return (-1);
- if (v1 > v2)
- return (1);
-
- return (0);
+ return (AVL_CMP(v1, v2));
}
void
for (i = 0; i < ndvas; i++) {
zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
if (avl_find(t, &zvsearch, &where) == NULL) {
- zv = kmem_alloc(sizeof (*zv), KM_PUSHPAGE);
+ zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
zv->zv_vdev = zvsearch.zv_vdev;
avl_insert(t, zv, where);
}
ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
ASSERT(!BP_IS_GANG(zio->io_bp));
ASSERT(!BP_IS_HOLE(zio->io_bp));
- ASSERT(zio->io_bp->blk_fill == 0);
+ ASSERT(BP_GET_FILL(zio->io_bp) == 0);
/*
* Ensure the lwb buffer pointer is cleared before releasing
* one in zil_commit_writer(). zil_sync() will only remove
* the lwb if lwb_buf is null.
*/
+ abd_put(zio->io_abd);
zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
mutex_enter(&zilog->zl_lock);
lwb->lwb_zio = NULL;
static void
zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
{
- zbookmark_t zb;
+ zbookmark_phys_t zb;
+ zio_priority_t prio;
SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
/* Lock so zil_sync() doesn't fastwrite_unmark after zio is created */
mutex_enter(&zilog->zl_lock);
if (lwb->lwb_zio == NULL) {
+ abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf,
+ BP_GET_LSIZE(&lwb->lwb_blk));
if (!lwb->lwb_fastwrite) {
metaslab_fastwrite_mark(zilog->zl_spa, &lwb->lwb_blk);
lwb->lwb_fastwrite = 1;
}
+ if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk)
+ prio = ZIO_PRIORITY_SYNC_WRITE;
+ else
+ prio = ZIO_PRIORITY_ASYNC_WRITE;
lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
- 0, &lwb->lwb_blk, lwb->lwb_buf, BP_GET_LSIZE(&lwb->lwb_blk),
- zil_lwb_write_done, lwb, ZIO_PRIORITY_SYNC_WRITE,
+ 0, &lwb->lwb_blk, lwb_abd, BP_GET_LSIZE(&lwb->lwb_blk),
+ zil_lwb_write_done, lwb, prio,
ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE |
ZIO_FLAG_FASTWRITE, &zb);
}
*
* These must be a multiple of 4KB. Note only the amount used (again
* aligned to 4KB) actually gets written. However, we can't always just
- * allocate SPA_MAXBLOCKSIZE as the slog space could be exhausted.
+ * allocate SPA_OLD_MAXBLOCKSIZE as the slog space could be exhausted.
*/
uint64_t zil_block_buckets[] = {
4096, /* non TX_WRITE */
UINT64_MAX
};
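/*
 * zil_lwb_write_start() below walks this table and takes the first bucket
 * large enough for the data queued so far (zl_cur_used plus the chain
 * header), capped at SPA_OLD_MAXBLOCKSIZE, then takes the maximum over the
 * last ZIL_PREV_BLKS choices (zl_prev_blks) so the block size only shrinks
 * gradually once a burst of commits subsides.
 */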
-/*
- * Use the slog as long as the current commit size is less than the
- * limit or the total list size is less than 2X the limit. Limit
- * checking is disabled by setting zil_slog_limit to UINT64_MAX.
- */
-unsigned long zil_slog_limit = 1024 * 1024;
-#define USE_SLOG(zilog) (((zilog)->zl_cur_used < zil_slog_limit) || \
- ((zilog)->zl_itx_list_sz < (zil_slog_limit << 1)))
-
/*
* Start a log block write and advance to the next log block.
* Calls are serialized.
uint64_t txg;
uint64_t zil_blksz, wsz;
int i, error;
- boolean_t use_slog;
+ boolean_t slog;
if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
zilc = (zil_chain_t *)lwb->lwb_buf;
* to clean up in the event of allocation failure or I/O failure.
*/
tx = dmu_tx_create(zilog->zl_os);
- VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
+
+ /*
+ * Since we are not going to create any new dirty data and we can even
+ * help with clearing the existing dirty data, we should not be subject
+ * to the dirty data based delays.
+ * We (ab)use TXG_WAITED to bypass the delay mechanism.
+ * One side effect of using TXG_WAITED is that dmu_tx_assign() can
+ * fail if the pool is suspended. Those are dramatic circumstances,
+ * so we return NULL to signal that normal ZIL processing is not
+ * possible and that txg_wait_synced() should be used to ensure the
+ * data is on disk.
+ */
+ error = dmu_tx_assign(tx, TXG_WAITED);
+ if (error != 0) {
+ ASSERT3S(error, ==, EIO);
+ dmu_tx_abort(tx);
+ return (NULL);
+ }
dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
txg = dmu_tx_get_txg(tx);
continue;
zil_blksz = zil_block_buckets[i];
if (zil_blksz == UINT64_MAX)
- zil_blksz = SPA_MAXBLOCKSIZE;
+ zil_blksz = SPA_OLD_MAXBLOCKSIZE;
zilog->zl_prev_blks[zilog->zl_prev_rotor] = zil_blksz;
for (i = 0; i < ZIL_PREV_BLKS; i++)
zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
BP_ZERO(bp);
- use_slog = USE_SLOG(zilog);
- error = zio_alloc_zil(spa, txg, bp, zil_blksz,
- USE_SLOG(zilog));
- if (use_slog) {
+ error = zio_alloc_zil(spa, zilog->zl_os, txg, bp, zil_blksz, &slog);
+ if (slog) {
ZIL_STAT_BUMP(zil_itx_metaslab_slog_count);
ZIL_STAT_INCR(zil_itx_metaslab_slog_bytes, lwb->lwb_nused);
} else {
/*
* Allocate a new log write buffer (lwb).
*/
- nlwb = zil_alloc_lwb(zilog, bp, txg, TRUE);
+ nlwb = zil_alloc_lwb(zilog, bp, slog, txg, TRUE);
/* Record the block for later vdev flushing */
zil_add_block(zilog, &lwb->lwb_blk);
static lwb_t *
zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
{
- lr_t *lrc = &itx->itx_lr; /* common log record */
- lr_write_t *lrw = (lr_write_t *)lrc;
+ lr_t *lrcb, *lrc;
+ lr_write_t *lrwb, *lrw;
char *lr_buf;
- uint64_t txg = lrc->lrc_txg;
- uint64_t reclen = lrc->lrc_reclen;
- uint64_t dlen = 0;
+ uint64_t dlen, dnow, lwb_sp, reclen, txg;
if (lwb == NULL)
return (NULL);
ASSERT(lwb->lwb_buf != NULL);
- ASSERT(zilog_is_dirty(zilog) ||
- spa_freeze_txg(zilog->zl_spa) != UINT64_MAX);
- if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
+ lrc = &itx->itx_lr; /* Common log record inside itx. */
+ lrw = (lr_write_t *)lrc; /* Write log record inside itx. */
+ if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) {
dlen = P2ROUNDUP_TYPED(
lrw->lr_length, sizeof (uint64_t), uint64_t);
-
+ } else {
+ dlen = 0;
+ }
+ reclen = lrc->lrc_reclen;
zilog->zl_cur_used += (reclen + dlen);
+ txg = lrc->lrc_txg;
zil_lwb_write_init(zilog, lwb);
+cont:
/*
* If this record won't fit in the current log block, start a new one.
+ * For WR_NEED_COPY optimize layout for minimal number of chunks.
*/
- if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
+ lwb_sp = lwb->lwb_sz - lwb->lwb_nused;
+ if (reclen > lwb_sp || (reclen + dlen > lwb_sp &&
+ lwb_sp < ZIL_MAX_WASTE_SPACE && (dlen % ZIL_MAX_LOG_DATA == 0 ||
+ lwb_sp < reclen + dlen % ZIL_MAX_LOG_DATA))) {
lwb = zil_lwb_write_start(zilog, lwb);
if (lwb == NULL)
return (NULL);
zil_lwb_write_init(zilog, lwb);
ASSERT(LWB_EMPTY(lwb));
- if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
- txg_wait_synced(zilog->zl_dmu_pool, txg);
- return (lwb);
- }
+ lwb_sp = lwb->lwb_sz - lwb->lwb_nused;
+ ASSERT3U(reclen + MIN(dlen, sizeof (uint64_t)), <=, lwb_sp);
}
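	/*
	 * dnow is the slice of the WR_NEED_COPY payload that fits in the
	 * current block; any remainder is carried back to the "cont:" label
	 * above and written into the next lwb.
	 */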
+ dnow = MIN(dlen, lwb_sp - reclen);
lr_buf = lwb->lwb_buf + lwb->lwb_nused;
bcopy(lrc, lr_buf, reclen);
- lrc = (lr_t *)lr_buf;
- lrw = (lr_write_t *)lrc;
+ lrcb = (lr_t *)lr_buf; /* Like lrc, but inside lwb. */
+ lrwb = (lr_write_t *)lrcb; /* Like lrw, but inside lwb. */
ZIL_STAT_BUMP(zil_itx_count);
char *dbuf;
int error;
- if (dlen) {
- ASSERT(itx->itx_wr_state == WR_NEED_COPY);
+ if (itx->itx_wr_state == WR_NEED_COPY) {
dbuf = lr_buf + reclen;
- lrw->lr_common.lrc_reclen += dlen;
+ lrcb->lrc_reclen += dnow;
+ if (lrwb->lr_length > dnow)
+ lrwb->lr_length = dnow;
+ lrw->lr_offset += dnow;
+ lrw->lr_length -= dnow;
ZIL_STAT_BUMP(zil_itx_needcopy_count);
ZIL_STAT_INCR(zil_itx_needcopy_bytes,
lrw->lr_length);
lrw->lr_length);
}
error = zilog->zl_get_data(
- itx->itx_private, lrw, dbuf, lwb->lwb_zio);
+ itx->itx_private, lrwb, dbuf, lwb->lwb_zio);
if (error == EIO) {
txg_wait_synced(zilog->zl_dmu_pool, txg);
return (lwb);
* equal to the itx sequence number because not all transactions
* are synchronous, and sometimes spa_sync() gets there first.
*/
- lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
- lwb->lwb_nused += reclen + dlen;
+ lrcb->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
+ lwb->lwb_nused += reclen + dnow;
lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)));
+ dlen -= dnow;
+ if (dlen > 0) {
+ zilog->zl_cur_used += reclen;
+ goto cont;
+ }
+
return (lwb);
}
lrsize = P2ROUNDUP_TYPED(lrsize, sizeof (uint64_t), size_t);
- itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize,
- KM_PUSHPAGE | KM_NODEBUG);
+ itx = zio_data_buf_alloc(offsetof(itx_t, itx_lr) + lrsize);
itx->itx_lr.lrc_txtype = txtype;
itx->itx_lr.lrc_reclen = lrsize;
- itx->itx_sod = lrsize; /* if write & WR_NEED_COPY will be increased */
itx->itx_lr.lrc_seq = 0; /* defensive */
itx->itx_sync = B_TRUE; /* default is synchronous */
itx->itx_callback = NULL;
void
zil_itx_destroy(itx_t *itx)
{
- kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
+ zio_data_buf_free(itx, offsetof(itx_t, itx_lr)+itx->itx_lr.lrc_reclen);
}
/*
if (itx->itx_callback != NULL)
itx->itx_callback(itx->itx_callback_data);
list_remove(list, itx);
- kmem_free(itx, offsetof(itx_t, itx_lr) +
- itx->itx_lr.lrc_reclen);
+ zil_itx_destroy(itx);
}
cookie = NULL;
if (itx->itx_callback != NULL)
itx->itx_callback(itx->itx_callback_data);
list_remove(list, itx);
- kmem_free(itx, offsetof(itx_t, itx_lr) +
- itx->itx_lr.lrc_reclen);
+ zil_itx_destroy(itx);
}
list_destroy(list);
kmem_free(ian, sizeof (itx_async_node_t));
const uint64_t o1 = ((itx_async_node_t *)x1)->ia_foid;
const uint64_t o2 = ((itx_async_node_t *)x2)->ia_foid;
- if (o1 < o2)
- return (-1);
- if (o1 > o2)
- return (1);
-
- return (0);
+ return (AVL_CMP(o1, o2));
}
/*
if (itx->itx_callback != NULL)
itx->itx_callback(itx->itx_callback_data);
list_remove(&clean_list, itx);
- kmem_free(itx, offsetof(itx_t, itx_lr) +
- itx->itx_lr.lrc_reclen);
+ zil_itx_destroy(itx);
}
list_destroy(&clean_list);
}
* this itxg. Save the itxs for release below.
* This should be rare.
*/
- atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
- itxg->itxg_sod = 0;
+ zfs_dbgmsg("zil_itx_assign: missed itx cleanup for "
+ "txg %llu", itxg->itxg_txg);
clean = itxg->itxg_itxs;
}
- ASSERT(itxg->itxg_sod == 0);
itxg->itxg_txg = txg;
itxs = itxg->itxg_itxs = kmem_zalloc(sizeof (itxs_t),
- KM_PUSHPAGE);
+ KM_SLEEP);
list_create(&itxs->i_sync_list, sizeof (itx_t),
offsetof(itx_t, itx_node));
}
if (itx->itx_sync) {
list_insert_tail(&itxs->i_sync_list, itx);
- atomic_add_64(&zilog->zl_itx_list_sz, itx->itx_sod);
- itxg->itxg_sod += itx->itx_sod;
} else {
avl_tree_t *t = &itxs->i_async_tree;
- uint64_t foid = ((lr_ooo_t *)&itx->itx_lr)->lr_foid;
+ uint64_t foid =
+ LR_FOID_GET_OBJ(((lr_ooo_t *)&itx->itx_lr)->lr_foid);
itx_async_node_t *ian;
avl_index_t where;
ian = avl_find(t, &foid, &where);
if (ian == NULL) {
ian = kmem_alloc(sizeof (itx_async_node_t),
- KM_PUSHPAGE);
+ KM_SLEEP);
list_create(&ian->ia_list, sizeof (itx_t),
offsetof(itx_t, itx_node));
ian->ia_foid = foid;
ASSERT3U(itxg->itxg_txg, <=, synced_txg);
ASSERT(itxg->itxg_txg != 0);
ASSERT(zilog->zl_clean_taskq != NULL);
- atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
- itxg->itxg_sod = 0;
clean_me = itxg->itxg_itxs;
itxg->itxg_itxs = NULL;
itxg->itxg_txg = 0;
{
uint64_t otxg, txg;
list_t *commit_list = &zilog->zl_itx_commit_list;
- uint64_t push_sod = 0;
if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
otxg = ZILTEST_TXG;
else
otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
+ /*
+ * This is inherently racy, since there is nothing to prevent
+ * the last synced txg from changing. That's okay since we'll
+ * only commit things in the future.
+ */
for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
continue;
}
+ /*
+ * If we're adding itx records to the zl_itx_commit_list,
+ * then the zil better be dirty in this "txg". We can assert
+ * that here since we're holding the itxg_lock which will
+ * prevent spa_sync from cleaning it. Once we add the itxs
+ * to the zl_itx_commit_list we must commit it to disk even
+ * if it's unnecessary (i.e. the txg was synced).
+ */
+ ASSERT(zilog_is_dirty_in_txg(zilog, txg) ||
+ spa_freeze_txg(zilog->zl_spa) != UINT64_MAX);
list_move_tail(commit_list, &itxg->itxg_itxs->i_sync_list);
- push_sod += itxg->itxg_sod;
- itxg->itxg_sod = 0;
mutex_exit(&itxg->itxg_lock);
}
- atomic_add_64(&zilog->zl_itx_list_sz, -push_sod);
}
/*
else
otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
+ /*
+ * This is inherently racy, since there is nothing to prevent
+ * the last synced txg from changing.
+ */
for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
for (itx = list_head(&zilog->zl_itx_commit_list); itx != NULL;
itx = list_next(&zilog->zl_itx_commit_list, itx)) {
txg = itx->itx_lr.lrc_txg;
- ASSERT(txg);
+ ASSERT3U(txg, !=, 0);
+ /*
+ * This is inherently racy and may result in us writing
+ * out a log block for a txg that was just synced. This is
+ * ok since we'll end up cleaning up that log block the next
+ * time we call zil_sync().
+ */
if (txg > spa_last_synced_txg(spa) || txg > spa_freeze_txg(spa))
lwb = zil_lwb_commit(zilog, itx, lwb);
}
if (itx->itx_callback != NULL)
itx->itx_callback(itx->itx_callback_data);
list_remove(&zilog->zl_itx_commit_list, itx);
- kmem_free(itx, offsetof(itx_t, itx_lr)
- + itx->itx_lr.lrc_reclen);
+ zil_itx_destroy(itx);
}
mutex_enter(&zilog->zl_lock);
zilog_t *zilog;
int i;
- zilog = kmem_zalloc(sizeof (zilog_t), KM_PUSHPAGE);
+ zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
zilog->zl_header = zh_phys;
zilog->zl_os = os;
ASSERT(list_is_empty(&zilog->zl_lwb_list));
zilog->zl_get_data = get_data;
- zilog->zl_clean_taskq = taskq_create("zil_clean", 1, minclsyspri,
+ zilog->zl_clean_taskq = taskq_create("zil_clean", 1, defclsyspri,
2, 2, TASKQ_PREPOPULATE);
return (zilog);
mutex_exit(&zilog->zl_lock);
if (txg)
txg_wait_synced(zilog->zl_dmu_pool, txg);
- ASSERT(!zilog_is_dirty(zilog));
+
+ if (zilog_is_dirty(zilog))
+ zfs_dbgmsg("zil (%p) is dirty, txg %llu", zilog, txg);
+ if (txg < spa_freeze_txg(zilog->zl_spa))
+ VERIFY(!zilog_is_dirty(zilog));
taskq_destroy(zilog->zl_clean_taskq);
zilog->zl_clean_taskq = NULL;
return (0);
}
+ /*
+ * The ZIL has work to do. Ensure that the associated encryption
+ * key will remain mapped while we are committing the log by
+ * grabbing a reference to it. If the key isn't loaded we have no
+ * choice but to return an error until the wrapping key is loaded.
+ */
+ if (os->os_encrypted && spa_keystore_create_mapping(os->os_spa,
+ dmu_objset_ds(os), FTAG) != 0) {
+ zilog->zl_suspend--;
+ mutex_exit(&zilog->zl_lock);
+ dsl_dataset_long_rele(dmu_objset_ds(os), suspend_tag);
+ dsl_dataset_rele(dmu_objset_ds(os), suspend_tag);
+ return (SET_ERROR(EBUSY));
+ }
+
zilog->zl_suspending = B_TRUE;
mutex_exit(&zilog->zl_lock);
cv_broadcast(&zilog->zl_cv_suspend);
mutex_exit(&zilog->zl_lock);
+ if (os->os_encrypted) {
+ /*
+ * Encrypted datasets need to wait for all data to be
+ * synced out before removing the mapping.
+ *
+ * XXX: Depending on the number of datasets with
+ * outstanding ZIL data on a given log device, this
+ * might cause spa_offline_log() to take a long time.
+ */
+ txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
+ VERIFY0(spa_keystore_remove_mapping(os->os_spa,
+ dmu_objset_id(os), FTAG));
+ }
+
if (cookiep == NULL)
zil_resume(os);
else
static int
zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
{
- char name[MAXNAMELEN];
+ char name[ZFS_MAX_DATASET_NAME_LEN];
zilog->zl_replaying_seq--; /* didn't actually replay this one */
*/
if (TX_OOO(txtype)) {
error = dmu_object_info(zilog->zl_os,
- ((lr_ooo_t *)lr)->lr_foid, NULL);
+ LR_FOID_GET_OBJ(((lr_ooo_t *)lr)->lr_foid), NULL);
if (error == ENOENT || error == EEXIST)
return (0);
}
zr.zr_replay = replay_func;
zr.zr_arg = arg;
zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
- zr.zr_lr = vmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_PUSHPAGE);
+ zr.zr_lr = vmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
/*
* Wait for in-progress removes to sync before starting replay.
zilog->zl_replay_time = ddi_get_lbolt();
ASSERT(zilog->zl_replay_blks == 0);
(void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
- zh->zh_claim_txg);
+ zh->zh_claim_txg, B_TRUE);
vmem_free(zr.zr_lr, 2 * SPA_MAXBLOCKSIZE);
zil_destroy(zilog, B_FALSE);
}
#if defined(_KERNEL) && defined(HAVE_SPL)
+EXPORT_SYMBOL(zil_alloc);
+EXPORT_SYMBOL(zil_free);
+EXPORT_SYMBOL(zil_open);
+EXPORT_SYMBOL(zil_close);
+EXPORT_SYMBOL(zil_replay);
+EXPORT_SYMBOL(zil_replaying);
+EXPORT_SYMBOL(zil_destroy);
+EXPORT_SYMBOL(zil_destroy_sync);
+EXPORT_SYMBOL(zil_itx_create);
+EXPORT_SYMBOL(zil_itx_destroy);
+EXPORT_SYMBOL(zil_itx_assign);
+EXPORT_SYMBOL(zil_commit);
+EXPORT_SYMBOL(zil_vdev_offline);
+EXPORT_SYMBOL(zil_claim);
+EXPORT_SYMBOL(zil_check_log_chain);
+EXPORT_SYMBOL(zil_sync);
+EXPORT_SYMBOL(zil_clean);
+EXPORT_SYMBOL(zil_suspend);
+EXPORT_SYMBOL(zil_resume);
+EXPORT_SYMBOL(zil_add_block);
+EXPORT_SYMBOL(zil_bp_tree_add);
+EXPORT_SYMBOL(zil_set_sync);
+EXPORT_SYMBOL(zil_set_logbias);
+
+/* BEGIN CSTYLED */
module_param(zil_replay_disable, int, 0644);
MODULE_PARM_DESC(zil_replay_disable, "Disable intent logging replay");
module_param(zfs_nocacheflush, int, 0644);
MODULE_PARM_DESC(zfs_nocacheflush, "Disable cache flushes");
-module_param(zil_slog_limit, ulong, 0644);
-MODULE_PARM_DESC(zil_slog_limit, "Max commit bytes to separate log device");
+module_param(zil_slog_bulk, ulong, 0644);
+MODULE_PARM_DESC(zil_slog_bulk, "Limit in bytes slog sync writes per commit");
+/* END CSTYLED */
#endif