]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/Finisher.h
update sources to v12.1.0
[ceph.git] / ceph / src / common / Finisher.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_FINISHER_H
16#define CEPH_FINISHER_H
17
7c673cae
FG
#include <deque>
#include <list>
#include <string>
#include <utility>
#include <vector>

#include "common/Mutex.h"
#include "common/Cond.h"
#include "common/perf_counters.h"
21
22class CephContext;
23
/// Performance counter IDs used by named Finisher instances.
///
/// l_finisher_first and l_finisher_last are not counters themselves; they
/// are the range bounds passed to PerfCountersBuilder when a named
/// Finisher registers its counters.
enum {
  l_finisher_first = 997082,
  l_finisher_queue_len,     ///< u64 counter registered as "queue_len"; incremented by Finisher::queue().
  l_finisher_complete_lat,  ///< Time-average counter registered as "complete_latency".
  l_finisher_last
};
31
32/** @brief Asynchronous cleanup class.
33 * Finisher asynchronously completes Contexts, which are simple classes
34 * representing callbacks, in a dedicated worker thread. Enqueuing
35 * contexts to complete is thread-safe.
36 */
37class Finisher {
38 CephContext *cct;
39 Mutex finisher_lock; ///< Protects access to queues and finisher_running.
40 Cond finisher_cond; ///< Signaled when there is something to process.
41 Cond finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
42 bool finisher_stop; ///< Set when the finisher should stop.
43 bool finisher_running; ///< True when the finisher is currently executing contexts.
44 bool finisher_empty_wait; ///< True mean someone wait finisher empty.
45 /// Queue for contexts for which complete(0) will be called.
46 /// NULLs in this queue indicate that an item from finisher_queue_rval
47 /// should be completed in that place instead.
48 vector<Context*> finisher_queue;
49
50 string thread_name;
51
52 /// Queue for contexts for which the complete function will be called
53 /// with a parameter other than 0.
54 list<pair<Context*,int> > finisher_queue_rval;
55
56 /// Performance counter for the finisher's queue length.
57 /// Only active for named finishers.
58 PerfCounters *logger;
59
60 void *finisher_thread_entry();
61
62 struct FinisherThread : public Thread {
63 Finisher *fin;
64 explicit FinisherThread(Finisher *f) : fin(f) {}
65 void* entry() override { return (void*)fin->finisher_thread_entry(); }
66 } finisher_thread;
67
68 public:
69 /// Add a context to complete, optionally specifying a parameter for the complete function.
70 void queue(Context *c, int r = 0) {
71 finisher_lock.Lock();
72 if (finisher_queue.empty()) {
73 finisher_cond.Signal();
74 }
75 if (r) {
76 finisher_queue_rval.push_back(pair<Context*, int>(c, r));
77 finisher_queue.push_back(NULL);
78 } else
79 finisher_queue.push_back(c);
80 if (logger)
81 logger->inc(l_finisher_queue_len);
82 finisher_lock.Unlock();
83 }
84 void queue(vector<Context*>& ls) {
85 finisher_lock.Lock();
86 if (finisher_queue.empty()) {
87 finisher_cond.Signal();
88 }
89 finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
90 if (logger)
91 logger->inc(l_finisher_queue_len, ls.size());
92 finisher_lock.Unlock();
93 ls.clear();
94 }
95 void queue(deque<Context*>& ls) {
96 finisher_lock.Lock();
97 if (finisher_queue.empty()) {
98 finisher_cond.Signal();
99 }
100 finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
101 if (logger)
102 logger->inc(l_finisher_queue_len, ls.size());
103 finisher_lock.Unlock();
104 ls.clear();
105 }
106 void queue(list<Context*>& ls) {
107 finisher_lock.Lock();
108 if (finisher_queue.empty()) {
109 finisher_cond.Signal();
110 }
111 finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
112 if (logger)
113 logger->inc(l_finisher_queue_len, ls.size());
114 finisher_lock.Unlock();
115 ls.clear();
116 }
117
118 /// Start the worker thread.
119 void start();
120
121 /** @brief Stop the worker thread.
122 *
123 * Does not wait until all outstanding contexts are completed.
124 * To ensure that everything finishes, you should first shut down
125 * all sources that can add contexts to this finisher and call
126 * wait_for_empty() before calling stop(). */
127 void stop();
128
129 /** @brief Blocks until the finisher has nothing left to process.
130 * This function will also return when a concurrent call to stop()
131 * finishes, but this class should never be used in this way. */
132 void wait_for_empty();
133
134 /// Construct an anonymous Finisher.
135 /// Anonymous finishers do not log their queue length.
136 explicit Finisher(CephContext *cct_) :
137 cct(cct_), finisher_lock("Finisher::finisher_lock"),
138 finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
139 thread_name("fn_anonymous"), logger(0),
140 finisher_thread(this) {}
141
142 /// Construct a named Finisher that logs its queue length.
143 Finisher(CephContext *cct_, string name, string tn) :
144 cct(cct_), finisher_lock("Finisher::" + name),
145 finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
146 thread_name(tn), logger(0),
147 finisher_thread(this) {
148 PerfCountersBuilder b(cct, string("finisher-") + name,
149 l_finisher_first, l_finisher_last);
150 b.add_u64(l_finisher_queue_len, "queue_len");
151 b.add_time_avg(l_finisher_complete_lat, "complete_latency");
152 logger = b.create_perf_counters();
153 cct->get_perfcounters_collection()->add(logger);
154 logger->set(l_finisher_queue_len, 0);
155 logger->set(l_finisher_complete_lat, 0);
156 }
157
158 ~Finisher() {
159 if (logger && cct) {
160 cct->get_perfcounters_collection()->remove(logger);
161 delete logger;
162 }
163 }
164};
165
166/// Context that is completed asynchronously on the supplied finisher.
167class C_OnFinisher : public Context {
168 Context *con;
169 Finisher *fin;
170public:
171 C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) {
172 assert(fin != NULL);
173 assert(con != NULL);
174 }
175
176 ~C_OnFinisher() override {
177 if (con != nullptr) {
178 delete con;
179 con = nullptr;
180 }
181 }
182
183 void finish(int r) override {
184 fin->queue(con, r);
185 con = nullptr;
186 }
187};
188
189#endif