]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/compressor/AsyncCompressor.h
update sources to v12.1.0
[ceph.git] / ceph / src / compressor / AsyncCompressor.h
index 81321fdd511e50b9e6fdc59c621d773b38234b4d..7ca8fad04d6804886e9c74c3570b9811f796d607 100644 (file)
 #define CEPH_ASYNCCOMPRESSOR_H
 
 #include <deque>
+#include <vector>
+#include <atomic>
 
-#include "include/atomic.h"
 #include "include/str_list.h"
+
 #include "Compressor.h"
 #include "common/WorkQueue.h"
 
-
 class AsyncCompressor {
  private:
   CompressorRef compressor;
   CephContext *cct;
-  atomic_t job_id;
+  std::atomic<uint64_t> job_id { 0 };
   vector<int> coreids;
   ThreadPool compress_tp;
 
-  enum {
+  enum class status_t {
     WAIT,
     WORKING,
     DONE,
     ERROR
-  } status;
+  };
+
   struct Job {
     uint64_t id;
-    atomic_t status;
+    std::atomic<status_t> status { status_t::WAIT };
     bool is_compress;
     bufferlist data;
-    Job(uint64_t i, bool compress): id(i), status(WAIT), is_compress(compress) {}
-    Job(const Job &j): id(j.id), status(j.status.read()), is_compress(j.is_compress), data(j.data) {}
+    Job(uint64_t i, bool compress): id(i), is_compress(compress) {}
+    Job(const Job &j): id(j.id), status(j.status.load()), is_compress(j.is_compress), data(j.data) {}
   };
   Mutex job_lock;
   // only when job.status == DONE && with job_lock holding, we can insert/erase element in jobs
@@ -75,7 +77,9 @@ class AsyncCompressor {
       while (!job_queue.empty()) {
         item = job_queue.front();
         job_queue.pop_front();
-        if (item->status.compare_and_swap(WAIT, WORKING)) {
+
+        auto expected = status_t::WAIT;
+        if (item->status.compare_exchange_strong(expected, status_t::WORKING)) {
           break;
         } else {
           Mutex::Locker l(async_compressor->job_lock);
@@ -86,7 +90,7 @@ class AsyncCompressor {
       return item;
     }
     void _process(Job *item, ThreadPool::TPHandle &) override {
-      assert(item->status.read() == WORKING);
+      assert(item->status == status_t::WORKING);
       bufferlist out;
       int r;
       if (item->is_compress)
@@ -95,9 +99,10 @@ class AsyncCompressor {
         r = async_compressor->compressor->decompress(item->data, out);
       if (!r) {
         item->data.swap(out);
-        assert(item->status.compare_and_swap(WORKING, DONE));
+        auto expected = status_t::WORKING;
+        assert(item->status.compare_exchange_strong(expected, status_t::DONE));
       } else {
-        item->status.set(ERROR);
+        item->status = status_t::ERROR;
       }
     }
     void _process_finish(Job *item) override {}