]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blobdiff - drivers/md/dm-cache-target.c
Merge branch 'dm-4.12' into dm-4.12-post-merge
[mirror_ubuntu-artful-kernel.git] / drivers / md / dm-cache-target.c
index 9c689b34e6e792105d64f2bb4835e11e37c91578..1db375f50a1321aae81492d1b3ac6392c52a00c2 100644 (file)
@@ -5,7 +5,7 @@
  */
 
 #include "dm.h"
-#include "dm-bio-prison.h"
+#include "dm-bio-prison-v2.h"
 #include "dm-bio-record.h"
 #include "dm-cache-metadata.h"
 
@@ -15,6 +15,7 @@
 #include <linux/init.h>
 #include <linux/mempool.h>
 #include <linux/module.h>
+#include <linux/rwsem.h>
 #include <linux/slab.h>
 #include <linux/vmalloc.h>
 
@@ -25,7 +26,18 @@ DECLARE_DM_KCOPYD_THROTTLE_WITH_MODULE_PARM(cache_copy_throttle,
 
 /*----------------------------------------------------------------*/
 
-#define IOT_RESOLUTION 4
+/*
+ * Glossary:
+ *
+ * oblock: index of an origin block
+ * cblock: index of a cache block
+ * promotion: movement of a block from origin to cache
+ * demotion: movement of a block from cache to origin
+ * migration: movement of a block between the origin and cache device,
+ *           either direction
+ */
+
+/*----------------------------------------------------------------*/
 
 struct io_tracker {
        spinlock_t lock;
@@ -99,18 +111,177 @@ static void iot_io_end(struct io_tracker *iot, sector_t len)
 /*----------------------------------------------------------------*/
 
 /*
- * Glossary:
- *
- * oblock: index of an origin block
- * cblock: index of a cache block
- * promotion: movement of a block from origin to cache
- * demotion: movement of a block from cache to origin
- * migration: movement of a block between the origin and cache device,
- *           either direction
+ * Represents a chunk of future work.  'input' allows continuations to pass
+ * values between themselves, typically error values.
  */
+struct continuation {
+       struct work_struct ws;
+       int input;
+};
+
+static inline void init_continuation(struct continuation *k,
+                                    void (*fn)(struct work_struct *))
+{
+       INIT_WORK(&k->ws, fn);
+       k->input = 0;
+}
+
+static inline void queue_continuation(struct workqueue_struct *wq,
+                                     struct continuation *k)
+{
+       queue_work(wq, &k->ws);
+}
 
 /*----------------------------------------------------------------*/
 
+/*
+ * The batcher collects together pieces of work that need a particular
+ * operation to occur before they can proceed (typically a commit).
+ */
+struct batcher {
+       /*
+        * The operation that everyone is waiting for.
+        */
+       int (*commit_op)(void *context);
+       void *commit_context;
+
+       /*
+        * This is how bios should be issued once the commit op is complete
+        * (accounted_request).
+        */
+       void (*issue_op)(struct bio *bio, void *context);
+       void *issue_context;
+
+       /*
+        * Queued work gets put on here after commit.
+        */
+       struct workqueue_struct *wq;
+
+       spinlock_t lock;
+       struct list_head work_items;
+       struct bio_list bios;
+       struct work_struct commit_work;
+
+       bool commit_scheduled;
+};
+
+static void __commit(struct work_struct *_ws)
+{
+       struct batcher *b = container_of(_ws, struct batcher, commit_work);
+
+       int r;
+       unsigned long flags;
+       struct list_head work_items;
+       struct work_struct *ws, *tmp;
+       struct continuation *k;
+       struct bio *bio;
+       struct bio_list bios;
+
+       INIT_LIST_HEAD(&work_items);
+       bio_list_init(&bios);
+
+       /*
+        * We have to grab these before the commit_op to avoid a race
+        * condition.
+        */
+       spin_lock_irqsave(&b->lock, flags);
+       list_splice_init(&b->work_items, &work_items);
+       bio_list_merge(&bios, &b->bios);
+       bio_list_init(&b->bios);
+       b->commit_scheduled = false;
+       spin_unlock_irqrestore(&b->lock, flags);
+
+       r = b->commit_op(b->commit_context);
+
+       list_for_each_entry_safe(ws, tmp, &work_items, entry) {
+               k = container_of(ws, struct continuation, ws);
+               k->input = r;
+               INIT_LIST_HEAD(&ws->entry); /* to avoid a WARN_ON */
+               queue_work(b->wq, ws);
+       }
+
+       while ((bio = bio_list_pop(&bios))) {
+               if (r) {
+                       bio->bi_error = r;
+                       bio_endio(bio);
+               } else
+                       b->issue_op(bio, b->issue_context);
+       }
+}
+
+static void batcher_init(struct batcher *b,
+                        int (*commit_op)(void *),
+                        void *commit_context,
+                        void (*issue_op)(struct bio *bio, void *),
+                        void *issue_context,
+                        struct workqueue_struct *wq)
+{
+       b->commit_op = commit_op;
+       b->commit_context = commit_context;
+       b->issue_op = issue_op;
+       b->issue_context = issue_context;
+       b->wq = wq;
+
+       spin_lock_init(&b->lock);
+       INIT_LIST_HEAD(&b->work_items);
+       bio_list_init(&b->bios);
+       INIT_WORK(&b->commit_work, __commit);
+       b->commit_scheduled = false;
+}
+
+static void async_commit(struct batcher *b)
+{
+       queue_work(b->wq, &b->commit_work);
+}
+
+static void continue_after_commit(struct batcher *b, struct continuation *k)
+{
+       unsigned long flags;
+       bool commit_scheduled;
+
+       spin_lock_irqsave(&b->lock, flags);
+       commit_scheduled = b->commit_scheduled;
+       list_add_tail(&k->ws.entry, &b->work_items);
+       spin_unlock_irqrestore(&b->lock, flags);
+
+       if (commit_scheduled)
+               async_commit(b);
+}
+
+/*
+ * Bios are errored if commit failed.
+ */
+static void issue_after_commit(struct batcher *b, struct bio *bio)
+{
+       unsigned long flags;
+       bool commit_scheduled;
+
+       spin_lock_irqsave(&b->lock, flags);
+       commit_scheduled = b->commit_scheduled;
+       bio_list_add(&b->bios, bio);
+       spin_unlock_irqrestore(&b->lock, flags);
+
+       if (commit_scheduled)
+              async_commit(b);
+}
+
+/*
+ * Call this if some urgent work is waiting for the commit to complete.
+ */
+static void schedule_commit(struct batcher *b)
+{
+       bool immediate;
+       unsigned long flags;
+
+       spin_lock_irqsave(&b->lock, flags);
+       immediate = !list_empty(&b->work_items) || !bio_list_empty(&b->bios);
+       b->commit_scheduled = true;
+       spin_unlock_irqrestore(&b->lock, flags);
+
+       if (immediate)
+               async_commit(b);
+}
+
 /*
  * There are a couple of places where we let a bio run, but want to do some
  * work before calling its endio function.  We do this by temporarily
@@ -189,31 +360,13 @@ struct cache_stats {
        atomic_t write_miss;
        atomic_t demotion;
        atomic_t promotion;
+       atomic_t writeback;
        atomic_t copies_avoided;
        atomic_t cache_cell_clash;
        atomic_t commit_count;
        atomic_t discard_count;
 };
 
-/*
- * Defines a range of cblocks, begin to (end - 1) are in the range.  end is
- * the one-past-the-end value.
- */
-struct cblock_range {
-       dm_cblock_t begin;
-       dm_cblock_t end;
-};
-
-struct invalidation_request {
-       struct list_head list;
-       struct cblock_range *cblocks;
-
-       atomic_t complete;
-       int err;
-
-       wait_queue_head_t result_wait;
-};
-
 struct cache {
        struct dm_target *ti;
        struct dm_target_callbacks callbacks;
@@ -255,11 +408,7 @@ struct cache {
        spinlock_t lock;
        struct list_head deferred_cells;
        struct bio_list deferred_bios;
-       struct bio_list deferred_flush_bios;
        struct bio_list deferred_writethrough_bios;
-       struct list_head quiesced_migrations;
-       struct list_head completed_migrations;
-       struct list_head need_commit_migrations;
        sector_t migration_threshold;
        wait_queue_head_t migration_wait;
        atomic_t nr_allocated_migrations;
@@ -270,9 +419,7 @@ struct cache {
         */
        atomic_t nr_io_migrations;
 
-       wait_queue_head_t quiescing_wait;
-       atomic_t quiescing;
-       atomic_t quiescing_ack;
+       struct rw_semaphore quiesce_lock;
 
        /*
         * cache_size entries, dirty if set
@@ -296,13 +443,11 @@ struct cache {
 
        struct dm_kcopyd_client *copier;
        struct workqueue_struct *wq;
-       struct work_struct worker;
-
+       struct work_struct deferred_bio_worker;
+       struct work_struct deferred_writethrough_worker;
+       struct work_struct migration_worker;
        struct delayed_work waker;
-       unsigned long last_commit_jiffies;
-
-       struct dm_bio_prison *prison;
-       struct dm_deferred_set *all_io_ds;
+       struct dm_bio_prison_v2 *prison;
 
        mempool_t *migration_pool;
 
@@ -330,12 +475,17 @@ struct cache {
        struct list_head invalidation_requests;
 
        struct io_tracker origin_tracker;
+
+       struct work_struct commit_ws;
+       struct batcher committer;
+
+       struct rw_semaphore background_work_lock;
 };
 
 struct per_bio_data {
        bool tick:1;
        unsigned req_nr:2;
-       struct dm_deferred_entry *all_io_entry;
+       struct dm_bio_prison_cell_v2 *cell;
        struct dm_hook_info hook_info;
        sector_t len;
 
@@ -350,55 +500,64 @@ struct per_bio_data {
 };
 
 struct dm_cache_migration {
-       struct list_head list;
+       struct continuation k;
        struct cache *cache;
 
-       unsigned long start_jiffies;
-       dm_oblock_t old_oblock;
-       dm_oblock_t new_oblock;
-       dm_cblock_t cblock;
-
-       bool err:1;
-       bool discard:1;
-       bool writeback:1;
-       bool demote:1;
-       bool promote:1;
-       bool requeue_holder:1;
-       bool invalidate:1;
+       struct policy_work *op;
+       struct bio *overwrite_bio;
+       struct dm_bio_prison_cell_v2 *cell;
 
-       struct dm_bio_prison_cell *old_ocell;
-       struct dm_bio_prison_cell *new_ocell;
+       dm_cblock_t invalidate_cblock;
+       dm_oblock_t invalidate_oblock;
 };
 
-/*
- * Processing a bio in the worker thread may require these memory
- * allocations.  We prealloc to avoid deadlocks (the same worker thread
- * frees them back to the mempool).
- */
-struct prealloc {
-       struct dm_cache_migration *mg;
-       struct dm_bio_prison_cell *cell1;
-       struct dm_bio_prison_cell *cell2;
-};
+/*----------------------------------------------------------------*/
+
+static bool writethrough_mode(struct cache_features *f)
+{
+       return f->io_mode == CM_IO_WRITETHROUGH;
+}
+
+static bool writeback_mode(struct cache_features *f)
+{
+       return f->io_mode == CM_IO_WRITEBACK;
+}
+
+static inline bool passthrough_mode(struct cache_features *f)
+{
+       return unlikely(f->io_mode == CM_IO_PASSTHROUGH);
+}
+
+/*----------------------------------------------------------------*/
+
+static void wake_deferred_bio_worker(struct cache *cache)
+{
+       queue_work(cache->wq, &cache->deferred_bio_worker);
+}
 
-static enum cache_metadata_mode get_cache_mode(struct cache *cache);
+static void wake_deferred_writethrough_worker(struct cache *cache)
+{
+       queue_work(cache->wq, &cache->deferred_writethrough_worker);
+}
 
-static void wake_worker(struct cache *cache)
+static void wake_migration_worker(struct cache *cache)
 {
-       queue_work(cache->wq, &cache->worker);
+       if (passthrough_mode(&cache->features))
+               return;
+
+       queue_work(cache->wq, &cache->migration_worker);
 }
 
 /*----------------------------------------------------------------*/
 
-static struct dm_bio_prison_cell *alloc_prison_cell(struct cache *cache)
+static struct dm_bio_prison_cell_v2 *alloc_prison_cell(struct cache *cache)
 {
-       /* FIXME: change to use a local slab. */
-       return dm_bio_prison_alloc_cell(cache->prison, GFP_NOWAIT);
+       return dm_bio_prison_alloc_cell_v2(cache->prison, GFP_NOWAIT);
 }
 
-static void free_prison_cell(struct cache *cache, struct dm_bio_prison_cell *cell)
+static void free_prison_cell(struct cache *cache, struct dm_bio_prison_cell_v2 *cell)
 {
-       dm_bio_prison_free_cell(cache->prison, cell);
+       dm_bio_prison_free_cell_v2(cache->prison, cell);
 }
 
 static struct dm_cache_migration *alloc_migration(struct cache *cache)
@@ -424,146 +583,127 @@ static void free_migration(struct dm_cache_migration *mg)
        mempool_free(mg, cache->migration_pool);
 }
 
-static int prealloc_data_structs(struct cache *cache, struct prealloc *p)
-{
-       if (!p->mg) {
-               p->mg = alloc_migration(cache);
-               if (!p->mg)
-                       return -ENOMEM;
-       }
-
-       if (!p->cell1) {
-               p->cell1 = alloc_prison_cell(cache);
-               if (!p->cell1)
-                       return -ENOMEM;
-       }
-
-       if (!p->cell2) {
-               p->cell2 = alloc_prison_cell(cache);
-               if (!p->cell2)
-                       return -ENOMEM;
-       }
-
-       return 0;
-}
+/*----------------------------------------------------------------*/
 
-static void prealloc_free_structs(struct cache *cache, struct prealloc *p)
+static inline dm_oblock_t oblock_succ(dm_oblock_t b)
 {
-       if (p->cell2)
-               free_prison_cell(cache, p->cell2);
-
-       if (p->cell1)
-               free_prison_cell(cache, p->cell1);
-
-       if (p->mg)
-               free_migration(p->mg);
+       return to_oblock(from_oblock(b) + 1ull);
 }
 
-static struct dm_cache_migration *prealloc_get_migration(struct prealloc *p)
+static void build_key(dm_oblock_t begin, dm_oblock_t end, struct dm_cell_key_v2 *key)
 {
-       struct dm_cache_migration *mg = p->mg;
-
-       BUG_ON(!mg);
-       p->mg = NULL;
-
-       return mg;
+       key->virtual = 0;
+       key->dev = 0;
+       key->block_begin = from_oblock(begin);
+       key->block_end = from_oblock(end);
 }
 
 /*
- * You must have a cell within the prealloc struct to return.  If not this
- * function will BUG() rather than returning NULL.
+ * We have two lock levels.  Level 0, which is used to prevent WRITEs, and
+ * level 1 which prevents *both* READs and WRITEs.
  */
-static struct dm_bio_prison_cell *prealloc_get_cell(struct prealloc *p)
+#define WRITE_LOCK_LEVEL 0
+#define READ_WRITE_LOCK_LEVEL 1
+
+static unsigned lock_level(struct bio *bio)
 {
-       struct dm_bio_prison_cell *r = NULL;
+       return bio_data_dir(bio) == WRITE ?
+               WRITE_LOCK_LEVEL :
+               READ_WRITE_LOCK_LEVEL;
+}
 
-       if (p->cell1) {
-               r = p->cell1;
-               p->cell1 = NULL;
+/*----------------------------------------------------------------
+ * Per bio data
+ *--------------------------------------------------------------*/
 
-       } else if (p->cell2) {
-               r = p->cell2;
-               p->cell2 = NULL;
-       } else
-               BUG();
+/*
+ * If using writeback, leave out struct per_bio_data's writethrough fields.
+ */
+#define PB_DATA_SIZE_WB (offsetof(struct per_bio_data, cache))
+#define PB_DATA_SIZE_WT (sizeof(struct per_bio_data))
 
-       return r;
+static size_t get_per_bio_data_size(struct cache *cache)
+{
+       return writethrough_mode(&cache->features) ? PB_DATA_SIZE_WT : PB_DATA_SIZE_WB;
 }
 
-/*
- * You can't have more than two cells in a prealloc struct.  BUG() will be
- * called if you try and overfill.
- */
-static void prealloc_put_cell(struct prealloc *p, struct dm_bio_prison_cell *cell)
+static struct per_bio_data *get_per_bio_data(struct bio *bio, size_t data_size)
 {
-       if (!p->cell2)
-               p->cell2 = cell;
+       struct per_bio_data *pb = dm_per_bio_data(bio, data_size);
+       BUG_ON(!pb);
+       return pb;
+}
 
-       else if (!p->cell1)
-               p->cell1 = cell;
+static struct per_bio_data *init_per_bio_data(struct bio *bio, size_t data_size)
+{
+       struct per_bio_data *pb = get_per_bio_data(bio, data_size);
 
-       else
-               BUG();
+       pb->tick = false;
+       pb->req_nr = dm_bio_get_target_bio_nr(bio);
+       pb->cell = NULL;
+       pb->len = 0;
+
+       return pb;
 }
 
 /*----------------------------------------------------------------*/
 
-static void build_key(dm_oblock_t begin, dm_oblock_t end, struct dm_cell_key *key)
+static void defer_bio(struct cache *cache, struct bio *bio)
 {
-       key->virtual = 0;
-       key->dev = 0;
-       key->block_begin = from_oblock(begin);
-       key->block_end = from_oblock(end);
-}
+       unsigned long flags;
 
-/*
- * The caller hands in a preallocated cell, and a free function for it.
- * The cell will be freed if there's an error, or if it wasn't used because
- * a cell with that key already exists.
- */
-typedef void (*cell_free_fn)(void *context, struct dm_bio_prison_cell *cell);
+       spin_lock_irqsave(&cache->lock, flags);
+       bio_list_add(&cache->deferred_bios, bio);
+       spin_unlock_irqrestore(&cache->lock, flags);
+
+       wake_deferred_bio_worker(cache);
+}
 
-static int bio_detain_range(struct cache *cache, dm_oblock_t oblock_begin, dm_oblock_t oblock_end,
-                           struct bio *bio, struct dm_bio_prison_cell *cell_prealloc,
-                           cell_free_fn free_fn, void *free_context,
-                           struct dm_bio_prison_cell **cell_result)
+static void defer_bios(struct cache *cache, struct bio_list *bios)
 {
-       int r;
-       struct dm_cell_key key;
+       unsigned long flags;
 
-       build_key(oblock_begin, oblock_end, &key);
-       r = dm_bio_detain(cache->prison, &key, bio, cell_prealloc, cell_result);
-       if (r)
-               free_fn(free_context, cell_prealloc);
+       spin_lock_irqsave(&cache->lock, flags);
+       bio_list_merge(&cache->deferred_bios, bios);
+       bio_list_init(bios);
+       spin_unlock_irqrestore(&cache->lock, flags);
 
-       return r;
+       wake_deferred_bio_worker(cache);
 }
 
-static int bio_detain(struct cache *cache, dm_oblock_t oblock,
-                     struct bio *bio, struct dm_bio_prison_cell *cell_prealloc,
-                     cell_free_fn free_fn, void *free_context,
-                     struct dm_bio_prison_cell **cell_result)
+/*----------------------------------------------------------------*/
+
+static bool bio_detain_shared(struct cache *cache, dm_oblock_t oblock, struct bio *bio)
 {
+       bool r;
+       size_t pb_size;
+       struct per_bio_data *pb;
+       struct dm_cell_key_v2 key;
        dm_oblock_t end = to_oblock(from_oblock(oblock) + 1ULL);
-       return bio_detain_range(cache, oblock, end, bio,
-                               cell_prealloc, free_fn, free_context, cell_result);
-}
+       struct dm_bio_prison_cell_v2 *cell_prealloc, *cell;
 
-static int get_cell(struct cache *cache,
-                   dm_oblock_t oblock,
-                   struct prealloc *structs,
-                   struct dm_bio_prison_cell **cell_result)
-{
-       int r;
-       struct dm_cell_key key;
-       struct dm_bio_prison_cell *cell_prealloc;
+       cell_prealloc = alloc_prison_cell(cache); /* FIXME: allow wait if calling from worker */
+       if (!cell_prealloc) {
+               defer_bio(cache, bio);
+               return false;
+       }
+
+       build_key(oblock, end, &key);
+       r = dm_cell_get_v2(cache->prison, &key, lock_level(bio), bio, cell_prealloc, &cell);
+       if (!r) {
+               /*
+                * Failed to get the lock.
+                */
+               free_prison_cell(cache, cell_prealloc);
+               return r;
+       }
 
-       cell_prealloc = prealloc_get_cell(structs);
+       if (cell != cell_prealloc)
+               free_prison_cell(cache, cell_prealloc);
 
-       build_key(oblock, to_oblock(from_oblock(oblock) + 1ULL), &key);
-       r = dm_get_cell(cache->prison, &key, cell_prealloc, cell_result);
-       if (r)
-               prealloc_put_cell(structs, cell_prealloc);
+       pb_size = get_per_bio_data_size(cache);
+       pb = get_per_bio_data(bio, pb_size);
+       pb->cell = cell;
 
        return r;
 }
@@ -575,21 +715,33 @@ static bool is_dirty(struct cache *cache, dm_cblock_t b)
        return test_bit(from_cblock(b), cache->dirty_bitset);
 }
 
-static void set_dirty(struct cache *cache, dm_oblock_t oblock, dm_cblock_t cblock)
+static void set_dirty(struct cache *cache, dm_cblock_t cblock)
 {
        if (!test_and_set_bit(from_cblock(cblock), cache->dirty_bitset)) {
                atomic_inc(&cache->nr_dirty);
-               policy_set_dirty(cache->policy, oblock);
+               policy_set_dirty(cache->policy, cblock);
        }
 }
 
-static void clear_dirty(struct cache *cache, dm_oblock_t oblock, dm_cblock_t cblock)
+/*
+ * These two are called when setting after migrations to force the policy
+ * and dirty bitset to be in sync.
+ */
+static void force_set_dirty(struct cache *cache, dm_cblock_t cblock)
+{
+       if (!test_and_set_bit(from_cblock(cblock), cache->dirty_bitset))
+               atomic_inc(&cache->nr_dirty);
+       policy_set_dirty(cache->policy, cblock);
+}
+
+static void force_clear_dirty(struct cache *cache, dm_cblock_t cblock)
 {
        if (test_and_clear_bit(from_cblock(cblock), cache->dirty_bitset)) {
-               policy_clear_dirty(cache->policy, oblock);
                if (atomic_dec_return(&cache->nr_dirty) == 0)
                        dm_table_event(cache->ti->table);
        }
+
+       policy_clear_dirty(cache->policy, cblock);
 }
 
 /*----------------------------------------------------------------*/
@@ -628,11 +780,6 @@ static dm_dblock_t oblock_to_dblock(struct cache *cache, dm_oblock_t oblock)
                                   oblocks_per_dblock(cache)));
 }
 
-static dm_oblock_t dblock_to_oblock(struct cache *cache, dm_dblock_t dblock)
-{
-       return to_oblock(from_dblock(dblock) * oblocks_per_dblock(cache));
-}
-
 static void set_discard(struct cache *cache, dm_dblock_t b)
 {
        unsigned long flags;
@@ -679,89 +826,12 @@ static bool is_discarded_oblock(struct cache *cache, dm_oblock_t b)
        return r;
 }
 
-/*----------------------------------------------------------------*/
-
-static void load_stats(struct cache *cache)
+/*----------------------------------------------------------------
+ * Remapping
+ *--------------------------------------------------------------*/
+static void remap_to_origin(struct cache *cache, struct bio *bio)
 {
-       struct dm_cache_statistics stats;
-
-       dm_cache_metadata_get_stats(cache->cmd, &stats);
-       atomic_set(&cache->stats.read_hit, stats.read_hits);
-       atomic_set(&cache->stats.read_miss, stats.read_misses);
-       atomic_set(&cache->stats.write_hit, stats.write_hits);
-       atomic_set(&cache->stats.write_miss, stats.write_misses);
-}
-
-static void save_stats(struct cache *cache)
-{
-       struct dm_cache_statistics stats;
-
-       if (get_cache_mode(cache) >= CM_READ_ONLY)
-               return;
-
-       stats.read_hits = atomic_read(&cache->stats.read_hit);
-       stats.read_misses = atomic_read(&cache->stats.read_miss);
-       stats.write_hits = atomic_read(&cache->stats.write_hit);
-       stats.write_misses = atomic_read(&cache->stats.write_miss);
-
-       dm_cache_metadata_set_stats(cache->cmd, &stats);
-}
-
-/*----------------------------------------------------------------
- * Per bio data
- *--------------------------------------------------------------*/
-
-/*
- * If using writeback, leave out struct per_bio_data's writethrough fields.
- */
-#define PB_DATA_SIZE_WB (offsetof(struct per_bio_data, cache))
-#define PB_DATA_SIZE_WT (sizeof(struct per_bio_data))
-
-static bool writethrough_mode(struct cache_features *f)
-{
-       return f->io_mode == CM_IO_WRITETHROUGH;
-}
-
-static bool writeback_mode(struct cache_features *f)
-{
-       return f->io_mode == CM_IO_WRITEBACK;
-}
-
-static bool passthrough_mode(struct cache_features *f)
-{
-       return f->io_mode == CM_IO_PASSTHROUGH;
-}
-
-static size_t get_per_bio_data_size(struct cache *cache)
-{
-       return writethrough_mode(&cache->features) ? PB_DATA_SIZE_WT : PB_DATA_SIZE_WB;
-}
-
-static struct per_bio_data *get_per_bio_data(struct bio *bio, size_t data_size)
-{
-       struct per_bio_data *pb = dm_per_bio_data(bio, data_size);
-       BUG_ON(!pb);
-       return pb;
-}
-
-static struct per_bio_data *init_per_bio_data(struct bio *bio, size_t data_size)
-{
-       struct per_bio_data *pb = get_per_bio_data(bio, data_size);
-
-       pb->tick = false;
-       pb->req_nr = dm_bio_get_target_bio_nr(bio);
-       pb->all_io_entry = NULL;
-       pb->len = 0;
-
-       return pb;
-}
-
-/*----------------------------------------------------------------
- * Remapping
- *--------------------------------------------------------------*/
-static void remap_to_origin(struct cache *cache, struct bio *bio)
-{
-       bio->bi_bdev = cache->origin_dev->bdev;
+       bio->bi_bdev = cache->origin_dev->bdev;
 }
 
 static void remap_to_cache(struct cache *cache, struct bio *bio,
@@ -797,8 +867,9 @@ static void check_if_tick_bio_needed(struct cache *cache, struct bio *bio)
 }
 
 static void remap_to_origin_clear_discard(struct cache *cache, struct bio *bio,
-                                 dm_oblock_t oblock)
+                                         dm_oblock_t oblock)
 {
+       // FIXME: this is called way too much.
        check_if_tick_bio_needed(cache, bio);
        remap_to_origin(cache, bio);
        if (bio_data_dir(bio) == WRITE)
@@ -811,7 +882,7 @@ static void remap_to_cache_dirty(struct cache *cache, struct bio *bio,
        check_if_tick_bio_needed(cache, bio);
        remap_to_cache(cache, bio, cblock);
        if (bio_data_dir(bio) == WRITE) {
-               set_dirty(cache, oblock, cblock);
+               set_dirty(cache, cblock);
                clear_discard(cache, oblock_to_dblock(cache, oblock));
        }
 }
@@ -828,22 +899,6 @@ static dm_oblock_t get_bio_block(struct cache *cache, struct bio *bio)
        return to_oblock(block_nr);
 }
 
-/*
- * You must increment the deferred set whilst the prison cell is held.  To
- * encourage this, we ask for 'cell' to be passed in.
- */
-static void inc_ds(struct cache *cache, struct bio *bio,
-                  struct dm_bio_prison_cell *cell)
-{
-       size_t pb_data_size = get_per_bio_data_size(cache);
-       struct per_bio_data *pb = get_per_bio_data(bio, pb_data_size);
-
-       BUG_ON(!cell);
-       BUG_ON(pb->all_io_entry);
-
-       pb->all_io_entry = dm_deferred_entry_inc(cache->all_io_ds);
-}
-
 static bool accountable_bio(struct cache *cache, struct bio *bio)
 {
        return ((bio->bi_bdev == cache->origin_dev->bdev) &&
@@ -875,29 +930,10 @@ static void accounted_request(struct cache *cache, struct bio *bio)
        generic_make_request(bio);
 }
 
-static void issue(struct cache *cache, struct bio *bio)
-{
-       unsigned long flags;
-
-       if (!op_is_flush(bio->bi_opf)) {
-               accounted_request(cache, bio);
-               return;
-       }
-
-       /*
-        * Batch together any bios that trigger commits and then issue a
-        * single commit for them in do_worker().
-        */
-       spin_lock_irqsave(&cache->lock, flags);
-       cache->commit_requested = true;
-       bio_list_add(&cache->deferred_flush_bios, bio);
-       spin_unlock_irqrestore(&cache->lock, flags);
-}
-
-static void inc_and_issue(struct cache *cache, struct bio *bio, struct dm_bio_prison_cell *cell)
+static void issue_op(struct bio *bio, void *context)
 {
-       inc_ds(cache, bio, cell);
-       issue(cache, bio);
+       struct cache *cache = context;
+       accounted_request(cache, bio);
 }
 
 static void defer_writethrough_bio(struct cache *cache, struct bio *bio)
@@ -908,7 +944,7 @@ static void defer_writethrough_bio(struct cache *cache, struct bio *bio)
        bio_list_add(&cache->deferred_writethrough_bios, bio);
        spin_unlock_irqrestore(&cache->lock, flags);
 
-       wake_worker(cache);
+       wake_deferred_writethrough_worker(cache);
 }
 
 static void writethrough_endio(struct bio *bio)
@@ -934,6 +970,7 @@ static void writethrough_endio(struct bio *bio)
 }
 
 /*
+ * FIXME: send in parallel, huge latency as is.
  * When running in writethrough mode we need to send writes to clean blocks
  * to both the cache and origin devices.  In future we'd like to clone the
  * bio and send them in parallel, but for now we're doing them in
@@ -1046,12 +1083,58 @@ static void metadata_operation_failed(struct cache *cache, const char *op, int r
        set_cache_mode(cache, CM_READ_ONLY);
 }
 
+/*----------------------------------------------------------------*/
+
+static void load_stats(struct cache *cache)
+{
+       struct dm_cache_statistics stats;
+
+       dm_cache_metadata_get_stats(cache->cmd, &stats);
+       atomic_set(&cache->stats.read_hit, stats.read_hits);
+       atomic_set(&cache->stats.read_miss, stats.read_misses);
+       atomic_set(&cache->stats.write_hit, stats.write_hits);
+       atomic_set(&cache->stats.write_miss, stats.write_misses);
+}
+
+static void save_stats(struct cache *cache)
+{
+       struct dm_cache_statistics stats;
+
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return;
+
+       stats.read_hits = atomic_read(&cache->stats.read_hit);
+       stats.read_misses = atomic_read(&cache->stats.read_miss);
+       stats.write_hits = atomic_read(&cache->stats.write_hit);
+       stats.write_misses = atomic_read(&cache->stats.write_miss);
+
+       dm_cache_metadata_set_stats(cache->cmd, &stats);
+}
+
+static void update_stats(struct cache_stats *stats, enum policy_operation op)
+{
+       switch (op) {
+       case POLICY_PROMOTE:
+               atomic_inc(&stats->promotion);
+               break;
+
+       case POLICY_DEMOTE:
+               atomic_inc(&stats->demotion);
+               break;
+
+       case POLICY_WRITEBACK:
+               atomic_inc(&stats->writeback);
+               break;
+       }
+}
+
 /*----------------------------------------------------------------
  * Migration processing
  *
  * Migration covers moving data from the origin device to the cache, or
  * vice versa.
  *--------------------------------------------------------------*/
+
 static void inc_io_migrations(struct cache *cache)
 {
        atomic_inc(&cache->nr_io_migrations);
@@ -1067,213 +1150,109 @@ static bool discard_or_flush(struct bio *bio)
        return bio_op(bio) == REQ_OP_DISCARD || op_is_flush(bio->bi_opf);
 }
 
-static void __cell_defer(struct cache *cache, struct dm_bio_prison_cell *cell)
-{
-       if (discard_or_flush(cell->holder)) {
-               /*
-                * We have to handle these bios individually.
-                */
-               dm_cell_release(cache->prison, cell, &cache->deferred_bios);
-               free_prison_cell(cache, cell);
-       } else
-               list_add_tail(&cell->user_list, &cache->deferred_cells);
-}
-
-static void cell_defer(struct cache *cache, struct dm_bio_prison_cell *cell, bool holder)
+static void calc_discard_block_range(struct cache *cache, struct bio *bio,
+                                    dm_dblock_t *b, dm_dblock_t *e)
 {
-       unsigned long flags;
-
-       if (!holder && dm_cell_promote_or_release(cache->prison, cell)) {
-               /*
-                * There was no prisoner to promote to holder, the
-                * cell has been released.
-                */
-               free_prison_cell(cache, cell);
-               return;
-       }
+       sector_t sb = bio->bi_iter.bi_sector;
+       sector_t se = bio_end_sector(bio);
 
-       spin_lock_irqsave(&cache->lock, flags);
-       __cell_defer(cache, cell);
-       spin_unlock_irqrestore(&cache->lock, flags);
+       *b = to_dblock(dm_sector_div_up(sb, cache->discard_block_size));
 
-       wake_worker(cache);
+       if (se - sb < cache->discard_block_size)
+               *e = *b;
+       else
+               *e = to_dblock(block_div(se, cache->discard_block_size));
 }
 
-static void cell_error_with_code(struct cache *cache, struct dm_bio_prison_cell *cell, int err)
-{
-       dm_cell_error(cache->prison, cell, err);
-       free_prison_cell(cache, cell);
-}
+/*----------------------------------------------------------------*/
 
-static void cell_requeue(struct cache *cache, struct dm_bio_prison_cell *cell)
+static void prevent_background_work(struct cache *cache)
 {
-       cell_error_with_code(cache, cell, DM_ENDIO_REQUEUE);
+       lockdep_off();
+       down_write(&cache->background_work_lock);
+       lockdep_on();
 }
 
-static void free_io_migration(struct dm_cache_migration *mg)
+static void allow_background_work(struct cache *cache)
 {
-       struct cache *cache = mg->cache;
-
-       dec_io_migrations(cache);
-       free_migration(mg);
-       wake_worker(cache);
+       lockdep_off();
+       up_write(&cache->background_work_lock);
+       lockdep_on();
 }
 
-static void migration_failure(struct dm_cache_migration *mg)
+static bool background_work_begin(struct cache *cache)
 {
-       struct cache *cache = mg->cache;
-       const char *dev_name = cache_device_name(cache);
-
-       if (mg->writeback) {
-               DMERR_LIMIT("%s: writeback failed; couldn't copy block", dev_name);
-               set_dirty(cache, mg->old_oblock, mg->cblock);
-               cell_defer(cache, mg->old_ocell, false);
-
-       } else if (mg->demote) {
-               DMERR_LIMIT("%s: demotion failed; couldn't copy block", dev_name);
-               policy_force_mapping(cache->policy, mg->new_oblock, mg->old_oblock);
+       bool r;
 
-               cell_defer(cache, mg->old_ocell, mg->promote ? false : true);
-               if (mg->promote)
-                       cell_defer(cache, mg->new_ocell, true);
-       } else {
-               DMERR_LIMIT("%s: promotion failed; couldn't copy block", dev_name);
-               policy_remove_mapping(cache->policy, mg->new_oblock);
-               cell_defer(cache, mg->new_ocell, true);
-       }
+       lockdep_off();
+       r = down_read_trylock(&cache->background_work_lock);
+       lockdep_on();
 
-       free_io_migration(mg);
+       return r;
 }
 
-static void migration_success_pre_commit(struct dm_cache_migration *mg)
+static void background_work_end(struct cache *cache)
 {
-       int r;
-       unsigned long flags;
-       struct cache *cache = mg->cache;
-
-       if (mg->writeback) {
-               clear_dirty(cache, mg->old_oblock, mg->cblock);
-               cell_defer(cache, mg->old_ocell, false);
-               free_io_migration(mg);
-               return;
+       lockdep_off();
+       up_read(&cache->background_work_lock);
+       lockdep_on();
+}
 
-       } else if (mg->demote) {
-               r = dm_cache_remove_mapping(cache->cmd, mg->cblock);
-               if (r) {
-                       DMERR_LIMIT("%s: demotion failed; couldn't update on disk metadata",
-                                   cache_device_name(cache));
-                       metadata_operation_failed(cache, "dm_cache_remove_mapping", r);
-                       policy_force_mapping(cache->policy, mg->new_oblock,
-                                            mg->old_oblock);
-                       if (mg->promote)
-                               cell_defer(cache, mg->new_ocell, true);
-                       free_io_migration(mg);
-                       return;
-               }
-       } else {
-               r = dm_cache_insert_mapping(cache->cmd, mg->cblock, mg->new_oblock);
-               if (r) {
-                       DMERR_LIMIT("%s: promotion failed; couldn't update on disk metadata",
-                                   cache_device_name(cache));
-                       metadata_operation_failed(cache, "dm_cache_insert_mapping", r);
-                       policy_remove_mapping(cache->policy, mg->new_oblock);
-                       free_io_migration(mg);
-                       return;
-               }
-       }
+/*----------------------------------------------------------------*/
 
-       spin_lock_irqsave(&cache->lock, flags);
-       list_add_tail(&mg->list, &cache->need_commit_migrations);
-       cache->commit_requested = true;
-       spin_unlock_irqrestore(&cache->lock, flags);
+static void quiesce(struct dm_cache_migration *mg,
+                   void (*continuation)(struct work_struct *))
+{
+       init_continuation(&mg->k, continuation);
+       dm_cell_quiesce_v2(mg->cache->prison, mg->cell, &mg->k.ws);
 }
 
-static void migration_success_post_commit(struct dm_cache_migration *mg)
+static struct dm_cache_migration *ws_to_mg(struct work_struct *ws)
 {
-       unsigned long flags;
-       struct cache *cache = mg->cache;
-
-       if (mg->writeback) {
-               DMWARN_LIMIT("%s: writeback unexpectedly triggered commit",
-                            cache_device_name(cache));
-               return;
-
-       } else if (mg->demote) {
-               cell_defer(cache, mg->old_ocell, mg->promote ? false : true);
-
-               if (mg->promote) {
-                       mg->demote = false;
-
-                       spin_lock_irqsave(&cache->lock, flags);
-                       list_add_tail(&mg->list, &cache->quiesced_migrations);
-                       spin_unlock_irqrestore(&cache->lock, flags);
-
-               } else {
-                       if (mg->invalidate)
-                               policy_remove_mapping(cache->policy, mg->old_oblock);
-                       free_io_migration(mg);
-               }
-
-       } else {
-               if (mg->requeue_holder) {
-                       clear_dirty(cache, mg->new_oblock, mg->cblock);
-                       cell_defer(cache, mg->new_ocell, true);
-               } else {
-                       /*
-                        * The block was promoted via an overwrite, so it's dirty.
-                        */
-                       set_dirty(cache, mg->new_oblock, mg->cblock);
-                       bio_endio(mg->new_ocell->holder);
-                       cell_defer(cache, mg->new_ocell, false);
-               }
-               free_io_migration(mg);
-       }
+       struct continuation *k = container_of(ws, struct continuation, ws);
+       return container_of(k, struct dm_cache_migration, k);
 }
 
 static void copy_complete(int read_err, unsigned long write_err, void *context)
 {
-       unsigned long flags;
-       struct dm_cache_migration *mg = (struct dm_cache_migration *) context;
-       struct cache *cache = mg->cache;
+       struct dm_cache_migration *mg = container_of(context, struct dm_cache_migration, k);
 
        if (read_err || write_err)
-               mg->err = true;
-
-       spin_lock_irqsave(&cache->lock, flags);
-       list_add_tail(&mg->list, &cache->completed_migrations);
-       spin_unlock_irqrestore(&cache->lock, flags);
+               mg->k.input = -EIO;
 
-       wake_worker(cache);
+       queue_continuation(mg->cache->wq, &mg->k);
 }
 
-static void issue_copy(struct dm_cache_migration *mg)
+static int copy(struct dm_cache_migration *mg, bool promote)
 {
        int r;
        struct dm_io_region o_region, c_region;
        struct cache *cache = mg->cache;
-       sector_t cblock = from_cblock(mg->cblock);
 
        o_region.bdev = cache->origin_dev->bdev;
+       o_region.sector = from_oblock(mg->op->oblock) * cache->sectors_per_block;
        o_region.count = cache->sectors_per_block;
 
        c_region.bdev = cache->cache_dev->bdev;
-       c_region.sector = cblock * cache->sectors_per_block;
+       c_region.sector = from_cblock(mg->op->cblock) * cache->sectors_per_block;
        c_region.count = cache->sectors_per_block;
 
-       if (mg->writeback || mg->demote) {
-               /* demote */
-               o_region.sector = from_oblock(mg->old_oblock) * cache->sectors_per_block;
-               r = dm_kcopyd_copy(cache->copier, &c_region, 1, &o_region, 0, copy_complete, mg);
-       } else {
-               /* promote */
-               o_region.sector = from_oblock(mg->new_oblock) * cache->sectors_per_block;
-               r = dm_kcopyd_copy(cache->copier, &o_region, 1, &c_region, 0, copy_complete, mg);
-       }
+       if (promote)
+               r = dm_kcopyd_copy(cache->copier, &o_region, 1, &c_region, 0, copy_complete, &mg->k);
+       else
+               r = dm_kcopyd_copy(cache->copier, &c_region, 1, &o_region, 0, copy_complete, &mg->k);
 
-       if (r < 0) {
-               DMERR_LIMIT("%s: issuing migration failed", cache_device_name(cache));
-               migration_failure(mg);
-       }
+       return r;
+}
+
+static void bio_drop_shared_lock(struct cache *cache, struct bio *bio)
+{
+       size_t pb_data_size = get_per_bio_data_size(cache);
+       struct per_bio_data *pb = get_per_bio_data(bio, pb_data_size);
+
+       if (pb->cell && dm_cell_put_v2(cache->prison, pb->cell))
+               free_prison_cell(cache, pb->cell);
+       pb->cell = NULL;
 }
 
 static void overwrite_endio(struct bio *bio)
@@ -1282,930 +1261,752 @@ static void overwrite_endio(struct bio *bio)
        struct cache *cache = mg->cache;
        size_t pb_data_size = get_per_bio_data_size(cache);
        struct per_bio_data *pb = get_per_bio_data(bio, pb_data_size);
-       unsigned long flags;
 
        dm_unhook_bio(&pb->hook_info, bio);
 
        if (bio->bi_error)
-               mg->err = true;
-
-       mg->requeue_holder = false;
+               mg->k.input = bio->bi_error;
 
-       spin_lock_irqsave(&cache->lock, flags);
-       list_add_tail(&mg->list, &cache->completed_migrations);
-       spin_unlock_irqrestore(&cache->lock, flags);
-
-       wake_worker(cache);
+       queue_continuation(mg->cache->wq, &mg->k);
 }
 
-static void issue_overwrite(struct dm_cache_migration *mg, struct bio *bio)
+static void overwrite(struct dm_cache_migration *mg,
+                     void (*continuation)(struct work_struct *))
 {
+       struct bio *bio = mg->overwrite_bio;
        size_t pb_data_size = get_per_bio_data_size(mg->cache);
        struct per_bio_data *pb = get_per_bio_data(bio, pb_data_size);
 
        dm_hook_bio(&pb->hook_info, bio, overwrite_endio, mg);
-       remap_to_cache_dirty(mg->cache, bio, mg->new_oblock, mg->cblock);
 
        /*
-        * No need to inc_ds() here, since the cell will be held for the
-        * duration of the io.
+        * The overwrite bio is part of the copy operation, as such it does
+        * not set/clear discard or dirty flags.
         */
+       if (mg->op->op == POLICY_PROMOTE)
+               remap_to_cache(mg->cache, bio, mg->op->cblock);
+       else
+               remap_to_origin(mg->cache, bio);
+
+       init_continuation(&mg->k, continuation);
        accounted_request(mg->cache, bio);
 }
 
-static bool bio_writes_complete_block(struct cache *cache, struct bio *bio)
+/*
+ * Migration steps:
+ *
+ * 1) exclusive lock preventing WRITEs
+ * 2) quiesce
+ * 3) copy or issue overwrite bio
+ * 4) upgrade to exclusive lock preventing READs and WRITEs
+ * 5) quiesce
+ * 6) update metadata and commit
+ * 7) unlock
+ */
+static void mg_complete(struct dm_cache_migration *mg, bool success)
 {
-       return (bio_data_dir(bio) == WRITE) &&
-               (bio->bi_iter.bi_size == (cache->sectors_per_block << SECTOR_SHIFT));
+       struct bio_list bios;
+       struct cache *cache = mg->cache;
+       struct policy_work *op = mg->op;
+       dm_cblock_t cblock = op->cblock;
+
+       if (success)
+               update_stats(&cache->stats, op->op);
+
+       switch (op->op) {
+       case POLICY_PROMOTE:
+               clear_discard(cache, oblock_to_dblock(cache, op->oblock));
+               policy_complete_background_work(cache->policy, op, success);
+
+               if (mg->overwrite_bio) {
+                       if (success)
+                               force_set_dirty(cache, cblock);
+                       else
+                               mg->overwrite_bio->bi_error = (mg->k.input ? : -EIO);
+                       bio_endio(mg->overwrite_bio);
+               } else {
+                       if (success)
+                               force_clear_dirty(cache, cblock);
+                       dec_io_migrations(cache);
+               }
+               break;
+
+       case POLICY_DEMOTE:
+               /*
+                * We clear dirty here to update the nr_dirty counter.
+                */
+               if (success)
+                       force_clear_dirty(cache, cblock);
+               policy_complete_background_work(cache->policy, op, success);
+               dec_io_migrations(cache);
+               break;
+
+       case POLICY_WRITEBACK:
+               if (success)
+                       force_clear_dirty(cache, cblock);
+               policy_complete_background_work(cache->policy, op, success);
+               dec_io_migrations(cache);
+               break;
+       }
+
+       bio_list_init(&bios);
+       if (mg->cell) {
+               if (dm_cell_unlock_v2(cache->prison, mg->cell, &bios))
+                       free_prison_cell(cache, mg->cell);
+       }
+
+       free_migration(mg);
+       defer_bios(cache, &bios);
+       wake_migration_worker(cache);
+
+       background_work_end(cache);
 }
 
-static void avoid_copy(struct dm_cache_migration *mg)
+static void mg_success(struct work_struct *ws)
 {
-       atomic_inc(&mg->cache->stats.copies_avoided);
-       migration_success_pre_commit(mg);
+       struct dm_cache_migration *mg = ws_to_mg(ws);
+       mg_complete(mg, mg->k.input == 0);
 }
 
-static void calc_discard_block_range(struct cache *cache, struct bio *bio,
-                                    dm_dblock_t *b, dm_dblock_t *e)
+static void mg_update_metadata(struct work_struct *ws)
 {
-       sector_t sb = bio->bi_iter.bi_sector;
-       sector_t se = bio_end_sector(bio);
-
-       *b = to_dblock(dm_sector_div_up(sb, cache->discard_block_size));
-
-       if (se - sb < cache->discard_block_size)
-               *e = *b;
-       else
-               *e = to_dblock(block_div(se, cache->discard_block_size));
-}
-
-static void issue_discard(struct dm_cache_migration *mg)
-{
-       dm_dblock_t b, e;
-       struct bio *bio = mg->new_ocell->holder;
-       struct cache *cache = mg->cache;
-
-       calc_discard_block_range(cache, bio, &b, &e);
-       while (b != e) {
-               set_discard(cache, b);
-               b = to_dblock(from_dblock(b) + 1);
-       }
-
-       bio_endio(bio);
-       cell_defer(cache, mg->new_ocell, false);
-       free_migration(mg);
-       wake_worker(cache);
-}
-
-static void issue_copy_or_discard(struct dm_cache_migration *mg)
-{
-       bool avoid;
+       int r;
+       struct dm_cache_migration *mg = ws_to_mg(ws);
        struct cache *cache = mg->cache;
+       struct policy_work *op = mg->op;
 
-       if (mg->discard) {
-               issue_discard(mg);
-               return;
-       }
+       switch (op->op) {
+       case POLICY_PROMOTE:
+               r = dm_cache_insert_mapping(cache->cmd, op->cblock, op->oblock);
+               if (r) {
+                       DMERR_LIMIT("%s: migration failed; couldn't insert mapping",
+                                   cache_device_name(cache));
+                       metadata_operation_failed(cache, "dm_cache_insert_mapping", r);
 
-       if (mg->writeback || mg->demote)
-               avoid = !is_dirty(cache, mg->cblock) ||
-                       is_discarded_oblock(cache, mg->old_oblock);
-       else {
-               struct bio *bio = mg->new_ocell->holder;
+                       mg_complete(mg, false);
+                       return;
+               }
+               mg_complete(mg, true);
+               break;
 
-               avoid = is_discarded_oblock(cache, mg->new_oblock);
+       case POLICY_DEMOTE:
+               r = dm_cache_remove_mapping(cache->cmd, op->cblock);
+               if (r) {
+                       DMERR_LIMIT("%s: migration failed; couldn't update on disk metadata",
+                                   cache_device_name(cache));
+                       metadata_operation_failed(cache, "dm_cache_remove_mapping", r);
 
-               if (writeback_mode(&cache->features) &&
-                   !avoid && bio_writes_complete_block(cache, bio)) {
-                       issue_overwrite(mg, bio);
+                       mg_complete(mg, false);
                        return;
                }
-       }
 
-       avoid ? avoid_copy(mg) : issue_copy(mg);
+               /*
+                * It would be nice if we only had to commit when a REQ_FLUSH
+                * comes through.  But there's one scenario that we have to
+                * look out for:
+                *
+                * - vblock x in a cache block
+                * - domotion occurs
+                * - cache block gets reallocated and over written
+                * - crash
+                *
+                * When we recover, because there was no commit the cache will
+                * rollback to having the data for vblock x in the cache block.
+                * But the cache block has since been overwritten, so it'll end
+                * up pointing to data that was never in 'x' during the history
+                * of the device.
+                *
+                * To avoid this issue we require a commit as part of the
+                * demotion operation.
+                */
+               init_continuation(&mg->k, mg_success);
+               continue_after_commit(&cache->committer, &mg->k);
+               schedule_commit(&cache->committer);
+               break;
+
+       case POLICY_WRITEBACK:
+               mg_complete(mg, true);
+               break;
+       }
 }
 
-static void complete_migration(struct dm_cache_migration *mg)
+static void mg_update_metadata_after_copy(struct work_struct *ws)
 {
-       if (mg->err)
-               migration_failure(mg);
+       struct dm_cache_migration *mg = ws_to_mg(ws);
+
+       /*
+        * Did the copy succeed?
+        */
+       if (mg->k.input)
+               mg_complete(mg, false);
        else
-               migration_success_pre_commit(mg);
+               mg_update_metadata(ws);
 }
 
-static void process_migrations(struct cache *cache, struct list_head *head,
-                              void (*fn)(struct dm_cache_migration *))
+static void mg_upgrade_lock(struct work_struct *ws)
 {
-       unsigned long flags;
-       struct list_head list;
-       struct dm_cache_migration *mg, *tmp;
+       int r;
+       struct dm_cache_migration *mg = ws_to_mg(ws);
 
-       INIT_LIST_HEAD(&list);
-       spin_lock_irqsave(&cache->lock, flags);
-       list_splice_init(head, &list);
-       spin_unlock_irqrestore(&cache->lock, flags);
+       /*
+        * Did the copy succeed?
+        */
+       if (mg->k.input)
+               mg_complete(mg, false);
 
-       list_for_each_entry_safe(mg, tmp, &list, list)
-               fn(mg);
-}
+       else {
+               /*
+                * Now we want the lock to prevent both reads and writes.
+                */
+               r = dm_cell_lock_promote_v2(mg->cache->prison, mg->cell,
+                                           READ_WRITE_LOCK_LEVEL);
+               if (r < 0)
+                       mg_complete(mg, false);
 
-static void __queue_quiesced_migration(struct dm_cache_migration *mg)
-{
-       list_add_tail(&mg->list, &mg->cache->quiesced_migrations);
+               else if (r)
+                       quiesce(mg, mg_update_metadata);
+
+               else
+                       mg_update_metadata(ws);
+       }
 }
 
-static void queue_quiesced_migration(struct dm_cache_migration *mg)
+static void mg_copy(struct work_struct *ws)
 {
-       unsigned long flags;
-       struct cache *cache = mg->cache;
+       int r;
+       struct dm_cache_migration *mg = ws_to_mg(ws);
 
-       spin_lock_irqsave(&cache->lock, flags);
-       __queue_quiesced_migration(mg);
-       spin_unlock_irqrestore(&cache->lock, flags);
+       if (mg->overwrite_bio) {
+               /*
+                * It's safe to do this here, even though it's new data
+                * because all IO has been locked out of the block.
+                *
+                * mg_lock_writes() already took READ_WRITE_LOCK_LEVEL
+                * so _not_ using mg_upgrade_lock() as continutation.
+                */
+               overwrite(mg, mg_update_metadata_after_copy);
 
-       wake_worker(cache);
-}
+       } else {
+               struct cache *cache = mg->cache;
+               struct policy_work *op = mg->op;
+               bool is_policy_promote = (op->op == POLICY_PROMOTE);
 
-static void queue_quiesced_migrations(struct cache *cache, struct list_head *work)
-{
-       unsigned long flags;
-       struct dm_cache_migration *mg, *tmp;
+               if ((!is_policy_promote && !is_dirty(cache, op->cblock)) ||
+                   is_discarded_oblock(cache, op->oblock)) {
+                       mg_upgrade_lock(ws);
+                       return;
+               }
 
-       spin_lock_irqsave(&cache->lock, flags);
-       list_for_each_entry_safe(mg, tmp, work, list)
-               __queue_quiesced_migration(mg);
-       spin_unlock_irqrestore(&cache->lock, flags);
+               init_continuation(&mg->k, mg_upgrade_lock);
 
-       wake_worker(cache);
+               r = copy(mg, is_policy_promote);
+               if (r) {
+                       DMERR_LIMIT("%s: migration copy failed", cache_device_name(cache));
+                       mg->k.input = -EIO;
+                       mg_complete(mg, false);
+               }
+       }
 }
 
-static void check_for_quiesced_migrations(struct cache *cache,
-                                         struct per_bio_data *pb)
+static int mg_lock_writes(struct dm_cache_migration *mg)
 {
-       struct list_head work;
-
-       if (!pb->all_io_entry)
-               return;
-
-       INIT_LIST_HEAD(&work);
-       dm_deferred_entry_dec(pb->all_io_entry, &work);
+       int r;
+       struct dm_cell_key_v2 key;
+       struct cache *cache = mg->cache;
+       struct dm_bio_prison_cell_v2 *prealloc;
 
-       if (!list_empty(&work))
-               queue_quiesced_migrations(cache, &work);
-}
+       prealloc = alloc_prison_cell(cache);
+       if (!prealloc) {
+               DMERR_LIMIT("%s: alloc_prison_cell failed", cache_device_name(cache));
+               mg_complete(mg, false);
+               return -ENOMEM;
+       }
 
-static void quiesce_migration(struct dm_cache_migration *mg)
-{
-       if (!dm_deferred_set_add_work(mg->cache->all_io_ds, &mg->list))
-               queue_quiesced_migration(mg);
-}
+       /*
+        * Prevent writes to the block, but allow reads to continue.
+        * Unless we're using an overwrite bio, in which case we lock
+        * everything.
+        */
+       build_key(mg->op->oblock, oblock_succ(mg->op->oblock), &key);
+       r = dm_cell_lock_v2(cache->prison, &key,
+                           mg->overwrite_bio ?  READ_WRITE_LOCK_LEVEL : WRITE_LOCK_LEVEL,
+                           prealloc, &mg->cell);
+       if (r < 0) {
+               free_prison_cell(cache, prealloc);
+               mg_complete(mg, false);
+               return r;
+       }
 
-static void promote(struct cache *cache, struct prealloc *structs,
-                   dm_oblock_t oblock, dm_cblock_t cblock,
-                   struct dm_bio_prison_cell *cell)
-{
-       struct dm_cache_migration *mg = prealloc_get_migration(structs);
+       if (mg->cell != prealloc)
+               free_prison_cell(cache, prealloc);
 
-       mg->err = false;
-       mg->discard = false;
-       mg->writeback = false;
-       mg->demote = false;
-       mg->promote = true;
-       mg->requeue_holder = true;
-       mg->invalidate = false;
-       mg->cache = cache;
-       mg->new_oblock = oblock;
-       mg->cblock = cblock;
-       mg->old_ocell = NULL;
-       mg->new_ocell = cell;
-       mg->start_jiffies = jiffies;
+       if (r == 0)
+               mg_copy(&mg->k.ws);
+       else
+               quiesce(mg, mg_copy);
 
-       inc_io_migrations(cache);
-       quiesce_migration(mg);
+       return 0;
 }
 
-static void writeback(struct cache *cache, struct prealloc *structs,
-                     dm_oblock_t oblock, dm_cblock_t cblock,
-                     struct dm_bio_prison_cell *cell)
+static int mg_start(struct cache *cache, struct policy_work *op, struct bio *bio)
 {
-       struct dm_cache_migration *mg = prealloc_get_migration(structs);
-
-       mg->err = false;
-       mg->discard = false;
-       mg->writeback = true;
-       mg->demote = false;
-       mg->promote = false;
-       mg->requeue_holder = true;
-       mg->invalidate = false;
-       mg->cache = cache;
-       mg->old_oblock = oblock;
-       mg->cblock = cblock;
-       mg->old_ocell = cell;
-       mg->new_ocell = NULL;
-       mg->start_jiffies = jiffies;
-
-       inc_io_migrations(cache);
-       quiesce_migration(mg);
-}
-
-static void demote_then_promote(struct cache *cache, struct prealloc *structs,
-                               dm_oblock_t old_oblock, dm_oblock_t new_oblock,
-                               dm_cblock_t cblock,
-                               struct dm_bio_prison_cell *old_ocell,
-                               struct dm_bio_prison_cell *new_ocell)
-{
-       struct dm_cache_migration *mg = prealloc_get_migration(structs);
-
-       mg->err = false;
-       mg->discard = false;
-       mg->writeback = false;
-       mg->demote = true;
-       mg->promote = true;
-       mg->requeue_holder = true;
-       mg->invalidate = false;
-       mg->cache = cache;
-       mg->old_oblock = old_oblock;
-       mg->new_oblock = new_oblock;
-       mg->cblock = cblock;
-       mg->old_ocell = old_ocell;
-       mg->new_ocell = new_ocell;
-       mg->start_jiffies = jiffies;
-
-       inc_io_migrations(cache);
-       quiesce_migration(mg);
-}
+       struct dm_cache_migration *mg;
 
-/*
- * Invalidate a cache entry.  No writeback occurs; any changes in the cache
- * block are thrown away.
- */
-static void invalidate(struct cache *cache, struct prealloc *structs,
-                      dm_oblock_t oblock, dm_cblock_t cblock,
-                      struct dm_bio_prison_cell *cell)
-{
-       struct dm_cache_migration *mg = prealloc_get_migration(structs);
-
-       mg->err = false;
-       mg->discard = false;
-       mg->writeback = false;
-       mg->demote = true;
-       mg->promote = false;
-       mg->requeue_holder = true;
-       mg->invalidate = true;
-       mg->cache = cache;
-       mg->old_oblock = oblock;
-       mg->cblock = cblock;
-       mg->old_ocell = cell;
-       mg->new_ocell = NULL;
-       mg->start_jiffies = jiffies;
+       if (!background_work_begin(cache)) {
+               policy_complete_background_work(cache->policy, op, false);
+               return -EPERM;
+       }
 
-       inc_io_migrations(cache);
-       quiesce_migration(mg);
-}
+       mg = alloc_migration(cache);
+       if (!mg) {
+               policy_complete_background_work(cache->policy, op, false);
+               background_work_end(cache);
+               return -ENOMEM;
+       }
 
-static void discard(struct cache *cache, struct prealloc *structs,
-                   struct dm_bio_prison_cell *cell)
-{
-       struct dm_cache_migration *mg = prealloc_get_migration(structs);
+       memset(mg, 0, sizeof(*mg));
 
-       mg->err = false;
-       mg->discard = true;
-       mg->writeback = false;
-       mg->demote = false;
-       mg->promote = false;
-       mg->requeue_holder = false;
-       mg->invalidate = false;
        mg->cache = cache;
-       mg->old_ocell = NULL;
-       mg->new_ocell = cell;
-       mg->start_jiffies = jiffies;
+       mg->op = op;
+       mg->overwrite_bio = bio;
+
+       if (!bio)
+               inc_io_migrations(cache);
 
-       quiesce_migration(mg);
+       return mg_lock_writes(mg);
 }
 
 /*----------------------------------------------------------------
- * bio processing
+ * invalidation processing
  *--------------------------------------------------------------*/
-static void defer_bio(struct cache *cache, struct bio *bio)
-{
-       unsigned long flags;
-
-       spin_lock_irqsave(&cache->lock, flags);
-       bio_list_add(&cache->deferred_bios, bio);
-       spin_unlock_irqrestore(&cache->lock, flags);
-
-       wake_worker(cache);
-}
-
-static void process_flush_bio(struct cache *cache, struct bio *bio)
-{
-       size_t pb_data_size = get_per_bio_data_size(cache);
-       struct per_bio_data *pb = get_per_bio_data(bio, pb_data_size);
-
-       BUG_ON(bio->bi_iter.bi_size);
-       if (!pb->req_nr)
-               remap_to_origin(cache, bio);
-       else
-               remap_to_cache(cache, bio, 0);
 
-       /*
-        * REQ_PREFLUSH is not directed at any particular block so we don't
-        * need to inc_ds().  REQ_FUA's are split into a write + REQ_PREFLUSH
-        * by dm-core.
-        */
-       issue(cache, bio);
-}
-
-static void process_discard_bio(struct cache *cache, struct prealloc *structs,
-                               struct bio *bio)
+static void invalidate_complete(struct dm_cache_migration *mg, bool success)
 {
-       int r;
-       dm_dblock_t b, e;
-       struct dm_bio_prison_cell *cell_prealloc, *new_ocell;
-
-       calc_discard_block_range(cache, bio, &b, &e);
-       if (b == e) {
-               bio_endio(bio);
-               return;
-       }
+       struct bio_list bios;
+       struct cache *cache = mg->cache;
 
-       cell_prealloc = prealloc_get_cell(structs);
-       r = bio_detain_range(cache, dblock_to_oblock(cache, b), dblock_to_oblock(cache, e), bio, cell_prealloc,
-                            (cell_free_fn) prealloc_put_cell,
-                            structs, &new_ocell);
-       if (r > 0)
-               return;
+       bio_list_init(&bios);
+       if (dm_cell_unlock_v2(cache->prison, mg->cell, &bios))
+               free_prison_cell(cache, mg->cell);
 
-       discard(cache, structs, new_ocell);
-}
+       if (!success && mg->overwrite_bio)
+               bio_io_error(mg->overwrite_bio);
 
-static bool spare_migration_bandwidth(struct cache *cache)
-{
-       sector_t current_volume = (atomic_read(&cache->nr_io_migrations) + 1) *
-               cache->sectors_per_block;
-       return current_volume < cache->migration_threshold;
-}
+       free_migration(mg);
+       defer_bios(cache, &bios);
 
-static void inc_hit_counter(struct cache *cache, struct bio *bio)
-{
-       atomic_inc(bio_data_dir(bio) == READ ?
-                  &cache->stats.read_hit : &cache->stats.write_hit);
+       background_work_end(cache);
 }
 
-static void inc_miss_counter(struct cache *cache, struct bio *bio)
+static void invalidate_completed(struct work_struct *ws)
 {
-       atomic_inc(bio_data_dir(bio) == READ ?
-                  &cache->stats.read_miss : &cache->stats.write_miss);
+       struct dm_cache_migration *mg = ws_to_mg(ws);
+       invalidate_complete(mg, !mg->k.input);
 }
 
-/*----------------------------------------------------------------*/
-
-struct inc_detail {
-       struct cache *cache;
-       struct bio_list bios_for_issue;
-       struct bio_list unhandled_bios;
-       bool any_writes;
-};
-
-static void inc_fn(void *context, struct dm_bio_prison_cell *cell)
+static int invalidate_cblock(struct cache *cache, dm_cblock_t cblock)
 {
-       struct bio *bio;
-       struct inc_detail *detail = context;
-       struct cache *cache = detail->cache;
-
-       inc_ds(cache, cell->holder, cell);
-       if (bio_data_dir(cell->holder) == WRITE)
-               detail->any_writes = true;
-
-       while ((bio = bio_list_pop(&cell->bios))) {
-               if (discard_or_flush(bio)) {
-                       bio_list_add(&detail->unhandled_bios, bio);
-                       continue;
+       int r = policy_invalidate_mapping(cache->policy, cblock);
+       if (!r) {
+               r = dm_cache_remove_mapping(cache->cmd, cblock);
+               if (r) {
+                       DMERR_LIMIT("%s: invalidation failed; couldn't update on disk metadata",
+                                   cache_device_name(cache));
+                       metadata_operation_failed(cache, "dm_cache_remove_mapping", r);
                }
 
-               if (bio_data_dir(bio) == WRITE)
-                       detail->any_writes = true;
+       } else if (r == -ENODATA) {
+               /*
+                * Harmless, already unmapped.
+                */
+               r = 0;
 
-               bio_list_add(&detail->bios_for_issue, bio);
-               inc_ds(cache, bio, cell);
-       }
+       } else
+               DMERR("%s: policy_invalidate_mapping failed", cache_device_name(cache));
+
+       return r;
 }
 
-// FIXME: refactor these two
-static void remap_cell_to_origin_clear_discard(struct cache *cache,
-                                              struct dm_bio_prison_cell *cell,
-                                              dm_oblock_t oblock, bool issue_holder)
+static void invalidate_remove(struct work_struct *ws)
 {
-       struct bio *bio;
-       unsigned long flags;
-       struct inc_detail detail;
-
-       detail.cache = cache;
-       bio_list_init(&detail.bios_for_issue);
-       bio_list_init(&detail.unhandled_bios);
-       detail.any_writes = false;
-
-       spin_lock_irqsave(&cache->lock, flags);
-       dm_cell_visit_release(cache->prison, inc_fn, &detail, cell);
-       bio_list_merge(&cache->deferred_bios, &detail.unhandled_bios);
-       spin_unlock_irqrestore(&cache->lock, flags);
-
-       remap_to_origin(cache, cell->holder);
-       if (issue_holder)
-               issue(cache, cell->holder);
-       else
-               accounted_begin(cache, cell->holder);
-
-       if (detail.any_writes)
-               clear_discard(cache, oblock_to_dblock(cache, oblock));
+       int r;
+       struct dm_cache_migration *mg = ws_to_mg(ws);
+       struct cache *cache = mg->cache;
 
-       while ((bio = bio_list_pop(&detail.bios_for_issue))) {
-               remap_to_origin(cache, bio);
-               issue(cache, bio);
+       r = invalidate_cblock(cache, mg->invalidate_cblock);
+       if (r) {
+               invalidate_complete(mg, false);
+               return;
        }
 
-       free_prison_cell(cache, cell);
+       init_continuation(&mg->k, invalidate_completed);
+       continue_after_commit(&cache->committer, &mg->k);
+       remap_to_origin_clear_discard(cache, mg->overwrite_bio, mg->invalidate_oblock);
+       mg->overwrite_bio = NULL;
+       schedule_commit(&cache->committer);
 }
 
-static void remap_cell_to_cache_dirty(struct cache *cache, struct dm_bio_prison_cell *cell,
-                                     dm_oblock_t oblock, dm_cblock_t cblock, bool issue_holder)
+static int invalidate_lock(struct dm_cache_migration *mg)
 {
-       struct bio *bio;
-       unsigned long flags;
-       struct inc_detail detail;
-
-       detail.cache = cache;
-       bio_list_init(&detail.bios_for_issue);
-       bio_list_init(&detail.unhandled_bios);
-       detail.any_writes = false;
-
-       spin_lock_irqsave(&cache->lock, flags);
-       dm_cell_visit_release(cache->prison, inc_fn, &detail, cell);
-       bio_list_merge(&cache->deferred_bios, &detail.unhandled_bios);
-       spin_unlock_irqrestore(&cache->lock, flags);
-
-       remap_to_cache(cache, cell->holder, cblock);
-       if (issue_holder)
-               issue(cache, cell->holder);
-       else
-               accounted_begin(cache, cell->holder);
+       int r;
+       struct dm_cell_key_v2 key;
+       struct cache *cache = mg->cache;
+       struct dm_bio_prison_cell_v2 *prealloc;
 
-       if (detail.any_writes) {
-               set_dirty(cache, oblock, cblock);
-               clear_discard(cache, oblock_to_dblock(cache, oblock));
+       prealloc = alloc_prison_cell(cache);
+       if (!prealloc) {
+               invalidate_complete(mg, false);
+               return -ENOMEM;
        }
 
-       while ((bio = bio_list_pop(&detail.bios_for_issue))) {
-               remap_to_cache(cache, bio, cblock);
-               issue(cache, bio);
+       build_key(mg->invalidate_oblock, oblock_succ(mg->invalidate_oblock), &key);
+       r = dm_cell_lock_v2(cache->prison, &key,
+                           READ_WRITE_LOCK_LEVEL, prealloc, &mg->cell);
+       if (r < 0) {
+               free_prison_cell(cache, prealloc);
+               invalidate_complete(mg, false);
+               return r;
        }
 
-       free_prison_cell(cache, cell);
-}
+       if (mg->cell != prealloc)
+               free_prison_cell(cache, prealloc);
 
-/*----------------------------------------------------------------*/
+       if (r)
+               quiesce(mg, invalidate_remove);
 
-struct old_oblock_lock {
-       struct policy_locker locker;
-       struct cache *cache;
-       struct prealloc *structs;
-       struct dm_bio_prison_cell *cell;
-};
+       else {
+               /*
+                * We can't call invalidate_remove() directly here because we
+                * might still be in request context.
+                */
+               init_continuation(&mg->k, invalidate_remove);
+               queue_work(cache->wq, &mg->k.ws);
+       }
 
-static int null_locker(struct policy_locker *locker, dm_oblock_t b)
-{
-       /* This should never be called */
-       BUG();
        return 0;
 }
 
-static int cell_locker(struct policy_locker *locker, dm_oblock_t b)
+static int invalidate_start(struct cache *cache, dm_cblock_t cblock,
+                           dm_oblock_t oblock, struct bio *bio)
 {
-       struct old_oblock_lock *l = container_of(locker, struct old_oblock_lock, locker);
-       struct dm_bio_prison_cell *cell_prealloc = prealloc_get_cell(l->structs);
-
-       return bio_detain(l->cache, b, NULL, cell_prealloc,
-                         (cell_free_fn) prealloc_put_cell,
-                         l->structs, &l->cell);
-}
-
-static void process_cell(struct cache *cache, struct prealloc *structs,
-                        struct dm_bio_prison_cell *new_ocell)
-{
-       int r;
-       bool release_cell = true;
-       struct bio *bio = new_ocell->holder;
-       dm_oblock_t block = get_bio_block(cache, bio);
-       struct policy_result lookup_result;
-       bool passthrough = passthrough_mode(&cache->features);
-       bool fast_promotion, can_migrate;
-       struct old_oblock_lock ool;
-
-       fast_promotion = is_discarded_oblock(cache, block) || bio_writes_complete_block(cache, bio);
-       can_migrate = !passthrough && (fast_promotion || spare_migration_bandwidth(cache));
-
-       ool.locker.fn = cell_locker;
-       ool.cache = cache;
-       ool.structs = structs;
-       ool.cell = NULL;
-       r = policy_map(cache->policy, block, true, can_migrate, fast_promotion,
-                      bio, &ool.locker, &lookup_result);
-
-       if (r == -EWOULDBLOCK)
-               /* migration has been denied */
-               lookup_result.op = POLICY_MISS;
-
-       switch (lookup_result.op) {
-       case POLICY_HIT:
-               if (passthrough) {
-                       inc_miss_counter(cache, bio);
-
-                       /*
-                        * Passthrough always maps to the origin,
-                        * invalidating any cache blocks that are written
-                        * to.
-                        */
-
-                       if (bio_data_dir(bio) == WRITE) {
-                               atomic_inc(&cache->stats.demotion);
-                               invalidate(cache, structs, block, lookup_result.cblock, new_ocell);
-                               release_cell = false;
+       struct dm_cache_migration *mg;
 
-                       } else {
-                               /* FIXME: factor out issue_origin() */
-                               remap_to_origin_clear_discard(cache, bio, block);
-                               inc_and_issue(cache, bio, new_ocell);
-                       }
-               } else {
-                       inc_hit_counter(cache, bio);
-
-                       if (bio_data_dir(bio) == WRITE &&
-                           writethrough_mode(&cache->features) &&
-                           !is_dirty(cache, lookup_result.cblock)) {
-                               remap_to_origin_then_cache(cache, bio, block, lookup_result.cblock);
-                               inc_and_issue(cache, bio, new_ocell);
-
-                       } else {
-                               remap_cell_to_cache_dirty(cache, new_ocell, block, lookup_result.cblock, true);
-                               release_cell = false;
-                       }
-               }
+       if (!background_work_begin(cache))
+               return -EPERM;
 
-               break;
+       mg = alloc_migration(cache);
+       if (!mg) {
+               background_work_end(cache);
+               return -ENOMEM;
+       }
 
-       case POLICY_MISS:
-               inc_miss_counter(cache, bio);
-               remap_cell_to_origin_clear_discard(cache, new_ocell, block, true);
-               release_cell = false;
-               break;
+       memset(mg, 0, sizeof(*mg));
 
-       case POLICY_NEW:
-               atomic_inc(&cache->stats.promotion);
-               promote(cache, structs, block, lookup_result.cblock, new_ocell);
-               release_cell = false;
-               break;
+       mg->cache = cache;
+       mg->overwrite_bio = bio;
+       mg->invalidate_cblock = cblock;
+       mg->invalidate_oblock = oblock;
 
-       case POLICY_REPLACE:
-               atomic_inc(&cache->stats.demotion);
-               atomic_inc(&cache->stats.promotion);
-               demote_then_promote(cache, structs, lookup_result.old_oblock,
-                                   block, lookup_result.cblock,
-                                   ool.cell, new_ocell);
-               release_cell = false;
-               break;
+       return invalidate_lock(mg);
+}
 
-       default:
-               DMERR_LIMIT("%s: %s: erroring bio, unknown policy op: %u",
-                           cache_device_name(cache), __func__,
-                           (unsigned) lookup_result.op);
-               bio_io_error(bio);
-       }
+/*----------------------------------------------------------------
+ * bio processing
+ *--------------------------------------------------------------*/
 
-       if (release_cell)
-               cell_defer(cache, new_ocell, false);
-}
+enum busy {
+       IDLE,
+       MODERATE,
+       BUSY
+};
 
-static void process_bio(struct cache *cache, struct prealloc *structs,
-                       struct bio *bio)
+static enum busy spare_migration_bandwidth(struct cache *cache)
 {
-       int r;
-       dm_oblock_t block = get_bio_block(cache, bio);
-       struct dm_bio_prison_cell *cell_prealloc, *new_ocell;
-
-       /*
-        * Check to see if that block is currently migrating.
-        */
-       cell_prealloc = prealloc_get_cell(structs);
-       r = bio_detain(cache, block, bio, cell_prealloc,
-                      (cell_free_fn) prealloc_put_cell,
-                      structs, &new_ocell);
-       if (r > 0)
-               return;
+       bool idle = iot_idle_for(&cache->origin_tracker, HZ);
+       sector_t current_volume = (atomic_read(&cache->nr_io_migrations) + 1) *
+               cache->sectors_per_block;
 
-       process_cell(cache, structs, new_ocell);
+       if (current_volume <= cache->migration_threshold)
+               return idle ? IDLE : MODERATE;
+       else
+               return idle ? MODERATE : BUSY;
 }
 
-static int need_commit_due_to_time(struct cache *cache)
+static void inc_hit_counter(struct cache *cache, struct bio *bio)
 {
-       return jiffies < cache->last_commit_jiffies ||
-              jiffies > cache->last_commit_jiffies + COMMIT_PERIOD;
+       atomic_inc(bio_data_dir(bio) == READ ?
+                  &cache->stats.read_hit : &cache->stats.write_hit);
 }
 
-/*
- * A non-zero return indicates read_only or fail_io mode.
- */
-static int commit(struct cache *cache, bool clean_shutdown)
+static void inc_miss_counter(struct cache *cache, struct bio *bio)
 {
-       int r;
-
-       if (get_cache_mode(cache) >= CM_READ_ONLY)
-               return -EINVAL;
+       atomic_inc(bio_data_dir(bio) == READ ?
+                  &cache->stats.read_miss : &cache->stats.write_miss);
+}
 
-       atomic_inc(&cache->stats.commit_count);
-       r = dm_cache_commit(cache->cmd, clean_shutdown);
-       if (r)
-               metadata_operation_failed(cache, "dm_cache_commit", r);
+/*----------------------------------------------------------------*/
 
-       return r;
+static bool bio_writes_complete_block(struct cache *cache, struct bio *bio)
+{
+       return (bio_data_dir(bio) == WRITE) &&
+               (bio->bi_iter.bi_size == (cache->sectors_per_block << SECTOR_SHIFT));
 }
 
-static int commit_if_needed(struct cache *cache)
+static bool optimisable_bio(struct cache *cache, struct bio *bio, dm_oblock_t block)
 {
-       int r = 0;
-
-       if ((cache->commit_requested || need_commit_due_to_time(cache)) &&
-           dm_cache_changed_this_transaction(cache->cmd)) {
-               r = commit(cache, false);
-               cache->commit_requested = false;
-               cache->last_commit_jiffies = jiffies;
-       }
-
-       return r;
+       return writeback_mode(&cache->features) &&
+               (is_discarded_oblock(cache, block) || bio_writes_complete_block(cache, bio));
 }
 
-static void process_deferred_bios(struct cache *cache)
+static int map_bio(struct cache *cache, struct bio *bio, dm_oblock_t block,
+                  bool *commit_needed)
 {
-       bool prealloc_used = false;
-       unsigned long flags;
-       struct bio_list bios;
-       struct bio *bio;
-       struct prealloc structs;
-
-       memset(&structs, 0, sizeof(structs));
-       bio_list_init(&bios);
+       int r, data_dir;
+       bool rb, background_queued;
+       dm_cblock_t cblock;
+       size_t pb_data_size = get_per_bio_data_size(cache);
+       struct per_bio_data *pb = get_per_bio_data(bio, pb_data_size);
 
-       spin_lock_irqsave(&cache->lock, flags);
-       bio_list_merge(&bios, &cache->deferred_bios);
-       bio_list_init(&cache->deferred_bios);
-       spin_unlock_irqrestore(&cache->lock, flags);
+       *commit_needed = false;
 
-       while (!bio_list_empty(&bios)) {
+       rb = bio_detain_shared(cache, block, bio);
+       if (!rb) {
                /*
-                * If we've got no free migration structs, and processing
-                * this bio might require one, we pause until there are some
-                * prepared mappings to process.
+                * An exclusive lock is held for this block, so we have to
+                * wait.  We set the commit_needed flag so the current
+                * transaction will be committed asap, allowing this lock
+                * to be dropped.
                 */
-               prealloc_used = true;
-               if (prealloc_data_structs(cache, &structs)) {
-                       spin_lock_irqsave(&cache->lock, flags);
-                       bio_list_merge(&cache->deferred_bios, &bios);
-                       spin_unlock_irqrestore(&cache->lock, flags);
-                       break;
+               *commit_needed = true;
+               return DM_MAPIO_SUBMITTED;
+       }
+
+       data_dir = bio_data_dir(bio);
+
+       if (optimisable_bio(cache, bio, block)) {
+               struct policy_work *op = NULL;
+
+               r = policy_lookup_with_work(cache->policy, block, &cblock, data_dir, true, &op);
+               if (unlikely(r && r != -ENOENT)) {
+                       DMERR_LIMIT("%s: policy_lookup_with_work() failed with r = %d",
+                                   cache_device_name(cache), r);
+                       bio_io_error(bio);
+                       return DM_MAPIO_SUBMITTED;
                }
 
-               bio = bio_list_pop(&bios);
+               if (r == -ENOENT && op) {
+                       bio_drop_shared_lock(cache, bio);
+                       BUG_ON(op->op != POLICY_PROMOTE);
+                       mg_start(cache, op, bio);
+                       return DM_MAPIO_SUBMITTED;
+               }
+       } else {
+               r = policy_lookup(cache->policy, block, &cblock, data_dir, false, &background_queued);
+               if (unlikely(r && r != -ENOENT)) {
+                       DMERR_LIMIT("%s: policy_lookup() failed with r = %d",
+                                   cache_device_name(cache), r);
+                       bio_io_error(bio);
+                       return DM_MAPIO_SUBMITTED;
+               }
 
-               if (bio->bi_opf & REQ_PREFLUSH)
-                       process_flush_bio(cache, bio);
-               else if (bio_op(bio) == REQ_OP_DISCARD)
-                       process_discard_bio(cache, &structs, bio);
-               else
-                       process_bio(cache, &structs, bio);
+               if (background_queued)
+                       wake_migration_worker(cache);
        }
 
-       if (prealloc_used)
-               prealloc_free_structs(cache, &structs);
-}
-
-static void process_deferred_cells(struct cache *cache)
-{
-       bool prealloc_used = false;
-       unsigned long flags;
-       struct dm_bio_prison_cell *cell, *tmp;
-       struct list_head cells;
-       struct prealloc structs;
+       if (r == -ENOENT) {
+               /*
+                * Miss.
+                */
+               inc_miss_counter(cache, bio);
+               if (pb->req_nr == 0) {
+                       accounted_begin(cache, bio);
+                       remap_to_origin_clear_discard(cache, bio, block);
 
-       memset(&structs, 0, sizeof(structs));
+               } else {
+                       /*
+                        * This is a duplicate writethrough io that is no
+                        * longer needed because the block has been demoted.
+                        */
+                       bio_endio(bio);
+                       return DM_MAPIO_SUBMITTED;
+               }
+       } else {
+               /*
+                * Hit.
+                */
+               inc_hit_counter(cache, bio);
 
-       INIT_LIST_HEAD(&cells);
+               /*
+                * Passthrough always maps to the origin, invalidating any
+                * cache blocks that are written to.
+                */
+               if (passthrough_mode(&cache->features)) {
+                       if (bio_data_dir(bio) == WRITE) {
+                               bio_drop_shared_lock(cache, bio);
+                               atomic_inc(&cache->stats.demotion);
+                               invalidate_start(cache, cblock, block, bio);
+                       } else
+                               remap_to_origin_clear_discard(cache, bio, block);
 
-       spin_lock_irqsave(&cache->lock, flags);
-       list_splice_init(&cache->deferred_cells, &cells);
-       spin_unlock_irqrestore(&cache->lock, flags);
+               } else {
+                       if (bio_data_dir(bio) == WRITE && writethrough_mode(&cache->features) &&
+                           !is_dirty(cache, cblock)) {
+                               remap_to_origin_then_cache(cache, bio, block, cblock);
+                               accounted_begin(cache, bio);
+                       } else
+                               remap_to_cache_dirty(cache, bio, block, cblock);
+               }
+       }
 
-       list_for_each_entry_safe(cell, tmp, &cells, user_list) {
+       /*
+        * dm core turns FUA requests into a separate payload and FLUSH req.
+        */
+       if (bio->bi_opf & REQ_FUA) {
                /*
-                * If we've got no free migration structs, and processing
-                * this bio might require one, we pause until there are some
-                * prepared mappings to process.
+                * issue_after_commit will call accounted_begin a second time.  So
+                * we call accounted_complete() to avoid double accounting.
                 */
-               prealloc_used = true;
-               if (prealloc_data_structs(cache, &structs)) {
-                       spin_lock_irqsave(&cache->lock, flags);
-                       list_splice(&cells, &cache->deferred_cells);
-                       spin_unlock_irqrestore(&cache->lock, flags);
-                       break;
-               }
-
-               process_cell(cache, &structs, cell);
+               accounted_complete(cache, bio);
+               issue_after_commit(&cache->committer, bio);
+               *commit_needed = true;
+               return DM_MAPIO_SUBMITTED;
        }
 
-       if (prealloc_used)
-               prealloc_free_structs(cache, &structs);
+       return DM_MAPIO_REMAPPED;
 }
 
-static void process_deferred_flush_bios(struct cache *cache, bool submit_bios)
+static bool process_bio(struct cache *cache, struct bio *bio)
 {
-       unsigned long flags;
-       struct bio_list bios;
-       struct bio *bio;
-
-       bio_list_init(&bios);
+       bool commit_needed;
 
-       spin_lock_irqsave(&cache->lock, flags);
-       bio_list_merge(&bios, &cache->deferred_flush_bios);
-       bio_list_init(&cache->deferred_flush_bios);
-       spin_unlock_irqrestore(&cache->lock, flags);
+       if (map_bio(cache, bio, get_bio_block(cache, bio), &commit_needed) == DM_MAPIO_REMAPPED)
+               generic_make_request(bio);
 
-       /*
-        * These bios have already been through inc_ds()
-        */
-       while ((bio = bio_list_pop(&bios)))
-               submit_bios ? accounted_request(cache, bio) : bio_io_error(bio);
+       return commit_needed;
 }
 
-static void process_deferred_writethrough_bios(struct cache *cache)
+/*
+ * A non-zero return indicates read_only or fail_io mode.
+ */
+static int commit(struct cache *cache, bool clean_shutdown)
 {
-       unsigned long flags;
-       struct bio_list bios;
-       struct bio *bio;
+       int r;
 
-       bio_list_init(&bios);
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return -EINVAL;
 
-       spin_lock_irqsave(&cache->lock, flags);
-       bio_list_merge(&bios, &cache->deferred_writethrough_bios);
-       bio_list_init(&cache->deferred_writethrough_bios);
-       spin_unlock_irqrestore(&cache->lock, flags);
+       atomic_inc(&cache->stats.commit_count);
+       r = dm_cache_commit(cache->cmd, clean_shutdown);
+       if (r)
+               metadata_operation_failed(cache, "dm_cache_commit", r);
 
-       /*
-        * These bios have already been through inc_ds()
-        */
-       while ((bio = bio_list_pop(&bios)))
-               accounted_request(cache, bio);
+       return r;
 }
 
-static void writeback_some_dirty_blocks(struct cache *cache)
+/*
+ * Used by the batcher.
+ */
+static int commit_op(void *context)
 {
-       bool prealloc_used = false;
-       dm_oblock_t oblock;
-       dm_cblock_t cblock;
-       struct prealloc structs;
-       struct dm_bio_prison_cell *old_ocell;
-       bool busy = !iot_idle_for(&cache->origin_tracker, HZ);
-
-       memset(&structs, 0, sizeof(structs));
-
-       while (spare_migration_bandwidth(cache)) {
-               if (policy_writeback_work(cache->policy, &oblock, &cblock, busy))
-                       break; /* no work to do */
-
-               prealloc_used = true;
-               if (prealloc_data_structs(cache, &structs) ||
-                   get_cell(cache, oblock, &structs, &old_ocell)) {
-                       policy_set_dirty(cache->policy, oblock);
-                       break;
-               }
+       struct cache *cache = context;
 
-               writeback(cache, &structs, oblock, cblock, old_ocell);
-       }
+       if (dm_cache_changed_this_transaction(cache->cmd))
+               return commit(cache, false);
 
-       if (prealloc_used)
-               prealloc_free_structs(cache, &structs);
+       return 0;
 }
 
-/*----------------------------------------------------------------
- * Invalidations.
- * Dropping something from the cache *without* writing back.
- *--------------------------------------------------------------*/
+/*----------------------------------------------------------------*/
 
-static void process_invalidation_request(struct cache *cache, struct invalidation_request *req)
+static bool process_flush_bio(struct cache *cache, struct bio *bio)
 {
-       int r = 0;
-       uint64_t begin = from_cblock(req->cblocks->begin);
-       uint64_t end = from_cblock(req->cblocks->end);
-
-       while (begin != end) {
-               r = policy_remove_cblock(cache->policy, to_cblock(begin));
-               if (!r) {
-                       r = dm_cache_remove_mapping(cache->cmd, to_cblock(begin));
-                       if (r) {
-                               metadata_operation_failed(cache, "dm_cache_remove_mapping", r);
-                               break;
-                       }
-
-               } else if (r == -ENODATA) {
-                       /* harmless, already unmapped */
-                       r = 0;
-
-               } else {
-                       DMERR("%s: policy_remove_cblock failed", cache_device_name(cache));
-                       break;
-               }
-
-               begin++;
-        }
-
-       cache->commit_requested = true;
+       size_t pb_data_size = get_per_bio_data_size(cache);
+       struct per_bio_data *pb = get_per_bio_data(bio, pb_data_size);
 
-       req->err = r;
-       atomic_set(&req->complete, 1);
+       if (!pb->req_nr)
+               remap_to_origin(cache, bio);
+       else
+               remap_to_cache(cache, bio, 0);
 
-       wake_up(&req->result_wait);
+       issue_after_commit(&cache->committer, bio);
+       return true;
 }
 
-static void process_invalidation_requests(struct cache *cache)
+static bool process_discard_bio(struct cache *cache, struct bio *bio)
 {
-       struct list_head list;
-       struct invalidation_request *req, *tmp;
+       dm_dblock_t b, e;
 
-       INIT_LIST_HEAD(&list);
-       spin_lock(&cache->invalidation_lock);
-       list_splice_init(&cache->invalidation_requests, &list);
-       spin_unlock(&cache->invalidation_lock);
+       // FIXME: do we need to lock the region?  Or can we just assume the
+       // user wont be so foolish as to issue discard concurrently with
+       // other IO?
+       calc_discard_block_range(cache, bio, &b, &e);
+       while (b != e) {
+               set_discard(cache, b);
+               b = to_dblock(from_dblock(b) + 1);
+       }
 
-       list_for_each_entry_safe (req, tmp, &list, list)
-               process_invalidation_request(cache, req);
-}
+       bio_endio(bio);
 
-/*----------------------------------------------------------------
- * Main worker loop
- *--------------------------------------------------------------*/
-static bool is_quiescing(struct cache *cache)
-{
-       return atomic_read(&cache->quiescing);
+       return false;
 }
 
-static void ack_quiescing(struct cache *cache)
+static void process_deferred_bios(struct work_struct *ws)
 {
-       if (is_quiescing(cache)) {
-               atomic_inc(&cache->quiescing_ack);
-               wake_up(&cache->quiescing_wait);
-       }
-}
+       struct cache *cache = container_of(ws, struct cache, deferred_bio_worker);
 
-static void wait_for_quiescing_ack(struct cache *cache)
-{
-       wait_event(cache->quiescing_wait, atomic_read(&cache->quiescing_ack));
-}
+       unsigned long flags;
+       bool commit_needed = false;
+       struct bio_list bios;
+       struct bio *bio;
 
-static void start_quiescing(struct cache *cache)
-{
-       atomic_inc(&cache->quiescing);
-       wait_for_quiescing_ack(cache);
-}
+       bio_list_init(&bios);
 
-static void stop_quiescing(struct cache *cache)
-{
-       atomic_set(&cache->quiescing, 0);
-       atomic_set(&cache->quiescing_ack, 0);
-}
+       spin_lock_irqsave(&cache->lock, flags);
+       bio_list_merge(&bios, &cache->deferred_bios);
+       bio_list_init(&cache->deferred_bios);
+       spin_unlock_irqrestore(&cache->lock, flags);
 
-static void wait_for_migrations(struct cache *cache)
-{
-       wait_event(cache->migration_wait, !atomic_read(&cache->nr_allocated_migrations));
-}
+       while ((bio = bio_list_pop(&bios))) {
+               if (bio->bi_opf & REQ_PREFLUSH)
+                       commit_needed = process_flush_bio(cache, bio) || commit_needed;
 
-static void stop_worker(struct cache *cache)
-{
-       cancel_delayed_work(&cache->waker);
-       flush_workqueue(cache->wq);
+               else if (bio_op(bio) == REQ_OP_DISCARD)
+                       commit_needed = process_discard_bio(cache, bio) || commit_needed;
+
+               else
+                       commit_needed = process_bio(cache, bio) || commit_needed;
+       }
+
+       if (commit_needed)
+               schedule_commit(&cache->committer);
 }
 
-static void requeue_deferred_cells(struct cache *cache)
+static void process_deferred_writethrough_bios(struct work_struct *ws)
 {
+       struct cache *cache = container_of(ws, struct cache, deferred_writethrough_worker);
+
        unsigned long flags;
-       struct list_head cells;
-       struct dm_bio_prison_cell *cell, *tmp;
+       struct bio_list bios;
+       struct bio *bio;
+
+       bio_list_init(&bios);
 
-       INIT_LIST_HEAD(&cells);
        spin_lock_irqsave(&cache->lock, flags);
-       list_splice_init(&cache->deferred_cells, &cells);
+       bio_list_merge(&bios, &cache->deferred_writethrough_bios);
+       bio_list_init(&cache->deferred_writethrough_bios);
        spin_unlock_irqrestore(&cache->lock, flags);
 
-       list_for_each_entry_safe(cell, tmp, &cells, user_list)
-               cell_requeue(cache, cell);
+       /*
+        * These bios have already been through accounted_begin()
+        */
+       while ((bio = bio_list_pop(&bios)))
+               generic_make_request(bio);
 }
 
+/*----------------------------------------------------------------
+ * Main worker loop
+ *--------------------------------------------------------------*/
+
 static void requeue_deferred_bios(struct cache *cache)
 {
        struct bio *bio;
@@ -2221,53 +2022,6 @@ static void requeue_deferred_bios(struct cache *cache)
        }
 }
 
-static int more_work(struct cache *cache)
-{
-       if (is_quiescing(cache))
-               return !list_empty(&cache->quiesced_migrations) ||
-                       !list_empty(&cache->completed_migrations) ||
-                       !list_empty(&cache->need_commit_migrations);
-       else
-               return !bio_list_empty(&cache->deferred_bios) ||
-                       !list_empty(&cache->deferred_cells) ||
-                       !bio_list_empty(&cache->deferred_flush_bios) ||
-                       !bio_list_empty(&cache->deferred_writethrough_bios) ||
-                       !list_empty(&cache->quiesced_migrations) ||
-                       !list_empty(&cache->completed_migrations) ||
-                       !list_empty(&cache->need_commit_migrations) ||
-                       cache->invalidate;
-}
-
-static void do_worker(struct work_struct *ws)
-{
-       struct cache *cache = container_of(ws, struct cache, worker);
-
-       do {
-               if (!is_quiescing(cache)) {
-                       writeback_some_dirty_blocks(cache);
-                       process_deferred_writethrough_bios(cache);
-                       process_deferred_bios(cache);
-                       process_deferred_cells(cache);
-                       process_invalidation_requests(cache);
-               }
-
-               process_migrations(cache, &cache->quiesced_migrations, issue_copy_or_discard);
-               process_migrations(cache, &cache->completed_migrations, complete_migration);
-
-               if (commit_if_needed(cache)) {
-                       process_deferred_flush_bios(cache, false);
-                       process_migrations(cache, &cache->need_commit_migrations, migration_failure);
-               } else {
-                       process_deferred_flush_bios(cache, true);
-                       process_migrations(cache, &cache->need_commit_migrations,
-                                          migration_success_post_commit);
-               }
-
-               ack_quiescing(cache);
-
-       } while (more_work(cache));
-}
-
 /*
  * We want to commit periodically so that not too much
  * unwritten metadata builds up.
@@ -2275,25 +2029,39 @@ static void do_worker(struct work_struct *ws)
 static void do_waker(struct work_struct *ws)
 {
        struct cache *cache = container_of(to_delayed_work(ws), struct cache, waker);
+
        policy_tick(cache->policy, true);
-       wake_worker(cache);
+       wake_migration_worker(cache);
+       schedule_commit(&cache->committer);
        queue_delayed_work(cache->wq, &cache->waker, COMMIT_PERIOD);
 }
 
-/*----------------------------------------------------------------*/
-
-static int is_congested(struct dm_dev *dev, int bdi_bits)
+static void check_migrations(struct work_struct *ws)
 {
-       struct request_queue *q = bdev_get_queue(dev->bdev);
-       return bdi_congested(q->backing_dev_info, bdi_bits);
-}
+       int r;
+       struct policy_work *op;
+       struct cache *cache = container_of(ws, struct cache, migration_worker);
+       enum busy b;
 
-static int cache_is_congested(struct dm_target_callbacks *cb, int bdi_bits)
-{
-       struct cache *cache = container_of(cb, struct cache, callbacks);
+       for (;;) {
+               b = spare_migration_bandwidth(cache);
+               if (b == BUSY)
+                       break;
 
-       return is_congested(cache->origin_dev, bdi_bits) ||
-               is_congested(cache->cache_dev, bdi_bits);
+               r = policy_get_background_work(cache->policy, b == IDLE, &op);
+               if (r == -ENODATA)
+                       break;
+
+               if (r) {
+                       DMERR_LIMIT("%s: policy_background_work failed",
+                                   cache_device_name(cache));
+                       break;
+               }
+
+               r = mg_start(cache, op, NULL);
+               if (r)
+                       break;
+       }
 }
 
 /*----------------------------------------------------------------
@@ -2310,11 +2078,8 @@ static void destroy(struct cache *cache)
 
        mempool_destroy(cache->migration_pool);
 
-       if (cache->all_io_ds)
-               dm_deferred_set_destroy(cache->all_io_ds);
-
        if (cache->prison)
-               dm_bio_prison_destroy(cache->prison);
+               dm_bio_prison_destroy_v2(cache->prison);
 
        if (cache->wq)
                destroy_workqueue(cache->wq);
@@ -2707,6 +2472,7 @@ static int create_cache_policy(struct cache *cache, struct cache_args *ca,
                return PTR_ERR(p);
        }
        cache->policy = p;
+       BUG_ON(!cache->policy);
 
        return 0;
 }
@@ -2750,6 +2516,20 @@ static void set_cache_size(struct cache *cache, dm_cblock_t size)
        cache->cache_size = size;
 }
 
+static int is_congested(struct dm_dev *dev, int bdi_bits)
+{
+       struct request_queue *q = bdev_get_queue(dev->bdev);
+       return bdi_congested(q->backing_dev_info, bdi_bits);
+}
+
+static int cache_is_congested(struct dm_target_callbacks *cb, int bdi_bits)
+{
+       struct cache *cache = container_of(cb, struct cache, callbacks);
+
+       return is_congested(cache->origin_dev, bdi_bits) ||
+               is_congested(cache->cache_dev, bdi_bits);
+}
+
 #define DEFAULT_MIGRATION_THRESHOLD 2048
 
 static int cache_create(struct cache_args *ca, struct cache **result)
@@ -2773,7 +2553,6 @@ static int cache_create(struct cache_args *ca, struct cache **result)
 
        ti->num_discard_bios = 1;
        ti->discards_supported = true;
-       ti->discard_zeroes_data_unsupported = true;
        ti->split_discard_bios = false;
 
        cache->features = ca->features;
@@ -2788,7 +2567,6 @@ static int cache_create(struct cache_args *ca, struct cache **result)
 
        ca->metadata_dev = ca->origin_dev = ca->cache_dev = NULL;
 
-       /* FIXME: factor out this whole section */
        origin_blocks = cache->origin_sectors = ca->origin_sectors;
        origin_blocks = block_div(origin_blocks, ca->block_size);
        cache->origin_blocks = to_oblock(origin_blocks);
@@ -2854,24 +2632,18 @@ static int cache_create(struct cache_args *ca, struct cache **result)
                        r = -EINVAL;
                        goto bad;
                }
+
+               policy_allow_migrations(cache->policy, false);
        }
 
        spin_lock_init(&cache->lock);
        INIT_LIST_HEAD(&cache->deferred_cells);
        bio_list_init(&cache->deferred_bios);
-       bio_list_init(&cache->deferred_flush_bios);
        bio_list_init(&cache->deferred_writethrough_bios);
-       INIT_LIST_HEAD(&cache->quiesced_migrations);
-       INIT_LIST_HEAD(&cache->completed_migrations);
-       INIT_LIST_HEAD(&cache->need_commit_migrations);
        atomic_set(&cache->nr_allocated_migrations, 0);
        atomic_set(&cache->nr_io_migrations, 0);
        init_waitqueue_head(&cache->migration_wait);
 
-       init_waitqueue_head(&cache->quiescing_wait);
-       atomic_set(&cache->quiescing, 0);
-       atomic_set(&cache->quiescing_ack, 0);
-
        r = -ENOMEM;
        atomic_set(&cache->nr_dirty, 0);
        cache->dirty_bitset = alloc_bitset(from_cblock(cache->cache_size));
@@ -2900,27 +2672,23 @@ static int cache_create(struct cache_args *ca, struct cache **result)
                goto bad;
        }
 
-       cache->wq = alloc_ordered_workqueue("dm-" DM_MSG_PREFIX, WQ_MEM_RECLAIM);
+       cache->wq = alloc_workqueue("dm-" DM_MSG_PREFIX, WQ_MEM_RECLAIM, 0);
        if (!cache->wq) {
                *error = "could not create workqueue for metadata object";
                goto bad;
        }
-       INIT_WORK(&cache->worker, do_worker);
+       INIT_WORK(&cache->deferred_bio_worker, process_deferred_bios);
+       INIT_WORK(&cache->deferred_writethrough_worker,
+                 process_deferred_writethrough_bios);
+       INIT_WORK(&cache->migration_worker, check_migrations);
        INIT_DELAYED_WORK(&cache->waker, do_waker);
-       cache->last_commit_jiffies = jiffies;
 
-       cache->prison = dm_bio_prison_create();
+       cache->prison = dm_bio_prison_create_v2(cache->wq);
        if (!cache->prison) {
                *error = "could not create bio prison";
                goto bad;
        }
 
-       cache->all_io_ds = dm_deferred_set_create();
-       if (!cache->all_io_ds) {
-               *error = "could not create all_io deferred set";
-               goto bad;
-       }
-
        cache->migration_pool = mempool_create_slab_pool(MIGRATION_POOL_SIZE,
                                                         migration_cache);
        if (!cache->migration_pool) {
@@ -2947,11 +2715,15 @@ static int cache_create(struct cache_args *ca, struct cache **result)
        spin_lock_init(&cache->invalidation_lock);
        INIT_LIST_HEAD(&cache->invalidation_requests);
 
+       batcher_init(&cache->committer, commit_op, cache,
+                    issue_op, cache, cache->wq);
        iot_init(&cache->origin_tracker);
 
+       init_rwsem(&cache->background_work_lock);
+       prevent_background_work(cache);
+
        *result = cache;
        return 0;
-
 bad:
        destroy(cache);
        return r;
@@ -3009,7 +2781,6 @@ static int cache_ctr(struct dm_target *ti, unsigned argc, char **argv)
        }
 
        ti->private = cache;
-
 out:
        destroy_cache_args(ca);
        return r;
@@ -3022,17 +2793,11 @@ static int cache_map(struct dm_target *ti, struct bio *bio)
        struct cache *cache = ti->private;
 
        int r;
-       struct dm_bio_prison_cell *cell = NULL;
+       bool commit_needed;
        dm_oblock_t block = get_bio_block(cache, bio);
        size_t pb_data_size = get_per_bio_data_size(cache);
-       bool can_migrate = false;
-       bool fast_promotion;
-       struct policy_result lookup_result;
-       struct per_bio_data *pb = init_per_bio_data(bio, pb_data_size);
-       struct old_oblock_lock ool;
-
-       ool.locker.fn = null_locker;
 
+       init_per_bio_data(bio, pb_data_size);
        if (unlikely(from_oblock(block) >= from_oblock(cache->origin_blocks))) {
                /*
                 * This can only occur if the io goes to a partial block at
@@ -3049,101 +2814,9 @@ static int cache_map(struct dm_target *ti, struct bio *bio)
                return DM_MAPIO_SUBMITTED;
        }
 
-       /*
-        * Check to see if that block is currently migrating.
-        */
-       cell = alloc_prison_cell(cache);
-       if (!cell) {
-               defer_bio(cache, bio);
-               return DM_MAPIO_SUBMITTED;
-       }
-
-       r = bio_detain(cache, block, bio, cell,
-                      (cell_free_fn) free_prison_cell,
-                      cache, &cell);
-       if (r) {
-               if (r < 0)
-                       defer_bio(cache, bio);
-
-               return DM_MAPIO_SUBMITTED;
-       }
-
-       fast_promotion = is_discarded_oblock(cache, block) || bio_writes_complete_block(cache, bio);
-
-       r = policy_map(cache->policy, block, false, can_migrate, fast_promotion,
-                      bio, &ool.locker, &lookup_result);
-       if (r == -EWOULDBLOCK) {
-               cell_defer(cache, cell, true);
-               return DM_MAPIO_SUBMITTED;
-
-       } else if (r) {
-               DMERR_LIMIT("%s: Unexpected return from cache replacement policy: %d",
-                           cache_device_name(cache), r);
-               cell_defer(cache, cell, false);
-               bio_io_error(bio);
-               return DM_MAPIO_SUBMITTED;
-       }
-
-       r = DM_MAPIO_REMAPPED;
-       switch (lookup_result.op) {
-       case POLICY_HIT:
-               if (passthrough_mode(&cache->features)) {
-                       if (bio_data_dir(bio) == WRITE) {
-                               /*
-                                * We need to invalidate this block, so
-                                * defer for the worker thread.
-                                */
-                               cell_defer(cache, cell, true);
-                               r = DM_MAPIO_SUBMITTED;
-
-                       } else {
-                               inc_miss_counter(cache, bio);
-                               remap_to_origin_clear_discard(cache, bio, block);
-                               accounted_begin(cache, bio);
-                               inc_ds(cache, bio, cell);
-                               // FIXME: we want to remap hits or misses straight
-                               // away rather than passing over to the worker.
-                               cell_defer(cache, cell, false);
-                       }
-
-               } else {
-                       inc_hit_counter(cache, bio);
-                       if (bio_data_dir(bio) == WRITE && writethrough_mode(&cache->features) &&
-                           !is_dirty(cache, lookup_result.cblock)) {
-                               remap_to_origin_then_cache(cache, bio, block, lookup_result.cblock);
-                               accounted_begin(cache, bio);
-                               inc_ds(cache, bio, cell);
-                               cell_defer(cache, cell, false);
-
-                       } else
-                               remap_cell_to_cache_dirty(cache, cell, block, lookup_result.cblock, false);
-               }
-               break;
-
-       case POLICY_MISS:
-               inc_miss_counter(cache, bio);
-               if (pb->req_nr != 0) {
-                       /*
-                        * This is a duplicate writethrough io that is no
-                        * longer needed because the block has been demoted.
-                        */
-                       bio_endio(bio);
-                       // FIXME: remap everything as a miss
-                       cell_defer(cache, cell, false);
-                       r = DM_MAPIO_SUBMITTED;
-
-               } else
-                       remap_cell_to_origin_clear_discard(cache, cell, block, false);
-               break;
-
-       default:
-               DMERR_LIMIT("%s: %s: erroring bio: unknown policy op: %u",
-                           cache_device_name(cache), __func__,
-                           (unsigned) lookup_result.op);
-               cell_defer(cache, cell, false);
-               bio_io_error(bio);
-               r = DM_MAPIO_SUBMITTED;
-       }
+       r = map_bio(cache, bio, block, &commit_needed);
+       if (commit_needed)
+               schedule_commit(&cache->committer);
 
        return r;
 }
@@ -3163,7 +2836,7 @@ static int cache_end_io(struct dm_target *ti, struct bio *bio, int error)
                spin_unlock_irqrestore(&cache->lock, flags);
        }
 
-       check_for_quiesced_migrations(cache, pb);
+       bio_drop_shared_lock(cache, bio);
        accounted_complete(cache, bio);
 
        return 0;
@@ -3263,12 +2936,18 @@ static void cache_postsuspend(struct dm_target *ti)
 {
        struct cache *cache = ti->private;
 
-       start_quiescing(cache);
-       wait_for_migrations(cache);
-       stop_worker(cache);
+       prevent_background_work(cache);
+       BUG_ON(atomic_read(&cache->nr_io_migrations));
+
+       cancel_delayed_work(&cache->waker);
+       flush_workqueue(cache->wq);
+       WARN_ON(cache->origin_tracker.in_flight);
+
+       /*
+        * If it's a flush suspend there won't be any deferred bios, so this
+        * call is harmless.
+        */
        requeue_deferred_bios(cache);
-       requeue_deferred_cells(cache);
-       stop_quiescing(cache);
 
        if (get_cache_mode(cache) == CM_WRITE)
                (void) sync_metadata(cache);
@@ -3280,15 +2959,16 @@ static int load_mapping(void *context, dm_oblock_t oblock, dm_cblock_t cblock,
        int r;
        struct cache *cache = context;
 
-       r = policy_load_mapping(cache->policy, oblock, cblock, hint, hint_valid);
+       if (dirty) {
+               set_bit(from_cblock(cblock), cache->dirty_bitset);
+               atomic_inc(&cache->nr_dirty);
+       } else
+               clear_bit(from_cblock(cblock), cache->dirty_bitset);
+
+       r = policy_load_mapping(cache->policy, oblock, cblock, dirty, hint, hint_valid);
        if (r)
                return r;
 
-       if (dirty)
-               set_dirty(cache, oblock, cblock);
-       else
-               clear_dirty(cache, oblock, cblock);
-
        return 0;
 }
 
@@ -3487,6 +3167,7 @@ static void cache_resume(struct dm_target *ti)
        struct cache *cache = ti->private;
 
        cache->need_tick_bio = true;
+       allow_background_work(cache);
        do_waker(&cache->waker.work);
 }
 
@@ -3620,11 +3301,20 @@ err:
        DMEMIT("Error");
 }
 
+/*
+ * Defines a range of cblocks, begin to (end - 1) are in the range.  end is
+ * the one-past-the-end value.
+ */
+struct cblock_range {
+       dm_cblock_t begin;
+       dm_cblock_t end;
+};
+
 /*
  * A cache block range can take two forms:
  *
  * i) A single cblock, eg. '3456'
- * ii) A begin and end cblock with dots between, eg. 123-234
+ * ii) A begin and end cblock with a dash between, eg. 123-234
  */
 static int parse_cblock_range(struct cache *cache, const char *str,
                              struct cblock_range *result)
@@ -3690,23 +3380,31 @@ static int validate_cblock_range(struct cache *cache, struct cblock_range *range
        return 0;
 }
 
+static inline dm_cblock_t cblock_succ(dm_cblock_t b)
+{
+       return to_cblock(from_cblock(b) + 1);
+}
+
 static int request_invalidation(struct cache *cache, struct cblock_range *range)
 {
-       struct invalidation_request req;
+       int r = 0;
 
-       INIT_LIST_HEAD(&req.list);
-       req.cblocks = range;
-       atomic_set(&req.complete, 0);
-       req.err = 0;
-       init_waitqueue_head(&req.result_wait);
+       /*
+        * We don't need to do any locking here because we know we're in
+        * passthrough mode.  There's is potential for a race between an
+        * invalidation triggered by an io and an invalidation message.  This
+        * is harmless, we must not worry if the policy call fails.
+        */
+       while (range->begin != range->end) {
+               r = invalidate_cblock(cache, range->begin);
+               if (r)
+                       return r;
 
-       spin_lock(&cache->invalidation_lock);
-       list_add(&req.list, &cache->invalidation_requests);
-       spin_unlock(&cache->invalidation_lock);
-       wake_worker(cache);
+               range->begin = cblock_succ(range->begin);
+       }
 
-       wait_event(req.result_wait, atomic_read(&req.complete));
-       return req.err;
+       cache->commit_requested = true;
+       return r;
 }
 
 static int process_invalidate_cblocks_message(struct cache *cache, unsigned count,
@@ -3816,7 +3514,7 @@ static void cache_io_hints(struct dm_target *ti, struct queue_limits *limits)
 
 static struct target_type cache_target = {
        .name = "cache",
-       .version = {1, 10, 0},
+       .version = {2, 0, 0},
        .module = THIS_MODULE,
        .ctr = cache_ctr,
        .dtr = cache_dtr,