]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "Cond.h" | |
7c673cae FG |
16 | #include "Timer.h" |
17 | ||
7c673cae FG |
18 | |
19 | #define dout_subsys ceph_subsys_timer | |
20 | #undef dout_prefix | |
21 | #define dout_prefix *_dout << "timer(" << this << ")." | |
22 | ||
7c673cae FG |
23 | |
24 | ||
25 | class SafeTimerThread : public Thread { | |
26 | SafeTimer *parent; | |
27 | public: | |
28 | explicit SafeTimerThread(SafeTimer *s) : parent(s) {} | |
29 | void *entry() override { | |
30 | parent->timer_thread(); | |
31 | return NULL; | |
32 | } | |
33 | }; | |
34 | ||
35 | ||
36 | ||
37 | typedef std::multimap < utime_t, Context *> scheduled_map_t; | |
38 | typedef std::map < Context*, scheduled_map_t::iterator > event_lookup_map_t; | |
39 | ||
40 | SafeTimer::SafeTimer(CephContext *cct_, Mutex &l, bool safe_callbacks) | |
41 | : cct(cct_), lock(l), | |
42 | safe_callbacks(safe_callbacks), | |
43 | thread(NULL), | |
44 | stopping(false) | |
45 | { | |
46 | } | |
47 | ||
48 | SafeTimer::~SafeTimer() | |
49 | { | |
50 | assert(thread == NULL); | |
51 | } | |
52 | ||
53 | void SafeTimer::init() | |
54 | { | |
55 | ldout(cct,10) << "init" << dendl; | |
56 | thread = new SafeTimerThread(this); | |
57 | thread->create("safe_timer"); | |
58 | } | |
59 | ||
60 | void SafeTimer::shutdown() | |
61 | { | |
62 | ldout(cct,10) << "shutdown" << dendl; | |
63 | if (thread) { | |
64 | assert(lock.is_locked()); | |
65 | cancel_all_events(); | |
66 | stopping = true; | |
67 | cond.Signal(); | |
68 | lock.Unlock(); | |
69 | thread->join(); | |
70 | lock.Lock(); | |
71 | delete thread; | |
72 | thread = NULL; | |
73 | } | |
74 | } | |
75 | ||
76 | void SafeTimer::timer_thread() | |
77 | { | |
78 | lock.Lock(); | |
79 | ldout(cct,10) << "timer_thread starting" << dendl; | |
80 | while (!stopping) { | |
81 | utime_t now = ceph_clock_now(); | |
82 | ||
83 | while (!schedule.empty()) { | |
84 | scheduled_map_t::iterator p = schedule.begin(); | |
85 | ||
86 | // is the future now? | |
87 | if (p->first > now) | |
88 | break; | |
89 | ||
90 | Context *callback = p->second; | |
91 | events.erase(callback); | |
92 | schedule.erase(p); | |
93 | ldout(cct,10) << "timer_thread executing " << callback << dendl; | |
94 | ||
95 | if (!safe_callbacks) | |
96 | lock.Unlock(); | |
97 | callback->complete(0); | |
98 | if (!safe_callbacks) | |
99 | lock.Lock(); | |
100 | } | |
101 | ||
102 | // recheck stopping if we dropped the lock | |
103 | if (!safe_callbacks && stopping) | |
104 | break; | |
105 | ||
106 | ldout(cct,20) << "timer_thread going to sleep" << dendl; | |
107 | if (schedule.empty()) | |
108 | cond.Wait(lock); | |
109 | else | |
110 | cond.WaitUntil(lock, schedule.begin()->first); | |
111 | ldout(cct,20) << "timer_thread awake" << dendl; | |
112 | } | |
113 | ldout(cct,10) << "timer_thread exiting" << dendl; | |
114 | lock.Unlock(); | |
115 | } | |
116 | ||
117 | void SafeTimer::add_event_after(double seconds, Context *callback) | |
118 | { | |
119 | assert(lock.is_locked()); | |
120 | ||
121 | utime_t when = ceph_clock_now(); | |
122 | when += seconds; | |
123 | add_event_at(when, callback); | |
124 | } | |
125 | ||
126 | void SafeTimer::add_event_at(utime_t when, Context *callback) | |
127 | { | |
128 | assert(lock.is_locked()); | |
129 | ldout(cct,10) << "add_event_at " << when << " -> " << callback << dendl; | |
130 | ||
131 | scheduled_map_t::value_type s_val(when, callback); | |
132 | scheduled_map_t::iterator i = schedule.insert(s_val); | |
133 | ||
134 | event_lookup_map_t::value_type e_val(callback, i); | |
135 | pair < event_lookup_map_t::iterator, bool > rval(events.insert(e_val)); | |
136 | ||
137 | /* If you hit this, you tried to insert the same Context* twice. */ | |
138 | assert(rval.second); | |
139 | ||
140 | /* If the event we have just inserted comes before everything else, we need to | |
141 | * adjust our timeout. */ | |
142 | if (i == schedule.begin()) | |
143 | cond.Signal(); | |
144 | ||
145 | } | |
146 | ||
147 | bool SafeTimer::cancel_event(Context *callback) | |
148 | { | |
149 | assert(lock.is_locked()); | |
150 | ||
151 | auto p = events.find(callback); | |
152 | if (p == events.end()) { | |
153 | ldout(cct,10) << "cancel_event " << callback << " not found" << dendl; | |
154 | return false; | |
155 | } | |
156 | ||
157 | ldout(cct,10) << "cancel_event " << p->second->first << " -> " << callback << dendl; | |
158 | delete p->first; | |
159 | ||
160 | schedule.erase(p->second); | |
161 | events.erase(p); | |
162 | return true; | |
163 | } | |
164 | ||
165 | void SafeTimer::cancel_all_events() | |
166 | { | |
167 | ldout(cct,10) << "cancel_all_events" << dendl; | |
168 | assert(lock.is_locked()); | |
169 | ||
170 | while (!events.empty()) { | |
171 | auto p = events.begin(); | |
172 | ldout(cct,10) << " cancelled " << p->second->first << " -> " << p->first << dendl; | |
173 | delete p->first; | |
174 | schedule.erase(p->second); | |
175 | events.erase(p); | |
176 | } | |
177 | } | |
178 | ||
179 | void SafeTimer::dump(const char *caller) const | |
180 | { | |
181 | if (!caller) | |
182 | caller = ""; | |
183 | ldout(cct,10) << "dump " << caller << dendl; | |
184 | ||
185 | for (scheduled_map_t::const_iterator s = schedule.begin(); | |
186 | s != schedule.end(); | |
187 | ++s) | |
188 | ldout(cct,10) << " " << s->first << "->" << s->second << dendl; | |
189 | } |