]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/Finisher.h
import ceph quincy 17.2.4
[ceph.git] / ceph / src / common / Finisher.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_FINISHER_H
16#define CEPH_FINISHER_H
17
11fdf7f2 18#include "include/Context.h"
9f95a23c 19#include "include/common_fwd.h"
11fdf7f2
TL
20#include "common/Thread.h"
21#include "common/ceph_mutex.h"
7c673cae 22#include "common/perf_counters.h"
11fdf7f2 23#include "common/Cond.h"
7c673cae 24
7c673cae
FG
25
/// Performance counter IDs used by named Finisher instances.
///
/// l_finisher_first and l_finisher_last are not counters themselves; they
/// bracket the ID range handed to PerfCountersBuilder. The IDs in between
/// are the actual counters.
enum {
  l_finisher_first = 997082,
  l_finisher_queue_len,     ///< Registered as "queue_len"; incremented as contexts are enqueued.
  l_finisher_complete_lat,  ///< Registered as the time average "complete_latency".
  l_finisher_last
};
33
34/** @brief Asynchronous cleanup class.
35 * Finisher asynchronously completes Contexts, which are simple classes
36 * representing callbacks, in a dedicated worker thread. Enqueuing
37 * contexts to complete is thread-safe.
38 */
39class Finisher {
40 CephContext *cct;
11fdf7f2
TL
41 ceph::mutex finisher_lock; ///< Protects access to queues and finisher_running.
42 ceph::condition_variable finisher_cond; ///< Signaled when there is something to process.
43 ceph::condition_variable finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
7c673cae
FG
44 bool finisher_stop; ///< Set when the finisher should stop.
45 bool finisher_running; ///< True when the finisher is currently executing contexts.
46 bool finisher_empty_wait; ///< True mean someone wait finisher empty.
11fdf7f2 47
7c673cae 48 /// Queue for contexts for which complete(0) will be called.
9f95a23c
TL
49 std::vector<std::pair<Context*,int>> finisher_queue;
50 std::vector<std::pair<Context*,int>> in_progress_queue;
7c673cae 51
9f95a23c 52 std::string thread_name;
7c673cae 53
7c673cae
FG
54 /// Performance counter for the finisher's queue length.
55 /// Only active for named finishers.
56 PerfCounters *logger;
9f95a23c 57
7c673cae
FG
58 void *finisher_thread_entry();
59
60 struct FinisherThread : public Thread {
9f95a23c 61 Finisher *fin;
7c673cae 62 explicit FinisherThread(Finisher *f) : fin(f) {}
11fdf7f2 63 void* entry() override { return fin->finisher_thread_entry(); }
7c673cae
FG
64 } finisher_thread;
65
66 public:
67 /// Add a context to complete, optionally specifying a parameter for the complete function.
68 void queue(Context *c, int r = 0) {
11fdf7f2 69 std::unique_lock ul(finisher_lock);
9f95a23c
TL
70 bool was_empty = finisher_queue.empty();
71 finisher_queue.push_back(std::make_pair(c, r));
72 if (was_empty) {
73 finisher_cond.notify_one();
7c673cae 74 }
7c673cae
FG
75 if (logger)
76 logger->inc(l_finisher_queue_len);
7c673cae 77 }
11fdf7f2 78
9f95a23c 79 void queue(std::list<Context*>& ls) {
11fdf7f2
TL
80 {
81 std::unique_lock ul(finisher_lock);
82 if (finisher_queue.empty()) {
83 finisher_cond.notify_all();
84 }
85 for (auto i : ls) {
9f95a23c 86 finisher_queue.push_back(std::make_pair(i, 0));
11fdf7f2
TL
87 }
88 if (logger)
89 logger->inc(l_finisher_queue_len, ls.size());
7c673cae 90 }
7c673cae
FG
91 ls.clear();
92 }
9f95a23c 93 void queue(std::deque<Context*>& ls) {
11fdf7f2
TL
94 {
95 std::unique_lock ul(finisher_lock);
96 if (finisher_queue.empty()) {
97 finisher_cond.notify_all();
98 }
99 for (auto i : ls) {
9f95a23c 100 finisher_queue.push_back(std::make_pair(i, 0));
11fdf7f2
TL
101 }
102 if (logger)
103 logger->inc(l_finisher_queue_len, ls.size());
7c673cae 104 }
7c673cae
FG
105 ls.clear();
106 }
9f95a23c 107 void queue(std::vector<Context*>& ls) {
11fdf7f2
TL
108 {
109 std::unique_lock ul(finisher_lock);
110 if (finisher_queue.empty()) {
111 finisher_cond.notify_all();
112 }
113 for (auto i : ls) {
9f95a23c 114 finisher_queue.push_back(std::make_pair(i, 0));
11fdf7f2
TL
115 }
116 if (logger)
117 logger->inc(l_finisher_queue_len, ls.size());
7c673cae 118 }
7c673cae
FG
119 ls.clear();
120 }
121
122 /// Start the worker thread.
123 void start();
124
125 /** @brief Stop the worker thread.
126 *
127 * Does not wait until all outstanding contexts are completed.
128 * To ensure that everything finishes, you should first shut down
129 * all sources that can add contexts to this finisher and call
130 * wait_for_empty() before calling stop(). */
131 void stop();
132
133 /** @brief Blocks until the finisher has nothing left to process.
134 * This function will also return when a concurrent call to stop()
135 * finishes, but this class should never be used in this way. */
136 void wait_for_empty();
137
138 /// Construct an anonymous Finisher.
139 /// Anonymous finishers do not log their queue length.
140 explicit Finisher(CephContext *cct_) :
11fdf7f2 141 cct(cct_), finisher_lock(ceph::make_mutex("Finisher::finisher_lock")),
7c673cae
FG
142 finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
143 thread_name("fn_anonymous"), logger(0),
144 finisher_thread(this) {}
145
146 /// Construct a named Finisher that logs its queue length.
9f95a23c 147 Finisher(CephContext *cct_, std::string name, std::string tn) :
11fdf7f2 148 cct(cct_), finisher_lock(ceph::make_mutex("Finisher::" + name)),
7c673cae
FG
149 finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
150 thread_name(tn), logger(0),
151 finisher_thread(this) {
9f95a23c 152 PerfCountersBuilder b(cct, std::string("finisher-") + name,
7c673cae
FG
153 l_finisher_first, l_finisher_last);
154 b.add_u64(l_finisher_queue_len, "queue_len");
155 b.add_time_avg(l_finisher_complete_lat, "complete_latency");
156 logger = b.create_perf_counters();
157 cct->get_perfcounters_collection()->add(logger);
158 logger->set(l_finisher_queue_len, 0);
159 logger->set(l_finisher_complete_lat, 0);
160 }
161
162 ~Finisher() {
163 if (logger && cct) {
164 cct->get_perfcounters_collection()->remove(logger);
165 delete logger;
166 }
167 }
168};
169
170/// Context that is completed asynchronously on the supplied finisher.
171class C_OnFinisher : public Context {
172 Context *con;
173 Finisher *fin;
174public:
175 C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) {
11fdf7f2
TL
176 ceph_assert(fin != NULL);
177 ceph_assert(con != NULL);
7c673cae
FG
178 }
179
180 ~C_OnFinisher() override {
181 if (con != nullptr) {
182 delete con;
183 con = nullptr;
184 }
185 }
186
187 void finish(int r) override {
188 fin->queue(con, r);
189 con = nullptr;
190 }
191};
192
11fdf7f2 193class ContextQueue {
9f95a23c 194 std::list<Context *> q;
11fdf7f2
TL
195 std::mutex q_mutex;
196 ceph::mutex& mutex;
197 ceph::condition_variable& cond;
9f95a23c 198 std::atomic_bool q_empty = true;
11fdf7f2
TL
199public:
200 ContextQueue(ceph::mutex& mut,
201 ceph::condition_variable& con)
202 : mutex(mut), cond(con) {}
203
9f95a23c
TL
204 void queue(std::list<Context *>& ls) {
205 bool was_empty = false;
11fdf7f2
TL
206 {
207 std::scoped_lock l(q_mutex);
208 if (q.empty()) {
209 q.swap(ls);
9f95a23c 210 was_empty = true;
11fdf7f2
TL
211 } else {
212 q.insert(q.end(), ls.begin(), ls.end());
213 }
9f95a23c 214 q_empty = q.empty();
11fdf7f2
TL
215 }
216
9f95a23c 217 if (was_empty) {
11fdf7f2
TL
218 std::scoped_lock l{mutex};
219 cond.notify_all();
220 }
221
222 ls.clear();
223 }
224
9f95a23c 225 void move_to(std::list<Context *>& ls) {
11fdf7f2
TL
226 ls.clear();
227 std::scoped_lock l(q_mutex);
228 if (!q.empty()) {
229 q.swap(ls);
230 }
9f95a23c 231 q_empty = true;
11fdf7f2
TL
232 }
233
234 bool empty() {
9f95a23c 235 return q_empty;
11fdf7f2
TL
236 }
237};
238
7c673cae 239#endif