]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/util/timer_queue.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / util / timer_queue.h
CommitLineData
11fdf7f2
TL
1// Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5//
6// Borrowed from
7// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
8// Timer Queue
9//
10// License
11//
12// The source code in this article is licensed under the CC0 license, so feel
13// free to copy, modify, share, do whatever you want with it.
14// No attribution is required, but I'll be happy if you do.
15// CC0 license
16
17// The person who associated a work with this deed has dedicated the work to the
18// public domain by waiving all of his or her rights to the work worldwide
19// under copyright law, including all related and neighboring rights, to the
20// extent allowed by law. You can copy, modify, distribute and perform the
21// work, even for commercial purposes, all without asking permission.
22
23#pragma once
24
11fdf7f2 25#include <assert.h>
1e59de90 26
11fdf7f2
TL
27#include <chrono>
28#include <condition_variable>
29#include <functional>
30#include <queue>
31#include <thread>
32#include <utility>
33#include <vector>
34
494da23a 35#include "port/port.h"
f67539c2 36#include "test_util/sync_point.h"
494da23a 37
11fdf7f2
TL
38// Allows execution of handlers at a specified time in the future
39// Guarantees:
40// - All handlers are executed ONCE, even if cancelled (aborted parameter will
41// be set to true)
42// - If TimerQueue is destroyed, it will cancel all handlers.
43// - Handlers are ALWAYS executed in the Timer Queue worker thread.
44// - Handlers execution order is NOT guaranteed
45//
46////////////////////////////////////////////////////////////////////////////////
47// borrowed from
48// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
49class TimerQueue {
50 public:
51 TimerQueue() : m_th(&TimerQueue::run, this) {}
52
494da23a
TL
53 ~TimerQueue() { shutdown(); }
54
55 // This function is not thread-safe.
56 void shutdown() {
57 if (closed_) {
58 return;
59 }
11fdf7f2
TL
60 cancelAll();
61 // Abusing the timer queue to trigger the shutdown.
62 add(0, [this](bool) {
63 m_finish = true;
64 return std::make_pair(false, 0);
65 });
66 m_th.join();
494da23a 67 closed_ = true;
11fdf7f2
TL
68 }
69
70 // Adds a new timer
71 // \return
72 // Returns the ID of the new timer. You can use this ID to cancel the
73 // timer
74 uint64_t add(int64_t milliseconds,
75 std::function<std::pair<bool, int64_t>(bool)> handler) {
76 WorkItem item;
77 Clock::time_point tp = Clock::now();
78 item.end = tp + std::chrono::milliseconds(milliseconds);
494da23a 79 TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end);
11fdf7f2
TL
80 item.period = milliseconds;
81 item.handler = std::move(handler);
82
83 std::unique_lock<std::mutex> lk(m_mtx);
84 uint64_t id = ++m_idcounter;
85 item.id = id;
86 m_items.push(std::move(item));
87
88 // Something changed, so wake up timer thread
89 m_checkWork.notify_one();
90 return id;
91 }
92
93 // Cancels the specified timer
94 // \return
95 // 1 if the timer was cancelled.
96 // 0 if you were too late to cancel (or the timer ID was never valid to
97 // start with)
98 size_t cancel(uint64_t id) {
99 // Instead of removing the item from the container (thus breaking the
100 // heap integrity), we set the item as having no handler, and put
101 // that handler on a new item at the top for immediate execution
102 // The timer thread will then ignore the original item, since it has no
103 // handler.
104 std::unique_lock<std::mutex> lk(m_mtx);
105 for (auto&& item : m_items.getContainer()) {
106 if (item.id == id && item.handler) {
107 WorkItem newItem;
108 // Zero time, so it stays at the top for immediate execution
109 newItem.end = Clock::time_point();
110 newItem.id = 0; // Means it is a canceled item
111 // Move the handler from item to newitem (thus clearing item)
112 newItem.handler = std::move(item.handler);
113 m_items.push(std::move(newItem));
114
115 // Something changed, so wake up timer thread
116 m_checkWork.notify_one();
117 return 1;
118 }
119 }
120 return 0;
121 }
122
123 // Cancels all timers
124 // \return
125 // The number of timers cancelled
126 size_t cancelAll() {
127 // Setting all "end" to 0 (for immediate execution) is ok,
128 // since it maintains the heap integrity
129 std::unique_lock<std::mutex> lk(m_mtx);
130 m_cancel = true;
131 for (auto&& item : m_items.getContainer()) {
132 if (item.id && item.handler) {
133 item.end = Clock::time_point();
134 item.id = 0;
135 }
136 }
137 auto ret = m_items.size();
138
139 m_checkWork.notify_one();
140 return ret;
141 }
142
143 private:
144 using Clock = std::chrono::steady_clock;
145 TimerQueue(const TimerQueue&) = delete;
146 TimerQueue& operator=(const TimerQueue&) = delete;
147
148 void run() {
149 std::unique_lock<std::mutex> lk(m_mtx);
150 while (!m_finish) {
151 auto end = calcWaitTime_lock();
152 if (end.first) {
153 // Timers found, so wait until it expires (or something else
154 // changes)
155 m_checkWork.wait_until(lk, end.second);
156 } else {
157 // No timers exist, so wait forever until something changes
158 m_checkWork.wait(lk);
159 }
160
161 // Check and execute as much work as possible, such as, all expired
162 // timers
163 checkWork(&lk);
164 }
165
166 // If we are shutting down, we should not have any items left,
167 // since the shutdown cancels all items
168 assert(m_items.size() == 0);
169 }
170
171 std::pair<bool, Clock::time_point> calcWaitTime_lock() {
172 while (m_items.size()) {
173 if (m_items.top().handler) {
174 // Item present, so return the new wait time
175 return std::make_pair(true, m_items.top().end);
176 } else {
177 // Discard empty handlers (they were cancelled)
178 m_items.pop();
179 }
180 }
181
182 // No items found, so return no wait time (causes the thread to wait
183 // indefinitely)
184 return std::make_pair(false, Clock::time_point());
185 }
186
187 void checkWork(std::unique_lock<std::mutex>* lk) {
188 while (m_items.size() && m_items.top().end <= Clock::now()) {
189 WorkItem item(m_items.top());
190 m_items.pop();
191
192 if (item.handler) {
193 (*lk).unlock();
194 auto reschedule_pair = item.handler(item.id == 0);
195 (*lk).lock();
196 if (!m_cancel && reschedule_pair.first) {
197 int64_t new_period = (reschedule_pair.second == -1)
198 ? item.period
199 : reschedule_pair.second;
200
201 item.period = new_period;
202 item.end = Clock::now() + std::chrono::milliseconds(new_period);
203 m_items.push(std::move(item));
204 }
205 }
206 }
207 }
208
209 bool m_finish = false;
210 bool m_cancel = false;
211 uint64_t m_idcounter = 0;
212 std::condition_variable m_checkWork;
213
214 struct WorkItem {
215 Clock::time_point end;
216 int64_t period;
217 uint64_t id; // id==0 means it was cancelled
218 std::function<std::pair<bool, int64_t>(bool)> handler;
219 bool operator>(const WorkItem& other) const { return end > other.end; }
220 };
221
222 std::mutex m_mtx;
223 // Inheriting from priority_queue, so we can access the internal container
224 class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>,
225 std::greater<WorkItem>> {
226 public:
227 std::vector<WorkItem>& getContainer() { return this->c; }
228 } m_items;
f67539c2 229 ROCKSDB_NAMESPACE::port::Thread m_th;
494da23a 230 bool closed_ = false;
11fdf7f2 231};