1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_FINISHER_H
16 #define CEPH_FINISHER_H
18 #include "include/Context.h"
19 #include "include/common_fwd.h"
20 #include "common/Thread.h"
21 #include "common/ceph_mutex.h"
22 #include "common/perf_counters.h"
23 #include "common/Cond.h"
26 /// Finisher queue length performance counter ID.
28 l_finisher_first
= 997082,
30 l_finisher_complete_lat
,
34 /** @brief Asynchronous cleanup class.
35 * Finisher asynchronously completes Contexts, which are simple classes
36 * representing callbacks, in a dedicated worker thread. Enqueuing
37 * contexts to complete is thread-safe.
41 ceph::mutex finisher_lock
; ///< Protects access to queues and finisher_running.
42 ceph::condition_variable finisher_cond
; ///< Signaled when there is something to process.
43 ceph::condition_variable finisher_empty_cond
; ///< Signaled when the finisher has nothing more to process.
44 bool finisher_stop
; ///< Set when the finisher should stop.
45 bool finisher_running
; ///< True when the finisher is currently executing contexts.
46 bool finisher_empty_wait
; ///< True mean someone wait finisher empty.
48 /// Queue for contexts for which complete(0) will be called.
49 std::vector
<std::pair
<Context
*,int>> finisher_queue
;
50 std::vector
<std::pair
<Context
*,int>> in_progress_queue
;
52 std::string thread_name
;
54 /// Performance counter for the finisher's queue length.
55 /// Only active for named finishers.
58 void *finisher_thread_entry();
60 struct FinisherThread
: public Thread
{
62 explicit FinisherThread(Finisher
*f
) : fin(f
) {}
63 void* entry() override
{ return fin
->finisher_thread_entry(); }
67 /// Add a context to complete, optionally specifying a parameter for the complete function.
68 void queue(Context
*c
, int r
= 0) {
69 std::unique_lock
ul(finisher_lock
);
70 bool was_empty
= finisher_queue
.empty();
71 finisher_queue
.push_back(std::make_pair(c
, r
));
73 finisher_cond
.notify_one();
76 logger
->inc(l_finisher_queue_len
);
79 void queue(std::list
<Context
*>& ls
) {
81 std::unique_lock
ul(finisher_lock
);
82 if (finisher_queue
.empty()) {
83 finisher_cond
.notify_all();
86 finisher_queue
.push_back(std::make_pair(i
, 0));
89 logger
->inc(l_finisher_queue_len
, ls
.size());
93 void queue(std::deque
<Context
*>& ls
) {
95 std::unique_lock
ul(finisher_lock
);
96 if (finisher_queue
.empty()) {
97 finisher_cond
.notify_all();
100 finisher_queue
.push_back(std::make_pair(i
, 0));
103 logger
->inc(l_finisher_queue_len
, ls
.size());
107 void queue(std::vector
<Context
*>& ls
) {
109 std::unique_lock
ul(finisher_lock
);
110 if (finisher_queue
.empty()) {
111 finisher_cond
.notify_all();
114 finisher_queue
.push_back(std::make_pair(i
, 0));
117 logger
->inc(l_finisher_queue_len
, ls
.size());
122 /// Start the worker thread.
125 /** @brief Stop the worker thread.
127 * Does not wait until all outstanding contexts are completed.
128 * To ensure that everything finishes, you should first shut down
129 * all sources that can add contexts to this finisher and call
130 * wait_for_empty() before calling stop(). */
133 /** @brief Blocks until the finisher has nothing left to process.
134 * This function will also return when a concurrent call to stop()
135 * finishes, but this class should never be used in this way. */
136 void wait_for_empty();
138 /// Construct an anonymous Finisher.
139 /// Anonymous finishers do not log their queue length.
140 explicit Finisher(CephContext
*cct_
) :
141 cct(cct_
), finisher_lock(ceph::make_mutex("Finisher::finisher_lock")),
142 finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
143 thread_name("fn_anonymous"), logger(0),
144 finisher_thread(this) {}
146 /// Construct a named Finisher that logs its queue length.
147 Finisher(CephContext
*cct_
, std::string name
, std::string tn
) :
148 cct(cct_
), finisher_lock(ceph::make_mutex("Finisher::" + name
)),
149 finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
150 thread_name(tn
), logger(0),
151 finisher_thread(this) {
152 PerfCountersBuilder
b(cct
, std::string("finisher-") + name
,
153 l_finisher_first
, l_finisher_last
);
154 b
.add_u64(l_finisher_queue_len
, "queue_len");
155 b
.add_time_avg(l_finisher_complete_lat
, "complete_latency");
156 logger
= b
.create_perf_counters();
157 cct
->get_perfcounters_collection()->add(logger
);
158 logger
->set(l_finisher_queue_len
, 0);
159 logger
->set(l_finisher_complete_lat
, 0);
164 cct
->get_perfcounters_collection()->remove(logger
);
170 /// Context that is completed asynchronously on the supplied finisher.
171 class C_OnFinisher
: public Context
{
175 C_OnFinisher(Context
*c
, Finisher
*f
) : con(c
), fin(f
) {
176 ceph_assert(fin
!= NULL
);
177 ceph_assert(con
!= NULL
);
180 ~C_OnFinisher() override
{
181 if (con
!= nullptr) {
187 void finish(int r
) override
{
194 std::list
<Context
*> q
;
197 ceph::condition_variable
& cond
;
198 std::atomic_bool q_empty
= true;
200 ContextQueue(ceph::mutex
& mut
,
201 ceph::condition_variable
& con
)
202 : mutex(mut
), cond(con
) {}
204 void queue(std::list
<Context
*>& ls
) {
205 bool was_empty
= false;
207 std::scoped_lock
l(q_mutex
);
212 q
.insert(q
.end(), ls
.begin(), ls
.end());
218 std::scoped_lock l
{mutex
};
225 void move_to(std::list
<Context
*>& ls
) {
227 std::scoped_lock
l(q_mutex
);