]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/Finisher.cc
d7220d10cd00bb93d3ee494d4a1c598551fa5bfa
[ceph.git] / ceph / src / common / Finisher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "common/config.h"
5 #include "Finisher.h"
6
7 #include "common/debug.h"
8 #define dout_subsys ceph_subsys_finisher
9 #undef dout_prefix
10 #define dout_prefix *_dout << "finisher(" << this << ") "
11
12 void Finisher::start()
13 {
14 ldout(cct, 10) << __func__ << dendl;
15 finisher_thread.create(thread_name.c_str());
16 }
17
18 void Finisher::stop()
19 {
20 ldout(cct, 10) << __func__ << dendl;
21 finisher_lock.Lock();
22 finisher_stop = true;
23 // we don't have any new work to do, but we want the worker to wake up anyway
24 // to process the stop condition.
25 finisher_cond.Signal();
26 finisher_lock.Unlock();
27 finisher_thread.join(); // wait until the worker exits completely
28 ldout(cct, 10) << __func__ << " finish" << dendl;
29 }
30
31 void Finisher::wait_for_empty()
32 {
33 finisher_lock.Lock();
34 while (!finisher_queue.empty() || finisher_running) {
35 ldout(cct, 10) << "wait_for_empty waiting" << dendl;
36 finisher_empty_wait = true;
37 finisher_empty_cond.Wait(finisher_lock);
38 }
39 ldout(cct, 10) << "wait_for_empty empty" << dendl;
40 finisher_empty_wait = false;
41 finisher_lock.Unlock();
42 }
43
44 void *Finisher::finisher_thread_entry()
45 {
46 finisher_lock.Lock();
47 ldout(cct, 10) << "finisher_thread start" << dendl;
48
49 utime_t start;
50 uint64_t count = 0;
51 while (!finisher_stop) {
52 /// Every time we are woken up, we process the queue until it is empty.
53 while (!finisher_queue.empty()) {
54 // To reduce lock contention, we swap out the queue to process.
55 // This way other threads can submit new contexts to complete while we are working.
56 vector<Context*> ls;
57 list<pair<Context*,int> > ls_rval;
58 ls.swap(finisher_queue);
59 ls_rval.swap(finisher_queue_rval);
60 finisher_running = true;
61 finisher_lock.Unlock();
62 ldout(cct, 10) << "finisher_thread doing " << ls << dendl;
63
64 if (logger) {
65 start = ceph_clock_now();
66 count = ls.size();
67 }
68
69 // Now actually process the contexts.
70 for (vector<Context*>::iterator p = ls.begin();
71 p != ls.end();
72 ++p) {
73 if (*p) {
74 (*p)->complete(0);
75 } else {
76 // When an item is NULL in the finisher_queue, it means
77 // we should instead process an item from finisher_queue_rval,
78 // which has a parameter for complete() other than zero.
79 // This preserves the order while saving some storage.
80 assert(!ls_rval.empty());
81 Context *c = ls_rval.front().first;
82 c->complete(ls_rval.front().second);
83 ls_rval.pop_front();
84 }
85 }
86 ldout(cct, 10) << "finisher_thread done with " << ls << dendl;
87 ls.clear();
88 if (logger) {
89 logger->dec(l_finisher_queue_len, count);
90 logger->tinc(l_finisher_complete_lat, ceph_clock_now() - start);
91 }
92
93 finisher_lock.Lock();
94 finisher_running = false;
95 }
96 ldout(cct, 10) << "finisher_thread empty" << dendl;
97 if (unlikely(finisher_empty_wait))
98 finisher_empty_cond.Signal();
99 if (finisher_stop)
100 break;
101
102 ldout(cct, 10) << "finisher_thread sleeping" << dendl;
103 finisher_cond.Wait(finisher_lock);
104 }
105 // If we are exiting, we signal the thread waiting in stop(),
106 // otherwise it would never unblock
107 finisher_empty_cond.Signal();
108
109 ldout(cct, 10) << "finisher_thread stop" << dendl;
110 finisher_stop = false;
111 finisher_lock.Unlock();
112 return 0;
113 }
114