#include "PurgeQueue.h"
+#include <string.h>
#define dout_context cct
#define dout_subsys ceph_subsys_mds
if (logger) {
g_ceph_context->get_perfcounters_collection()->remove(logger.get());
}
+ delete on_error;
}
void PurgeQueue::create_logger()
void PurgeQueue::activate()
{
Mutex::Locker l(lock);
+
+ if (readonly) {
+ dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
+ return;
+ }
+
if (journaler.get_read_pos() == journaler.get_write_pos())
return;
finish_contexts(g_ceph_context, waiting_for_recovery);
} else {
derr << "Error " << r << " loading Journaler" << dendl;
- on_error->complete(r);
+ _go_readonly(r);
}
}));
}
// Wait for journal recovery to complete before using the queue.
// Completes `c` immediately with 0 if recovery already finished, with
// -EROFS if the queue has gone readonly (recovery failed, so it never
// will finish), otherwise defers it onto waiting_for_recovery.
// Success-path contract is unchanged by this patch.
void PurgeQueue::wait_for_recovery(Context* c)
{
Mutex::Locker l(lock);
- if (recovered)
+ if (recovered) {
c->complete(0);
- else
+ } else if (readonly) {
+ // NOTE(review): readonly implies recovery already failed; fail the
+ // waiter now rather than queue a context that would never fire.
+ dout(10) << "cannot wait for recovery: PurgeQueue is readonly" << dendl;
+ c->complete(-EROFS);
+ } else {
waiting_for_recovery.push_back(c);
+ }
}
void PurgeQueue::_recover()
if (journaler.get_error()) {
int r = journaler.get_error();
derr << "Error " << r << " recovering write_pos" << dendl;
- on_error->complete(r);
+ _go_readonly(r);
return;
}
journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
journaler.write_head(new FunctionContext([this](int r) {
Mutex::Locker l(lock);
- recovered = true;
- finish_contexts(g_ceph_context, waiting_for_recovery);
+ if (r) {
+ _go_readonly(r);
+ } else {
+ recovered = true;
+ finish_contexts(g_ceph_context, waiting_for_recovery);
+ }
}));
}
dout(4) << "pushing inode 0x" << std::hex << pi.ino << std::dec << dendl;
Mutex::Locker l(lock);
+ if (readonly) {
+ dout(10) << "cannot push inode: PurgeQueue is readonly" << dendl;
+ completion->complete(-EROFS);
+ return;
+ }
+
// Callers should have waited for open() before using us
assert(!journaler.is_readonly());
if (!could_consume) {
// Usually, it is not necessary to explicitly flush here, because the reader
// will get flushes generated inside Journaler::is_readable. However,
- // if we remain in a can_consume()==false state for a long period then
+ // if we remain in a _can_consume()==false state for a long period then
// we should flush in order to allow MDCache to drop its strays rather
// than having them wait for purgequeue to progress.
if (!delayed_flush) {
return ops_required;
}
-bool PurgeQueue::can_consume()
+bool PurgeQueue::_can_consume()
{
+ if (readonly) {
+ dout(10) << "can't consume: PurgeQueue is readonly" << dendl;
+ return false;
+ }
+
dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
<< in_flight.size() << "/" << g_conf->mds_max_purge_files
<< " files" << dendl;
}
}
+// Transition the PurgeQueue into a permanent readonly error state.
+// Invoked from the journal I/O error paths; idempotent — only the first
+// failure is reported.
+// @param r negative errno describing the failure (the strerror(-r)
+//          below assumes the negative-errno convention used by callers).
+void PurgeQueue::_go_readonly(int r)
+{
+ if (readonly) return;
+ dout(1) << "going readonly because internal IO failed: " << strerror(-r) << dendl;
+ readonly = true;
+ // Fire on_error exactly once; clear it so a later failure cannot
+ // complete the same context again.
+ on_error->complete(r);
+ on_error = nullptr;
+ journaler.set_readonly();
+ // Anyone parked in wait_for_recovery receives the error code too.
+ finish_contexts(g_ceph_context, waiting_for_recovery, r);
+}
+
bool PurgeQueue::_consume()
{
assert(lock.is_locked_by_me());
bool could_consume = false;
- while(can_consume()) {
+ while(_can_consume()) {
if (delayed_flush) {
// We are now going to read from the journal, so any proactive
if (int r = journaler.get_error()) {
derr << "Error " << r << " recovering write_pos" << dendl;
- on_error->complete(r);
+ _go_readonly(r);
return could_consume;
}
if (r == 0) {
_consume();
} else if (r != -EAGAIN) {
- on_error->complete(r);
+ _go_readonly(r);
}
}));
}
// A corrupt/undecodable journal entry is a fatal internal I/O error:
// flip the queue to readonly instead of (as before) completing on_error
// with 0, which looked like success to the error handler.
} catch (const buffer::error &err) {
derr << "Decode error at read_pos=0x" << std::hex
<< journaler.get_read_pos() << dendl;
- on_error->complete(0);
+ // Pass a NEGATIVE errno: _go_readonly logs strerror(-r) and forwards
+ // r to on_error, and every other call site passes the (negative)
+ // Journaler error code. Positive EIO would log "Unknown error" and
+ // hand callers +5 instead of -5.
+ _go_readonly(-EIO);
}
dout(20) << " executing item (0x" << std::hex << item.ino
<< std::dec << ")" << dendl;
in_flight[expire_to] = item;
logger->set(l_pq_executing, in_flight.size());
- ops_in_flight += _calculate_ops(item);
+ auto ops = _calculate_ops(item);
+ ops_in_flight += ops;
logger->set(l_pq_executing_ops, ops_in_flight);
SnapContext nullsnapc;
} else {
derr << "Invalid item (action=" << item.action << ") in purge queue, "
"dropping it" << dendl;
+ ops_in_flight -= ops;
+ logger->set(l_pq_executing_ops, ops_in_flight);
in_flight.erase(expire_to);
logger->set(l_pq_executing, in_flight.size());
return;
// expire_pos doesn't fall too far behind our progress when consuming
// a very long queue.
if (in_flight.empty() || journaler.write_head_needed()) {
- journaler.write_head(new FunctionContext([this](int r){
- journaler.trim();
- }));
+ journaler.write_head(nullptr);
}
}), &finisher));
{
Mutex::Locker l(lock);
+ if (readonly) {
+ dout(10) << "skipping; PurgeQueue is readonly" << dendl;
+ return;
+ }
+
uint64_t pg_count = 0;
objecter->with_osdmap([&](const OSDMap& o) {
// Number of PGs across all data pools
size_t *in_flight_count
)
{
+ Mutex::Locker l(lock);
+
+ if (readonly) {
+ dout(10) << "skipping drain; PurgeQueue is readonly" << dendl;
+ return true;
+ }
+
assert(progress != nullptr);
assert(progress_total != nullptr);
assert(in_flight_count != nullptr);