]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/TrackedOp.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / common / TrackedOp.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * This is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License version 2.1, as published by the Free Software
9 * Foundation. See file COPYING.
10 * Copyright 2013 Inktank
11 */
12
13#include "TrackedOp.h"
7c673cae
FG
14
15#define dout_context cct
16#define dout_subsys ceph_subsys_optracker
17#undef dout_prefix
18#define dout_prefix _prefix(_dout)
19
f67539c2
TL
20using std::list;
21using std::make_pair;
22using std::ostream;
23using std::pair;
24using std::set;
25using std::string;
26using std::stringstream;
27
28using ceph::Formatter;
29
7c673cae
FG
// Log prefix hook referenced by the dout_prefix macro above: writes the
// op-tracker tag to the given stream and hands the same stream back so the
// caller can keep chaining output onto it.
static ostream& _prefix(std::ostream* _dout)
{
  ostream& out = *_dout;
  out << "-- op tracker -- ";
  return out;
}
34
11fdf7f2
TL
35void OpHistoryServiceThread::break_thread() {
36 queue_spinlock.lock();
37 _external_queue.clear();
38 _break_thread = true;
39 queue_spinlock.unlock();
40}
41
42void* OpHistoryServiceThread::entry() {
43 int sleep_time = 1000;
44 list<pair<utime_t, TrackedOpRef>> internal_queue;
45 while (1) {
46 queue_spinlock.lock();
47 if (_break_thread) {
48 queue_spinlock.unlock();
49 break;
50 }
51 internal_queue.swap(_external_queue);
52 queue_spinlock.unlock();
53 if (internal_queue.empty()) {
54 usleep(sleep_time);
55 if (sleep_time < 128000) {
56 sleep_time <<= 2;
57 }
58 } else {
59 sleep_time = 1000;
60 }
61
62 while (!internal_queue.empty()) {
63 pair<utime_t, TrackedOpRef> op = internal_queue.front();
64 _ophistory->_insert_delayed(op.first, op.second);
65 internal_queue.pop_front();
66 }
67 }
68 return nullptr;
69}
70
71
7c673cae
FG
72void OpHistory::on_shutdown()
73{
11fdf7f2
TL
74 opsvc.break_thread();
75 opsvc.join();
76 std::lock_guard history_lock(ops_history_lock);
7c673cae
FG
77 arrived.clear();
78 duration.clear();
79 slow_op.clear();
80 shutdown = true;
81}
82
11fdf7f2 83void OpHistory::_insert_delayed(const utime_t& now, TrackedOpRef op)
7c673cae 84{
11fdf7f2 85 std::lock_guard history_lock(ops_history_lock);
7c673cae
FG
86 if (shutdown)
87 return;
11fdf7f2
TL
88 double opduration = op->get_duration();
89 duration.insert(make_pair(opduration, op));
7c673cae 90 arrived.insert(make_pair(op->get_initiated(), op));
92f5a8d4 91 if (opduration >= history_slow_op_threshold.load())
7c673cae
FG
92 slow_op.insert(make_pair(op->get_initiated(), op));
93 cleanup(now);
94}
95
96void OpHistory::cleanup(utime_t now)
97{
98 while (arrived.size() &&
99 (now - arrived.begin()->first >
92f5a8d4 100 (double)(history_duration.load()))) {
7c673cae
FG
101 duration.erase(make_pair(
102 arrived.begin()->second->get_duration(),
103 arrived.begin()->second));
104 arrived.erase(arrived.begin());
105 }
106
92f5a8d4 107 while (duration.size() > history_size.load()) {
7c673cae
FG
108 arrived.erase(make_pair(
109 duration.begin()->second->get_initiated(),
110 duration.begin()->second));
111 duration.erase(duration.begin());
112 }
113
92f5a8d4 114 while (slow_op.size() > history_slow_op_size.load()) {
7c673cae
FG
115 slow_op.erase(make_pair(
116 slow_op.begin()->second->get_initiated(),
117 slow_op.begin()->second));
118 }
119}
120
11fdf7f2 121void OpHistory::dump_ops(utime_t now, Formatter *f, set<string> filters, bool by_duration)
7c673cae 122{
11fdf7f2 123 std::lock_guard history_lock(ops_history_lock);
7c673cae
FG
124 cleanup(now);
125 f->open_object_section("op_history");
92f5a8d4
TL
126 f->dump_int("size", history_size.load());
127 f->dump_int("duration", history_duration.load());
7c673cae
FG
128 {
129 f->open_array_section("ops");
11fdf7f2
TL
130 auto dump_fn = [&f, &now, &filters](auto begin_iter, auto end_iter) {
131 for (auto i=begin_iter; i!=end_iter; ++i) {
c07f9fc5
FG
132 if (!i->second->filter_out(filters))
133 continue;
7c673cae
FG
134 f->open_object_section("op");
135 i->second->dump(now, f);
136 f->close_section();
137 }
11fdf7f2
TL
138 };
139
140 if (by_duration) {
141 dump_fn(duration.rbegin(), duration.rend());
142 } else {
143 dump_fn(arrived.begin(), arrived.end());
7c673cae
FG
144 }
145 f->close_section();
146 }
147 f->close_section();
148}
149
150struct ShardedTrackingData {
11fdf7f2 151 ceph::mutex ops_in_flight_lock_sharded;
7c673cae 152 TrackedOp::tracked_op_list_t ops_in_flight_sharded;
11fdf7f2
TL
153 explicit ShardedTrackingData(string lock_name)
154 : ops_in_flight_lock_sharded(ceph::make_mutex(lock_name)) {}
7c673cae
FG
155};
156
157OpTracker::OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards):
158 seq(0),
159 num_optracker_shards(num_shards),
160 complaint_time(0), log_threshold(0),
161 tracking_enabled(tracking),
9f95a23c 162 cct(cct_) {
7c673cae
FG
163 for (uint32_t i = 0; i < num_optracker_shards; i++) {
164 char lock_name[32] = {0};
11fdf7f2 165 snprintf(lock_name, sizeof(lock_name), "%s:%" PRIu32, "OpTracker::ShardedLock", i);
7c673cae
FG
166 ShardedTrackingData* one_shard = new ShardedTrackingData(lock_name);
167 sharded_in_flight_list.push_back(one_shard);
168 }
169}
170
171OpTracker::~OpTracker() {
172 while (!sharded_in_flight_list.empty()) {
20effc67
TL
173 ShardedTrackingData* sdata = sharded_in_flight_list.back();
174 ceph_assert(NULL != sdata);
175 while (!sdata->ops_in_flight_sharded.empty()) {
176 {
177 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
178 sdata->ops_in_flight_sharded.pop_back();
179 }
180 }
11fdf7f2 181 ceph_assert((sharded_in_flight_list.back())->ops_in_flight_sharded.empty());
7c673cae
FG
182 delete sharded_in_flight_list.back();
183 sharded_in_flight_list.pop_back();
184 }
185}
186
c07f9fc5 187bool OpTracker::dump_historic_ops(Formatter *f, bool by_duration, set<string> filters)
7c673cae 188{
7c673cae
FG
189 if (!tracking_enabled)
190 return false;
191
9f95a23c 192 std::shared_lock l{lock};
7c673cae 193 utime_t now = ceph_clock_now();
11fdf7f2 194 history.dump_ops(now, f, filters, by_duration);
7c673cae
FG
195 return true;
196}
197
c07f9fc5 198void OpHistory::dump_slow_ops(utime_t now, Formatter *f, set<string> filters)
7c673cae 199{
11fdf7f2 200 std::lock_guard history_lock(ops_history_lock);
7c673cae
FG
201 cleanup(now);
202 f->open_object_section("OpHistory slow ops");
92f5a8d4
TL
203 f->dump_int("num to keep", history_slow_op_size.load());
204 f->dump_int("threshold to keep", history_slow_op_threshold.load());
7c673cae
FG
205 {
206 f->open_array_section("Ops");
207 for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
208 slow_op.begin();
209 i != slow_op.end();
210 ++i) {
c07f9fc5
FG
211 if (!i->second->filter_out(filters))
212 continue;
7c673cae
FG
213 f->open_object_section("Op");
214 i->second->dump(now, f);
215 f->close_section();
216 }
217 f->close_section();
218 }
219 f->close_section();
220}
221
c07f9fc5 222bool OpTracker::dump_historic_slow_ops(Formatter *f, set<string> filters)
7c673cae 223{
7c673cae
FG
224 if (!tracking_enabled)
225 return false;
226
9f95a23c 227 std::shared_lock l{lock};
7c673cae 228 utime_t now = ceph_clock_now();
c07f9fc5 229 history.dump_slow_ops(now, f, filters);
7c673cae
FG
230 return true;
231}
232
c07f9fc5 233bool OpTracker::dump_ops_in_flight(Formatter *f, bool print_only_blocked, set<string> filters)
7c673cae 234{
7c673cae
FG
235 if (!tracking_enabled)
236 return false;
237
9f95a23c 238 std::shared_lock l{lock};
7c673cae
FG
239 f->open_object_section("ops_in_flight"); // overall dump
240 uint64_t total_ops_in_flight = 0;
241 f->open_array_section("ops"); // list of TrackedOps
242 utime_t now = ceph_clock_now();
243 for (uint32_t i = 0; i < num_optracker_shards; i++) {
244 ShardedTrackingData* sdata = sharded_in_flight_list[i];
11fdf7f2
TL
245 ceph_assert(NULL != sdata);
246 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
7c673cae
FG
247 for (auto& op : sdata->ops_in_flight_sharded) {
248 if (print_only_blocked && (now - op.get_initiated() <= complaint_time))
c07f9fc5
FG
249 break;
250 if (!op.filter_out(filters))
251 continue;
7c673cae
FG
252 f->open_object_section("op");
253 op.dump(now, f);
254 f->close_section(); // this TrackedOp
255 total_ops_in_flight++;
256 }
257 }
258 f->close_section(); // list of TrackedOps
259 if (print_only_blocked) {
260 f->dump_float("complaint_time", complaint_time);
261 f->dump_int("num_blocked_ops", total_ops_in_flight);
20effc67 262 } else {
7c673cae 263 f->dump_int("num_ops", total_ops_in_flight);
20effc67 264 }
7c673cae
FG
265 f->close_section(); // overall dump
266 return true;
267}
268
269bool OpTracker::register_inflight_op(TrackedOp *i)
270{
7c673cae
FG
271 if (!tracking_enabled)
272 return false;
273
9f95a23c 274 std::shared_lock l{lock};
31f18b77 275 uint64_t current_seq = ++seq;
7c673cae
FG
276 uint32_t shard_index = current_seq % num_optracker_shards;
277 ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
11fdf7f2 278 ceph_assert(NULL != sdata);
7c673cae 279 {
11fdf7f2 280 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
7c673cae
FG
281 sdata->ops_in_flight_sharded.push_back(*i);
282 i->seq = current_seq;
283 }
284 return true;
285}
286
11fdf7f2 287void OpTracker::unregister_inflight_op(TrackedOp* const i)
7c673cae
FG
288{
289 // caller checks;
11fdf7f2 290 ceph_assert(i->state);
7c673cae
FG
291
292 uint32_t shard_index = i->seq % num_optracker_shards;
293 ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
11fdf7f2 294 ceph_assert(NULL != sdata);
7c673cae 295 {
11fdf7f2 296 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
7c673cae
FG
297 auto p = sdata->ops_in_flight_sharded.iterator_to(*i);
298 sdata->ops_in_flight_sharded.erase(p);
299 }
11fdf7f2 300}
7c673cae 301
11fdf7f2
TL
302void OpTracker::record_history_op(TrackedOpRef&& i)
303{
9f95a23c 304 std::shared_lock l{lock};
11fdf7f2 305 history.insert(ceph_clock_now(), std::move(i));
7c673cae
FG
306}
307
11fdf7f2
TL
308bool OpTracker::visit_ops_in_flight(utime_t* oldest_secs,
309 std::function<bool(TrackedOp&)>&& visit)
7c673cae 310{
7c673cae
FG
311 if (!tracking_enabled)
312 return false;
313
11fdf7f2 314 const utime_t now = ceph_clock_now();
7c673cae 315 utime_t oldest_op = now;
92f5a8d4
TL
316 // single representation of all inflight operations reunified
317 // from OpTracker's shards. TrackedOpRef extends the lifetime
318 // to carry the ops outside of the critical section, and thus
319 // allows to call the visitor without any lock being held.
320 // This simplifies the contract on API at the price of plenty
321 // additional moves and atomic ref-counting. This seems OK as
322 // `visit_ops_in_flight()` is definitely not intended for any
323 // hot path.
324 std::vector<TrackedOpRef> ops_in_flight;
7c673cae 325
9f95a23c 326 std::shared_lock l{lock};
11fdf7f2
TL
327 for (const auto sdata : sharded_in_flight_list) {
328 ceph_assert(sdata);
329 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
7c673cae
FG
330 if (!sdata->ops_in_flight_sharded.empty()) {
331 utime_t oldest_op_tmp =
332 sdata->ops_in_flight_sharded.front().get_initiated();
333 if (oldest_op_tmp < oldest_op) {
334 oldest_op = oldest_op_tmp;
335 }
336 }
92f5a8d4
TL
337 std::transform(std::begin(sdata->ops_in_flight_sharded),
338 std::end(sdata->ops_in_flight_sharded),
339 std::back_inserter(ops_in_flight),
340 [] (TrackedOp& op) { return TrackedOpRef(&op); });
7c673cae 341 }
92f5a8d4 342 if (ops_in_flight.empty())
7c673cae 343 return false;
11fdf7f2 344 *oldest_secs = now - oldest_op;
92f5a8d4 345 dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
11fdf7f2 346 << "; oldest is " << *oldest_secs
7c673cae
FG
347 << " seconds old" << dendl;
348
11fdf7f2 349 if (*oldest_secs < complaint_time)
7c673cae
FG
350 return false;
351
92f5a8d4
TL
352 l.unlock();
353 for (auto& op : ops_in_flight) {
354 // `lock` neither `ops_in_flight_lock_sharded` should be held when
355 // calling the visitor. Otherwise `OSD::get_health_metrics()` can
356 // dead-lock due to the `~TrackedOp()` calling `record_history_op()`
357 // or `unregister_inflight_op()`.
358 if (!visit(*op))
359 break;
11fdf7f2
TL
360 }
361 return true;
362}
1adf2230 363
11fdf7f2
TL
364bool OpTracker::with_slow_ops_in_flight(utime_t* oldest_secs,
365 int* num_slow_ops,
366 int* num_warned_ops,
367 std::function<void(TrackedOp&)>&& on_warn)
368{
369 const utime_t now = ceph_clock_now();
370 auto too_old = now;
371 too_old -= complaint_time;
372 int slow = 0;
373 int warned = 0;
374 auto check = [&](TrackedOp& op) {
375 if (op.get_initiated() >= too_old) {
376 // no more slow ops in flight
377 return false;
378 }
379 if (!op.warn_interval_multiplier)
380 return true;
381 slow++;
382 if (warned >= log_threshold) {
383 // enough samples of slow ops
384 return true;
7c673cae 385 }
11fdf7f2
TL
386 auto time_to_complain = (op.get_initiated() +
387 complaint_time * op.warn_interval_multiplier);
388 if (time_to_complain >= now) {
389 // complain later if the op is still in flight
390 return true;
391 }
392 // will warn, increase counter
393 warned++;
394 on_warn(op);
395 return true;
396 };
397 if (visit_ops_in_flight(oldest_secs, check)) {
398 if (num_slow_ops) {
399 *num_slow_ops = slow;
400 *num_warned_ops = warned;
401 }
402 return true;
403 } else {
404 return false;
7c673cae 405 }
11fdf7f2 406}
7c673cae 407
11fdf7f2
TL
408bool OpTracker::check_ops_in_flight(std::string* summary,
409 std::vector<string> &warnings,
410 int *num_slow_ops)
411{
412 const utime_t now = ceph_clock_now();
413 auto too_old = now;
414 too_old -= complaint_time;
415 int warned = 0;
416 utime_t oldest_secs;
417 auto warn_on_slow_op = [&](TrackedOp& op) {
418 stringstream ss;
419 utime_t age = now - op.get_initiated();
420 ss << "slow request " << age << " seconds old, received at "
421 << op.get_initiated() << ": " << op.get_desc()
422 << " currently "
423 << op.state_string();
424 warnings.push_back(ss.str());
425 // only those that have been shown will backoff
426 op.warn_interval_multiplier *= 2;
427 };
428 int slow = 0;
429 if (with_slow_ops_in_flight(&oldest_secs, &slow, &warned, warn_on_slow_op) &&
430 slow > 0) {
7c673cae 431 stringstream ss;
11fdf7f2
TL
432 ss << slow << " slow requests, "
433 << warned << " included below; oldest blocked for > "
7c673cae 434 << oldest_secs << " secs";
11fdf7f2
TL
435 *summary = ss.str();
436 if (num_slow_ops) {
437 *num_slow_ops = slow;
438 }
439 return true;
440 } else {
441 return false;
7c673cae 442 }
7c673cae
FG
443}
444
445void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
446{
447 h->clear();
448 utime_t now = ceph_clock_now();
449
450 for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
451 ShardedTrackingData* sdata = sharded_in_flight_list[iter];
11fdf7f2
TL
452 ceph_assert(NULL != sdata);
453 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
7c673cae
FG
454
455 for (auto& i : sdata->ops_in_flight_sharded) {
456 utime_t age = now - i.get_initiated();
457 uint32_t ms = (long)(age * 1000.0);
458 h->add(ms);
459 }
460 }
461}
462
463
464#undef dout_context
465#define dout_context tracker->cct
466
11fdf7f2 467void TrackedOp::mark_event(std::string_view event, utime_t stamp)
7c673cae
FG
468{
469 if (!state)
470 return;
471
472 {
11fdf7f2
TL
473 std::lock_guard l(lock);
474 events.emplace_back(stamp, event);
7c673cae 475 }
c07f9fc5 476 dout(6) << " seq: " << seq
7c673cae
FG
477 << ", time: " << stamp
478 << ", event: " << event
479 << ", op: " << get_desc()
480 << dendl;
481 _event_marked();
482}
483
484void TrackedOp::dump(utime_t now, Formatter *f) const
485{
486 // Ignore if still in the constructor
487 if (!state)
488 return;
489 f->dump_string("description", get_desc());
490 f->dump_stream("initiated_at") << get_initiated();
491 f->dump_float("age", now - get_initiated());
492 f->dump_float("duration", get_duration());
493 {
c07f9fc5 494 f->open_object_section("type_data");
7c673cae
FG
495 _dump(f);
496 f->close_section();
497 }
498}