]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/PurgeQueue.cc
3e992c396d06da75337cfddecff1b71fa33b348d
[ceph.git] / ceph / src / mds / PurgeQueue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "common/debug.h"
16 #include "mds/mdstypes.h"
17 #include "mds/CInode.h"
18 #include "mds/MDCache.h"
19
20 #include "PurgeQueue.h"
21
22 #include <string.h>
23
24 #define dout_context cct
25 #define dout_subsys ceph_subsys_mds
26 #undef dout_prefix
27 #define dout_prefix _prefix(_dout, rank) << __func__ << ": "
28 static ostream& _prefix(std::ostream *_dout, mds_rank_t rank) {
29 return *_dout << "mds." << rank << ".purge_queue ";
30 }
31
// Lookup table from an action's string name to its PurgeItem::Action
// enum value (the reverse mapping is PurgeItem::get_type_str()).
const std::map<std::string, PurgeItem::Action> PurgeItem::actions = {
  {"NONE", PurgeItem::NONE},
  {"PURGE_FILE", PurgeItem::PURGE_FILE},
  {"TRUNCATE_FILE", PurgeItem::TRUNCATE_FILE},
  {"PURGE_DIR", PurgeItem::PURGE_DIR}
};
38
// Serialize this PurgeItem into `bl`.  Field order here is the on-disk
// format and must match decode() exactly.
void PurgeItem::encode(bufferlist &bl) const
{
  ENCODE_START(2, 1, bl);  // struct_v 2, compatible back to v1
  encode((uint8_t)action, bl);
  encode(ino, bl);
  encode(size, bl);
  encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2);
  encode(old_pools, bl);
  encode(snapc, bl);
  encode(fragtree, bl);
  encode(stamp, bl);  // added in struct_v 2
  // Pad the entry with 0xff filler bytes.  pad_size is declared in the
  // header (not visible here) — presumably to give entries a predictable
  // on-disk size; confirm against the PurgeItem definition.
  uint8_t static const pad = 0xff;
  for (unsigned int i = 0; i<pad_size; i++) {
    encode(pad, bl);
  }
  ENCODE_FINISH(bl);
}
56
// Deserialize a PurgeItem; mirror of encode().  The trailing pad bytes
// written by encode() are not read explicitly — DECODE_FINISH is expected
// to skip any unread remainder of the versioned envelope.
void PurgeItem::decode(bufferlist::const_iterator &p)
{
  DECODE_START(2, p);
  decode((uint8_t&)action, p);
  decode(ino, p);
  decode(size, p);
  decode(layout, p);
  decode(old_pools, p);
  decode(snapc, p);
  decode(fragtree, p);
  // stamp only exists in entries written at struct_v >= 2
  if (struct_v >= 2) {
    decode(stamp, p);
  }
  DECODE_FINISH(p);
}
72
// TODO: if Objecter has any slow requests, take that as a hint and
// slow down our rate of purging (keep accepting pushes though)
//
// Construct the purge queue for rank `rank_`.  The backing journal is
// stored in the metadata pool at inode MDS_INO_PURGE_QUEUE + rank.
// `on_error_` is owned by this object (deleted in the destructor if
// still present) and is completed at most once, when the queue goes
// readonly after an internal IO error (see _go_readonly()).
PurgeQueue::PurgeQueue(
      CephContext *cct_,
      mds_rank_t rank_,
      const int64_t metadata_pool_,
      Objecter *objecter_,
      Context *on_error_)
  :
    cct(cct_),
    rank(rank_),
    lock("PurgeQueue"),
    metadata_pool(metadata_pool_),
    finisher(cct, "PurgeQueue", "PQ_Finisher"),
    timer(cct, lock),
    filer(objecter_, &finisher),
    objecter(objecter_),
    journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
              CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0,
              &finisher),
    on_error(on_error_),
    ops_in_flight(0),
    max_purge_ops(0),
    drain_initial(0),
    draining(false),
    delayed_flush(nullptr),
    recovered(false)
{
  ceph_assert(cct != nullptr);
  ceph_assert(on_error != nullptr);
  ceph_assert(objecter != nullptr);
  // Journal write failures are routed to the same error handler.
  journaler.set_write_error_handler(on_error);
}
106
PurgeQueue::~PurgeQueue()
{
  // Deregister perf counters if create_logger() was ever called.
  if (logger) {
    g_ceph_context->get_perfcounters_collection()->remove(logger.get());
  }
  // on_error may already be nullptr if _go_readonly() consumed it;
  // deleting nullptr is a no-op.
  delete on_error;
}
114
// Build and register the "purge_queue" perf counters.  Must be called
// before init(), which asserts that `logger` is set.
void PurgeQueue::create_logger()
{
  PerfCountersBuilder pcb(g_ceph_context, "purge_queue", l_pq_first, l_pq_last);

  pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed",
                      "purg", PerfCountersBuilder::PRIO_INTERESTING);

  pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
  pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
  pcb.add_u64(l_pq_executing_ops_high_water, "pq_executing_ops_high_water", "Maximum number of executing file purge ops");
  pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
  pcb.add_u64(l_pq_executing_high_water, "pq_executing_high_water", "Maximum number of executing file purges");

  logger.reset(pcb.create_perf_counters());
  g_ceph_context->get_perfcounters_collection()->add(logger.get());
}
131
// Start background machinery (finisher thread and timer).  Requires
// create_logger() to have been called first.
void PurgeQueue::init()
{
  std::lock_guard l(lock);

  ceph_assert(logger != nullptr);

  finisher.start();
  timer.init();
}
141
142 void PurgeQueue::activate()
143 {
144 std::lock_guard l(lock);
145
146 if (readonly) {
147 dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
148 return;
149 }
150
151 if (journaler.get_read_pos() == journaler.get_write_pos())
152 return;
153
154 if (in_flight.empty()) {
155 dout(4) << "start work (by drain)" << dendl;
156 finisher.queue(new FunctionContext([this](int r) {
157 std::lock_guard l(lock);
158 _consume();
159 }));
160 }
161 }
162
// Stop all background activity: journaler first (no new IO events),
// then the timer and the finisher thread.
void PurgeQueue::shutdown()
{
  std::lock_guard l(lock);

  journaler.shutdown();
  timer.shutdown();
  finisher.stop();
}
171
172 void PurgeQueue::open(Context *completion)
173 {
174 dout(4) << "opening" << dendl;
175
176 std::lock_guard l(lock);
177
178 if (completion)
179 waiting_for_recovery.push_back(completion);
180
181 journaler.recover(new FunctionContext([this](int r){
182 if (r == -ENOENT) {
183 dout(1) << "Purge Queue not found, assuming this is an upgrade and "
184 "creating it." << dendl;
185 create(NULL);
186 } else if (r == 0) {
187 std::lock_guard l(lock);
188 dout(4) << "open complete" << dendl;
189
190 // Journaler only guarantees entries before head write_pos have been
191 // fully flushed. Before appending new entries, we need to find and
192 // drop any partial written entry.
193 if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
194 dout(4) << "recovering write_pos" << dendl;
195 journaler.set_read_pos(journaler.last_committed.write_pos);
196 _recover();
197 return;
198 }
199
200 journaler.set_writeable();
201 recovered = true;
202 finish_contexts(g_ceph_context, waiting_for_recovery);
203 } else {
204 derr << "Error " << r << " loading Journaler" << dendl;
205 _go_readonly(r);
206 }
207 }));
208 }
209
210 void PurgeQueue::wait_for_recovery(Context* c)
211 {
212 std::lock_guard l(lock);
213 if (recovered) {
214 c->complete(0);
215 } else if (readonly) {
216 dout(10) << "cannot wait for recovery: PurgeQueue is readonly" << dendl;
217 c->complete(-EROFS);
218 } else {
219 waiting_for_recovery.push_back(c);
220 }
221 }
222
// Re-read journal entries from last_committed.write_pos up to write_pos
// to locate (and implicitly drop) any partially-written tail entry.
// Re-schedules itself via wait_for_readable when more data is needed.
// On success: restores read_pos to the committed expire_pos, marks the
// journal writeable, and completes the recovery waiters.
// Caller must hold `lock`.
void PurgeQueue::_recover()
{
  ceph_assert(lock.is_locked_by_me());

  // Journaler::is_readable() adjusts write_pos if partial entry is encountered
  while (1) {
    // Not readable yet, no error, and not caught up: wait for more data.
    if (!journaler.is_readable() &&
        !journaler.get_error() &&
        journaler.get_read_pos() < journaler.get_write_pos()) {
      journaler.wait_for_readable(new FunctionContext([this](int r) {
        std::lock_guard l(lock);
        _recover();
      }));
      return;
    }

    if (journaler.get_error()) {
      int r = journaler.get_error();
      derr << "Error " << r << " recovering write_pos" << dendl;
      _go_readonly(r);
      return;
    }

    // Fully caught up: recovery complete.
    if (journaler.get_read_pos() == journaler.get_write_pos()) {
      dout(4) << "write_pos recovered" << dendl;
      // restore original read_pos
      journaler.set_read_pos(journaler.last_committed.expire_pos);
      journaler.set_writeable();
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
      return;
    }

    // Discard one readable entry and loop again.
    bufferlist bl;
    bool readable = journaler.try_read_entry(bl);
    ceph_assert(readable); // we checked earlier
  }
}
261
// Create a brand-new purge queue journal in the metadata pool (first
// start, or upgrade from a version without a purge queue).
// `fin` (optional) is queued on waiting_for_recovery and completed once
// the journal head has been written — or with an error via
// _go_readonly() if that write fails.
void PurgeQueue::create(Context *fin)
{
  dout(4) << "creating" << dendl;
  std::lock_guard l(lock);

  if (fin)
    waiting_for_recovery.push_back(fin);

  file_layout_t layout = file_layout_t::get_default();
  layout.pool_id = metadata_pool;
  journaler.set_writeable();
  journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
  journaler.write_head(new FunctionContext([this](int r) {
    std::lock_guard l(lock);
    if (r) {
      _go_readonly(r);
    } else {
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
    }
  }));
}
284
/**
 * Append a purge item to the journal and maybe start executing it.
 *
 * The `completion` context will always be called back via a Finisher
 * (fired once the journal append is flushed, or immediately with
 * -EROFS if the queue is readonly).
 */
void PurgeQueue::push(const PurgeItem &pi, Context *completion)
{
  dout(4) << "pushing inode " << pi.ino << dendl;
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "cannot push inode: PurgeQueue is readonly" << dendl;
    completion->complete(-EROFS);
    return;
  }

  // Callers should have waited for open() before using us
  ceph_assert(!journaler.is_readonly());

  bufferlist bl;

  encode(pi, bl);
  journaler.append_entry(bl);
  journaler.wait_for_flush(completion);

  // Maybe go ahead and do something with it right away
  bool could_consume = _consume();
  if (!could_consume) {
    // Usually, it is not necessary to explicitly flush here, because the reader
    // will get flushes generated inside Journaler::is_readable. However,
    // if we remain in a _can_consume()==false state for a long period then
    // we should flush in order to allow MDCache to drop its strays rather
    // than having them wait for purgequeue to progress.
    // Only one delayed flush is scheduled at a time; _consume() cancels
    // it once reading resumes.
    if (!delayed_flush) {
      delayed_flush = new FunctionContext([this](int r){
            delayed_flush = nullptr;
            journaler.flush();
          });

      timer.add_event_after(
          g_conf()->mds_purge_queue_busy_flush_period,
          delayed_flush);
    }
  }
}
328
329 uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
330 {
331 uint32_t ops_required = 0;
332 if (item.action == PurgeItem::PURGE_DIR) {
333 // Directory, count dirfrags to be deleted
334 frag_vec_t leaves;
335 if (!item.fragtree.is_leaf(frag_t())) {
336 item.fragtree.get_leaves(leaves);
337 }
338 // One for the root, plus any leaves
339 ops_required = 1 + leaves.size();
340 } else {
341 // File, work out concurrent Filer::purge deletes
342 const uint64_t num = (item.size > 0) ?
343 Striper::get_num_objects(item.layout, item.size) : 1;
344
345 ops_required = std::min(num, g_conf()->filer_max_purge_ops);
346
347 // Account for removing (or zeroing) backtrace
348 ops_required += 1;
349
350 // Account for deletions for old pools
351 if (item.action != PurgeItem::TRUNCATE_FILE) {
352 ops_required += item.old_pools.size();
353 }
354 }
355
356 return ops_required;
357 }
358
359 bool PurgeQueue::_can_consume()
360 {
361 if (readonly) {
362 dout(10) << "can't consume: PurgeQueue is readonly" << dendl;
363 return false;
364 }
365
366 dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
367 << in_flight.size() << "/" << g_conf()->mds_max_purge_files
368 << " files" << dendl;
369
370 if (in_flight.size() == 0 && cct->_conf->mds_max_purge_files > 0) {
371 // Always permit consumption if nothing is in flight, so that the ops
372 // limit can never be so low as to forbid all progress (unless
373 // administrator has deliberately paused purging by setting max
374 // purge files to zero).
375 return true;
376 }
377
378 if (ops_in_flight >= max_purge_ops) {
379 dout(20) << "Throttling on op limit " << ops_in_flight << "/"
380 << max_purge_ops << dendl;
381 return false;
382 }
383
384 if (in_flight.size() >= cct->_conf->mds_max_purge_files) {
385 dout(20) << "Throttling on item limit " << in_flight.size()
386 << "/" << cct->_conf->mds_max_purge_files << dendl;
387 return false;
388 } else {
389 return true;
390 }
391 }
392
// Switch into readonly mode after an unrecoverable internal IO error
// (`r` is a negative errno).  Idempotent: later calls are no-ops.
// Notifies the owner exactly once via on_error (then nulls it), marks
// the journal readonly, and fails any recovery waiters with `r`.
void PurgeQueue::_go_readonly(int r)
{
  if (readonly) return;
  dout(1) << "going readonly because internal IO failed: " << strerror(-r) << dendl;
  readonly = true;
  on_error->complete(r);
  on_error = nullptr;
  journaler.set_readonly();
  finish_contexts(g_ceph_context, waiting_for_recovery, r);
}
403
404 bool PurgeQueue::_consume()
405 {
406 ceph_assert(lock.is_locked_by_me());
407
408 bool could_consume = false;
409 while(_can_consume()) {
410
411 if (delayed_flush) {
412 // We are now going to read from the journal, so any proactive
413 // flush is no longer necessary. This is not functionally necessary
414 // but it can avoid generating extra fragmented flush IOs.
415 timer.cancel_event(delayed_flush);
416 delayed_flush = nullptr;
417 }
418
419 if (int r = journaler.get_error()) {
420 derr << "Error " << r << " recovering write_pos" << dendl;
421 _go_readonly(r);
422 return could_consume;
423 }
424
425 if (!journaler.is_readable()) {
426 dout(10) << " not readable right now" << dendl;
427 // Because we are the writer and the reader of the journal
428 // via the same Journaler instance, we never need to reread_head
429 if (!journaler.have_waiter()) {
430 journaler.wait_for_readable(new FunctionContext([this](int r) {
431 std::lock_guard l(lock);
432 if (r == 0) {
433 _consume();
434 } else if (r != -EAGAIN) {
435 _go_readonly(r);
436 }
437 }));
438 }
439
440 return could_consume;
441 }
442
443 could_consume = true;
444 // The journaler is readable: consume an entry
445 bufferlist bl;
446 bool readable = journaler.try_read_entry(bl);
447 ceph_assert(readable); // we checked earlier
448
449 dout(20) << " decoding entry" << dendl;
450 PurgeItem item;
451 auto q = bl.cbegin();
452 try {
453 decode(item, q);
454 } catch (const buffer::error &err) {
455 derr << "Decode error at read_pos=0x" << std::hex
456 << journaler.get_read_pos() << dendl;
457 _go_readonly(EIO);
458 }
459 dout(20) << " executing item (" << item.ino << ")" << dendl;
460 _execute_item(item, journaler.get_read_pos());
461 }
462
463 dout(10) << " cannot consume right now" << dendl;
464
465 return could_consume;
466 }
467
// Issue the RADOS operations for one purge item and register it in
// in_flight, keyed by the journal position (`expire_to`) it was read
// from, so that its completion can advance the journal expire_pos.
// Caller must hold `lock`.
void PurgeQueue::_execute_item(
    const PurgeItem &item,
    uint64_t expire_to)
{
  ceph_assert(lock.is_locked_by_me());

  // Track the item and charge its estimated op cost to the throttle.
  in_flight[expire_to] = item;
  logger->set(l_pq_executing, in_flight.size());
  files_high_water = std::max(files_high_water, in_flight.size());
  logger->set(l_pq_executing_high_water, files_high_water);
  auto ops = _calculate_ops(item);
  ops_in_flight += ops;
  logger->set(l_pq_executing_ops, ops_in_flight);
  ops_high_water = std::max(ops_high_water, ops_in_flight);
  logger->set(l_pq_executing_ops_high_water, ops_high_water);

  SnapContext nullsnapc;

  C_GatherBuilder gather(cct);
  if (item.action == PurgeItem::PURGE_FILE) {
    // Delete all the file's data objects.
    if (item.size > 0) {
      uint64_t num = Striper::get_num_objects(item.layout, item.size);
      dout(10) << " 0~" << item.size << " objects 0~" << num
               << " snapc " << item.snapc << " on " << item.ino << dendl;
      filer.purge_range(item.ino, &item.layout, item.snapc,
                        0, num, ceph::real_clock::now(), 0,
                        gather.new_sub());
    }

    // remove the backtrace object if it was not purged
    object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
    if (!gather.has_subs() || !item.layout.pool_ns.empty()) {
      object_locator_t oloc(item.layout.pool_id);
      dout(10) << " remove backtrace object " << oid
               << " pool " << oloc.pool << " snapc " << item.snapc << dendl;
      objecter->remove(oid, oloc, item.snapc,
                       ceph::real_clock::now(), 0,
                       gather.new_sub());
    }

    // remove old backtrace objects
    for (const auto &p : item.old_pools) {
      object_locator_t oloc(p);
      dout(10) << " remove backtrace object " << oid
               << " old pool " << p << " snapc " << item.snapc << dendl;
      objecter->remove(oid, oloc, item.snapc,
                       ceph::real_clock::now(), 0,
                       gather.new_sub());
    }
  } else if (item.action == PurgeItem::PURGE_DIR) {
    // Remove every dirfrag object (the leaves plus the root frag).
    object_locator_t oloc(metadata_pool);
    frag_vec_t leaves;
    if (!item.fragtree.is_leaf(frag_t()))
      item.fragtree.get_leaves(leaves);
    leaves.push_back(frag_t());
    for (const auto &leaf : leaves) {
      object_t oid = CInode::get_object_name(item.ino, leaf, "");
      dout(10) << " remove dirfrag " << oid << dendl;
      objecter->remove(oid, oloc, nullsnapc,
                       ceph::real_clock::now(),
                       0, gather.new_sub());
    }
  } else if (item.action == PurgeItem::TRUNCATE_FILE) {
    const uint64_t num = Striper::get_num_objects(item.layout, item.size);
    dout(10) << " 0~" << item.size << " objects 0~" << num
             << " snapc " << item.snapc << " on " << item.ino << dendl;

    // keep backtrace object
    if (num > 1) {
      filer.purge_range(item.ino, &item.layout, item.snapc,
                        1, num - 1, ceph::real_clock::now(),
                        0, gather.new_sub());
    }
    // Zero (rather than delete) the first object so the backtrace
    // stored there survives the truncate.
    filer.zero(item.ino, &item.layout, item.snapc,
               0, item.layout.object_size,
               ceph::real_clock::now(),
               0, true, gather.new_sub());
  } else {
    derr << "Invalid item (action=" << item.action << ") in purge queue, "
            "dropping it" << dendl;
    // Roll back the accounting done above and drop the item.
    ops_in_flight -= ops;
    logger->set(l_pq_executing_ops, ops_in_flight);
    ops_high_water = std::max(ops_high_water, ops_in_flight);
    logger->set(l_pq_executing_ops_high_water, ops_high_water);
    in_flight.erase(expire_to);
    logger->set(l_pq_executing, in_flight.size());
    files_high_water = std::max(files_high_water, in_flight.size());
    logger->set(l_pq_executing_high_water, files_high_water);
    return;
  }
  ceph_assert(gather.has_subs());

  // When all sub-ops finish: account the completion, try to consume
  // more, and maybe persist the journal head.
  gather.set_finisher(new C_OnFinisher(
                      new FunctionContext([this, expire_to](int r){
    std::lock_guard l(lock);
    _execute_item_complete(expire_to);

    _consume();

    // Have we gone idle?  If so, do an extra write_head now instead of
    // waiting for next flush after journaler_write_head_interval.
    // Also do this periodically even if not idle, so that the persisted
    // expire_pos doesn't fall too far behind our progress when consuming
    // a very long queue.
    if (in_flight.empty() || journaler.write_head_needed()) {
      journaler.write_head(nullptr);
    }
  }), &finisher));

  gather.activate();
}
579
// Account the completion of the in-flight item keyed by journal
// position `expire_to`.  Items complete out of order; only when the
// OLDEST in-flight item finishes can the journal expire_pos advance —
// later completions are parked in pending_expire until then.
// Caller must hold `lock`.
void PurgeQueue::_execute_item_complete(
    uint64_t expire_to)
{
  ceph_assert(lock.is_locked_by_me());
  dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
  ceph_assert(in_flight.count(expire_to) == 1);

  auto iter = in_flight.find(expire_to);
  ceph_assert(iter != in_flight.end());
  if (iter == in_flight.begin()) {
    // Oldest in-flight item: we may advance expire_pos, possibly past
    // previously-parked completions too.
    uint64_t pos = expire_to;
    if (!pending_expire.empty()) {
      auto n = iter;
      ++n;
      if (n == in_flight.end()) {
        // Nothing else executing: everything parked can be expired.
        pos = *pending_expire.rbegin();
        pending_expire.clear();
      } else {
        // Expire parked positions only up to the next still-executing
        // item's journal position.
        auto p = pending_expire.begin();
        do {
          if (*p >= n->first)
            break;
          pos = *p;
          pending_expire.erase(p++);
        } while (p != pending_expire.end());
      }
    }
    dout(10) << "expiring to 0x" << std::hex << pos << std::dec << dendl;
    journaler.set_expire_pos(pos);
  } else {
    // This is completely fine, we're not supposed to purge files in
    // order when doing them in parallel.
    dout(10) << "non-sequential completion, not expiring anything" << dendl;
    pending_expire.insert(expire_to);
  }

  // Release this item's share of the op throttle.
  ops_in_flight -= _calculate_ops(iter->second);
  logger->set(l_pq_executing_ops, ops_in_flight);
  ops_high_water = std::max(ops_high_water, ops_in_flight);
  logger->set(l_pq_executing_ops_high_water, ops_high_water);

  dout(10) << "completed item for ino " << iter->second.ino << dendl;

  in_flight.erase(iter);
  logger->set(l_pq_executing, in_flight.size());
  files_high_water = std::max(files_high_water, in_flight.size());
  logger->set(l_pq_executing_high_water, files_high_water);
  dout(10) << "in_flight.size() now " << in_flight.size() << dendl;

  logger->inc(l_pq_executed);
}
631
632 void PurgeQueue::update_op_limit(const MDSMap &mds_map)
633 {
634 std::lock_guard l(lock);
635
636 if (readonly) {
637 dout(10) << "skipping; PurgeQueue is readonly" << dendl;
638 return;
639 }
640
641 uint64_t pg_count = 0;
642 objecter->with_osdmap([&](const OSDMap& o) {
643 // Number of PGs across all data pools
644 const std::vector<int64_t> &data_pools = mds_map.get_data_pools();
645 for (const auto dp : data_pools) {
646 if (o.get_pg_pool(dp) == NULL) {
647 // It is possible that we have an older OSDMap than MDSMap,
648 // because we don't start watching every OSDMap until after
649 // MDSRank is initialized
650 dout(4) << " data pool " << dp << " not found in OSDMap" << dendl;
651 continue;
652 }
653 pg_count += o.get_pg_num(dp);
654 }
655 });
656
657 // Work out a limit based on n_pgs / n_mdss, multiplied by the user's
658 // preference for how many ops per PG
659 max_purge_ops = uint64_t(((double)pg_count / (double)mds_map.get_max_mds()) *
660 cct->_conf->mds_max_purge_ops_per_pg);
661
662 // User may also specify a hard limit, apply this if so.
663 if (cct->_conf->mds_max_purge_ops) {
664 max_purge_ops = std::min(max_purge_ops, cct->_conf->mds_max_purge_ops);
665 }
666 }
667
668 void PurgeQueue::handle_conf_change(const std::set<std::string>& changed, const MDSMap& mds_map)
669 {
670 if (changed.count("mds_max_purge_ops")
671 || changed.count("mds_max_purge_ops_per_pg")) {
672 update_op_limit(mds_map);
673 } else if (changed.count("mds_max_purge_files")) {
674 std::lock_guard l(lock);
675 if (in_flight.empty()) {
676 // We might have gone from zero to a finite limit, so
677 // might need to kick off consume.
678 dout(4) << "maybe start work again (max_purge_files="
679 << g_conf()->mds_max_purge_files << dendl;
680 finisher.queue(new FunctionContext([this](int r){
681 std::lock_guard l(lock);
682 _consume();
683 }));
684 }
685 }
686 }
687
/**
 * Report (and drive) draining of the purge queue, e.g. before this
 * rank stops.  On the first call that finds work outstanding, the op
 * throttle is lifted so draining goes as fast as possible.
 *
 * @param progress        out: bytes of journal consumed since draining began
 * @param progress_total  out: bytes that were outstanding when draining began
 * @param in_flight_count out: number of purge items currently executing
 * @return true when fully drained (or readonly, in which case draining
 *         is impossible and we report done), false while work remains
 */
bool PurgeQueue::drain(
    uint64_t *progress,
    uint64_t *progress_total,
    size_t *in_flight_count
    )
{
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "skipping drain; PurgeQueue is readonly" << dendl;
    return true;
  }

  ceph_assert(progress != nullptr);
  ceph_assert(progress_total != nullptr);
  ceph_assert(in_flight_count != nullptr);

  const bool done = in_flight.empty() && (
      journaler.get_read_pos() == journaler.get_write_pos());
  if (done) {
    return true;
  }

  const uint64_t bytes_remaining = journaler.get_write_pos()
                                   - journaler.get_read_pos();

  if (!draining) {
    // Start of draining: remember how much there was outstanding at
    // this point so that we can give a progress percentage later
    draining = true;

    // Lift the op throttle as this daemon now has nothing to do but
    // drain the purge queue, so do it as fast as we can.
    max_purge_ops = 0xffff;
  }

  // Taking the max keeps drain_initial >= bytes_remaining, so the
  // subtraction below can never underflow if more work arrived after
  // draining began.
  drain_initial = std::max(bytes_remaining, drain_initial);

  *progress = drain_initial - bytes_remaining;
  *progress_total = drain_initial;
  *in_flight_count = in_flight.size();

  return false;
}
732
733 std::string_view PurgeItem::get_type_str() const
734 {
735 switch(action) {
736 case PurgeItem::NONE: return "NONE";
737 case PurgeItem::PURGE_FILE: return "PURGE_FILE";
738 case PurgeItem::PURGE_DIR: return "PURGE_DIR";
739 case PurgeItem::TRUNCATE_FILE: return "TRUNCATE_FILE";
740 default:
741 return "UNKNOWN";
742 }
743 }
744