]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/TrackedOp.cc
update sources to v12.1.0
[ceph.git] / ceph / src / common / TrackedOp.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * This is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License version 2.1, as published by the Free Software
9 * Foundation. See file COPYING.
10 * Copyright 2013 Inktank
11 */
12
13#include "TrackedOp.h"
7c673cae
FG
14
// Logging configuration for the OpHistory/OpTracker code that follows.
// NOTE(review): these three macros are presumably read by the dout()/dendl
// logging macros used further down -- confirm against the debug header.
// Context expression used when logging: the `cct` member of OpTracker.
15#define dout_context cct
// Subsystem the log lines are attributed to.
16#define dout_subsys ceph_subsys_optracker
// Drop any prefix defined earlier and route it through the local _prefix().
17#undef dout_prefix
18#define dout_prefix _prefix(_dout)
19
/**
 * Write the op-tracker log prefix to a stream.
 *
 * Referenced through the dout_prefix macro so that every debug line emitted
 * from this file starts with the same marker.
 *
 * @param _dout stream to write to; it is dereferenced, so it must not be null
 * @return the same stream, so further output can be chained onto it
 */
static std::ostream& _prefix(std::ostream* _dout)
{
  std::ostream& out = *_dout;
  out << "-- op tracker -- ";
  return out;
}
24
25void OpHistory::on_shutdown()
26{
27 Mutex::Locker history_lock(ops_history_lock);
28 arrived.clear();
29 duration.clear();
30 slow_op.clear();
31 shutdown = true;
32}
33
34void OpHistory::insert(utime_t now, TrackedOpRef op)
35{
36 Mutex::Locker history_lock(ops_history_lock);
37 if (shutdown)
38 return;
39 duration.insert(make_pair(op->get_duration(), op));
40 arrived.insert(make_pair(op->get_initiated(), op));
41 if (op->get_duration() >= history_slow_op_threshold)
42 slow_op.insert(make_pair(op->get_initiated(), op));
43 cleanup(now);
44}
45
46void OpHistory::cleanup(utime_t now)
47{
48 while (arrived.size() &&
49 (now - arrived.begin()->first >
50 (double)(history_duration))) {
51 duration.erase(make_pair(
52 arrived.begin()->second->get_duration(),
53 arrived.begin()->second));
54 arrived.erase(arrived.begin());
55 }
56
57 while (duration.size() > history_size) {
58 arrived.erase(make_pair(
59 duration.begin()->second->get_initiated(),
60 duration.begin()->second));
61 duration.erase(duration.begin());
62 }
63
64 while (slow_op.size() > history_slow_op_size) {
65 slow_op.erase(make_pair(
66 slow_op.begin()->second->get_initiated(),
67 slow_op.begin()->second));
68 }
69}
70
71void OpHistory::dump_ops(utime_t now, Formatter *f)
72{
73 Mutex::Locker history_lock(ops_history_lock);
74 cleanup(now);
75 f->open_object_section("op_history");
76 f->dump_int("size", history_size);
77 f->dump_int("duration", history_duration);
78 {
79 f->open_array_section("ops");
80 for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
81 arrived.begin();
82 i != arrived.end();
83 ++i) {
84 f->open_object_section("op");
85 i->second->dump(now, f);
86 f->close_section();
87 }
88 f->close_section();
89 }
90 f->close_section();
91}
92
93void OpHistory::dump_ops_by_duration(utime_t now, Formatter *f)
94{
95 Mutex::Locker history_lock(ops_history_lock);
96 cleanup(now);
97 f->open_object_section("op_history");
98 f->dump_int("size", history_size);
99 f->dump_int("duration", history_duration);
100 {
101 f->open_array_section("ops");
102 if (arrived.size()) {
103 vector<pair<double, TrackedOpRef> > durationvec;
104 durationvec.reserve(arrived.size());
105
106 for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
107 arrived.begin();
108 i != arrived.end();
109 ++i) {
110 durationvec.push_back(pair<double, TrackedOpRef>(i->second->get_duration(), i->second));
111 }
112
113 sort(durationvec.begin(), durationvec.end());
114
115 for (auto i = durationvec.rbegin(); i != durationvec.rend(); ++i) {
116 f->open_object_section("op");
117 i->second->dump(now, f);
118 f->close_section();
119 }
120 }
121 f->close_section();
122 }
123 f->close_section();
124}
125
126struct ShardedTrackingData {
127 Mutex ops_in_flight_lock_sharded;
128 TrackedOp::tracked_op_list_t ops_in_flight_sharded;
129 explicit ShardedTrackingData(string lock_name):
130 ops_in_flight_lock_sharded(lock_name.c_str()) {}
131};
132
133OpTracker::OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards):
134 seq(0),
135 num_optracker_shards(num_shards),
136 complaint_time(0), log_threshold(0),
137 tracking_enabled(tracking),
138 lock("OpTracker::lock"), cct(cct_) {
139 for (uint32_t i = 0; i < num_optracker_shards; i++) {
140 char lock_name[32] = {0};
141 snprintf(lock_name, sizeof(lock_name), "%s:%d", "OpTracker::ShardedLock", i);
142 ShardedTrackingData* one_shard = new ShardedTrackingData(lock_name);
143 sharded_in_flight_list.push_back(one_shard);
144 }
145}
146
147OpTracker::~OpTracker() {
148 while (!sharded_in_flight_list.empty()) {
149 assert((sharded_in_flight_list.back())->ops_in_flight_sharded.empty());
150 delete sharded_in_flight_list.back();
151 sharded_in_flight_list.pop_back();
152 }
153}
154
155bool OpTracker::dump_historic_ops(Formatter *f, bool by_duration)
156{
157 RWLock::RLocker l(lock);
158 if (!tracking_enabled)
159 return false;
160
161 utime_t now = ceph_clock_now();
162 if (by_duration) {
163 history.dump_ops_by_duration(now, f);
164 } else {
165 history.dump_ops(now, f);
166 }
167 return true;
168}
169
170void OpHistory::dump_slow_ops(utime_t now, Formatter *f)
171{
172 Mutex::Locker history_lock(ops_history_lock);
173 cleanup(now);
174 f->open_object_section("OpHistory slow ops");
175 f->dump_int("num to keep", history_slow_op_size);
176 f->dump_int("threshold to keep", history_slow_op_threshold);
177 {
178 f->open_array_section("Ops");
179 for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
180 slow_op.begin();
181 i != slow_op.end();
182 ++i) {
183 f->open_object_section("Op");
184 i->second->dump(now, f);
185 f->close_section();
186 }
187 f->close_section();
188 }
189 f->close_section();
190}
191
192bool OpTracker::dump_historic_slow_ops(Formatter *f)
193{
194 RWLock::RLocker l(lock);
195 if (!tracking_enabled)
196 return false;
197
198 utime_t now = ceph_clock_now();
199 history.dump_slow_ops(now, f);
200 return true;
201}
202
203bool OpTracker::dump_ops_in_flight(Formatter *f, bool print_only_blocked)
204{
205 RWLock::RLocker l(lock);
206 if (!tracking_enabled)
207 return false;
208
209 f->open_object_section("ops_in_flight"); // overall dump
210 uint64_t total_ops_in_flight = 0;
211 f->open_array_section("ops"); // list of TrackedOps
212 utime_t now = ceph_clock_now();
213 for (uint32_t i = 0; i < num_optracker_shards; i++) {
214 ShardedTrackingData* sdata = sharded_in_flight_list[i];
215 assert(NULL != sdata);
216 Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
217 for (auto& op : sdata->ops_in_flight_sharded) {
218 if (print_only_blocked && (now - op.get_initiated() <= complaint_time))
219 break;
220 f->open_object_section("op");
221 op.dump(now, f);
222 f->close_section(); // this TrackedOp
223 total_ops_in_flight++;
224 }
225 }
226 f->close_section(); // list of TrackedOps
227 if (print_only_blocked) {
228 f->dump_float("complaint_time", complaint_time);
229 f->dump_int("num_blocked_ops", total_ops_in_flight);
230 } else
231 f->dump_int("num_ops", total_ops_in_flight);
232 f->close_section(); // overall dump
233 return true;
234}
235
236bool OpTracker::register_inflight_op(TrackedOp *i)
237{
238 RWLock::RLocker l(lock);
239 if (!tracking_enabled)
240 return false;
241
31f18b77 242 uint64_t current_seq = ++seq;
7c673cae
FG
243 uint32_t shard_index = current_seq % num_optracker_shards;
244 ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
245 assert(NULL != sdata);
246 {
247 Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
248 sdata->ops_in_flight_sharded.push_back(*i);
249 i->seq = current_seq;
250 }
251 return true;
252}
253
254void OpTracker::unregister_inflight_op(TrackedOp *i)
255{
256 // caller checks;
257 assert(i->state);
258
259 uint32_t shard_index = i->seq % num_optracker_shards;
260 ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
261 assert(NULL != sdata);
262 {
263 Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
264 auto p = sdata->ops_in_flight_sharded.iterator_to(*i);
265 sdata->ops_in_flight_sharded.erase(p);
266 }
267 i->_unregistered();
268
269 RWLock::RLocker l(lock);
270 if (!tracking_enabled)
271 delete i;
272 else {
273 i->state = TrackedOp::STATE_HISTORY;
274 utime_t now = ceph_clock_now();
275 history.insert(now, TrackedOpRef(i));
276 }
277}
278
279bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector, int *slow)
280{
281 RWLock::RLocker l(lock);
282 if (!tracking_enabled)
283 return false;
284
285 utime_t now = ceph_clock_now();
286 utime_t too_old = now;
287 too_old -= complaint_time;
288 utime_t oldest_op = now;
289 uint64_t total_ops_in_flight = 0;
290
291 for (uint32_t i = 0; i < num_optracker_shards; i++) {
292 ShardedTrackingData* sdata = sharded_in_flight_list[i];
293 assert(NULL != sdata);
294 Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
295 if (!sdata->ops_in_flight_sharded.empty()) {
296 utime_t oldest_op_tmp =
297 sdata->ops_in_flight_sharded.front().get_initiated();
298 if (oldest_op_tmp < oldest_op) {
299 oldest_op = oldest_op_tmp;
300 }
301 }
302 total_ops_in_flight += sdata->ops_in_flight_sharded.size();
303 }
304
305 if (0 == total_ops_in_flight)
306 return false;
307
308 utime_t oldest_secs = now - oldest_op;
309
310 dout(10) << "ops_in_flight.size: " << total_ops_in_flight
311 << "; oldest is " << oldest_secs
312 << " seconds old" << dendl;
313
314 if (oldest_secs < complaint_time)
315 return false;
316
317 warning_vector.reserve(log_threshold + 1);
318 //store summary message
319 warning_vector.push_back("");
320
321 int _slow = 0; // total slow
322 if (!slow)
323 slow = &_slow;
324 else
325 *slow = _slow; // start from 0 anyway
326 int warned = 0; // total logged
327 for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
328 ShardedTrackingData* sdata = sharded_in_flight_list[iter];
329 assert(NULL != sdata);
330 Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
331 if (sdata->ops_in_flight_sharded.empty())
332 continue;
333 auto i = sdata->ops_in_flight_sharded.begin();
334 while (i != sdata->ops_in_flight_sharded.end() &&
335 i->get_initiated() < too_old) {
336 (*slow)++;
337
338 // exponential backoff of warning intervals
339 if (warned < log_threshold &&
340 (i->get_initiated() + (complaint_time *
341 i->warn_interval_multiplier)) < now) {
342 // will warn, increase counter
343 warned++;
344
345 utime_t age = now - i->get_initiated();
346 stringstream ss;
347 ss << "slow request " << age << " seconds old, received at "
348 << i->get_initiated() << ": " << i->get_desc()
349 << " currently "
350 << (i->current ? i->current : i->state_string());
351 warning_vector.push_back(ss.str());
352
353 // only those that have been shown will backoff
354 i->warn_interval_multiplier *= 2;
355 }
356 ++i;
357 }
358 }
359
360 // only summarize if we warn about any. if everything has backed
361 // off, we will stay silent.
362 if (warned > 0) {
363 stringstream ss;
364 ss << *slow << " slow requests, " << warned << " included below; oldest blocked for > "
365 << oldest_secs << " secs";
366 warning_vector[0] = ss.str();
367 }
368
369 return warned > 0;
370}
371
372void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
373{
374 h->clear();
375 utime_t now = ceph_clock_now();
376
377 for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
378 ShardedTrackingData* sdata = sharded_in_flight_list[iter];
379 assert(NULL != sdata);
380 Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
381
382 for (auto& i : sdata->ops_in_flight_sharded) {
383 utime_t age = now - i.get_initiated();
384 uint32_t ms = (long)(age * 1000.0);
385 h->add(ms);
386 }
387 }
388}
389
390
// The TrackedOp methods below log through their tracker's context instead
// of a `cct` member.
// NOTE(review): presumably because TrackedOp has no cct of its own -- confirm
// against the class declaration.
391#undef dout_context
392#define dout_context tracker->cct
393
394void TrackedOp::mark_event_string(const string &event, utime_t stamp)
395{
396 if (!state)
397 return;
398
399 {
400 Mutex::Locker l(lock);
401 events.push_back(Event(stamp, event));
402 current = events.back().c_str();
403 }
404 dout(6) << "seq: " << seq
405 << ", time: " << stamp
406 << ", event: " << event
407 << ", op: " << get_desc()
408 << dendl;
409 _event_marked();
410}
411
412void TrackedOp::mark_event(const char *event, utime_t stamp)
413{
414 if (!state)
415 return;
416
417 {
418 Mutex::Locker l(lock);
419 events.push_back(Event(stamp, event));
420 current = event;
421 }
422 dout(6) << "seq: " << seq
423 << ", time: " << stamp
424 << ", event: " << event
425 << ", op: " << get_desc()
426 << dendl;
427 _event_marked();
428}
429
430void TrackedOp::dump(utime_t now, Formatter *f) const
431{
432 // Ignore if still in the constructor
433 if (!state)
434 return;
435 f->dump_string("description", get_desc());
436 f->dump_stream("initiated_at") << get_initiated();
437 f->dump_float("age", now - get_initiated());
438 f->dump_float("duration", get_duration());
439 {
440 f->open_array_section("type_data");
441 _dump(f);
442 f->close_section();
443 }
444}