]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/QueueRing.h
update sources to v12.1.0
[ceph.git] / ceph / src / common / QueueRing.h
1 #ifndef QUEUE_RING_H
2 #define QUEUE_RING_H
3
4 #include "common/Mutex.h"
5 #include "common/Cond.h"
6
7 #include <list>
8 #include <atomic>
9 #include <vector>
10
11 template <class T>
12 class QueueRing {
13 struct QueueBucket {
14 Mutex lock;
15 Cond cond;
16 typename std::list<T> entries;
17
18 QueueBucket() : lock("QueueRing::QueueBucket::lock") {}
19 QueueBucket(const QueueBucket& rhs) : lock("QueueRing::QueueBucket::lock") {
20 entries = rhs.entries;
21 }
22
23 void enqueue(const T& entry) {
24 lock.Lock();
25 if (entries.empty()) {
26 cond.Signal();
27 }
28 entries.push_back(entry);
29 lock.Unlock();
30 }
31
32 void dequeue(T *entry) {
33 lock.Lock();
34 if (entries.empty()) {
35 cond.Wait(lock);
36 };
37 assert(!entries.empty());
38 *entry = entries.front();
39 entries.pop_front();
40 lock.Unlock();
41 };
42 };
43
44 std::vector<QueueBucket> buckets;
45 int num_buckets;
46
47 std::atomic<int64_t> cur_read_bucket = { 0 };
48 std::atomic<int64_t> cur_write_bucket = { 0 };
49
50 public:
51 QueueRing(int n) : buckets(n), num_buckets(n) {
52 }
53
54 void enqueue(const T& entry) {
55 buckets[++cur_write_bucket % num_buckets].enqueue(entry);
56 };
57
58 void dequeue(T *entry) {
59 buckets[++cur_read_bucket % num_buckets].dequeue(entry);
60 }
61 };
62
63 #endif