1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #ifndef TRACKEDREQUEST_H_
15 #define TRACKEDREQUEST_H_
18 #include "common/ceph_mutex.h"
19 #include "common/histogram.h"
20 #include "common/Thread.h"
21 #include "common/Clock.h"
22 #include "include/spinlock.h"
23 #include "msg/Message.h"
25 #define OPTRACKER_PREALLOC_EVENTS 20
30 typedef boost::intrusive_ptr
<TrackedOp
> TrackedOpRef
;
32 class OpHistoryServiceThread
: public Thread
35 std::list
<std::pair
<utime_t
, TrackedOpRef
>> _external_queue
;
36 OpHistory
* _ophistory
;
37 mutable ceph::spinlock queue_spinlock
;
40 explicit OpHistoryServiceThread(OpHistory
* parent
)
42 _break_thread(false) { }
45 void insert_op(const utime_t
& now
, TrackedOpRef op
) {
46 queue_spinlock
.lock();
47 _external_queue
.emplace_back(now
, op
);
48 queue_spinlock
.unlock();
51 void *entry() override
;
55 l_osd_slow_op_first
= 1000,
61 CephContext
* cct
= nullptr;
62 std::set
<std::pair
<utime_t
, TrackedOpRef
> > arrived
;
63 std::set
<std::pair
<double, TrackedOpRef
> > duration
;
64 std::set
<std::pair
<utime_t
, TrackedOpRef
> > slow_op
;
65 ceph::mutex ops_history_lock
= ceph::make_mutex("OpHistory::ops_history_lock");
66 void cleanup(utime_t now
);
67 std::atomic_size_t history_size
{0};
68 std::atomic_uint32_t history_duration
{0};
69 std::atomic_size_t history_slow_op_size
{0};
70 std::atomic_uint32_t history_slow_op_threshold
{0};
71 std::atomic_bool shutdown
{false};
72 OpHistoryServiceThread opsvc
;
73 friend class OpHistoryServiceThread
;
74 std::unique_ptr
<PerfCounters
> logger
;
77 OpHistory(CephContext
*c
) : cct(c
), opsvc(this) {
78 PerfCountersBuilder
b(cct
, "osd-slow-ops",
79 l_osd_slow_op_first
, l_osd_slow_op_last
);
80 b
.add_u64_counter(l_osd_slow_op_count
, "slow_ops_count",
81 "Number of operations taking over ten second");
83 logger
.reset(b
.create_perf_counters());
84 cct
->get_perfcounters_collection()->add(logger
.get());
86 opsvc
.create("OpHistorySvc");
89 ceph_assert(arrived
.empty());
90 ceph_assert(duration
.empty());
91 ceph_assert(slow_op
.empty());
93 cct
->get_perfcounters_collection()->remove(logger
.get());
97 void insert(const utime_t
& now
, TrackedOpRef op
)
102 opsvc
.insert_op(now
, op
);
105 void _insert_delayed(const utime_t
& now
, TrackedOpRef op
);
106 void dump_ops(utime_t now
, ceph::Formatter
*f
, std::set
<std::string
> filters
= {""}, bool by_duration
=false);
107 void dump_slow_ops(utime_t now
, ceph::Formatter
*f
, std::set
<std::string
> filters
= {""});
109 void set_size_and_duration(size_t new_size
, uint32_t new_duration
) {
110 history_size
= new_size
;
111 history_duration
= new_duration
;
113 void set_slow_op_size_and_threshold(size_t new_size
, uint32_t new_threshold
) {
114 history_slow_op_size
= new_size
;
115 history_slow_op_threshold
= new_threshold
;
119 struct ShardedTrackingData
;
121 friend class OpHistory
;
122 std::atomic
<int64_t> seq
= { 0 };
123 std::vector
<ShardedTrackingData
*> sharded_in_flight_list
;
125 uint32_t num_optracker_shards
;
126 float complaint_time
;
128 std::atomic
<bool> tracking_enabled
;
129 ceph::shared_mutex lock
= ceph::make_shared_mutex("OpTracker::lock");
133 OpTracker(CephContext
*cct_
, bool tracking
, uint32_t num_shards
);
135 void set_complaint_and_threshold(float time
, int threshold
) {
136 complaint_time
= time
;
137 log_threshold
= threshold
;
139 void set_history_size_and_duration(uint32_t new_size
, uint32_t new_duration
) {
140 history
.set_size_and_duration(new_size
, new_duration
);
142 void set_history_slow_op_size_and_threshold(uint32_t new_size
, uint32_t new_threshold
) {
143 history
.set_slow_op_size_and_threshold(new_size
, new_threshold
);
145 bool is_tracking() const {
146 return tracking_enabled
;
148 void set_tracking(bool enable
) {
149 tracking_enabled
= enable
;
151 bool dump_ops_in_flight(ceph::Formatter
*f
, bool print_only_blocked
= false, std::set
<std::string
> filters
= {""}, bool count_only
= false);
152 bool dump_historic_ops(ceph::Formatter
*f
, bool by_duration
= false, std::set
<std::string
> filters
= {""});
153 bool dump_historic_slow_ops(ceph::Formatter
*f
, std::set
<std::string
> filters
= {""});
154 bool register_inflight_op(TrackedOp
*i
);
155 void unregister_inflight_op(TrackedOp
*i
);
156 void record_history_op(TrackedOpRef
&& i
);
158 void get_age_ms_histogram(pow2_hist_t
*h
);
161 * walk through ops in flight
163 * @param oldest_secs the amount of time since the oldest op was initiated
164 * @param visit a function consuming tracked ops; the function returns
165 * false if it doesn't want to be fed with more ops
166 * @return True if there are any Ops to warn on, false otherwise
168 bool visit_ops_in_flight(utime_t
* oldest_secs
,
169 std::function
<bool(TrackedOp
&)>&& visit
);
171 * walk through slow ops in flight
173 * @param[out] oldest_secs the amount of time since the oldest op was initiated
174 * @param[out] num_slow_ops total number of slow ops
175 * @param[out] num_warned_ops total number of warned ops
176 * @param on_warn a function consuming tracked ops (it returns void, so
177 * its result cannot be used to stop the walk)
178 * @return True if there are any Ops to warn on, false otherwise
180 bool with_slow_ops_in_flight(utime_t
* oldest_secs
,
183 std::function
<void(TrackedOp
&)>&& on_warn
);
185 * Look for Ops which are too old, and insert warning
186 * strings for each Op that is too old.
188 * @param[out] summary a std::string summarizing slow Ops.
189 * @param[out] warning_strings A std::vector<std::string> reference which is filled
190 * with a warning std::string for each old Op.
191 * @param[out] slow total number of slow ops
192 * @return True if there are any Ops to warn on, false otherwise.
194 bool check_ops_in_flight(std::string
* summary
,
195 std::vector
<std::string
> &warning_strings
,
196 int* slow
= nullptr);
199 history
.on_shutdown();
203 template <typename T
, typename U
>
204 typename
T::Ref
create_request(U params
)
206 typename
T::Ref
retval(new T(params
, this));
207 retval
->tracking_start();
209 retval
->mark_event("throttled", params
->get_throttle_stamp());
210 retval
->mark_event("header_read", params
->get_recv_stamp());
211 retval
->mark_event("all_read", params
->get_recv_complete_stamp());
212 retval
->mark_event("dispatched", params
->get_dispatch_stamp());
219 class TrackedOp
: public boost::intrusive::list_base_hook
<> {
221 friend class OpHistory
;
222 friend class OpTracker
;
224 boost::intrusive::list_member_hook
<> tracker_item
;
227 typedef boost::intrusive::list
<
229 boost::intrusive::member_hook
<
231 boost::intrusive::list_member_hook
<>,
232 &TrackedOp::tracker_item
> > tracked_op_list_t
;
234 // for use when clearing lists. e.g.,
235 // ls.clear_and_dispose(TrackedOp::Putter());
237 void operator()(TrackedOp
*op
) {
243 OpTracker
*tracker
; ///< the tracker we are associated with
244 std::atomic_int nref
= {0}; ///< ref count
246 utime_t initiated_at
;
252 Event(utime_t t
, std::string_view s
) : stamp(t
), str(s
) {}
/// Compare this event's string against the C string @p s by forwarding to
/// str.compare(s). Callers in this file test the result against 0 to check
/// for equality.
/// @param s NUL-terminated string to compare against
/// @return whatever str.compare(s) returns
254 int compare(const char *s
) const {
255 return str
.compare(s
);
258 const char *c_str() const {
/// Write this event to the formatter @p f: stamp is streamed into the
/// "time" field first, then str is emitted as the "event" string.
/// @param f formatter receiving the two fields; dereferenced unconditionally
262 void dump(ceph::Formatter
*f
) const {
263 f
->dump_stream("time") << stamp
;
264 f
->dump_string("event", str
);
268 std::vector
<Event
> events
; ///< list of events and their times
269 mutable ceph::mutex lock
= ceph::make_mutex("TrackedOp::lock"); ///< to protect the events list
270 uint64_t seq
= 0; ///< a unique value set by the OpTracker
272 uint32_t warn_interval_multiplier
= 1; ///< limits output of a given op warning
279 std::atomic
<int> state
= {STATE_UNTRACKED
};
281 mutable std::string desc_str
; ///< protected by lock
282 mutable const char *desc
= nullptr; ///< readable without lock
283 mutable std::atomic
<bool> want_new_desc
= {false};
285 TrackedOp(OpTracker
*_tracker
, const utime_t
& initiated
) :
287 initiated_at(initiated
)
289 events
.reserve(OPTRACKER_PREALLOC_EVENTS
);
292 /// output any type-specific data you want to get when dump() is called
293 virtual void _dump(ceph::Formatter
*f
) const {}
294 /// if you want something else to happen when events are marked, implement
295 virtual void _event_marked() {}
296 /// return a unique descriptor of the Op; eg the message it's attached to
297 virtual void _dump_op_descriptor_unlocked(std::ostream
& stream
) const = 0;
298 /// called when the last non-OpTracker reference is dropped
299 virtual void _unregistered() {}
301 virtual bool filter_out(const std::set
<std::string
>& filters
) { return true; }
304 ZTracer::Trace osd_trace
;
305 ZTracer::Trace pg_trace
;
306 ZTracer::Trace store_trace
;
307 ZTracer::Trace journal_trace
;
309 virtual ~TrackedOp() {}
316 auto nref_snap
= nref
.load();
317 if (nref_snap
== 1) {
318 switch (state
.load()) {
319 case STATE_UNTRACKED
:
326 tracker
->unregister_inflight_op(this);
328 if (!tracker
->is_tracking()) {
331 state
= TrackedOp::STATE_HISTORY
;
332 tracker
->record_history_op(
333 TrackedOpRef(this, /* add_ref = */ false));
344 } else if (!nref
.compare_exchange_weak(nref_snap
, nref_snap
- 1)) {
349 const char *get_desc() const {
350 if (!desc
|| want_new_desc
.load()) {
351 std::lock_guard
l(lock
);
357 void _gen_desc() const {
358 std::ostringstream ss
;
359 _dump_op_descriptor_unlocked(ss
);
361 desc
= desc_str
.c_str();
362 want_new_desc
= false;
366 want_new_desc
= true;
369 const utime_t
& get_initiated() const {
373 double get_duration() const {
374 std::lock_guard
l(lock
);
375 if (!events
.empty() && events
.rbegin()->compare("done") == 0)
376 return events
.rbegin()->stamp
- get_initiated();
378 return ceph_clock_now() - get_initiated();
381 void mark_event(std::string_view event
, utime_t stamp
=ceph_clock_now());
384 warn_interval_multiplier
= 0;
387 virtual std::string_view
state_string() const {
388 std::lock_guard
l(lock
);
389 return events
.empty() ? std::string_view() : std::string_view(events
.rbegin()->str
);
392 void dump(utime_t now
, ceph::Formatter
*f
) const;
394 void tracking_start() {
395 if (tracker
->register_inflight_op(this)) {
396 events
.emplace_back(initiated_at
, "initiated");
401 // ref counting via intrusive_ptr, with special behavior on final
402 // put for historical op tracking
403 friend void intrusive_ptr_add_ref(TrackedOp
*o
) {
406 friend void intrusive_ptr_release(TrackedOp
*o
) {