]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/TrackedOp.h
update sources to v12.1.0
[ceph.git] / ceph / src / common / TrackedOp.h
CommitLineData
7c673cae
FG
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
13
14#ifndef TRACKEDREQUEST_H_
15#define TRACKEDREQUEST_H_
7c673cae 16
31f18b77 17#include <atomic>
7c673cae 18#include "common/histogram.h"
7c673cae 19#include "msg/Message.h"
7c673cae
FG
20#include "common/RWLock.h"
21
22#define OPTRACKER_PREALLOC_EVENTS 20
23
24class TrackedOp;
25typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;
26
27class OpHistory {
28 set<pair<utime_t, TrackedOpRef> > arrived;
29 set<pair<double, TrackedOpRef> > duration;
30 set<pair<utime_t, TrackedOpRef> > slow_op;
31 Mutex ops_history_lock;
32 void cleanup(utime_t now);
33 bool shutdown;
34 uint32_t history_size;
35 uint32_t history_duration;
36 uint32_t history_slow_op_size;
37 uint32_t history_slow_op_threshold;
38
39public:
40 OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false),
41 history_size(0), history_duration(0),
42 history_slow_op_size(0), history_slow_op_threshold(0) {}
43 ~OpHistory() {
44 assert(arrived.empty());
45 assert(duration.empty());
46 assert(slow_op.empty());
47 }
48 void insert(utime_t now, TrackedOpRef op);
49 void dump_ops(utime_t now, Formatter *f);
50 void dump_ops_by_duration(utime_t now, Formatter *f);
51 void dump_slow_ops(utime_t now, Formatter *f);
52 void on_shutdown();
53 void set_size_and_duration(uint32_t new_size, uint32_t new_duration) {
54 history_size = new_size;
55 history_duration = new_duration;
56 }
57 void set_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
58 history_slow_op_size = new_size;
59 history_slow_op_threshold = new_threshold;
60 }
61};
62
63struct ShardedTrackingData;
64class OpTracker {
65 friend class OpHistory;
31f18b77 66 std::atomic<int64_t> seq = { 0 };
7c673cae
FG
67 vector<ShardedTrackingData*> sharded_in_flight_list;
68 uint32_t num_optracker_shards;
69 OpHistory history;
70 float complaint_time;
71 int log_threshold;
72 bool tracking_enabled;
73 RWLock lock;
74
75public:
76 CephContext *cct;
77 OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards);
78
79 void set_complaint_and_threshold(float time, int threshold) {
80 complaint_time = time;
81 log_threshold = threshold;
82 }
83 void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
84 history.set_size_and_duration(new_size, new_duration);
85 }
86 void set_history_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
87 history.set_slow_op_size_and_threshold(new_size, new_threshold);
88 }
89 void set_tracking(bool enable) {
90 RWLock::WLocker l(lock);
91 tracking_enabled = enable;
92 }
93 bool dump_ops_in_flight(Formatter *f, bool print_only_blocked=false);
94 bool dump_historic_ops(Formatter *f, bool by_duration = false);
95 bool dump_historic_slow_ops(Formatter *f);
96 bool register_inflight_op(TrackedOp *i);
97 void unregister_inflight_op(TrackedOp *i);
98
99 void get_age_ms_histogram(pow2_hist_t *h);
100
101 /**
102 * Look for Ops which are too old, and insert warning
103 * strings for each Op that is too old.
104 *
105 * @param warning_strings A vector<string> reference which is filled
106 * with a warning string for each old Op.
107 * @return True if there are any Ops to warn on, false otherwise.
108 */
109 bool check_ops_in_flight(std::vector<string> &warning_strings,
110 int *slow = NULL);
111
112 void on_shutdown() {
113 history.on_shutdown();
114 }
115 ~OpTracker();
116
117 template <typename T, typename U>
118 typename T::Ref create_request(U params)
119 {
120 typename T::Ref retval(new T(params, this));
121 retval->tracking_start();
122 return retval;
123 }
124};
125
126
127class TrackedOp : public boost::intrusive::list_base_hook<> {
128private:
129 friend class OpHistory;
130 friend class OpTracker;
131
132 boost::intrusive::list_member_hook<> tracker_item;
133
134public:
135 typedef boost::intrusive::list<
136 TrackedOp,
137 boost::intrusive::member_hook<
138 TrackedOp,
139 boost::intrusive::list_member_hook<>,
140 &TrackedOp::tracker_item> > tracked_op_list_t;
141
142 // for use when clearing lists. e.g.,
143 // ls.clear_and_dispose(TrackedOp::Putter());
144 struct Putter {
145 void operator()(TrackedOp *op) {
146 op->put();
147 }
148 };
149
150protected:
151 OpTracker *tracker; ///< the tracker we are associated with
152 std::atomic_int nref = {0}; ///< ref count
153
154 utime_t initiated_at;
155
156 struct Event {
157 utime_t stamp;
158 string str;
159 const char *cstr = nullptr;
160
161 Event(utime_t t, const string& s) : stamp(t), str(s) {}
162 Event(utime_t t, const char *s) : stamp(t), cstr(s) {}
163
164 int compare(const char *s) const {
165 if (cstr)
166 return strcmp(cstr, s);
167 else
168 return str.compare(s);
169 }
170
171 const char *c_str() const {
172 if (cstr)
173 return cstr;
174 else
175 return str.c_str();
176 }
177
178 void dump(Formatter *f) const {
179 f->dump_stream("time") << stamp;
180 f->dump_string("event", c_str());
181 }
182 };
183
184 vector<Event> events; ///< list of events and their times
185 mutable Mutex lock = {"TrackedOp::lock"}; ///< to protect the events list
186 const char *current = 0; ///< the current state the event is in
187 uint64_t seq = 0; ///< a unique value set by the OpTracker
188
189 uint32_t warn_interval_multiplier = 1; //< limits output of a given op warning
190
191 enum {
192 STATE_UNTRACKED = 0,
193 STATE_LIVE,
194 STATE_HISTORY
195 };
196 atomic<int> state = {STATE_UNTRACKED};
197
198 mutable string desc_str; ///< protected by lock
199 mutable const char *desc = nullptr; ///< readable without lock
200 mutable atomic<bool> want_new_desc = {false};
201
202 TrackedOp(OpTracker *_tracker, const utime_t& initiated) :
203 tracker(_tracker),
204 initiated_at(initiated)
205 {
206 events.reserve(OPTRACKER_PREALLOC_EVENTS);
207 }
208
209 /// output any type-specific data you want to get when dump() is called
210 virtual void _dump(Formatter *f) const {}
211 /// if you want something else to happen when events are marked, implement
212 virtual void _event_marked() {}
213 /// return a unique descriptor of the Op; eg the message it's attached to
214 virtual void _dump_op_descriptor_unlocked(ostream& stream) const = 0;
215 /// called when the last non-OpTracker reference is dropped
216 virtual void _unregistered() {};
217
218public:
219 ZTracer::Trace osd_trace;
220 ZTracer::Trace pg_trace;
221 ZTracer::Trace store_trace;
222 ZTracer::Trace journal_trace;
223
224 virtual ~TrackedOp() {}
225
226 void get() {
227 ++nref;
228 }
229 void put() {
230 if (--nref == 0) {
231 switch (state.load()) {
232 case STATE_UNTRACKED:
233 _unregistered();
234 delete this;
235 break;
236
237 case STATE_LIVE:
238 mark_event("done");
239 tracker->unregister_inflight_op(this);
240 break;
241
242 case STATE_HISTORY:
243 delete this;
244 break;
245
246 default:
247 ceph_abort();
248 }
249 }
250 }
251
252 const char *get_desc() const {
253 if (!desc || want_new_desc.load()) {
254 Mutex::Locker l(lock);
255 _gen_desc();
256 }
257 return desc;
258 }
259private:
260 void _gen_desc() const {
261 ostringstream ss;
262 _dump_op_descriptor_unlocked(ss);
263 desc_str = ss.str();
264 desc = desc_str.c_str();
265 want_new_desc = false;
266 }
267public:
268 void reset_desc() {
269 want_new_desc = true;
270 }
271
272 const utime_t& get_initiated() const {
273 return initiated_at;
274 }
275
276 double get_duration() const {
277 Mutex::Locker l(lock);
278 if (!events.empty() && events.rbegin()->compare("done") == 0)
279 return events.rbegin()->stamp - get_initiated();
280 else
281 return ceph_clock_now() - get_initiated();
282 }
283
284 void mark_event_string(const string &event,
285 utime_t stamp=ceph_clock_now());
286 void mark_event(const char *event,
287 utime_t stamp=ceph_clock_now());
288
289 virtual const char *state_string() const {
290 Mutex::Locker l(lock);
291 return events.rbegin()->c_str();
292 }
293
294 void dump(utime_t now, Formatter *f) const;
295
296 void tracking_start() {
297 if (tracker->register_inflight_op(this)) {
298 events.push_back(Event(initiated_at, "initiated"));
299 state = STATE_LIVE;
300 }
301 }
302
303 // ref counting via intrusive_ptr, with special behavior on final
304 // put for historical op tracking
305 friend void intrusive_ptr_add_ref(TrackedOp *o) {
306 o->get();
307 }
308 friend void intrusive_ptr_release(TrackedOp *o) {
309 o->put();
310 }
311};
312
313
314#endif