]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/TrackedOp.h
9435a1913c063febbb74ad8da243062dd5aa5b50
[ceph.git] / ceph / src / common / TrackedOp.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #ifndef TRACKEDREQUEST_H_
15 #define TRACKEDREQUEST_H_
16
17 #include <atomic>
18 #include "common/histogram.h"
19 #include "msg/Message.h"
20 #include "common/RWLock.h"
21
22 #define OPTRACKER_PREALLOC_EVENTS 20
23
24 class TrackedOp;
25 typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;
26
27 class OpHistory {
28 set<pair<utime_t, TrackedOpRef> > arrived;
29 set<pair<double, TrackedOpRef> > duration;
30 set<pair<utime_t, TrackedOpRef> > slow_op;
31 Mutex ops_history_lock;
32 void cleanup(utime_t now);
33 bool shutdown;
34 uint32_t history_size;
35 uint32_t history_duration;
36 uint32_t history_slow_op_size;
37 uint32_t history_slow_op_threshold;
38
39 public:
40 OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false),
41 history_size(0), history_duration(0),
42 history_slow_op_size(0), history_slow_op_threshold(0) {}
43 ~OpHistory() {
44 assert(arrived.empty());
45 assert(duration.empty());
46 assert(slow_op.empty());
47 }
48 void insert(utime_t now, TrackedOpRef op);
49 void dump_ops(utime_t now, Formatter *f, set<string> filters = {""});
50 void dump_ops_by_duration(utime_t now, Formatter *f, set<string> filters = {""});
51 void dump_slow_ops(utime_t now, Formatter *f, set<string> filters = {""});
52 void on_shutdown();
53 void set_size_and_duration(uint32_t new_size, uint32_t new_duration) {
54 history_size = new_size;
55 history_duration = new_duration;
56 }
57 void set_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
58 history_slow_op_size = new_size;
59 history_slow_op_threshold = new_threshold;
60 }
61 };
62
63 struct ShardedTrackingData;
64 class OpTracker {
65 friend class OpHistory;
66 std::atomic<int64_t> seq = { 0 };
67 vector<ShardedTrackingData*> sharded_in_flight_list;
68 uint32_t num_optracker_shards;
69 OpHistory history;
70 float complaint_time;
71 int log_threshold;
72 bool tracking_enabled;
73 RWLock lock;
74
75 public:
76 CephContext *cct;
77 OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards);
78
79 void set_complaint_and_threshold(float time, int threshold) {
80 complaint_time = time;
81 log_threshold = threshold;
82 }
83 void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
84 history.set_size_and_duration(new_size, new_duration);
85 }
86 void set_history_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
87 history.set_slow_op_size_and_threshold(new_size, new_threshold);
88 }
89 void set_tracking(bool enable) {
90 RWLock::WLocker l(lock);
91 tracking_enabled = enable;
92 }
93 bool dump_ops_in_flight(Formatter *f, bool print_only_blocked = false, set<string> filters = {""});
94 bool dump_historic_ops(Formatter *f, bool by_duration = false, set<string> filters = {""});
95 bool dump_historic_slow_ops(Formatter *f, set<string> filters = {""});
96 bool register_inflight_op(TrackedOp *i);
97 void unregister_inflight_op(TrackedOp *i);
98
99 void get_age_ms_histogram(pow2_hist_t *h);
100
101 /**
102 * Look for Ops which are too old, and insert warning
103 * strings for each Op that is too old.
104 *
105 * @param warning_strings A vector<string> reference which is filled
106 * with a warning string for each old Op.
107 * @return True if there are any Ops to warn on, false otherwise.
108 */
109 bool check_ops_in_flight(std::vector<string> &warning_strings,
110 int *slow = NULL);
111
112 void on_shutdown() {
113 history.on_shutdown();
114 }
115 ~OpTracker();
116
117 template <typename T, typename U>
118 typename T::Ref create_request(U params)
119 {
120 typename T::Ref retval(new T(params, this));
121 retval->tracking_start();
122 return retval;
123 }
124 };
125
126
127 class TrackedOp : public boost::intrusive::list_base_hook<> {
128 private:
129 friend class OpHistory;
130 friend class OpTracker;
131
132 boost::intrusive::list_member_hook<> tracker_item;
133
134 public:
135 typedef boost::intrusive::list<
136 TrackedOp,
137 boost::intrusive::member_hook<
138 TrackedOp,
139 boost::intrusive::list_member_hook<>,
140 &TrackedOp::tracker_item> > tracked_op_list_t;
141
142 // for use when clearing lists. e.g.,
143 // ls.clear_and_dispose(TrackedOp::Putter());
144 struct Putter {
145 void operator()(TrackedOp *op) {
146 op->put();
147 }
148 };
149
150 protected:
151 OpTracker *tracker; ///< the tracker we are associated with
152 std::atomic_int nref = {0}; ///< ref count
153
154 utime_t initiated_at;
155
156 struct Event {
157 utime_t stamp;
158 string str;
159 const char *cstr = nullptr;
160
161 Event(utime_t t, const string& s) : stamp(t), str(s) {}
162 Event(utime_t t, const char *s) : stamp(t), cstr(s) {}
163
164 int compare(const char *s) const {
165 if (cstr)
166 return strcmp(cstr, s);
167 else
168 return str.compare(s);
169 }
170
171 const char *c_str() const {
172 if (cstr)
173 return cstr;
174 else
175 return str.c_str();
176 }
177
178 void dump(Formatter *f) const {
179 f->dump_stream("time") << stamp;
180 f->dump_string("event", c_str());
181 }
182 };
183
184 vector<Event> events; ///< list of events and their times
185 mutable Mutex lock = {"TrackedOp::lock"}; ///< to protect the events list
186 const char *current = 0; ///< the current state the event is in
187 uint64_t seq = 0; ///< a unique value set by the OpTracker
188
189 uint32_t warn_interval_multiplier = 1; //< limits output of a given op warning
190
191 enum {
192 STATE_UNTRACKED = 0,
193 STATE_LIVE,
194 STATE_HISTORY
195 };
196 atomic<int> state = {STATE_UNTRACKED};
197
198 mutable string desc_str; ///< protected by lock
199 mutable const char *desc = nullptr; ///< readable without lock
200 mutable atomic<bool> want_new_desc = {false};
201
202 TrackedOp(OpTracker *_tracker, const utime_t& initiated) :
203 tracker(_tracker),
204 initiated_at(initiated)
205 {
206 events.reserve(OPTRACKER_PREALLOC_EVENTS);
207 }
208
209 /// output any type-specific data you want to get when dump() is called
210 virtual void _dump(Formatter *f) const {}
211 /// if you want something else to happen when events are marked, implement
212 virtual void _event_marked() {}
213 /// return a unique descriptor of the Op; eg the message it's attached to
214 virtual void _dump_op_descriptor_unlocked(ostream& stream) const = 0;
215 /// called when the last non-OpTracker reference is dropped
216 virtual void _unregistered() {}
217
218 virtual bool filter_out(const set<string>& filters) { return true; }
219
220 public:
221 ZTracer::Trace osd_trace;
222 ZTracer::Trace pg_trace;
223 ZTracer::Trace store_trace;
224 ZTracer::Trace journal_trace;
225
226 virtual ~TrackedOp() {}
227
228 void get() {
229 ++nref;
230 }
231 void put() {
232 if (--nref == 0) {
233 switch (state.load()) {
234 case STATE_UNTRACKED:
235 _unregistered();
236 delete this;
237 break;
238
239 case STATE_LIVE:
240 mark_event("done");
241 tracker->unregister_inflight_op(this);
242 break;
243
244 case STATE_HISTORY:
245 delete this;
246 break;
247
248 default:
249 ceph_abort();
250 }
251 }
252 }
253
254 const char *get_desc() const {
255 if (!desc || want_new_desc.load()) {
256 Mutex::Locker l(lock);
257 _gen_desc();
258 }
259 return desc;
260 }
261 private:
262 void _gen_desc() const {
263 ostringstream ss;
264 _dump_op_descriptor_unlocked(ss);
265 desc_str = ss.str();
266 desc = desc_str.c_str();
267 want_new_desc = false;
268 }
269 public:
270 void reset_desc() {
271 want_new_desc = true;
272 }
273
274 const utime_t& get_initiated() const {
275 return initiated_at;
276 }
277
278 double get_duration() const {
279 Mutex::Locker l(lock);
280 if (!events.empty() && events.rbegin()->compare("done") == 0)
281 return events.rbegin()->stamp - get_initiated();
282 else
283 return ceph_clock_now() - get_initiated();
284 }
285
286 void mark_event_string(const string &event,
287 utime_t stamp=ceph_clock_now());
288 void mark_event(const char *event,
289 utime_t stamp=ceph_clock_now());
290
291 virtual const char *state_string() const {
292 Mutex::Locker l(lock);
293 return events.rbegin()->c_str();
294 }
295
296 void dump(utime_t now, Formatter *f) const;
297
298 void tracking_start() {
299 if (tracker->register_inflight_op(this)) {
300 events.push_back(Event(initiated_at, "initiated"));
301 state = STATE_LIVE;
302 }
303 }
304
305 // ref counting via intrusive_ptr, with special behavior on final
306 // put for historical op tracking
307 friend void intrusive_ptr_add_ref(TrackedOp *o) {
308 o->get();
309 }
310 friend void intrusive_ptr_release(TrackedOp *o) {
311 o->put();
312 }
313 };
314
315
316 #endif