]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/QueueRing.h
830f80f8442e0e9934790f96cfcfd61f7d7af245
[ceph.git] / ceph / src / common / QueueRing.h
1 #ifndef QUEUE_RING_H
2 #define QUEUE_RING_H
3
4 #include <list>
5 #include <vector>
6 #include "common/Mutex.h"
7 #include "common/Cond.h"
8
9
10
11 template <class T>
12 class QueueRing {
13 struct QueueBucket {
14 Mutex lock;
15 Cond cond;
16 typename std::list<T> entries;
17
18 QueueBucket() : lock("QueueRing::QueueBucket::lock") {}
19 QueueBucket(const QueueBucket& rhs) : lock("QueueRing::QueueBucket::lock") {
20 entries = rhs.entries;
21 }
22
23 void enqueue(const T& entry) {
24 lock.Lock();
25 if (entries.empty()) {
26 cond.Signal();
27 }
28 entries.push_back(entry);
29 lock.Unlock();
30 }
31
32 void dequeue(T *entry) {
33 lock.Lock();
34 if (entries.empty()) {
35 cond.Wait(lock);
36 };
37 assert(!entries.empty());
38 *entry = entries.front();
39 entries.pop_front();
40 lock.Unlock();
41 };
42 };
43
44 std::vector<QueueBucket> buckets;
45 int num_buckets;
46 atomic_t cur_read_bucket;
47 atomic_t cur_write_bucket;
48 public:
49 QueueRing(int n) : buckets(n), num_buckets(n) {
50 }
51
52 void enqueue(const T& entry) {
53 buckets[cur_write_bucket.inc() % num_buckets].enqueue(entry);
54 };
55
56 void dequeue(T *entry) {
57 buckets[cur_read_bucket.inc() % num_buckets].dequeue(entry);
58 }
59 };
60
61 #endif