]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | #ifndef QUEUE_RING_H |
2 | #define QUEUE_RING_H | |
3 | ||
7c673cae FG |
4 | #include "common/Mutex.h" |
5 | #include "common/Cond.h" | |
6 | ||
31f18b77 FG |
7 | #include <list> |
8 | #include <atomic> | |
9 | #include <vector> | |
7c673cae FG |
10 | |
11 | template <class T> | |
12 | class QueueRing { | |
13 | struct QueueBucket { | |
14 | Mutex lock; | |
15 | Cond cond; | |
16 | typename std::list<T> entries; | |
17 | ||
18 | QueueBucket() : lock("QueueRing::QueueBucket::lock") {} | |
19 | QueueBucket(const QueueBucket& rhs) : lock("QueueRing::QueueBucket::lock") { | |
20 | entries = rhs.entries; | |
21 | } | |
22 | ||
23 | void enqueue(const T& entry) { | |
24 | lock.Lock(); | |
25 | if (entries.empty()) { | |
26 | cond.Signal(); | |
27 | } | |
28 | entries.push_back(entry); | |
29 | lock.Unlock(); | |
30 | } | |
31 | ||
32 | void dequeue(T *entry) { | |
33 | lock.Lock(); | |
34 | if (entries.empty()) { | |
35 | cond.Wait(lock); | |
36 | }; | |
37 | assert(!entries.empty()); | |
38 | *entry = entries.front(); | |
39 | entries.pop_front(); | |
40 | lock.Unlock(); | |
41 | }; | |
42 | }; | |
43 | ||
44 | std::vector<QueueBucket> buckets; | |
45 | int num_buckets; | |
31f18b77 FG |
46 | |
47 | std::atomic<int64_t> cur_read_bucket = { 0 }; | |
48 | std::atomic<int64_t> cur_write_bucket = { 0 }; | |
49 | ||
7c673cae FG |
50 | public: |
51 | QueueRing(int n) : buckets(n), num_buckets(n) { | |
52 | } | |
53 | ||
54 | void enqueue(const T& entry) { | |
31f18b77 | 55 | buckets[++cur_write_bucket % num_buckets].enqueue(entry); |
7c673cae FG |
56 | }; |
57 | ||
58 | void dequeue(T *entry) { | |
31f18b77 | 59 | buckets[++cur_read_bucket % num_buckets].dequeue(entry); |
7c673cae FG |
60 | } |
61 | }; | |
62 | ||
63 | #endif |