]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/spdk/ocf/src/mngt/ocf_mngt_flush.c
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / spdk / ocf / src / mngt / ocf_mngt_flush.c
index 2728e6f9ebc38fed8ff425858374bda6ca4967c2..78e63c2fafc7609bb4f49c1297b7a167a9001da3 100644 (file)
@@ -15,7 +15,7 @@
 #include "../utils/utils_part.h"
 #include "../utils/utils_pipeline.h"
 #include "../utils/utils_refcnt.h"
-#include "../utils/utils_req.h"
+#include "../ocf_request.h"
 #include "../ocf_def_priv.h"
 
 struct ocf_mngt_cache_flush_context;
@@ -46,8 +46,11 @@ struct ocf_mngt_cache_flush_context
        ocf_cache_t cache;
        /* target core */
        ocf_core_t core;
-       /* true if flush interrupt respected */
-       bool allow_interruption;
+
+       struct {
+               bool lock : 1;
+               bool freeze : 1;
+       } flags;
 
        /* management operation identifier */
        enum {
@@ -57,7 +60,7 @@ struct ocf_mngt_cache_flush_context
                purge_core
        } op;
 
-       /* ocf mngmt entry point completion */
+       /* ocf management entry point completion */
        union {
                ocf_mngt_cache_flush_end_t flush_cache;
                ocf_mngt_core_flush_end_t flush_core;
@@ -89,38 +92,37 @@ static void _ocf_mngt_begin_flush(ocf_pipeline_t pipeline, void *priv,
 {
        struct ocf_mngt_cache_flush_context *context = priv;
        ocf_cache_t cache = context->cache;
+       int result;
 
        /* FIXME: need mechanism for async waiting for outstanding flushes to
         * finish */
-       env_mutex_lock(&cache->flush_mutex);
+       result = env_mutex_trylock(&cache->flush_mutex);
+       if (result)
+               OCF_PL_FINISH_RET(pipeline, -OCF_ERR_FLUSH_IN_PROGRESS);
+       context->flags.lock = true;
 
-       ocf_refcnt_freeze(&cache->dirty);
+       ocf_refcnt_freeze(&cache->refcnt.dirty);
+       context->flags.freeze = true;
 
-       ocf_refcnt_register_zero_cb(&cache->dirty,
+       ocf_refcnt_register_zero_cb(&cache->refcnt.dirty,
                        _ocf_mngt_begin_flush_complete, context);
 }
 
-static void _ocf_mngt_end_flush(ocf_cache_t cache)
+bool ocf_mngt_core_is_dirty(ocf_core_t core)
 {
-       ocf_refcnt_unfreeze(&cache->dirty);
-
-       env_mutex_unlock(&cache->flush_mutex);
+       return !!env_atomic_read(&core->runtime_meta->dirty_clines);
 }
 
 bool ocf_mngt_cache_is_dirty(ocf_cache_t cache)
 {
-       uint32_t i;
+       ocf_core_t core;
+       ocf_core_id_t core_id;
 
        OCF_CHECK_NULL(cache);
 
-       for (i = 0; i < OCF_CORE_MAX; ++i) {
-               if (!cache->core_conf_meta[i].added)
-                       continue;
-
-               if (env_atomic_read(&(cache->core_runtime_meta[i].
-                               dirty_clines))) {
+       for_each_core(cache, core, core_id) {
+               if (ocf_mngt_core_is_dirty(core))
                        return true;
-               }
        }
 
        return false;
@@ -135,64 +137,71 @@ bool ocf_mngt_cache_is_dirty(ocf_cache_t cache)
  * NOTE:
  * Table is not sorted.
  */
-static int _ocf_mngt_get_sectors(struct ocf_cache *cache, int core_id,
+static int _ocf_mngt_get_sectors(ocf_cache_t cache, ocf_core_id_t core_id,
                struct flush_data **tbl, uint32_t *num)
 {
+       ocf_core_t core = ocf_cache_get_core(cache, core_id);
        uint64_t core_line;
        ocf_core_id_t i_core_id;
-       struct flush_data *p;
-       uint32_t i, j, dirty = 0;
+       struct flush_data *elem;
+       uint32_t line, dirty_found = 0, dirty_total = 0;
+       unsigned ret = 0;
+
+       ocf_metadata_start_exclusive_access(&cache->metadata.lock);
 
-       dirty = env_atomic_read(&cache->core_runtime_meta[core_id].
-                       dirty_clines);
-       if (!dirty) {
+       dirty_total = env_atomic_read(&core->runtime_meta->dirty_clines);
+       if (!dirty_total) {
                *num = 0;
                *tbl = NULL;
-               return 0;
-       }
-
-       p = env_vmalloc(dirty * sizeof(**tbl));
-       if (!p)
-               return -OCF_ERR_NO_MEM;
-
-       for (i = 0, j = 0; i < cache->device->collision_table_entries; i++) {
-               ocf_metadata_get_core_info(cache, i, &i_core_id, &core_line);
-
-               if (i_core_id != core_id)
-                       continue;
-
-               if (!metadata_test_valid_any(cache, i))
-                       continue;
-
-               if (!metadata_test_dirty(cache, i))
-                       continue;
-
-               if (ocf_cache_line_is_used(cache, i))
-                       continue;
+               goto unlock;
+       }
+
+       *tbl = env_vmalloc(dirty_total * sizeof(**tbl));
+       if (*tbl == NULL) {
+               ret = -OCF_ERR_NO_MEM;
+               goto unlock;
+       }
+
+       for (line = 0, elem = *tbl;
+                       line < cache->device->collision_table_entries;
+                       line++) {
+               ocf_metadata_get_core_info(cache, line, &i_core_id,
+                               &core_line);
+
+               if (i_core_id == core_id &&
+                               metadata_test_valid_any(cache, line) &&
+                               metadata_test_dirty(cache, line)) {
+                       /* It's valid and dirty target core cacheline */
+                       elem->cache_line = line;
+                       elem->core_line = core_line;
+                       elem->core_id = i_core_id;
+                       elem++;
+                       dirty_found++;
+
+                       /* stop if all cachelines were found */
+                       if (dirty_found == dirty_total)
+                               break;
+               }
 
-               /* It's core_id cacheline and it's valid and it's dirty! */
-               p[j].cache_line = i;
-               p[j].core_line = core_line;
-               p[j].core_id = i_core_id;
-               j++;
-               /* stop if all cachelines were found */
-               if (j == dirty)
-                       break;
+               if ((line + 1) % 131072 == 0) {
+                       ocf_metadata_end_exclusive_access(
+                                       &cache->metadata.lock);
+                       env_cond_resched();
+                       ocf_metadata_start_exclusive_access(
+                                       &cache->metadata.lock);
+               }
        }
 
-       ocf_core_log(&cache->core[core_id], log_debug,
-                       "%u dirty cache lines to clean\n", j);
+       ocf_core_log(core, log_debug,
+                       "%u dirty cache lines to clean\n", dirty_found);
 
-       if (dirty != j) {
-               ocf_cache_log(cache, log_debug, "Wrong number of dirty "
-                               "blocks for flushing core %s (%u!=%u)\n",
-                               cache->core[core_id].name, j, dirty);
-       }
 
+       *num = dirty_found;
 
-       *tbl = p;
-       *num = j;
-       return 0;
+unlock:
+       ocf_metadata_end_exclusive_access(&cache->metadata.lock);
+
+       return ret;
 }
 
 static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
@@ -204,8 +213,12 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
        uint32_t num;
        uint64_t core_line;
        ocf_core_id_t core_id;
-       uint32_t i, j, dirty = 0;
-       int step = 0;
+       ocf_core_t core;
+       uint32_t i, j = 0, line;
+       uint32_t dirty_found = 0, dirty_total = 0;
+       int ret = 0;
+
+       ocf_metadata_start_exclusive_access(&cache->metadata.lock);
 
        /*
         * TODO: Create containers for each physical device, not for
@@ -214,7 +227,7 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
        num = cache->conf_meta->core_count;
        if (num == 0) {
                *fcnum = 0;
-               return 0;
+               goto unlock;
        }
 
        core_revmap = env_vzalloc(sizeof(*core_revmap) * OCF_CORE_MAX);
@@ -225,20 +238,18 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
        fc = env_vzalloc(sizeof(**fctbl) * num);
        if (!fc) {
                env_vfree(core_revmap);
-               return -OCF_ERR_NO_MEM;
+               ret = -OCF_ERR_NO_MEM;
+               goto unlock;
        }
 
-       for (i = 0, j = 0; i < OCF_CORE_MAX; i++) {
-               if (!env_bit_test(i, cache->conf_meta->valid_core_bitmap))
-                       continue;
-
-               fc[j].core_id = i;
-               core_revmap[i] = j;
+       for_each_core(cache, core, core_id) {
+               fc[j].core_id = core_id;
+               core_revmap[core_id] = j;
 
                /* Check for dirty blocks */
-               fc[j].count = env_atomic_read(&cache->
-                               core_runtime_meta[i].dirty_clines);
-               dirty += fc[j].count;
+               fc[j].count = env_atomic_read(
+                               &core->runtime_meta->dirty_clines);
+               dirty_total += fc[j].count;
 
                if (fc[j].count) {
                        fc[j].flush_data = env_vmalloc(fc[j].count *
@@ -249,46 +260,44 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
                        break;
        }
 
-       if (!dirty) {
+       if (!dirty_total) {
                env_vfree(core_revmap);
                env_vfree(fc);
                *fcnum = 0;
-               return 0;
+               goto unlock;
        }
 
-       for (i = 0, j = 0; i < cache->device->collision_table_entries; i++) {
-               ocf_metadata_get_core_info(cache, i, &core_id, &core_line);
-
-               if (!metadata_test_valid_any(cache, i))
-                       continue;
-
-               if (!metadata_test_dirty(cache, i))
-                       continue;
-
-               if (ocf_cache_line_is_used(cache, i))
-                       continue;
+       for (line = 0; line < cache->device->collision_table_entries; line++) {
+               ocf_metadata_get_core_info(cache, line, &core_id, &core_line);
 
-               curr = &fc[core_revmap[core_id]];
+               if (metadata_test_valid_any(cache, line) &&
+                               metadata_test_dirty(cache, line)) {
+                       curr = &fc[core_revmap[core_id]];
 
-               ENV_BUG_ON(curr->iter >= curr->count);
+                       ENV_BUG_ON(curr->iter >= curr->count);
 
-               /* It's core_id cacheline and it's valid and it's dirty! */
-               curr->flush_data[curr->iter].cache_line = i;
-               curr->flush_data[curr->iter].core_line = core_line;
-               curr->flush_data[curr->iter].core_id = core_id;
-               curr->iter++;
+                       /* It's core_id cacheline and it's valid and it's dirty! */
+                       curr->flush_data[curr->iter].cache_line = line;
+                       curr->flush_data[curr->iter].core_line = core_line;
+                       curr->flush_data[curr->iter].core_id = core_id;
+                       curr->iter++;
+                       dirty_found++;
 
-               j++;
-               /* stop if all cachelines were found */
-               if (j == dirty)
-                       break;
+                       /* stop if all cachelines were found */
+                       if (dirty_found == dirty_total)
+                               break;
+               }
 
-               OCF_COND_RESCHED(step, 1000000)
+               if ((line + 1) % 131072 == 0) {
+                       ocf_metadata_end_exclusive_access(
+                                       &cache->metadata.lock);
+                       env_cond_resched();
+                       ocf_metadata_start_exclusive_access(
+                                       &cache->metadata.lock);
+               }
        }
 
-       if (dirty != j) {
-               ocf_cache_log(cache, log_debug, "Wrong number of dirty "
-                               "blocks (%u!=%u)\n", j, dirty);
+       if (dirty_total != dirty_found) {
                for (i = 0; i < num; i++)
                        fc[i].count = fc[i].iter;
        }
@@ -299,7 +308,10 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
        env_vfree(core_revmap);
        *fctbl = fc;
        *fcnum = num;
-       return 0;
+
+unlock:
+       ocf_metadata_end_exclusive_access(&cache->metadata.lock);
+       return ret;
 }
 
 static void _ocf_mngt_free_flush_containers(struct flush_container *fctbl,
@@ -364,18 +376,11 @@ static void _ocf_mngt_flush_portion_end(void *private_data, int error)
        env_atomic_cmpxchg(&fsc->error, 0, error);
 
        if (cache->flushing_interrupted) {
-               first_interrupt = !env_atomic_cmpxchg(&fsc->interrupt_seen, 0, 1);
+               first_interrupt = !env_atomic_cmpxchg(
+                               &fsc->interrupt_seen, 0, 1);
                if (first_interrupt) {
-                       if (context->allow_interruption) {
-                               ocf_cache_log(cache, log_info,
-                                       "Flushing interrupted by "
-                                       "user\n");
-                       } else {
-                               ocf_cache_log(cache, log_err,
-                                       "Cannot interrupt flushing\n");
-                       }
-               }
-               if (context->allow_interruption) {
+                       ocf_cache_log(cache, log_info,
+                                       "Flushing interrupted by user\n");
                        env_atomic_cmpxchg(&fsc->error, 0,
                                        -OCF_ERR_FLUSHING_INTERRUPTED);
                }
@@ -387,7 +392,7 @@ static void _ocf_mngt_flush_portion_end(void *private_data, int error)
                return;
        }
 
-       ocf_engine_push_req_front(fc->req, false);
+       ocf_engine_push_req_back(fc->req, false);
 }
 
 
@@ -396,9 +401,9 @@ static int _ofc_flush_container_step(struct ocf_request *req)
        struct flush_container *fc = req->priv;
        ocf_cache_t cache = fc->cache;
 
-       ocf_metadata_lock(cache, OCF_METADATA_WR);
+       ocf_metadata_start_exclusive_access(&cache->metadata.lock);
        _ocf_mngt_flush_portion(fc);
-       ocf_metadata_unlock(cache, OCF_METADATA_WR);
+       ocf_metadata_end_exclusive_access(&cache->metadata.lock);
 
        return 0;
 }
@@ -442,7 +447,7 @@ static void _ocf_mngt_flush_container(
        fc->ticks1 = 0;
        fc->ticks2 = UINT_MAX;
 
-       ocf_engine_push_req_front(fc->req, true);
+       ocf_engine_push_req_back(fc->req, true);
        return;
 
 finish:
@@ -512,8 +517,6 @@ static void _ocf_mngt_flush_core(
                return;
        }
 
-       ocf_metadata_lock(cache, OCF_METADATA_WR);
-
        ret = _ocf_mngt_get_sectors(cache, core_id,
                        &fc->flush_data, &fc->count);
        if (ret) {
@@ -528,8 +531,6 @@ static void _ocf_mngt_flush_core(
        fc->iter = 0;
 
        _ocf_mngt_flush_containers(context, fc, 1, complete);
-
-       ocf_metadata_unlock(cache, OCF_METADATA_WR);
 }
 
 static void _ocf_mngt_flush_all_cores(
@@ -548,20 +549,17 @@ static void _ocf_mngt_flush_all_cores(
 
        env_atomic_set(&cache->flush_in_progress, 1);
 
-       ocf_metadata_lock(cache, OCF_METADATA_WR);
-
        /* Get all 'dirty' sectors for all cores */
        ret = _ocf_mngt_get_flush_containers(cache, &fctbl, &fcnum);
        if (ret) {
                ocf_cache_log(cache, log_err, "Flushing operation aborted, "
                                "no memory\n");
+               ocf_metadata_end_exclusive_access(&cache->metadata.lock);
                complete(context, ret);
                return;
        }
 
        _ocf_mngt_flush_containers(context, fctbl, fcnum, complete);
-
-       ocf_metadata_unlock(cache, OCF_METADATA_WR);
 }
 
 static void _ocf_mngt_flush_all_cores_complete(
@@ -582,10 +580,8 @@ static void _ocf_mngt_flush_all_cores_complete(
                        break;
        }
 
-       if (error) {
-               ocf_pipeline_finish(context->pipeline, error);
-               return;
-       }
+       if (error)
+               OCF_PL_FINISH_RET(context->pipeline, error);
 
        if (context->op == flush_cache)
                ocf_cache_log(cache, log_info, "Flushing cache completed\n");
@@ -600,6 +596,7 @@ static void _ocf_mngt_cache_flush(ocf_pipeline_t pipeline, void *priv,
                ocf_pipeline_arg_t arg)
 {
        struct ocf_mngt_cache_flush_context *context = priv;
+
        context->cache->flushing_interrupted = 0;
        _ocf_mngt_flush_all_cores(context, _ocf_mngt_flush_all_cores_complete);
 }
@@ -610,43 +607,32 @@ static void _ocf_mngt_flush_finish(ocf_pipeline_t pipeline, void *priv,
 {
        struct ocf_mngt_cache_flush_context *context = priv;
        ocf_cache_t cache = context->cache;
-       int64_t core_id;
+       ocf_core_t core = context->core;
 
-       if (!error) {
-               switch(context->op) {
-               case flush_cache:
-               case purge_cache:
-                       ENV_BUG_ON(ocf_mngt_cache_is_dirty(cache));
-                       break;
-               case flush_core:
-               case purge_core:
-                       core_id = ocf_core_get_id(context->core);
-                       ENV_BUG_ON(env_atomic_read(&cache->core_runtime_meta
-                                       [core_id].dirty_clines));
-                       break;
-               }
-       }
+       if (context->flags.freeze)
+               ocf_refcnt_unfreeze(&cache->refcnt.dirty);
 
-       _ocf_mngt_end_flush(context->cache);
+       if (context->flags.lock)
+               env_mutex_unlock(&cache->flush_mutex);
 
        switch (context->op) {
        case flush_cache:
-               context->cmpl.flush_cache(context->cache, context->priv, error);
+               context->cmpl.flush_cache(cache, context->priv, error);
                break;
        case flush_core:
-               context->cmpl.flush_core(context->core, context->priv, error);
+               context->cmpl.flush_core(core, context->priv, error);
                break;
        case purge_cache:
-               context->cmpl.purge_cache(context->cache, context->priv, error);
+               context->cmpl.purge_cache(cache, context->priv, error);
                break;
        case purge_core:
-               context->cmpl.purge_core(context->core, context->priv, error);
+               context->cmpl.purge_core(core, context->priv, error);
                break;
        default:
                ENV_BUG();
        }
 
-       ocf_pipeline_destroy(context->pipeline);
+       ocf_pipeline_destroy(pipeline);
 }
 
 static struct ocf_pipeline_properties _ocf_mngt_cache_flush_pipeline_properties = {
@@ -659,7 +645,7 @@ static struct ocf_pipeline_properties _ocf_mngt_cache_flush_pipeline_properties
        },
 };
 
-void ocf_mngt_cache_flush(ocf_cache_t cache, bool interruption,
+void ocf_mngt_cache_flush(ocf_cache_t cache,
                ocf_mngt_cache_flush_end_t cmpl, void *priv)
 {
        ocf_pipeline_t pipeline;
@@ -671,37 +657,32 @@ void ocf_mngt_cache_flush(ocf_cache_t cache, bool interruption,
        if (!ocf_cache_is_device_attached(cache)) {
                ocf_cache_log(cache, log_err, "Cannot flush cache - "
                                "cache device is detached\n");
-               cmpl(cache, priv, -OCF_ERR_INVAL);
-               return;
+               OCF_CMPL_RET(cache, priv, -OCF_ERR_INVAL);
        }
 
        if (ocf_cache_is_incomplete(cache)) {
                ocf_cache_log(cache, log_err, "Cannot flush cache - "
                                "cache is in incomplete state\n");
-               cmpl(cache, priv, -OCF_ERR_CACHE_IN_INCOMPLETE_STATE);
-               return;
+               OCF_CMPL_RET(cache, priv, -OCF_ERR_CACHE_IN_INCOMPLETE_STATE);
        }
 
        if (!cache->mngt_queue) {
                ocf_cache_log(cache, log_err,
                                "Cannot flush cache - no flush queue set\n");
-               cmpl(cache, priv, -OCF_ERR_INVAL);
-               return;
+               OCF_CMPL_RET(cache, priv, -OCF_ERR_INVAL);
        }
 
        result = ocf_pipeline_create(&pipeline, cache,
                        &_ocf_mngt_cache_flush_pipeline_properties);
-       if (result) {
-               cmpl(cache, priv, -OCF_ERR_NO_MEM);
-               return;
-       }
+       if (result)
+               OCF_CMPL_RET(cache, priv, -OCF_ERR_NO_MEM);
+
        context = ocf_pipeline_get_priv(pipeline);
 
        context->pipeline = pipeline;
        context->cmpl.flush_cache = cmpl;
        context->priv = priv;
        context->cache = cache;
-       context->allow_interruption = interruption;
        context->op = flush_cache;
 
        ocf_pipeline_next(context->pipeline);
@@ -715,10 +696,8 @@ static void _ocf_mngt_flush_core_complete(
 
        env_atomic_set(&core->flushed, 0);
 
-       if (error) {
-               ocf_pipeline_finish(context->pipeline, error);
-               return;
-       }
+       if (error)
+               OCF_PL_FINISH_RET(context->pipeline, error);
 
        if (context->op == flush_core)
                ocf_cache_log(cache, log_info, "Flushing completed\n");
@@ -752,7 +731,7 @@ struct ocf_pipeline_properties _ocf_mngt_core_flush_pipeline_properties = {
        },
 };
 
-void ocf_mngt_core_flush(ocf_core_t core, bool interruption,
+void ocf_mngt_core_flush(ocf_core_t core,
                ocf_mngt_core_flush_end_t cmpl, void *priv)
 {
        ocf_pipeline_t pipeline;
@@ -767,37 +746,32 @@ void ocf_mngt_core_flush(ocf_core_t core, bool interruption,
        if (!ocf_cache_is_device_attached(cache)) {
                ocf_cache_log(cache, log_err, "Cannot flush core - "
                                "cache device is detached\n");
-               cmpl(core, priv, -OCF_ERR_INVAL);
-               return;
+               OCF_CMPL_RET(core, priv, -OCF_ERR_INVAL);
        }
 
        if (!core->opened) {
                ocf_core_log(core, log_err, "Cannot flush - core is in "
                                "inactive state\n");
-               cmpl(core, priv, -OCF_ERR_CORE_IN_INACTIVE_STATE);
-               return;
+               OCF_CMPL_RET(core, priv, -OCF_ERR_CORE_IN_INACTIVE_STATE);
        }
 
        if (!cache->mngt_queue) {
                ocf_core_log(core, log_err,
                                "Cannot flush core - no flush queue set\n");
-               cmpl(core, priv, -OCF_ERR_INVAL);
-               return;
+               OCF_CMPL_RET(core, priv, -OCF_ERR_INVAL);
        }
 
        result = ocf_pipeline_create(&pipeline, cache,
                        &_ocf_mngt_core_flush_pipeline_properties);
-       if (result) {
-               cmpl(core, priv, -OCF_ERR_NO_MEM);
-               return;
-       }
+       if (result)
+               OCF_CMPL_RET(core, priv, -OCF_ERR_NO_MEM);
+
        context = ocf_pipeline_get_priv(pipeline);
 
        context->pipeline = pipeline;
        context->cmpl.flush_core = cmpl;
        context->priv = priv;
        context->cache = cache;
-       context->allow_interruption = interruption;
        context->op = flush_core;
        context->core = core;
 
@@ -811,15 +785,12 @@ static void _ocf_mngt_cache_invalidate(ocf_pipeline_t pipeline, void *priv,
        ocf_cache_t cache = context->cache;
        int result;
 
-       OCF_METADATA_LOCK_WR();
+       ocf_metadata_start_exclusive_access(&cache->metadata.lock);
        result = ocf_metadata_sparse_range(cache, context->purge.core_id, 0,
                        context->purge.end_byte);
-       OCF_METADATA_UNLOCK_WR();
+       ocf_metadata_end_exclusive_access(&cache->metadata.lock);
 
-       if (result)
-               ocf_pipeline_finish(context->pipeline, result);
-       else
-               ocf_pipeline_next(context->pipeline);
+       OCF_PL_NEXT_ON_SUCCESS_RET(context->pipeline, result);
 }
 
 static
@@ -846,23 +817,20 @@ void ocf_mngt_cache_purge(ocf_cache_t cache,
        if (!cache->mngt_queue) {
                ocf_cache_log(cache, log_err,
                                "Cannot purge cache - no flush queue set\n");
-               cmpl(cache, priv, -OCF_ERR_INVAL);
-               return;
+               OCF_CMPL_RET(cache, priv, -OCF_ERR_INVAL);
        }
 
        result = ocf_pipeline_create(&pipeline, cache,
                        &_ocf_mngt_cache_purge_pipeline_properties);
-       if (result) {
-               cmpl(cache, priv, -OCF_ERR_NO_MEM);
-               return;
-       }
+       if (result)
+               OCF_CMPL_RET(cache, priv, -OCF_ERR_NO_MEM);
+
        context = ocf_pipeline_get_priv(pipeline);
 
        context->pipeline = pipeline;
        context->cmpl.purge_cache = cmpl;
        context->priv = priv;
        context->cache = cache;
-       context->allow_interruption = true;
        context->op = purge_cache;
        context->purge.core_id = OCF_CORE_ID_INVALID;
        context->purge.end_byte = ~0ULL;
@@ -900,18 +868,15 @@ void ocf_mngt_core_purge(ocf_core_t core,
        if (!cache->mngt_queue) {
                ocf_core_log(core, log_err,
                                "Cannot purge core - no flush queue set\n");
-               cmpl(core, priv, -OCF_ERR_INVAL);
-               return;
+               OCF_CMPL_RET(core, priv, -OCF_ERR_INVAL);
        }
 
        core_size = ocf_volume_get_length(&cache->core[core_id].volume);
 
        result = ocf_pipeline_create(&pipeline, cache,
                        &_ocf_mngt_core_purge_pipeline_properties);
-       if (result) {
-               cmpl(core, priv, -OCF_ERR_NO_MEM);
-               return;
-       }
+       if (result)
+               OCF_CMPL_RET(core, priv, -OCF_ERR_NO_MEM);
 
        context = ocf_pipeline_get_priv(pipeline);
 
@@ -919,7 +884,6 @@ void ocf_mngt_core_purge(ocf_core_t core,
        context->cmpl.purge_core = cmpl;
        context->priv = priv;
        context->cache = cache;
-       context->allow_interruption = true;
        context->op = purge_core;
        context->purge.core_id = core_id;
        context->purge.end_byte = core_size ?: ~0ULL;
@@ -954,7 +918,7 @@ int ocf_mngt_cache_cleaning_set_policy(ocf_cache_t cache, ocf_cleaning_t type)
                return 0;
        }
 
-       ocf_metadata_lock(cache, OCF_METADATA_WR);
+       ocf_metadata_start_exclusive_access(&cache->metadata.lock);
 
        if (cleaning_policy_ops[old_type].deinitialize)
                cleaning_policy_ops[old_type].deinitialize(cache);
@@ -972,7 +936,7 @@ int ocf_mngt_cache_cleaning_set_policy(ocf_cache_t cache, ocf_cleaning_t type)
 
        cache->conf_meta->cleaning_policy_type = type;
 
-       ocf_metadata_unlock(cache, OCF_METADATA_WR);
+       ocf_metadata_end_exclusive_access(&cache->metadata.lock);
 
        ocf_cache_log(cache, log_info, "Changing cleaning policy from "
                        "%s to %s\n", cleaning_policy_ops[old_type].name,
@@ -1004,12 +968,12 @@ int ocf_mngt_cache_cleaning_set_param(ocf_cache_t cache, ocf_cleaning_t type,
        if (!cleaning_policy_ops[type].set_cleaning_param)
                return -OCF_ERR_INVAL;
 
-       ocf_metadata_lock(cache, OCF_METADATA_WR);
+       ocf_metadata_start_exclusive_access(&cache->metadata.lock);
 
        ret = cleaning_policy_ops[type].set_cleaning_param(cache,
                        param_id, param_value);
 
-       ocf_metadata_unlock(cache, OCF_METADATA_WR);
+       ocf_metadata_end_exclusive_access(&cache->metadata.lock);
 
        return ret;
 }