]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/PurgeQueue.cc
bump version to 12.2.1-pve3
[ceph.git] / ceph / src / mds / PurgeQueue.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "common/debug.h"
16#include "mds/mdstypes.h"
17#include "mds/CInode.h"
18#include "mds/MDCache.h"
19
20#include "PurgeQueue.h"
21
22
23#define dout_context cct
24#define dout_subsys ceph_subsys_mds
25#undef dout_prefix
26#define dout_prefix _prefix(_dout, rank) << __func__ << ": "
// Build the log-line prefix used by dout/derr in this file,
// e.g. "mds.<rank>.purge_queue <function>: ..."
static ostream& _prefix(std::ostream *_dout, mds_rank_t rank) {
  return *_dout << "mds." << rank << ".purge_queue ";
}
30
/**
 * Serialize this PurgeItem into @p bl for storage as a purge queue
 * journal entry.  Field order here is the on-disk wire format and must
 * match PurgeItem::decode exactly; bump the ENCODE_START version if it
 * ever changes.
 */
void PurgeItem::encode(bufferlist &bl) const
{
  ENCODE_START(1, 1, bl);
  // action is an enum; stored on the wire as a single byte
  ::encode((uint8_t)action, bl);
  ::encode(ino, bl);
  ::encode(size, bl);
  // layout encoding is feature-gated on FS_FILE_LAYOUT_V2
  ::encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2);
  ::encode(old_pools, bl);
  ::encode(snapc, bl);
  ::encode(fragtree, bl);
  ENCODE_FINISH(bl);
}
43
44void PurgeItem::decode(bufferlist::iterator &p)
45{
46 DECODE_START(1, p);
47 ::decode((uint8_t&)action, p);
48 ::decode(ino, p);
49 ::decode(size, p);
50 ::decode(layout, p);
51 ::decode(old_pools, p);
52 ::decode(snapc, p);
53 ::decode(fragtree, p);
54 DECODE_FINISH(p);
55}
56
// TODO: if Objecter has any slow requests, take that as a hint and
// slow down our rate of purging (keep accepting pushes though)
//
// Construct the purge queue for one MDS rank.  The backing journal
// object lives in the metadata pool at ino MDS_INO_PURGE_QUEUE + rank.
// @param cct_            context for config/logging (must be non-null)
// @param rank_           this MDS's rank (used for journal ino and log prefix)
// @param metadata_pool_  pool id holding the purge queue journal
// @param objecter_       RADOS op dispatcher (must be non-null)
// @param on_error_       called on unrecoverable journal errors (must be non-null)
PurgeQueue::PurgeQueue(
      CephContext *cct_,
      mds_rank_t rank_,
      const int64_t metadata_pool_,
      Objecter *objecter_,
      Context *on_error_)
  :
    cct(cct_),
    rank(rank_),
    lock("PurgeQueue"),
    metadata_pool(metadata_pool_),
    finisher(cct, "PurgeQueue", "PQ_Finisher"),
    timer(cct, lock),
    filer(objecter_, &finisher),
    objecter(objecter_),
    journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
              CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0,
              &finisher),
    on_error(on_error_),
    ops_in_flight(0),
    max_purge_ops(0),
    drain_initial(0),
    draining(false),
    delayed_flush(nullptr)
{
  assert(cct != nullptr);
  assert(on_error != nullptr);
  assert(objecter != nullptr);
  // Route journaler write failures to the rank-level error handler
  journaler.set_write_error_handler(on_error);
}
89
90PurgeQueue::~PurgeQueue()
91{
92 if (logger) {
93 g_ceph_context->get_perfcounters_collection()->remove(logger.get());
94 }
95}
96
// Create and register the "purge_queue" perf counters.  Must be called
// before init() (init asserts logger != nullptr).
void PurgeQueue::create_logger()
{
  PerfCountersBuilder pcb(g_ceph_context,
          "purge_queue", l_pq_first, l_pq_last);
  pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
  pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
  // PRIO_INTERESTING: surfaced prominently in `ceph daemonperf` etc.
  pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed", "purg",
      PerfCountersBuilder::PRIO_INTERESTING);

  logger.reset(pcb.create_perf_counters());
  g_ceph_context->get_perfcounters_collection()->add(logger.get());
}
109
// Start the worker threads (finisher) and the timer.  create_logger()
// must already have been called.
void PurgeQueue::init()
{
  Mutex::Locker l(lock);

  assert(logger != nullptr);

  finisher.start();
  timer.init();
}
119
c07f9fc5
FG
// Kick off consumption of any backlog already present in the journal.
// No-op if the queue is empty (read_pos == write_pos) or if work is
// already in flight (the in-flight completions will keep consuming).
void PurgeQueue::activate()
{
  Mutex::Locker l(lock);
  if (journaler.get_read_pos() == journaler.get_write_pos())
    return;

  if (in_flight.empty()) {
    dout(4) << "start work (by drain)" << dendl;
    // Queue onto the finisher rather than consuming inline so we don't
    // do journal I/O in the caller's context.
    finisher.queue(new FunctionContext([this](int r) {
    Mutex::Locker l(lock);
    _consume();
    }));
  }
}
134
7c673cae
FG
// Stop all background machinery: journal, timer, then worker threads.
void PurgeQueue::shutdown()
{
  Mutex::Locker l(lock);

  journaler.shutdown();
  timer.shutdown();
  finisher.stop();
}
143
// Load (or create) the purge queue journal.  @p completion fires once
// the journal is writeable; unrecoverable errors go to on_error.
void PurgeQueue::open(Context *completion)
{
  dout(4) << "opening" << dendl;

  Mutex::Locker l(lock);

  journaler.recover(new FunctionContext([this, completion](int r){
    if (r == -ENOENT) {
      // Older filesystems predate the purge queue: lazily create it.
      dout(1) << "Purge Queue not found, assuming this is an upgrade and "
                 "creating it." << dendl;
      create(completion);
    } else if (r == 0) {
      Mutex::Locker l(lock);
      dout(4) << "open complete" << dendl;

      // Journaler only guarantees entries before head write_pos have been
      // fully flushed. Before appending new entries, we need to find and
      // drop any partial written entry.
      if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
        dout(4) << "recovering write_pos" << dendl;
        journaler.set_read_pos(journaler.last_committed.write_pos);
        _recover(completion);
        return;
      }

      journaler.set_writeable();
      completion->complete(0);
    } else {
      derr << "Error " << r << " loading Journaler" << dendl;
      on_error->complete(r);
    }
  }));
}
177
178
// Scan forward from last_committed.write_pos to locate the true end of
// the journal, discarding any torn (partially-written) tail entry.
// Re-queues itself asynchronously while waiting for journal reads; when
// read_pos catches write_pos the original read_pos is restored and
// @p completion fires.  Caller must hold `lock`.
void PurgeQueue::_recover(Context *completion)
{
  assert(lock.is_locked_by_me());

  // Journaler::is_readable() adjusts write_pos if partial entry is encountered
  while (1) {
    if (!journaler.is_readable() &&
        !journaler.get_error() &&
        journaler.get_read_pos() < journaler.get_write_pos()) {
      // Data not in cache yet: wait and resume _recover from scratch.
      journaler.wait_for_readable(new FunctionContext([this, completion](int r) {
        Mutex::Locker l(lock);
        _recover(completion);
      }));
      return;
    }

    if (journaler.get_error()) {
      int r = journaler.get_error();
      derr << "Error " << r << " recovering write_pos" << dendl;
      on_error->complete(r);
      return;
    }

    if (journaler.get_read_pos() == journaler.get_write_pos()) {
      dout(4) << "write_pos recovered" << dendl;
      // restore original read_pos
      journaler.set_read_pos(journaler.last_committed.expire_pos);
      journaler.set_writeable();
      completion->complete(0);
      return;
    }

    // Consume (and discard) one entry to advance read_pos.
    bufferlist bl;
    bool readable = journaler.try_read_entry(bl);
    assert(readable);  // we checked earlier
  }
}
216
// Create a brand-new, empty purge queue journal in the metadata pool.
// @p fin fires once the initial head has been written.
void PurgeQueue::create(Context *fin)
{
  dout(4) << "creating" << dendl;
  Mutex::Locker l(lock);

  file_layout_t layout = file_layout_t::get_default();
  layout.pool_id = metadata_pool;
  journaler.set_writeable();
  journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
  journaler.write_head(fin);
}
228
/**
 * Append a purge item to the queue and try to start executing it.
 *
 * The `completion` context will always be called back via a Finisher
 * (once the entry is flushed to the journal).
 *
 * @param pi item to enqueue
 * @param completion fired when the entry is durable in the journal
 */
void PurgeQueue::push(const PurgeItem &pi, Context *completion)
{
  dout(4) << "pushing inode 0x" << std::hex << pi.ino << std::dec << dendl;
  Mutex::Locker l(lock);

  // Callers should have waited for open() before using us
  assert(!journaler.is_readonly());

  bufferlist bl;

  ::encode(pi, bl);
  journaler.append_entry(bl);
  journaler.wait_for_flush(completion);

  // Maybe go ahead and do something with it right away
  bool could_consume = _consume();
  if (!could_consume) {
    // Usually, it is not necessary to explicitly flush here, because the reader
    // will get flushes generated inside Journaler::is_readable. However,
    // if we remain in a can_consume()==false state for a long period then
    // we should flush in order to allow MDCache to drop its strays rather
    // than having them wait for purgequeue to progress.
    if (!delayed_flush) {
      // Schedule one deferred flush; the event clears delayed_flush so a
      // later push can re-arm it.  _consume() cancels it when it starts
      // reading (the read path flushes anyway).
      delayed_flush = new FunctionContext([this](int r){
            delayed_flush = nullptr;
            journaler.flush();
          });

      timer.add_event_after(
          g_conf->mds_purge_queue_busy_flush_period,
          delayed_flush);
    }
  }
}
266
267uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
268{
269 uint32_t ops_required = 0;
270 if (item.action == PurgeItem::PURGE_DIR) {
271 // Directory, count dirfrags to be deleted
272 std::list<frag_t> ls;
273 if (!item.fragtree.is_leaf(frag_t())) {
274 item.fragtree.get_leaves(ls);
275 }
276 // One for the root, plus any leaves
277 ops_required = 1 + ls.size();
278 } else {
279 // File, work out concurrent Filer::purge deletes
280 const uint64_t num = (item.size > 0) ?
281 Striper::get_num_objects(item.layout, item.size) : 1;
282
283 ops_required = MIN(num, g_conf->filer_max_purge_ops);
284
285 // Account for removing (or zeroing) backtrace
286 ops_required += 1;
287
288 // Account for deletions for old pools
289 if (item.action != PurgeItem::TRUNCATE_FILE) {
290 ops_required += item.old_pools.size();
291 }
292 }
293
294 return ops_required;
295}
296
297bool PurgeQueue::can_consume()
298{
299 dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
300 << in_flight.size() << "/" << g_conf->mds_max_purge_files
301 << " files" << dendl;
302
303 if (in_flight.size() == 0 && cct->_conf->mds_max_purge_files > 0) {
304 // Always permit consumption if nothing is in flight, so that the ops
305 // limit can never be so low as to forbid all progress (unless
306 // administrator has deliberately paused purging by setting max
307 // purge files to zero).
308 return true;
309 }
310
311 if (ops_in_flight >= max_purge_ops) {
312 dout(20) << "Throttling on op limit " << ops_in_flight << "/"
313 << max_purge_ops << dendl;
314 return false;
315 }
316
317 if (in_flight.size() >= cct->_conf->mds_max_purge_files) {
318 dout(20) << "Throttling on item limit " << in_flight.size()
319 << "/" << cct->_conf->mds_max_purge_files << dendl;
320 return false;
321 } else {
322 return true;
323 }
324}
325
326bool PurgeQueue::_consume()
327{
328 assert(lock.is_locked_by_me());
329
330 bool could_consume = false;
331 while(can_consume()) {
332 could_consume = true;
333
334 if (delayed_flush) {
335 // We are now going to read from the journal, so any proactive
336 // flush is no longer necessary. This is not functionally necessary
337 // but it can avoid generating extra fragmented flush IOs.
338 timer.cancel_event(delayed_flush);
339 delayed_flush = nullptr;
340 }
341
342 if (!journaler.is_readable()) {
343 dout(10) << " not readable right now" << dendl;
344 // Because we are the writer and the reader of the journal
345 // via the same Journaler instance, we never need to reread_head
346 if (!journaler.have_waiter()) {
347 journaler.wait_for_readable(new FunctionContext([this](int r) {
348 Mutex::Locker l(lock);
349 if (r == 0) {
350 _consume();
351 }
352 }));
353 }
354
355 return could_consume;
356 }
357
358 // The journaler is readable: consume an entry
359 bufferlist bl;
360 bool readable = journaler.try_read_entry(bl);
361 assert(readable); // we checked earlier
362
363 dout(20) << " decoding entry" << dendl;
364 PurgeItem item;
365 bufferlist::iterator q = bl.begin();
366 try {
367 ::decode(item, q);
368 } catch (const buffer::error &err) {
369 derr << "Decode error at read_pos=0x" << std::hex
370 << journaler.get_read_pos() << dendl;
371 on_error->complete(0);
372 }
373 dout(20) << " executing item (0x" << std::hex << item.ino
374 << std::dec << ")" << dendl;
375 _execute_item(item, journaler.get_read_pos());
376 }
377
378 dout(10) << " cannot consume right now" << dendl;
379
380 return could_consume;
381}
382
// Issue the RADOS operations for one purge item and register it as
// in-flight.  @p expire_to is the journal read_pos just past this
// entry; once the item completes (and all earlier ones have too) the
// journal can be expired up to that offset.  Caller must hold `lock`.
void PurgeQueue::_execute_item(
    const PurgeItem &item,
    uint64_t expire_to)
{
  assert(lock.is_locked_by_me());

  in_flight[expire_to] = item;
  logger->set(l_pq_executing, in_flight.size());
  ops_in_flight += _calculate_ops(item);
  logger->set(l_pq_executing_ops, ops_in_flight);

  // Dirfrag removals use an empty snap context (metadata objects are
  // not snapshotted the way file data is).
  SnapContext nullsnapc;

  C_GatherBuilder gather(cct);
  if (item.action == PurgeItem::PURGE_FILE) {
    if (item.size > 0) {
      // Delete all data objects covering [0, size)
      uint64_t num = Striper::get_num_objects(item.layout, item.size);
      dout(10) << " 0~" << item.size << " objects 0~" << num
               << " snapc " << item.snapc << " on " << item.ino << dendl;
      filer.purge_range(item.ino, &item.layout, item.snapc,
                        0, num, ceph::real_clock::now(), 0,
                        gather.new_sub());
    }

    // remove the backtrace object if it was not purged
    object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
    if (!gather.has_subs() || !item.layout.pool_ns.empty()) {
      object_locator_t oloc(item.layout.pool_id);
      dout(10) << " remove backtrace object " << oid
               << " pool " << oloc.pool << " snapc " << item.snapc << dendl;
      objecter->remove(oid, oloc, item.snapc,
                       ceph::real_clock::now(), 0,
                       gather.new_sub());
    }

    // remove old backtrace objects left behind in pools the file
    // previously lived in
    for (const auto &p : item.old_pools) {
      object_locator_t oloc(p);
      dout(10) << " remove backtrace object " << oid
               << " old pool " << p << " snapc " << item.snapc << dendl;
      objecter->remove(oid, oloc, item.snapc,
                       ceph::real_clock::now(), 0,
                       gather.new_sub());
    }
  } else if (item.action == PurgeItem::PURGE_DIR) {
    // Remove every dirfrag object (all leaves plus the root frag)
    object_locator_t oloc(metadata_pool);
    std::list<frag_t> frags;
    if (!item.fragtree.is_leaf(frag_t()))
      item.fragtree.get_leaves(frags);
    frags.push_back(frag_t());
    for (const auto &frag : frags) {
      object_t oid = CInode::get_object_name(item.ino, frag, "");
      dout(10) << " remove dirfrag " << oid << dendl;
      objecter->remove(oid, oloc, nullsnapc,
                       ceph::real_clock::now(),
                       0, gather.new_sub());
    }
  } else if (item.action == PurgeItem::TRUNCATE_FILE) {
    const uint64_t num = Striper::get_num_objects(item.layout, item.size);
    dout(10) << " 0~" << item.size << " objects 0~" << num
             << " snapc " << item.snapc << " on " << item.ino << dendl;

    // keep backtrace object: delete objects 1..num-1, zero object 0
    if (num > 1) {
      filer.purge_range(item.ino, &item.layout, item.snapc,
                        1, num - 1, ceph::real_clock::now(),
                        0, gather.new_sub());
    }
    filer.zero(item.ino, &item.layout, item.snapc,
               0, item.layout.object_size,
               ceph::real_clock::now(),
               0, true, gather.new_sub());
  } else {
    // Unknown action (e.g. from a newer version): drop it rather than
    // wedging the queue.
    derr << "Invalid item (action=" << item.action << ") in purge queue, "
            "dropping it" << dendl;
    in_flight.erase(expire_to);
    logger->set(l_pq_executing, in_flight.size());
    return;
  }
  assert(gather.has_subs());

  gather.set_finisher(new C_OnFinisher(
                      new FunctionContext([this, expire_to](int r){
    Mutex::Locker l(lock);
    _execute_item_complete(expire_to);

    _consume();

    // Have we gone idle?  If so, do an extra write_head now instead of
    // waiting for next flush after journaler_write_head_interval.
    // Also do this periodically even if not idle, so that the persisted
    // expire_pos doesn't fall too far behind our progress when consuming
    // a very long queue.
    if (in_flight.empty() || journaler.write_head_needed()) {
      journaler.write_head(new FunctionContext([this](int r){
            journaler.trim();
          }));
    }
  }), &finisher));

  gather.activate();
}
485
// Called (under `lock`) when all RADOS ops for the item keyed by
// @p expire_to have completed.  Releases its throttle budget and, if it
// was the oldest in-flight item, advances the journal expire position.
void PurgeQueue::_execute_item_complete(
    uint64_t expire_to)
{
  assert(lock.is_locked_by_me());
  dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
  assert(in_flight.count(expire_to) == 1);

  auto iter = in_flight.find(expire_to);
  assert(iter != in_flight.end());
  // in_flight is keyed by journal position, so begin() is the oldest entry.
  if (iter == in_flight.begin()) {
    // This was the lowest journal position in flight, so we can now
    // safely expire the journal up to here.
    dout(10) << "expiring to 0x" << std::hex << expire_to << std::dec << dendl;
    journaler.set_expire_pos(expire_to);
  } else {
    // This is completely fine, we're not supposed to purge files in
    // order when doing them in parallel.
    dout(10) << "non-sequential completion, not expiring anything" << dendl;
  }

  // Return this item's op budget to the throttle
  ops_in_flight -= _calculate_ops(iter->second);
  logger->set(l_pq_executing_ops, ops_in_flight);

  dout(10) << "completed item for ino 0x" << std::hex << iter->second.ino
           << std::dec << dendl;

  in_flight.erase(iter);
  logger->set(l_pq_executing, in_flight.size());
  dout(10) << "in_flight.size() now " << in_flight.size() << dendl;

  logger->inc(l_pq_executed);
}
518
// Recompute max_purge_ops from the cluster shape: (total data-pool PGs
// / max_mds) * mds_max_purge_ops_per_pg, optionally capped by the hard
// mds_max_purge_ops limit.  Called on startup and on relevant map or
// config changes.
void PurgeQueue::update_op_limit(const MDSMap &mds_map)
{
  Mutex::Locker l(lock);

  uint64_t pg_count = 0;
  objecter->with_osdmap([&](const OSDMap& o) {
    // Number of PGs across all data pools
    const std::vector<int64_t> &data_pools = mds_map.get_data_pools();
    for (const auto dp : data_pools) {
      if (o.get_pg_pool(dp) == NULL) {
        // It is possible that we have an older OSDMap than MDSMap,
        // because we don't start watching every OSDMap until after
        // MDSRank is initialized
        dout(4) << " data pool " << dp << " not found in OSDMap" << dendl;
        continue;
      }
      pg_count += o.get_pg_num(dp);
    }
  });

  // Work out a limit based on n_pgs / n_mdss, multiplied by the user's
  // preference for how many ops per PG
  max_purge_ops = uint64_t(((double)pg_count / (double)mds_map.get_max_mds()) *
                           cct->_conf->mds_max_purge_ops_per_pg);

  // User may also specify a hard limit, apply this if so.
  if (cct->_conf->mds_max_purge_ops) {
    max_purge_ops = MIN(max_purge_ops, cct->_conf->mds_max_purge_ops);
  }
}
549
550void PurgeQueue::handle_conf_change(const struct md_config_t *conf,
551 const std::set <std::string> &changed,
552 const MDSMap &mds_map)
553{
554 if (changed.count("mds_max_purge_ops")
555 || changed.count("mds_max_purge_ops_per_pg")) {
556 update_op_limit(mds_map);
557 } else if (changed.count("mds_max_purge_files")) {
558 Mutex::Locker l(lock);
559
560 if (in_flight.empty()) {
561 // We might have gone from zero to a finite limit, so
562 // might need to kick off consume.
563 dout(4) << "maybe start work again (max_purge_files="
564 << conf->mds_max_purge_files << dendl;
565 finisher.queue(new FunctionContext([this](int r){
566 Mutex::Locker l(lock);
567 _consume();
568 }));
569 }
570 }
571}
572
// Poll-style drain for MDS shutdown: report progress and whether the
// queue is fully drained.  On the first call it lifts the op throttle
// so the remaining backlog is purged as fast as possible.
// @param progress        [out] bytes of journal consumed since draining began
// @param progress_total  [out] bytes outstanding when draining began
// @param in_flight_count [out] items currently executing
// @return true once the queue is empty and nothing is in flight
bool PurgeQueue::drain(
    uint64_t *progress,
    uint64_t *progress_total,
    size_t *in_flight_count
    )
{
  assert(progress != nullptr);
  assert(progress_total != nullptr);
  assert(in_flight_count != nullptr);

  const bool done = in_flight.empty() && (
      journaler.get_read_pos() == journaler.get_write_pos());
  if (done) {
    return true;
  }

  const uint64_t bytes_remaining = journaler.get_write_pos()
                                   - journaler.get_read_pos();

  if (!draining) {
    // Start of draining: remember how much there was outstanding at
    // this point so that we can give a progress percentage later
    draining = true;

    // Lift the op throttle as this daemon now has nothing to do but
    // drain the purge queue, so do it as fast as we can.
    max_purge_ops = 0xffff;
  }

  // max() guards against the journal growing after draining started
  drain_initial = max(bytes_remaining, drain_initial);

  *progress = drain_initial - bytes_remaining;
  *progress_total = drain_initial;
  *in_flight_count = in_flight.size();

  return false;
}
610