};
-
-typedef std::multimap < utime_t, Context *> scheduled_map_t;
-typedef std::map < Context*, scheduled_map_t::iterator > event_lookup_map_t;
-
-SafeTimer::SafeTimer(CephContext *cct_, Mutex &l, bool safe_callbacks)
+SafeTimer::SafeTimer(CephContext *cct_, ceph::mutex &l, bool safe_callbacks)
: cct(cct_), lock(l),
safe_callbacks(safe_callbacks),
thread(NULL),
{
ldout(cct,10) << "shutdown" << dendl;
if (thread) {
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
cancel_all_events();
stopping = true;
- cond.Signal();
+ cond.notify_all();
lock.unlock();
thread->join();
lock.lock();
void SafeTimer::timer_thread()
{
- lock.lock();
+ std::unique_lock l{lock};
ldout(cct,10) << "timer_thread starting" << dendl;
while (!stopping) {
- utime_t now = ceph_clock_now();
+ auto now = clock_t::now();
while (!schedule.empty()) {
- scheduled_map_t::iterator p = schedule.begin();
+ auto p = schedule.begin();
// is the future now?
if (p->first > now)
schedule.erase(p);
ldout(cct,10) << "timer_thread executing " << callback << dendl;
- if (!safe_callbacks)
- lock.unlock();
- callback->complete(0);
- if (!safe_callbacks)
- lock.lock();
+ if (!safe_callbacks) {
+ l.unlock();
+ callback->complete(0);
+ l.lock();
+ } else {
+ callback->complete(0);
+ }
}
// recheck stopping if we dropped the lock
break;
ldout(cct,20) << "timer_thread going to sleep" << dendl;
- if (schedule.empty())
- cond.Wait(lock);
- else
- cond.WaitUntil(lock, schedule.begin()->first);
+ if (schedule.empty()) {
+ cond.wait(l);
+ } else {
+ cond.wait_until(l, schedule.begin()->first);
+ }
ldout(cct,20) << "timer_thread awake" << dendl;
}
ldout(cct,10) << "timer_thread exiting" << dendl;
- lock.unlock();
}
Context* SafeTimer::add_event_after(double seconds, Context *callback)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
- utime_t when = ceph_clock_now();
- when += seconds;
+ auto when = clock_t::now() + ceph::make_timespan(seconds);
return add_event_at(when, callback);
}
-Context* SafeTimer::add_event_at(utime_t when, Context *callback)
+Context* SafeTimer::add_event_at(SafeTimer::clock_t::time_point when, Context *callback)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct,10) << __func__ << " " << when << " -> " << callback << dendl;
if (stopping) {
ldout(cct,5) << __func__ << " already shutdown, event not added" << dendl;
/* If the event we have just inserted comes before everything else, we need to
* adjust our timeout. */
if (i == schedule.begin())
- cond.Signal();
+ cond.notify_all();
return callback;
}
bool SafeTimer::cancel_event(Context *callback)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
auto p = events.find(callback);
if (p == events.end()) {
void SafeTimer::cancel_all_events()
{
ldout(cct,10) << "cancel_all_events" << dendl;
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
while (!events.empty()) {
auto p = events.begin();