]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/QueueRing.h
update sources to v12.1.0
[ceph.git] / ceph / src / common / QueueRing.h
CommitLineData
7c673cae
FG
1#ifndef QUEUE_RING_H
2#define QUEUE_RING_H
3
7c673cae
FG
4#include "common/Mutex.h"
5#include "common/Cond.h"
6
31f18b77
FG
7#include <list>
8#include <atomic>
9#include <vector>
7c673cae
FG
10
11template <class T>
12class QueueRing {
13 struct QueueBucket {
14 Mutex lock;
15 Cond cond;
16 typename std::list<T> entries;
17
18 QueueBucket() : lock("QueueRing::QueueBucket::lock") {}
19 QueueBucket(const QueueBucket& rhs) : lock("QueueRing::QueueBucket::lock") {
20 entries = rhs.entries;
21 }
22
23 void enqueue(const T& entry) {
24 lock.Lock();
25 if (entries.empty()) {
26 cond.Signal();
27 }
28 entries.push_back(entry);
29 lock.Unlock();
30 }
31
32 void dequeue(T *entry) {
33 lock.Lock();
34 if (entries.empty()) {
35 cond.Wait(lock);
36 };
37 assert(!entries.empty());
38 *entry = entries.front();
39 entries.pop_front();
40 lock.Unlock();
41 };
42 };
43
44 std::vector<QueueBucket> buckets;
45 int num_buckets;
31f18b77
FG
46
47 std::atomic<int64_t> cur_read_bucket = { 0 };
48 std::atomic<int64_t> cur_write_bucket = { 0 };
49
7c673cae
FG
50public:
51 QueueRing(int n) : buckets(n), num_buckets(n) {
52 }
53
54 void enqueue(const T& entry) {
31f18b77 55 buckets[++cur_write_bucket % num_buckets].enqueue(entry);
7c673cae
FG
56 };
57
58 void dequeue(T *entry) {
31f18b77 59 buckets[++cur_read_bucket % num_buckets].dequeue(entry);
7c673cae
FG
60 }
61};
62
63#endif