]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/PurgeQueue.cc
update download target update for octopus release
[ceph.git] / ceph / src / mds / PurgeQueue.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "common/debug.h"
16#include "mds/mdstypes.h"
17#include "mds/CInode.h"
18#include "mds/MDCache.h"
19
20#include "PurgeQueue.h"
21
f64942e4 22#include <string.h>
7c673cae
FG
23
#define dout_context cct
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix _prefix(_dout, rank) << __func__ << ": "
static ostream& _prefix(std::ostream *_dout, mds_rank_t rank) {
  // Prefix every debug line with "mds.<rank>.purge_queue " so log output
  // can be attributed to the owning MDS rank.
  return *_dout << "mds." << rank << ".purge_queue ";
}
31
11fdf7f2
TL
// String -> enum mapping for PurgeItem actions.  The keys must stay in
// sync with the strings returned by PurgeItem::get_type_str().
const std::map<std::string, PurgeItem::Action> PurgeItem::actions = {
  {"NONE", PurgeItem::NONE},
  {"PURGE_FILE", PurgeItem::PURGE_FILE},
  {"TRUNCATE_FILE", PurgeItem::TRUNCATE_FILE},
  {"PURGE_DIR", PurgeItem::PURGE_DIR}
};
38
7c673cae
FG
39void PurgeItem::encode(bufferlist &bl) const
40{
11fdf7f2
TL
41 ENCODE_START(2, 1, bl);
42 encode((uint8_t)action, bl);
43 encode(ino, bl);
44 encode(size, bl);
45 encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2);
46 encode(old_pools, bl);
47 encode(snapc, bl);
48 encode(fragtree, bl);
49 encode(stamp, bl);
50 uint8_t static const pad = 0xff;
51 for (unsigned int i = 0; i<pad_size; i++) {
52 encode(pad, bl);
53 }
7c673cae
FG
54 ENCODE_FINISH(bl);
55}
56
// Deserialize a PurgeItem.  Field order must match PurgeItem::encode().
void PurgeItem::decode(bufferlist::const_iterator &p)
{
  DECODE_START(2, p);
  decode((uint8_t&)action, p);
  decode(ino, p);
  decode(size, p);
  decode(layout, p);
  decode(old_pools, p);
  decode(snapc, p);
  decode(fragtree, p);
  // 'stamp' was added in struct_v 2; v1 entries simply lack it.
  if (struct_v >= 2) {
    decode(stamp, p);
  }
  DECODE_FINISH(p);
}
72
// TODO: if Objecter has any slow requests, take that as a hint and
// slow down our rate of purging (keep accepting pushes though)
//
// Construct the purge queue for one MDS rank.  The backing journal
// object lives in the metadata pool at MDS_INO_PURGE_QUEUE + rank.
// `on_error_` is owned by this object (deleted in the destructor or
// consumed by _go_readonly()).
PurgeQueue::PurgeQueue(
      CephContext *cct_,
      mds_rank_t rank_,
      const int64_t metadata_pool_,
      Objecter *objecter_,
      Context *on_error_)
  :
    cct(cct_),
    rank(rank_),
    lock("PurgeQueue"),
    metadata_pool(metadata_pool_),
    finisher(cct, "PurgeQueue", "PQ_Finisher"),
    timer(cct, lock),
    filer(objecter_, &finisher),
    objecter(objecter_),
    journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
      CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0,
      &finisher),
    on_error(on_error_),
    ops_in_flight(0),
    max_purge_ops(0),
    drain_initial(0),
    draining(false),
    delayed_flush(nullptr),
    recovered(false)
{
  ceph_assert(cct != nullptr);
  ceph_assert(on_error != nullptr);
  ceph_assert(objecter != nullptr);
  journaler.set_write_error_handler(on_error);
}
106
PurgeQueue::~PurgeQueue()
{
  if (logger) {
    g_ceph_context->get_perfcounters_collection()->remove(logger.get());
  }
  // on_error is a raw owning pointer; _go_readonly() nulls it out after
  // completing it, so this delete is a harmless no-op in that case.
  delete on_error;
}
114
// Register the purge-queue perf counters with the global collection.
// Must be called before init() (init() asserts logger is non-null).
void PurgeQueue::create_logger()
{
  PerfCountersBuilder pcb(g_ceph_context, "purge_queue", l_pq_first, l_pq_last);

  pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed",
                      "purg", PerfCountersBuilder::PRIO_INTERESTING);

  pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
  pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
  pcb.add_u64(l_pq_executing_ops_high_water, "pq_executing_ops_high_water", "Maximum number of executing file purge ops");
  pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
  pcb.add_u64(l_pq_executing_high_water, "pq_executing_high_water", "Maximum number of executing file purges");

  logger.reset(pcb.create_perf_counters());
  g_ceph_context->get_perfcounters_collection()->add(logger.get());
}
131
// Start the worker threads (finisher) and the timer.  create_logger()
// must already have been called.
void PurgeQueue::init()
{
  std::lock_guard l(lock);

  ceph_assert(logger != nullptr);

  finisher.start();
  timer.init();
}
141
c07f9fc5
FG
// Kick off consumption of any backlog left in the journal, e.g. after
// recovery.  No-op if readonly, if the journal is empty, or if purges
// are already in flight (the completion path will keep consuming).
void PurgeQueue::activate()
{
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
    return;
  }

  if (journaler.get_read_pos() == journaler.get_write_pos())
    return;

  if (in_flight.empty()) {
    dout(4) << "start work (by drain)" << dendl;
    // Run _consume() on the finisher rather than inline, to avoid
    // re-entrancy into the caller's stack.
    finisher.queue(new FunctionContext([this](int r) {
      std::lock_guard l(lock);
      _consume();
    }));
  }
}
162
7c673cae
FG
// Tear down in dependency order: journal first, then the timer, then
// the finisher that journal/timer callbacks run on.
void PurgeQueue::shutdown()
{
  std::lock_guard l(lock);

  journaler.shutdown();
  timer.shutdown();
  finisher.stop();
}
171
// Load (or create) the purge queue journal.  `completion` is queued on
// waiting_for_recovery and fired once the queue is writeable.
void PurgeQueue::open(Context *completion)
{
  dout(4) << "opening" << dendl;

  std::lock_guard l(lock);

  if (completion)
    waiting_for_recovery.push_back(completion);

  journaler.recover(new FunctionContext([this](int r){
    if (r == -ENOENT) {
      // First boot after an upgrade from a version without a purge
      // queue: create the journal from scratch.
      dout(1) << "Purge Queue not found, assuming this is an upgrade and "
                 "creating it." << dendl;
      create(NULL);
    } else if (r == 0) {
      std::lock_guard l(lock);
      dout(4) << "open complete" << dendl;

      // Journaler only guarantees entries before head write_pos have been
      // fully flushed. Before appending new entries, we need to find and
      // drop any partial written entry.
      if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
        dout(4) << "recovering write_pos" << dendl;
        journaler.set_read_pos(journaler.last_committed.write_pos);
        _recover();
        return;
      }

      journaler.set_writeable();
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
    } else {
      derr << "Error " << r << " loading Journaler" << dendl;
      _go_readonly(r);
    }
  }));
}
209
3efd9988
FG
210void PurgeQueue::wait_for_recovery(Context* c)
211{
11fdf7f2 212 std::lock_guard l(lock);
f64942e4 213 if (recovered) {
3efd9988 214 c->complete(0);
f64942e4
AA
215 } else if (readonly) {
216 dout(10) << "cannot wait for recovery: PurgeQueue is readonly" << dendl;
217 c->complete(-EROFS);
218 } else {
3efd9988 219 waiting_for_recovery.push_back(c);
f64942e4 220 }
3efd9988 221}
7c673cae 222
// Scan forward from last_committed.write_pos to the head, dropping any
// partially-written tail entry, then restore read_pos and become
// writeable.  Re-queues itself asynchronously while waiting for the
// journal to become readable.  Caller must hold `lock`.
void PurgeQueue::_recover()
{
  ceph_assert(lock.is_locked_by_me());

  // Journaler::is_readable() adjusts write_pos if partial entry is encountered
  while (1) {
    if (!journaler.is_readable() &&
        !journaler.get_error() &&
        journaler.get_read_pos() < journaler.get_write_pos()) {
      // More journal to read but not buffered yet: resume when readable.
      journaler.wait_for_readable(new FunctionContext([this](int r) {
        std::lock_guard l(lock);
        _recover();
      }));
      return;
    }

    if (journaler.get_error()) {
      int r = journaler.get_error();
      derr << "Error " << r << " recovering write_pos" << dendl;
      _go_readonly(r);
      return;
    }

    if (journaler.get_read_pos() == journaler.get_write_pos()) {
      dout(4) << "write_pos recovered" << dendl;
      // restore original read_pos
      journaler.set_read_pos(journaler.last_committed.expire_pos);
      journaler.set_writeable();
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
      return;
    }

    // Discard one complete entry and continue scanning.
    bufferlist bl;
    bool readable = journaler.try_read_entry(bl);
    ceph_assert(readable);  // we checked earlier
  }
}
261
// Create a brand-new, empty purge queue journal in the metadata pool.
// `fin` (may be null) is completed once the head has been written.
void PurgeQueue::create(Context *fin)
{
  dout(4) << "creating" << dendl;
  std::lock_guard l(lock);

  if (fin)
    waiting_for_recovery.push_back(fin);

  file_layout_t layout = file_layout_t::get_default();
  layout.pool_id = metadata_pool;
  journaler.set_writeable();
  journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
  journaler.write_head(new FunctionContext([this](int r) {
    std::lock_guard l(lock);
    if (r) {
      _go_readonly(r);
    } else {
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
    }
  }));
}
284
/**
 * Append a purge item to the journal and maybe start executing it.
 *
 * The `completion` context will always be called back via a Finisher
 * (with -EROFS if the queue is readonly).
 */
void PurgeQueue::push(const PurgeItem &pi, Context *completion)
{
  dout(4) << "pushing inode " << pi.ino << dendl;
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "cannot push inode: PurgeQueue is readonly" << dendl;
    completion->complete(-EROFS);
    return;
  }

  // Callers should have waited for open() before using us
  ceph_assert(!journaler.is_readonly());

  bufferlist bl;

  encode(pi, bl);
  journaler.append_entry(bl);
  journaler.wait_for_flush(completion);

  // Maybe go ahead and do something with it right away
  bool could_consume = _consume();
  if (!could_consume) {
    // Usually, it is not necessary to explicitly flush here, because the reader
    // will get flushes generated inside Journaler::is_readable. However,
    // if we remain in a _can_consume()==false state for a long period then
    // we should flush in order to allow MDCache to drop its strays rather
    // than having them wait for purgequeue to progress.
    if (!delayed_flush) {
      delayed_flush = new FunctionContext([this](int r){
        delayed_flush = nullptr;
        journaler.flush();
      });

      timer.add_event_after(
          g_conf()->mds_purge_queue_busy_flush_period,
          delayed_flush);
    }
  }
}
328
329uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
330{
331 uint32_t ops_required = 0;
332 if (item.action == PurgeItem::PURGE_DIR) {
333 // Directory, count dirfrags to be deleted
11fdf7f2 334 frag_vec_t leaves;
7c673cae 335 if (!item.fragtree.is_leaf(frag_t())) {
11fdf7f2 336 item.fragtree.get_leaves(leaves);
7c673cae
FG
337 }
338 // One for the root, plus any leaves
11fdf7f2 339 ops_required = 1 + leaves.size();
7c673cae
FG
340 } else {
341 // File, work out concurrent Filer::purge deletes
342 const uint64_t num = (item.size > 0) ?
343 Striper::get_num_objects(item.layout, item.size) : 1;
344
11fdf7f2 345 ops_required = std::min(num, g_conf()->filer_max_purge_ops);
7c673cae
FG
346
347 // Account for removing (or zeroing) backtrace
348 ops_required += 1;
349
350 // Account for deletions for old pools
351 if (item.action != PurgeItem::TRUNCATE_FILE) {
352 ops_required += item.old_pools.size();
353 }
354 }
355
356 return ops_required;
357}
358
f64942e4 359bool PurgeQueue::_can_consume()
7c673cae 360{
f64942e4
AA
361 if (readonly) {
362 dout(10) << "can't consume: PurgeQueue is readonly" << dendl;
363 return false;
364 }
365
7c673cae 366 dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
11fdf7f2 367 << in_flight.size() << "/" << g_conf()->mds_max_purge_files
7c673cae
FG
368 << " files" << dendl;
369
370 if (in_flight.size() == 0 && cct->_conf->mds_max_purge_files > 0) {
371 // Always permit consumption if nothing is in flight, so that the ops
372 // limit can never be so low as to forbid all progress (unless
373 // administrator has deliberately paused purging by setting max
374 // purge files to zero).
375 return true;
376 }
377
378 if (ops_in_flight >= max_purge_ops) {
379 dout(20) << "Throttling on op limit " << ops_in_flight << "/"
380 << max_purge_ops << dendl;
381 return false;
382 }
383
384 if (in_flight.size() >= cct->_conf->mds_max_purge_files) {
385 dout(20) << "Throttling on item limit " << in_flight.size()
386 << "/" << cct->_conf->mds_max_purge_files << dendl;
387 return false;
388 } else {
389 return true;
390 }
391}
392
f64942e4
AA
// Enter readonly mode after an internal IO failure: stop accepting
// pushes, notify the rank via on_error (consumed here, hence nulled so
// the destructor's delete is a no-op), and fail pending recovery
// waiters.  `r` is a negative errno.  Idempotent.
void PurgeQueue::_go_readonly(int r)
{
  if (readonly) return;
  dout(1) << "going readonly because internal IO failed: " << strerror(-r) << dendl;
  readonly = true;
  on_error->complete(r);
  on_error = nullptr;
  journaler.set_readonly();
  finish_contexts(g_ceph_context, waiting_for_recovery, r);
}
403
7c673cae
FG
404bool PurgeQueue::_consume()
405{
11fdf7f2 406 ceph_assert(lock.is_locked_by_me());
7c673cae
FG
407
408 bool could_consume = false;
f64942e4 409 while(_can_consume()) {
7c673cae
FG
410
411 if (delayed_flush) {
412 // We are now going to read from the journal, so any proactive
413 // flush is no longer necessary. This is not functionally necessary
414 // but it can avoid generating extra fragmented flush IOs.
415 timer.cancel_event(delayed_flush);
416 delayed_flush = nullptr;
417 }
418
1adf2230
AA
419 if (int r = journaler.get_error()) {
420 derr << "Error " << r << " recovering write_pos" << dendl;
f64942e4 421 _go_readonly(r);
1adf2230
AA
422 return could_consume;
423 }
424
7c673cae
FG
425 if (!journaler.is_readable()) {
426 dout(10) << " not readable right now" << dendl;
427 // Because we are the writer and the reader of the journal
428 // via the same Journaler instance, we never need to reread_head
429 if (!journaler.have_waiter()) {
430 journaler.wait_for_readable(new FunctionContext([this](int r) {
11fdf7f2 431 std::lock_guard l(lock);
7c673cae
FG
432 if (r == 0) {
433 _consume();
1adf2230 434 } else if (r != -EAGAIN) {
f64942e4 435 _go_readonly(r);
7c673cae
FG
436 }
437 }));
438 }
439
440 return could_consume;
441 }
442
28e407b8 443 could_consume = true;
7c673cae
FG
444 // The journaler is readable: consume an entry
445 bufferlist bl;
446 bool readable = journaler.try_read_entry(bl);
11fdf7f2 447 ceph_assert(readable); // we checked earlier
7c673cae
FG
448
449 dout(20) << " decoding entry" << dendl;
450 PurgeItem item;
11fdf7f2 451 auto q = bl.cbegin();
7c673cae 452 try {
11fdf7f2 453 decode(item, q);
7c673cae
FG
454 } catch (const buffer::error &err) {
455 derr << "Decode error at read_pos=0x" << std::hex
456 << journaler.get_read_pos() << dendl;
f64942e4 457 _go_readonly(EIO);
7c673cae 458 }
11fdf7f2 459 dout(20) << " executing item (" << item.ino << ")" << dendl;
7c673cae
FG
460 _execute_item(item, journaler.get_read_pos());
461 }
462
463 dout(10) << " cannot consume right now" << dendl;
464
465 return could_consume;
466}
467
// Issue the RADOS operations for one purge item.  Registers the item in
// in_flight (keyed by its journal position, `expire_to`) and charges
// the op throttle; _execute_item_complete() undoes both when the gather
// finishes.  Caller must hold `lock`.
void PurgeQueue::_execute_item(
    const PurgeItem &item,
    uint64_t expire_to)
{
  ceph_assert(lock.is_locked_by_me());

  in_flight[expire_to] = item;
  logger->set(l_pq_executing, in_flight.size());
  files_high_water = std::max(files_high_water, in_flight.size());
  logger->set(l_pq_executing_high_water, files_high_water);
  auto ops = _calculate_ops(item);
  ops_in_flight += ops;
  logger->set(l_pq_executing_ops, ops_in_flight);
  ops_high_water = std::max(ops_high_water, ops_in_flight);
  logger->set(l_pq_executing_ops_high_water, ops_high_water);

  SnapContext nullsnapc;

  C_GatherBuilder gather(cct);
  if (item.action == PurgeItem::PURGE_FILE) {
    if (item.size > 0) {
      uint64_t num = Striper::get_num_objects(item.layout, item.size);
      dout(10) << " 0~" << item.size << " objects 0~" << num
               << " snapc " << item.snapc << " on " << item.ino << dendl;
      filer.purge_range(item.ino, &item.layout, item.snapc,
                        0, num, ceph::real_clock::now(), 0,
                        gather.new_sub());
    }

    // remove the backtrace object if it was not purged
    object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
    if (!gather.has_subs() || !item.layout.pool_ns.empty()) {
      object_locator_t oloc(item.layout.pool_id);
      dout(10) << " remove backtrace object " << oid
               << " pool " << oloc.pool << " snapc " << item.snapc << dendl;
      objecter->remove(oid, oloc, item.snapc,
                       ceph::real_clock::now(), 0,
                       gather.new_sub());
    }

    // remove old backtrace objects
    for (const auto &p : item.old_pools) {
      object_locator_t oloc(p);
      dout(10) << " remove backtrace object " << oid
               << " old pool " << p << " snapc " << item.snapc << dendl;
      objecter->remove(oid, oloc, item.snapc,
                       ceph::real_clock::now(), 0,
                       gather.new_sub());
    }
  } else if (item.action == PurgeItem::PURGE_DIR) {
    object_locator_t oloc(metadata_pool);
    frag_vec_t leaves;
    if (!item.fragtree.is_leaf(frag_t()))
      item.fragtree.get_leaves(leaves);
    leaves.push_back(frag_t());
    for (const auto &leaf : leaves) {
      object_t oid = CInode::get_object_name(item.ino, leaf, "");
      dout(10) << " remove dirfrag " << oid << dendl;
      objecter->remove(oid, oloc, nullsnapc,
                       ceph::real_clock::now(),
                       0, gather.new_sub());
    }
  } else if (item.action == PurgeItem::TRUNCATE_FILE) {
    const uint64_t num = Striper::get_num_objects(item.layout, item.size);
    dout(10) << " 0~" << item.size << " objects 0~" << num
             << " snapc " << item.snapc << " on " << item.ino << dendl;

    // keep backtrace object
    if (num > 1) {
      filer.purge_range(item.ino, &item.layout, item.snapc,
                        1, num - 1, ceph::real_clock::now(),
                        0, gather.new_sub());
    }
    filer.zero(item.ino, &item.layout, item.snapc,
               0, item.layout.object_size,
               ceph::real_clock::now(),
               0, true, gather.new_sub());
  } else {
    derr << "Invalid item (action=" << item.action << ") in purge queue, "
            "dropping it" << dendl;
    // Roll back the accounting done above before dropping the item.
    ops_in_flight -= ops;
    logger->set(l_pq_executing_ops, ops_in_flight);
    ops_high_water = std::max(ops_high_water, ops_in_flight);
    logger->set(l_pq_executing_ops_high_water, ops_high_water);
    in_flight.erase(expire_to);
    logger->set(l_pq_executing, in_flight.size());
    files_high_water = std::max(files_high_water, in_flight.size());
    logger->set(l_pq_executing_high_water, files_high_water);
    return;
  }
  ceph_assert(gather.has_subs());

  gather.set_finisher(new C_OnFinisher(
                      new FunctionContext([this, expire_to](int r){
    std::lock_guard l(lock);
    _execute_item_complete(expire_to);

    _consume();

    // Have we gone idle? If so, do an extra write_head now instead of
    // waiting for next flush after journaler_write_head_interval.
    // Also do this periodically even if not idle, so that the persisted
    // expire_pos doesn't fall too far behind our progress when consuming
    // a very long queue.
    if (in_flight.empty() || journaler.write_head_needed()) {
      journaler.write_head(nullptr);
    }
  }), &finisher));

  gather.activate();
}
579
// Called when all RADOS ops for the item at journal position
// `expire_to` have finished.  Advances the journal expire_pos as far as
// contiguous completions allow (out-of-order completions are parked in
// pending_expire) and releases the item's throttle charge.  Caller must
// hold `lock`.
void PurgeQueue::_execute_item_complete(
    uint64_t expire_to)
{
  ceph_assert(lock.is_locked_by_me());
  dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
  ceph_assert(in_flight.count(expire_to) == 1);

  auto iter = in_flight.find(expire_to);
  ceph_assert(iter != in_flight.end());
  if (iter == in_flight.begin()) {
    // This was the oldest in-flight item: we may expire past it, and
    // past any parked completions that precede the next in-flight item.
    uint64_t pos = expire_to;
    if (!pending_expire.empty()) {
      auto n = iter;
      ++n;
      if (n == in_flight.end()) {
        // Nothing else in flight: everything parked is expirable.
        pos = *pending_expire.rbegin();
        pending_expire.clear();
      } else {
        // Expire parked positions up to (not including) the next
        // in-flight item.
        auto p = pending_expire.begin();
        do {
          if (*p >= n->first)
            break;
          pos = *p;
          pending_expire.erase(p++);
        } while (p != pending_expire.end());
      }
    }
    dout(10) << "expiring to 0x" << std::hex << pos << std::dec << dendl;
    journaler.set_expire_pos(pos);
  } else {
    // This is completely fine, we're not supposed to purge files in
    // order when doing them in parallel.
    dout(10) << "non-sequential completion, not expiring anything" << dendl;
    pending_expire.insert(expire_to);
  }

  ops_in_flight -= _calculate_ops(iter->second);
  logger->set(l_pq_executing_ops, ops_in_flight);
  ops_high_water = std::max(ops_high_water, ops_in_flight);
  logger->set(l_pq_executing_ops_high_water, ops_high_water);

  dout(10) << "completed item for ino " << iter->second.ino << dendl;

  in_flight.erase(iter);
  logger->set(l_pq_executing, in_flight.size());
  files_high_water = std::max(files_high_water, in_flight.size());
  logger->set(l_pq_executing_high_water, files_high_water);
  dout(10) << "in_flight.size() now " << in_flight.size() << dendl;

  logger->inc(l_pq_executed);
}
631
// Recompute max_purge_ops from the current OSDMap/MDSMap: PGs across
// all data pools, divided among MDS ranks, times the per-PG op budget,
// optionally capped by mds_max_purge_ops.
void PurgeQueue::update_op_limit(const MDSMap &mds_map)
{
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "skipping; PurgeQueue is readonly" << dendl;
    return;
  }

  uint64_t pg_count = 0;
  objecter->with_osdmap([&](const OSDMap& o) {
    // Number of PGs across all data pools
    const std::vector<int64_t> &data_pools = mds_map.get_data_pools();
    for (const auto dp : data_pools) {
      if (o.get_pg_pool(dp) == NULL) {
        // It is possible that we have an older OSDMap than MDSMap,
        // because we don't start watching every OSDMap until after
        // MDSRank is initialized
        dout(4) << " data pool " << dp << " not found in OSDMap" << dendl;
        continue;
      }
      pg_count += o.get_pg_num(dp);
    }
  });

  // Work out a limit based on n_pgs / n_mdss, multiplied by the user's
  // preference for how many ops per PG
  max_purge_ops = uint64_t(((double)pg_count / (double)mds_map.get_max_mds()) *
                           cct->_conf->mds_max_purge_ops_per_pg);

  // User may also specify a hard limit, apply this if so.
  if (cct->_conf->mds_max_purge_ops) {
    max_purge_ops = std::min(max_purge_ops, cct->_conf->mds_max_purge_ops);
  }
}
667
// React to runtime config changes: recompute the op limit, or restart
// consumption if the file limit may have gone from zero to non-zero.
void PurgeQueue::handle_conf_change(const std::set<std::string>& changed, const MDSMap& mds_map)
{
  if (changed.count("mds_max_purge_ops")
      || changed.count("mds_max_purge_ops_per_pg")) {
    update_op_limit(mds_map);
  } else if (changed.count("mds_max_purge_files")) {
    std::lock_guard l(lock);
    if (in_flight.empty()) {
      // We might have gone from zero to a finite limit, so
      // might need to kick off consume.
      dout(4) << "maybe start work again (max_purge_files="
              << g_conf()->mds_max_purge_files << dendl;
      finisher.queue(new FunctionContext([this](int r){
        std::lock_guard l(lock);
        _consume();
      }));
    }
  }
}
687
// Report drain progress for a stopping MDS.  Returns true when the
// queue is fully drained (or readonly, in which case it can never
// drain); otherwise fills in progress counters and returns false.
bool PurgeQueue::drain(
    uint64_t *progress,
    uint64_t *progress_total,
    size_t *in_flight_count
    )
{
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "skipping drain; PurgeQueue is readonly" << dendl;
    return true;
  }

  ceph_assert(progress != nullptr);
  ceph_assert(progress_total != nullptr);
  ceph_assert(in_flight_count != nullptr);

  const bool done = in_flight.empty() && (
      journaler.get_read_pos() == journaler.get_write_pos());
  if (done) {
    return true;
  }

  const uint64_t bytes_remaining = journaler.get_write_pos()
                                   - journaler.get_read_pos();

  if (!draining) {
    // Start of draining: remember how much there was outstanding at
    // this point so that we can give a progress percentage later
    draining = true;

    // Lift the op throttle as this daemon now has nothing to do but
    // drain the purge queue, so do it as fast as we can.
    max_purge_ops = 0xffff;
  }

  // drain_initial only ever grows, so progress is monotonic even if
  // new items are still being pushed.
  drain_initial = std::max(bytes_remaining, drain_initial);

  *progress = drain_initial - bytes_remaining;
  *progress_total = drain_initial;
  *in_flight_count = in_flight.size();

  return false;
}
732
11fdf7f2
TL
733std::string_view PurgeItem::get_type_str() const
734{
735 switch(action) {
736 case PurgeItem::NONE: return "NONE";
737 case PurgeItem::PURGE_FILE: return "PURGE_FILE";
738 case PurgeItem::PURGE_DIR: return "PURGE_DIR";
739 case PurgeItem::TRUNCATE_FILE: return "TRUNCATE_FILE";
740 default:
741 return "UNKNOWN";
742 }
743}
744