git.proxmox.com Git - ceph.git/blob - ceph/src/common/TrackedOp.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / common / TrackedOp.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #ifndef TRACKEDREQUEST_H_
15 #define TRACKEDREQUEST_H_
16
#include <atomic>
#include <list>
#include <mutex>
#include <set>
#include <utility>

#include "common/histogram.h"
#include "common/RWLock.h"
#include "common/Thread.h"
#include "common/Clock.h"
#include "common/ceph_mutex.h"
#include "include/spinlock.h"
#include "msg/Message.h"
25
26 #define OPTRACKER_PREALLOC_EVENTS 20
27
28 class TrackedOp;
29 class OpHistory;
30
31 typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;
32
33 class OpHistoryServiceThread : public Thread
34 {
35 private:
36 list<pair<utime_t, TrackedOpRef>> _external_queue;
37 OpHistory* _ophistory;
38 mutable ceph::spinlock queue_spinlock;
39 bool _break_thread;
40 public:
41 explicit OpHistoryServiceThread(OpHistory* parent)
42 : _ophistory(parent),
43 _break_thread(false) { }
44
45 void break_thread();
46 void insert_op(const utime_t& now, TrackedOpRef op) {
47 queue_spinlock.lock();
48 _external_queue.emplace_back(now, op);
49 queue_spinlock.unlock();
50 }
51
52 void *entry() override;
53 };
54
55
56 class OpHistory {
57 set<pair<utime_t, TrackedOpRef> > arrived;
58 set<pair<double, TrackedOpRef> > duration;
59 set<pair<utime_t, TrackedOpRef> > slow_op;
60 ceph::mutex ops_history_lock = ceph::make_mutex("OpHistory::ops_history_lock");
61 void cleanup(utime_t now);
62 uint32_t history_size;
63 uint32_t history_duration;
64 uint32_t history_slow_op_size;
65 uint32_t history_slow_op_threshold;
66 std::atomic_bool shutdown;
67 OpHistoryServiceThread opsvc;
68 friend class OpHistoryServiceThread;
69
70 public:
71 OpHistory()
72 : history_size(0), history_duration(0),
73 history_slow_op_size(0), history_slow_op_threshold(0),
74 shutdown(false), opsvc(this) {
75 opsvc.create("OpHistorySvc");
76 }
77 ~OpHistory() {
78 ceph_assert(arrived.empty());
79 ceph_assert(duration.empty());
80 ceph_assert(slow_op.empty());
81 }
82 void insert(const utime_t& now, TrackedOpRef op)
83 {
84 if (shutdown)
85 return;
86
87 opsvc.insert_op(now, op);
88 }
89
90 void _insert_delayed(const utime_t& now, TrackedOpRef op);
91 void dump_ops(utime_t now, Formatter *f, set<string> filters = {""}, bool by_duration=false);
92 void dump_slow_ops(utime_t now, Formatter *f, set<string> filters = {""});
93 void on_shutdown();
94 void set_size_and_duration(uint32_t new_size, uint32_t new_duration) {
95 history_size = new_size;
96 history_duration = new_duration;
97 }
98 void set_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
99 history_slow_op_size = new_size;
100 history_slow_op_threshold = new_threshold;
101 }
102 };
103
104 struct ShardedTrackingData;
/**
 * Tracks all in-flight TrackedOps, sharded to reduce lock contention,
 * and feeds completed ops into an OpHistory. Tracking can be toggled at
 * runtime; create_request() is the factory entry point for new ops.
 */
class OpTracker {
  friend class OpHistory;
  std::atomic<int64_t> seq = { 0 };                    ///< source of per-op sequence numbers
  vector<ShardedTrackingData*> sharded_in_flight_list; ///< one shard per contention domain
  OpHistory history;                                   ///< archive of completed ops
  uint32_t num_optracker_shards;                       ///< size of sharded_in_flight_list
  float complaint_time;                                ///< age (secs) beyond which ops are warned about
  int log_threshold;                                   ///< cap on per-check warning output
  std::atomic<bool> tracking_enabled;                  ///< runtime switch, see set_tracking()
  RWLock lock;                                         ///< guards the shard list

public:
  CephContext *cct;
  OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards);

  /// configure slow-op complaint age and warning cap
  void set_complaint_and_threshold(float time, int threshold) {
    complaint_time = time;
    log_threshold = threshold;
  }
  /// forward history retention limits to the OpHistory
  void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
    history.set_size_and_duration(new_size, new_duration);
  }
  /// forward slow-op retention limits to the OpHistory
  void set_history_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
    history.set_slow_op_size_and_threshold(new_size, new_threshold);
  }
  bool is_tracking() const {
    return tracking_enabled;
  }
  void set_tracking(bool enable) {
    tracking_enabled = enable;
  }
  bool dump_ops_in_flight(Formatter *f, bool print_only_blocked = false, set<string> filters = {""});
  bool dump_historic_ops(Formatter *f, bool by_duration = false, set<string> filters = {""});
  bool dump_historic_slow_ops(Formatter *f, set<string> filters = {""});
  bool register_inflight_op(TrackedOp *i);
  void unregister_inflight_op(TrackedOp *i);
  void record_history_op(TrackedOpRef&& i);

  void get_age_ms_histogram(pow2_hist_t *h);

  /**
   * walk through ops in flight
   *
   * @param oldest_sec the amount of time since the oldest op was initiated
   * @param check a function consuming tracked ops, the function returns
   *              false if it don't want to be fed with more ops
   * @return True if there are any Ops to warn on, false otherwise
   */
  bool visit_ops_in_flight(utime_t* oldest_secs,
			   std::function<bool(TrackedOp&)>&& visit);
  /**
   * walk through slow ops in flight
   *
   * @param[out] oldest_sec the amount of time since the oldest op was initiated
   * @param[out] num_slow_ops total number of slow ops
   * @param[out] num_warned_ops total number of warned ops
   * @param on_warn a function consuming tracked ops, the function returns
   *                false if it don't want to be fed with more ops
   * @return True if there are any Ops to warn on, false otherwise
   */
  bool with_slow_ops_in_flight(utime_t* oldest_secs,
			       int* num_slow_ops,
			       int* num_warned_ops,
			       std::function<void(TrackedOp&)>&& on_warn);
  /**
   * Look for Ops which are too old, and insert warning
   * strings for each Op that is too old.
   *
   * @param summary[out] a string summarizing slow Ops.
   * @param warning_strings[out] A vector<string> reference which is filled
   * with a warning string for each old Op.
   * @param slow[out] total number of slow ops
   * @return True if there are any Ops to warn on, false otherwise.
   */
  bool check_ops_in_flight(std::string* summary,
			   std::vector<string> &warning_strings,
			   int* slow = nullptr);

  void on_shutdown() {
    history.on_shutdown();
  }
  ~OpTracker();

  /**
   * Factory for a tracked request of concrete type T, built from message
   * params U. Registers the op (tracking_start) and, when tracking is on,
   * back-dates the standard message-lifecycle events from the stamps the
   * message already carries.
   *
   * @tparam T concrete TrackedOp subtype; T::Ref is its smart-pointer alias
   * @param params message-like object providing the four get_*_stamp() calls
   * @return owning reference to the new request
   */
  template <typename T, typename U>
  typename T::Ref create_request(U params)
  {
    typename T::Ref retval(new T(params, this));
    retval->tracking_start();

    if (is_tracking()) {
      retval->mark_event("header_read", params->get_recv_stamp());
      retval->mark_event("throttled", params->get_throttle_stamp());
      retval->mark_event("all_read", params->get_recv_complete_stamp());
      retval->mark_event("dispatched", params->get_dispatch_stamp());
    }

    return retval;
  }
};
204
205
/**
 * Base class for an operation whose lifetime and event timeline are
 * recorded by an OpTracker. Reference-counted intrusively (see get()/put());
 * the final put() decides whether the op dies or is handed to the history.
 */
class TrackedOp : public boost::intrusive::list_base_hook<> {
private:
  friend class OpHistory;
  friend class OpTracker;

  // hook linking this op into its OpTracker shard's in-flight list
  boost::intrusive::list_member_hook<> tracker_item;

public:
  typedef boost::intrusive::list<
    TrackedOp,
    boost::intrusive::member_hook<
      TrackedOp,
      boost::intrusive::list_member_hook<>,
      &TrackedOp::tracker_item> > tracked_op_list_t;

  // for use when clearing lists. e.g.,
  // ls.clear_and_dispose(TrackedOp::Putter());
  struct Putter {
    void operator()(TrackedOp *op) {
      op->put();
    }
  };

protected:
  OpTracker *tracker;          ///< the tracker we are associated with
  std::atomic_int nref = {0};  ///< ref count

  utime_t initiated_at;        ///< when the op began; basis for get_duration()

  /// One timestamped checkpoint in the op's timeline.
  struct Event {
    utime_t stamp;
    std::string str;

    Event(utime_t t, std::string_view s) : stamp(t), str(s) {}

    /// strcmp-style comparison of the event name
    int compare(const char *s) const {
      return str.compare(s);
    }

    const char *c_str() const {
      return str.c_str();
    }

    void dump(Formatter *f) const {
      f->dump_stream("time") << stamp;
      f->dump_string("event", str);
    }
  };

  vector<Event> events;        ///< list of events and their times
  mutable ceph::mutex lock = ceph::make_mutex("TrackedOp::lock"); ///< to protect the events list
  uint64_t seq = 0;            ///< a unique value set by the OpTracker

  uint32_t warn_interval_multiplier = 1; //< limits output of a given op warning

  /// Lifecycle: UNTRACKED -> LIVE (via tracking_start) -> HISTORY (final put).
  enum {
    STATE_UNTRACKED = 0,
    STATE_LIVE,
    STATE_HISTORY
  };
  atomic<int> state = {STATE_UNTRACKED};

  mutable string desc_str;             ///< protected by lock
  mutable const char *desc = nullptr;  ///< readable without lock; points into desc_str
  mutable atomic<bool> want_new_desc = {false}; ///< set by reset_desc() to force regeneration

  TrackedOp(OpTracker *_tracker, const utime_t& initiated) :
    tracker(_tracker),
    initiated_at(initiated)
  {
    // pre-size so marking the usual handful of events never reallocates
    events.reserve(OPTRACKER_PREALLOC_EVENTS);
  }

  /// output any type-specific data you want to get when dump() is called
  virtual void _dump(Formatter *f) const {}
  /// if you want something else to happen when events are marked, implement
  virtual void _event_marked() {}
  /// return a unique descriptor of the Op; eg the message it's attached to
  virtual void _dump_op_descriptor_unlocked(ostream& stream) const = 0;
  /// called when the last non-OpTracker reference is dropped
  virtual void _unregistered() {}

  virtual bool filter_out(const set<string>& filters) { return true; }

public:
  ZTracer::Trace osd_trace;
  ZTracer::Trace pg_trace;
  ZTracer::Trace store_trace;
  ZTracer::Trace journal_trace;

  virtual ~TrackedOp() {}

  void get() {
    ++nref;
  }

  // Drop one reference. When the snapshot shows we hold the last reference
  // the op is retired according to its state:
  //   STATE_UNTRACKED: never registered -> just notify and delete.
  //   STATE_LIVE:      mark "done", unregister from the in-flight list,
  //                    then delete (tracking off) or transfer this final
  //                    reference to the history (add_ref = false, so no
  //                    extra increment is taken).
  //   STATE_HISTORY:   the history held the last ref -> delete.
  // Otherwise decrement via compare_exchange_weak; a failed CAS (spurious
  // or due to a concurrent get/put) re-reads nref and retries from `again`.
  void put() {
  again:
    auto nref_snap = nref.load();
    if (nref_snap == 1) {
      switch (state.load()) {
      case STATE_UNTRACKED:
	_unregistered();
	delete this;
	break;

      case STATE_LIVE:
	mark_event("done");
	tracker->unregister_inflight_op(this);
	_unregistered();
	if (!tracker->is_tracking()) {
	  delete this;
	} else {
	  state = TrackedOp::STATE_HISTORY;
	  tracker->record_history_op(
	    TrackedOpRef(this, /* add_ref = */ false));
	}
	break;

      case STATE_HISTORY:
	delete this;
	break;

      default:
	ceph_abort();
      }
    } else if (!nref.compare_exchange_weak(nref_snap, nref_snap - 1)) {
      goto again;
    }
  }

  /**
   * Return a human-readable descriptor of the op.
   * Fast path reads the cached `desc` pointer without the lock; it is only
   * (re)generated under the lock when absent or after reset_desc().
   */
  const char *get_desc() const {
    if (!desc || want_new_desc.load()) {
      std::lock_guard l(lock);
      _gen_desc();
    }
    return desc;
  }
private:
  /// regenerate desc_str/desc from _dump_op_descriptor_unlocked; caller holds lock
  void _gen_desc() const {
    ostringstream ss;
    _dump_op_descriptor_unlocked(ss);
    desc_str = ss.str();
    desc = desc_str.c_str();
    want_new_desc = false;
  }
public:
  /// request lazy regeneration of the descriptor on next get_desc()
  void reset_desc() {
    want_new_desc = true;
  }

  const utime_t& get_initiated() const {
    return initiated_at;
  }

  /**
   * Seconds the op has been (or was) in flight: fixed at the "done" event's
   * stamp once completed, otherwise measured against the current clock.
   */
  double get_duration() const {
    std::lock_guard l(lock);
    if (!events.empty() && events.rbegin()->compare("done") == 0)
      return events.rbegin()->stamp - get_initiated();
    else
      return ceph_clock_now() - get_initiated();
  }

  /// append a timestamped event to the timeline (defined in TrackedOp.cc)
  void mark_event(std::string_view event, utime_t stamp=ceph_clock_now());

  /// suppress slow-op warnings for this op
  void mark_nowarn() {
    warn_interval_multiplier = 0;
  }

  /// name of the most recent event, or an empty view if none yet.
  /// NOTE(review): the returned view points into the events vector and is
  /// returned after the lock is released — a concurrent mark_event() that
  /// reallocates events could invalidate it; confirm callers' usage.
  virtual std::string_view state_string() const {
    std::lock_guard l(lock);
    return events.empty() ? std::string_view() : std::string_view(events.rbegin()->str);
  }

  void dump(utime_t now, Formatter *f) const;

  /// register with the tracker; on success record "initiated" and go LIVE
  void tracking_start() {
    if (tracker->register_inflight_op(this)) {
      events.emplace_back(initiated_at, "initiated");
      state = STATE_LIVE;
    }
  }

  // ref counting via intrusive_ptr, with special behavior on final
  // put for historical op tracking
  friend void intrusive_ptr_add_ref(TrackedOp *o) {
    o->get();
  }
  friend void intrusive_ptr_release(TrackedOp *o) {
    o->put();
  }
};
397
398
399 #endif