]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | // | |
6 | // Borrowed from | |
7 | // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/ | |
8 | // Timer Queue | |
9 | // | |
10 | // License | |
11 | // | |
12 | // The source code in this article is licensed under the CC0 license, so feel | |
13 | // free to copy, modify, share, do whatever you want with it. | |
14 | // No attribution is required, but I'll be happy if you do. | |
15 | // CC0 license | |
16 | ||
17 | // The person who associated a work with this deed has dedicated the work to the | |
18 | // public domain by waiving all of his or her rights to the work worldwide | |
19 | // under copyright law, including all related and neighboring rights, to the | |
20 | // extent allowed by law. You can copy, modify, distribute and perform the | |
21 | // work, even for commercial purposes, all without asking permission. | |
22 | ||
23 | #pragma once | |
24 | ||
11fdf7f2 TL |
25 | #include <assert.h> |
26 | #include <chrono> | |
27 | #include <condition_variable> | |
28 | #include <functional> | |
29 | #include <queue> | |
30 | #include <thread> | |
31 | #include <utility> | |
32 | #include <vector> | |
33 | ||
494da23a | 34 | #include "port/port.h" |
f67539c2 | 35 | #include "test_util/sync_point.h" |
494da23a | 36 | |
11fdf7f2 TL |
37 | // Allows execution of handlers at a specified time in the future |
38 | // Guarantees: | |
39 | // - All handlers are executed at least ONCE, even if cancelled (aborted | |
40 | // parameter will be set to true); a handler may ask to be rescheduled. | |
41 | // - If TimerQueue is destroyed, it will cancel all handlers. | |
42 | // - Handlers are ALWAYS executed in the Timer Queue worker thread. | |
43 | // - Handlers execution order is NOT guaranteed | |
44 | // | |
45 | //////////////////////////////////////////////////////////////////////////////// | |
46 | // borrowed from | |
47 | // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/ | |
48 | class TimerQueue { | |
49 | public: | |
50 | TimerQueue() : m_th(&TimerQueue::run, this) {} | |
51 | ||
494da23a TL |
52 | ~TimerQueue() { shutdown(); } |
53 | ||
54 | // This function is not thread-safe. | |
55 | void shutdown() { | |
56 | if (closed_) { | |
57 | return; | |
58 | } | |
11fdf7f2 TL |
59 | cancelAll(); |
60 | // Abusing the timer queue to trigger the shutdown. | |
61 | add(0, [this](bool) { | |
62 | m_finish = true; | |
63 | return std::make_pair(false, 0); | |
64 | }); | |
65 | m_th.join(); | |
494da23a | 66 | closed_ = true; |
11fdf7f2 TL |
67 | } |
68 | ||
69 | // Adds a new timer | |
70 | // \return | |
71 | // Returns the ID of the new timer. You can use this ID to cancel the | |
72 | // timer | |
73 | uint64_t add(int64_t milliseconds, | |
74 | std::function<std::pair<bool, int64_t>(bool)> handler) { | |
75 | WorkItem item; | |
76 | Clock::time_point tp = Clock::now(); | |
77 | item.end = tp + std::chrono::milliseconds(milliseconds); | |
494da23a | 78 | TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end); |
11fdf7f2 TL |
79 | item.period = milliseconds; |
80 | item.handler = std::move(handler); | |
81 | ||
82 | std::unique_lock<std::mutex> lk(m_mtx); | |
83 | uint64_t id = ++m_idcounter; | |
84 | item.id = id; | |
85 | m_items.push(std::move(item)); | |
86 | ||
87 | // Something changed, so wake up timer thread | |
88 | m_checkWork.notify_one(); | |
89 | return id; | |
90 | } | |
91 | ||
92 | // Cancels the specified timer | |
93 | // \return | |
94 | // 1 if the timer was cancelled. | |
95 | // 0 if you were too late to cancel (or the timer ID was never valid to | |
96 | // start with) | |
97 | size_t cancel(uint64_t id) { | |
98 | // Instead of removing the item from the container (thus breaking the | |
99 | // heap integrity), we set the item as having no handler, and put | |
100 | // that handler on a new item at the top for immediate execution | |
101 | // The timer thread will then ignore the original item, since it has no | |
102 | // handler. | |
103 | std::unique_lock<std::mutex> lk(m_mtx); | |
104 | for (auto&& item : m_items.getContainer()) { | |
105 | if (item.id == id && item.handler) { | |
106 | WorkItem newItem; | |
107 | // Zero time, so it stays at the top for immediate execution | |
108 | newItem.end = Clock::time_point(); | |
109 | newItem.id = 0; // Means it is a canceled item | |
110 | // Move the handler from item to newitem (thus clearing item) | |
111 | newItem.handler = std::move(item.handler); | |
112 | m_items.push(std::move(newItem)); | |
113 | ||
114 | // Something changed, so wake up timer thread | |
115 | m_checkWork.notify_one(); | |
116 | return 1; | |
117 | } | |
118 | } | |
119 | return 0; | |
120 | } | |
121 | ||
122 | // Cancels all timers | |
123 | // \return | |
124 | // The number of timers cancelled | |
125 | size_t cancelAll() { | |
126 | // Setting all "end" to 0 (for immediate execution) is ok, | |
127 | // since it maintains the heap integrity | |
128 | std::unique_lock<std::mutex> lk(m_mtx); | |
129 | m_cancel = true; | |
130 | for (auto&& item : m_items.getContainer()) { | |
131 | if (item.id && item.handler) { | |
132 | item.end = Clock::time_point(); | |
133 | item.id = 0; | |
134 | } | |
135 | } | |
136 | auto ret = m_items.size(); | |
137 | ||
138 | m_checkWork.notify_one(); | |
139 | return ret; | |
140 | } | |
141 | ||
142 | private: | |
143 | using Clock = std::chrono::steady_clock; | |
144 | TimerQueue(const TimerQueue&) = delete; | |
145 | TimerQueue& operator=(const TimerQueue&) = delete; | |
146 | ||
147 | void run() { | |
148 | std::unique_lock<std::mutex> lk(m_mtx); | |
149 | while (!m_finish) { | |
150 | auto end = calcWaitTime_lock(); | |
151 | if (end.first) { | |
152 | // Timers found, so wait until it expires (or something else | |
153 | // changes) | |
154 | m_checkWork.wait_until(lk, end.second); | |
155 | } else { | |
156 | // No timers exist, so wait forever until something changes | |
157 | m_checkWork.wait(lk); | |
158 | } | |
159 | ||
160 | // Check and execute as much work as possible, such as, all expired | |
161 | // timers | |
162 | checkWork(&lk); | |
163 | } | |
164 | ||
165 | // If we are shutting down, we should not have any items left, | |
166 | // since the shutdown cancels all items | |
167 | assert(m_items.size() == 0); | |
168 | } | |
169 | ||
170 | std::pair<bool, Clock::time_point> calcWaitTime_lock() { | |
171 | while (m_items.size()) { | |
172 | if (m_items.top().handler) { | |
173 | // Item present, so return the new wait time | |
174 | return std::make_pair(true, m_items.top().end); | |
175 | } else { | |
176 | // Discard empty handlers (they were cancelled) | |
177 | m_items.pop(); | |
178 | } | |
179 | } | |
180 | ||
181 | // No items found, so return no wait time (causes the thread to wait | |
182 | // indefinitely) | |
183 | return std::make_pair(false, Clock::time_point()); | |
184 | } | |
185 | ||
186 | void checkWork(std::unique_lock<std::mutex>* lk) { | |
187 | while (m_items.size() && m_items.top().end <= Clock::now()) { | |
188 | WorkItem item(m_items.top()); | |
189 | m_items.pop(); | |
190 | ||
191 | if (item.handler) { | |
192 | (*lk).unlock(); | |
193 | auto reschedule_pair = item.handler(item.id == 0); | |
194 | (*lk).lock(); | |
195 | if (!m_cancel && reschedule_pair.first) { | |
196 | int64_t new_period = (reschedule_pair.second == -1) | |
197 | ? item.period | |
198 | : reschedule_pair.second; | |
199 | ||
200 | item.period = new_period; | |
201 | item.end = Clock::now() + std::chrono::milliseconds(new_period); | |
202 | m_items.push(std::move(item)); | |
203 | } | |
204 | } | |
205 | } | |
206 | } | |
207 | ||
208 | bool m_finish = false; | |
209 | bool m_cancel = false; | |
210 | uint64_t m_idcounter = 0; | |
211 | std::condition_variable m_checkWork; | |
212 | ||
213 | struct WorkItem { | |
214 | Clock::time_point end; | |
215 | int64_t period; | |
216 | uint64_t id; // id==0 means it was cancelled | |
217 | std::function<std::pair<bool, int64_t>(bool)> handler; | |
218 | bool operator>(const WorkItem& other) const { return end > other.end; } | |
219 | }; | |
220 | ||
221 | std::mutex m_mtx; | |
222 | // Inheriting from priority_queue, so we can access the internal container | |
223 | class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>, | |
224 | std::greater<WorkItem>> { | |
225 | public: | |
226 | std::vector<WorkItem>& getContainer() { return this->c; } | |
227 | } m_items; | |
f67539c2 | 228 | ROCKSDB_NAMESPACE::port::Thread m_th; |
494da23a | 229 | bool closed_ = false; |
11fdf7f2 | 230 | }; |