]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
7c673cae FG |
4 | #include "Finisher.h" |
5 | ||
7c673cae FG |
6 | #define dout_subsys ceph_subsys_finisher |
7 | #undef dout_prefix | |
8 | #define dout_prefix *_dout << "finisher(" << this << ") " | |
9 | ||
10 | void Finisher::start() | |
11 | { | |
12 | ldout(cct, 10) << __func__ << dendl; | |
13 | finisher_thread.create(thread_name.c_str()); | |
14 | } | |
15 | ||
16 | void Finisher::stop() | |
17 | { | |
18 | ldout(cct, 10) << __func__ << dendl; | |
11fdf7f2 | 19 | finisher_lock.lock(); |
7c673cae FG |
20 | finisher_stop = true; |
21 | // we don't have any new work to do, but we want the worker to wake up anyway | |
22 | // to process the stop condition. | |
11fdf7f2 TL |
23 | finisher_cond.notify_all(); |
24 | finisher_lock.unlock(); | |
7c673cae FG |
25 | finisher_thread.join(); // wait until the worker exits completely |
26 | ldout(cct, 10) << __func__ << " finish" << dendl; | |
27 | } | |
28 | ||
29 | void Finisher::wait_for_empty() | |
30 | { | |
11fdf7f2 | 31 | std::unique_lock ul(finisher_lock); |
7c673cae FG |
32 | while (!finisher_queue.empty() || finisher_running) { |
33 | ldout(cct, 10) << "wait_for_empty waiting" << dendl; | |
34 | finisher_empty_wait = true; | |
11fdf7f2 | 35 | finisher_empty_cond.wait(ul); |
7c673cae FG |
36 | } |
37 | ldout(cct, 10) << "wait_for_empty empty" << dendl; | |
38 | finisher_empty_wait = false; | |
7c673cae FG |
39 | } |
40 | ||
41 | void *Finisher::finisher_thread_entry() | |
42 | { | |
11fdf7f2 | 43 | std::unique_lock ul(finisher_lock); |
7c673cae FG |
44 | ldout(cct, 10) << "finisher_thread start" << dendl; |
45 | ||
46 | utime_t start; | |
47 | uint64_t count = 0; | |
48 | while (!finisher_stop) { | |
49 | /// Every time we are woken up, we process the queue until it is empty. | |
50 | while (!finisher_queue.empty()) { | |
51 | // To reduce lock contention, we swap out the queue to process. | |
11fdf7f2 TL |
52 | // This way other threads can submit new contexts to complete |
53 | // while we are working. | |
9f95a23c | 54 | in_progress_queue.swap(finisher_queue); |
7c673cae | 55 | finisher_running = true; |
11fdf7f2 | 56 | ul.unlock(); |
9f95a23c | 57 | ldout(cct, 10) << "finisher_thread doing " << in_progress_queue << dendl; |
7c673cae FG |
58 | |
59 | if (logger) { | |
60 | start = ceph_clock_now(); | |
9f95a23c | 61 | count = in_progress_queue.size(); |
7c673cae FG |
62 | } |
63 | ||
64 | // Now actually process the contexts. | |
9f95a23c | 65 | for (auto p : in_progress_queue) { |
11fdf7f2 | 66 | p.first->complete(p.second); |
7c673cae | 67 | } |
9f95a23c TL |
68 | ldout(cct, 10) << "finisher_thread done with " << in_progress_queue |
69 | << dendl; | |
70 | in_progress_queue.clear(); | |
7c673cae FG |
71 | if (logger) { |
72 | logger->dec(l_finisher_queue_len, count); | |
73 | logger->tinc(l_finisher_complete_lat, ceph_clock_now() - start); | |
74 | } | |
75 | ||
11fdf7f2 | 76 | ul.lock(); |
7c673cae FG |
77 | finisher_running = false; |
78 | } | |
79 | ldout(cct, 10) << "finisher_thread empty" << dendl; | |
80 | if (unlikely(finisher_empty_wait)) | |
11fdf7f2 | 81 | finisher_empty_cond.notify_all(); |
7c673cae FG |
82 | if (finisher_stop) |
83 | break; | |
84 | ||
85 | ldout(cct, 10) << "finisher_thread sleeping" << dendl; | |
11fdf7f2 | 86 | finisher_cond.wait(ul); |
7c673cae FG |
87 | } |
88 | // If we are exiting, we signal the thread waiting in stop(), | |
89 | // otherwise it would never unblock | |
11fdf7f2 | 90 | finisher_empty_cond.notify_all(); |
7c673cae FG |
91 | |
92 | ldout(cct, 10) << "finisher_thread stop" << dendl; | |
93 | finisher_stop = false; | |
7c673cae FG |
94 | return 0; |
95 | } | |
96 |