]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/Finisher.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / common / Finisher.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_FINISHER_H
16#define CEPH_FINISHER_H
17
11fdf7f2
TL
18#include "include/Context.h"
19#include "common/Thread.h"
20#include "common/ceph_mutex.h"
7c673cae 21#include "common/perf_counters.h"
11fdf7f2 22#include "common/Cond.h"
7c673cae
FG
23
24class CephContext;
25
/// Performance counter IDs registered by named Finisher instances.
/// The first/last entries bracket the ID range handed to the
/// PerfCountersBuilder; the entries in between are the actual counters.
enum {
  /// Lower bound of the finisher counter ID range (not a counter itself).
  l_finisher_first = 997082,
  /// Registered as "queue_len".
  l_finisher_queue_len,
  /// Registered as "complete_latency" (time average).
  l_finisher_complete_lat,
  /// Upper bound of the finisher counter ID range (not a counter itself).
  l_finisher_last
};
33
34/** @brief Asynchronous cleanup class.
35 * Finisher asynchronously completes Contexts, which are simple classes
36 * representing callbacks, in a dedicated worker thread. Enqueuing
37 * contexts to complete is thread-safe.
38 */
39class Finisher {
40 CephContext *cct;
11fdf7f2
TL
41 ceph::mutex finisher_lock; ///< Protects access to queues and finisher_running.
42 ceph::condition_variable finisher_cond; ///< Signaled when there is something to process.
43 ceph::condition_variable finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
7c673cae
FG
44 bool finisher_stop; ///< Set when the finisher should stop.
45 bool finisher_running; ///< True when the finisher is currently executing contexts.
46 bool finisher_empty_wait; ///< True mean someone wait finisher empty.
11fdf7f2 47
7c673cae 48 /// Queue for contexts for which complete(0) will be called.
11fdf7f2 49 vector<pair<Context*,int>> finisher_queue;
7c673cae
FG
50
51 string thread_name;
52
7c673cae
FG
53 /// Performance counter for the finisher's queue length.
54 /// Only active for named finishers.
55 PerfCounters *logger;
56
57 void *finisher_thread_entry();
58
59 struct FinisherThread : public Thread {
60 Finisher *fin;
61 explicit FinisherThread(Finisher *f) : fin(f) {}
11fdf7f2 62 void* entry() override { return fin->finisher_thread_entry(); }
7c673cae
FG
63 } finisher_thread;
64
65 public:
66 /// Add a context to complete, optionally specifying a parameter for the complete function.
67 void queue(Context *c, int r = 0) {
11fdf7f2 68 std::unique_lock ul(finisher_lock);
7c673cae 69 if (finisher_queue.empty()) {
11fdf7f2 70 finisher_cond.notify_all();
7c673cae 71 }
11fdf7f2 72 finisher_queue.push_back(make_pair(c, r));
7c673cae
FG
73 if (logger)
74 logger->inc(l_finisher_queue_len);
7c673cae 75 }
11fdf7f2
TL
76
77 void queue(list<Context*>& ls) {
78 {
79 std::unique_lock ul(finisher_lock);
80 if (finisher_queue.empty()) {
81 finisher_cond.notify_all();
82 }
83 for (auto i : ls) {
84 finisher_queue.push_back(make_pair(i, 0));
85 }
86 if (logger)
87 logger->inc(l_finisher_queue_len, ls.size());
7c673cae 88 }
7c673cae
FG
89 ls.clear();
90 }
91 void queue(deque<Context*>& ls) {
11fdf7f2
TL
92 {
93 std::unique_lock ul(finisher_lock);
94 if (finisher_queue.empty()) {
95 finisher_cond.notify_all();
96 }
97 for (auto i : ls) {
98 finisher_queue.push_back(make_pair(i, 0));
99 }
100 if (logger)
101 logger->inc(l_finisher_queue_len, ls.size());
7c673cae 102 }
7c673cae
FG
103 ls.clear();
104 }
11fdf7f2
TL
105 void queue(vector<Context*>& ls) {
106 {
107 std::unique_lock ul(finisher_lock);
108 if (finisher_queue.empty()) {
109 finisher_cond.notify_all();
110 }
111 for (auto i : ls) {
112 finisher_queue.push_back(make_pair(i, 0));
113 }
114 if (logger)
115 logger->inc(l_finisher_queue_len, ls.size());
7c673cae 116 }
7c673cae
FG
117 ls.clear();
118 }
119
120 /// Start the worker thread.
121 void start();
122
123 /** @brief Stop the worker thread.
124 *
125 * Does not wait until all outstanding contexts are completed.
126 * To ensure that everything finishes, you should first shut down
127 * all sources that can add contexts to this finisher and call
128 * wait_for_empty() before calling stop(). */
129 void stop();
130
131 /** @brief Blocks until the finisher has nothing left to process.
132 * This function will also return when a concurrent call to stop()
133 * finishes, but this class should never be used in this way. */
134 void wait_for_empty();
135
136 /// Construct an anonymous Finisher.
137 /// Anonymous finishers do not log their queue length.
138 explicit Finisher(CephContext *cct_) :
11fdf7f2 139 cct(cct_), finisher_lock(ceph::make_mutex("Finisher::finisher_lock")),
7c673cae
FG
140 finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
141 thread_name("fn_anonymous"), logger(0),
142 finisher_thread(this) {}
143
144 /// Construct a named Finisher that logs its queue length.
145 Finisher(CephContext *cct_, string name, string tn) :
11fdf7f2 146 cct(cct_), finisher_lock(ceph::make_mutex("Finisher::" + name)),
7c673cae
FG
147 finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
148 thread_name(tn), logger(0),
149 finisher_thread(this) {
150 PerfCountersBuilder b(cct, string("finisher-") + name,
151 l_finisher_first, l_finisher_last);
152 b.add_u64(l_finisher_queue_len, "queue_len");
153 b.add_time_avg(l_finisher_complete_lat, "complete_latency");
154 logger = b.create_perf_counters();
155 cct->get_perfcounters_collection()->add(logger);
156 logger->set(l_finisher_queue_len, 0);
157 logger->set(l_finisher_complete_lat, 0);
158 }
159
160 ~Finisher() {
161 if (logger && cct) {
162 cct->get_perfcounters_collection()->remove(logger);
163 delete logger;
164 }
165 }
166};
167
168/// Context that is completed asynchronously on the supplied finisher.
169class C_OnFinisher : public Context {
170 Context *con;
171 Finisher *fin;
172public:
173 C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) {
11fdf7f2
TL
174 ceph_assert(fin != NULL);
175 ceph_assert(con != NULL);
7c673cae
FG
176 }
177
178 ~C_OnFinisher() override {
179 if (con != nullptr) {
180 delete con;
181 con = nullptr;
182 }
183 }
184
185 void finish(int r) override {
186 fin->queue(con, r);
187 con = nullptr;
188 }
189};
190
11fdf7f2
TL
191class ContextQueue {
192 list<Context *> q;
193 std::mutex q_mutex;
194 ceph::mutex& mutex;
195 ceph::condition_variable& cond;
196public:
197 ContextQueue(ceph::mutex& mut,
198 ceph::condition_variable& con)
199 : mutex(mut), cond(con) {}
200
201 void queue(list<Context *>& ls) {
202 bool empty = false;
203 {
204 std::scoped_lock l(q_mutex);
205 if (q.empty()) {
206 q.swap(ls);
207 empty = true;
208 } else {
209 q.insert(q.end(), ls.begin(), ls.end());
210 }
211 }
212
213 if (empty) {
214 std::scoped_lock l{mutex};
215 cond.notify_all();
216 }
217
218 ls.clear();
219 }
220
221 void swap(list<Context *>& ls) {
222 ls.clear();
223 std::scoped_lock l(q_mutex);
224 if (!q.empty()) {
225 q.swap(ls);
226 }
227 }
228
229 bool empty() {
230 std::scoped_lock l(q_mutex);
231 return q.empty();
232 }
233};
234
7c673cae 235#endif