]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/Finisher.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / common / Finisher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
7c673cae
FG
4#include "Finisher.h"
5
7c673cae
FG
// Logging configuration for this translation unit.
// dout_subsys is set to ceph_subsys_finisher, and dout_prefix is redefined so
// that it streams "finisher(<this>) " into *_dout.
// NOTE(review): presumably both macros are consumed by the ldout() calls
// below -- confirm against the dout header.
6#define dout_subsys ceph_subsys_finisher
7#undef dout_prefix
8#define dout_prefix *_dout << "finisher(" << this << ") "
9
10void Finisher::start()
11{
12 ldout(cct, 10) << __func__ << dendl;
13 finisher_thread.create(thread_name.c_str());
14}
15
16void Finisher::stop()
17{
18 ldout(cct, 10) << __func__ << dendl;
11fdf7f2 19 finisher_lock.lock();
7c673cae
FG
20 finisher_stop = true;
21 // we don't have any new work to do, but we want the worker to wake up anyway
22 // to process the stop condition.
11fdf7f2
TL
23 finisher_cond.notify_all();
24 finisher_lock.unlock();
7c673cae
FG
25 finisher_thread.join(); // wait until the worker exits completely
26 ldout(cct, 10) << __func__ << " finish" << dendl;
27}
28
29void Finisher::wait_for_empty()
30{
11fdf7f2 31 std::unique_lock ul(finisher_lock);
7c673cae
FG
32 while (!finisher_queue.empty() || finisher_running) {
33 ldout(cct, 10) << "wait_for_empty waiting" << dendl;
34 finisher_empty_wait = true;
11fdf7f2 35 finisher_empty_cond.wait(ul);
7c673cae
FG
36 }
37 ldout(cct, 10) << "wait_for_empty empty" << dendl;
38 finisher_empty_wait = false;
7c673cae
FG
39}
40
41void *Finisher::finisher_thread_entry()
42{
11fdf7f2 43 std::unique_lock ul(finisher_lock);
7c673cae
FG
44 ldout(cct, 10) << "finisher_thread start" << dendl;
45
46 utime_t start;
47 uint64_t count = 0;
48 while (!finisher_stop) {
49 /// Every time we are woken up, we process the queue until it is empty.
50 while (!finisher_queue.empty()) {
51 // To reduce lock contention, we swap out the queue to process.
11fdf7f2
TL
52 // This way other threads can submit new contexts to complete
53 // while we are working.
9f95a23c 54 in_progress_queue.swap(finisher_queue);
7c673cae 55 finisher_running = true;
11fdf7f2 56 ul.unlock();
9f95a23c 57 ldout(cct, 10) << "finisher_thread doing " << in_progress_queue << dendl;
7c673cae
FG
58
59 if (logger) {
60 start = ceph_clock_now();
9f95a23c 61 count = in_progress_queue.size();
7c673cae
FG
62 }
63
64 // Now actually process the contexts.
9f95a23c 65 for (auto p : in_progress_queue) {
11fdf7f2 66 p.first->complete(p.second);
7c673cae 67 }
9f95a23c
TL
68 ldout(cct, 10) << "finisher_thread done with " << in_progress_queue
69 << dendl;
70 in_progress_queue.clear();
7c673cae
FG
71 if (logger) {
72 logger->dec(l_finisher_queue_len, count);
73 logger->tinc(l_finisher_complete_lat, ceph_clock_now() - start);
74 }
75
11fdf7f2 76 ul.lock();
7c673cae
FG
77 finisher_running = false;
78 }
79 ldout(cct, 10) << "finisher_thread empty" << dendl;
80 if (unlikely(finisher_empty_wait))
11fdf7f2 81 finisher_empty_cond.notify_all();
7c673cae
FG
82 if (finisher_stop)
83 break;
84
85 ldout(cct, 10) << "finisher_thread sleeping" << dendl;
11fdf7f2 86 finisher_cond.wait(ul);
7c673cae
FG
87 }
88 // If we are exiting, we signal the thread waiting in stop(),
89 // otherwise it would never unblock
11fdf7f2 90 finisher_empty_cond.notify_all();
7c673cae
FG
91
92 ldout(cct, 10) << "finisher_thread stop" << dendl;
93 finisher_stop = false;
7c673cae
FG
94 return 0;
95}
96