git.proxmox.com Git - ceph.git/blob - ceph/src/common/Finisher.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / common / Finisher.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_FINISHER_H
16 #define CEPH_FINISHER_H
17
18 #include "include/atomic.h"
19 #include "common/Mutex.h"
20 #include "common/Cond.h"
21 #include "common/Thread.h"
22 #include "common/perf_counters.h"
23
24 class CephContext;
25
/// Performance counter IDs used by named Finisher instances.
enum {
  /// Range marker: lower bound handed to PerfCountersBuilder.
  l_finisher_first = 997082,
  /// Registered as the u64 counter "queue_len".
  l_finisher_queue_len,
  /// Registered as the time average "complete_latency".
  l_finisher_complete_lat,
  /// Range marker: upper bound handed to PerfCountersBuilder.
  l_finisher_last
};
33
34 /** @brief Asynchronous cleanup class.
35 * Finisher asynchronously completes Contexts, which are simple classes
36 * representing callbacks, in a dedicated worker thread. Enqueuing
37 * contexts to complete is thread-safe.
38 */
39 class Finisher {
40 CephContext *cct;
41 Mutex finisher_lock; ///< Protects access to queues and finisher_running.
42 Cond finisher_cond; ///< Signaled when there is something to process.
43 Cond finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
44 bool finisher_stop; ///< Set when the finisher should stop.
45 bool finisher_running; ///< True when the finisher is currently executing contexts.
46 bool finisher_empty_wait; ///< True mean someone wait finisher empty.
47 /// Queue for contexts for which complete(0) will be called.
48 /// NULLs in this queue indicate that an item from finisher_queue_rval
49 /// should be completed in that place instead.
50 vector<Context*> finisher_queue;
51
52 string thread_name;
53
54 /// Queue for contexts for which the complete function will be called
55 /// with a parameter other than 0.
56 list<pair<Context*,int> > finisher_queue_rval;
57
58 /// Performance counter for the finisher's queue length.
59 /// Only active for named finishers.
60 PerfCounters *logger;
61
62 void *finisher_thread_entry();
63
64 struct FinisherThread : public Thread {
65 Finisher *fin;
66 explicit FinisherThread(Finisher *f) : fin(f) {}
67 void* entry() override { return (void*)fin->finisher_thread_entry(); }
68 } finisher_thread;
69
70 public:
71 /// Add a context to complete, optionally specifying a parameter for the complete function.
72 void queue(Context *c, int r = 0) {
73 finisher_lock.Lock();
74 if (finisher_queue.empty()) {
75 finisher_cond.Signal();
76 }
77 if (r) {
78 finisher_queue_rval.push_back(pair<Context*, int>(c, r));
79 finisher_queue.push_back(NULL);
80 } else
81 finisher_queue.push_back(c);
82 if (logger)
83 logger->inc(l_finisher_queue_len);
84 finisher_lock.Unlock();
85 }
86 void queue(vector<Context*>& ls) {
87 finisher_lock.Lock();
88 if (finisher_queue.empty()) {
89 finisher_cond.Signal();
90 }
91 finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
92 if (logger)
93 logger->inc(l_finisher_queue_len, ls.size());
94 finisher_lock.Unlock();
95 ls.clear();
96 }
97 void queue(deque<Context*>& ls) {
98 finisher_lock.Lock();
99 if (finisher_queue.empty()) {
100 finisher_cond.Signal();
101 }
102 finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
103 if (logger)
104 logger->inc(l_finisher_queue_len, ls.size());
105 finisher_lock.Unlock();
106 ls.clear();
107 }
108 void queue(list<Context*>& ls) {
109 finisher_lock.Lock();
110 if (finisher_queue.empty()) {
111 finisher_cond.Signal();
112 }
113 finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
114 if (logger)
115 logger->inc(l_finisher_queue_len, ls.size());
116 finisher_lock.Unlock();
117 ls.clear();
118 }
119
120 /// Start the worker thread.
121 void start();
122
123 /** @brief Stop the worker thread.
124 *
125 * Does not wait until all outstanding contexts are completed.
126 * To ensure that everything finishes, you should first shut down
127 * all sources that can add contexts to this finisher and call
128 * wait_for_empty() before calling stop(). */
129 void stop();
130
131 /** @brief Blocks until the finisher has nothing left to process.
132 * This function will also return when a concurrent call to stop()
133 * finishes, but this class should never be used in this way. */
134 void wait_for_empty();
135
136 /// Construct an anonymous Finisher.
137 /// Anonymous finishers do not log their queue length.
138 explicit Finisher(CephContext *cct_) :
139 cct(cct_), finisher_lock("Finisher::finisher_lock"),
140 finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
141 thread_name("fn_anonymous"), logger(0),
142 finisher_thread(this) {}
143
144 /// Construct a named Finisher that logs its queue length.
145 Finisher(CephContext *cct_, string name, string tn) :
146 cct(cct_), finisher_lock("Finisher::" + name),
147 finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
148 thread_name(tn), logger(0),
149 finisher_thread(this) {
150 PerfCountersBuilder b(cct, string("finisher-") + name,
151 l_finisher_first, l_finisher_last);
152 b.add_u64(l_finisher_queue_len, "queue_len");
153 b.add_time_avg(l_finisher_complete_lat, "complete_latency");
154 logger = b.create_perf_counters();
155 cct->get_perfcounters_collection()->add(logger);
156 logger->set(l_finisher_queue_len, 0);
157 logger->set(l_finisher_complete_lat, 0);
158 }
159
160 ~Finisher() {
161 if (logger && cct) {
162 cct->get_perfcounters_collection()->remove(logger);
163 delete logger;
164 }
165 }
166 };
167
168 /// Context that is completed asynchronously on the supplied finisher.
169 class C_OnFinisher : public Context {
170 Context *con;
171 Finisher *fin;
172 public:
173 C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) {
174 assert(fin != NULL);
175 assert(con != NULL);
176 }
177
178 ~C_OnFinisher() override {
179 if (con != nullptr) {
180 delete con;
181 con = nullptr;
182 }
183 }
184
185 void finish(int r) override {
186 fin->queue(con, r);
187 con = nullptr;
188 }
189 };
190
191 #endif