diff --git a/drivers/md/raid5-cache.c b/drivers/md/raid5-cache.c
index 302dea3296ba5ccd07740365314f45d74df49ec2..26ba09282e7c9691bdc352c5fb4b75b9f6e29821 100644
--- a/drivers/md/raid5-cache.c
+++ b/drivers/md/raid5-cache.c
@@ -20,6 +20,7 @@
 #include <linux/crc32c.h>
 #include <linux/random.h>
 #include <linux/kthread.h>
+#include <linux/types.h>
 #include "md.h"
 #include "raid5.h"
 #include "bitmap.h"
@@ -29,6 +30,7 @@
  * underneath hardware sector size. only works with PAGE_SIZE == 4096
  */
 #define BLOCK_SECTORS (8)
+#define BLOCK_SECTOR_SHIFT (3)
 
 /*
  * log->max_free_space is min(1/4 disk size, 10G reclaimable space).
@@ -42,7 +44,7 @@
 /* wake up reclaim thread periodically */
 #define R5C_RECLAIM_WAKEUP_INTERVAL (30 * HZ)
 /* start flush with these full stripes */
-#define R5C_FULL_STRIPE_FLUSH_BATCH 256
+#define R5C_FULL_STRIPE_FLUSH_BATCH(conf) (conf->max_nr_stripes / 4)
 /* reclaim stripes in groups */
 #define R5C_RECLAIM_STRIPE_GROUP (NR_STRIPE_HASH_LOCKS * 2)
 
  */
 #define R5L_POOL_SIZE  4
 
-/*
- * r5c journal modes of the array: write-back or write-through.
- * write-through mode has identical behavior as existing log only
- * implementation.
- */
-enum r5c_journal_mode {
-       R5C_JOURNAL_MODE_WRITE_THROUGH = 0,
-       R5C_JOURNAL_MODE_WRITE_BACK = 1,
-};
-
 static char *r5c_journal_mode_str[] = {"write-through",
                                       "write-back"};
 /*
@@ -164,8 +156,59 @@ struct r5l_log {
        struct work_struct deferred_io_work;
        /* to disable write back during in degraded mode */
        struct work_struct disable_writeback_work;
+
+       /* for chunk_aligned_read in writeback mode, details below */
+       spinlock_t tree_lock;
+       struct radix_tree_root big_stripe_tree;
 };
 
+/*
+ * Enable chunk_aligned_read() with write back cache.
+ *
+ * Each chunk may contain more than one stripe (for example, a 256kB
+ * chunk contains 64 4kB pages, so this chunk contains 64 stripes). For
+ * chunk_aligned_read, these stripes are grouped into one "big_stripe".
+ * For each big_stripe, we count how many stripes of this big_stripe
+ * are in the write back cache. These data are tracked in a radix tree
+ * (big_stripe_tree). We use radix_tree item pointer as the counter.
+ * r5c_tree_index() is used to calculate keys for the radix tree.
+ *
+ * chunk_aligned_read() calls r5c_big_stripe_cached() to look up
+ * big_stripe of each chunk in the tree. If this big_stripe is in the
+ * tree, chunk_aligned_read() aborts. This look up is protected by
+ * rcu_read_lock().
+ *
+ * It is necessary to remember whether a stripe is counted in
+ * big_stripe_tree. Instead of adding a new flag, we reuse existing flags:
+ * STRIPE_R5C_PARTIAL_STRIPE and STRIPE_R5C_FULL_STRIPE. If either of these
+ * two flags is set, the stripe is counted in big_stripe_tree. This
+ * requires moving set_bit(STRIPE_R5C_PARTIAL_STRIPE) to
+ * r5c_try_caching_write(); and moving clear_bit of
+ * STRIPE_R5C_PARTIAL_STRIPE and STRIPE_R5C_FULL_STRIPE to
+ * r5c_finish_stripe_write_out().
+ */
+
+/*
+ * The radix tree requires the lowest 2 bits of a data pointer to be 2'b00.
+ * So it is necessary to left shift the counter by 2 bits before using it
+ * as the data pointer of the tree.
+ */
+#define R5C_RADIX_COUNT_SHIFT 2
+
+/*
+ * calculate key for big_stripe_tree
+ *
+ * sect: align_bi->bi_iter.bi_sector or sh->sector
+ */
+static inline sector_t r5c_tree_index(struct r5conf *conf,
+                                     sector_t sect)
+{
+       sector_t offset;
+
+       offset = sector_div(sect, conf->chunk_sectors);
+       return sect;
+}
+
 /*
  * an IO range starts from a meta data block and end at the next meta data
  * block. The io unit's the meta data block tracks data/parity followed it. io
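The big_stripe_tree comment above describes storing a small reference count directly in each radix-tree slot instead of allocating a counter object: r5c_tree_index() reduces a sector to its chunk number (the quotient left in sect by sector_div()), and the per-chunk count is shifted left by R5C_RADIX_COUNT_SHIFT so the tree never sees a value with the low two bits set. A minimal userspace sketch of that encoding, with hypothetical helper names that are not part of the patch:

/* Sketch of the counter-in-slot encoding used by big_stripe_tree: the
 * refcount is kept shifted left by two bits so the low bits of the stored
 * "pointer" stay zero, as the radix tree requires. */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define R5C_RADIX_COUNT_SHIFT 2

static void *count_to_item(uintptr_t count)
{
	return (void *)(count << R5C_RADIX_COUNT_SHIFT);
}

static uintptr_t item_to_count(void *item)
{
	return (uintptr_t)item >> R5C_RADIX_COUNT_SHIFT;
}

int main(void)
{
	void *item = count_to_item(1);			/* first cached stripe of a chunk */

	item = count_to_item(item_to_count(item) + 1);	/* second cached stripe */
	assert(((uintptr_t)item & 0x3) == 0);		/* low two bits stay clear */
	printf("stripes counted: %lu\n", (unsigned long)item_to_count(item));

	item = count_to_item(item_to_count(item) - 1);	/* one stripe written out */
	return item_to_count(item) == 1 ? 0 : 1;
}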
@@ -255,8 +298,7 @@ static void __r5l_set_io_unit_state(struct r5l_io_unit *io,
 }
 
 static void
-r5c_return_dev_pending_writes(struct r5conf *conf, struct r5dev *dev,
-                             struct bio_list *return_bi)
+r5c_return_dev_pending_writes(struct r5conf *conf, struct r5dev *dev)
 {
        struct bio *wbi, *wbi2;
 
@@ -265,24 +307,21 @@ r5c_return_dev_pending_writes(struct r5conf *conf, struct r5dev *dev,
        while (wbi && wbi->bi_iter.bi_sector <
               dev->sector + STRIPE_SECTORS) {
                wbi2 = r5_next_bio(wbi, dev->sector);
-               if (!raid5_dec_bi_active_stripes(wbi)) {
-                       md_write_end(conf->mddev);
-                       bio_list_add(return_bi, wbi);
-               }
+               md_write_end(conf->mddev);
+               bio_endio(wbi);
                wbi = wbi2;
        }
 }
 
 void r5c_handle_cached_data_endio(struct r5conf *conf,
-         struct stripe_head *sh, int disks, struct bio_list *return_bi)
+                                 struct stripe_head *sh, int disks)
 {
        int i;
 
        for (i = sh->disks; i--; ) {
                if (sh->dev[i].written) {
                        set_bit(R5_UPTODATE, &sh->dev[i].flags);
-                       r5c_return_dev_pending_writes(conf, &sh->dev[i],
-                                                     return_bi);
+                       r5c_return_dev_pending_writes(conf, &sh->dev[i]);
                        bitmap_endwrite(conf->mddev->bitmap, sh->sector,
                                        STRIPE_SECTORS,
                                        !test_bit(STRIPE_DEGRADED, &sh->state),
@@ -291,6 +330,8 @@ void r5c_handle_cached_data_endio(struct r5conf *conf,
        }
 }
 
+void r5l_wake_reclaim(struct r5l_log *log, sector_t space);
+
 /* Check whether we should flush some stripes to free up stripe cache */
 void r5c_check_stripe_cache_usage(struct r5conf *conf)
 {
@@ -329,7 +370,7 @@ void r5c_check_cached_full_stripe(struct r5conf *conf)
         * or a full stripe (chunk size / 4k stripes).
         */
        if (atomic_read(&conf->r5c_cached_full_stripes) >=
-           min(R5C_FULL_STRIPE_FLUSH_BATCH,
+           min(R5C_FULL_STRIPE_FLUSH_BATCH(conf),
                conf->chunk_sectors >> STRIPE_SHIFT))
                r5l_wake_reclaim(conf->log, 0);
 }
@@ -337,17 +378,30 @@ void r5c_check_cached_full_stripe(struct r5conf *conf)
 /*
  * Total log space (in sectors) needed to flush all data in cache
  *
- * Currently, writing-out phase automatically includes all pending writes
- * to the same sector. So the reclaim of each stripe takes up to
- * (conf->raid_disks + 1) pages of log space.
+ * To avoid deadlock due to log space, it is necessary to reserve log
+ * space to flush critical stripes (stripes occupying log space near
+ * last_checkpoint). This function helps check how much log space is
+ * required to flush all cached stripes.
+ *
+ * To reduce log space requirements, two mechanisms are used to give cache
+ * flush higher priorities:
+ *    1. In handle_stripe_dirtying() and schedule_reconstruction(),
+ *       stripes ALREADY in journal can be flushed w/o pending writes;
+ *    2. In r5l_write_stripe() and r5c_cache_data(), stripes NOT in journal
+ *       can be delayed (r5l_add_no_space_stripe).
  *
- * To totally avoid deadlock due to log space, the code reserves
- * (conf->raid_disks + 1) pages for each stripe in cache, which is not
- * necessary in most cases.
+ * In cache flush, the stripe goes through 1 and then 2. For a stripe that
+ * already passed 1, flushing it requires at most (conf->max_degraded + 1)
+ * pages of journal space. For a stripe that has not passed 1, flushing it
+ * requires (conf->raid_disks + 1) pages of journal space. There are at
+ * most (conf->group_cnt + 1) stripes that have passed 1. So total journal space
+ * required to flush all cached stripes (in pages) is:
  *
- * To improve this, we will need writing-out phase to be able to NOT include
- * pending writes, which will reduce the requirement to
- * (conf->max_degraded + 1) pages per stripe in cache.
+ *     (stripe_in_journal_count - group_cnt - 1) * (max_degraded + 1) +
+ *     (group_cnt + 1) * (raid_disks + 1)
+ * or
+ *     (stripe_in_journal_count) * (max_degraded + 1) +
+ *     (group_cnt + 1) * (raid_disks - max_degraded)
  */
 static sector_t r5c_log_required_to_flush_cache(struct r5conf *conf)
 {
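As a worked example with made-up numbers (not from the patch): for an 8-drive RAID6 array (raid_disks = 8, max_degraded = 2) with group_cnt = 0 and 100 stripes in the journal, the new formula gives 100 * (2 + 1) + (0 + 1) * (8 - 2) = 306 pages, i.e. 306 * BLOCK_SECTORS = 2448 sectors (about 1.2 MiB) reserved to flush the cache, whereas the old bound of (raid_disks + 1) pages per cached stripe reserved 100 * 9 * 8 = 7200 sectors.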
@@ -356,8 +410,9 @@ static sector_t r5c_log_required_to_flush_cache(struct r5conf *conf)
        if (!r5c_is_writeback(log))
                return 0;
 
-       return BLOCK_SECTORS * (conf->raid_disks + 1) *
-               atomic_read(&log->stripe_in_journal_count);
+       return BLOCK_SECTORS *
+               ((conf->max_degraded + 1) * atomic_read(&log->stripe_in_journal_count) +
+                (conf->raid_disks - conf->max_degraded) * (conf->group_cnt + 1));
 }
 
 /*
@@ -412,16 +467,6 @@ void r5c_make_stripe_write_out(struct stripe_head *sh)
 
        if (!test_and_set_bit(STRIPE_PREREAD_ACTIVE, &sh->state))
                atomic_inc(&conf->preread_active_stripes);
-
-       if (test_and_clear_bit(STRIPE_R5C_PARTIAL_STRIPE, &sh->state)) {
-               BUG_ON(atomic_read(&conf->r5c_cached_partial_stripes) == 0);
-               atomic_dec(&conf->r5c_cached_partial_stripes);
-       }
-
-       if (test_and_clear_bit(STRIPE_R5C_FULL_STRIPE, &sh->state)) {
-               BUG_ON(atomic_read(&conf->r5c_cached_full_stripes) == 0);
-               atomic_dec(&conf->r5c_cached_full_stripes);
-       }
 }
 
 static void r5c_handle_data_cached(struct stripe_head *sh)
@@ -534,7 +579,7 @@ static void r5l_log_endio(struct bio *bio)
 
        spin_lock_irqsave(&log->io_list_lock, flags);
        __r5l_set_io_unit_state(io, IO_UNIT_IO_END);
-       if (log->need_cache_flush)
+       if (log->need_cache_flush && !list_empty(&io->stripe_list))
                r5l_move_to_end_ios(log);
        else
                r5l_log_run_stripes(log);
@@ -562,9 +607,11 @@ static void r5l_log_endio(struct bio *bio)
                        bio_endio(bi);
                        atomic_dec(&io->pending_stripe);
                }
-               if (atomic_read(&io->pending_stripe) == 0)
-                       __r5l_stripe_write_finished(io);
        }
+
+       /* finish flush-only io_units and PAYLOAD_FLUSH-only io_units */
+       if (atomic_read(&io->pending_stripe) == 0)
+               __r5l_stripe_write_finished(io);
 }
 
 static void r5l_do_submit_io(struct r5l_log *log, struct r5l_io_unit *io)
@@ -786,6 +833,41 @@ static void r5l_append_payload_page(struct r5l_log *log, struct page *page)
        r5_reserve_log_entry(log, io);
 }
 
+static void r5l_append_flush_payload(struct r5l_log *log, sector_t sect)
+{
+       struct mddev *mddev = log->rdev->mddev;
+       struct r5conf *conf = mddev->private;
+       struct r5l_io_unit *io;
+       struct r5l_payload_flush *payload;
+       int meta_size;
+
+       /*
+        * payload_flush requires extra writes to the journal.
+        * To avoid handling the extra IO in quiesce, just skip
+        * flush_payload
+        */
+       if (conf->quiesce)
+               return;
+
+       mutex_lock(&log->io_mutex);
+       meta_size = sizeof(struct r5l_payload_flush) + sizeof(__le64);
+
+       if (r5l_get_meta(log, meta_size)) {
+               mutex_unlock(&log->io_mutex);
+               return;
+       }
+
+       /* current implementation is one stripe per flush payload */
+       io = log->current_io;
+       payload = page_address(io->meta_page) + io->meta_offset;
+       payload->header.type = cpu_to_le16(R5LOG_PAYLOAD_FLUSH);
+       payload->header.flags = cpu_to_le16(0);
+       payload->size = cpu_to_le32(sizeof(__le64));
+       payload->flush_stripes[0] = cpu_to_le64(sect);
+       io->meta_offset += meta_size;
+       mutex_unlock(&log->io_mutex);
+}
+
 static int r5l_log_stripe(struct r5l_log *log, struct stripe_head *sh,
                           int data_pages, int parity_pages)
 {
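r5l_append_flush_payload() above emits one R5LOG_PAYLOAD_FLUSH entry per written-out stripe. For orientation only, the on-disk payload it fills in is assumed to look roughly like the sketch below, inferred from the accesses in this patch (a common payload header, a byte count, then an array of stripe sectors); the authoritative definition lives in the md headers:

/* Sketch only -- inferred from r5l_append_flush_payload() in this patch. */
struct r5l_payload_header_sketch {
	__le16 type;					/* R5LOG_PAYLOAD_FLUSH */
	__le16 flags;					/* 0 */
} __attribute__((__packed__));

struct r5l_payload_flush_sketch {
	struct r5l_payload_header_sketch header;
	__le32 size;					/* bytes in flush_stripes[], here sizeof(__le64) */
	__le64 flush_stripes[];				/* sector of each stripe now safe on the RAID disks */
} __attribute__((__packed__));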
@@ -1271,6 +1353,10 @@ static void r5c_flush_stripe(struct r5conf *conf, struct stripe_head *sh)
        atomic_inc(&conf->active_stripes);
        r5c_make_stripe_write_out(sh);
 
+       if (test_bit(STRIPE_R5C_PARTIAL_STRIPE, &sh->state))
+               atomic_inc(&conf->r5c_flushing_partial_stripes);
+       else
+               atomic_inc(&conf->r5c_flushing_full_stripes);
        raid5_release_stripe(sh);
 }
 
@@ -1313,12 +1399,16 @@ static void r5c_do_reclaim(struct r5conf *conf)
        unsigned long flags;
        int total_cached;
        int stripes_to_flush;
+       int flushing_partial, flushing_full;
 
        if (!r5c_is_writeback(log))
                return;
 
+       flushing_partial = atomic_read(&conf->r5c_flushing_partial_stripes);
+       flushing_full = atomic_read(&conf->r5c_flushing_full_stripes);
        total_cached = atomic_read(&conf->r5c_cached_partial_stripes) +
-               atomic_read(&conf->r5c_cached_full_stripes);
+               atomic_read(&conf->r5c_cached_full_stripes) -
+               flushing_full - flushing_partial;
 
        if (total_cached > conf->min_nr_stripes * 3 / 4 ||
            atomic_read(&conf->empty_inactive_list_nr) > 0)
@@ -1328,8 +1418,8 @@ static void r5c_do_reclaim(struct r5conf *conf)
                 */
                stripes_to_flush = R5C_RECLAIM_STRIPE_GROUP;
        else if (total_cached > conf->min_nr_stripes * 1 / 2 ||
-                atomic_read(&conf->r5c_cached_full_stripes) >
-                R5C_FULL_STRIPE_FLUSH_BATCH)
+                atomic_read(&conf->r5c_cached_full_stripes) - flushing_full >
+                R5C_FULL_STRIPE_FLUSH_BATCH(conf))
                /*
                 * if stripe cache pressure moderate, or if there is many full
                 * stripes,flush all full stripes
@@ -1362,9 +1452,9 @@ static void r5c_do_reclaim(struct r5conf *conf)
                            !test_bit(STRIPE_HANDLE, &sh->state) &&
                            atomic_read(&sh->count) == 0) {
                                r5c_flush_stripe(conf, sh);
+                               if (count++ >= R5C_RECLAIM_STRIPE_GROUP)
+                                       break;
                        }
-                       if (count++ >= R5C_RECLAIM_STRIPE_GROUP)
-                               break;
                }
                spin_unlock(&conf->device_lock);
                spin_unlock_irqrestore(&log->stripe_in_journal_lock, flags);
@@ -1488,6 +1578,8 @@ bool r5l_log_disk_error(struct r5conf *conf)
        return ret;
 }
 
+#define R5L_RECOVERY_PAGE_POOL_SIZE 256
+
 struct r5l_recovery_ctx {
        struct page *meta_page;         /* current meta */
        sector_t meta_total_blocks;     /* total size of current meta and data */
@@ -1496,18 +1588,131 @@ struct r5l_recovery_ctx {
        int data_parity_stripes;        /* number of data_parity stripes */
        int data_only_stripes;          /* number of data_only stripes */
        struct list_head cached_list;
+
+       /*
+        * read ahead page pool (ra_pool)
+        * In recovery, the log is read sequentially. It is not efficient to
+        * read every page with sync_page_io(). The read ahead page pool
+        * reads multiple pages with one IO, so further log reads can
+        * just copy data from the pool.
+        */
+       struct page *ra_pool[R5L_RECOVERY_PAGE_POOL_SIZE];
+       sector_t pool_offset;   /* offset of first page in the pool */
+       int total_pages;        /* total allocated pages */
+       int valid_pages;        /* pages with valid data */
+       struct bio *ra_bio;     /* bio to do the read ahead */
 };
 
+static int r5l_recovery_allocate_ra_pool(struct r5l_log *log,
+                                           struct r5l_recovery_ctx *ctx)
+{
+       struct page *page;
+
+       ctx->ra_bio = bio_alloc_bioset(GFP_KERNEL, BIO_MAX_PAGES, log->bs);
+       if (!ctx->ra_bio)
+               return -ENOMEM;
+
+       ctx->valid_pages = 0;
+       ctx->total_pages = 0;
+       while (ctx->total_pages < R5L_RECOVERY_PAGE_POOL_SIZE) {
+               page = alloc_page(GFP_KERNEL);
+
+               if (!page)
+                       break;
+               ctx->ra_pool[ctx->total_pages] = page;
+               ctx->total_pages += 1;
+       }
+
+       if (ctx->total_pages == 0) {
+               bio_put(ctx->ra_bio);
+               return -ENOMEM;
+       }
+
+       ctx->pool_offset = 0;
+       return 0;
+}
+
+static void r5l_recovery_free_ra_pool(struct r5l_log *log,
+                                       struct r5l_recovery_ctx *ctx)
+{
+       int i;
+
+       for (i = 0; i < ctx->total_pages; ++i)
+               put_page(ctx->ra_pool[i]);
+       bio_put(ctx->ra_bio);
+}
+
+/*
+ * Fetch up to ctx->total_pages pages starting at offset.
+ * In normal cases, ctx->valid_pages == ctx->total_pages after the call.
+ * However, if the offset is close to the end of the journal device,
+ * ctx->valid_pages could be smaller than ctx->total_pages.
+ */
+static int r5l_recovery_fetch_ra_pool(struct r5l_log *log,
+                                     struct r5l_recovery_ctx *ctx,
+                                     sector_t offset)
+{
+       bio_reset(ctx->ra_bio);
+       ctx->ra_bio->bi_bdev = log->rdev->bdev;
+       bio_set_op_attrs(ctx->ra_bio, REQ_OP_READ, 0);
+       ctx->ra_bio->bi_iter.bi_sector = log->rdev->data_offset + offset;
+
+       ctx->valid_pages = 0;
+       ctx->pool_offset = offset;
+
+       while (ctx->valid_pages < ctx->total_pages) {
+               bio_add_page(ctx->ra_bio,
+                            ctx->ra_pool[ctx->valid_pages], PAGE_SIZE, 0);
+               ctx->valid_pages += 1;
+
+               offset = r5l_ring_add(log, offset, BLOCK_SECTORS);
+
+               if (offset == 0)  /* reached end of the device */
+                       break;
+       }
+
+       return submit_bio_wait(ctx->ra_bio);
+}
+
+/*
+ * Try to read a page from the read ahead page pool; if the page is not in
+ * the pool, call r5l_recovery_fetch_ra_pool() to refill the pool first.
+ */
+static int r5l_recovery_read_page(struct r5l_log *log,
+                                 struct r5l_recovery_ctx *ctx,
+                                 struct page *page,
+                                 sector_t offset)
+{
+       int ret;
+
+       if (offset < ctx->pool_offset ||
+           offset >= ctx->pool_offset + ctx->valid_pages * BLOCK_SECTORS) {
+               ret = r5l_recovery_fetch_ra_pool(log, ctx, offset);
+               if (ret)
+                       return ret;
+       }
+
+       BUG_ON(offset < ctx->pool_offset ||
+              offset >= ctx->pool_offset + ctx->valid_pages * BLOCK_SECTORS);
+
+       memcpy(page_address(page),
+              page_address(ctx->ra_pool[(offset - ctx->pool_offset) >>
+                                        BLOCK_SECTOR_SHIFT]),
+              PAGE_SIZE);
+       return 0;
+}
+
 static int r5l_recovery_read_meta_block(struct r5l_log *log,
                                        struct r5l_recovery_ctx *ctx)
 {
        struct page *page = ctx->meta_page;
        struct r5l_meta_block *mb;
        u32 crc, stored_crc;
+       int ret;
 
-       if (!sync_page_io(log->rdev, ctx->pos, PAGE_SIZE, page, REQ_OP_READ, 0,
-                         false))
-               return -EIO;
+       ret = r5l_recovery_read_page(log, ctx, page, ctx->pos);
+       if (ret != 0)
+               return ret;
 
        mb = page_address(page);
        stored_crc = le32_to_cpu(mb->checksum);
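The read-ahead pool above replaces the per-page sync_page_io() calls of the old recovery path with one large read: a hit is any offset inside [pool_offset, pool_offset + valid_pages * BLOCK_SECTORS), and the page index is (offset - pool_offset) >> BLOCK_SECTOR_SHIFT, since each 4kB page covers 8 sectors. A minimal userspace sketch of that hit test with hypothetical values, not kernel code:

/* Sketch of the ra_pool hit test and page-index math. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define BLOCK_SECTORS      8	/* sectors per 4kB page */
#define BLOCK_SECTOR_SHIFT 3

static bool pool_hit(uint64_t pool_offset, int valid_pages, uint64_t offset)
{
	return offset >= pool_offset &&
	       offset < pool_offset + (uint64_t)valid_pages * BLOCK_SECTORS;
}

int main(void)
{
	uint64_t pool_offset = 4096;	/* first sector cached in the pool */
	int valid_pages = 256;		/* pages filled by the last fetch */
	uint64_t offset = 4160;		/* sector the caller wants */

	if (pool_hit(pool_offset, valid_pages, offset))
		printf("hit: pool page %llu\n",
		       (unsigned long long)((offset - pool_offset) >> BLOCK_SECTOR_SHIFT));
	else
		printf("miss: refetch pool starting at sector %llu\n",
		       (unsigned long long)offset);
	return 0;
}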
@@ -1589,8 +1794,7 @@ static void r5l_recovery_load_data(struct r5l_log *log,
        raid5_compute_sector(conf,
                             le64_to_cpu(payload->location), 0,
                             &dd_idx, sh);
-       sync_page_io(log->rdev, log_offset, PAGE_SIZE,
-                    sh->dev[dd_idx].page, REQ_OP_READ, 0, false);
+       r5l_recovery_read_page(log, ctx, sh->dev[dd_idx].page, log_offset);
        sh->dev[dd_idx].log_checksum =
                le32_to_cpu(payload->checksum[0]);
        ctx->meta_total_blocks += BLOCK_SECTORS;
@@ -1609,17 +1813,15 @@ static void r5l_recovery_load_parity(struct r5l_log *log,
        struct r5conf *conf = mddev->private;
 
        ctx->meta_total_blocks += BLOCK_SECTORS * conf->max_degraded;
-       sync_page_io(log->rdev, log_offset, PAGE_SIZE,
-                    sh->dev[sh->pd_idx].page, REQ_OP_READ, 0, false);
+       r5l_recovery_read_page(log, ctx, sh->dev[sh->pd_idx].page, log_offset);
        sh->dev[sh->pd_idx].log_checksum =
                le32_to_cpu(payload->checksum[0]);
        set_bit(R5_Wantwrite, &sh->dev[sh->pd_idx].flags);
 
        if (sh->qd_idx >= 0) {
-               sync_page_io(log->rdev,
-                            r5l_ring_add(log, log_offset, BLOCK_SECTORS),
-                            PAGE_SIZE, sh->dev[sh->qd_idx].page,
-                            REQ_OP_READ, 0, false);
+               r5l_recovery_read_page(
+                       log, ctx, sh->dev[sh->qd_idx].page,
+                       r5l_ring_add(log, log_offset, BLOCK_SECTORS));
                sh->dev[sh->qd_idx].log_checksum =
                        le32_to_cpu(payload->checksum[1]);
                set_bit(R5_Wantwrite, &sh->dev[sh->qd_idx].flags);
@@ -1750,14 +1952,15 @@ r5c_recovery_replay_stripes(struct list_head *cached_stripe_list,
 
 /* if matches return 0; otherwise return -EINVAL */
 static int
-r5l_recovery_verify_data_checksum(struct r5l_log *log, struct page *page,
+r5l_recovery_verify_data_checksum(struct r5l_log *log,
+                                 struct r5l_recovery_ctx *ctx,
+                                 struct page *page,
                                  sector_t log_offset, __le32 log_checksum)
 {
        void *addr;
        u32 checksum;
 
-       sync_page_io(log->rdev, log_offset, PAGE_SIZE,
-                    page, REQ_OP_READ, 0, false);
+       r5l_recovery_read_page(log, ctx, page, log_offset);
        addr = kmap_atomic(page);
        checksum = crc32c_le(log->uuid_checksum, addr, PAGE_SIZE);
        kunmap_atomic(addr);
@@ -1779,6 +1982,7 @@ r5l_recovery_verify_data_checksum_for_mb(struct r5l_log *log,
        sector_t log_offset = r5l_ring_add(log, ctx->pos, BLOCK_SECTORS);
        struct page *page;
        struct r5l_payload_data_parity *payload;
+       struct r5l_payload_flush *payload_flush;
 
        page = alloc_page(GFP_KERNEL);
        if (!page)
@@ -1786,33 +1990,42 @@ r5l_recovery_verify_data_checksum_for_mb(struct r5l_log *log,
 
        while (mb_offset < le32_to_cpu(mb->meta_size)) {
                payload = (void *)mb + mb_offset;
+               payload_flush = (void *)mb + mb_offset;
 
-               if (payload->header.type == R5LOG_PAYLOAD_DATA) {
+               if (le16_to_cpu(payload->header.type) == R5LOG_PAYLOAD_DATA) {
                        if (r5l_recovery_verify_data_checksum(
-                                   log, page, log_offset,
+                                   log, ctx, page, log_offset,
                                    payload->checksum[0]) < 0)
                                goto mismatch;
-               } else if (payload->header.type == R5LOG_PAYLOAD_PARITY) {
+               } else if (le16_to_cpu(payload->header.type) == R5LOG_PAYLOAD_PARITY) {
                        if (r5l_recovery_verify_data_checksum(
-                                   log, page, log_offset,
+                                   log, ctx, page, log_offset,
                                    payload->checksum[0]) < 0)
                                goto mismatch;
                        if (conf->max_degraded == 2 && /* q for RAID 6 */
                            r5l_recovery_verify_data_checksum(
-                                   log, page,
+                                   log, ctx, page,
                                    r5l_ring_add(log, log_offset,
                                                 BLOCK_SECTORS),
                                    payload->checksum[1]) < 0)
                                goto mismatch;
-               } else /* not R5LOG_PAYLOAD_DATA or R5LOG_PAYLOAD_PARITY */
+               } else if (le16_to_cpu(payload->header.type) == R5LOG_PAYLOAD_FLUSH) {
+                       /* nothing to do for R5LOG_PAYLOAD_FLUSH here */
+               } else /* not R5LOG_PAYLOAD_DATA/PARITY/FLUSH */
                        goto mismatch;
 
-               log_offset = r5l_ring_add(log, log_offset,
-                                         le32_to_cpu(payload->size));
+               if (le16_to_cpu(payload->header.type) == R5LOG_PAYLOAD_FLUSH) {
+                       mb_offset += sizeof(struct r5l_payload_flush) +
+                               le32_to_cpu(payload_flush->size);
+               } else {
+                       /* DATA or PARITY payload */
+                       log_offset = r5l_ring_add(log, log_offset,
+                                                 le32_to_cpu(payload->size));
+                       mb_offset += sizeof(struct r5l_payload_data_parity) +
+                               sizeof(__le32) *
+                               (le32_to_cpu(payload->size) >> (PAGE_SHIFT - 9));
+               }
 
-               mb_offset += sizeof(struct r5l_payload_data_parity) +
-                       sizeof(__le32) *
-                       (le32_to_cpu(payload->size) >> (PAGE_SHIFT - 9));
        }
 
        put_page(page);
@@ -1840,6 +2053,7 @@ r5c_recovery_analyze_meta_block(struct r5l_log *log,
        struct r5conf *conf = mddev->private;
        struct r5l_meta_block *mb;
        struct r5l_payload_data_parity *payload;
+       struct r5l_payload_flush *payload_flush;
        int mb_offset;
        sector_t log_offset;
        sector_t stripe_sect;
@@ -1865,7 +2079,31 @@ r5c_recovery_analyze_meta_block(struct r5l_log *log,
                int dd;
 
                payload = (void *)mb + mb_offset;
-               stripe_sect = (payload->header.type == R5LOG_PAYLOAD_DATA) ?
+               payload_flush = (void *)mb + mb_offset;
+
+               if (le16_to_cpu(payload->header.type) == R5LOG_PAYLOAD_FLUSH) {
+                       int i, count;
+
+                       count = le32_to_cpu(payload_flush->size) / sizeof(__le64);
+                       for (i = 0; i < count; ++i) {
+                               stripe_sect = le64_to_cpu(payload_flush->flush_stripes[i]);
+                               sh = r5c_recovery_lookup_stripe(cached_stripe_list,
+                                                               stripe_sect);
+                               if (sh) {
+                                       WARN_ON(test_bit(STRIPE_R5C_CACHING, &sh->state));
+                                       r5l_recovery_reset_stripe(sh);
+                                       list_del_init(&sh->lru);
+                                       raid5_release_stripe(sh);
+                               }
+                       }
+
+                       mb_offset += sizeof(struct r5l_payload_flush) +
+                               le32_to_cpu(payload_flush->size);
+                       continue;
+               }
+
+               /* DATA or PARITY payload */
+               stripe_sect = (le16_to_cpu(payload->header.type) == R5LOG_PAYLOAD_DATA) ?
                        raid5_compute_sector(
                                conf, le64_to_cpu(payload->location), 0, &dd,
                                NULL)
@@ -1903,7 +2141,7 @@ r5c_recovery_analyze_meta_block(struct r5l_log *log,
                        list_add_tail(&sh->lru, cached_stripe_list);
                }
 
-               if (payload->header.type == R5LOG_PAYLOAD_DATA) {
+               if (le16_to_cpu(payload->header.type) == R5LOG_PAYLOAD_DATA) {
                        if (!test_bit(STRIPE_R5C_CACHING, &sh->state) &&
                            test_bit(R5_Wantwrite, &sh->dev[sh->pd_idx].flags)) {
                                r5l_recovery_replay_one_stripe(conf, sh, ctx);
@@ -1911,7 +2149,7 @@ r5c_recovery_analyze_meta_block(struct r5l_log *log,
                        }
                        r5l_recovery_load_data(log, sh, ctx, payload,
                                               log_offset);
-               } else if (payload->header.type == R5LOG_PAYLOAD_PARITY)
+               } else if (le16_to_cpu(payload->header.type) == R5LOG_PAYLOAD_PARITY)
                        r5l_recovery_load_parity(log, sh, ctx, payload,
                                                 log_offset);
                else
@@ -2113,7 +2351,7 @@ r5c_recovery_rewrite_data_only_stripes(struct r5l_log *log,
                                payload = (void *)mb + offset;
                                payload->header.type = cpu_to_le16(
                                        R5LOG_PAYLOAD_DATA);
-                               payload->size = BLOCK_SECTORS;
+                               payload->size = cpu_to_le32(BLOCK_SECTORS);
                                payload->location = cpu_to_le64(
                                        raid5_compute_blocknr(sh, i, 0));
                                addr = kmap_atomic(dev->page);
@@ -2177,55 +2415,70 @@ static void r5c_recovery_flush_data_only_stripes(struct r5l_log *log,
 static int r5l_recovery_log(struct r5l_log *log)
 {
        struct mddev *mddev = log->rdev->mddev;
-       struct r5l_recovery_ctx ctx;
+       struct r5l_recovery_ctx *ctx;
        int ret;
        sector_t pos;
 
-       ctx.pos = log->last_checkpoint;
-       ctx.seq = log->last_cp_seq;
-       ctx.meta_page = alloc_page(GFP_KERNEL);
-       ctx.data_only_stripes = 0;
-       ctx.data_parity_stripes = 0;
-       INIT_LIST_HEAD(&ctx.cached_list);
-
-       if (!ctx.meta_page)
+       ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
+       if (!ctx)
                return -ENOMEM;
 
-       ret = r5c_recovery_flush_log(log, &ctx);
-       __free_page(ctx.meta_page);
+       ctx->pos = log->last_checkpoint;
+       ctx->seq = log->last_cp_seq;
+       INIT_LIST_HEAD(&ctx->cached_list);
+       ctx->meta_page = alloc_page(GFP_KERNEL);
 
-       if (ret)
-               return ret;
+       if (!ctx->meta_page) {
+               ret = -ENOMEM;
+               goto meta_page;
+       }
+
+       if (r5l_recovery_allocate_ra_pool(log, ctx) != 0) {
+               ret = -ENOMEM;
+               goto ra_pool;
+       }
 
-       pos = ctx.pos;
-       ctx.seq += 10000;
+       ret = r5c_recovery_flush_log(log, ctx);
+
+       if (ret)
+               goto error;
 
+       pos = ctx->pos;
+       ctx->seq += 10000;
 
-       if ((ctx.data_only_stripes == 0) && (ctx.data_parity_stripes == 0))
+       if ((ctx->data_only_stripes == 0) && (ctx->data_parity_stripes == 0))
                pr_debug("md/raid:%s: starting from clean shutdown\n",
                         mdname(mddev));
        else
                pr_debug("md/raid:%s: recovering %d data-only stripes and %d data-parity stripes\n",
-                        mdname(mddev), ctx.data_only_stripes,
-                        ctx.data_parity_stripes);
-
-       if (ctx.data_only_stripes == 0) {
-               log->next_checkpoint = ctx.pos;
-               r5l_log_write_empty_meta_block(log, ctx.pos, ctx.seq++);
-               ctx.pos = r5l_ring_add(log, ctx.pos, BLOCK_SECTORS);
-       } else if (r5c_recovery_rewrite_data_only_stripes(log, &ctx)) {
+                        mdname(mddev), ctx->data_only_stripes,
+                        ctx->data_parity_stripes);
+
+       if (ctx->data_only_stripes == 0) {
+               log->next_checkpoint = ctx->pos;
+               r5l_log_write_empty_meta_block(log, ctx->pos, ctx->seq++);
+               ctx->pos = r5l_ring_add(log, ctx->pos, BLOCK_SECTORS);
+       } else if (r5c_recovery_rewrite_data_only_stripes(log, ctx)) {
                pr_err("md/raid:%s: failed to rewrite stripes to journal\n",
                       mdname(mddev));
-               return -EIO;
+               ret = -EIO;
+               goto error;
        }
 
-       log->log_start = ctx.pos;
-       log->seq = ctx.seq;
+       log->log_start = ctx->pos;
+       log->seq = ctx->seq;
        log->last_checkpoint = pos;
        r5l_write_super(log, pos);
 
-       r5c_recovery_flush_data_only_stripes(log, &ctx);
-       return 0;
+       r5c_recovery_flush_data_only_stripes(log, ctx);
+       ret = 0;
+error:
+       r5l_recovery_free_ra_pool(log, ctx);
+ra_pool:
+       __free_page(ctx->meta_page);
+meta_page:
+       kfree(ctx);
+       return ret;
 }
 
 static void r5l_write_super(struct r5l_log *log, sector_t cp)
@@ -2263,40 +2516,56 @@ static ssize_t r5c_journal_mode_show(struct mddev *mddev, char *page)
        return ret;
 }
 
-static ssize_t r5c_journal_mode_store(struct mddev *mddev,
-                                     const char *page, size_t length)
+/*
+ * Set journal cache mode on @mddev (external API initially needed by dm-raid).
+ *
+ * @mode as defined in 'enum r5c_journal_mode'.
+ *
+ */
+int r5c_journal_mode_set(struct mddev *mddev, int mode)
 {
        struct r5conf *conf = mddev->private;
        struct r5l_log *log = conf->log;
-       int val = -1, i;
-       int len = length;
 
        if (!log)
                return -ENODEV;
 
-       if (len && page[len - 1] == '\n')
-               len -= 1;
-       for (i = 0; i < ARRAY_SIZE(r5c_journal_mode_str); i++)
-               if (strlen(r5c_journal_mode_str[i]) == len &&
-                   strncmp(page, r5c_journal_mode_str[i], len) == 0) {
-                       val = i;
-                       break;
-               }
-       if (val < R5C_JOURNAL_MODE_WRITE_THROUGH ||
-           val > R5C_JOURNAL_MODE_WRITE_BACK)
+       if (mode < R5C_JOURNAL_MODE_WRITE_THROUGH ||
+           mode > R5C_JOURNAL_MODE_WRITE_BACK)
                return -EINVAL;
 
        if (raid5_calc_degraded(conf) > 0 &&
-           val == R5C_JOURNAL_MODE_WRITE_BACK)
+           mode == R5C_JOURNAL_MODE_WRITE_BACK)
                return -EINVAL;
 
        mddev_suspend(mddev);
-       conf->log->r5c_journal_mode = val;
+       conf->log->r5c_journal_mode = mode;
        mddev_resume(mddev);
 
        pr_debug("md/raid:%s: setting r5c cache mode to %d: %s\n",
-                mdname(mddev), val, r5c_journal_mode_str[val]);
-       return length;
+                mdname(mddev), mode, r5c_journal_mode_str[mode]);
+       return 0;
+}
+EXPORT_SYMBOL(r5c_journal_mode_set);
+
+static ssize_t r5c_journal_mode_store(struct mddev *mddev,
+                                     const char *page, size_t length)
+{
+       int mode = ARRAY_SIZE(r5c_journal_mode_str);
+       size_t len = length;
+
+       if (len < 2)
+               return -EINVAL;
+
+       if (page[len - 1] == '\n')
+               len--;
+
+       while (mode--)
+               if (strlen(r5c_journal_mode_str[mode]) == len &&
+                   !strncmp(page, r5c_journal_mode_str[mode], len))
+                       break;
+
+       return r5c_journal_mode_set(mddev, mode) ?: length;
 }
 
 struct md_sysfs_entry
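With the mode parsing split out of the sysfs store handler, other kernel code can now switch the cache mode through the exported r5c_journal_mode_set(). A hedged sketch of such a caller follows; the function name enable_r5c_writeback is made up, dm-raid being the intended user per the comment above:

/* Hypothetical in-kernel caller, e.g. from a dm-raid table load. */
static int enable_r5c_writeback(struct mddev *mddev)
{
	/* Returns 0 on success, -ENODEV without a journal, -EINVAL if degraded. */
	return r5c_journal_mode_set(mddev, R5C_JOURNAL_MODE_WRITE_BACK);
}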
@@ -2320,6 +2589,10 @@ int r5c_try_caching_write(struct r5conf *conf,
        int i;
        struct r5dev *dev;
        int to_cache = 0;
+       void **pslot;
+       sector_t tree_index;
+       int ret;
+       uintptr_t refcount;
 
        BUG_ON(!r5c_is_writeback(log));
 
@@ -2364,6 +2637,44 @@ int r5c_try_caching_write(struct r5conf *conf,
                }
        }
 
+       /* if the stripe is not counted in big_stripe_tree, add it now */
+       if (!test_bit(STRIPE_R5C_PARTIAL_STRIPE, &sh->state) &&
+           !test_bit(STRIPE_R5C_FULL_STRIPE, &sh->state)) {
+               tree_index = r5c_tree_index(conf, sh->sector);
+               spin_lock(&log->tree_lock);
+               pslot = radix_tree_lookup_slot(&log->big_stripe_tree,
+                                              tree_index);
+               if (pslot) {
+                       refcount = (uintptr_t)radix_tree_deref_slot_protected(
+                               pslot, &log->tree_lock) >>
+                               R5C_RADIX_COUNT_SHIFT;
+                       radix_tree_replace_slot(
+                               &log->big_stripe_tree, pslot,
+                               (void *)((refcount + 1) << R5C_RADIX_COUNT_SHIFT));
+               } else {
+                       /*
+                        * this radix_tree_insert can fail safely, so no
+                        * need to call radix_tree_preload()
+                        */
+                       ret = radix_tree_insert(
+                               &log->big_stripe_tree, tree_index,
+                               (void *)(1 << R5C_RADIX_COUNT_SHIFT));
+                       if (ret) {
+                               spin_unlock(&log->tree_lock);
+                               r5c_make_stripe_write_out(sh);
+                               return -EAGAIN;
+                       }
+               }
+               spin_unlock(&log->tree_lock);
+
+               /*
+                * set STRIPE_R5C_PARTIAL_STRIPE, this shows the stripe is
+                * counted in the radix tree
+                */
+               set_bit(STRIPE_R5C_PARTIAL_STRIPE, &sh->state);
+               atomic_inc(&conf->r5c_cached_partial_stripes);
+       }
+
        for (i = disks; i--; ) {
                dev = &sh->dev[i];
                if (dev->towrite) {
@@ -2438,17 +2749,20 @@ void r5c_finish_stripe_write_out(struct r5conf *conf,
                                 struct stripe_head *sh,
                                 struct stripe_head_state *s)
 {
+       struct r5l_log *log = conf->log;
        int i;
        int do_wakeup = 0;
+       sector_t tree_index;
+       void **pslot;
+       uintptr_t refcount;
 
-       if (!conf->log ||
-           !test_bit(R5_InJournal, &sh->dev[sh->pd_idx].flags))
+       if (!log || !test_bit(R5_InJournal, &sh->dev[sh->pd_idx].flags))
                return;
 
        WARN_ON(test_bit(STRIPE_R5C_CACHING, &sh->state));
        clear_bit(R5_InJournal, &sh->dev[sh->pd_idx].flags);
 
-       if (conf->log->r5c_journal_mode == R5C_JOURNAL_MODE_WRITE_THROUGH)
+       if (log->r5c_journal_mode == R5C_JOURNAL_MODE_WRITE_THROUGH)
                return;
 
        for (i = sh->disks; i--; ) {
@@ -2470,17 +2784,50 @@ void r5c_finish_stripe_write_out(struct r5conf *conf,
        if (do_wakeup)
                wake_up(&conf->wait_for_overlap);
 
-       spin_lock_irq(&conf->log->stripe_in_journal_lock);
+       spin_lock_irq(&log->stripe_in_journal_lock);
        list_del_init(&sh->r5c);
-       spin_unlock_irq(&conf->log->stripe_in_journal_lock);
+       spin_unlock_irq(&log->stripe_in_journal_lock);
        sh->log_start = MaxSector;
-       atomic_dec(&conf->log->stripe_in_journal_count);
-       r5c_update_log_state(conf->log);
+
+       atomic_dec(&log->stripe_in_journal_count);
+       r5c_update_log_state(log);
+
+       /* stop counting this stripe in big_stripe_tree */
+       if (test_bit(STRIPE_R5C_PARTIAL_STRIPE, &sh->state) ||
+           test_bit(STRIPE_R5C_FULL_STRIPE, &sh->state)) {
+               tree_index = r5c_tree_index(conf, sh->sector);
+               spin_lock(&log->tree_lock);
+               pslot = radix_tree_lookup_slot(&log->big_stripe_tree,
+                                              tree_index);
+               BUG_ON(pslot == NULL);
+               refcount = (uintptr_t)radix_tree_deref_slot_protected(
+                       pslot, &log->tree_lock) >>
+                       R5C_RADIX_COUNT_SHIFT;
+               if (refcount == 1)
+                       radix_tree_delete(&log->big_stripe_tree, tree_index);
+               else
+                       radix_tree_replace_slot(
+                               &log->big_stripe_tree, pslot,
+                               (void *)((refcount - 1) << R5C_RADIX_COUNT_SHIFT));
+               spin_unlock(&log->tree_lock);
+       }
+
+       if (test_and_clear_bit(STRIPE_R5C_PARTIAL_STRIPE, &sh->state)) {
+               BUG_ON(atomic_read(&conf->r5c_cached_partial_stripes) == 0);
+               atomic_dec(&conf->r5c_flushing_partial_stripes);
+               atomic_dec(&conf->r5c_cached_partial_stripes);
+       }
+
+       if (test_and_clear_bit(STRIPE_R5C_FULL_STRIPE, &sh->state)) {
+               BUG_ON(atomic_read(&conf->r5c_cached_full_stripes) == 0);
+               atomic_dec(&conf->r5c_flushing_full_stripes);
+               atomic_dec(&conf->r5c_cached_full_stripes);
+       }
+
+       r5l_append_flush_payload(log, sh->sector);
 }
 
-int
-r5c_cache_data(struct r5l_log *log, struct stripe_head *sh,
-              struct stripe_head_state *s)
+int r5c_cache_data(struct r5l_log *log, struct stripe_head *sh)
 {
        struct r5conf *conf = sh->raid_conf;
        int pages = 0;
@@ -2535,6 +2882,22 @@ r5c_cache_data(struct r5l_log *log, struct stripe_head *sh,
        return 0;
 }
 
+/* check whether this big stripe is in write back cache. */
+bool r5c_big_stripe_cached(struct r5conf *conf, sector_t sect)
+{
+       struct r5l_log *log = conf->log;
+       sector_t tree_index;
+       void *slot;
+
+       if (!log)
+               return false;
+
+       WARN_ON_ONCE(!rcu_read_lock_held());
+       tree_index = r5c_tree_index(conf, sect);
+       slot = radix_tree_lookup(&log->big_stripe_tree, tree_index);
+       return slot != NULL;
+}
+
 static int r5l_load_log(struct r5l_log *log)
 {
        struct md_rdev *rdev = log->rdev;
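r5c_big_stripe_cached() above is the read-side half of big_stripe_tree: per the comment near the top of this patch, chunk_aligned_read() looks the chunk up under rcu_read_lock() and falls back to the normal stripe path when any stripe of the chunk is still dirty in the write-back cache. A rough fragment of such a caller, a sketch only and not the exact raid5.c code:

/* Sketch of the caller side in chunk_aligned_read() (raid5.c). */
rcu_read_lock();
if (r5c_big_stripe_cached(conf, raid_bio->bi_iter.bi_sector)) {
	/* data for this chunk may still live only in the write-back
	 * cache/journal, so the aligned read path must not be used */
	rcu_read_unlock();
	return 0;	/* not handled; go through the stripe cache */
}
rcu_read_unlock();
/* ... proceed with the aligned read ... */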
@@ -2627,6 +2990,10 @@ int r5l_init_log(struct r5conf *conf, struct md_rdev *rdev)
 {
        struct request_queue *q = bdev_get_queue(rdev->bdev);
        struct r5l_log *log;
+       char b[BDEVNAME_SIZE];
+
+       pr_debug("md/raid:%s: using device %s as journal\n",
+                mdname(conf->mddev), bdevname(rdev->bdev, b));
 
        if (PAGE_SIZE != 4096)
                return -EINVAL;
@@ -2681,6 +3048,9 @@ int r5l_init_log(struct r5conf *conf, struct md_rdev *rdev)
        if (!log->meta_pool)
                goto out_mempool;
 
+       spin_lock_init(&log->tree_lock);
+       INIT_RADIX_TREE(&log->big_stripe_tree, GFP_NOWAIT | __GFP_NOWARN);
+
        log->reclaim_thread = md_register_thread(r5l_reclaim_thread,
                                                 log->rdev->mddev, "reclaim");
        if (!log->reclaim_thread)
@@ -2726,8 +3096,13 @@ io_kc:
        return -EINVAL;
 }
 
-void r5l_exit_log(struct r5l_log *log)
+void r5l_exit_log(struct r5conf *conf)
 {
+       struct r5l_log *log = conf->log;
+
+       conf->log = NULL;
+       synchronize_rcu();
+
        flush_work(&log->disable_writeback_work);
        md_unregister_thread(&log->reclaim_thread);
        mempool_destroy(log->meta_pool);