// Decode a PurgeItem from the journal.
//
// Resolves the unapplied diff markers that were left in this region: the
// function below is the post-change ("+") version, which adds a recovery
// path for the corrupt encoding emitted by v13.2.2.
//
// v13.2.2 wrote struct_v == 1 payloads in the wrong field order
// (stamp, pad_size, padding first). For struct_v == 1 we therefore first
// attempt to decode in that broken order; if that attempt either throws a
// buffer error or overruns struct_end, we roll the iterator back and fall
// through to the normal field order. struct_v >= 2 payloads always use the
// normal order, with `stamp` appended at the end.
void PurgeItem::decode(bufferlist::const_iterator &p)
{
  DECODE_START(2, p);
  bool done = false;
  if (struct_v == 1) {
    // Remember where the payload starts so we can rewind if the
    // bad-encoding attempt fails.
    auto p_start = p;
    try {
      // bad encoding introduced by v13.2.2
      decode(stamp, p);
      decode(pad_size, p);
      p += pad_size;
      decode((uint8_t&)action, p);
      decode(ino, p);
      decode(size, p);
      decode(layout, p);
      decode(old_pools, p);
      decode(snapc, p);
      decode(fragtree, p);
      // Decoding "succeeded" but consumed more than this struct's
      // extent: it was not the bad v13.2.2 encoding after all.
      if (p.get_off() > struct_end)
	throw buffer::end_of_buffer();
      done = true;
    } catch (const buffer::error &e) {
      // Not the bad encoding; rewind and retry with the normal layout.
      p = p_start;
    }
  }
  if (!done) {
    // Normal (correct) field order.
    decode((uint8_t&)action, p);
    decode(ino, p);
    decode(size, p);
    decode(layout, p);
    decode(old_pools, p);
    decode(snapc, p);
    decode(fragtree, p);
    if (struct_v >= 2) {
      // stamp was appended in struct_v 2.
      decode(stamp, p);
    }
  }
  DECODE_FINISH(p);
}
:
cct(cct_),
rank(rank_),
- lock("PurgeQueue"),
metadata_pool(metadata_pool_),
finisher(cct, "PurgeQueue", "PQ_Finisher"),
timer(cct, lock),
journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0,
&finisher),
- on_error(on_error_),
- ops_in_flight(0),
- max_purge_ops(0),
- drain_initial(0),
- draining(false),
- delayed_flush(nullptr),
- recovered(false)
+ on_error(on_error_)
{
ceph_assert(cct != nullptr);
ceph_assert(on_error != nullptr);
pcb.add_u64(l_pq_executing_ops_high_water, "pq_executing_ops_high_water", "Maximum number of executing file purge ops");
pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
pcb.add_u64(l_pq_executing_high_water, "pq_executing_high_water", "Maximum number of executing file purges");
+ pcb.add_u64(l_pq_item_in_journal, "pq_item_in_journal", "Purge item left in journal");
logger.reset(pcb.create_perf_counters());
g_ceph_context->get_perfcounters_collection()->add(logger.get());
{
std::lock_guard l(lock);
+ {
+ PurgeItem item;
+ bufferlist bl;
+
+ // calculate purge item serialized size stored in journal
+ // used to count how many items still left in journal later
+ ::encode(item, bl);
+ purge_item_journal_size = bl.length() + journaler.get_journal_envelope_size();
+ }
+
if (readonly) {
dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
return;
if (in_flight.empty()) {
dout(4) << "start work (by drain)" << dendl;
- finisher.queue(new FunctionContext([this](int r) {
+ finisher.queue(new LambdaContext([this](int r) {
std::lock_guard l(lock);
_consume();
}));
if (completion)
waiting_for_recovery.push_back(completion);
- journaler.recover(new FunctionContext([this](int r){
+ journaler.recover(new LambdaContext([this](int r){
if (r == -ENOENT) {
dout(1) << "Purge Queue not found, assuming this is an upgrade and "
"creating it." << dendl;
void PurgeQueue::_recover()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
// Journaler::is_readable() adjusts write_pos if partial entry is encountered
while (1) {
if (!journaler.is_readable() &&
!journaler.get_error() &&
journaler.get_read_pos() < journaler.get_write_pos()) {
- journaler.wait_for_readable(new FunctionContext([this](int r) {
+ journaler.wait_for_readable(new LambdaContext([this](int r) {
std::lock_guard l(lock);
_recover();
}));
layout.pool_id = metadata_pool;
journaler.set_writeable();
journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
- journaler.write_head(new FunctionContext([this](int r) {
+ journaler.write_head(new LambdaContext([this](int r) {
std::lock_guard l(lock);
if (r) {
_go_readonly(r);
// we should flush in order to allow MDCache to drop its strays rather
// than having them wait for purgequeue to progress.
if (!delayed_flush) {
- delayed_flush = new FunctionContext([this](int r){
+ delayed_flush = new LambdaContext([this](int r){
delayed_flush = nullptr;
journaler.flush();
});
if (readonly) return;
dout(1) << "going readonly because internal IO failed: " << strerror(-r) << dendl;
readonly = true;
- on_error->complete(r);
+ finisher.queue(on_error, r);
on_error = nullptr;
journaler.set_readonly();
finish_contexts(g_ceph_context, waiting_for_recovery, r);
bool PurgeQueue::_consume()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
bool could_consume = false;
while(_can_consume()) {
// Because we are the writer and the reader of the journal
// via the same Journaler instance, we never need to reread_head
if (!journaler.have_waiter()) {
- journaler.wait_for_readable(new FunctionContext([this](int r) {
+ journaler.wait_for_readable(new LambdaContext([this](int r) {
std::lock_guard l(lock);
if (r == 0) {
_consume();
const PurgeItem &item,
uint64_t expire_to)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
in_flight[expire_to] = item;
logger->set(l_pq_executing, in_flight.size());
ceph_assert(gather.has_subs());
gather.set_finisher(new C_OnFinisher(
- new FunctionContext([this, expire_to](int r){
+ new LambdaContext([this, expire_to](int r){
std::lock_guard l(lock);
- _execute_item_complete(expire_to);
+ if (r == -EBLACKLISTED) {
+ finisher.queue(on_error, r);
+ on_error = nullptr;
+ return;
+ }
+
+ _execute_item_complete(expire_to);
_consume();
// Have we gone idle? If so, do an extra write_head now instead of
// Also do this periodically even if not idle, so that the persisted
// expire_pos doesn't fall too far behind our progress when consuming
// a very long queue.
- if (in_flight.empty() || journaler.write_head_needed()) {
+ if (!readonly &&
+ (in_flight.empty() || journaler.write_head_needed())) {
journaler.write_head(nullptr);
}
}), &finisher));
void PurgeQueue::_execute_item_complete(
uint64_t expire_to)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
ceph_assert(in_flight.count(expire_to) == 1);
logger->set(l_pq_executing_high_water, files_high_water);
dout(10) << "in_flight.size() now " << in_flight.size() << dendl;
+ uint64_t write_pos = journaler.get_write_pos();
+ uint64_t read_pos = journaler.get_read_pos();
+ uint64_t expire_pos = journaler.get_expire_pos();
+ uint64_t item_num = (write_pos - (in_flight.size() ? expire_pos : read_pos))
+ / purge_item_journal_size;
+ dout(10) << "left purge items in journal: " << item_num
+ << " (purge_item_journal_size/write_pos/read_pos/expire_pos) now at "
+ << "(" << purge_item_journal_size << "/" << write_pos << "/" << read_pos
+ << "/" << expire_pos << ")" << dendl;
+
+ logger->set(l_pq_item_in_journal, item_num);
logger->inc(l_pq_executed);
}
// might need to kick off consume.
dout(4) << "maybe start work again (max_purge_files="
<< g_conf()->mds_max_purge_files << dendl;
- finisher.queue(new FunctionContext([this](int r){
+ finisher.queue(new LambdaContext([this](int r){
std::lock_guard l(lock);
_consume();
}));