1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 * Copyright 2013 Inktank
 */
13 #include "TrackedOp.h"
15 #define dout_context cct
16 #define dout_subsys ceph_subsys_optracker
18 #define dout_prefix _prefix(_dout)
26 using std::stringstream
;
28 using ceph::Formatter
;
// Writes the fixed op-tracker log prefix to the stream behind `_dout` and
// returns that same stream so the caller can keep chaining `<<`.
//
// `_dout` is dereferenced unconditionally, so it must not be null.
static std::ostream& _prefix(std::ostream* _dout)
{
  return *_dout << "-- op tracker -- ";
}
// Empties `_external_queue` while holding `queue_spinlock`.
// NOTE(review): the embedded numbering jumps from 37 to 39, so one line
// between the clear() and the unlock() is not visible here; presumably it
// tells the service thread to stop. Confirm against the full file.
35 void OpHistoryServiceThread::break_thread() {
36 queue_spinlock
.lock();
37 _external_queue
.clear();
39 queue_spinlock
.unlock();
// Thread entry point of the history service thread.
//
// The visible code swaps the shared `_external_queue` into a local list while
// holding `queue_spinlock`, releases the lock, and then hands every queued
// (time, op) pair to `_ophistory->_insert_delayed()`.
// NOTE(review): several lines are missing from this view (the enclosing loop,
// the exit path, the code that sleeps and adjusts `sleep_time`, and the
// return statement), so only the visible statements are described.
42 void* OpHistoryServiceThread::entry() {
// Starts at 1000 and is compared against 128000 below when the batch is
// empty; the unit is not visible here.
43 int sleep_time
= 1000;
44 list
<pair
<utime_t
, TrackedOpRef
>> internal_queue
;
46 queue_spinlock
.lock();
48 queue_spinlock
.unlock();
// Grab the whole pending batch with one swap so the spinlock is released
// right afterwards.
51 internal_queue
.swap(_external_queue
);
52 queue_spinlock
.unlock();
53 if (internal_queue
.empty()) {
55 if (sleep_time
< 128000) {
// Drain the local batch; queue_spinlock was released above.
62 while (!internal_queue
.empty()) {
63 pair
<utime_t
, TrackedOpRef
> op
= internal_queue
.front();
64 _ophistory
->_insert_delayed(op
.first
, op
.second
);
65 internal_queue
.pop_front();
// Shutdown hook of the op history; takes `ops_history_lock`.
// NOTE(review): everything after the lock acquisition is missing from this
// view, so what is torn down under the lock cannot be stated here.
72 void OpHistory::on_shutdown()
76 std::lock_guard
history_lock(ops_history_lock
);
// Adds `op` to the history indexes while holding `ops_history_lock`:
//  - `duration`, keyed by the op's get_duration();
//  - `arrived`, keyed by the op's get_initiated();
//  - `slow_op`, keyed by get_initiated(), only when the duration is at least
//    `history_slow_op_threshold`; that case also increments the
//    `l_osd_slow_op_count` counter on `logger`.
// NOTE(review): lines between the lock and the first insert, and after the
// slow-op branch, are missing from this view; `now` is not used by any
// visible statement.
83 void OpHistory::_insert_delayed(const utime_t
& now
, TrackedOpRef op
)
85 std::lock_guard
history_lock(ops_history_lock
);
88 double opduration
= op
->get_duration();
89 duration
.insert(make_pair(opduration
, op
));
90 arrived
.insert(make_pair(op
->get_initiated(), op
));
91 if (opduration
>= history_slow_op_threshold
.load()) {
92 slow_op
.insert(make_pair(op
->get_initiated(), op
));
93 logger
->inc(l_osd_slow_op_count
);
98 void OpHistory::cleanup(utime_t now
)
100 while (arrived
.size() &&
101 (now
- arrived
.begin()->first
>
102 (double)(history_duration
.load()))) {
103 duration
.erase(make_pair(
104 arrived
.begin()->second
->get_duration(),
105 arrived
.begin()->second
));
106 arrived
.erase(arrived
.begin());
109 while (duration
.size() > history_size
.load()) {
110 arrived
.erase(make_pair(
111 duration
.begin()->second
->get_initiated(),
112 duration
.begin()->second
));
113 duration
.erase(duration
.begin());
116 while (slow_op
.size() > history_slow_op_size
.load()) {
117 slow_op
.erase(make_pair(
118 slow_op
.begin()->second
->get_initiated(),
119 slow_op
.begin()->second
));
// Dumps the op history to `f` while holding `ops_history_lock`.
//
// Output: an "op_history" object holding the configured "size"
// (`history_size`) and "duration" (`history_duration`), followed by an "ops"
// array with one "op" object per dumped op.
//
//   now          time passed through to each op's dump()
//   f            formatter receiving the output
//   filters      passed to each op's filter_out()
//   by_duration  NOTE(review): the branch on this flag is missing from this
//                view; the two visible calls walk `duration` in reverse and
//                `arrived` forward, so presumably it selects between them.
// NOTE(review): the statements that follow the filter check and the
// close_section() calls are also missing from this view.
123 void OpHistory::dump_ops(utime_t now
, Formatter
*f
, set
<string
> filters
, bool by_duration
)
125 std::lock_guard
history_lock(ops_history_lock
);
127 f
->open_object_section("op_history");
128 f
->dump_int("size", history_size
.load());
129 f
->dump_int("duration", history_duration
.load());
131 f
->open_array_section("ops");
// Generic over the iterator type so the same routine can walk either index,
// forward or in reverse.
132 auto dump_fn
= [&f
, &now
, &filters
](auto begin_iter
, auto end_iter
) {
133 for (auto i
=begin_iter
; i
!=end_iter
; ++i
) {
134 if (!i
->second
->filter_out(filters
))
136 f
->open_object_section("op");
137 i
->second
->dump(now
, f
, OpTracker::default_dumper
);
143 dump_fn(duration
.rbegin(), duration
.rend());
145 dump_fn(arrived
.begin(), arrived
.end());
152 struct ShardedTrackingData
{
153 ceph::mutex ops_in_flight_lock_sharded
;
154 TrackedOp::tracked_op_list_t ops_in_flight_sharded
;
155 explicit ShardedTrackingData(string lock_name
)
156 : ops_in_flight_lock_sharded(ceph::make_mutex(lock_name
)) {}
// Builds the tracker and creates one ShardedTrackingData per shard.
//
//   cct_        NOTE(review): its use is in initializer lines missing from
//               this view.
//   tracking    initial value of `tracking_enabled`
//   num_shards  number of shards; stored in `num_optracker_shards`
//
// `complaint_time` and `log_threshold` start at 0. Each shard is allocated
// with `new` and kept as a raw pointer in `sharded_in_flight_list`.
// NOTE(review): part of the initializer list is missing from this view.
159 OpTracker::OpTracker(CephContext
*cct_
, bool tracking
, uint32_t num_shards
):
162 num_optracker_shards(num_shards
),
163 complaint_time(0), log_threshold(0),
164 tracking_enabled(tracking
),
166 for (uint32_t i
= 0; i
< num_optracker_shards
; i
++) {
// 34 bytes = the 22-character prefix + ':' + at most 10 decimal digits of a
// uint32_t + the terminating NUL.
167 char lock_name
[34] = {0};
168 snprintf(lock_name
, sizeof(lock_name
), "%s:%" PRIu32
, "OpTracker::ShardedLock", i
);
169 ShardedTrackingData
* one_shard
= new ShardedTrackingData(lock_name
);
170 sharded_in_flight_list
.push_back(one_shard
);
174 OpTracker::~OpTracker() {
175 while (!sharded_in_flight_list
.empty()) {
176 ShardedTrackingData
* sdata
= sharded_in_flight_list
.back();
177 ceph_assert(NULL
!= sdata
);
178 while (!sdata
->ops_in_flight_sharded
.empty()) {
180 std::lock_guard
locker(sdata
->ops_in_flight_lock_sharded
);
181 sdata
->ops_in_flight_sharded
.pop_back();
184 ceph_assert((sharded_in_flight_list
.back())->ops_in_flight_sharded
.empty());
185 delete sharded_in_flight_list
.back();
186 sharded_in_flight_list
.pop_back();
// Dumps the op history to `f` by forwarding to history.dump_ops() with the
// current time, `filters` and `by_duration`, while holding `lock` in shared
// mode.
// NOTE(review): the return statements are missing from this view; presumably
// the function returns false when `tracking_enabled` is false and true
// otherwise. Confirm against the full file.
190 bool OpTracker::dump_historic_ops(Formatter
*f
, bool by_duration
, set
<string
> filters
)
192 if (!tracking_enabled
)
195 std::shared_lock l
{lock
};
196 utime_t now
= ceph_clock_now();
197 history
.dump_ops(now
, f
, filters
, by_duration
);
// Dumps the retained slow ops to `f` while holding `ops_history_lock`.
//
// Output: an "OpHistory slow ops" object holding "num to keep"
// (`history_slow_op_size`) and "threshold to keep"
// (`history_slow_op_threshold`), followed by an "Ops" array with one "Op"
// object per dumped op. `filters` is passed to each op's filter_out().
// NOTE(review): the loop bounds, the statement following the filter check and
// the close_section() calls are missing from this view.
201 void OpHistory::dump_slow_ops(utime_t now
, Formatter
*f
, set
<string
> filters
)
203 std::lock_guard
history_lock(ops_history_lock
);
205 f
->open_object_section("OpHistory slow ops");
206 f
->dump_int("num to keep", history_slow_op_size
.load());
207 f
->dump_int("threshold to keep", history_slow_op_threshold
.load());
209 f
->open_array_section("Ops");
210 for (set
<pair
<utime_t
, TrackedOpRef
> >::const_iterator i
=
214 if (!i
->second
->filter_out(filters
))
216 f
->open_object_section("Op");
217 i
->second
->dump(now
, f
, OpTracker::default_dumper
);
// Dumps the retained slow ops to `f` by forwarding to history.dump_slow_ops()
// with the current time and `filters`, while holding `lock` in shared mode.
// NOTE(review): the return statements are missing from this view; presumably
// the function returns false when `tracking_enabled` is false and true
// otherwise. Confirm against the full file.
225 bool OpTracker::dump_historic_slow_ops(Formatter
*f
, set
<string
> filters
)
227 if (!tracking_enabled
)
230 std::shared_lock l
{lock
};
231 utime_t now
= ceph_clock_now();
232 history
.dump_slow_ops(now
, f
, filters
);
// Dumps the ops currently in flight to `f`.
//
// Holds `lock` in shared mode for the whole dump and each shard's
// `ops_in_flight_lock_sharded` while that shard's list is walked.
//
//   f                   formatter receiving the output
//   print_only_blocked  when set, ops are compared against `complaint_time`,
//                       and "complaint_time" / "num_blocked_ops" are emitted
//   filters             passed to each op's filter_out()
//   count_only          NOTE(review): not used by any visible statement
//   lambda              dumper passed to each op's dump()
//
// Output: an "ops_in_flight" object with an "ops" array (one "op" object per
// dumped op) and a count of the ops visited.
// NOTE(review): the return statements, the statements following the two
// per-op `if` checks and the `else` around "num_ops" are missing from this
// view.
236 bool OpTracker::dump_ops_in_flight(Formatter
*f
, bool print_only_blocked
, set
<string
> filters
, bool count_only
, dumper lambda
)
238 if (!tracking_enabled
)
241 std::shared_lock l
{lock
};
242 f
->open_object_section("ops_in_flight"); // overall dump
243 uint64_t total_ops_in_flight
= 0;
246 f
->open_array_section("ops"); // list of TrackedOps
249 utime_t now
= ceph_clock_now();
250 for (uint32_t i
= 0; i
< num_optracker_shards
; i
++) {
251 ShardedTrackingData
* sdata
= sharded_in_flight_list
[i
];
252 ceph_assert(NULL
!= sdata
);
253 std::lock_guard
locker(sdata
->ops_in_flight_lock_sharded
);
254 for (auto& op
: sdata
->ops_in_flight_sharded
) {
// In blocked-only mode, an op no older than complaint_time matches this
// check (presumably it is then skipped; the consequent is not visible).
255 if (print_only_blocked
&& (now
- op
.get_initiated() <= complaint_time
))
257 if (!op
.filter_out(filters
))
261 f
->open_object_section("op");
262 op
.dump(now
, f
, lambda
);
263 f
->close_section(); // this TrackedOp
266 total_ops_in_flight
++;
271 f
->close_section(); // list of TrackedOps
274 if (print_only_blocked
) {
275 f
->dump_float("complaint_time", complaint_time
);
276 f
->dump_int("num_blocked_ops", total_ops_in_flight
);
278 f
->dump_int("num_ops", total_ops_in_flight
);
280 f
->close_section(); // overall dump
// Registers op `i` as in flight.
//
// Holding `lock` in shared mode, takes the next value of `seq`, selects the
// shard `current_seq % num_optracker_shards`, and, under that shard's lock,
// appends the op to the shard's in-flight list and stores the sequence number
// in `i->seq`.
// NOTE(review): the return statements are missing from this view; presumably
// the function returns false when `tracking_enabled` is false and true after
// a successful registration.
284 bool OpTracker::register_inflight_op(TrackedOp
*i
)
286 if (!tracking_enabled
)
289 std::shared_lock l
{lock
};
290 uint64_t current_seq
= ++seq
;
291 uint32_t shard_index
= current_seq
% num_optracker_shards
;
292 ShardedTrackingData
* sdata
= sharded_in_flight_list
[shard_index
];
293 ceph_assert(NULL
!= sdata
);
295 std::lock_guard
locker(sdata
->ops_in_flight_lock_sharded
);
296 sdata
->ops_in_flight_sharded
.push_back(*i
);
// The stored seq is what unregister_inflight_op() uses to find the shard.
297 i
->seq
= current_seq
;
302 void OpTracker::unregister_inflight_op(TrackedOp
* const i
)
305 ceph_assert(i
->state
);
307 uint32_t shard_index
= i
->seq
% num_optracker_shards
;
308 ShardedTrackingData
* sdata
= sharded_in_flight_list
[shard_index
];
309 ceph_assert(NULL
!= sdata
);
311 std::lock_guard
locker(sdata
->ops_in_flight_lock_sharded
);
312 auto p
= sdata
->ops_in_flight_sharded
.iterator_to(*i
);
313 sdata
->ops_in_flight_sharded
.erase(p
);
317 void OpTracker::record_history_op(TrackedOpRef
&& i
)
319 std::shared_lock l
{lock
};
320 history
.insert(ceph_clock_now(), std::move(i
));
// Collects references to all in-flight ops from every shard and runs `visit`
// on them.
//
//   oldest_secs  out: age (now minus initiation time) of the oldest op found;
//                taken from the front element of each non-empty shard list
//   visit        callback invoked per op
//
// The ops are copied into a local vector of TrackedOpRef while `lock` (shared)
// and the per-shard locks are held, so that the visitor can run afterwards.
// NOTE(review): the return statements, the scope that releases `lock` before
// the final loop, and the body of that loop are missing from this view;
// presumably the function returns false when tracking is disabled, when no
// ops are in flight, or when the oldest op is younger than `complaint_time`.
323 bool OpTracker::visit_ops_in_flight(utime_t
* oldest_secs
,
324 std::function
<bool(TrackedOp
&)>&& visit
)
326 if (!tracking_enabled
)
329 const utime_t now
= ceph_clock_now();
330 utime_t oldest_op
= now
;
331 // single representation of all inflight operations reunified
332 // from OpTracker's shards. TrackedOpRef extends the lifetime
333 // to carry the ops outside of the critical section, and thus
334 // allows calling the visitor without any lock being held.
335 // This simplifies the contract on API at the price of plenty
336 // additional moves and atomic ref-counting. This seems OK as
337 // `visit_ops_in_flight()` is definitely not intended for any
339 std::vector
<TrackedOpRef
> ops_in_flight
;
341 std::shared_lock l
{lock
};
342 for (const auto sdata
: sharded_in_flight_list
) {
344 std::lock_guard
locker(sdata
->ops_in_flight_lock_sharded
);
345 if (!sdata
->ops_in_flight_sharded
.empty()) {
346 utime_t oldest_op_tmp
=
347 sdata
->ops_in_flight_sharded
.front().get_initiated();
348 if (oldest_op_tmp
< oldest_op
) {
349 oldest_op
= oldest_op_tmp
;
// Take a reference to every op of this shard while its lock is held.
352 std::transform(std::begin(sdata
->ops_in_flight_sharded
),
353 std::end(sdata
->ops_in_flight_sharded
),
354 std::back_inserter(ops_in_flight
),
355 [] (TrackedOp
& op
) { return TrackedOpRef(&op
); });
357 if (ops_in_flight
.empty())
359 *oldest_secs
= now
- oldest_op
;
360 dout(10) << "ops_in_flight.size: " << ops_in_flight
.size()
361 << "; oldest is " << *oldest_secs
362 << " seconds old" << dendl
;
364 if (*oldest_secs
< complaint_time
)
368 for (auto& op
: ops_in_flight
) {
369 // Neither `lock` nor `ops_in_flight_lock_sharded` should be held when
370 // calling the visitor. Otherwise `OSD::get_health_metrics()` can
371 // dead-lock due to the `~TrackedOp()` calling `record_history_op()`
372 // or `unregister_inflight_op()`.
// Walks the in-flight ops through visit_ops_in_flight() with a `check`
// callback that classifies slow ops, and reports the counts through
// `*num_slow_ops` and `*num_warned_ops`.
//
// As far as the visible lines show, `check`:
//  - treats an op initiated at or after `too_old` as the end of the slow ops;
//  - tests `op.warn_interval_multiplier` for zero;
//  - compares `warned` against `log_threshold` ("enough samples");
//  - computes the time to complain as
//    get_initiated() + complaint_time * warn_interval_multiplier and defers
//    the warning while that time is still >= now.
// NOTE(review): two parameters of the signature, the declarations of
// `too_old`, `slow` and `warned`, every return statement and the call to
// `on_warn` are missing from this view.
379 bool OpTracker::with_slow_ops_in_flight(utime_t
* oldest_secs
,
382 std::function
<void(TrackedOp
&)>&& on_warn
)
384 const utime_t now
= ceph_clock_now();
386 too_old
-= complaint_time
;
389 auto check
= [&](TrackedOp
& op
) {
390 if (op
.get_initiated() >= too_old
) {
391 // no more slow ops in flight
394 if (!op
.warn_interval_multiplier
)
397 if (warned
>= log_threshold
) {
398 // enough samples of slow ops
401 auto time_to_complain
= (op
.get_initiated() +
402 complaint_time
* op
.warn_interval_multiplier
);
403 if (time_to_complain
>= now
) {
404 // complain later if the op is still in flight
407 // will warn, increase counter
412 if (visit_ops_in_flight(oldest_secs
, check
)) {
414 *num_slow_ops
= slow
;
415 *num_warned_ops
= warned
;
// Produces human-readable warnings for slow in-flight ops.
//
// For each op handed to `warn_on_slow_op` by with_slow_ops_in_flight(), a
// line of the form "slow request <age> seconds old, received at <time>:
// <desc> ..." is appended to `warnings`, and the op's
// `warn_interval_multiplier` is doubled. A "<slow> slow requests, <warned>
// included below; oldest blocked for > <secs> secs" line is also composed,
// and the slow count is written to `*num_slow_ops`.
// NOTE(review): part of the signature, several declarations (`ss`, `too_old`,
// `slow`, `warned`, `oldest_secs`), the assignment to `*summary` and the
// return statements are missing from this view.
423 bool OpTracker::check_ops_in_flight(std::string
* summary
,
424 std::vector
<string
> &warnings
,
427 const utime_t now
= ceph_clock_now();
429 too_old
-= complaint_time
;
432 auto warn_on_slow_op
= [&](TrackedOp
& op
) {
434 utime_t age
= now
- op
.get_initiated();
435 ss
<< "slow request " << age
<< " seconds old, received at "
436 << op
.get_initiated() << ": " << op
.get_desc()
438 << op
.state_string();
439 warnings
.push_back(ss
.str());
440 // only those that have been shown will backoff
441 op
.warn_interval_multiplier
*= 2;
444 if (with_slow_ops_in_flight(&oldest_secs
, &slow
, &warned
, warn_on_slow_op
) &&
447 ss
<< slow
<< " slow requests, "
448 << warned
<< " included below; oldest blocked for > "
449 << oldest_secs
<< " secs";
452 *num_slow_ops
= slow
;
// Walks every shard's in-flight list (under that shard's lock) and computes
// each op's age in milliseconds relative to the current time.
// NOTE(review): the statements that feed the ages into `h` are missing from
// this view, so how the histogram is filled cannot be stated here.
460 void OpTracker::get_age_ms_histogram(pow2_hist_t
*h
)
463 utime_t now
= ceph_clock_now();
465 for (uint32_t iter
= 0; iter
< num_optracker_shards
; iter
++) {
466 ShardedTrackingData
* sdata
= sharded_in_flight_list
[iter
];
467 ceph_assert(NULL
!= sdata
);
468 std::lock_guard
locker(sdata
->ops_in_flight_lock_sharded
);
470 for (auto& i
: sdata
->ops_in_flight_sharded
) {
471 utime_t age
= now
- i
.get_initiated();
// Age is scaled by 1000.0 before truncation to an integer.
472 uint32_t ms
= (long)(age
* 1000.0);
480 #define dout_context tracker->cct
// Records `event` with timestamp `stamp` on this op: appends the pair to
// `events` while holding `lock`, then logs the op's seq, the timestamp, the
// event and the op description at debug level 6.
// NOTE(review): lines before the lock and after the log statement are missing
// from this view, including the scope that bounds the lock guard.
482 void TrackedOp::mark_event(std::string_view event
, utime_t stamp
)
488 std::lock_guard
l(lock
);
489 events
.emplace_back(stamp
, event
);
491 dout(6) << " seq: " << seq
492 << ", time: " << stamp
493 << ", event: " << event
494 << ", op: " << get_desc()
499 void TrackedOp::dump(utime_t now
, Formatter
*f
, OpTracker::dumper lambda
) const
501 // Ignore if still in the constructor
504 f
->dump_string("description", get_desc());
505 f
->dump_stream("initiated_at") << get_initiated();
506 f
->dump_float("age", now
- get_initiated());
507 f
->dump_float("duration", get_duration());
509 f
->open_object_section("type_data");