1 // Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
7 // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
12 // The source code in this article is licensed under the CC0 license, so feel
13 // free to copy, modify, share, do whatever you want with it.
14 // No attribution is required, but I'll be happy if you do.
17 // The person who associated a work with this deed has dedicated the work to the
18 // public domain by waiving all of his or her rights to the work worldwide
19 // under copyright law, including all related and neighboring rights, to the
20 // extent allowed by law. You can copy, modify, distribute and perform the
21 // work, even for commercial purposes, all without asking permission.
27 #include <condition_variable>
34 #include "port/port.h"
35 #include "util/sync_point.h"
37 // Allows execution of handlers at a specified time in the future
39 // - All handlers are executed ONCE, even if cancelled (aborted parameter will
41 // - If TimerQueue is destroyed, it will cancel all handlers.
42 // - Handlers are ALWAYS executed in the Timer Queue worker thread.
43 // - Handler execution order is NOT guaranteed
45 ////////////////////////////////////////////////////////////////////////////////
47 // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
// Default constructor: initializes m_th with &TimerQueue::run bound to this
// object. NOTE(review): presumably this starts the worker thread running
// run() right away -- confirm against port::Thread.
50 TimerQueue() : m_th(&TimerQueue::run
, this) {}
// Destructor: delegates all teardown to shutdown(). Per the class notes,
// destroying the queue cancels all handlers.
52 ~TimerQueue() { shutdown(); }
54 // This function is not thread-safe.
60 // Abusing the timer queue to trigger the shutdown.
63 return std::make_pair(false, 0);
71 // Returns the ID of the new timer. You can use this ID to cancel the
73 uint64_t add(int64_t milliseconds
,
74 std::function
<std::pair
<bool, int64_t>(bool)> handler
) {
76 Clock::time_point tp
= Clock::now();
77 item
.end
= tp
+ std::chrono::milliseconds(milliseconds
);
78 TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item
.end
);
79 item
.period
= milliseconds
;
80 item
.handler
= std::move(handler
);
82 std::unique_lock
<std::mutex
> lk(m_mtx
);
83 uint64_t id
= ++m_idcounter
;
85 m_items
.push(std::move(item
));
87 // Something changed, so wake up timer thread
88 m_checkWork
.notify_one();
92 // Cancels the specified timer
94 // 1 if the timer was cancelled.
95 // 0 if you were too late to cancel (or the timer ID was never valid to
97 size_t cancel(uint64_t id
) {
98 // Instead of removing the item from the container (thus breaking the
99 // heap integrity), we set the item as having no handler, and put
100 // that handler on a new item at the top for immediate execution
101 // The timer thread will then ignore the original item, since it has no
103 std::unique_lock
<std::mutex
> lk(m_mtx
);
104 for (auto&& item
: m_items
.getContainer()) {
105 if (item
.id
== id
&& item
.handler
) {
107 // Zero time, so it stays at the top for immediate execution
108 newItem
.end
= Clock::time_point();
109 newItem
.id
= 0; // Means it is a canceled item
110 // Move the handler from item to newitem (thus clearing item)
111 newItem
.handler
= std::move(item
.handler
);
112 m_items
.push(std::move(newItem
));
114 // Something changed, so wake up timer thread
115 m_checkWork
.notify_one();
122 // Cancels all timers
124 // The number of timers cancelled
126 // Setting all "end" to 0 (for immediate execution) is ok,
127 // since it maintains the heap integrity
128 std::unique_lock
<std::mutex
> lk(m_mtx
);
130 for (auto&& item
: m_items
.getContainer()) {
131 if (item
.id
&& item
.handler
) {
132 item
.end
= Clock::time_point();
136 auto ret
= m_items
.size();
138 m_checkWork
.notify_one();
// Clock used for every deadline in this class (Clock::now(),
// Clock::time_point). std::chrono::steady_clock is monotonic, so deadlines
// are not affected by wall-clock adjustments.
143 using Clock
= std::chrono::steady_clock
;
// Copy construction and copy assignment are deleted: a TimerQueue cannot be
// copied.
144 TimerQueue(const TimerQueue
&) = delete;
145 TimerQueue
& operator=(const TimerQueue
&) = delete;
148 std::unique_lock
<std::mutex
> lk(m_mtx
);
150 auto end
= calcWaitTime_lock();
152 // Timers found, so wait until it expires (or something else
154 m_checkWork
.wait_until(lk
, end
.second
);
156 // No timers exist, so wait forever until something changes
157 m_checkWork
.wait(lk
);
160 // Check and execute as much work as possible, such as, all expired
165 // If we are shutting down, we should not have any items left,
166 // since the shutdown cancels all items
167 assert(m_items
.size() == 0);
170 std::pair
<bool, Clock::time_point
> calcWaitTime_lock() {
171 while (m_items
.size()) {
172 if (m_items
.top().handler
) {
173 // Item present, so return the new wait time
174 return std::make_pair(true, m_items
.top().end
);
176 // Discard empty handlers (they were cancelled)
181 // No items found, so return no wait time (causes the thread to wait
183 return std::make_pair(false, Clock::time_point());
186 void checkWork(std::unique_lock
<std::mutex
>* lk
) {
187 while (m_items
.size() && m_items
.top().end
<= Clock::now()) {
188 WorkItem
item(m_items
.top());
193 auto reschedule_pair
= item
.handler(item
.id
== 0);
195 if (!m_cancel
&& reschedule_pair
.first
) {
196 int64_t new_period
= (reschedule_pair
.second
== -1)
198 : reschedule_pair
.second
;
200 item
.period
= new_period
;
201 item
.end
= Clock::now() + std::chrono::milliseconds(new_period
);
202 m_items
.push(std::move(item
));
// NOTE(review): m_finish is not referenced in the lines visible here;
// presumably it tells the worker loop to exit -- TODO confirm.
208 bool m_finish
= false;
// While set, checkWork() does not re-queue a handler even when the handler
// asks to be rescheduled.
209 bool m_cancel
= false;
// Source of timer IDs: add() pre-increments it, so the first ID handed out
// is 1. ID 0 is reserved to mark cancelled items.
210 uint64_t m_idcounter
= 0;
// Signalled with notify_one() whenever the set of items changes; waited on
// (wait / wait_until) with m_mtx held.
211 std::condition_variable m_checkWork
;
// Deadline of the item. add() sets it to now + the requested milliseconds;
// cancellation paths use a default-constructed time_point so the item is
// executed immediately.
214 Clock::time_point end
;
216 uint64_t id
; // id==0 means it was cancelled
// Callback run by checkWork(). Its bool argument is (id == 0), i.e. true
// when the item was cancelled. In the returned pair, `first` requests a
// reschedule and `second` is the new period in milliseconds; -1 is treated
// specially (NOTE(review): presumably "keep the current period" -- the
// relevant line is not visible, confirm). An empty handler marks an item
// that was cancelled and should be discarded.
217 std::function
<std::pair
<bool, int64_t>(bool)> handler
;
// Orders items by deadline only. Queue uses std::greater<WorkItem>, which
// calls this operator, so the priority queue's top() is the item with the
// earliest `end`.
218 bool operator>(const WorkItem
& other
) const { return end
> other
.end
; }
222 // Inheriting from priority_queue, so we can access the internal container
223 class Queue
: public std::priority_queue
<WorkItem
, std::vector
<WorkItem
>,
224 std::greater
<WorkItem
>> {
226 std::vector
<WorkItem
>& getContainer() { return this->c
; }
// Worker thread object, initialized in the constructor with
// &TimerQueue::run; per the class notes, handlers always execute on it.
228 rocksdb::port::Thread m_th
;
// NOTE(review): closed_ is not referenced in the lines visible here;
// presumably it records that shutdown() has already run -- TODO confirm.
229 bool closed_
= false;