]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/Finisher.cc
d/control: depend on python3-yaml for ceph-mgr
[ceph.git] / ceph / src / common / Finisher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "Finisher.h"
5
6 #define dout_subsys ceph_subsys_finisher
7 #undef dout_prefix
8 #define dout_prefix *_dout << "finisher(" << this << ") "
9
10 void Finisher::start()
11 {
12 ldout(cct, 10) << __func__ << dendl;
13 finisher_thread.create(thread_name.c_str());
14 }
15
16 void Finisher::stop()
17 {
18 ldout(cct, 10) << __func__ << dendl;
19 finisher_lock.lock();
20 finisher_stop = true;
21 // we don't have any new work to do, but we want the worker to wake up anyway
22 // to process the stop condition.
23 finisher_cond.notify_all();
24 finisher_lock.unlock();
25 finisher_thread.join(); // wait until the worker exits completely
26 ldout(cct, 10) << __func__ << " finish" << dendl;
27 }
28
29 void Finisher::wait_for_empty()
30 {
31 std::unique_lock ul(finisher_lock);
32 while (!finisher_queue.empty() || finisher_running) {
33 ldout(cct, 10) << "wait_for_empty waiting" << dendl;
34 finisher_empty_wait = true;
35 finisher_empty_cond.wait(ul);
36 }
37 ldout(cct, 10) << "wait_for_empty empty" << dendl;
38 finisher_empty_wait = false;
39 }
40
41 void *Finisher::finisher_thread_entry()
42 {
43 std::unique_lock ul(finisher_lock);
44 ldout(cct, 10) << "finisher_thread start" << dendl;
45
46 utime_t start;
47 uint64_t count = 0;
48 while (!finisher_stop) {
49 /// Every time we are woken up, we process the queue until it is empty.
50 while (!finisher_queue.empty()) {
51 // To reduce lock contention, we swap out the queue to process.
52 // This way other threads can submit new contexts to complete
53 // while we are working.
54 in_progress_queue.swap(finisher_queue);
55 finisher_running = true;
56 ul.unlock();
57 ldout(cct, 10) << "finisher_thread doing " << in_progress_queue << dendl;
58
59 if (logger) {
60 start = ceph_clock_now();
61 count = in_progress_queue.size();
62 }
63
64 // Now actually process the contexts.
65 for (auto p : in_progress_queue) {
66 p.first->complete(p.second);
67 }
68 ldout(cct, 10) << "finisher_thread done with " << in_progress_queue
69 << dendl;
70 in_progress_queue.clear();
71 if (logger) {
72 logger->dec(l_finisher_queue_len, count);
73 logger->tinc(l_finisher_complete_lat, ceph_clock_now() - start);
74 }
75
76 ul.lock();
77 finisher_running = false;
78 }
79 ldout(cct, 10) << "finisher_thread empty" << dendl;
80 if (unlikely(finisher_empty_wait))
81 finisher_empty_cond.notify_all();
82 if (finisher_stop)
83 break;
84
85 ldout(cct, 10) << "finisher_thread sleeping" << dendl;
86 finisher_cond.wait(ul);
87 }
88 // If we are exiting, we signal the thread waiting in stop(),
89 // otherwise it would never unblock
90 finisher_empty_cond.notify_all();
91
92 ldout(cct, 10) << "finisher_thread stop" << dendl;
93 finisher_stop = false;
94 return 0;
95 }
96