]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef COMMON_CEPH_TIMER_H | |
16 | #define COMMON_CEPH_TIMER_H | |
17 | ||
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <system_error>
#include <thread>
#include <utility>

#include <boost/intrusive/set.hpp>
21 | ||
7c673cae FG |
22 | namespace ceph { |
23 | ||
/// Tag type selecting the timer constructor that leaves the timer
/// suspended at the point of construction.
///
/// The default constructor is explicit so that a bare `{}` cannot be
/// implicitly converted into the tag (the same convention the
/// standard library uses for its tag types).
struct construct_suspended_t {
  explicit construct_suspended_t() = default;
};

/// Tag value to pass to the suspended-construction constructor.
constexpr construct_suspended_t construct_suspended { };
29 | ||
30 | namespace timer_detail { | |
31 | using boost::intrusive::member_hook; | |
32 | using boost::intrusive::set_member_hook; | |
33 | using boost::intrusive::link_mode; | |
34 | using boost::intrusive::normal_link; | |
35 | using boost::intrusive::set; | |
36 | using boost::intrusive::constant_time_size; | |
37 | using boost::intrusive::compare; | |
38 | ||
39 | // Compared to the SafeTimer this does fewer allocations (you | |
40 | // don't have to allocate a new Context every time you | |
41 | // want to cue the next tick.) | |
42 | // | |
43 | // It also does not share a lock with the caller. If you call | |
44 | // cancel event, it either cancels the event (and returns true) or | |
45 | // you missed it. If this does not work for you, you can set up a | |
46 | // flag and mutex of your own. | |
47 | // | |
48 | // You get to pick your clock. I like mono_clock, since I usually | |
49 | // want to wait FOR a given duration. real_clock is worthwhile if | |
50 | // you want to wait UNTIL a specific moment of wallclock time. If | |
51 | // you want you can set up a timer that executes a function after | |
52 | // you use up ten seconds of CPU time. | |
53 | ||
54 | template <class TC> | |
55 | class timer { | |
56 | using sh = set_member_hook<link_mode<normal_link> >; | |
57 | ||
58 | struct event { | |
59 | typename TC::time_point t; | |
60 | uint64_t id; | |
61 | std::function<void()> f; | |
62 | ||
63 | sh schedule_link; | |
64 | sh event_link; | |
65 | ||
66 | event() : t(TC::time_point::min()), id(0) {} | |
67 | event(uint64_t _id) : t(TC::time_point::min()), id(_id) {} | |
68 | event(typename TC::time_point _t, uint64_t _id, | |
69 | std::function<void()>&& _f) : t(_t), id(_id), f(_f) {} | |
70 | event(typename TC::time_point _t, uint64_t _id, | |
71 | const std::function<void()>& _f) : t(_t), id(_id), f(_f) {} | |
72 | bool operator <(const event& e) { | |
73 | return t == e.t ? id < e.id : t < e.t; | |
74 | } | |
75 | }; | |
76 | struct SchedCompare { | |
77 | bool operator()(const event& e1, const event& e2) const { | |
78 | return e1.t == e2.t ? e1.id < e2.id : e1.t < e2.t; | |
79 | } | |
80 | }; | |
81 | struct EventCompare { | |
82 | bool operator()(const event& e1, const event& e2) const { | |
83 | return e1.id < e2.id; | |
84 | } | |
85 | }; | |
86 | ||
87 | using schedule_type = set<event, | |
88 | member_hook<event, sh, &event::schedule_link>, | |
89 | constant_time_size<false>, | |
90 | compare<SchedCompare> >; | |
91 | ||
92 | schedule_type schedule; | |
93 | ||
94 | using event_set_type = set<event, | |
95 | member_hook<event, sh, &event::event_link>, | |
96 | constant_time_size<false>, | |
97 | compare<EventCompare> >; | |
98 | ||
99 | event_set_type events; | |
100 | ||
101 | std::mutex lock; | |
102 | using lock_guard = std::lock_guard<std::mutex>; | |
103 | using unique_lock = std::unique_lock<std::mutex>; | |
104 | std::condition_variable cond; | |
105 | ||
106 | event* running{ nullptr }; | |
107 | uint64_t next_id{ 0 }; | |
108 | ||
109 | bool suspended; | |
110 | std::thread thread; | |
111 | ||
112 | void timer_thread() { | |
113 | unique_lock l(lock); | |
114 | while (!suspended) { | |
115 | typename TC::time_point now = TC::now(); | |
116 | ||
117 | while (!schedule.empty()) { | |
118 | auto p = schedule.begin(); | |
119 | // Should we wait for the future? | |
120 | if (p->t > now) | |
121 | break; | |
122 | ||
123 | event& e = *p; | |
124 | schedule.erase(e); | |
125 | events.erase(e); | |
126 | ||
127 | // Since we have only one thread it is impossible to have more | |
128 | // than one running event | |
129 | running = &e; | |
130 | ||
131 | l.unlock(); | |
132 | e.f(); | |
133 | l.lock(); | |
134 | ||
135 | if (running) { | |
136 | running = nullptr; | |
137 | delete &e; | |
138 | } // Otherwise the event requeued itself | |
139 | } | |
140 | ||
a8e16298 TL |
141 | if (suspended) |
142 | break; | |
7c673cae FG |
143 | if (schedule.empty()) |
144 | cond.wait(l); | |
145 | else | |
146 | cond.wait_until(l, schedule.begin()->t); | |
147 | } | |
148 | } | |
149 | ||
150 | public: | |
151 | timer() { | |
152 | lock_guard l(lock); | |
153 | suspended = false; | |
154 | thread = std::thread(&timer::timer_thread, this); | |
155 | } | |
156 | ||
157 | // Create a suspended timer, jobs will be executed in order when | |
158 | // it is resumed. | |
159 | timer(construct_suspended_t) { | |
160 | lock_guard l(lock); | |
161 | suspended = true; | |
162 | } | |
163 | ||
164 | timer(const timer &) = delete; | |
165 | timer& operator=(const timer &) = delete; | |
166 | ||
167 | ~timer() { | |
168 | suspend(); | |
169 | cancel_all_events(); | |
170 | } | |
171 | ||
172 | // Suspend operation of the timer (and let its thread die). | |
173 | void suspend() { | |
174 | unique_lock l(lock); | |
175 | if (suspended) | |
176 | return; | |
177 | ||
178 | suspended = true; | |
179 | cond.notify_one(); | |
180 | l.unlock(); | |
181 | thread.join(); | |
182 | } | |
183 | ||
184 | ||
185 | // Resume operation of the timer. (Must have been previously | |
186 | // suspended.) | |
187 | void resume() { | |
188 | unique_lock l(lock); | |
189 | if (!suspended) | |
190 | return; | |
191 | ||
192 | suspended = false; | |
193 | assert(!thread.joinable()); | |
194 | thread = std::thread(&timer::timer_thread, this); | |
195 | } | |
196 | ||
197 | // Schedule an event in the relative future | |
198 | template<typename Callable, typename... Args> | |
199 | uint64_t add_event(typename TC::duration duration, | |
200 | Callable&& f, Args&&... args) { | |
201 | typename TC::time_point when = TC::now(); | |
202 | when += duration; | |
203 | return add_event(when, | |
204 | std::forward<Callable>(f), | |
205 | std::forward<Args>(args)...); | |
206 | } | |
207 | ||
208 | // Schedule an event in the absolute future | |
209 | template<typename Callable, typename... Args> | |
210 | uint64_t add_event(typename TC::time_point when, | |
211 | Callable&& f, Args&&... args) { | |
212 | std::lock_guard<std::mutex> l(lock); | |
213 | event& e = *(new event( | |
214 | when, ++next_id, | |
215 | std::forward<std::function<void()> >( | |
216 | std::bind(std::forward<Callable>(f), | |
217 | std::forward<Args>(args)...)))); | |
218 | auto i = schedule.insert(e); | |
219 | events.insert(e); | |
220 | ||
221 | /* If the event we have just inserted comes before everything | |
222 | * else, we need to adjust our timeout. */ | |
223 | if (i.first == schedule.begin()) | |
224 | cond.notify_one(); | |
225 | ||
226 | // Previously each event was a context, identified by a | |
227 | // pointer, and each context to be called only once. Since you | |
228 | // can queue the same function pointer, member function, | |
229 | // lambda, or functor up multiple times, identifying things by | |
230 | // function for the purposes of cancellation is no longer | |
231 | // suitable. Thus: | |
232 | return e.id; | |
233 | } | |
234 | ||
235 | // Adjust the timeout of a currently-scheduled event (relative) | |
236 | bool adjust_event(uint64_t id, typename TC::duration duration) { | |
237 | return adjust_event(id, TC::now() + duration); | |
238 | } | |
239 | ||
240 | // Adjust the timeout of a currently-scheduled event (absolute) | |
241 | bool adjust_event(uint64_t id, typename TC::time_point when) { | |
242 | std::lock_guard<std::mutex> l(lock); | |
243 | ||
244 | event key(id); | |
245 | typename event_set_type::iterator it = events.find(key); | |
246 | ||
247 | if (it == events.end()) | |
248 | return false; | |
249 | ||
250 | event& e = *it; | |
251 | ||
252 | schedule.erase(e); | |
253 | e.t = when; | |
254 | schedule.insert(e); | |
255 | ||
256 | return true; | |
257 | } | |
258 | ||
259 | // Cancel an event. If the event has already come and gone (or you | |
260 | // never submitted it) you will receive false. Otherwise you will | |
261 | // receive true and it is guaranteed the event will not execute. | |
262 | bool cancel_event(const uint64_t id) { | |
263 | std::lock_guard<std::mutex> l(lock); | |
264 | event dummy(id); | |
265 | auto p = events.find(dummy); | |
266 | if (p == events.end()) { | |
267 | return false; | |
268 | } | |
269 | ||
270 | event& e = *p; | |
271 | events.erase(e); | |
272 | schedule.erase(e); | |
273 | delete &e; | |
274 | ||
275 | return true; | |
276 | } | |
277 | ||
278 | // Reschedules a currently running event in the relative | |
279 | // future. Must be called only from an event executed by this | |
280 | // timer. If you have a function that can be called either from | |
281 | // this timer or some other way, it is your responsibility to make | |
282 | // sure it can tell the difference only does not call | |
283 | // reschedule_me in the non-timer case. | |
284 | // | |
285 | // Returns an event id. If you had an event_id from the first | |
286 | // scheduling, replace it with this return value. | |
287 | uint64_t reschedule_me(typename TC::duration duration) { | |
288 | return reschedule_me(TC::now() + duration); | |
289 | } | |
290 | ||
291 | // Reschedules a currently running event in the absolute | |
292 | // future. Must be called only from an event executed by this | |
293 | // timer. if you have a function that can be called either from | |
294 | // this timer or some other way, it is your responsibility to make | |
295 | // sure it can tell the difference only does not call | |
296 | // reschedule_me in the non-timer case. | |
297 | // | |
298 | // Returns an event id. If you had an event_id from the first | |
299 | // scheduling, replace it with this return value. | |
300 | uint64_t reschedule_me(typename TC::time_point when) { | |
301 | if (std::this_thread::get_id() != thread.get_id()) | |
302 | throw std::make_error_condition(std::errc::operation_not_permitted); | |
303 | std::lock_guard<std::mutex> l(lock); | |
304 | running->t = when; | |
305 | uint64_t id = ++next_id; | |
306 | running->id = id; | |
307 | schedule.insert(*running); | |
308 | events.insert(*running); | |
309 | ||
310 | // Hacky, but keeps us from being deleted | |
311 | running = nullptr; | |
312 | ||
313 | // Same function, but you get a new ID. | |
314 | return id; | |
315 | } | |
316 | ||
317 | // Remove all events from the queue. | |
318 | void cancel_all_events() { | |
319 | std::lock_guard<std::mutex> l(lock); | |
320 | while (!events.empty()) { | |
321 | auto p = events.begin(); | |
322 | event& e = *p; | |
323 | schedule.erase(e); | |
324 | events.erase(e); | |
325 | delete &e; | |
326 | } | |
327 | } | |
328 | }; // timer | |
329 | }; // timer_detail | |
330 | ||
331 | using timer_detail::timer; | |
332 | }; // ceph | |
333 | ||
334 | #endif |