// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 * Copyright 2013 Inktank
 */

#include "TrackedOp.h"

#define dout_context cct
#define dout_subsys ceph_subsys_optracker
#undef dout_prefix
#define dout_prefix _prefix(_dout)

using std::list;
using std::make_pair;
using std::ostream;
using std::pair;
using std::set;
using std::string;
using std::stringstream;

using ceph::Formatter;

static ostream& _prefix(std::ostream* _dout)
{
  return *_dout << "-- op tracker -- ";
}

void OpHistoryServiceThread::break_thread() {
  queue_spinlock.lock();
  _external_queue.clear();
  _break_thread = true;
  queue_spinlock.unlock();
}

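// Service thread loop: under the spinlock, swap the externally filled queue
// into a local list, then hand each (time, op) pair to
// OpHistory::_insert_delayed().  While the queue stays empty the poll
// interval backs off geometrically (sleep_time <<= 2) and resets to 1 ms as
// soon as work arrives.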
void* OpHistoryServiceThread::entry() {
  int sleep_time = 1000;
  list<pair<utime_t, TrackedOpRef>> internal_queue;
  while (1) {
    queue_spinlock.lock();
    if (_break_thread) {
      queue_spinlock.unlock();
      break;
    }
    internal_queue.swap(_external_queue);
    queue_spinlock.unlock();
    if (internal_queue.empty()) {
      usleep(sleep_time);
      if (sleep_time < 128000) {
        sleep_time <<= 2;
      }
    } else {
      sleep_time = 1000;
    }

    while (!internal_queue.empty()) {
      pair<utime_t, TrackedOpRef> op = internal_queue.front();
      _ophistory->_insert_delayed(op.first, op.second);
      internal_queue.pop_front();
    }
  }
  return nullptr;
}


void OpHistory::on_shutdown()
{
  opsvc.break_thread();
  opsvc.join();
  std::lock_guard history_lock(ops_history_lock);
  arrived.clear();
  duration.clear();
  slow_op.clear();
  shutdown = true;
}

void OpHistory::_insert_delayed(const utime_t& now, TrackedOpRef op)
{
  std::lock_guard history_lock(ops_history_lock);
  if (shutdown)
    return;
  double opduration = op->get_duration();
  duration.insert(make_pair(opduration, op));
  arrived.insert(make_pair(op->get_initiated(), op));
  if (opduration >= history_slow_op_threshold.load()) {
    slow_op.insert(make_pair(op->get_initiated(), op));
    logger->inc(l_osd_slow_op_count);
  }
  cleanup(now);
}

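// Trim the history sets: drop ops that arrived more than history_duration
// seconds ago, cap the by-duration set at history_size entries, and cap the
// slow-op set at history_slow_op_size entries.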
void OpHistory::cleanup(utime_t now)
{
  while (arrived.size() &&
         (now - arrived.begin()->first >
          (double)(history_duration.load()))) {
    duration.erase(make_pair(
      arrived.begin()->second->get_duration(),
      arrived.begin()->second));
    arrived.erase(arrived.begin());
  }

  while (duration.size() > history_size.load()) {
    arrived.erase(make_pair(
      duration.begin()->second->get_initiated(),
      duration.begin()->second));
    duration.erase(duration.begin());
  }

  while (slow_op.size() > history_slow_op_size.load()) {
    slow_op.erase(make_pair(
      slow_op.begin()->second->get_initiated(),
      slow_op.begin()->second));
  }
}

void OpHistory::dump_ops(utime_t now, Formatter *f, set<string> filters, bool by_duration)
{
  std::lock_guard history_lock(ops_history_lock);
  cleanup(now);
  f->open_object_section("op_history");
  f->dump_int("size", history_size.load());
  f->dump_int("duration", history_duration.load());
  {
    f->open_array_section("ops");
    auto dump_fn = [&f, &now, &filters](auto begin_iter, auto end_iter) {
      for (auto i=begin_iter; i!=end_iter; ++i) {
        if (!i->second->filter_out(filters))
          continue;
        f->open_object_section("op");
        i->second->dump(now, f, OpTracker::default_dumper);
        f->close_section();
      }
    };

    if (by_duration) {
      dump_fn(duration.rbegin(), duration.rend());
    } else {
      dump_fn(arrived.begin(), arrived.end());
    }
    f->close_section();
  }
  f->close_section();
}

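// One shard of the in-flight op list: an intrusive list of TrackedOps
// guarded by its own mutex, so registration and unregistration on different
// shards do not contend on a single lock.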
struct ShardedTrackingData {
  ceph::mutex ops_in_flight_lock_sharded;
  TrackedOp::tracked_op_list_t ops_in_flight_sharded;
  explicit ShardedTrackingData(string lock_name)
    : ops_in_flight_lock_sharded(ceph::make_mutex(lock_name)) {}
};

OpTracker::OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards):
  seq(0),
  history(cct_),
  num_optracker_shards(num_shards),
  complaint_time(0), log_threshold(0),
  tracking_enabled(tracking),
  cct(cct_) {
  for (uint32_t i = 0; i < num_optracker_shards; i++) {
    char lock_name[34] = {0};
    snprintf(lock_name, sizeof(lock_name), "%s:%" PRIu32, "OpTracker::ShardedLock", i);
    ShardedTrackingData* one_shard = new ShardedTrackingData(lock_name);
    sharded_in_flight_list.push_back(one_shard);
  }
}

OpTracker::~OpTracker() {
  while (!sharded_in_flight_list.empty()) {
    ShardedTrackingData* sdata = sharded_in_flight_list.back();
    ceph_assert(NULL != sdata);
    while (!sdata->ops_in_flight_sharded.empty()) {
      {
        std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
        sdata->ops_in_flight_sharded.pop_back();
      }
    }
    ceph_assert((sharded_in_flight_list.back())->ops_in_flight_sharded.empty());
    delete sharded_in_flight_list.back();
    sharded_in_flight_list.pop_back();
  }
}

bool OpTracker::dump_historic_ops(Formatter *f, bool by_duration, set<string> filters)
{
  if (!tracking_enabled)
    return false;

  std::shared_lock l{lock};
  utime_t now = ceph_clock_now();
  history.dump_ops(now, f, filters, by_duration);
  return true;
}

void OpHistory::dump_slow_ops(utime_t now, Formatter *f, set<string> filters)
{
  std::lock_guard history_lock(ops_history_lock);
  cleanup(now);
  f->open_object_section("OpHistory slow ops");
  f->dump_int("num to keep", history_slow_op_size.load());
  f->dump_int("threshold to keep", history_slow_op_threshold.load());
  {
    f->open_array_section("Ops");
    for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
           slow_op.begin();
         i != slow_op.end();
         ++i) {
      if (!i->second->filter_out(filters))
        continue;
      f->open_object_section("Op");
      i->second->dump(now, f, OpTracker::default_dumper);
      f->close_section();
    }
    f->close_section();
  }
  f->close_section();
}

bool OpTracker::dump_historic_slow_ops(Formatter *f, set<string> filters)
{
  if (!tracking_enabled)
    return false;

  std::shared_lock l{lock};
  utime_t now = ceph_clock_now();
  history.dump_slow_ops(now, f, filters);
  return true;
}

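// Walk every shard under its lock and either dump or (with count_only) just
// count the tracked ops.  When print_only_blocked is set, each shard's scan
// stops at the first op that has not yet been in flight longer than
// complaint_time (the in-flight list is ordered by registration time).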
bool OpTracker::dump_ops_in_flight(Formatter *f, bool print_only_blocked, set<string> filters, bool count_only, dumper lambda)
{
  if (!tracking_enabled)
    return false;

  std::shared_lock l{lock};
  f->open_object_section("ops_in_flight"); // overall dump
  uint64_t total_ops_in_flight = 0;

  if (!count_only) {
    f->open_array_section("ops"); // list of TrackedOps
  }

  utime_t now = ceph_clock_now();
  for (uint32_t i = 0; i < num_optracker_shards; i++) {
    ShardedTrackingData* sdata = sharded_in_flight_list[i];
    ceph_assert(NULL != sdata);
    std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
    for (auto& op : sdata->ops_in_flight_sharded) {
      if (print_only_blocked && (now - op.get_initiated() <= complaint_time))
        break;
      if (!op.filter_out(filters))
        continue;

      if (!count_only) {
        f->open_object_section("op");
        op.dump(now, f, lambda);
        f->close_section(); // this TrackedOp
      }

      total_ops_in_flight++;
    }
  }

  if (!count_only) {
    f->close_section(); // list of TrackedOps
  }

  if (print_only_blocked) {
    f->dump_float("complaint_time", complaint_time);
    f->dump_int("num_blocked_ops", total_ops_in_flight);
  } else {
    f->dump_int("num_ops", total_ops_in_flight);
  }
  f->close_section(); // overall dump
  return true;
}

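// Assign the op the next sequence number and append it to the shard selected
// by seq % num_optracker_shards.  Returns false (and tracks nothing) when
// tracking is disabled.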
bool OpTracker::register_inflight_op(TrackedOp *i)
{
  if (!tracking_enabled)
    return false;

  std::shared_lock l{lock};
  uint64_t current_seq = ++seq;
  uint32_t shard_index = current_seq % num_optracker_shards;
  ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
  ceph_assert(NULL != sdata);
  {
    std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
    sdata->ops_in_flight_sharded.push_back(*i);
    i->seq = current_seq;
  }
  return true;
}

void OpTracker::unregister_inflight_op(TrackedOp* const i)
{
  // caller checks;
  ceph_assert(i->state);

  uint32_t shard_index = i->seq % num_optracker_shards;
  ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
  ceph_assert(NULL != sdata);
  {
    std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
    auto p = sdata->ops_in_flight_sharded.iterator_to(*i);
    sdata->ops_in_flight_sharded.erase(p);
  }
}

void OpTracker::record_history_op(TrackedOpRef&& i)
{
  std::shared_lock l{lock};
  history.insert(ceph_clock_now(), std::move(i));
}

bool OpTracker::visit_ops_in_flight(utime_t* oldest_secs,
                                    std::function<bool(TrackedOp&)>&& visit)
{
  if (!tracking_enabled)
    return false;

  const utime_t now = ceph_clock_now();
  utime_t oldest_op = now;
  // single representation of all inflight operations reunified
  // from OpTracker's shards. TrackedOpRef extends the lifetime
  // to carry the ops outside of the critical section, which
  // allows the visitor to be called without any lock being held.
  // This simplifies the API contract at the price of plenty of
  // additional moves and atomic ref-counting. That seems OK as
  // `visit_ops_in_flight()` is definitely not intended for any
  // hot path.
  std::vector<TrackedOpRef> ops_in_flight;

  std::shared_lock l{lock};
  for (const auto sdata : sharded_in_flight_list) {
    ceph_assert(sdata);
    std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
    if (!sdata->ops_in_flight_sharded.empty()) {
      utime_t oldest_op_tmp =
        sdata->ops_in_flight_sharded.front().get_initiated();
      if (oldest_op_tmp < oldest_op) {
        oldest_op = oldest_op_tmp;
      }
    }
    std::transform(std::begin(sdata->ops_in_flight_sharded),
                   std::end(sdata->ops_in_flight_sharded),
                   std::back_inserter(ops_in_flight),
                   [] (TrackedOp& op) { return TrackedOpRef(&op); });
  }
  if (ops_in_flight.empty())
    return false;
  *oldest_secs = now - oldest_op;
  dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
           << "; oldest is " << *oldest_secs
           << " seconds old" << dendl;

  if (*oldest_secs < complaint_time)
    return false;

  l.unlock();
  for (auto& op : ops_in_flight) {
    // Neither `lock` nor `ops_in_flight_lock_sharded` should be held when
    // calling the visitor. Otherwise `OSD::get_health_metrics()` can
    // dead-lock due to the `~TrackedOp()` calling `record_history_op()`
    // or `unregister_inflight_op()`.
    if (!visit(*op))
      break;
  }
  return true;
}

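// Visit in-flight ops and pick out those older than complaint_time.  Ops
// with a zero warn_interval_multiplier are ignored; the rest count as slow,
// and on_warn fires for at most log_threshold of them, and only once an op's
// backoff interval (complaint_time * warn_interval_multiplier) has passed
// since it was initiated.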
bool OpTracker::with_slow_ops_in_flight(utime_t* oldest_secs,
                                        int* num_slow_ops,
                                        int* num_warned_ops,
                                        std::function<void(TrackedOp&)>&& on_warn)
{
  const utime_t now = ceph_clock_now();
  auto too_old = now;
  too_old -= complaint_time;
  int slow = 0;
  int warned = 0;
  auto check = [&](TrackedOp& op) {
    if (op.get_initiated() >= too_old) {
      // no more slow ops in flight
      return false;
    }
    if (!op.warn_interval_multiplier)
      return true;
    slow++;
    if (warned >= log_threshold) {
      // enough samples of slow ops
      return true;
    }
    auto time_to_complain = (op.get_initiated() +
                             complaint_time * op.warn_interval_multiplier);
    if (time_to_complain >= now) {
      // complain later if the op is still in flight
      return true;
    }
    // will warn, increase counter
    warned++;
    on_warn(op);
    return true;
  };
  if (visit_ops_in_flight(oldest_secs, check)) {
    if (num_slow_ops) {
      *num_slow_ops = slow;
      *num_warned_ops = warned;
    }
    return true;
  } else {
    return false;
  }
}

bool OpTracker::check_ops_in_flight(std::string* summary,
                                    std::vector<string> &warnings,
                                    int *num_slow_ops)
{
  const utime_t now = ceph_clock_now();
  auto too_old = now;
  too_old -= complaint_time;
  int warned = 0;
  utime_t oldest_secs;
  auto warn_on_slow_op = [&](TrackedOp& op) {
    stringstream ss;
    utime_t age = now - op.get_initiated();
    ss << "slow request " << age << " seconds old, received at "
       << op.get_initiated() << ": " << op.get_desc()
       << " currently "
       << op.state_string();
    warnings.push_back(ss.str());
    // only those that have been shown will back off
    op.warn_interval_multiplier *= 2;
  };
  int slow = 0;
  if (with_slow_ops_in_flight(&oldest_secs, &slow, &warned, warn_on_slow_op) &&
      slow > 0) {
    stringstream ss;
    ss << slow << " slow requests, "
       << warned << " included below; oldest blocked for > "
       << oldest_secs << " secs";
    *summary = ss.str();
    if (num_slow_ops) {
      *num_slow_ops = slow;
    }
    return true;
  } else {
    return false;
  }
}

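// Fill a power-of-two histogram with the current age, in milliseconds, of
// every tracked in-flight op.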
void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
{
  h->clear();
  utime_t now = ceph_clock_now();

  for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
    ShardedTrackingData* sdata = sharded_in_flight_list[iter];
    ceph_assert(NULL != sdata);
    std::lock_guard locker(sdata->ops_in_flight_lock_sharded);

    for (auto& i : sdata->ops_in_flight_sharded) {
      utime_t age = now - i.get_initiated();
      uint32_t ms = (long)(age * 1000.0);
      h->add(ms);
    }
  }
}


#undef dout_context
#define dout_context tracker->cct
void TrackedOp::mark_event(std::string_view event, utime_t stamp)
{
  if (!state)
    return;

  {
    std::lock_guard l(lock);
    events.emplace_back(stamp, event);
  }
  dout(6) << " seq: " << seq
          << ", time: " << stamp
          << ", event: " << event
          << ", op: " << get_desc()
          << dendl;
  _event_marked();
}

void TrackedOp::dump(utime_t now, Formatter *f, OpTracker::dumper lambda) const
{
  // Ignore if still in the constructor
  if (!state)
    return;
  f->dump_string("description", get_desc());
  f->dump_stream("initiated_at") << get_initiated();
  f->dump_float("age", now - get_initiated());
  f->dump_float("duration", get_duration());
  {
    f->open_object_section("type_data");
    lambda(*this, f);
    f->close_section();
  }
}