]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/TrackedOp.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / common / TrackedOp.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * This is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License version 2.1, as published by the Free Software
9 * Foundation. See file COPYING.
10 * Copyright 2013 Inktank
11 */
12
13#include "TrackedOp.h"
7c673cae
FG
14
15#define dout_context cct
16#define dout_subsys ceph_subsys_optracker
17#undef dout_prefix
18#define dout_prefix _prefix(_dout)
19
f67539c2
TL
20using std::list;
21using std::make_pair;
22using std::ostream;
23using std::pair;
24using std::set;
25using std::string;
26using std::stringstream;
27
28using ceph::Formatter;
29
7c673cae
FG
30static ostream& _prefix(std::ostream* _dout)
31{
32 return *_dout << "-- op tracker -- ";
33}
34
11fdf7f2
TL
35void OpHistoryServiceThread::break_thread() {
36 queue_spinlock.lock();
37 _external_queue.clear();
38 _break_thread = true;
39 queue_spinlock.unlock();
40}
41
42void* OpHistoryServiceThread::entry() {
43 int sleep_time = 1000;
44 list<pair<utime_t, TrackedOpRef>> internal_queue;
45 while (1) {
46 queue_spinlock.lock();
47 if (_break_thread) {
48 queue_spinlock.unlock();
49 break;
50 }
51 internal_queue.swap(_external_queue);
52 queue_spinlock.unlock();
53 if (internal_queue.empty()) {
54 usleep(sleep_time);
55 if (sleep_time < 128000) {
56 sleep_time <<= 2;
57 }
58 } else {
59 sleep_time = 1000;
60 }
61
62 while (!internal_queue.empty()) {
63 pair<utime_t, TrackedOpRef> op = internal_queue.front();
64 _ophistory->_insert_delayed(op.first, op.second);
65 internal_queue.pop_front();
66 }
67 }
68 return nullptr;
69}
70
71
7c673cae
FG
72void OpHistory::on_shutdown()
73{
11fdf7f2
TL
74 opsvc.break_thread();
75 opsvc.join();
76 std::lock_guard history_lock(ops_history_lock);
7c673cae
FG
77 arrived.clear();
78 duration.clear();
79 slow_op.clear();
80 shutdown = true;
81}
82
11fdf7f2 83void OpHistory::_insert_delayed(const utime_t& now, TrackedOpRef op)
7c673cae 84{
11fdf7f2 85 std::lock_guard history_lock(ops_history_lock);
7c673cae
FG
86 if (shutdown)
87 return;
11fdf7f2
TL
88 double opduration = op->get_duration();
89 duration.insert(make_pair(opduration, op));
7c673cae 90 arrived.insert(make_pair(op->get_initiated(), op));
1e59de90 91 if (opduration >= history_slow_op_threshold.load()) {
7c673cae 92 slow_op.insert(make_pair(op->get_initiated(), op));
1e59de90
TL
93 logger->inc(l_osd_slow_op_count);
94 }
7c673cae
FG
95 cleanup(now);
96}
97
98void OpHistory::cleanup(utime_t now)
99{
100 while (arrived.size() &&
101 (now - arrived.begin()->first >
92f5a8d4 102 (double)(history_duration.load()))) {
7c673cae
FG
103 duration.erase(make_pair(
104 arrived.begin()->second->get_duration(),
105 arrived.begin()->second));
106 arrived.erase(arrived.begin());
107 }
108
92f5a8d4 109 while (duration.size() > history_size.load()) {
7c673cae
FG
110 arrived.erase(make_pair(
111 duration.begin()->second->get_initiated(),
112 duration.begin()->second));
113 duration.erase(duration.begin());
114 }
115
92f5a8d4 116 while (slow_op.size() > history_slow_op_size.load()) {
7c673cae
FG
117 slow_op.erase(make_pair(
118 slow_op.begin()->second->get_initiated(),
119 slow_op.begin()->second));
120 }
121}
122
11fdf7f2 123void OpHistory::dump_ops(utime_t now, Formatter *f, set<string> filters, bool by_duration)
7c673cae 124{
11fdf7f2 125 std::lock_guard history_lock(ops_history_lock);
7c673cae
FG
126 cleanup(now);
127 f->open_object_section("op_history");
92f5a8d4
TL
128 f->dump_int("size", history_size.load());
129 f->dump_int("duration", history_duration.load());
7c673cae
FG
130 {
131 f->open_array_section("ops");
11fdf7f2
TL
132 auto dump_fn = [&f, &now, &filters](auto begin_iter, auto end_iter) {
133 for (auto i=begin_iter; i!=end_iter; ++i) {
c07f9fc5
FG
134 if (!i->second->filter_out(filters))
135 continue;
7c673cae
FG
136 f->open_object_section("op");
137 i->second->dump(now, f);
138 f->close_section();
139 }
11fdf7f2
TL
140 };
141
142 if (by_duration) {
143 dump_fn(duration.rbegin(), duration.rend());
144 } else {
145 dump_fn(arrived.begin(), arrived.end());
7c673cae
FG
146 }
147 f->close_section();
148 }
149 f->close_section();
150}
151
152struct ShardedTrackingData {
11fdf7f2 153 ceph::mutex ops_in_flight_lock_sharded;
7c673cae 154 TrackedOp::tracked_op_list_t ops_in_flight_sharded;
11fdf7f2
TL
155 explicit ShardedTrackingData(string lock_name)
156 : ops_in_flight_lock_sharded(ceph::make_mutex(lock_name)) {}
7c673cae
FG
157};
158
159OpTracker::OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards):
160 seq(0),
1e59de90 161 history(cct_),
7c673cae
FG
162 num_optracker_shards(num_shards),
163 complaint_time(0), log_threshold(0),
164 tracking_enabled(tracking),
9f95a23c 165 cct(cct_) {
7c673cae 166 for (uint32_t i = 0; i < num_optracker_shards; i++) {
1e59de90 167 char lock_name[34] = {0};
11fdf7f2 168 snprintf(lock_name, sizeof(lock_name), "%s:%" PRIu32, "OpTracker::ShardedLock", i);
7c673cae
FG
169 ShardedTrackingData* one_shard = new ShardedTrackingData(lock_name);
170 sharded_in_flight_list.push_back(one_shard);
171 }
172}
173
174OpTracker::~OpTracker() {
175 while (!sharded_in_flight_list.empty()) {
20effc67
TL
176 ShardedTrackingData* sdata = sharded_in_flight_list.back();
177 ceph_assert(NULL != sdata);
178 while (!sdata->ops_in_flight_sharded.empty()) {
179 {
180 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
181 sdata->ops_in_flight_sharded.pop_back();
182 }
183 }
11fdf7f2 184 ceph_assert((sharded_in_flight_list.back())->ops_in_flight_sharded.empty());
7c673cae
FG
185 delete sharded_in_flight_list.back();
186 sharded_in_flight_list.pop_back();
187 }
188}
189
c07f9fc5 190bool OpTracker::dump_historic_ops(Formatter *f, bool by_duration, set<string> filters)
7c673cae 191{
7c673cae
FG
192 if (!tracking_enabled)
193 return false;
194
9f95a23c 195 std::shared_lock l{lock};
7c673cae 196 utime_t now = ceph_clock_now();
11fdf7f2 197 history.dump_ops(now, f, filters, by_duration);
7c673cae
FG
198 return true;
199}
200
c07f9fc5 201void OpHistory::dump_slow_ops(utime_t now, Formatter *f, set<string> filters)
7c673cae 202{
11fdf7f2 203 std::lock_guard history_lock(ops_history_lock);
7c673cae
FG
204 cleanup(now);
205 f->open_object_section("OpHistory slow ops");
92f5a8d4
TL
206 f->dump_int("num to keep", history_slow_op_size.load());
207 f->dump_int("threshold to keep", history_slow_op_threshold.load());
7c673cae
FG
208 {
209 f->open_array_section("Ops");
210 for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
211 slow_op.begin();
212 i != slow_op.end();
213 ++i) {
c07f9fc5
FG
214 if (!i->second->filter_out(filters))
215 continue;
7c673cae
FG
216 f->open_object_section("Op");
217 i->second->dump(now, f);
218 f->close_section();
219 }
220 f->close_section();
221 }
222 f->close_section();
223}
224
c07f9fc5 225bool OpTracker::dump_historic_slow_ops(Formatter *f, set<string> filters)
7c673cae 226{
7c673cae
FG
227 if (!tracking_enabled)
228 return false;
229
9f95a23c 230 std::shared_lock l{lock};
7c673cae 231 utime_t now = ceph_clock_now();
c07f9fc5 232 history.dump_slow_ops(now, f, filters);
7c673cae
FG
233 return true;
234}
235
1e59de90 236bool OpTracker::dump_ops_in_flight(Formatter *f, bool print_only_blocked, set<string> filters, bool count_only)
7c673cae 237{
7c673cae
FG
238 if (!tracking_enabled)
239 return false;
240
9f95a23c 241 std::shared_lock l{lock};
7c673cae
FG
242 f->open_object_section("ops_in_flight"); // overall dump
243 uint64_t total_ops_in_flight = 0;
1e59de90
TL
244
245 if (!count_only) {
246 f->open_array_section("ops"); // list of TrackedOps
247 }
248
7c673cae
FG
249 utime_t now = ceph_clock_now();
250 for (uint32_t i = 0; i < num_optracker_shards; i++) {
251 ShardedTrackingData* sdata = sharded_in_flight_list[i];
11fdf7f2
TL
252 ceph_assert(NULL != sdata);
253 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
7c673cae
FG
254 for (auto& op : sdata->ops_in_flight_sharded) {
255 if (print_only_blocked && (now - op.get_initiated() <= complaint_time))
c07f9fc5
FG
256 break;
257 if (!op.filter_out(filters))
258 continue;
1e59de90
TL
259
260 if (!count_only) {
261 f->open_object_section("op");
262 op.dump(now, f);
263 f->close_section(); // this TrackedOp
264 }
265
7c673cae
FG
266 total_ops_in_flight++;
267 }
268 }
1e59de90
TL
269
270 if (!count_only) {
271 f->close_section(); // list of TrackedOps
272 }
273
7c673cae
FG
274 if (print_only_blocked) {
275 f->dump_float("complaint_time", complaint_time);
276 f->dump_int("num_blocked_ops", total_ops_in_flight);
20effc67 277 } else {
7c673cae 278 f->dump_int("num_ops", total_ops_in_flight);
20effc67 279 }
7c673cae
FG
280 f->close_section(); // overall dump
281 return true;
282}
283
284bool OpTracker::register_inflight_op(TrackedOp *i)
285{
7c673cae
FG
286 if (!tracking_enabled)
287 return false;
288
9f95a23c 289 std::shared_lock l{lock};
31f18b77 290 uint64_t current_seq = ++seq;
7c673cae
FG
291 uint32_t shard_index = current_seq % num_optracker_shards;
292 ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
11fdf7f2 293 ceph_assert(NULL != sdata);
7c673cae 294 {
11fdf7f2 295 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
7c673cae
FG
296 sdata->ops_in_flight_sharded.push_back(*i);
297 i->seq = current_seq;
298 }
299 return true;
300}
301
11fdf7f2 302void OpTracker::unregister_inflight_op(TrackedOp* const i)
7c673cae
FG
303{
304 // caller checks;
11fdf7f2 305 ceph_assert(i->state);
7c673cae
FG
306
307 uint32_t shard_index = i->seq % num_optracker_shards;
308 ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
11fdf7f2 309 ceph_assert(NULL != sdata);
7c673cae 310 {
11fdf7f2 311 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
7c673cae
FG
312 auto p = sdata->ops_in_flight_sharded.iterator_to(*i);
313 sdata->ops_in_flight_sharded.erase(p);
314 }
11fdf7f2 315}
7c673cae 316
11fdf7f2
TL
317void OpTracker::record_history_op(TrackedOpRef&& i)
318{
9f95a23c 319 std::shared_lock l{lock};
11fdf7f2 320 history.insert(ceph_clock_now(), std::move(i));
7c673cae
FG
321}
322
11fdf7f2
TL
323bool OpTracker::visit_ops_in_flight(utime_t* oldest_secs,
324 std::function<bool(TrackedOp&)>&& visit)
7c673cae 325{
7c673cae
FG
326 if (!tracking_enabled)
327 return false;
328
11fdf7f2 329 const utime_t now = ceph_clock_now();
7c673cae 330 utime_t oldest_op = now;
92f5a8d4
TL
331 // single representation of all inflight operations reunified
332 // from OpTracker's shards. TrackedOpRef extends the lifetime
333 // to carry the ops outside of the critical section, and thus
334 // allows to call the visitor without any lock being held.
335 // This simplifies the contract on API at the price of plenty
336 // additional moves and atomic ref-counting. This seems OK as
337 // `visit_ops_in_flight()` is definitely not intended for any
338 // hot path.
339 std::vector<TrackedOpRef> ops_in_flight;
7c673cae 340
9f95a23c 341 std::shared_lock l{lock};
11fdf7f2
TL
342 for (const auto sdata : sharded_in_flight_list) {
343 ceph_assert(sdata);
344 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
7c673cae
FG
345 if (!sdata->ops_in_flight_sharded.empty()) {
346 utime_t oldest_op_tmp =
347 sdata->ops_in_flight_sharded.front().get_initiated();
348 if (oldest_op_tmp < oldest_op) {
349 oldest_op = oldest_op_tmp;
350 }
351 }
92f5a8d4
TL
352 std::transform(std::begin(sdata->ops_in_flight_sharded),
353 std::end(sdata->ops_in_flight_sharded),
354 std::back_inserter(ops_in_flight),
355 [] (TrackedOp& op) { return TrackedOpRef(&op); });
7c673cae 356 }
92f5a8d4 357 if (ops_in_flight.empty())
7c673cae 358 return false;
11fdf7f2 359 *oldest_secs = now - oldest_op;
92f5a8d4 360 dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
11fdf7f2 361 << "; oldest is " << *oldest_secs
7c673cae
FG
362 << " seconds old" << dendl;
363
11fdf7f2 364 if (*oldest_secs < complaint_time)
7c673cae
FG
365 return false;
366
92f5a8d4
TL
367 l.unlock();
368 for (auto& op : ops_in_flight) {
369 // `lock` neither `ops_in_flight_lock_sharded` should be held when
370 // calling the visitor. Otherwise `OSD::get_health_metrics()` can
371 // dead-lock due to the `~TrackedOp()` calling `record_history_op()`
372 // or `unregister_inflight_op()`.
373 if (!visit(*op))
374 break;
11fdf7f2
TL
375 }
376 return true;
377}
1adf2230 378
11fdf7f2
TL
379bool OpTracker::with_slow_ops_in_flight(utime_t* oldest_secs,
380 int* num_slow_ops,
381 int* num_warned_ops,
382 std::function<void(TrackedOp&)>&& on_warn)
383{
384 const utime_t now = ceph_clock_now();
385 auto too_old = now;
386 too_old -= complaint_time;
387 int slow = 0;
388 int warned = 0;
389 auto check = [&](TrackedOp& op) {
390 if (op.get_initiated() >= too_old) {
391 // no more slow ops in flight
392 return false;
393 }
394 if (!op.warn_interval_multiplier)
395 return true;
396 slow++;
397 if (warned >= log_threshold) {
398 // enough samples of slow ops
399 return true;
7c673cae 400 }
11fdf7f2
TL
401 auto time_to_complain = (op.get_initiated() +
402 complaint_time * op.warn_interval_multiplier);
403 if (time_to_complain >= now) {
404 // complain later if the op is still in flight
405 return true;
406 }
407 // will warn, increase counter
408 warned++;
409 on_warn(op);
410 return true;
411 };
412 if (visit_ops_in_flight(oldest_secs, check)) {
413 if (num_slow_ops) {
414 *num_slow_ops = slow;
415 *num_warned_ops = warned;
416 }
417 return true;
418 } else {
419 return false;
7c673cae 420 }
11fdf7f2 421}
7c673cae 422
11fdf7f2
TL
423bool OpTracker::check_ops_in_flight(std::string* summary,
424 std::vector<string> &warnings,
425 int *num_slow_ops)
426{
427 const utime_t now = ceph_clock_now();
428 auto too_old = now;
429 too_old -= complaint_time;
430 int warned = 0;
431 utime_t oldest_secs;
432 auto warn_on_slow_op = [&](TrackedOp& op) {
433 stringstream ss;
434 utime_t age = now - op.get_initiated();
435 ss << "slow request " << age << " seconds old, received at "
436 << op.get_initiated() << ": " << op.get_desc()
437 << " currently "
438 << op.state_string();
439 warnings.push_back(ss.str());
440 // only those that have been shown will backoff
441 op.warn_interval_multiplier *= 2;
442 };
443 int slow = 0;
444 if (with_slow_ops_in_flight(&oldest_secs, &slow, &warned, warn_on_slow_op) &&
445 slow > 0) {
7c673cae 446 stringstream ss;
11fdf7f2
TL
447 ss << slow << " slow requests, "
448 << warned << " included below; oldest blocked for > "
7c673cae 449 << oldest_secs << " secs";
11fdf7f2
TL
450 *summary = ss.str();
451 if (num_slow_ops) {
452 *num_slow_ops = slow;
453 }
454 return true;
455 } else {
456 return false;
7c673cae 457 }
7c673cae
FG
458}
459
460void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
461{
462 h->clear();
463 utime_t now = ceph_clock_now();
464
465 for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
466 ShardedTrackingData* sdata = sharded_in_flight_list[iter];
11fdf7f2
TL
467 ceph_assert(NULL != sdata);
468 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
7c673cae
FG
469
470 for (auto& i : sdata->ops_in_flight_sharded) {
471 utime_t age = now - i.get_initiated();
472 uint32_t ms = (long)(age * 1000.0);
473 h->add(ms);
474 }
475 }
476}
477
478
479#undef dout_context
480#define dout_context tracker->cct
481
11fdf7f2 482void TrackedOp::mark_event(std::string_view event, utime_t stamp)
7c673cae
FG
483{
484 if (!state)
485 return;
486
487 {
11fdf7f2
TL
488 std::lock_guard l(lock);
489 events.emplace_back(stamp, event);
7c673cae 490 }
c07f9fc5 491 dout(6) << " seq: " << seq
7c673cae
FG
492 << ", time: " << stamp
493 << ", event: " << event
494 << ", op: " << get_desc()
495 << dendl;
496 _event_marked();
497}
498
499void TrackedOp::dump(utime_t now, Formatter *f) const
500{
501 // Ignore if still in the constructor
502 if (!state)
503 return;
504 f->dump_string("description", get_desc());
505 f->dump_stream("initiated_at") << get_initiated();
506 f->dump_float("age", now - get_initiated());
507 f->dump_float("duration", get_duration());
508 {
c07f9fc5 509 f->open_object_section("type_data");
7c673cae
FG
510 _dump(f);
511 f->close_section();
512 }
513}