// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2015 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "common/debug.h"
#include "mds/mdstypes.h"
#include "mds/CInode.h"
#include "mds/MDCache.h"

#include "PurgeQueue.h"

#define dout_context cct
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix _prefix(_dout, rank) << __func__ << ": "

using namespace std;

static ostream& _prefix(std::ostream *_dout, mds_rank_t rank) {
  return *_dout << "mds." << rank << ".purge_queue ";
}

const std::map<std::string, PurgeItem::Action> PurgeItem::actions = {
  {"NONE", PurgeItem::NONE},
  {"PURGE_FILE", PurgeItem::PURGE_FILE},
  {"TRUNCATE_FILE", PurgeItem::TRUNCATE_FILE},
  {"PURGE_DIR", PurgeItem::PURGE_DIR}
};

void PurgeItem::encode(bufferlist &bl) const
{
  ENCODE_START(2, 1, bl);
  encode((uint8_t)action, bl);
  encode(ino, bl);
  encode(size, bl);
  encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2);
  encode(old_pools, bl);
  encode(snapc, bl);
  encode(fragtree, bl);
  encode(stamp, bl);
  static const uint8_t pad = 0xff;
  for (unsigned int i = 0; i < pad_size; i++) {
    encode(pad, bl);
  }
  ENCODE_FINISH(bl);
}

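// The struct_v 1 payload exists in two on-disk layouts: the intended one
// (action first) and a bad one introduced by v13.2.2 (stamp and padding
// first).  decode() below disambiguates by optimistically trying the bad
// layout and rolling the iterator back on failure.  A minimal sketch of
// that rollback pattern (names here are illustrative, not part of this
// file):
//
//   bufferlist::const_iterator checkpoint = p;  // remember position
//   try {
//     try_alternate_layout(p);                  // may throw buffer::error
//   } catch (const buffer::error &e) {
//     p = checkpoint;                           // rewind, fall through to
//     decode_normal_layout(p);                  // the canonical layout
//   }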
void PurgeItem::decode(bufferlist::const_iterator &p)
{
  DECODE_START(2, p);
  bool done = false;
  if (struct_v == 1) {
    auto p_start = p;
    try {
      // bad encoding introduced by v13.2.2
      decode(stamp, p);
      decode(pad_size, p);
      p += pad_size;
      uint8_t raw_action;
      decode(raw_action, p);
      action = (Action)raw_action;
      decode(ino, p);
      decode(size, p);
      decode(layout, p);
      decode(old_pools, p);
      decode(snapc, p);
      decode(fragtree, p);
      if (p.get_off() > struct_end)
        throw buffer::end_of_buffer();
      done = true;
    } catch (const buffer::error &e) {
      p = p_start;
    }
  }
  if (!done) {
    uint8_t raw_action;
    decode(raw_action, p);
    action = (Action)raw_action;
    decode(ino, p);
    decode(size, p);
    decode(layout, p);
    decode(old_pools, p);
    decode(snapc, p);
    decode(fragtree, p);
    if (struct_v >= 2) {
      decode(stamp, p);
    }
  }
  DECODE_FINISH(p);
}

// if Objecter has any slow requests, take that as a hint and
// slow down our rate of purging
PurgeQueue::PurgeQueue(
      CephContext *cct_,
      mds_rank_t rank_,
      const int64_t metadata_pool_,
      Objecter *objecter_,
      Context *on_error_)
  :
    cct(cct_),
    rank(rank_),
    metadata_pool(metadata_pool_),
    finisher(cct, "PurgeQueue", "PQ_Finisher"),
    timer(cct, lock),
    filer(objecter_, &finisher),
    objecter(objecter_),
    journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
              CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0,
              &finisher),
    on_error(on_error_)
{
  ceph_assert(cct != nullptr);
  ceph_assert(on_error != nullptr);
  ceph_assert(objecter != nullptr);
  journaler.set_write_error_handler(on_error);
}
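// Note: the Journaler above gives each rank its own purge queue journal,
// keyed by inode number MDS_INO_PURGE_QUEUE + rank (constants live in
// mds/mdstypes.h).  As an illustrative sketch of the resulting on-disk
// naming: with MDS_INO_PURGE_QUEUE == 0x500, rank 0's queue would sit at
// inode 0x500, so its first journal object in the metadata pool would be
// named "500.00000000" under the usual Journaler object-naming scheme.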

PurgeQueue::~PurgeQueue()
{
  if (logger) {
    g_ceph_context->get_perfcounters_collection()->remove(logger.get());
  }
  delete on_error;
}

void PurgeQueue::create_logger()
{
  PerfCountersBuilder pcb(g_ceph_context, "purge_queue", l_pq_first, l_pq_last);

  pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed",
                      "purg", PerfCountersBuilder::PRIO_INTERESTING);

  pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
  pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
  pcb.add_u64(l_pq_executing_ops_high_water, "pq_executing_ops_high_water", "Maximum number of executing file purge ops");
  pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
  pcb.add_u64(l_pq_executing_high_water, "pq_executing_high_water", "Maximum number of executing file purges");
  pcb.add_u64(l_pq_item_in_journal, "pq_item_in_journal", "Purge items left in journal");

  logger.reset(pcb.create_perf_counters());
  g_ceph_context->get_perfcounters_collection()->add(logger.get());
}
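// These counters can be inspected at runtime through the admin socket,
// e.g. `ceph daemon mds.<id> perf dump purge_queue` (a sketch; the daemon
// spec and socket path depend on the deployment).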

void PurgeQueue::init()
{
  std::lock_guard l(lock);

  ceph_assert(logger != nullptr);

  finisher.start();
  timer.init();
}

void PurgeQueue::activate()
{
  std::lock_guard l(lock);

  {
    PurgeItem item;
    bufferlist bl;

    // Calculate the serialized size of a purge item as stored in the
    // journal; this is later used to estimate how many items are left
    // in the journal.
    ::encode(item, bl);
    purge_item_journal_size = bl.length() + journaler.get_journal_envelope_size();
  }

  if (readonly) {
    dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
    return;
  }

  if (journaler.get_read_pos() == journaler.get_write_pos())
    return;

  if (in_flight.empty()) {
    dout(4) << "start work (by drain)" << dendl;
    finisher.queue(new LambdaContext([this](int r) {
      std::lock_guard l(lock);
      _consume();
    }));
  }
}

void PurgeQueue::shutdown()
{
  std::lock_guard l(lock);

  journaler.shutdown();
  timer.shutdown();
  finisher.stop();
}

void PurgeQueue::open(Context *completion)
{
  dout(4) << "opening" << dendl;

  std::lock_guard l(lock);

  if (completion)
    waiting_for_recovery.push_back(completion);

  journaler.recover(new LambdaContext([this](int r){
    if (r == -CEPHFS_ENOENT) {
      dout(1) << "Purge Queue not found, assuming this is an upgrade and "
                 "creating it." << dendl;
      create(NULL);
    } else if (r == 0) {
      std::lock_guard l(lock);
      dout(4) << "open complete" << dendl;

      // Journaler only guarantees that entries before the head's write_pos
      // have been fully flushed.  Before appending new entries, we need to
      // find and drop any partially written entry.
      if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
        dout(4) << "recovering write_pos" << dendl;
        journaler.set_read_pos(journaler.last_committed.write_pos);
        _recover();
        return;
      }

      journaler.set_writeable();
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
    } else {
      derr << "Error " << r << " loading Journaler" << dendl;
      _go_readonly(r);
    }
  }));
}

void PurgeQueue::wait_for_recovery(Context* c)
{
  std::lock_guard l(lock);
  if (recovered) {
    c->complete(0);
  } else if (readonly) {
    dout(10) << "cannot wait for recovery: PurgeQueue is readonly" << dendl;
    c->complete(-CEPHFS_EROFS);
  } else {
    waiting_for_recovery.push_back(c);
  }
}

void PurgeQueue::_recover()
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));

  // Journaler::is_readable() adjusts write_pos if a partial entry is
  // encountered
  while (1) {
    if (!journaler.is_readable() &&
        !journaler.get_error() &&
        journaler.get_read_pos() < journaler.get_write_pos()) {
      journaler.wait_for_readable(new LambdaContext([this](int r) {
        std::lock_guard l(lock);
        _recover();
      }));
      return;
    }

    if (journaler.get_error()) {
      int r = journaler.get_error();
      derr << "Error " << r << " recovering write_pos" << dendl;
      _go_readonly(r);
      return;
    }

    if (journaler.get_read_pos() == journaler.get_write_pos()) {
      dout(4) << "write_pos recovered" << dendl;
      // restore original read_pos
      journaler.set_read_pos(journaler.last_committed.expire_pos);
      journaler.set_writeable();
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
      return;
    }

    bufferlist bl;
    bool readable = journaler.try_read_entry(bl);
    ceph_assert(readable);  // we checked earlier
  }
}
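// Recovery walk, illustrated (offsets are hypothetical): suppose the head
// recorded write_pos 0x1000 but the in-memory write_pos is 0x1800 because
// later entries were only partially flushed.  open() rewinds read_pos to
// 0x1000 and _recover() reads forward; Journaler::is_readable() trims
// write_pos back at the first partial entry, and once read_pos catches up
// with write_pos the original expire_pos is restored and writing resumes.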

void PurgeQueue::create(Context *fin)
{
  dout(4) << "creating" << dendl;
  std::lock_guard l(lock);

  if (fin)
    waiting_for_recovery.push_back(fin);

  file_layout_t layout = file_layout_t::get_default();
  layout.pool_id = metadata_pool;
  journaler.set_writeable();
  journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
  journaler.write_head(new LambdaContext([this](int r) {
    std::lock_guard l(lock);
    if (r) {
      _go_readonly(r);
    } else {
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
    }
  }));
}

/**
 * The `completion` context will always be called back via a Finisher
 */
void PurgeQueue::push(const PurgeItem &pi, Context *completion)
{
  dout(4) << "pushing inode " << pi.ino << dendl;
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "cannot push inode: PurgeQueue is readonly" << dendl;
    completion->complete(-CEPHFS_EROFS);
    return;
  }

  // Callers should have waited for open() before using us
  ceph_assert(!journaler.is_readonly());

  bufferlist bl;

  encode(pi, bl);
  journaler.append_entry(bl);
  journaler.wait_for_flush(completion);

  // Maybe go ahead and do something with it right away
  bool could_consume = _consume();
  if (!could_consume) {
    // Usually it is not necessary to explicitly flush here, because the
    // reader will get flushes generated inside Journaler::is_readable.
    // However, if we remain in a _can_consume()==false state for a long
    // period then we should flush in order to allow MDCache to drop its
    // strays rather than having them wait for the PurgeQueue to progress.
    if (!delayed_flush) {
      delayed_flush = new LambdaContext([this](int r){
        delayed_flush = nullptr;
        journaler.flush();
      });

      timer.add_event_after(
        g_conf()->mds_purge_queue_busy_flush_period,
        delayed_flush);
    }
  }
}
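/*
 * Minimal caller sketch (hypothetical; in the MDS the real caller is
 * StrayManager, which fills the PurgeItem from a CInode):
 *
 *   PurgeItem pi;
 *   pi.action = PurgeItem::PURGE_FILE;
 *   pi.ino = ino;                      // inode being purged
 *   pi.size = size;                    // file size, used for op counting
 *   pi.layout = layout;                // file layout, used for striping math
 *   pi.stamp = ceph_clock_now();
 *   pq.push(pi, new LambdaContext([](int r) {
 *     // called back via the Finisher once the item is journaled
 *   }));
 */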

uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
{
  uint32_t ops_required = 0;
  if (item.action == PurgeItem::PURGE_DIR) {
    // Directory, count dirfrags to be deleted
    frag_vec_t leaves;
    if (!item.fragtree.is_leaf(frag_t())) {
      item.fragtree.get_leaves(leaves);
    }
    // One for the root, plus any leaves
    ops_required = 1 + leaves.size();
  } else {
    // File, work out the number of concurrent Filer::purge deletes.
    // Account for removing (or zeroing) the backtrace.
    const uint64_t num = (item.size > 0) ?
      Striper::get_num_objects(item.layout, item.size) : 1;

    ops_required = std::min(num, g_conf()->filer_max_purge_ops);

    // Account for deletions for old pools
    if (item.action != PurgeItem::TRUNCATE_FILE) {
      ops_required += item.old_pools.size();
    }
  }

  return ops_required;
}
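// Worked example (illustrative numbers): purging a 100 MiB file striped
// into 4 MiB objects gives num = 25; with filer_max_purge_ops = 8 that
// costs ops_required = min(25, 8) = 8, plus one op per old pool that may
// still hold a stale backtrace.  A directory whose fragtree has two
// leaves costs 1 (root dirfrag) + 2 (leaves) = 3 ops.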

bool PurgeQueue::_can_consume()
{
  if (readonly) {
    dout(10) << "can't consume: PurgeQueue is readonly" << dendl;
    return false;
  }

  dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
           << in_flight.size() << "/" << g_conf()->mds_max_purge_files
           << " files" << dendl;

  if (in_flight.size() == 0 && cct->_conf->mds_max_purge_files > 0) {
    // Always permit consumption if nothing is in flight, so that the ops
    // limit can never be so low as to forbid all progress (unless the
    // administrator has deliberately paused purging by setting the max
    // purge files to zero).
    return true;
  }

  if (ops_in_flight >= max_purge_ops) {
    dout(20) << "Throttling on op limit " << ops_in_flight << "/"
             << max_purge_ops << dendl;
    return false;
  }

  if (in_flight.size() >= cct->_conf->mds_max_purge_files) {
    dout(20) << "Throttling on item limit " << in_flight.size()
             << "/" << cct->_conf->mds_max_purge_files << dendl;
    return false;
  } else {
    return true;
  }
}

void PurgeQueue::_go_readonly(int r)
{
  if (readonly) return;
  dout(1) << "going readonly because internal IO failed: " << strerror(-r) << dendl;
  readonly = true;
  finisher.queue(on_error, r);
  on_error = nullptr;
  journaler.set_readonly();
  finish_contexts(g_ceph_context, waiting_for_recovery, r);
}

bool PurgeQueue::_consume()
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));

  bool could_consume = false;
  while (_can_consume()) {

    if (delayed_flush) {
      // We are now going to read from the journal, so any proactive
      // flush is no longer necessary.  This is not functionally necessary
      // but it can avoid generating extra fragmented flush IOs.
      timer.cancel_event(delayed_flush);
      delayed_flush = nullptr;
    }

    if (int r = journaler.get_error()) {
      derr << "Error " << r << " reading from journal" << dendl;
      _go_readonly(r);
      return could_consume;
    }

    if (!journaler.is_readable()) {
      dout(10) << " not readable right now" << dendl;
      // Because we are both the writer and the reader of the journal
      // (via the same Journaler instance), we never need to reread_head
      if (!journaler.have_waiter()) {
        journaler.wait_for_readable(new LambdaContext([this](int r) {
          std::lock_guard l(lock);
          if (r == 0) {
            _consume();
          } else if (r != -CEPHFS_EAGAIN) {
            _go_readonly(r);
          }
        }));
      }

      return could_consume;
    }

    could_consume = true;
    // The journaler is readable: consume an entry
    bufferlist bl;
    bool readable = journaler.try_read_entry(bl);
    ceph_assert(readable);  // we checked earlier

    dout(20) << " decoding entry" << dendl;
    PurgeItem item;
    auto q = bl.cbegin();
    try {
      decode(item, q);
    } catch (const buffer::error &err) {
      derr << "Decode error at read_pos=0x" << std::hex
           << journaler.get_read_pos() << dendl;
      _go_readonly(CEPHFS_EIO);
    }
    dout(20) << " executing item (" << item.ino << ")" << dendl;
    _execute_item(item, journaler.get_read_pos());
  }

  dout(10) << " cannot consume right now" << dendl;

  return could_consume;
}

class C_IO_PurgeItem_Commit : public Context {
public:
  C_IO_PurgeItem_Commit(PurgeQueue *pq, std::vector<PurgeItemCommitOp> ops, uint64_t expire_to)
    : purge_queue(pq), ops_vec(std::move(ops)), expire_to(expire_to) {
  }

  void finish(int r) override {
    purge_queue->_commit_ops(r, ops_vec, expire_to);
  }

private:
  PurgeQueue *purge_queue;
  std::vector<PurgeItemCommitOp> ops_vec;
  uint64_t expire_to;
};

void PurgeQueue::_commit_ops(int r, const std::vector<PurgeItemCommitOp>& ops_vec, uint64_t expire_to)
{
  if (r < 0) {
    derr << " r = " << r << dendl;
    return;
  }

  SnapContext nullsnapc;
  C_GatherBuilder gather(cct);

  for (auto &op : ops_vec) {
    dout(10) << op.item.get_type_str() << dendl;
    if (op.type == PurgeItemCommitOp::PURGE_OP_RANGE) {
      uint64_t first_obj = 0, num_obj = 0;
      uint64_t num = Striper::get_num_objects(op.item.layout, op.item.size);
      num_obj = num;

      if (op.item.action == PurgeItem::TRUNCATE_FILE) {
        first_obj = 1;
        if (num > 1)
          num_obj = num - 1;
        else
          continue;
      }

      filer.purge_range(op.item.ino, &op.item.layout, op.item.snapc,
                        first_obj, num_obj, ceph::real_clock::now(), op.flags,
                        gather.new_sub());
    } else if (op.type == PurgeItemCommitOp::PURGE_OP_REMOVE) {
      if (op.item.action == PurgeItem::PURGE_DIR) {
        objecter->remove(op.oid, op.oloc, nullsnapc,
                         ceph::real_clock::now(), op.flags,
                         gather.new_sub());
      } else {
        objecter->remove(op.oid, op.oloc, op.item.snapc,
                         ceph::real_clock::now(), op.flags,
                         gather.new_sub());
      }
    } else if (op.type == PurgeItemCommitOp::PURGE_OP_ZERO) {
      filer.zero(op.item.ino, &op.item.layout, op.item.snapc,
                 0, op.item.layout.object_size, ceph::real_clock::now(), 0, true,
                 gather.new_sub());
    } else {
      derr << "Invalid purge op: " << op.type << dendl;
      ceph_abort();
    }
  }

  ceph_assert(gather.has_subs());

  gather.set_finisher(new C_OnFinisher(
    new LambdaContext([this, expire_to](int r) {
      std::lock_guard l(lock);

      if (r == -CEPHFS_EBLOCKLISTED) {
        finisher.queue(on_error, r);
        on_error = nullptr;
        return;
      }

      _execute_item_complete(expire_to);
      _consume();

      // Have we gone idle?  If so, do an extra write_head now instead of
      // waiting for next flush after journaler_write_head_interval.
      // Also do this periodically even if not idle, so that the persisted
      // expire_pos doesn't fall too far behind our progress when consuming
      // a very long queue.
      if (!readonly &&
          (in_flight.empty() || journaler.write_head_needed())) {
        journaler.write_head(nullptr);
      }
    }), &finisher));

  gather.activate();
}
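// The C_GatherBuilder pattern used above, in brief: every filer/objecter
// call receives gather.new_sub() as its completion, and the context
// registered with set_finisher() runs exactly once, after all subs have
// completed, i.e. once every RADOS op issued for this batch has committed.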

void PurgeQueue::_execute_item(
  const PurgeItem &item,
  uint64_t expire_to)
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));

  in_flight[expire_to] = item;
  logger->set(l_pq_executing, in_flight.size());
  files_high_water = std::max<uint64_t>(files_high_water,
                                        in_flight.size());
  logger->set(l_pq_executing_high_water, files_high_water);
  auto ops = _calculate_ops(item);
  ops_in_flight += ops;
  logger->set(l_pq_executing_ops, ops_in_flight);
  ops_high_water = std::max(ops_high_water, ops_in_flight);
  logger->set(l_pq_executing_ops_high_water, ops_high_water);

  std::vector<PurgeItemCommitOp> ops_vec;
  auto submit_ops = [&]() {
    finisher.queue(new C_IO_PurgeItem_Commit(this, std::move(ops_vec), expire_to));
  };

  if (item.action == PurgeItem::PURGE_FILE) {
    if (item.size > 0) {
      uint64_t num = Striper::get_num_objects(item.layout, item.size);
      dout(10) << " 0~" << item.size << " objects 0~" << num
               << " snapc " << item.snapc << " on " << item.ino << dendl;
      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
    }

    // remove the backtrace object if it was not purged
    object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
    if (ops_vec.empty() || !item.layout.pool_ns.empty()) {
      object_locator_t oloc(item.layout.pool_id);
      dout(10) << " remove backtrace object " << oid
               << " pool " << oloc.pool << " snapc " << item.snapc << dendl;
      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
    }

    // remove old backtrace objects
    for (const auto &p : item.old_pools) {
      object_locator_t oloc(p);
      dout(10) << " remove backtrace object " << oid
               << " old pool " << p << " snapc " << item.snapc << dendl;
      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
    }
  } else if (item.action == PurgeItem::PURGE_DIR) {
    object_locator_t oloc(metadata_pool);
    frag_vec_t leaves;
    if (!item.fragtree.is_leaf(frag_t()))
      item.fragtree.get_leaves(leaves);
    leaves.push_back(frag_t());
    for (const auto &leaf : leaves) {
      object_t oid = CInode::get_object_name(item.ino, leaf, "");
      dout(10) << " remove dirfrag " << oid << dendl;
      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
    }
  } else if (item.action == PurgeItem::TRUNCATE_FILE) {
    const uint64_t num = Striper::get_num_objects(item.layout, item.size);
    dout(10) << " 0~" << item.size << " objects 0~" << num
             << " snapc " << item.snapc << " on " << item.ino << dendl;

    // keep backtrace object
    if (num > 1) {
      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
    }
    ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_ZERO, 0);
  } else {
    derr << "Invalid item (action=" << item.action << ") in purge queue, "
            "dropping it" << dendl;
    ops_in_flight -= ops;
    logger->set(l_pq_executing_ops, ops_in_flight);
    ops_high_water = std::max(ops_high_water, ops_in_flight);
    logger->set(l_pq_executing_ops_high_water, ops_high_water);
    in_flight.erase(expire_to);
    logger->set(l_pq_executing, in_flight.size());
    files_high_water = std::max<uint64_t>(files_high_water,
                                          in_flight.size());
    logger->set(l_pq_executing_high_water, files_high_water);
    return;
  }

  submit_ops();
}

void PurgeQueue::_execute_item_complete(
  uint64_t expire_to)
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));
  dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
  ceph_assert(in_flight.count(expire_to) == 1);

  auto iter = in_flight.find(expire_to);
  ceph_assert(iter != in_flight.end());
  if (iter == in_flight.begin()) {
    uint64_t pos = expire_to;
    if (!pending_expire.empty()) {
      auto n = iter;
      ++n;
      if (n == in_flight.end()) {
        pos = *pending_expire.rbegin();
        pending_expire.clear();
      } else {
        auto p = pending_expire.begin();
        do {
          if (*p >= n->first)
            break;
          pos = *p;
          pending_expire.erase(p++);
        } while (p != pending_expire.end());
      }
    }
    dout(10) << "expiring to 0x" << std::hex << pos << std::dec << dendl;
    journaler.set_expire_pos(pos);
  } else {
    // This is completely fine; we are not expected to purge files in
    // order when purging them in parallel.
    dout(10) << "non-sequential completion, not expiring anything" << dendl;
    pending_expire.insert(expire_to);
  }

  ops_in_flight -= _calculate_ops(iter->second);
  logger->set(l_pq_executing_ops, ops_in_flight);
  ops_high_water = std::max(ops_high_water, ops_in_flight);
  logger->set(l_pq_executing_ops_high_water, ops_high_water);

  dout(10) << "completed item for ino " << iter->second.ino << dendl;

  in_flight.erase(iter);
  logger->set(l_pq_executing, in_flight.size());
  files_high_water = std::max<uint64_t>(files_high_water,
                                        in_flight.size());
  logger->set(l_pq_executing_high_water, files_high_water);
  dout(10) << "in_flight.size() now " << in_flight.size() << dendl;

  uint64_t write_pos = journaler.get_write_pos();
  uint64_t read_pos = journaler.get_read_pos();
  uint64_t expire_pos = journaler.get_expire_pos();
  uint64_t item_num = (write_pos - (in_flight.size() ? expire_pos : read_pos))
                      / purge_item_journal_size;
  dout(10) << "left purge items in journal: " << item_num
           << " (purge_item_journal_size/write_pos/read_pos/expire_pos) now at "
           << "(" << purge_item_journal_size << "/" << write_pos << "/" << read_pos
           << "/" << expire_pos << ")" << dendl;

  logger->set(l_pq_item_in_journal, item_num);
  logger->inc(l_pq_executed);
}
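// Out-of-order completion, illustrated (offsets are hypothetical): with
// items in flight at journal offsets 0x100, 0x200 and 0x300, completing
// 0x200 first only records it in pending_expire, since we cannot expire
// past the still-running item at 0x100.  When 0x100 later completes,
// expire_pos advances through the pending set to 0x200, while 0x300
// remains in flight and unexpired.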

void PurgeQueue::update_op_limit(const MDSMap &mds_map)
{
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "skipping; PurgeQueue is readonly" << dendl;
    return;
  }

  uint64_t pg_count = 0;
  objecter->with_osdmap([&](const OSDMap& o) {
    // Number of PGs across all data pools
    const std::vector<int64_t> &data_pools = mds_map.get_data_pools();
    for (const auto dp : data_pools) {
      if (o.get_pg_pool(dp) == NULL) {
        // It is possible that we have an older OSDMap than MDSMap,
        // because we don't start watching every OSDMap until after
        // MDSRank is initialized
        dout(4) << " data pool " << dp << " not found in OSDMap" << dendl;
        continue;
      }
      pg_count += o.get_pg_num(dp);
    }
  });

  // Work out a limit based on n_pgs / n_mdss, multiplied by the user's
  // preference for how many ops per PG
  max_purge_ops = uint64_t(((double)pg_count / (double)mds_map.get_max_mds()) *
                           cct->_conf->mds_max_purge_ops_per_pg);

  // The user may also specify a hard limit; apply it if so.
  if (cct->_conf->mds_max_purge_ops) {
    max_purge_ops = std::min(max_purge_ops, cct->_conf->mds_max_purge_ops);
  }
}
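// Worked example (illustrative numbers): 128 PGs across the data pools,
// max_mds = 2 and mds_max_purge_ops_per_pg = 0.5 yield
// max_purge_ops = (128 / 2) * 0.5 = 32, which is then clamped by
// mds_max_purge_ops if that option is set non-zero.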

void PurgeQueue::handle_conf_change(const std::set<std::string>& changed, const MDSMap& mds_map)
{
  if (changed.count("mds_max_purge_ops")
      || changed.count("mds_max_purge_ops_per_pg")) {
    update_op_limit(mds_map);
  } else if (changed.count("mds_max_purge_files")) {
    std::lock_guard l(lock);
    if (in_flight.empty()) {
      // We might have gone from zero to a finite limit, so we
      // might need to kick off consume.
      dout(4) << "maybe start work again (max_purge_files="
              << g_conf()->mds_max_purge_files << ")" << dendl;
      finisher.queue(new LambdaContext([this](int r){
        std::lock_guard l(lock);
        _consume();
      }));
    }
  }
}

bool PurgeQueue::drain(
  uint64_t *progress,
  uint64_t *progress_total,
  size_t *in_flight_count)
{
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "skipping drain; PurgeQueue is readonly" << dendl;
    return true;
  }

  ceph_assert(progress != nullptr);
  ceph_assert(progress_total != nullptr);
  ceph_assert(in_flight_count != nullptr);

  const bool done = in_flight.empty() && (
      journaler.get_read_pos() == journaler.get_write_pos());
  if (done) {
    return true;
  }

  const uint64_t bytes_remaining = journaler.get_write_pos()
                                   - journaler.get_read_pos();

  if (!draining) {
    // Start of draining: remember how much was outstanding at
    // this point so that we can report a progress percentage later
    draining = true;

    // Lift the op throttle, as this daemon now has nothing to do but
    // drain the purge queue, so do it as fast as we can.
    max_purge_ops = 0xffff;
  }

  drain_initial = std::max(bytes_remaining, drain_initial);

  *progress = drain_initial - bytes_remaining;
  *progress_total = drain_initial;
  *in_flight_count = in_flight.size();

  return false;
}
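// Progress reporting, illustrated (numbers are hypothetical): if draining
// began with 0x4000 bytes outstanding (drain_initial) and 0x1000 bytes
// remain, callers see progress = 0x3000 of progress_total = 0x4000,
// i.e. 75% complete.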

std::string_view PurgeItem::get_type_str() const
{
  switch (action) {
  case PurgeItem::NONE: return "NONE";
  case PurgeItem::PURGE_FILE: return "PURGE_FILE";
  case PurgeItem::PURGE_DIR: return "PURGE_DIR";
  case PurgeItem::TRUNCATE_FILE: return "TRUNCATE_FILE";
  default:
    return "UNKNOWN";
  }
}