#include "PurgeQueue.h"
#include <string.h>

#define dout_context cct
#define dout_subsys ceph_subsys_mds
// NOTE(review): the dout_prefix function header was elided by the diff;
// only its tail survives here.
  return *_dout << "mds." << rank << ".purge_queue ";
}
+const std::map<std::string, PurgeItem::Action> PurgeItem::actions = {
+ {"NONE", PurgeItem::NONE},
+ {"PURGE_FILE", PurgeItem::PURGE_FILE},
+ {"TRUNCATE_FILE", PurgeItem::TRUNCATE_FILE},
+ {"PURGE_DIR", PurgeItem::PURGE_DIR}
+};
+
// Serialize this PurgeItem (struct_v 2, compat 1).
// v2 appends the timestamp after the v1 fields, then pads the payload
// with `pad_size` 0xff bytes so every encoded item has a fixed size
// (activate() relies on this to compute items left in the journal).
void PurgeItem::encode(bufferlist &bl) const
{
  ENCODE_START(2, 1, bl);
  encode((uint8_t)action, bl);
  encode(ino, bl);
  encode(size, bl);
  encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2);
  encode(old_pools, bl);
  encode(snapc, bl);
  encode(fragtree, bl);
  encode(stamp, bl);
  uint8_t static const pad = 0xff;
  for (unsigned int i = 0; i < pad_size; i++) {
    encode(pad, bl);
  }
  ENCODE_FINISH(bl);
}
-void PurgeItem::decode(bufferlist::iterator &p)
+void PurgeItem::decode(bufferlist::const_iterator &p)
{
- DECODE_START(1, p);
- ::decode((uint8_t&)action, p);
- ::decode(ino, p);
- ::decode(size, p);
- ::decode(layout, p);
- ::decode(old_pools, p);
- ::decode(snapc, p);
- ::decode(fragtree, p);
+ DECODE_START(2, p);
+ bool done = false;
+ if (struct_v == 1) {
+ auto p_start = p;
+ try {
+ // bad encoding introduced by v13.2.2
+ decode(stamp, p);
+ decode(pad_size, p);
+ p += pad_size;
+ decode((uint8_t&)action, p);
+ decode(ino, p);
+ decode(size, p);
+ decode(layout, p);
+ decode(old_pools, p);
+ decode(snapc, p);
+ decode(fragtree, p);
+ if (p.get_off() > struct_end)
+ throw buffer::end_of_buffer();
+ done = true;
+ } catch (const buffer::error &e) {
+ p = p_start;
+ }
+ }
+ if (!done) {
+ decode((uint8_t&)action, p);
+ decode(ino, p);
+ decode(size, p);
+ decode(layout, p);
+ decode(old_pools, p);
+ decode(snapc, p);
+ decode(fragtree, p);
+ if (struct_v >= 2) {
+ decode(stamp, p);
+ }
+ }
DECODE_FINISH(p);
}
:
cct(cct_),
rank(rank_),
- lock("PurgeQueue"),
metadata_pool(metadata_pool_),
finisher(cct, "PurgeQueue", "PQ_Finisher"),
timer(cct, lock),
journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0,
&finisher),
- on_error(on_error_),
- ops_in_flight(0),
- max_purge_ops(0),
- drain_initial(0),
- draining(false),
- delayed_flush(nullptr),
- recovered(false)
+ on_error(on_error_)
{
- assert(cct != nullptr);
- assert(on_error != nullptr);
- assert(objecter != nullptr);
+ ceph_assert(cct != nullptr);
+ ceph_assert(on_error != nullptr);
+ ceph_assert(objecter != nullptr);
journaler.set_write_error_handler(on_error);
}
if (logger) {
g_ceph_context->get_perfcounters_collection()->remove(logger.get());
}
+ delete on_error;
}
// Register this PurgeQueue's perf counters with the global collection.
// pq_executed is "interesting"; the remaining gauges (in-flight counts,
// high-water marks, items left in journal) default to "useful" priority.
void PurgeQueue::create_logger()
{
  PerfCountersBuilder pcb(g_ceph_context, "purge_queue", l_pq_first, l_pq_last);

  pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed",
                      "purg", PerfCountersBuilder::PRIO_INTERESTING);

  pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
  pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
  pcb.add_u64(l_pq_executing_ops_high_water, "pq_executing_ops_high_water", "Maximum number of executing file purge ops");
  pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
  pcb.add_u64(l_pq_executing_high_water, "pq_executing_high_water", "Maximum number of executing file purges");
  pcb.add_u64(l_pq_item_in_journal, "pq_item_in_journal", "Purge item left in journal");

  logger.reset(pcb.create_perf_counters());
  g_ceph_context->get_perfcounters_collection()->add(logger.get());
// Start the finisher thread and the timer; requires create_logger()
// to have run first (logger must be non-null).
void PurgeQueue::init()
{
  std::lock_guard l(lock);

  ceph_assert(logger != nullptr);

  finisher.start();
  timer.init();
void PurgeQueue::activate()
{
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
+
+ {
+ PurgeItem item;
+ bufferlist bl;
+
+ // calculate purge item serialized size stored in journal
+ // used to count how many items still left in journal later
+ ::encode(item, bl);
+ purge_item_journal_size = bl.length() + journaler.get_journal_envelope_size();
+ }
+
+ if (readonly) {
+ dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
+ return;
+ }
+
if (journaler.get_read_pos() == journaler.get_write_pos())
return;
if (in_flight.empty()) {
dout(4) << "start work (by drain)" << dendl;
- finisher.queue(new FunctionContext([this](int r) {
- Mutex::Locker l(lock);
+ finisher.queue(new LambdaContext([this](int r) {
+ std::lock_guard l(lock);
_consume();
}));
}
// Tear down the journaler and timer under the queue lock.
void PurgeQueue::shutdown()
{
  std::lock_guard l(lock);

  journaler.shutdown();
  timer.shutdown();
// NOTE(review): the open() signature was elided by the diff; this is the
// body. Recovers the journal; on ENOENT assumes an upgrade and creates a
// fresh queue, on success finishes recovery waiters, on any other error
// flips the queue readonly.
{
  dout(4) << "opening" << dendl;

  std::lock_guard l(lock);

  if (completion)
    waiting_for_recovery.push_back(completion);

  journaler.recover(new LambdaContext([this](int r){
    if (r == -ENOENT) {
      dout(1) << "Purge Queue not found, assuming this is an upgrade and "
                 "creating it." << dendl;
      create(NULL);
    } else if (r == 0) {
      std::lock_guard l(lock);
      dout(4) << "open complete" << dendl;
      // Journaler only guarantees entries before head write_pos have been
      finish_contexts(g_ceph_context, waiting_for_recovery);
    } else {
      derr << "Error " << r << " loading Journaler" << dendl;
      _go_readonly(r);
    }
  }));
}
// Run `c` once the queue has recovered: immediately (0) if already
// recovered, immediately with -EROFS if the queue went readonly,
// otherwise queued until recovery completes.
void PurgeQueue::wait_for_recovery(Context* c)
{
  std::lock_guard l(lock);
  if (recovered) {
    c->complete(0);
  } else if (readonly) {
    dout(10) << "cannot wait for recovery: PurgeQueue is readonly" << dendl;
    c->complete(-EROFS);
  } else {
    waiting_for_recovery.push_back(c);
  }
}
void PurgeQueue::_recover()
{
- assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
// Journaler::is_readable() adjusts write_pos if partial entry is encountered
while (1) {
if (!journaler.is_readable() &&
!journaler.get_error() &&
journaler.get_read_pos() < journaler.get_write_pos()) {
- journaler.wait_for_readable(new FunctionContext([this](int r) {
- Mutex::Locker l(lock);
+ journaler.wait_for_readable(new LambdaContext([this](int r) {
+ std::lock_guard l(lock);
_recover();
}));
return;
if (journaler.get_error()) {
int r = journaler.get_error();
derr << "Error " << r << " recovering write_pos" << dendl;
- on_error->complete(r);
+ _go_readonly(r);
return;
}
bufferlist bl;
bool readable = journaler.try_read_entry(bl);
- assert(readable); // we checked earlier
+ ceph_assert(readable); // we checked earlier
}
}
// Create a brand-new purge queue journal (upgrade path). `fin`, if
// given, is queued behind recovery. write_head failure flips the queue
// readonly instead of marking it recovered.
void PurgeQueue::create(Context *fin)
{
  dout(4) << "creating" << dendl;
  std::lock_guard l(lock);

  if (fin)
    waiting_for_recovery.push_back(fin);

  layout.pool_id = metadata_pool;
  journaler.set_writeable();
  journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
  journaler.write_head(new LambdaContext([this](int r) {
    std::lock_guard l(lock);
    if (r) {
      _go_readonly(r);
    } else {
      recovered = true;
      finish_contexts(g_ceph_context, waiting_for_recovery);
    }
  }));
}
*/
// Append a PurgeItem to the journal; `completion` fires once the entry
// is flushed. Rejected with -EROFS when the queue is readonly. If the
// consumer cannot make progress, schedules a delayed flush so MDCache
// can still drop strays.
// NOTE(review): the line defining `could_consume` was elided by the diff.
void PurgeQueue::push(const PurgeItem &pi, Context *completion)
{
  dout(4) << "pushing inode " << pi.ino << dendl;
  std::lock_guard l(lock);

  if (readonly) {
    dout(10) << "cannot push inode: PurgeQueue is readonly" << dendl;
    completion->complete(-EROFS);
    return;
  }

  // Callers should have waited for open() before using us
  ceph_assert(!journaler.is_readonly());

  bufferlist bl;

  encode(pi, bl);
  journaler.append_entry(bl);
  journaler.wait_for_flush(completion);

  if (!could_consume) {
    // Usually, it is not necessary to explicitly flush here, because the reader
    // will get flushes generated inside Journaler::is_readable. However,
    // if we remain in a _can_consume()==false state for a long period then
    // we should flush in order to allow MDCache to drop its strays rather
    // than having them wait for purgequeue to progress.
    if (!delayed_flush) {
      delayed_flush = new LambdaContext([this](int r){
	delayed_flush = nullptr;
	journaler.flush();
      });

      timer.add_event_after(
	g_conf()->mds_purge_queue_busy_flush_period,
	delayed_flush);
    }
  }
uint32_t ops_required = 0;
if (item.action == PurgeItem::PURGE_DIR) {
// Directory, count dirfrags to be deleted
- std::list<frag_t> ls;
+ frag_vec_t leaves;
if (!item.fragtree.is_leaf(frag_t())) {
- item.fragtree.get_leaves(ls);
+ item.fragtree.get_leaves(leaves);
}
// One for the root, plus any leaves
- ops_required = 1 + ls.size();
+ ops_required = 1 + leaves.size();
} else {
// File, work out concurrent Filer::purge deletes
const uint64_t num = (item.size > 0) ?
Striper::get_num_objects(item.layout, item.size) : 1;
- ops_required = MIN(num, g_conf->filer_max_purge_ops);
+ ops_required = std::min(num, g_conf()->filer_max_purge_ops);
// Account for removing (or zeroing) backtrace
ops_required += 1;
return ops_required;
}
-bool PurgeQueue::can_consume()
+bool PurgeQueue::_can_consume()
{
+ if (readonly) {
+ dout(10) << "can't consume: PurgeQueue is readonly" << dendl;
+ return false;
+ }
+
dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
- << in_flight.size() << "/" << g_conf->mds_max_purge_files
+ << in_flight.size() << "/" << g_conf()->mds_max_purge_files
<< " files" << dendl;
if (in_flight.size() == 0 && cct->_conf->mds_max_purge_files > 0) {
}
}
+void PurgeQueue::_go_readonly(int r)
+{
+ if (readonly) return;
+ dout(1) << "going readonly because internal IO failed: " << strerror(-r) << dendl;
+ readonly = true;
+ finisher.queue(on_error, r);
+ on_error = nullptr;
+ journaler.set_readonly();
+ finish_contexts(g_ceph_context, waiting_for_recovery, r);
+}
+
bool PurgeQueue::_consume()
{
- assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
bool could_consume = false;
- while(can_consume()) {
+ while(_can_consume()) {
if (delayed_flush) {
// We are now going to read from the journal, so any proactive
delayed_flush = nullptr;
}
+ if (int r = journaler.get_error()) {
+ derr << "Error " << r << " recovering write_pos" << dendl;
+ _go_readonly(r);
+ return could_consume;
+ }
+
if (!journaler.is_readable()) {
dout(10) << " not readable right now" << dendl;
// Because we are the writer and the reader of the journal
// via the same Journaler instance, we never need to reread_head
if (!journaler.have_waiter()) {
- journaler.wait_for_readable(new FunctionContext([this](int r) {
- Mutex::Locker l(lock);
+ journaler.wait_for_readable(new LambdaContext([this](int r) {
+ std::lock_guard l(lock);
if (r == 0) {
_consume();
+ } else if (r != -EAGAIN) {
+ _go_readonly(r);
}
}));
}
// The journaler is readable: consume an entry
bufferlist bl;
bool readable = journaler.try_read_entry(bl);
- assert(readable); // we checked earlier
+ ceph_assert(readable); // we checked earlier
dout(20) << " decoding entry" << dendl;
PurgeItem item;
- bufferlist::iterator q = bl.begin();
+ auto q = bl.cbegin();
try {
- ::decode(item, q);
+ decode(item, q);
} catch (const buffer::error &err) {
derr << "Decode error at read_pos=0x" << std::hex
<< journaler.get_read_pos() << dendl;
- on_error->complete(0);
+ _go_readonly(EIO);
}
- dout(20) << " executing item (0x" << std::hex << item.ino
- << std::dec << ")" << dendl;
+ dout(20) << " executing item (" << item.ino << ")" << dendl;
_execute_item(item, journaler.get_read_pos());
}
// NOTE(review): the _execute_item signature line was elided by the diff;
// these are the trailing parameters plus the body fragment. Registers
// the item as in-flight (updating gauges and high-water marks), issues
// the RADOS deletions appropriate to its action, and on completion
// expires the journal and keeps consuming.
  const PurgeItem &item,
  uint64_t expire_to)
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));

  in_flight[expire_to] = item;
  logger->set(l_pq_executing, in_flight.size());
  files_high_water = std::max(files_high_water, in_flight.size());
  logger->set(l_pq_executing_high_water, files_high_water);
  auto ops = _calculate_ops(item);
  ops_in_flight += ops;
  logger->set(l_pq_executing_ops, ops_in_flight);
  ops_high_water = std::max(ops_high_water, ops_in_flight);
  logger->set(l_pq_executing_ops_high_water, ops_high_water);

  SnapContext nullsnapc;
  }
  } else if (item.action == PurgeItem::PURGE_DIR) {
    object_locator_t oloc(metadata_pool);
    frag_vec_t leaves;
    if (!item.fragtree.is_leaf(frag_t()))
      item.fragtree.get_leaves(leaves);
    leaves.push_back(frag_t());
    for (const auto &leaf : leaves) {
      object_t oid = CInode::get_object_name(item.ino, leaf, "");
      dout(10) << " remove dirfrag " << oid << dendl;
      objecter->remove(oid, oloc, nullsnapc,
		       ceph::real_clock::now(),
  } else {
    derr << "Invalid item (action=" << item.action << ") in purge queue, "
            "dropping it" << dendl;
    // Roll back the in-flight accounting we just added above.
    ops_in_flight -= ops;
    logger->set(l_pq_executing_ops, ops_in_flight);
    ops_high_water = std::max(ops_high_water, ops_in_flight);
    logger->set(l_pq_executing_ops_high_water, ops_high_water);
    in_flight.erase(expire_to);
    logger->set(l_pq_executing, in_flight.size());
    files_high_water = std::max(files_high_water, in_flight.size());
    logger->set(l_pq_executing_high_water, files_high_water);
    return;
  }
  ceph_assert(gather.has_subs());
  gather.set_finisher(new C_OnFinisher(
    new LambdaContext([this, expire_to](int r){
      std::lock_guard l(lock);

      if (r == -EBLACKLISTED) {
	finisher.queue(on_error, r);
	on_error = nullptr;
	return;
      }

      _execute_item_complete(expire_to);
      _consume();

      // Have we gone idle? If so, do an extra write_head now instead of
      // Also do this periodically even if not idle, so that the persisted
      // expire_pos doesn't fall too far behind our progress when consuming
      // a very long queue.
      if (!readonly &&
	  (in_flight.empty() || journaler.write_head_needed())) {
	journaler.write_head(nullptr);
      }
    }), &finisher));
// Mark the in-flight item at journal offset `expire_to` as done.
// If it was the lowest in-flight offset, advance the journal expire
// position as far as contiguously-completed offsets (tracked in
// pending_expire) allow; otherwise just remember the offset for later.
// Also updates op/file gauges and the items-left-in-journal estimate.
// Caller must hold `lock`.
void PurgeQueue::_execute_item_complete(
  uint64_t expire_to)
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));
  dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
  ceph_assert(in_flight.count(expire_to) == 1);
  auto iter = in_flight.find(expire_to);
  ceph_assert(iter != in_flight.end());
  if (iter == in_flight.begin()) {
    uint64_t pos = expire_to;
    if (!pending_expire.empty()) {
      auto n = iter;
      ++n;
      if (n == in_flight.end()) {
	// Nothing else in flight: everything pending is expirable.
	pos = *pending_expire.rbegin();
	pending_expire.clear();
      } else {
	// Expire pending offsets up to (not past) the next in-flight item.
	auto p = pending_expire.begin();
	do {
	  if (*p >= n->first)
	    break;
	  pos = *p;
	  pending_expire.erase(p++);
	} while (p != pending_expire.end());
      }
    }
    dout(10) << "expiring to 0x" << std::hex << pos << std::dec << dendl;
    journaler.set_expire_pos(pos);
  } else {
    // This is completely fine, we're not supposed to purge files in
    // order when doing them in parallel.
    dout(10) << "non-sequential completion, not expiring anything" << dendl;
    pending_expire.insert(expire_to);
  }

  ops_in_flight -= _calculate_ops(iter->second);
  logger->set(l_pq_executing_ops, ops_in_flight);
  ops_high_water = std::max(ops_high_water, ops_in_flight);
  logger->set(l_pq_executing_ops_high_water, ops_high_water);

  dout(10) << "completed item for ino " << iter->second.ino << dendl;

  in_flight.erase(iter);
  logger->set(l_pq_executing, in_flight.size());
  files_high_water = std::max(files_high_water, in_flight.size());
  logger->set(l_pq_executing_high_water, files_high_water);
  dout(10) << "in_flight.size() now " << in_flight.size() << dendl;

  // Estimate items left in the journal using the fixed per-item size
  // computed in activate().
  uint64_t write_pos = journaler.get_write_pos();
  uint64_t read_pos = journaler.get_read_pos();
  uint64_t expire_pos = journaler.get_expire_pos();
  uint64_t item_num = (write_pos - (in_flight.size() ? expire_pos : read_pos))
		      / purge_item_journal_size;
  dout(10) << "left purge items in journal: " << item_num
    << " (purge_item_journal_size/write_pos/read_pos/expire_pos) now at "
    << "(" << purge_item_journal_size << "/" << write_pos << "/" << read_pos
    << "/" << expire_pos << ")" << dendl;

  logger->set(l_pq_item_in_journal, item_num);
  logger->inc(l_pq_executed);
}
void PurgeQueue::update_op_limit(const MDSMap &mds_map)
{
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
+
+ if (readonly) {
+ dout(10) << "skipping; PurgeQueue is readonly" << dendl;
+ return;
+ }
uint64_t pg_count = 0;
objecter->with_osdmap([&](const OSDMap& o) {
// User may also specify a hard limit, apply this if so.
if (cct->_conf->mds_max_purge_ops) {
- max_purge_ops = MIN(max_purge_ops, cct->_conf->mds_max_purge_ops);
+ max_purge_ops = std::min(max_purge_ops, cct->_conf->mds_max_purge_ops);
}
}
-void PurgeQueue::handle_conf_change(const struct md_config_t *conf,
- const std::set <std::string> &changed,
- const MDSMap &mds_map)
+void PurgeQueue::handle_conf_change(const std::set<std::string>& changed, const MDSMap& mds_map)
{
if (changed.count("mds_max_purge_ops")
|| changed.count("mds_max_purge_ops_per_pg")) {
update_op_limit(mds_map);
} else if (changed.count("mds_max_purge_files")) {
- Mutex::Locker l(lock);
-
+ std::lock_guard l(lock);
if (in_flight.empty()) {
// We might have gone from zero to a finite limit, so
// might need to kick off consume.
dout(4) << "maybe start work again (max_purge_files="
- << conf->mds_max_purge_files << dendl;
- finisher.queue(new FunctionContext([this](int r){
- Mutex::Locker l(lock);
+ << g_conf()->mds_max_purge_files << dendl;
+ finisher.queue(new LambdaContext([this](int r){
+ std::lock_guard l(lock);
_consume();
}));
}
size_t *in_flight_count
)
{
- assert(progress != nullptr);
- assert(progress_total != nullptr);
- assert(in_flight_count != nullptr);
+ std::lock_guard l(lock);
+
+ if (readonly) {
+ dout(10) << "skipping drain; PurgeQueue is readonly" << dendl;
+ return true;
+ }
+
+ ceph_assert(progress != nullptr);
+ ceph_assert(progress_total != nullptr);
+ ceph_assert(in_flight_count != nullptr);
const bool done = in_flight.empty() && (
journaler.get_read_pos() == journaler.get_write_pos());
max_purge_ops = 0xffff;
}
- drain_initial = max(bytes_remaining, drain_initial);
+ drain_initial = std::max(bytes_remaining, drain_initial);
*progress = drain_initial - bytes_remaining;
*progress_total = drain_initial;
return false;
}
+std::string_view PurgeItem::get_type_str() const
+{
+ switch(action) {
+ case PurgeItem::NONE: return "NONE";
+ case PurgeItem::PURGE_FILE: return "PURGE_FILE";
+ case PurgeItem::PURGE_DIR: return "PURGE_DIR";
+ case PurgeItem::TRUNCATE_FILE: return "TRUNCATE_FILE";
+ default:
+ return "UNKNOWN";
+ }
+}
+