1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #ifndef TRACKEDREQUEST_H_
15 #define TRACKEDREQUEST_H_
18 #include "common/histogram.h"
19 #include "common/RWLock.h"
20 #include "common/Thread.h"
21 #include "common/Clock.h"
22 #include "common/ceph_mutex.h"
23 #include "include/spinlock.h"
24 #include "msg/Message.h"
26 #define OPTRACKER_PREALLOC_EVENTS 20
31 typedef boost::intrusive_ptr
<TrackedOp
> TrackedOpRef
;
// Thread subclass owned by an OpHistory (see OpHistory::opsvc).  It keeps a
// spinlock-protected queue of (timestamp, op) pairs that insert_op() appends to.
33 class OpHistoryServiceThread
: public Thread
// Queue of (timestamp, op) pairs; insert_op() appends to it while holding
// queue_spinlock.
36 list
<pair
<utime_t
, TrackedOpRef
>> _external_queue
;
// Pointer to an OpHistory.
// NOTE(review): presumably the `parent` passed to the constructor — the
// initializer is not visible in this view; confirm.
37 OpHistory
* _ophistory
;
// Spinlock taken by insert_op() around every append to _external_queue;
// declared mutable.
38 mutable ceph::spinlock queue_spinlock
;
41 explicit OpHistoryServiceThread(OpHistory
* parent
)
43 _break_thread(false) { }
46 void insert_op(const utime_t
& now
, TrackedOpRef op
) {
47 queue_spinlock
.lock();
48 _external_queue
.emplace_back(now
, op
);
49 queue_spinlock
.unlock();
/// Overrides the base class's entry(); only the declaration is visible here.
52 void *entry() override
;
// Set of (utime_t, op) pairs.
// NOTE(review): presumably keyed by the time the op was recorded — confirm.
57 set
<pair
<utime_t
, TrackedOpRef
> > arrived
;
// Set of (double, op) pairs.
// NOTE(review): presumably keyed by TrackedOp::get_duration() — confirm.
58 set
<pair
<double, TrackedOpRef
> > duration
;
// Second set of (utime_t, op) pairs, kept separately from `arrived`.
// NOTE(review): presumably the ops that exceeded history_slow_op_threshold —
// confirm.
59 set
<pair
<utime_t
, TrackedOpRef
> > slow_op
;
// Mutex created under the name "OpHistory::ops_history_lock".
// NOTE(review): presumably guards the three sets above — confirm.
60 ceph::mutex ops_history_lock
= ceph::make_mutex("OpHistory::ops_history_lock");
// Declaration only; the definition is not part of this view.
61 void cleanup(utime_t now
);
// Written by set_size_and_duration(); the constructor initialises both to 0.
62 uint32_t history_size
;
63 uint32_t history_duration
;
// Written by set_slow_op_size_and_threshold(); the constructor initialises
// both to 0.
64 uint32_t history_slow_op_size
;
65 uint32_t history_slow_op_threshold
;
// Atomic flag; the constructor initialises it to false.
66 std::atomic_bool shutdown
;
// Service thread constructed with `this` and started by the constructor via
// create("OpHistorySvc"); insert() hands ops to it through insert_op().
67 OpHistoryServiceThread opsvc
;
68 friend class OpHistoryServiceThread
;
72 : history_size(0), history_duration(0),
73 history_slow_op_size(0), history_slow_op_threshold(0),
74 shutdown(false), opsvc(this) {
75 opsvc
.create("OpHistorySvc");
78 ceph_assert(arrived
.empty());
79 ceph_assert(duration
.empty());
80 ceph_assert(slow_op
.empty());
82 void insert(const utime_t
& now
, TrackedOpRef op
)
87 opsvc
.insert_op(now
, op
);
// Declaration only; the definition is not part of this view.
// NOTE(review): presumably the deferred half of insert(), run by the
// service thread — confirm.
90 void _insert_delayed(const utime_t
& now
, TrackedOpRef op
);
// Declaration only.  `filters` is taken by value and defaults to a set
// holding one empty string; `by_duration` defaults to false.
91 void dump_ops(utime_t now
, Formatter
*f
, set
<string
> filters
= {""}, bool by_duration
=false);
// Declaration only.  Same `filters` default as dump_ops(); there is no
// by_duration switch.
92 void dump_slow_ops(utime_t now
, Formatter
*f
, set
<string
> filters
= {""});
94 void set_size_and_duration(uint32_t new_size
, uint32_t new_duration
) {
95 history_size
= new_size
;
96 history_duration
= new_duration
;
98 void set_slow_op_size_and_threshold(uint32_t new_size
, uint32_t new_threshold
) {
99 history_slow_op_size
= new_size
;
100 history_slow_op_threshold
= new_threshold
;
// Forward declaration; sharded_in_flight_list below stores pointers to it.
104 struct ShardedTrackingData
;
106 friend class OpHistory
;
// Atomic 64-bit counter starting at 0.
// NOTE(review): presumably the source of the per-op TrackedOp::seq values —
// confirm.
107 std::atomic
<int64_t> seq
= { 0 };
// Raw pointers to the per-shard tracking data.
// NOTE(review): presumably num_optracker_shards entries — confirm.
108 vector
<ShardedTrackingData
*> sharded_in_flight_list
;
110 uint32_t num_optracker_shards
;
// Written by set_complaint_and_threshold().
111 float complaint_time
;
// Read by is_tracking() and written by set_tracking().
113 std::atomic
<bool> tracking_enabled
;
// Declaration only; the definition is not part of this view.
118 OpTracker(CephContext
*cct_
, bool tracking
, uint32_t num_shards
);
120 void set_complaint_and_threshold(float time
, int threshold
) {
121 complaint_time
= time
;
122 log_threshold
= threshold
;
124 void set_history_size_and_duration(uint32_t new_size
, uint32_t new_duration
) {
125 history
.set_size_and_duration(new_size
, new_duration
);
127 void set_history_slow_op_size_and_threshold(uint32_t new_size
, uint32_t new_threshold
) {
128 history
.set_slow_op_size_and_threshold(new_size
, new_threshold
);
130 bool is_tracking() const {
131 return tracking_enabled
;
133 void set_tracking(bool enable
) {
134 tracking_enabled
= enable
;
// Declaration only.  `print_only_blocked` defaults to false; `filters` is
// taken by value and defaults to a set holding one empty string.
136 bool dump_ops_in_flight(Formatter
*f
, bool print_only_blocked
= false, set
<string
> filters
= {""});
// Declaration only.  `by_duration` defaults to false; same `filters` default.
137 bool dump_historic_ops(Formatter
*f
, bool by_duration
= false, set
<string
> filters
= {""});
// Declaration only.  Same `filters` default.
138 bool dump_historic_slow_ops(Formatter
*f
, set
<string
> filters
= {""});
// Declaration only.  TrackedOp::tracking_start() calls this and records its
// "initiated" event only when the result is true.
139 bool register_inflight_op(TrackedOp
*i
);
// Declaration only.  Called by TrackedOp with `this` while handling a
// reference drop.
140 void unregister_inflight_op(TrackedOp
*i
);
// Declaration only.  Takes the reference by rvalue; TrackedOp passes a
// TrackedOpRef built with add_ref = false.
141 void record_history_op(TrackedOpRef
&& i
);
// Declaration only.
// NOTE(review): presumably fills *h with a histogram of op ages in
// milliseconds — confirm against the definition.
143 void get_age_ms_histogram(pow2_hist_t
*h
);
146 * walk through ops in flight
148 * @param oldest_secs the amount of time since the oldest op was initiated
149 * @param visit a function consuming tracked ops, the function returns
150 * false if it doesn't want to be fed with more ops
151 * @return True if there are any Ops to warn on, false otherwise
153 bool visit_ops_in_flight(utime_t
* oldest_secs
,
154 std::function
<bool(TrackedOp
&)>&& visit
);
156 * walk through slow ops in flight
158 * @param[out] oldest_secs the amount of time since the oldest op was initiated
159 * @param[out] num_slow_ops total number of slow ops
160 * @param[out] num_warned_ops total number of warned ops
161 * @param on_warn a function consuming tracked ops; it returns void, so it
162 * cannot ask to stop being fed with more ops
163 * @return True if there are any Ops to warn on, false otherwise
165 bool with_slow_ops_in_flight(utime_t
* oldest_secs
,
168 std::function
<void(TrackedOp
&)>&& on_warn
);
170 * Look for Ops which are too old, and insert warning
171 * strings for each Op that is too old.
173 * @param[out] summary a string summarizing slow Ops.
174 * @param[out] warning_strings A vector<string> reference which is filled
175 * with a warning string for each old Op.
176 * @param[out] slow total number of slow ops
177 * @return True if there are any Ops to warn on, false otherwise.
179 bool check_ops_in_flight(std::string
* summary
,
180 std::vector
<string
> &warning_strings
,
181 int* slow
= nullptr);
184 history
.on_shutdown();
188 template <typename T
, typename U
>
189 typename
T::Ref
create_request(U params
)
191 typename
T::Ref
retval(new T(params
, this));
192 retval
->tracking_start();
195 retval
->mark_event("header_read", params
->get_recv_stamp());
196 retval
->mark_event("throttled", params
->get_throttle_stamp());
197 retval
->mark_event("all_read", params
->get_recv_complete_stamp());
198 retval
->mark_event("dispatched", params
->get_dispatch_stamp());
206 class TrackedOp
: public boost::intrusive::list_base_hook
<> {
208 friend class OpHistory
;
209 friend class OpTracker
;
211 boost::intrusive::list_member_hook
<> tracker_item
;
214 typedef boost::intrusive::list
<
216 boost::intrusive::member_hook
<
218 boost::intrusive::list_member_hook
<>,
219 &TrackedOp::tracker_item
> > tracked_op_list_t
;
221 // for use when clearing lists. e.g.,
222 // ls.clear_and_dispose(TrackedOp::Putter());
224 void operator()(TrackedOp
*op
) {
230 OpTracker
*tracker
; ///< the tracker we are associated with
231 std::atomic_int nref
= {0}; ///< ref count
233 utime_t initiated_at
;
// Construct an event: `stamp` is initialised from t and `str` from s.
239 Event(utime_t t
, std::string_view s
) : stamp(t
), str(s
) {}
// Compare this event's text with the C string s by forwarding to
// str.compare(); get_duration() treats a result of 0 as a match.
241 int compare(const char *s
) const {
242 return str
.compare(s
);
// Body is not visible in this view.
245 const char *c_str() const {
// Write this event to f: `stamp` is streamed under the key "time" and `str`
// is emitted as a string under the key "event".
249 void dump(Formatter
*f
) const {
250 f
->dump_stream("time") << stamp
;
251 f
->dump_string("event", str
);
255 vector
<Event
> events
; ///< list of events and their times
256 mutable ceph::mutex lock
= ceph::make_mutex("TrackedOp::lock"); ///< to protect the events list
257 uint64_t seq
= 0; ///< a unique value set by the OpTracker
259 uint32_t warn_interval_multiplier
= 1; ///< limits output of a given op warning
/// current state constant, held atomically; starts as STATE_UNTRACKED
266 atomic
<int> state
= {STATE_UNTRACKED
};
268 mutable string desc_str
; ///< protected by lock
269 mutable const char *desc
= nullptr; ///< readable without lock
/// when true, get_desc() takes the lock instead of using the cached desc
/// directly; _gen_desc() resets it to false after refreshing desc
270 mutable atomic
<bool> want_new_desc
= {false};
272 TrackedOp(OpTracker
*_tracker
, const utime_t
& initiated
) :
274 initiated_at(initiated
)
276 events
.reserve(OPTRACKER_PREALLOC_EVENTS
);
279 /// output any type-specific data you want to get when dump() is called
280 virtual void _dump(Formatter
*f
) const {}
281 /// empty by default; override it if you want something else to happen when events are marked
282 virtual void _event_marked() {}
283 /// return a unique descriptor of the Op; eg the message it's attached to (pure virtual; called by _gen_desc())
284 virtual void _dump_op_descriptor_unlocked(ostream
& stream
) const = 0;
285 /// called when the last non-OpTracker reference is dropped
286 virtual void _unregistered() {}
/// filtering hook; this default implementation ignores `filters` and always
/// returns true
288 virtual bool filter_out(const set
<string
>& filters
) { return true; }
291 ZTracer::Trace osd_trace
;
292 ZTracer::Trace pg_trace
;
293 ZTracer::Trace store_trace
;
294 ZTracer::Trace journal_trace
;
296 virtual ~TrackedOp() {}
303 auto nref_snap
= nref
.load();
304 if (nref_snap
== 1) {
305 switch (state
.load()) {
306 case STATE_UNTRACKED
:
313 tracker
->unregister_inflight_op(this);
315 if (!tracker
->is_tracking()) {
318 state
= TrackedOp::STATE_HISTORY
;
319 tracker
->record_history_op(
320 TrackedOpRef(this, /* add_ref = */ false));
331 } else if (!nref
.compare_exchange_weak(nref_snap
, nref_snap
- 1)) {
336 const char *get_desc() const {
337 if (!desc
|| want_new_desc
.load()) {
338 std::lock_guard
l(lock
);
344 void _gen_desc() const {
346 _dump_op_descriptor_unlocked(ss
);
348 desc
= desc_str
.c_str();
349 want_new_desc
= false;
353 want_new_desc
= true;
356 const utime_t
& get_initiated() const {
360 double get_duration() const {
361 std::lock_guard
l(lock
);
362 if (!events
.empty() && events
.rbegin()->compare("done") == 0)
363 return events
.rbegin()->stamp
- get_initiated();
365 return ceph_clock_now() - get_initiated();
/// Declaration only; the definition is not part of this view.  When the
/// caller omits `stamp`, the default argument ceph_clock_now() is used.
368 void mark_event(std::string_view event
, utime_t stamp
=ceph_clock_now());
371 warn_interval_multiplier
= 0;
374 virtual std::string_view
state_string() const {
375 std::lock_guard
l(lock
);
376 return events
.empty() ? std::string_view() : std::string_view(events
.rbegin()->str
);
379 void dump(utime_t now
, Formatter
*f
) const;
381 void tracking_start() {
382 if (tracker
->register_inflight_op(this)) {
383 events
.emplace_back(initiated_at
, "initiated");
388 // ref counting via intrusive_ptr, with special behavior on final
389 // put for historical op tracking
390 friend void intrusive_ptr_add_ref(TrackedOp
*o
) {
393 friend void intrusive_ptr_release(TrackedOp
*o
) {