]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/PurgeQueue.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / mds / PurgeQueue.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "common/debug.h"
16#include "mds/mdstypes.h"
17#include "mds/CInode.h"
18#include "mds/MDCache.h"
19
20#include "PurgeQueue.h"
21
7c673cae
FG
22#define dout_context cct
23#define dout_subsys ceph_subsys_mds
24#undef dout_prefix
25#define dout_prefix _prefix(_dout, rank) << __func__ << ": "
20effc67
TL
26
27using namespace std;
28
7c673cae
FG
// Log-line prefix used by dout_prefix: "mds.<rank>.purge_queue <function>: "
static ostream& _prefix(std::ostream *_dout, mds_rank_t rank) {
  return *_dout << "mds." << rank << ".purge_queue ";
}
32
11fdf7f2
TL
// Name -> action lookup table; presumably used by callers that parse a
// symbolic action name (e.g. tooling/admin input) -- usage is outside this
// file, verify against callers of PurgeItem::actions.
const std::map<std::string, PurgeItem::Action> PurgeItem::actions = {
  {"NONE", PurgeItem::NONE},
  {"PURGE_FILE", PurgeItem::PURGE_FILE},
  {"TRUNCATE_FILE", PurgeItem::TRUNCATE_FILE},
  {"PURGE_DIR", PurgeItem::PURGE_DIR}
};
39
7c673cae
FG
// Serialize this purge item (version 2, compat 1).  Field order here must
// match the "normal layout" branch of decode() below.
void PurgeItem::encode(bufferlist &bl) const
{
  ENCODE_START(2, 1, bl);
  encode((uint8_t)action, bl);
  encode(ino, bl);
  encode(size, bl);
  encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2);
  encode(old_pools, bl);
  encode(snapc, bl);
  encode(fragtree, bl);
  encode(stamp, bl);
  // Trailing pad bytes (0xff) up to pad_size, so entries have a predictable
  // on-disk footprint.
  uint8_t static const pad = 0xff;
  for (unsigned int i = 0; i < pad_size; i++) {
    encode(pad, bl);
  }
  ENCODE_FINISH(bl);
}
57
// Deserialize a purge item.  v1 entries exist in two on-disk layouts: the
// correct one, and a bad encoding introduced by v13.2.2 (stamp/pad emitted
// first).  For v1 we speculatively try the bad layout; if it fails to parse
// within the struct bounds we rewind and decode the normal layout.
void PurgeItem::decode(bufferlist::const_iterator &p)
{
  DECODE_START(2, p);
  bool done = false;
  if (struct_v == 1) {
    auto p_start = p;
    try {
      // bad encoding introduced by v13.2.2
      decode(stamp, p);
      decode(pad_size, p);
      p += pad_size;
      uint8_t raw_action;
      decode(raw_action, p);
      action = (Action)raw_action;
      decode(ino, p);
      decode(size, p);
      decode(layout, p);
      decode(old_pools, p);
      decode(snapc, p);
      decode(fragtree, p);
      // Reading past struct_end means this was not the bad layout after all
      if (p.get_off() > struct_end)
        throw buffer::end_of_buffer();
      done = true;
    } catch (const buffer::error &e) {
      // Not the v13.2.2 layout: rewind and fall through to the normal one
      p = p_start;
    }
  }
  if (!done) {
    // Normal field order (correct v1 encoding, and all of v2)
    uint8_t raw_action;
    decode(raw_action, p);
    action = (Action)raw_action;
    decode(ino, p);
    decode(size, p);
    decode(layout, p);
    decode(old_pools, p);
    decode(snapc, p);
    decode(fragtree, p);
    if (struct_v >= 2) {
      decode(stamp, p);
    }
  }
  DECODE_FINISH(p);
}
101
// if Objecter has any slow requests, take that as a hint and
// slow down our rate of purging
//
// Construct the purge queue for one MDS rank.  The queue persists its
// backlog in its own Journaler (ino MDS_INO_PURGE_QUEUE + rank) in the
// metadata pool.  `on_error_` is owned by this object until handed off in
// _go_readonly(); see the destructor.
PurgeQueue::PurgeQueue(
  CephContext *cct_,
  mds_rank_t rank_,
  const int64_t metadata_pool_,
  Objecter *objecter_,
  Context *on_error_)
  :
    cct(cct_),
    rank(rank_),
    metadata_pool(metadata_pool_),
    finisher(cct, "PurgeQueue", "PQ_Finisher"),
    timer(cct, lock),
    filer(objecter_, &finisher),
    objecter(objecter_),
    journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
              CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0,
              &finisher),
    on_error(on_error_)
{
  ceph_assert(cct != nullptr);
  ceph_assert(on_error != nullptr);
  ceph_assert(objecter != nullptr);
  journaler.set_write_error_handler(on_error);
}
128
PurgeQueue::~PurgeQueue()
{
  if (logger) {
    g_ceph_context->get_perfcounters_collection()->remove(logger.get());
  }
  // on_error is an owning raw pointer; _go_readonly() queues it on the
  // finisher and nulls it, so this delete only fires if it was never used.
  delete on_error;
}
136
// Register the purge_queue perfcounters.  Must run before init(), which
// asserts that `logger` is set.
void PurgeQueue::create_logger()
{
  PerfCountersBuilder pcb(g_ceph_context, "purge_queue", l_pq_first, l_pq_last);

  pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed",
                      "purg", PerfCountersBuilder::PRIO_INTERESTING);

  pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
  pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
  pcb.add_u64(l_pq_executing_ops_high_water, "pq_executing_ops_high_water", "Maximum number of executing file purge ops");
  pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
  pcb.add_u64(l_pq_executing_high_water, "pq_executing_high_water", "Maximum number of executing file purges");
  pcb.add_u64(l_pq_item_in_journal, "pq_item_in_journal", "Purge item left in journal");

  logger.reset(pcb.create_perf_counters());
  g_ceph_context->get_perfcounters_collection()->add(logger.get());
}
154
155void PurgeQueue::init()
156{
11fdf7f2 157 std::lock_guard l(lock);
7c673cae 158
11fdf7f2 159 ceph_assert(logger != nullptr);
7c673cae
FG
160
161 finisher.start();
162 timer.init();
163}
164
c07f9fc5
FG
// Begin consuming any backlog already present in the journal.  Also
// precomputes the serialized size of one (empty) PurgeItem so that
// _execute_item_complete() can estimate how many items remain in the
// journal from byte offsets alone.
void PurgeQueue::activate()
{
  std::lock_guard l(lock);

  {
    PurgeItem item;
    bufferlist bl;

    // calculate purge item serialized size stored in journal
    // used to count how many items still left in journal later
    ::encode(item, bl);
    purge_item_journal_size = bl.length() + journaler.get_journal_envelope_size();
  }

  if (readonly) {
    dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
    return;
  }

  // Nothing unread in the journal: nothing to drain
  if (journaler.get_read_pos() == journaler.get_write_pos())
    return;

  if (in_flight.empty()) {
    dout(4) << "start work (by drain)" << dendl;
    // Kick _consume() from the finisher so it runs outside this call stack
    finisher.queue(new LambdaContext([this](int r) {
      std::lock_guard l(lock);
      _consume();
    }));
  }
}
195
7c673cae
FG
196void PurgeQueue::shutdown()
197{
11fdf7f2 198 std::lock_guard l(lock);
7c673cae
FG
199
200 journaler.shutdown();
201 timer.shutdown();
202 finisher.stop();
203}
204
// Load (or, on upgrade, create) the on-disk purge queue journal.
// `completion` (may be null) is called once recovery has finished.
void PurgeQueue::open(Context *completion)
{
  dout(4) << "opening" << dendl;

  std::lock_guard l(lock);

  if (completion)
    waiting_for_recovery.push_back(completion);

  journaler.recover(new LambdaContext([this](int r){
    if (r == -CEPHFS_ENOENT) {
      dout(1) << "Purge Queue not found, assuming this is an upgrade and "
                 "creating it." << dendl;
      create(NULL);
    } else if (r == 0) {
      std::lock_guard l(lock);
      dout(4) << "open complete" << dendl;

      // Journaler only guarantees entries before head write_pos have been
      // fully flushed. Before appending new entries, we need to find and
      // drop any partial written entry.
      if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
        dout(4) << "recovering write_pos" << dendl;
        journaler.set_read_pos(journaler.last_committed.write_pos);
        _recover();
        return;
      }

      journaler.set_writeable();
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
    } else {
      derr << "Error " << r << " loading Journaler" << dendl;
      _go_readonly(r);
    }
  }));
}
242
3efd9988
FG
243void PurgeQueue::wait_for_recovery(Context* c)
244{
11fdf7f2 245 std::lock_guard l(lock);
f64942e4 246 if (recovered) {
3efd9988 247 c->complete(0);
f64942e4
AA
248 } else if (readonly) {
249 dout(10) << "cannot wait for recovery: PurgeQueue is readonly" << dendl;
f67539c2 250 c->complete(-CEPHFS_EROFS);
f64942e4 251 } else {
3efd9988 252 waiting_for_recovery.push_back(c);
f64942e4 253 }
3efd9988 254}
7c673cae 255
// Scan forward from last_committed.write_pos to the head, letting the
// Journaler drop any partially-written trailing entry, then restore the
// read position and mark the queue writeable/recovered.
void PurgeQueue::_recover()
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));

  // Journaler::is_readable() adjusts write_pos if partial entry is encountered
  while (1) {
    if (!journaler.is_readable() &&
        !journaler.get_error() &&
        journaler.get_read_pos() < journaler.get_write_pos()) {
      // More journal to fetch: resume this scan when data arrives
      journaler.wait_for_readable(new LambdaContext([this](int r) {
        std::lock_guard l(lock);
        _recover();
      }));
      return;
    }

    if (journaler.get_error()) {
      int r = journaler.get_error();
      derr << "Error " << r << " recovering write_pos" << dendl;
      _go_readonly(r);
      return;
    }

    if (journaler.get_read_pos() == journaler.get_write_pos()) {
      dout(4) << "write_pos recovered" << dendl;
      // restore original read_pos
      journaler.set_read_pos(journaler.last_committed.expire_pos);
      journaler.set_writeable();
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
      return;
    }

    // Discard one complete entry and keep scanning
    bufferlist bl;
    bool readable = journaler.try_read_entry(bl);
    ceph_assert(readable);  // we checked earlier
  }
}
294
// Create a fresh, empty purge queue journal in the metadata pool (used on
// upgrade when open() finds none).  `fin` (may be null) completes after the
// initial head write, alongside any other recovery waiters.
void PurgeQueue::create(Context *fin)
{
  dout(4) << "creating" << dendl;
  std::lock_guard l(lock);

  if (fin)
    waiting_for_recovery.push_back(fin);

  file_layout_t layout = file_layout_t::get_default();
  layout.pool_id = metadata_pool;
  journaler.set_writeable();
  journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
  journaler.write_head(new LambdaContext([this](int r) {
    std::lock_guard l(lock);
    if (r) {
      _go_readonly(r);
    } else {
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
    }
  }));
}
317
/**
 * Append one purge item to the journal and opportunistically start
 * consuming it.
 *
 * The `completion` context will always be called back via a Finisher
 * (with -CEPHFS_EROFS if the queue has gone readonly).
 */
void PurgeQueue::push(const PurgeItem &pi, Context *completion)
{
  dout(4) << "pushing inode " << pi.ino << dendl;
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "cannot push inode: PurgeQueue is readonly" << dendl;
    completion->complete(-CEPHFS_EROFS);
    return;
  }

  // Callers should have waited for open() before using us
  ceph_assert(!journaler.is_readonly());

  bufferlist bl;

  encode(pi, bl);
  journaler.append_entry(bl);
  journaler.wait_for_flush(completion);

  // Maybe go ahead and do something with it right away
  bool could_consume = _consume();
  if (!could_consume) {
    // Usually, it is not necessary to explicitly flush here, because the reader
    // will get flushes generated inside Journaler::is_readable. However,
    // if we remain in a _can_consume()==false state for a long period then
    // we should flush in order to allow MDCache to drop its strays rather
    // than having them wait for purgequeue to progress.
    if (!delayed_flush) {
      delayed_flush = new LambdaContext([this](int r){
        delayed_flush = nullptr;
        journaler.flush();
      });

      timer.add_event_after(
        g_conf()->mds_purge_queue_busy_flush_period,
        delayed_flush);
    }
  }
}
361
362uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
363{
364 uint32_t ops_required = 0;
365 if (item.action == PurgeItem::PURGE_DIR) {
366 // Directory, count dirfrags to be deleted
11fdf7f2 367 frag_vec_t leaves;
7c673cae 368 if (!item.fragtree.is_leaf(frag_t())) {
11fdf7f2 369 item.fragtree.get_leaves(leaves);
7c673cae
FG
370 }
371 // One for the root, plus any leaves
11fdf7f2 372 ops_required = 1 + leaves.size();
7c673cae
FG
373 } else {
374 // File, work out concurrent Filer::purge deletes
f91f0fd5 375 // Account for removing (or zeroing) backtrace
7c673cae
FG
376 const uint64_t num = (item.size > 0) ?
377 Striper::get_num_objects(item.layout, item.size) : 1;
378
39ae355f 379 ops_required = num;
7c673cae 380
7c673cae
FG
381 // Account for deletions for old pools
382 if (item.action != PurgeItem::TRUNCATE_FILE) {
383 ops_required += item.old_pools.size();
384 }
385 }
386
387 return ops_required;
388}
389
// Throttle check: may we start executing another purge item right now?
// Limits are max_purge_ops (RADOS op budget, see update_op_limit) and
// mds_max_purge_files (concurrent items).
bool PurgeQueue::_can_consume()
{
  if (readonly) {
    dout(10) << "can't consume: PurgeQueue is readonly" << dendl;
    return false;
  }

  dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
           << in_flight.size() << "/" << g_conf()->mds_max_purge_files
           << " files" << dendl;

  if (in_flight.size() == 0 && cct->_conf->mds_max_purge_files > 0) {
    // Always permit consumption if nothing is in flight, so that the ops
    // limit can never be so low as to forbid all progress (unless
    // administrator has deliberately paused purging by setting max
    // purge files to zero).
    return true;
  }

  if (ops_in_flight >= max_purge_ops) {
    dout(20) << "Throttling on op limit " << ops_in_flight << "/"
             << max_purge_ops << dendl;
    return false;
  }

  if (in_flight.size() >= cct->_conf->mds_max_purge_files) {
    dout(20) << "Throttling on item limit " << in_flight.size()
             << "/" << cct->_conf->mds_max_purge_files << dendl;
    return false;
  } else {
    return true;
  }
}
423
f64942e4
AA
// Permanently switch the queue to readonly after an internal IO failure:
// notify the owner via on_error (handing off ownership), stop journal
// writes, and fail any recovery waiters.  Idempotent.
// `r` is expected to be a negative errno (it is negated for strerror).
void PurgeQueue::_go_readonly(int r)
{
  if (readonly) return;
  dout(1) << "going readonly because internal IO failed: " << strerror(-r) << dendl;
  readonly = true;
  finisher.queue(on_error, r);
  on_error = nullptr;  // ownership transferred to the finisher
  journaler.set_readonly();
  finish_contexts(g_ceph_context, waiting_for_recovery, r);
}
434
7c673cae
FG
435bool PurgeQueue::_consume()
436{
9f95a23c 437 ceph_assert(ceph_mutex_is_locked_by_me(lock));
7c673cae
FG
438
439 bool could_consume = false;
f64942e4 440 while(_can_consume()) {
7c673cae
FG
441
442 if (delayed_flush) {
443 // We are now going to read from the journal, so any proactive
444 // flush is no longer necessary. This is not functionally necessary
445 // but it can avoid generating extra fragmented flush IOs.
446 timer.cancel_event(delayed_flush);
447 delayed_flush = nullptr;
448 }
449
1adf2230
AA
450 if (int r = journaler.get_error()) {
451 derr << "Error " << r << " recovering write_pos" << dendl;
f64942e4 452 _go_readonly(r);
1adf2230
AA
453 return could_consume;
454 }
455
7c673cae
FG
456 if (!journaler.is_readable()) {
457 dout(10) << " not readable right now" << dendl;
458 // Because we are the writer and the reader of the journal
459 // via the same Journaler instance, we never need to reread_head
460 if (!journaler.have_waiter()) {
9f95a23c 461 journaler.wait_for_readable(new LambdaContext([this](int r) {
11fdf7f2 462 std::lock_guard l(lock);
7c673cae
FG
463 if (r == 0) {
464 _consume();
f67539c2 465 } else if (r != -CEPHFS_EAGAIN) {
f64942e4 466 _go_readonly(r);
7c673cae
FG
467 }
468 }));
469 }
470
471 return could_consume;
472 }
473
28e407b8 474 could_consume = true;
7c673cae
FG
475 // The journaler is readable: consume an entry
476 bufferlist bl;
477 bool readable = journaler.try_read_entry(bl);
11fdf7f2 478 ceph_assert(readable); // we checked earlier
7c673cae
FG
479
480 dout(20) << " decoding entry" << dendl;
481 PurgeItem item;
11fdf7f2 482 auto q = bl.cbegin();
7c673cae 483 try {
11fdf7f2 484 decode(item, q);
7c673cae
FG
485 } catch (const buffer::error &err) {
486 derr << "Decode error at read_pos=0x" << std::hex
487 << journaler.get_read_pos() << dendl;
f67539c2 488 _go_readonly(CEPHFS_EIO);
7c673cae 489 }
11fdf7f2 490 dout(20) << " executing item (" << item.ino << ")" << dendl;
7c673cae
FG
491 _execute_item(item, journaler.get_read_pos());
492 }
493
494 dout(10) << " cannot consume right now" << dendl;
495
496 return could_consume;
497}
498
f67539c2
TL
// Finisher-run callback that hands the operations prepared by
// _execute_item() over to PurgeQueue::_commit_ops() for submission.
class C_IO_PurgeItem_Commit : public Context {
public:
  C_IO_PurgeItem_Commit(PurgeQueue *pq, std::vector<PurgeItemCommitOp> ops, uint64_t expire_to)
    : purge_queue(pq), ops_vec(std::move(ops)), expire_to(expire_to) {
  }

  void finish(int r) override {
    purge_queue->_commit_ops(r, ops_vec, expire_to);
  }

private:
  PurgeQueue *purge_queue;          // not owned
  std::vector<PurgeItemCommitOp> ops_vec;
  uint64_t expire_to;               // journal offset to expire to on completion
};
514
// Submit the prepared ops for one purge item to RADOS, gathered under a
// single completion that advances the journal expire position and resumes
// consumption.
void PurgeQueue::_commit_ops(int r, const std::vector<PurgeItemCommitOp>& ops_vec, uint64_t expire_to)
{
  if (r < 0) {
    derr << " r = " << r << dendl;
    return;
  }

  SnapContext nullsnapc;
  C_GatherBuilder gather(cct);

  for (auto &op : ops_vec) {
    dout(10) << op.item.get_type_str() << dendl;
    if (op.type == PurgeItemCommitOp::PURGE_OP_RANGE) {
      uint64_t first_obj = 0, num_obj = 0;
      uint64_t num = Striper::get_num_objects(op.item.layout, op.item.size);
      num_obj = num;

      if (op.item.action == PurgeItem::TRUNCATE_FILE) {
        // Truncate keeps object 0; purge only objects 1..num-1.
        // A single-object file leaves nothing to range-purge.
        first_obj = 1;
        if (num > 1)
          num_obj = num - 1;
        else
          continue;
      }

      filer.purge_range(op.item.ino, &op.item.layout, op.item.snapc,
                        first_obj, num_obj, ceph::real_clock::now(), op.flags,
                        gather.new_sub());
    } else if (op.type == PurgeItemCommitOp::PURGE_OP_REMOVE) {
      if (op.item.action == PurgeItem::PURGE_DIR) {
        // Dirfrag objects are removed with an empty snap context
        objecter->remove(op.oid, op.oloc, nullsnapc,
                         ceph::real_clock::now(), op.flags,
                         gather.new_sub());
      } else {
        objecter->remove(op.oid, op.oloc, op.item.snapc,
                         ceph::real_clock::now(), op.flags,
                         gather.new_sub());
      }
    } else if (op.type == PurgeItemCommitOp::PURGE_OP_ZERO) {
      filer.zero(op.item.ino, &op.item.layout, op.item.snapc,
                 0, op.item.layout.object_size, ceph::real_clock::now(), 0, true,
                 gather.new_sub());
    } else {
      derr << "Invalid purge op: " << op.type << dendl;
      ceph_abort();
    }
  }

  ceph_assert(gather.has_subs());

  gather.set_finisher(new C_OnFinisher(
    new LambdaContext([this, expire_to](int r) {
      std::lock_guard l(lock);

      if (r == -CEPHFS_EBLOCKLISTED) {
        // We are fenced; hand the error to the owner and stop
        finisher.queue(on_error, r);
        on_error = nullptr;
        return;
      }

      _execute_item_complete(expire_to);
      _consume();

      // Have we gone idle? If so, do an extra write_head now instead of
      // waiting for next flush after journaler_write_head_interval.
      // Also do this periodically even if not idle, so that the persisted
      // expire_pos doesn't fall too far behind our progress when consuming
      // a very long queue.
      if (!readonly &&
          (in_flight.empty() || journaler.write_head_needed())) {
        journaler.write_head(nullptr);
      }
    }), &finisher));

  gather.activate();
}
591
7c673cae
FG
// Turn one PurgeItem into the list of RADOS operations it requires and
// queue them (via C_IO_PurgeItem_Commit -> _commit_ops).  Throttle
// accounting is charged up front and rolled back if the item is invalid.
void PurgeQueue::_execute_item(
    const PurgeItem &item,
    uint64_t expire_to)
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));

  // Charge the item against both throttles before dispatch
  in_flight[expire_to] = item;
  logger->set(l_pq_executing, in_flight.size());
  files_high_water = std::max<uint64_t>(files_high_water,
                                        in_flight.size());
  logger->set(l_pq_executing_high_water, files_high_water);
  auto ops = _calculate_ops(item);
  ops_in_flight += ops;
  logger->set(l_pq_executing_ops, ops_in_flight);
  ops_high_water = std::max(ops_high_water, ops_in_flight);
  logger->set(l_pq_executing_ops_high_water, ops_high_water);

  std::vector<PurgeItemCommitOp> ops_vec;
  auto submit_ops = [&]() {
    finisher.queue(new C_IO_PurgeItem_Commit(this, std::move(ops_vec), expire_to));
  };

  if (item.action == PurgeItem::PURGE_FILE) {
    if (item.size > 0) {
      // Delete the file's data objects
      uint64_t num = Striper::get_num_objects(item.layout, item.size);
      dout(10) << " 0~" << item.size << " objects 0~" << num
               << " snapc " << item.snapc << " on " << item.ino << dendl;
      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
    }

    // remove the backtrace object if it was not purged
    object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
    if (ops_vec.empty() || !item.layout.pool_ns.empty()) {
      object_locator_t oloc(item.layout.pool_id);
      dout(10) << " remove backtrace object " << oid
               << " pool " << oloc.pool << " snapc " << item.snapc << dendl;
      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
    }

    // remove old backtrace objects
    for (const auto &p : item.old_pools) {
      object_locator_t oloc(p);
      dout(10) << " remove backtrace object " << oid
               << " old pool " << p << " snapc " << item.snapc << dendl;
      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
    }
  } else if (item.action == PurgeItem::PURGE_DIR) {
    // Remove the root dirfrag object plus one object per fragment leaf
    object_locator_t oloc(metadata_pool);
    frag_vec_t leaves;
    if (!item.fragtree.is_leaf(frag_t()))
      item.fragtree.get_leaves(leaves);
    leaves.push_back(frag_t());
    for (const auto &leaf : leaves) {
      object_t oid = CInode::get_object_name(item.ino, leaf, "");
      dout(10) << " remove dirfrag " << oid << dendl;
      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
    }
  } else if (item.action == PurgeItem::TRUNCATE_FILE) {
    const uint64_t num = Striper::get_num_objects(item.layout, item.size);
    dout(10) << " 0~" << item.size << " objects 0~" << num
             << " snapc " << item.snapc << " on " << item.ino << dendl;

    // keep backtrace object
    if (num > 1) {
      // Purge objects 1..num-1 (see _commit_ops' TRUNCATE_FILE handling)
      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
    }
    // Zero the first object's data instead of removing it
    ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_ZERO, 0);
  } else {
    derr << "Invalid item (action=" << item.action << ") in purge queue, "
            "dropping it" << dendl;
    // Roll back the accounting charged above
    ops_in_flight -= ops;
    logger->set(l_pq_executing_ops, ops_in_flight);
    ops_high_water = std::max(ops_high_water, ops_in_flight);
    logger->set(l_pq_executing_ops_high_water, ops_high_water);
    in_flight.erase(expire_to);
    logger->set(l_pq_executing, in_flight.size());
    files_high_water = std::max<uint64_t>(files_high_water,
                                          in_flight.size());
    logger->set(l_pq_executing_high_water, files_high_water);
    return;
  }

  submit_ops();
}
676
// Called when all RADOS ops for the item recorded at journal offset
// `expire_to` have completed.  Advances the journal expire position as far
// as contiguously-completed items allow (out-of-order completions are
// parked in pending_expire) and releases the throttle accounting.
void PurgeQueue::_execute_item_complete(
    uint64_t expire_to)
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));
  dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
  ceph_assert(in_flight.count(expire_to) == 1);

  auto iter = in_flight.find(expire_to);
  ceph_assert(iter != in_flight.end());
  if (iter == in_flight.begin()) {
    // Oldest in-flight item completed: we may advance expire_pos
    uint64_t pos = expire_to;
    if (!pending_expire.empty()) {
      auto n = iter;
      ++n;
      if (n == in_flight.end()) {
        // Nothing else in flight; everything parked is contiguous
        pos = *pending_expire.rbegin();
        pending_expire.clear();
      } else {
        // Advance through parked completions up to the next in-flight item
        auto p = pending_expire.begin();
        do {
          if (*p >= n->first)
            break;
          pos = *p;
          pending_expire.erase(p++);
        } while (p != pending_expire.end());
      }
    }
    dout(10) << "expiring to 0x" << std::hex << pos << std::dec << dendl;
    journaler.set_expire_pos(pos);
  } else {
    // This is completely fine, we're not supposed to purge files in
    // order when doing them in parallel.
    dout(10) << "non-sequential completion, not expiring anything" << dendl;
    pending_expire.insert(expire_to);
  }

  // Release the throttle charges taken in _execute_item()
  ops_in_flight -= _calculate_ops(iter->second);
  logger->set(l_pq_executing_ops, ops_in_flight);
  ops_high_water = std::max(ops_high_water, ops_in_flight);
  logger->set(l_pq_executing_ops_high_water, ops_high_water);

  dout(10) << "completed item for ino " << iter->second.ino << dendl;

  in_flight.erase(iter);
  logger->set(l_pq_executing, in_flight.size());
  files_high_water = std::max<uint64_t>(files_high_water,
                                        in_flight.size());
  logger->set(l_pq_executing_high_water, files_high_water);
  dout(10) << "in_flight.size() now " << in_flight.size() << dendl;

  // Estimate items remaining in the journal from byte offsets and the
  // per-item size computed in activate()
  uint64_t write_pos = journaler.get_write_pos();
  uint64_t read_pos = journaler.get_read_pos();
  uint64_t expire_pos = journaler.get_expire_pos();
  uint64_t item_num = (write_pos - (in_flight.size() ? expire_pos : read_pos))
                      / purge_item_journal_size;
  dout(10) << "left purge items in journal: " << item_num
           << " (purge_item_journal_size/write_pos/read_pos/expire_pos) now at "
           << "(" << purge_item_journal_size << "/" << write_pos << "/" << read_pos
           << "/" << expire_pos << ")" << dendl;

  logger->set(l_pq_item_in_journal, item_num);
  logger->inc(l_pq_executed);
}
740
// Recompute max_purge_ops from cluster shape: PGs across all data pools
// divided among active MDS ranks, scaled by mds_max_purge_ops_per_pg, and
// optionally capped by mds_max_purge_ops.
void PurgeQueue::update_op_limit(const MDSMap &mds_map)
{
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "skipping; PurgeQueue is readonly" << dendl;
    return;
  }

  uint64_t pg_count = 0;
  objecter->with_osdmap([&](const OSDMap& o) {
    // Number of PGs across all data pools
    const std::vector<int64_t> &data_pools = mds_map.get_data_pools();
    for (const auto dp : data_pools) {
      if (o.get_pg_pool(dp) == NULL) {
        // It is possible that we have an older OSDMap than MDSMap,
        // because we don't start watching every OSDMap until after
        // MDSRank is initialized
        dout(4) << " data pool " << dp << " not found in OSDMap" << dendl;
        continue;
      }
      pg_count += o.get_pg_num(dp);
    }
  });

  // Work out a limit based on n_pgs / n_mdss, multiplied by the user's
  // preference for how many ops per PG
  max_purge_ops = uint64_t(((double)pg_count / (double)mds_map.get_max_mds()) *
                           cct->_conf->mds_max_purge_ops_per_pg);

  // User may also specify a hard limit, apply this if so.
  if (cct->_conf->mds_max_purge_ops) {
    max_purge_ops = std::min(max_purge_ops, cct->_conf->mds_max_purge_ops);
  }
}
776
92f5a8d4 777void PurgeQueue::handle_conf_change(const std::set<std::string>& changed, const MDSMap& mds_map)
7c673cae
FG
778{
779 if (changed.count("mds_max_purge_ops")
780 || changed.count("mds_max_purge_ops_per_pg")) {
781 update_op_limit(mds_map);
782 } else if (changed.count("mds_max_purge_files")) {
11fdf7f2 783 std::lock_guard l(lock);
7c673cae
FG
784 if (in_flight.empty()) {
785 // We might have gone from zero to a finite limit, so
786 // might need to kick off consume.
787 dout(4) << "maybe start work again (max_purge_files="
92f5a8d4 788 << g_conf()->mds_max_purge_files << dendl;
9f95a23c 789 finisher.queue(new LambdaContext([this](int r){
11fdf7f2 790 std::lock_guard l(lock);
7c673cae
FG
791 _consume();
792 }));
793 }
794 }
795}
796
// Drive the queue toward empty during rank shutdown.  Returns true when
// fully drained (or readonly, in which case draining is impossible);
// otherwise reports progress via the out-parameters and returns false.
bool PurgeQueue::drain(
  uint64_t *progress,
  uint64_t *progress_total,
  size_t *in_flight_count
  )
{
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "skipping drain; PurgeQueue is readonly" << dendl;
    return true;
  }

  ceph_assert(progress != nullptr);
  ceph_assert(progress_total != nullptr);
  ceph_assert(in_flight_count != nullptr);

  const bool done = in_flight.empty() && (
      journaler.get_read_pos() == journaler.get_write_pos());
  if (done) {
    return true;
  }

  const uint64_t bytes_remaining = journaler.get_write_pos()
                                   - journaler.get_read_pos();

  if (!draining) {
    // Start of draining: remember how much there was outstanding at
    // this point so that we can give a progress percentage later
    draining = true;

    // Lift the op throttle as this daemon now has nothing to do but
    // drain the purge queue, so do it as fast as we can.
    max_purge_ops = 0xffff;
  }

  drain_initial = std::max(bytes_remaining, drain_initial);

  *progress = drain_initial - bytes_remaining;
  *progress_total = drain_initial;
  *in_flight_count = in_flight.size();

  return false;
}
841
11fdf7f2
TL
842std::string_view PurgeItem::get_type_str() const
843{
844 switch(action) {
845 case PurgeItem::NONE: return "NONE";
846 case PurgeItem::PURGE_FILE: return "PURGE_FILE";
847 case PurgeItem::PURGE_DIR: return "PURGE_DIR";
848 case PurgeItem::TRUNCATE_FILE: return "TRUNCATE_FILE";
849 default:
850 return "UNKNOWN";
851 }
852}