]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/TrackedOp.h
bump version to 18.2.4-pve3
[ceph.git] / ceph / src / common / TrackedOp.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14#ifndef TRACKEDREQUEST_H_
15#define TRACKEDREQUEST_H_
7c673cae 16
31f18b77 17#include <atomic>
aee94f69 18#include "common/StackStringStream.h"
9f95a23c 19#include "common/ceph_mutex.h"
7c673cae 20#include "common/histogram.h"
11fdf7f2
TL
21#include "common/Thread.h"
22#include "common/Clock.h"
11fdf7f2
TL
23#include "include/spinlock.h"
24#include "msg/Message.h"
7c673cae
FG
25
26#define OPTRACKER_PREALLOC_EVENTS 20
27
28class TrackedOp;
11fdf7f2
TL
29class OpHistory;
30
7c673cae
FG
31typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;
32
11fdf7f2
TL
33class OpHistoryServiceThread : public Thread
34{
35private:
9f95a23c 36 std::list<std::pair<utime_t, TrackedOpRef>> _external_queue;
11fdf7f2
TL
37 OpHistory* _ophistory;
38 mutable ceph::spinlock queue_spinlock;
39 bool _break_thread;
40public:
41 explicit OpHistoryServiceThread(OpHistory* parent)
42 : _ophistory(parent),
43 _break_thread(false) { }
44
45 void break_thread();
46 void insert_op(const utime_t& now, TrackedOpRef op) {
47 queue_spinlock.lock();
48 _external_queue.emplace_back(now, op);
49 queue_spinlock.unlock();
50 }
51
52 void *entry() override;
53};
54
1e59de90
TL
/// Perf-counter indices; OpHistory's constructor passes the first/last values
/// to PerfCountersBuilder and registers l_osd_slow_op_count as a u64 counter.
enum {
  l_osd_slow_op_first = 1000,  ///< range start handed to PerfCountersBuilder
  l_osd_slow_op_count,         ///< index of the "slow_ops_count" counter
  l_osd_slow_op_last,          ///< range end handed to PerfCountersBuilder
};
11fdf7f2 60
7c673cae 61class OpHistory {
1e59de90 62 CephContext* cct = nullptr;
9f95a23c
TL
63 std::set<std::pair<utime_t, TrackedOpRef> > arrived;
64 std::set<std::pair<double, TrackedOpRef> > duration;
65 std::set<std::pair<utime_t, TrackedOpRef> > slow_op;
11fdf7f2 66 ceph::mutex ops_history_lock = ceph::make_mutex("OpHistory::ops_history_lock");
7c673cae 67 void cleanup(utime_t now);
92f5a8d4
TL
68 std::atomic_size_t history_size{0};
69 std::atomic_uint32_t history_duration{0};
70 std::atomic_size_t history_slow_op_size{0};
71 std::atomic_uint32_t history_slow_op_threshold{0};
72 std::atomic_bool shutdown{false};
11fdf7f2
TL
73 OpHistoryServiceThread opsvc;
74 friend class OpHistoryServiceThread;
1e59de90 75 std::unique_ptr<PerfCounters> logger;
7c673cae
FG
76
77public:
1e59de90
TL
78 OpHistory(CephContext *c) : cct(c), opsvc(this) {
79 PerfCountersBuilder b(cct, "osd-slow-ops",
80 l_osd_slow_op_first, l_osd_slow_op_last);
81 b.add_u64_counter(l_osd_slow_op_count, "slow_ops_count",
82 "Number of operations taking over ten second");
83
84 logger.reset(b.create_perf_counters());
85 cct->get_perfcounters_collection()->add(logger.get());
86
11fdf7f2
TL
87 opsvc.create("OpHistorySvc");
88 }
7c673cae 89 ~OpHistory() {
11fdf7f2
TL
90 ceph_assert(arrived.empty());
91 ceph_assert(duration.empty());
92 ceph_assert(slow_op.empty());
1e59de90
TL
93 if(logger) {
94 cct->get_perfcounters_collection()->remove(logger.get());
95 logger.reset();
96 }
11fdf7f2
TL
97 }
98 void insert(const utime_t& now, TrackedOpRef op)
99 {
100 if (shutdown)
101 return;
102
103 opsvc.insert_op(now, op);
7c673cae 104 }
11fdf7f2
TL
105
106 void _insert_delayed(const utime_t& now, TrackedOpRef op);
9f95a23c
TL
107 void dump_ops(utime_t now, ceph::Formatter *f, std::set<std::string> filters = {""}, bool by_duration=false);
108 void dump_slow_ops(utime_t now, ceph::Formatter *f, std::set<std::string> filters = {""});
7c673cae 109 void on_shutdown();
92f5a8d4 110 void set_size_and_duration(size_t new_size, uint32_t new_duration) {
7c673cae
FG
111 history_size = new_size;
112 history_duration = new_duration;
113 }
92f5a8d4 114 void set_slow_op_size_and_threshold(size_t new_size, uint32_t new_threshold) {
7c673cae
FG
115 history_slow_op_size = new_size;
116 history_slow_op_threshold = new_threshold;
117 }
118};
119
120struct ShardedTrackingData;
121class OpTracker {
122 friend class OpHistory;
31f18b77 123 std::atomic<int64_t> seq = { 0 };
9f95a23c 124 std::vector<ShardedTrackingData*> sharded_in_flight_list;
7c673cae 125 OpHistory history;
11fdf7f2 126 uint32_t num_optracker_shards;
7c673cae
FG
127 float complaint_time;
128 int log_threshold;
11fdf7f2 129 std::atomic<bool> tracking_enabled;
9f95a23c 130 ceph::shared_mutex lock = ceph::make_shared_mutex("OpTracker::lock");
7c673cae
FG
131
132public:
aee94f69
TL
133 using dumper = std::function<void(const TrackedOp&, Formatter*)>;
134
7c673cae
FG
135 CephContext *cct;
136 OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards);
137
138 void set_complaint_and_threshold(float time, int threshold) {
139 complaint_time = time;
140 log_threshold = threshold;
141 }
142 void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
143 history.set_size_and_duration(new_size, new_duration);
144 }
145 void set_history_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
146 history.set_slow_op_size_and_threshold(new_size, new_threshold);
147 }
11fdf7f2
TL
148 bool is_tracking() const {
149 return tracking_enabled;
150 }
7c673cae 151 void set_tracking(bool enable) {
7c673cae
FG
152 tracking_enabled = enable;
153 }
aee94f69
TL
154 static void default_dumper(const TrackedOp& op, Formatter* f);
155 bool dump_ops_in_flight(ceph::Formatter *f, bool print_only_blocked = false, std::set<std::string> filters = {""}, bool count_only = false, dumper lambda = default_dumper);
9f95a23c
TL
156 bool dump_historic_ops(ceph::Formatter *f, bool by_duration = false, std::set<std::string> filters = {""});
157 bool dump_historic_slow_ops(ceph::Formatter *f, std::set<std::string> filters = {""});
7c673cae
FG
158 bool register_inflight_op(TrackedOp *i);
159 void unregister_inflight_op(TrackedOp *i);
11fdf7f2 160 void record_history_op(TrackedOpRef&& i);
7c673cae
FG
161
162 void get_age_ms_histogram(pow2_hist_t *h);
163
11fdf7f2
TL
164 /**
165 * walk through ops in flight
166 *
167 * @param oldest_sec the amount of time since the oldest op was initiated
168 * @param check a function consuming tracked ops, the function returns
169 * false if it don't want to be fed with more ops
170 * @return True if there are any Ops to warn on, false otherwise
171 */
172 bool visit_ops_in_flight(utime_t* oldest_secs,
173 std::function<bool(TrackedOp&)>&& visit);
174 /**
175 * walk through slow ops in flight
176 *
177 * @param[out] oldest_sec the amount of time since the oldest op was initiated
178 * @param[out] num_slow_ops total number of slow ops
179 * @param[out] num_warned_ops total number of warned ops
180 * @param on_warn a function consuming tracked ops, the function returns
181 * false if it don't want to be fed with more ops
182 * @return True if there are any Ops to warn on, false otherwise
183 */
184 bool with_slow_ops_in_flight(utime_t* oldest_secs,
185 int* num_slow_ops,
186 int* num_warned_ops,
187 std::function<void(TrackedOp&)>&& on_warn);
7c673cae
FG
188 /**
189 * Look for Ops which are too old, and insert warning
190 * strings for each Op that is too old.
191 *
9f95a23c
TL
192 * @param summary[out] a std::string summarizing slow Ops.
193 * @param warning_strings[out] A std::vector<std::string> reference which is filled
194 * with a warning std::string for each old Op.
11fdf7f2 195 * @param slow[out] total number of slow ops
7c673cae
FG
196 * @return True if there are any Ops to warn on, false otherwise.
197 */
11fdf7f2 198 bool check_ops_in_flight(std::string* summary,
9f95a23c 199 std::vector<std::string> &warning_strings,
11fdf7f2 200 int* slow = nullptr);
7c673cae
FG
201
202 void on_shutdown() {
203 history.on_shutdown();
204 }
205 ~OpTracker();
206
207 template <typename T, typename U>
208 typename T::Ref create_request(U params)
209 {
210 typename T::Ref retval(new T(params, this));
211 retval->tracking_start();
11fdf7f2 212 if (is_tracking()) {
11fdf7f2 213 retval->mark_event("throttled", params->get_throttle_stamp());
9f95a23c 214 retval->mark_event("header_read", params->get_recv_stamp());
11fdf7f2
TL
215 retval->mark_event("all_read", params->get_recv_complete_stamp());
216 retval->mark_event("dispatched", params->get_dispatch_stamp());
217 }
218
7c673cae
FG
219 return retval;
220 }
221};
222
7c673cae
FG
223class TrackedOp : public boost::intrusive::list_base_hook<> {
224private:
225 friend class OpHistory;
226 friend class OpTracker;
227
228 boost::intrusive::list_member_hook<> tracker_item;
229
230public:
231 typedef boost::intrusive::list<
232 TrackedOp,
233 boost::intrusive::member_hook<
234 TrackedOp,
235 boost::intrusive::list_member_hook<>,
236 &TrackedOp::tracker_item> > tracked_op_list_t;
237
238 // for use when clearing lists. e.g.,
239 // ls.clear_and_dispose(TrackedOp::Putter());
240 struct Putter {
241 void operator()(TrackedOp *op) {
242 op->put();
243 }
244 };
245
246protected:
247 OpTracker *tracker; ///< the tracker we are associated with
248 std::atomic_int nref = {0}; ///< ref count
249
250 utime_t initiated_at;
251
252 struct Event {
253 utime_t stamp;
11fdf7f2 254 std::string str;
7c673cae 255
11fdf7f2 256 Event(utime_t t, std::string_view s) : stamp(t), str(s) {}
7c673cae
FG
257
258 int compare(const char *s) const {
11fdf7f2 259 return str.compare(s);
7c673cae
FG
260 }
261
262 const char *c_str() const {
11fdf7f2 263 return str.c_str();
7c673cae
FG
264 }
265
9f95a23c 266 void dump(ceph::Formatter *f) const {
7c673cae 267 f->dump_stream("time") << stamp;
11fdf7f2 268 f->dump_string("event", str);
7c673cae
FG
269 }
270 };
271
9f95a23c 272 std::vector<Event> events; ///< std::list of events and their times
11fdf7f2 273 mutable ceph::mutex lock = ceph::make_mutex("TrackedOp::lock"); ///< to protect the events list
9f95a23c 274 uint64_t seq = 0; ///< a unique value std::set by the OpTracker
7c673cae
FG
275
276 uint32_t warn_interval_multiplier = 1; //< limits output of a given op warning
277
278 enum {
279 STATE_UNTRACKED = 0,
280 STATE_LIVE,
281 STATE_HISTORY
282 };
9f95a23c 283 std::atomic<int> state = {STATE_UNTRACKED};
7c673cae 284
7c673cae
FG
285 TrackedOp(OpTracker *_tracker, const utime_t& initiated) :
286 tracker(_tracker),
287 initiated_at(initiated)
288 {
289 events.reserve(OPTRACKER_PREALLOC_EVENTS);
290 }
291
292 /// output any type-specific data you want to get when dump() is called
9f95a23c 293 virtual void _dump(ceph::Formatter *f) const {}
7c673cae
FG
294 /// if you want something else to happen when events are marked, implement
295 virtual void _event_marked() {}
296 /// return a unique descriptor of the Op; eg the message it's attached to
aee94f69 297 virtual void _dump_op_descriptor(std::ostream& stream) const = 0;
7c673cae 298 /// called when the last non-OpTracker reference is dropped
c07f9fc5
FG
299 virtual void _unregistered() {}
300
9f95a23c 301 virtual bool filter_out(const std::set<std::string>& filters) { return true; }
7c673cae
FG
302
303public:
304 ZTracer::Trace osd_trace;
305 ZTracer::Trace pg_trace;
306 ZTracer::Trace store_trace;
307 ZTracer::Trace journal_trace;
308
309 virtual ~TrackedOp() {}
310
311 void get() {
312 ++nref;
313 }
314 void put() {
11fdf7f2
TL
315 again:
316 auto nref_snap = nref.load();
317 if (nref_snap == 1) {
7c673cae
FG
318 switch (state.load()) {
319 case STATE_UNTRACKED:
320 _unregistered();
321 delete this;
322 break;
323
324 case STATE_LIVE:
325 mark_event("done");
326 tracker->unregister_inflight_op(this);
11fdf7f2
TL
327 _unregistered();
328 if (!tracker->is_tracking()) {
329 delete this;
330 } else {
331 state = TrackedOp::STATE_HISTORY;
332 tracker->record_history_op(
333 TrackedOpRef(this, /* add_ref = */ false));
334 }
7c673cae
FG
335 break;
336
337 case STATE_HISTORY:
338 delete this;
339 break;
340
341 default:
342 ceph_abort();
343 }
11fdf7f2
TL
344 } else if (!nref.compare_exchange_weak(nref_snap, nref_snap - 1)) {
345 goto again;
7c673cae
FG
346 }
347 }
348
aee94f69
TL
349 std::string get_desc() const {
350 std::string ret;
351 {
352 std::lock_guard l(desc_lock);
353 ret = desc;
354 }
355 if (ret.size() == 0 || want_new_desc.load()) {
356 CachedStackStringStream css;
357 std::scoped_lock l(lock, desc_lock);
358 if (desc.size() && !want_new_desc.load()) {
359 return desc;
360 }
361 _dump_op_descriptor(*css);
362 desc = css->strv();
363 want_new_desc = false;
364 return desc;
365 } else {
366 return ret;
7c673cae 367 }
7c673cae 368 }
aee94f69 369
7c673cae 370private:
aee94f69
TL
371 mutable ceph::mutex desc_lock = ceph::make_mutex("OpTracker::desc_lock");
372 mutable std::string desc; ///< protected by desc_lock
373 mutable std::atomic<bool> want_new_desc = {false};
374
7c673cae
FG
375public:
376 void reset_desc() {
377 want_new_desc = true;
378 }
379
aee94f69
TL
380 void dump_type(Formatter* f) const {
381 return _dump(f);
382 }
383
7c673cae
FG
384 const utime_t& get_initiated() const {
385 return initiated_at;
386 }
387
388 double get_duration() const {
11fdf7f2 389 std::lock_guard l(lock);
7c673cae
FG
390 if (!events.empty() && events.rbegin()->compare("done") == 0)
391 return events.rbegin()->stamp - get_initiated();
392 else
393 return ceph_clock_now() - get_initiated();
394 }
395
11fdf7f2 396 void mark_event(std::string_view event, utime_t stamp=ceph_clock_now());
7c673cae 397
1adf2230
AA
398 void mark_nowarn() {
399 warn_interval_multiplier = 0;
400 }
401
aee94f69 402 std::string state_string() const {
11fdf7f2 403 std::lock_guard l(lock);
aee94f69 404 return _get_state_string();
7c673cae
FG
405 }
406
aee94f69 407 void dump(utime_t now, ceph::Formatter *f, OpTracker::dumper lambda) const;
7c673cae
FG
408
409 void tracking_start() {
410 if (tracker->register_inflight_op(this)) {
11fdf7f2 411 events.emplace_back(initiated_at, "initiated");
7c673cae
FG
412 state = STATE_LIVE;
413 }
414 }
415
416 // ref counting via intrusive_ptr, with special behavior on final
417 // put for historical op tracking
418 friend void intrusive_ptr_add_ref(TrackedOp *o) {
419 o->get();
420 }
421 friend void intrusive_ptr_release(TrackedOp *o) {
422 o->put();
423 }
aee94f69
TL
424
425protected:
426 virtual std::string _get_state_string() const {
427 return events.empty() ? std::string() : std::string(events.rbegin()->str);
428 }
7c673cae
FG
429};
430
aee94f69
TL
431inline void OpTracker::default_dumper(const TrackedOp& op, Formatter* f) {
432 op._dump(f);
433}
7c673cae
FG
434
435#endif