]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | // | |
6 | // Borrowed from | |
7 | // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/ | |
8 | // Timer Queue | |
9 | // | |
10 | // License | |
11 | // | |
12 | // The source code in this article is licensed under the CC0 license, so feel | |
13 | // free to copy, modify, share, do whatever you want with it. | |
14 | // No attribution is required, but Ill be happy if you do. | |
15 | // CC0 license | |
16 | ||
17 | // The person who associated a work with this deed has dedicated the work to the | |
18 | // public domain by waiving all of his or her rights to the work worldwide | |
19 | // under copyright law, including all related and neighboring rights, to the | |
20 | // extent allowed by law. You can copy, modify, distribute and perform the | |
21 | // work, even for commercial purposes, all without asking permission. | |
22 | ||
23 | #pragma once | |
24 | ||
11fdf7f2 | 25 | #include <assert.h> |
1e59de90 | 26 | |
11fdf7f2 TL |
27 | #include <chrono> |
28 | #include <condition_variable> | |
29 | #include <functional> | |
30 | #include <queue> | |
31 | #include <thread> | |
32 | #include <utility> | |
33 | #include <vector> | |
34 | ||
494da23a | 35 | #include "port/port.h" |
f67539c2 | 36 | #include "test_util/sync_point.h" |
494da23a | 37 | |
11fdf7f2 TL |
38 | // Allows execution of handlers at a specified time in the future |
39 | // Guarantees: | |
40 | // - All handlers are executed ONCE, even if cancelled (aborted parameter will | |
41 | // be set to true) | |
42 | // - If TimerQueue is destroyed, it will cancel all handlers. | |
43 | // - Handlers are ALWAYS executed in the Timer Queue worker thread. | |
44 | // - Handlers execution order is NOT guaranteed | |
45 | // | |
46 | //////////////////////////////////////////////////////////////////////////////// | |
47 | // borrowed from | |
48 | // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/ | |
49 | class TimerQueue { | |
50 | public: | |
51 | TimerQueue() : m_th(&TimerQueue::run, this) {} | |
52 | ||
494da23a TL |
53 | ~TimerQueue() { shutdown(); } |
54 | ||
55 | // This function is not thread-safe. | |
56 | void shutdown() { | |
57 | if (closed_) { | |
58 | return; | |
59 | } | |
11fdf7f2 TL |
60 | cancelAll(); |
61 | // Abusing the timer queue to trigger the shutdown. | |
62 | add(0, [this](bool) { | |
63 | m_finish = true; | |
64 | return std::make_pair(false, 0); | |
65 | }); | |
66 | m_th.join(); | |
494da23a | 67 | closed_ = true; |
11fdf7f2 TL |
68 | } |
69 | ||
70 | // Adds a new timer | |
71 | // \return | |
72 | // Returns the ID of the new timer. You can use this ID to cancel the | |
73 | // timer | |
74 | uint64_t add(int64_t milliseconds, | |
75 | std::function<std::pair<bool, int64_t>(bool)> handler) { | |
76 | WorkItem item; | |
77 | Clock::time_point tp = Clock::now(); | |
78 | item.end = tp + std::chrono::milliseconds(milliseconds); | |
494da23a | 79 | TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end); |
11fdf7f2 TL |
80 | item.period = milliseconds; |
81 | item.handler = std::move(handler); | |
82 | ||
83 | std::unique_lock<std::mutex> lk(m_mtx); | |
84 | uint64_t id = ++m_idcounter; | |
85 | item.id = id; | |
86 | m_items.push(std::move(item)); | |
87 | ||
88 | // Something changed, so wake up timer thread | |
89 | m_checkWork.notify_one(); | |
90 | return id; | |
91 | } | |
92 | ||
93 | // Cancels the specified timer | |
94 | // \return | |
95 | // 1 if the timer was cancelled. | |
96 | // 0 if you were too late to cancel (or the timer ID was never valid to | |
97 | // start with) | |
98 | size_t cancel(uint64_t id) { | |
99 | // Instead of removing the item from the container (thus breaking the | |
100 | // heap integrity), we set the item as having no handler, and put | |
101 | // that handler on a new item at the top for immediate execution | |
102 | // The timer thread will then ignore the original item, since it has no | |
103 | // handler. | |
104 | std::unique_lock<std::mutex> lk(m_mtx); | |
105 | for (auto&& item : m_items.getContainer()) { | |
106 | if (item.id == id && item.handler) { | |
107 | WorkItem newItem; | |
108 | // Zero time, so it stays at the top for immediate execution | |
109 | newItem.end = Clock::time_point(); | |
110 | newItem.id = 0; // Means it is a canceled item | |
111 | // Move the handler from item to newitem (thus clearing item) | |
112 | newItem.handler = std::move(item.handler); | |
113 | m_items.push(std::move(newItem)); | |
114 | ||
115 | // Something changed, so wake up timer thread | |
116 | m_checkWork.notify_one(); | |
117 | return 1; | |
118 | } | |
119 | } | |
120 | return 0; | |
121 | } | |
122 | ||
123 | // Cancels all timers | |
124 | // \return | |
125 | // The number of timers cancelled | |
126 | size_t cancelAll() { | |
127 | // Setting all "end" to 0 (for immediate execution) is ok, | |
128 | // since it maintains the heap integrity | |
129 | std::unique_lock<std::mutex> lk(m_mtx); | |
130 | m_cancel = true; | |
131 | for (auto&& item : m_items.getContainer()) { | |
132 | if (item.id && item.handler) { | |
133 | item.end = Clock::time_point(); | |
134 | item.id = 0; | |
135 | } | |
136 | } | |
137 | auto ret = m_items.size(); | |
138 | ||
139 | m_checkWork.notify_one(); | |
140 | return ret; | |
141 | } | |
142 | ||
143 | private: | |
144 | using Clock = std::chrono::steady_clock; | |
145 | TimerQueue(const TimerQueue&) = delete; | |
146 | TimerQueue& operator=(const TimerQueue&) = delete; | |
147 | ||
148 | void run() { | |
149 | std::unique_lock<std::mutex> lk(m_mtx); | |
150 | while (!m_finish) { | |
151 | auto end = calcWaitTime_lock(); | |
152 | if (end.first) { | |
153 | // Timers found, so wait until it expires (or something else | |
154 | // changes) | |
155 | m_checkWork.wait_until(lk, end.second); | |
156 | } else { | |
157 | // No timers exist, so wait forever until something changes | |
158 | m_checkWork.wait(lk); | |
159 | } | |
160 | ||
161 | // Check and execute as much work as possible, such as, all expired | |
162 | // timers | |
163 | checkWork(&lk); | |
164 | } | |
165 | ||
166 | // If we are shutting down, we should not have any items left, | |
167 | // since the shutdown cancels all items | |
168 | assert(m_items.size() == 0); | |
169 | } | |
170 | ||
171 | std::pair<bool, Clock::time_point> calcWaitTime_lock() { | |
172 | while (m_items.size()) { | |
173 | if (m_items.top().handler) { | |
174 | // Item present, so return the new wait time | |
175 | return std::make_pair(true, m_items.top().end); | |
176 | } else { | |
177 | // Discard empty handlers (they were cancelled) | |
178 | m_items.pop(); | |
179 | } | |
180 | } | |
181 | ||
182 | // No items found, so return no wait time (causes the thread to wait | |
183 | // indefinitely) | |
184 | return std::make_pair(false, Clock::time_point()); | |
185 | } | |
186 | ||
187 | void checkWork(std::unique_lock<std::mutex>* lk) { | |
188 | while (m_items.size() && m_items.top().end <= Clock::now()) { | |
189 | WorkItem item(m_items.top()); | |
190 | m_items.pop(); | |
191 | ||
192 | if (item.handler) { | |
193 | (*lk).unlock(); | |
194 | auto reschedule_pair = item.handler(item.id == 0); | |
195 | (*lk).lock(); | |
196 | if (!m_cancel && reschedule_pair.first) { | |
197 | int64_t new_period = (reschedule_pair.second == -1) | |
198 | ? item.period | |
199 | : reschedule_pair.second; | |
200 | ||
201 | item.period = new_period; | |
202 | item.end = Clock::now() + std::chrono::milliseconds(new_period); | |
203 | m_items.push(std::move(item)); | |
204 | } | |
205 | } | |
206 | } | |
207 | } | |
208 | ||
209 | bool m_finish = false; | |
210 | bool m_cancel = false; | |
211 | uint64_t m_idcounter = 0; | |
212 | std::condition_variable m_checkWork; | |
213 | ||
214 | struct WorkItem { | |
215 | Clock::time_point end; | |
216 | int64_t period; | |
217 | uint64_t id; // id==0 means it was cancelled | |
218 | std::function<std::pair<bool, int64_t>(bool)> handler; | |
219 | bool operator>(const WorkItem& other) const { return end > other.end; } | |
220 | }; | |
221 | ||
222 | std::mutex m_mtx; | |
223 | // Inheriting from priority_queue, so we can access the internal container | |
224 | class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>, | |
225 | std::greater<WorkItem>> { | |
226 | public: | |
227 | std::vector<WorkItem>& getContainer() { return this->c; } | |
228 | } m_items; | |
f67539c2 | 229 | ROCKSDB_NAMESPACE::port::Thread m_th; |
494da23a | 230 | bool closed_ = false; |
11fdf7f2 | 231 | }; |