]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/Timer.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / common / Timer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "Cond.h"
16 #include "Timer.h"
17
18
19 #define dout_subsys ceph_subsys_timer
20 #undef dout_prefix
21 #define dout_prefix *_dout << "timer(" << this << ")."
22
23 using std::pair;
24
25 using ceph::operator <<;
26
27 template <class Mutex>
28 class CommonSafeTimerThread : public Thread {
29 CommonSafeTimer<Mutex> *parent;
30 public:
31 explicit CommonSafeTimerThread(CommonSafeTimer<Mutex> *s) : parent(s) {}
32 void *entry() override {
33 parent->timer_thread();
34 return NULL;
35 }
36 };
37
38 template <class Mutex>
39 CommonSafeTimer<Mutex>::CommonSafeTimer(CephContext *cct_, Mutex &l, bool safe_callbacks)
40 : cct(cct_), lock(l),
41 safe_callbacks(safe_callbacks),
42 thread(NULL),
43 stopping(false)
44 {
45 }
46
47 template <class Mutex>
48 CommonSafeTimer<Mutex>::~CommonSafeTimer()
49 {
50 ceph_assert(thread == NULL);
51 }
52
53 template <class Mutex>
54 void CommonSafeTimer<Mutex>::init()
55 {
56 ldout(cct,10) << "init" << dendl;
57 thread = new CommonSafeTimerThread<Mutex>(this);
58 thread->create("safe_timer");
59 }
60
61 template <class Mutex>
62 void CommonSafeTimer<Mutex>::shutdown()
63 {
64 ldout(cct,10) << "shutdown" << dendl;
65 if (thread) {
66 ceph_assert(ceph_mutex_is_locked(lock));
67 cancel_all_events();
68 stopping = true;
69 cond.notify_all();
70 lock.unlock();
71 thread->join();
72 lock.lock();
73 delete thread;
74 thread = NULL;
75 }
76 }
77
78 template <class Mutex>
79 void CommonSafeTimer<Mutex>::timer_thread()
80 {
81 std::unique_lock l{lock};
82 ldout(cct,10) << "timer_thread starting" << dendl;
83 while (!stopping) {
84 auto now = clock_t::now();
85
86 while (!schedule.empty()) {
87 auto p = schedule.begin();
88
89 // is the future now?
90 if (p->first > now)
91 break;
92
93 Context *callback = p->second;
94 events.erase(callback);
95 schedule.erase(p);
96 ldout(cct,10) << "timer_thread executing " << callback << dendl;
97
98 if (!safe_callbacks) {
99 l.unlock();
100 callback->complete(0);
101 l.lock();
102 } else {
103 callback->complete(0);
104 }
105 }
106
107 // recheck stopping if we dropped the lock
108 if (!safe_callbacks && stopping)
109 break;
110
111 ldout(cct,20) << "timer_thread going to sleep" << dendl;
112 if (schedule.empty()) {
113 cond.wait(l);
114 } else {
115 auto when = schedule.begin()->first;
116 cond.wait_until(l, when);
117 }
118 ldout(cct,20) << "timer_thread awake" << dendl;
119 }
120 ldout(cct,10) << "timer_thread exiting" << dendl;
121 }
122
123 template <class Mutex>
124 Context* CommonSafeTimer<Mutex>::add_event_after(double seconds, Context *callback)
125 {
126 return add_event_after(ceph::make_timespan(seconds), callback);
127 }
128
129 template <class Mutex>
130 Context* CommonSafeTimer<Mutex>::add_event_after(ceph::timespan duration, Context *callback)
131 {
132 ceph_assert(ceph_mutex_is_locked(lock));
133
134 auto when = clock_t::now() + duration;
135 return add_event_at(when, callback);
136 }
137
138 template <class Mutex>
139 Context* CommonSafeTimer<Mutex>::add_event_at(CommonSafeTimer<Mutex>::clock_t::time_point when, Context *callback)
140 {
141 ceph_assert(ceph_mutex_is_locked(lock));
142 ldout(cct,10) << __func__ << " " << when << " -> " << callback << dendl;
143 if (stopping) {
144 ldout(cct,5) << __func__ << " already shutdown, event not added" << dendl;
145 delete callback;
146 return nullptr;
147 }
148 scheduled_map_t::value_type s_val(when, callback);
149 scheduled_map_t::iterator i = schedule.insert(s_val);
150
151 event_lookup_map_t::value_type e_val(callback, i);
152 pair < event_lookup_map_t::iterator, bool > rval(events.insert(e_val));
153
154 /* If you hit this, you tried to insert the same Context* twice. */
155 ceph_assert(rval.second);
156
157 /* If the event we have just inserted comes before everything else, we need to
158 * adjust our timeout. */
159 if (i == schedule.begin())
160 cond.notify_all();
161 return callback;
162 }
163
164 template <class Mutex>
165 Context* CommonSafeTimer<Mutex>::add_event_at(ceph::real_clock::time_point when, Context *callback)
166 {
167 ceph_assert(ceph_mutex_is_locked(lock));
168 // convert from real_clock to mono_clock
169 auto mono_now = ceph::mono_clock::now();
170 auto real_now = ceph::real_clock::now();
171 const auto delta = when - real_now;
172 const auto mono_atime = (mono_now +
173 std::chrono::ceil<clock_t::duration>(delta));
174 return add_event_at(mono_atime, callback);
175 }
176
177 template <class Mutex>
178 bool CommonSafeTimer<Mutex>::cancel_event(Context *callback)
179 {
180 ceph_assert(ceph_mutex_is_locked(lock));
181
182 auto p = events.find(callback);
183 if (p == events.end()) {
184 ldout(cct,10) << "cancel_event " << callback << " not found" << dendl;
185 return false;
186 }
187
188 ldout(cct,10) << "cancel_event " << p->second->first << " -> " << callback << dendl;
189 delete p->first;
190
191 schedule.erase(p->second);
192 events.erase(p);
193 return true;
194 }
195
196 template <class Mutex>
197 void CommonSafeTimer<Mutex>::cancel_all_events()
198 {
199 ldout(cct,10) << "cancel_all_events" << dendl;
200 ceph_assert(ceph_mutex_is_locked(lock));
201
202 while (!events.empty()) {
203 auto p = events.begin();
204 ldout(cct,10) << " cancelled " << p->second->first << " -> " << p->first << dendl;
205 delete p->first;
206 schedule.erase(p->second);
207 events.erase(p);
208 }
209 }
210
211 template <class Mutex>
212 void CommonSafeTimer<Mutex>::dump(const char *caller) const
213 {
214 if (!caller)
215 caller = "";
216 ldout(cct,10) << "dump " << caller << dendl;
217
218 for (scheduled_map_t::const_iterator s = schedule.begin();
219 s != schedule.end();
220 ++s)
221 ldout(cct,10) << " " << s->first << "->" << s->second << dendl;
222 }
223
// Explicit instantiations: the member definitions above live in this source
// file, so the class template is instantiated here for each mutex type listed.
template class CommonSafeTimer<ceph::mutex>;
template class CommonSafeTimer<ceph::fair_mutex>;