*
*/
+#include <algorithm>
+#include <cerrno>
+
#include "Objecter.h"
#include "osd/OSDMap.h"
+#include "osd/error_code.h"
#include "Filer.h"
#include "mon/MonClient.h"
+#include "mon/error_code.h"
#include "msg/Messenger.h"
#include "msg/Message.h"
#include "messages/MWatchNotify.h"
-#include <errno.h>
+#include "common/Cond.h"
#include "common/config.h"
#include "common/perf_counters.h"
#include "common/scrub_types.h"
#include "include/str_list.h"
#include "common/errno.h"
#include "common/EventTrace.h"
+#include "common/async/waiter.h"
+#include "error_code.h"
+
+
+using std::list;
+using std::make_pair;
+using std::map;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+using ceph::decode;
+using ceph::encode;
+using ceph::Formatter;
+
+using std::defer_lock;
+using std::scoped_lock;
+using std::shared_lock;
+using std::unique_lock;
using ceph::real_time;
using ceph::real_clock;
using ceph::timespan;
+using ceph::shunique_lock;
+using ceph::acquire_shared;
+using ceph::acquire_unique;
+
+namespace bc = boost::container;
+namespace bs = boost::system;
+namespace ca = ceph::async;
+namespace cb = ceph::buffer;
#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
l_osdc_op_send_bytes,
l_osdc_op_resend,
l_osdc_op_reply,
+ l_osdc_oplen_avg,
l_osdc_op,
l_osdc_op_r,
l_osdc_osdop_cmpxattr,
l_osdc_osdop_rmxattr,
l_osdc_osdop_resetxattrs,
- l_osdc_osdop_tmap_up,
- l_osdc_osdop_tmap_put,
- l_osdc_osdop_tmap_get,
l_osdc_osdop_call,
l_osdc_osdop_watch,
l_osdc_osdop_notify,
l_osdc_last,
};
+namespace {
+// Translate an errno-style return value (negative errno on failure) into a
+// boost::system::error_code in the OSD error category; zero or positive
+// values map to a default-constructed (success) error_code.
+inline bs::error_code osdcode(int r) {
+ return (r < 0) ? bs::error_code(-r, osd_category()) : bs::error_code();
+}
+}
// config obs ----------------------------
-static const char *config_keys[] = {
- "crush_location",
- NULL
-};
-
class Objecter::RequestStateHook : public AdminSocketHook {
Objecter *m_objecter;
public:
explicit RequestStateHook(Objecter *objecter);
- bool call(std::string command, cmdmap_t& cmdmap, std::string format,
- bufferlist& out) override;
+ int call(std::string_view command, const cmdmap_t& cmdmap,
+ const bufferlist&,
+ Formatter *f,
+ std::ostream& ss,
+ cb::list& out) override;
};
-/**
- * This is a more limited form of C_Contexts, but that requires
- * a ceph_context which we don't have here.
- */
-class ObjectOperation::C_TwoContexts : public Context {
- Context *first;
- Context *second;
-public:
- C_TwoContexts(Context *first, Context *second) :
- first(first), second(second) {}
- void finish(int r) override {
- first->complete(r);
- second->complete(r);
- first = NULL;
- second = NULL;
- }
-
- ~C_TwoContexts() override {
- delete first;
- delete second;
- }
-};
-
-void ObjectOperation::add_handler(Context *extra) {
- size_t last = out_handler.size() - 1;
- Context *orig = out_handler[last];
- if (orig) {
- Context *wrapper = new C_TwoContexts(orig, extra);
- out_handler[last] = wrapper;
- } else {
- out_handler[last] = extra;
- }
-}
-
-Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
- object_t& oid)
+std::unique_lock<std::mutex> Objecter::OSDSession::get_lock(object_t& oid)
{
if (oid.name.empty())
- return unique_completion_lock();
+ return {};
static constexpr uint32_t HASH_PRIME = 1021;
uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
% HASH_PRIME;
- return unique_completion_lock(completion_locks[h % num_locks],
- std::defer_lock);
+ return {completion_locks[h % num_locks], std::defer_lock};
}
const char** Objecter::get_tracked_conf_keys() const
{
+ // NULL-terminated list of config keys this observer reacts to. The two
+ // rados_*_op_timeout keys pair with the handling in handle_conf_change();
+ // the list lives in a function-local static rather than at file scope.
+ static const char *config_keys[] = {
+ "crush_location",
+ "rados_mon_op_timeout",
+ "rados_osd_op_timeout",
+ NULL
+ };
 return config_keys;
}
-void Objecter::handle_conf_change(const struct md_config_t *conf,
+void Objecter::handle_conf_change(const ConfigProxy& conf,
const std::set <std::string> &changed)
{
if (changed.count("crush_location")) {
update_crush_location();
}
+ if (changed.count("rados_mon_op_timeout")) {
+ mon_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+ }
+ if (changed.count("rados_osd_op_timeout")) {
+ osd_timeout = conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
+ }
}
void Objecter::update_crush_location()
*/
void Objecter::init()
{
- assert(!initialized);
+ ceph_assert(!initialized);
if (!logger) {
PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
PerfCountersBuilder::PRIO_CRITICAL);
pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
- pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data");
+ pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
+ pcb.add_u64_avg(l_osdc_oplen_avg, "oplen_avg", "Average length of operation vector");
pcb.add_u64_counter(l_osdc_op, "op", "Operations");
pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
"Remove xattr operations");
pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
"Reset xattr operations");
- pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up",
- "TMAP update operations");
- pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put",
- "TMAP put operations");
- pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get",
- "TMAP get operations");
pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
"Call (execute) operations");
pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
}
m_request_state_hook = new RequestStateHook(this);
- AdminSocket* admin_socket = cct->get_admin_socket();
+ auto admin_socket = cct->get_admin_socket();
int ret = admin_socket->register_command("objecter_requests",
- "objecter_requests",
m_request_state_hook,
"show in-progress osd requests");
update_crush_location();
- cct->_conf->add_observer(this);
+ cct->_conf.add_observer(this);
initialized = true;
}
start_tick();
if (o) {
osdmap->deepish_copy_from(*o);
+ prune_pg_mapping(osdmap->get_pools());
} else if (osdmap->get_epoch() == 0) {
_maybe_request_map();
}
void Objecter::shutdown()
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
initialized = false;
- cct->_conf->remove_observer(this);
+ wl.unlock();
+ cct->_conf.remove_observer(this);
+ wl.lock();
- map<int,OSDSession*>::iterator p;
while (!osd_sessions.empty()) {
- p = osd_sessions.begin();
+ auto p = osd_sessions.begin();
close_session(p->second);
}
while(!check_latest_map_lingers.empty()) {
- map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
+ auto i = check_latest_map_lingers.begin();
i->second->put();
check_latest_map_lingers.erase(i->first);
}
while(!check_latest_map_ops.empty()) {
- map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
+ auto i = check_latest_map_ops.begin();
i->second->put();
check_latest_map_ops.erase(i->first);
}
while(!check_latest_map_commands.empty()) {
- map<ceph_tid_t, CommandOp*>::iterator i
- = check_latest_map_commands.begin();
+ auto i = check_latest_map_commands.begin();
i->second->put();
check_latest_map_commands.erase(i->first);
}
while(!poolstat_ops.empty()) {
- map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
+ auto i = poolstat_ops.begin();
delete i->second;
poolstat_ops.erase(i->first);
}
while(!statfs_ops.empty()) {
- map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
+ auto i = statfs_ops.begin();
delete i->second;
statfs_ops.erase(i->first);
}
while(!pool_ops.empty()) {
- map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
+ auto i = pool_ops.begin();
delete i->second;
pool_ops.erase(i->first);
}
ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
while(!homeless_session->linger_ops.empty()) {
- std::map<uint64_t, LingerOp*>::iterator i
- = homeless_session->linger_ops.begin();
+ auto i = homeless_session->linger_ops.begin();
ldout(cct, 10) << " linger_op " << i->first << dendl;
LingerOp *lop = i->second;
{
- OSDSession::unique_lock swl(homeless_session->lock);
+ std::unique_lock swl(homeless_session->lock);
_session_linger_op_remove(homeless_session, lop);
}
linger_ops.erase(lop->linger_id);
}
while(!homeless_session->ops.empty()) {
- std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
+ auto i = homeless_session->ops.begin();
ldout(cct, 10) << " op " << i->first << dendl;
- Op *op = i->second;
+ auto op = i->second;
{
- OSDSession::unique_lock swl(homeless_session->lock);
+ std::unique_lock swl(homeless_session->lock);
_session_op_remove(homeless_session, op);
}
op->put();
}
while(!homeless_session->command_ops.empty()) {
- std::map<ceph_tid_t, CommandOp*>::iterator i
- = homeless_session->command_ops.begin();
+ auto i = homeless_session->command_ops.begin();
ldout(cct, 10) << " command_op " << i->first << dendl;
- CommandOp *cop = i->second;
+ auto cop = i->second;
{
- OSDSession::unique_lock swl(homeless_session->lock);
+ std::unique_lock swl(homeless_session->lock);
_session_command_op_remove(homeless_session, cop);
}
cop->put();
// This is safe because we guarantee no concurrent calls to
// shutdown() with the ::initialized check at start.
if (m_request_state_hook) {
- AdminSocket* admin_socket = cct->get_admin_socket();
- admin_socket->unregister_command("objecter_requests");
+ auto admin_socket = cct->get_admin_socket();
+ admin_socket->unregister_commands(m_request_state_hook);
delete m_request_state_hook;
m_request_state_hook = NULL;
}
}
void Objecter::_send_linger(LingerOp *info,
- shunique_lock& sul)
+ ceph::shunique_lock<ceph::shared_mutex>& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
- vector<OSDOp> opv;
- Context *oncommit = NULL;
- LingerOp::shared_lock watchl(info->watch_lock);
- bufferlist *poutbl = NULL;
+ fu2::unique_function<Op::OpSig> oncommit;
+ osdc_opvec opv;
+ std::shared_lock watchl(info->watch_lock);
+ cb::list *poutbl = nullptr;
if (info->registered && info->is_watch) {
ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
<< dendl;
opv.back().op.watch.cookie = info->get_cookie();
opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
opv.back().op.watch.gen = ++info->register_gen;
- oncommit = new C_Linger_Reconnect(this, info);
+ oncommit = CB_Linger_Reconnect(this, info);
} else {
ldout(cct, 15) << "send_linger " << info->linger_id << " register"
<< dendl;
opv = info->ops;
- C_Linger_Commit *c = new C_Linger_Commit(this, info);
+ // TODO Augment ca::Completion with an equivalent of
+ // target so we can handle these cases better.
+ auto c = std::make_unique<CB_Linger_Commit>(this, info);
if (!info->is_watch) {
info->notify_id = 0;
poutbl = &c->outbl;
}
- oncommit = c;
+ oncommit = [c = std::move(c)](bs::error_code ec) mutable {
+ std::move(*c)(ec);
+ };
}
watchl.unlock();
- Op *o = new Op(info->target.base_oid, info->target.base_oloc,
- opv, info->target.flags | CEPH_OSD_FLAG_READ,
- oncommit, info->pobjver);
+ auto o = new Op(info->target.base_oid, info->target.base_oloc,
+ std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
+ std::move(oncommit), info->pobjver);
o->outbl = poutbl;
o->snapid = info->snap;
o->snapc = info->snapc;
// do not resend this; we will send a new op to reregister
o->should_resend = false;
+ o->ctx_budgeted = true;
if (info->register_tid) {
- // repeat send. cancel old registeration op, if any.
- OSDSession::unique_lock sl(info->session->lock);
+ // repeat send. cancel old registration op, if any.
+ std::unique_lock sl(info->session->lock);
if (info->session->ops.count(info->register_tid)) {
- Op *o = info->session->ops[info->register_tid];
+ auto o = info->session->ops[info->register_tid];
_op_cancel_map_check(o);
_cancel_linger_op(o);
}
sl.unlock();
-
- _op_submit(o, sul, &info->register_tid);
- } else {
- // first send
- _op_submit_with_budget(o, sul, &info->register_tid);
}
+ _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);
+
logger->inc(l_osdc_linger_send);
}
-void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
+void Objecter::_linger_commit(LingerOp *info, bs::error_code ec,
+ cb::list& outbl)
{
- LingerOp::unique_lock wl(info->watch_lock);
+ std::unique_lock wl(info->watch_lock);
ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
if (info->on_reg_commit) {
- info->on_reg_commit->complete(r);
- info->on_reg_commit = NULL;
+ info->on_reg_commit->defer(std::move(info->on_reg_commit),
+ ec, cb::list{});
+ info->on_reg_commit.reset();
+ }
+ if (ec && info->on_notify_finish) {
+ info->on_notify_finish->defer(std::move(info->on_notify_finish),
+ ec, cb::list{});
+ info->on_notify_finish.reset();
}
// only tell the user the first time we do this
if (!info->is_watch) {
// make note of the notify_id
- bufferlist::iterator p = outbl.begin();
+ auto p = outbl.cbegin();
try {
- ::decode(info->notify_id, p);
+ decode(info->notify_id, p);
ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
<< dendl;
}
- catch (buffer::error& e) {
+ catch (cb::error& e) {
}
}
}
-struct C_DoWatchError : public Context {
+class CB_DoWatchError {
Objecter *objecter;
- Objecter::LingerOp *info;
- int err;
- C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
- : objecter(o), info(i), err(r) {
- info->get();
+ boost::intrusive_ptr<Objecter::LingerOp> info;
+ bs::error_code ec;
+public:
+ CB_DoWatchError(Objecter *o, Objecter::LingerOp *i,
+ bs::error_code ec)
+ : objecter(o), info(i), ec(ec) {
info->_queued_async();
}
- void finish(int r) override {
- Objecter::unique_lock wl(objecter->rwlock);
+ void operator()() {
+ std::unique_lock wl(objecter->rwlock);
bool canceled = info->canceled;
wl.unlock();
if (!canceled) {
- info->watch_context->handle_error(info->get_cookie(), err);
+ info->handle(ec, 0, info->get_cookie(), 0, {});
}
info->finished_async();
- info->put();
}
};
-int Objecter::_normalize_watch_error(int r)
+bs::error_code Objecter::_normalize_watch_error(bs::error_code ec)
{
// translate ENOENT -> ENOTCONN so that a delete->disconnection
- // notification and a failure to reconnect becuase we raced with
+ // notification and a failure to reconnect because we raced with
// the delete appear the same to the user.
- if (r == -ENOENT)
- r = -ENOTCONN;
- return r;
+ if (ec == bs::errc::no_such_file_or_directory)
+ ec = bs::error_code(ENOTCONN, osd_category());
+ return ec;
}
-void Objecter::_linger_reconnect(LingerOp *info, int r)
+void Objecter::_linger_reconnect(LingerOp *info, bs::error_code ec)
{
- ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
+ ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << ec
<< " (last_error " << info->last_error << ")" << dendl;
- if (r < 0) {
- LingerOp::unique_lock wl(info->watch_lock);
+ std::unique_lock wl(info->watch_lock);
+ if (ec) {
if (!info->last_error) {
- r = _normalize_watch_error(r);
- info->last_error = r;
- if (info->watch_context) {
- finisher->queue(new C_DoWatchError(this, info, r));
+ ec = _normalize_watch_error(ec);
+ if (info->handle) {
+ boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
}
}
- wl.unlock();
}
+
+ info->last_error = ec;
}
void Objecter::_send_linger_ping(LingerOp *info)
return;
}
- ceph::mono_time now = ceph::mono_clock::now();
+ ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
<< dendl;
- vector<OSDOp> opv(1);
+ osdc_opvec opv(1);
opv[0].op.op = CEPH_OSD_OP_WATCH;
opv[0].op.watch.cookie = info->get_cookie();
opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
opv[0].op.watch.gen = info->register_gen;
- C_Linger_Ping *onack = new C_Linger_Ping(this, info);
+
Op *o = new Op(info->target.base_oid, info->target.base_oloc,
- opv, info->target.flags | CEPH_OSD_FLAG_READ,
- onack, NULL, NULL);
+ std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
+ CB_Linger_Ping(this, info, now),
+ nullptr, nullptr);
o->target = info->target;
o->should_resend = false;
_send_op_account(o);
- MOSDOp *m = _prepare_osd_op(o);
o->tid = ++last_tid;
_session_op_assign(info->session, o);
- _send_op(o, m);
+ _send_op(o);
info->ping_tid = o->tid;
- onack->sent = now;
logger->inc(l_osdc_linger_ping);
}
-void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
+void Objecter::_linger_ping(LingerOp *info, bs::error_code ec, ceph::coarse_mono_time sent,
uint32_t register_gen)
{
- LingerOp::unique_lock l(info->watch_lock);
+ std::unique_lock l(info->watch_lock);
ldout(cct, 10) << __func__ << " " << info->linger_id
- << " sent " << sent << " gen " << register_gen << " = " << r
+ << " sent " << sent << " gen " << register_gen << " = " << ec
<< " (last_error " << info->last_error
<< " register_gen " << info->register_gen << ")" << dendl;
if (info->register_gen == register_gen) {
- if (r == 0) {
+ if (!ec) {
info->watch_valid_thru = sent;
- } else if (r < 0 && !info->last_error) {
- r = _normalize_watch_error(r);
- info->last_error = r;
- if (info->watch_context) {
- finisher->queue(new C_DoWatchError(this, info, r));
+ } else if (ec && !info->last_error) {
+ ec = _normalize_watch_error(ec);
+ info->last_error = ec;
+ if (info->handle) {
+ boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
}
}
} else {
}
}
-int Objecter::linger_check(LingerOp *info)
+tl::expected<ceph::timespan,
+ bs::error_code> Objecter::linger_check(LingerOp *info)
{
- LingerOp::shared_lock l(info->watch_lock);
+ std::shared_lock l(info->watch_lock);
- mono_time stamp = info->watch_valid_thru;
+ ceph::coarse_mono_time stamp = info->watch_valid_thru;
if (!info->watch_pending_async.empty())
- stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
- auto age = mono_clock::now() - stamp;
+ stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
+ auto age = ceph::coarse_mono_clock::now() - stamp;
ldout(cct, 10) << __func__ << " " << info->linger_id
<< " err " << info->last_error
<< " age " << age << dendl;
if (info->last_error)
- return info->last_error;
+ return tl::unexpected(info->last_error);
 // return a safe upper bound on time since the watch was last known valid
- return
- 1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
+ return age;
}
void Objecter::linger_cancel(LingerOp *info)
ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
if (!info->canceled) {
OSDSession *s = info->session;
- OSDSession::unique_lock sl(s->lock);
+ std::unique_lock sl(s->lock);
_session_linger_op_remove(s, info);
sl.unlock();
linger_ops.erase(info->linger_id);
linger_ops_set.erase(info);
- assert(linger_ops.size() == linger_ops_set.size());
+ ceph_assert(linger_ops.size() == linger_ops_set.size());
info->canceled = true;
info->put();
const object_locator_t& oloc,
int flags)
{
- LingerOp *info = new LingerOp;
+ unique_lock l(rwlock);
+ // Acquire linger ID
+ auto info = new LingerOp(this, ++max_linger_id);
info->target.base_oid = oid;
info->target.base_oloc = oloc;
if (info->target.base_oloc.key == oid)
info->target.base_oloc.key.clear();
info->target.flags = flags;
- info->watch_valid_thru = mono_clock::now();
-
- unique_lock l(rwlock);
-
- // Acquire linger ID
- info->linger_id = ++max_linger_id;
+ info->watch_valid_thru = ceph::coarse_mono_clock::now();
ldout(cct, 10) << __func__ << " info " << info
<< " linger_id " << info->linger_id
<< " cookie " << info->get_cookie()
<< dendl;
linger_ops[info->linger_id] = info;
linger_ops_set.insert(info);
- assert(linger_ops.size() == linger_ops_set.size());
+ ceph_assert(linger_ops.size() == linger_ops_set.size());
info->get(); // for the caller
return info;
ObjectOperation& op,
const SnapContext& snapc,
real_time mtime,
- bufferlist& inbl,
- Context *oncommit,
+ cb::list& inbl,
+ decltype(info->on_reg_commit)&& oncommit,
version_t *objver)
{
info->is_watch = true;
info->target.flags |= CEPH_OSD_FLAG_WRITE;
info->ops = op.ops;
info->inbl = inbl;
- info->poutbl = NULL;
info->pobjver = objver;
- info->on_reg_commit = oncommit;
+ info->on_reg_commit = std::move(oncommit);
+
+ info->ctx_budget = take_linger_budget(info);
shunique_lock sul(rwlock, ceph::acquire_unique);
_linger_submit(info, sul);
logger->inc(l_osdc_linger_active);
+ op.clear();
return info->linger_id;
}
ceph_tid_t Objecter::linger_notify(LingerOp *info,
ObjectOperation& op,
- snapid_t snap, bufferlist& inbl,
- bufferlist *poutbl,
- Context *onfinish,
+ snapid_t snap, cb::list& inbl,
+ decltype(LingerOp::on_reg_commit)&& onfinish,
version_t *objver)
{
info->snap = snap;
info->target.flags |= CEPH_OSD_FLAG_READ;
info->ops = op.ops;
info->inbl = inbl;
- info->poutbl = poutbl;
info->pobjver = objver;
- info->on_reg_commit = onfinish;
+ info->on_reg_commit = std::move(onfinish);
+ info->ctx_budget = take_linger_budget(info);
shunique_lock sul(rwlock, ceph::acquire_unique);
_linger_submit(info, sul);
logger->inc(l_osdc_linger_active);
+ op.clear();
return info->linger_id;
}
-void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
+void Objecter::_linger_submit(LingerOp *info,
+ ceph::shunique_lock<ceph::shared_mutex>& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
- assert(info->linger_id);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(info->linger_id);
+ ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!
// Populate Op::target
OSDSession *s = NULL;
- _calc_target(&info->target, nullptr);
+ int r = _calc_target(&info->target, nullptr);
+ switch (r) {
+ case RECALC_OP_TARGET_POOL_EIO:
+ _check_linger_pool_eio(info);
+ return;
+ }
// Create LingerOp<->OSDSession relation
- int r = _get_session(info->target.osd, &s, sul);
- assert(r == 0);
- OSDSession::unique_lock sl(s->lock);
+ r = _get_session(info->target.osd, &s, sul);
+ ceph_assert(r == 0);
+ unique_lock sl(s->lock);
_session_linger_op_assign(s, info);
sl.unlock();
put_session(s);
_send_linger(info, sul);
}
-struct C_DoWatchNotify : public Context {
+// Deferred callback that hands a watch/notify message to
+// Objecter::_do_watch_notify(). The boost::intrusive_ptr members keep the
+// LingerOp and message alive for the callback's lifetime, replacing the
+// manual info->get()/msg->get() reference counting of the old
+// Context-based version.
+struct CB_DoWatchNotify {
 Objecter *objecter;
- Objecter::LingerOp *info;
- MWatchNotify *msg;
- C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
+ boost::intrusive_ptr<Objecter::LingerOp> info;
+ boost::intrusive_ptr<MWatchNotify> msg;
+ CB_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
 : objecter(o), info(i), msg(m) {
+ // NOTE(review): _queued_async() appears to record this pending async
+ // delivery on the LingerOp — confirm against LingerOp's definition.
 info->_queued_async();
 }
- void finish(int r) override {
- objecter->_do_watch_notify(info, msg);
+ void operator()() {
+ // Transfer both references into the callee.
+ objecter->_do_watch_notify(std::move(info), std::move(msg));
 }
};
ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
return;
}
- LingerOp::unique_lock wl(info->watch_lock);
+ std::unique_lock wl(info->watch_lock);
if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
if (!info->last_error) {
- info->last_error = -ENOTCONN;
- if (info->watch_context) {
- finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
+ info->last_error = bs::error_code(ENOTCONN, osd_category());
+ if (info->handle) {
+ boost::asio::defer(finish_strand, CB_DoWatchError(this, info,
+ info->last_error));
}
}
} else if (!info->is_watch) {
ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
<< " != " << info->notify_id << ", ignoring" << dendl;
} else if (info->on_notify_finish) {
- info->notify_result_bl->claim(m->get_data());
- info->on_notify_finish->complete(m->return_code);
+ info->on_notify_finish->defer(
+ std::move(info->on_notify_finish),
+ osdcode(m->return_code), std::move(m->get_data()));
// if we race with reconnect we might get a second notify; only
// notify the caller once!
- info->on_notify_finish = NULL;
+ info->on_notify_finish = nullptr;
}
} else {
- finisher->queue(new C_DoWatchNotify(this, info, m));
+ boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
}
}
-void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
+void Objecter::_do_watch_notify(boost::intrusive_ptr<LingerOp> info,
+ boost::intrusive_ptr<MWatchNotify> m)
{
ldout(cct, 10) << __func__ << " " << *m << dendl;
shared_lock l(rwlock);
- assert(initialized);
+ ceph_assert(initialized);
if (info->canceled) {
l.unlock();
}
// notify completion?
- assert(info->is_watch);
- assert(info->watch_context);
- assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
+ ceph_assert(info->is_watch);
+ ceph_assert(info->handle);
+ ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
l.unlock();
switch (m->opcode) {
case CEPH_WATCH_EVENT_NOTIFY:
- info->watch_context->handle_notify(m->notify_id, m->cookie,
- m->notifier_gid, m->bl);
+ info->handle({}, m->notify_id, m->cookie, m->notifier_gid, std::move(m->bl));
break;
}
out:
info->finished_async();
- info->put();
- m->put();
}
bool Objecter::ms_dispatch(Message *m)
{
ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
- if (!initialized)
- return false;
-
switch (m->get_type()) {
 // these we exclusively handle
case CEPH_MSG_OSD_OPREPLY:
return false;
}
-void Objecter::_scan_requests(OSDSession *s,
- bool force_resend,
- bool cluster_full,
- map<int64_t, bool> *pool_full_map,
- map<ceph_tid_t, Op*>& need_resend,
- list<LingerOp*>& need_resend_linger,
- map<ceph_tid_t, CommandOp*>& need_resend_command,
- shunique_lock& sul)
+void Objecter::_scan_requests(
+ OSDSession *s,
+ bool skipped_map,
+ bool cluster_full,
+ map<int64_t, bool> *pool_full_map,
+ map<ceph_tid_t, Op*>& need_resend,
+ list<LingerOp*>& need_resend_linger,
+ map<ceph_tid_t, CommandOp*>& need_resend_command,
+ ceph::shunique_lock<ceph::shared_mutex>& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
list<LingerOp*> unregister_lingers;
- OSDSession::unique_lock sl(s->lock);
+ std::unique_lock sl(s->lock);
// check for changed linger mappings (_before_ regular ops)
- map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
+ auto lp = s->linger_ops.begin();
while (lp != s->linger_ops.end()) {
- LingerOp *op = lp->second;
- assert(op->session == s);
+ auto op = lp->second;
+ ceph_assert(op->session == s);
// check_linger_pool_dne() may touch linger_ops; prevent iterator
// invalidation
++lp;
(*pool_full_map)[op->target.base_oloc.pool];
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
- if (!force_resend && !force_resend_writes)
+ if (!skipped_map && !force_resend_writes)
break;
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
unregister_lingers.push_back(op);
}
break;
+ case RECALC_OP_TARGET_POOL_EIO:
+ _check_linger_pool_eio(op);
+ ldout(cct, 10) << " need to unregister linger op "
+ << op->linger_id << dendl;
+ op->get();
+ unregister_lingers.push_back(op);
+ break;
}
}
// check for changed request mappings
- map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
+ auto p = s->ops.begin();
while (p != s->ops.end()) {
Op *op = p->second;
++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
ldout(cct, 10) << " checking op " << op->tid << dendl;
+ _prune_snapc(osdmap->get_new_removed_snaps(), op);
bool force_resend_writes = cluster_full;
if (pool_full_map)
force_resend_writes = force_resend_writes ||
op->session ? op->session->con.get() : nullptr);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
- if (!force_resend && !(force_resend_writes && op->respects_full()))
+ if (!skipped_map && !(force_resend_writes && op->target.respects_full()))
break;
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
- if (op->session) {
- _session_op_remove(op->session, op);
- }
+ _session_op_remove(op->session, op);
need_resend[op->tid] = op;
_op_cancel_map_check(op);
break;
case RECALC_OP_TARGET_POOL_DNE:
_check_op_pool_dne(op, &sl);
break;
+ case RECALC_OP_TARGET_POOL_EIO:
+ _check_op_pool_eio(op, &sl);
+ break;
}
}
// commands
- map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
+ auto cp = s->command_ops.begin();
while (cp != s->command_ops.end()) {
- CommandOp *c = cp->second;
+ auto c = cp->second;
++cp;
ldout(cct, 10) << " checking command " << c->tid << dendl;
bool force_resend_writes = cluster_full;
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
// resend if skipped map; otherwise do nothing.
- if (!force_resend && !force_resend_writes)
+ if (!skipped_map && !force_resend_writes)
break;
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
need_resend_command[c->tid] = c;
- if (c->session) {
- _session_command_op_remove(c->session, c);
- }
+ _session_command_op_remove(c->session, c);
_command_cancel_map_check(c);
break;
case RECALC_OP_TARGET_POOL_DNE:
sl.unlock();
- for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
+ for (auto iter = unregister_lingers.begin();
iter != unregister_lingers.end();
++iter) {
_linger_cancel(*iter);
void Objecter::handle_osd_map(MOSDMap *m)
{
- shunique_lock sul(rwlock, acquire_unique);
+ ceph::shunique_lock sul(rwlock, acquire_unique);
if (!initialized)
return;
- assert(osdmap);
+ ceph_assert(osdmap);
if (m->fsid != monc->get_fsid()) {
ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
_osdmap_has_pool_full();
map<int64_t, bool> pool_full_map;
- for (map<int64_t, pg_pool_t>::const_iterator it
- = osdmap->get_pools().begin();
+ for (auto it = osdmap->get_pools().begin();
it != osdmap->get_pools().end(); ++it)
pool_full_map[it->first] = _osdmap_pool_full(it->second);
OSDMap::Incremental inc(m->incremental_maps[e]);
osdmap->apply_incremental(inc);
- emit_blacklist_events(inc);
+ emit_blocklist_events(inc);
logger->inc(l_osdc_map_inc);
}
else if (m->maps.count(e)) {
ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
- OSDMap *new_osdmap = new OSDMap();
+ auto new_osdmap = std::make_unique<OSDMap>();
new_osdmap->decode(m->maps[e]);
- emit_blacklist_events(*osdmap, *new_osdmap);
-
- osdmap = new_osdmap;
+ emit_blocklist_events(*osdmap, *new_osdmap);
+ osdmap = std::move(new_osdmap);
logger->inc(l_osdc_map_full);
}
}
logger->set(l_osdc_map_epoch, osdmap->get_epoch());
+ prune_pg_mapping(osdmap->get_pools());
cluster_full = cluster_full || _osdmap_full_flag();
update_pool_full_map(pool_full_map);
// check all outstanding requests on every epoch
+ for (auto& i : need_resend) {
+ _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
+ }
_scan_requests(homeless_session, skipped_map, cluster_full,
&pool_full_map, need_resend,
need_resend_linger, need_resend_command, sul);
- for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
+ for (auto p = osd_sessions.begin();
p != osd_sessions.end(); ) {
- OSDSession *s = p->second;
+ auto s = p->second;
_scan_requests(s, skipped_map, cluster_full,
&pool_full_map, need_resend,
need_resend_linger, need_resend_command, sul);
// osd down or addr change?
if (!osdmap->is_up(s->osd) ||
(s->con &&
- s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
+ s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
close_session(s);
}
}
- assert(e == osdmap->get_epoch());
+ ceph_assert(e == osdmap->get_epoch());
}
} else {
// first map. we want the full thing.
if (m->maps.count(m->get_last())) {
- for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
+ for (auto p = osd_sessions.begin();
p != osd_sessions.end(); ++p) {
OSDSession *s = p->second;
_scan_requests(s, false, false, NULL, need_resend,
ldout(cct, 3) << "handle_osd_map decoding full epoch "
<< m->get_last() << dendl;
osdmap->decode(m->maps[m->get_last()]);
+ prune_pg_mapping(osdmap->get_pools());
_scan_requests(homeless_session, false, false, NULL,
need_resend, need_resend_linger,
}
// resend requests
- for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
+ for (auto p = need_resend.begin();
p != need_resend.end(); ++p) {
- Op *op = p->second;
- OSDSession *s = op->session;
+ auto op = p->second;
+ auto s = op->session;
bool mapped_session = false;
if (!s) {
int r = _map_session(&op->target, &s, sul);
- assert(r == 0);
+ ceph_assert(r == 0);
mapped_session = true;
} else {
get_session(s);
}
- OSDSession::unique_lock sl(s->lock);
+ std::unique_lock sl(s->lock);
if (mapped_session) {
_session_op_assign(s, op);
}
sl.unlock();
put_session(s);
}
- for (list<LingerOp*>::iterator p = need_resend_linger.begin();
+ for (auto p = need_resend_linger.begin();
p != need_resend_linger.end(); ++p) {
LingerOp *op = *p;
- if (!op->session) {
- _calc_target(&op->target, nullptr);
- OSDSession *s = NULL;
- const int r = _get_session(op->target.osd, &s, sul);
- assert(r == 0);
- assert(s != NULL);
- op->session = s;
- put_session(s);
- }
+ ceph_assert(op->session);
if (!op->session->is_homeless()) {
logger->inc(l_osdc_linger_resend);
_send_linger(op, sul);
}
}
- for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
+ for (auto p = need_resend_command.begin();
p != need_resend_command.end(); ++p) {
- CommandOp *c = p->second;
+ auto c = p->second;
if (c->target.osd >= 0) {
_assign_command_session(c, sul);
if (c->session && !c->session->is_homeless()) {
_dump_active();
// finish any Contexts that were waiting on a map update
- map<epoch_t,list< pair< Context*, int > > >::iterator p =
- waiting_for_map.begin();
+ auto p = waiting_for_map.begin();
while (p != waiting_for_map.end() &&
p->first <= osdmap->get_epoch()) {
//go through the list and call the onfinish methods
- for (list<pair<Context*, int> >::iterator i = p->second.begin();
- i != p->second.end(); ++i) {
- i->first->complete(i->second);
+ for (auto& [c, ec] : p->second) {
+ ca::post(std::move(c), ec);
}
waiting_for_map.erase(p++);
}
}
}
-void Objecter::enable_blacklist_events()
+void Objecter::enable_blocklist_events()
{
unique_lock wl(rwlock);
- blacklist_events_enabled = true;
+ blocklist_events_enabled = true;
}
-void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
+void Objecter::consume_blocklist_events(std::set<entity_addr_t> *events)
{
unique_lock wl(rwlock);
if (events->empty()) {
- events->swap(blacklist_events);
+ events->swap(blocklist_events);
} else {
- for (const auto &i : blacklist_events) {
+ for (const auto &i : blocklist_events) {
events->insert(i);
}
- blacklist_events.clear();
+ blocklist_events.clear();
}
}
-void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
+void Objecter::emit_blocklist_events(const OSDMap::Incremental &inc)
{
- if (!blacklist_events_enabled) {
+ if (!blocklist_events_enabled) {
return;
}
- for (const auto &i : inc.new_blacklist) {
- blacklist_events.insert(i.first);
+ for (const auto &i : inc.new_blocklist) {
+ blocklist_events.insert(i.first);
}
}
-void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
+void Objecter::emit_blocklist_events(const OSDMap &old_osd_map,
const OSDMap &new_osd_map)
{
- if (!blacklist_events_enabled) {
+ if (!blocklist_events_enabled) {
return;
}
std::set<entity_addr_t> old_set;
std::set<entity_addr_t> new_set;
+ std::set<entity_addr_t> old_range_set;
+ std::set<entity_addr_t> new_range_set;
- old_osd_map.get_blacklist(&old_set);
- new_osd_map.get_blacklist(&new_set);
+ old_osd_map.get_blocklist(&old_set, &old_range_set);
+ new_osd_map.get_blocklist(&new_set, &new_range_set);
std::set<entity_addr_t> delta_set;
std::set_difference(
new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
std::inserter(delta_set, delta_set.begin()));
- blacklist_events.insert(delta_set.begin(), delta_set.end());
+ std::set_difference(
+ new_range_set.begin(), new_range_set.end(),
+ old_range_set.begin(), old_range_set.end(),
+ std::inserter(delta_set, delta_set.begin()));
+ blocklist_events.insert(delta_set.begin(), delta_set.end());
}
// op pool check
-void Objecter::C_Op_Map_Latest::finish(int r)
+void Objecter::CB_Op_Map_Latest::operator()(bs::error_code e,
+ version_t latest, version_t)
{
- if (r == -EAGAIN || r == -ECANCELED)
+ if (e == bs::errc::resource_unavailable_try_again ||
+ e == bs::errc::operation_canceled)
return;
lgeneric_subdout(objecter->cct, objecter, 10)
- << "op_map_latest r=" << r << " tid=" << tid
+ << "op_map_latest r=" << e << " tid=" << tid
<< " latest " << latest << dendl;
- Objecter::unique_lock wl(objecter->rwlock);
+ unique_lock wl(objecter->rwlock);
- map<ceph_tid_t, Op*>::iterator iter =
- objecter->check_latest_map_ops.find(tid);
+ auto iter = objecter->check_latest_map_ops.find(tid);
if (iter == objecter->check_latest_map_ops.end()) {
lgeneric_subdout(objecter->cct, objecter, 10)
<< "op_map_latest op "<< tid << " not found" << dendl;
if (op->map_dne_bound == 0)
op->map_dne_bound = latest;
- OSDSession::unique_lock sl(op->session->lock, defer_lock);
+ unique_lock sl(op->session->lock, defer_lock);
objecter->_check_op_pool_dne(op, &sl);
op->put();
const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
if (!pi)
return -ENOENT;
- for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
+ for (auto p = pi->snaps.begin();
p != pi->snaps.end();
++p) {
snaps->push_back(p->first);
}
// sl may be unlocked.
-void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
+void Objecter::_check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl)
{
// rwlock is locked unique
ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
<< " concluding pool " << op->target.base_pgid.pool()
<< " dne" << dendl;
- if (op->onfinish) {
- op->onfinish->complete(-ENOENT);
+ if (op->has_completion()) {
+ num_in_flight--;
+ op->complete(osdc_errc::pool_dne, -ENOENT);
}
OSDSession *s = op->session;
if (s) {
- assert(s != NULL);
- assert(sl->mutex() == &s->lock);
+ ceph_assert(s != NULL);
+ ceph_assert(sl->mutex() == &s->lock);
bool session_locked = sl->owns_lock();
if (!session_locked) {
sl->lock();
}
}
+// sl may be unlocked.
+// Fail an op whose target pool has the EIO flag set: complete it with
+// pool_eio/-EIO and retire it.  Caller holds rwlock unique; *sl refers to
+// op->session->lock and may be locked or unlocked on entry (restored on exit).
+void Objecter::_check_op_pool_eio(Op *op, std::unique_lock<std::shared_mutex> *sl)
+{
+  // rwlock is locked unique
+
+  // we had a new enough map
+  ldout(cct, 10) << "check_op_pool_eio tid " << op->tid
+		 << " concluding pool " << op->target.base_pgid.pool()
+		 << " has eio" << dendl;
+  if (op->has_completion()) {
+    // op counted toward num_in_flight when submitted with a completion
+    num_in_flight--;
+    op->complete(osdc_errc::pool_eio, -EIO);
+  }
+
+  OSDSession *s = op->session;
+  if (s) {
+    ceph_assert(s != NULL);
+    ceph_assert(sl->mutex() == &s->lock);
+    // _finish_op requires the session lock; take it only if the caller
+    // did not already hold it, and leave *sl in the state we found it.
+    bool session_locked = sl->owns_lock();
+    if (!session_locked) {
+      sl->lock();
+    }
+    _finish_op(op, 0);
+    if (!session_locked) {
+      sl->unlock();
+    }
+  } else {
+    _finish_op(op, 0); // no session
+  }
+}
+
void Objecter::_send_op_map_check(Op *op)
{
// rwlock is locked unique
if (check_latest_map_ops.count(op->tid) == 0) {
op->get();
check_latest_map_ops[op->tid] = op;
- C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
- monc->get_version("osdmap", &c->latest, NULL, c);
+ monc->get_version("osdmap", CB_Op_Map_Latest(this, op->tid));
}
}
void Objecter::_op_cancel_map_check(Op *op)
{
// rwlock is locked unique
- map<ceph_tid_t, Op*>::iterator iter =
- check_latest_map_ops.find(op->tid);
+ auto iter = check_latest_map_ops.find(op->tid);
if (iter != check_latest_map_ops.end()) {
Op *op = iter->second;
op->put();
// linger pool check
-void Objecter::C_Linger_Map_Latest::finish(int r)
+void Objecter::CB_Linger_Map_Latest::operator()(bs::error_code e,
+ version_t latest,
+ version_t)
{
- if (r == -EAGAIN || r == -ECANCELED) {
+ if (e == bs::errc::resource_unavailable_try_again ||
+ e == bs::errc::operation_canceled) {
// ignore callback; we will retry in resend_mon_ops()
return;
}
unique_lock wl(objecter->rwlock);
- map<uint64_t, LingerOp*>::iterator iter =
- objecter->check_latest_map_lingers.find(linger_id);
+ auto iter = objecter->check_latest_map_lingers.find(linger_id);
if (iter == objecter->check_latest_map_lingers.end()) {
return;
}
- LingerOp *op = iter->second;
+ auto op = iter->second;
objecter->check_latest_map_lingers.erase(iter);
if (op->map_dne_bound == 0)
}
if (op->map_dne_bound > 0) {
if (osdmap->get_epoch() >= op->map_dne_bound) {
+ std::unique_lock wl{op->watch_lock};
if (op->on_reg_commit) {
- op->on_reg_commit->complete(-ENOENT);
+ op->on_reg_commit->defer(std::move(op->on_reg_commit),
+ osdc_errc::pool_dne, cb::list{});
+ op->on_reg_commit = nullptr;
+ }
+ if (op->on_notify_finish) {
+ op->on_notify_finish->defer(std::move(op->on_notify_finish),
+ osdc_errc::pool_dne, cb::list{});
+ op->on_notify_finish = nullptr;
}
*need_unregister = true;
}
}
}
+// Fail a linger (watch/notify) op whose target pool has the EIO flag set.
+// Defers both the registration and any pending notify completion with
+// pool_eio, mirroring _check_op_pool_eio() for regular ops.
+void Objecter::_check_linger_pool_eio(LingerOp *op)
+{
+  // rwlock is locked unique
+
+  std::unique_lock wl{op->watch_lock};
+  if (op->on_reg_commit) {
+    // pool_eio, not pool_dne: this is the EIO path; pool_dne is reserved
+    // for the pool-does-not-exist path in _check_linger_pool_dne().
+    op->on_reg_commit->defer(std::move(op->on_reg_commit),
+			     osdc_errc::pool_eio, cb::list{});
+    op->on_reg_commit = nullptr;
+  }
+  if (op->on_notify_finish) {
+    op->on_notify_finish->defer(std::move(op->on_notify_finish),
+				osdc_errc::pool_eio, cb::list{});
+    op->on_notify_finish = nullptr;
+  }
+}
+
void Objecter::_send_linger_map_check(LingerOp *op)
{
// ask the monitor
if (check_latest_map_lingers.count(op->linger_id) == 0) {
op->get();
check_latest_map_lingers[op->linger_id] = op;
- C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
- monc->get_version("osdmap", &c->latest, NULL, c);
+ monc->get_version("osdmap", CB_Linger_Map_Latest(this, op->linger_id));
}
}
{
// rwlock is locked unique
- map<uint64_t, LingerOp*>::iterator iter =
- check_latest_map_lingers.find(op->linger_id);
+ auto iter = check_latest_map_lingers.find(op->linger_id);
if (iter != check_latest_map_lingers.end()) {
LingerOp *op = iter->second;
op->put();
// command pool check
-void Objecter::C_Command_Map_Latest::finish(int r)
+void Objecter::CB_Command_Map_Latest::operator()(bs::error_code e,
+ version_t latest, version_t)
{
- if (r == -EAGAIN || r == -ECANCELED) {
+ if (e == bs::errc::resource_unavailable_try_again ||
+ e == bs::errc::operation_canceled) {
// ignore callback; we will retry in resend_mon_ops()
return;
}
unique_lock wl(objecter->rwlock);
- map<uint64_t, CommandOp*>::iterator iter =
- objecter->check_latest_map_commands.find(tid);
+ auto iter = objecter->check_latest_map_commands.find(tid);
if (iter == objecter->check_latest_map_commands.end()) {
return;
}
- CommandOp *c = iter->second;
+ auto c = iter->second;
objecter->check_latest_map_commands.erase(iter);
if (c->map_dne_bound == 0)
c->map_dne_bound = latest;
+ unique_lock sul(c->session->lock);
objecter->_check_command_map_dne(c);
+ sul.unlock();
c->put();
}
void Objecter::_check_command_map_dne(CommandOp *c)
{
// rwlock is locked unique
+ // session is locked unique
ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
<< " current " << osdmap->get_epoch()
<< dendl;
if (c->map_dne_bound > 0) {
if (osdmap->get_epoch() >= c->map_dne_bound) {
- _finish_command(c, c->map_check_error, c->map_check_error_str);
+ _finish_command(c, osdcode(c->map_check_error),
+ std::move(c->map_check_error_str), {});
}
} else {
_send_command_map_check(c);
void Objecter::_send_command_map_check(CommandOp *c)
{
// rwlock is locked unique
+ // session is locked unique
// ask the monitor
if (check_latest_map_commands.count(c->tid) == 0) {
c->get();
check_latest_map_commands[c->tid] = c;
- C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
- monc->get_version("osdmap", &f->latest, NULL, f);
+ monc->get_version("osdmap", CB_Command_Map_Latest(this, c->tid));
}
}
{
// rwlock is locked uniqe
- map<uint64_t, CommandOp*>::iterator iter =
- check_latest_map_commands.find(c->tid);
+ auto iter = check_latest_map_commands.find(c->tid);
if (iter != check_latest_map_commands.end()) {
- CommandOp *c = iter->second;
+ auto c = iter->second;
c->put();
check_latest_map_commands.erase(iter);
}
* @returns 0 on success, or -EAGAIN if the lock context requires
* promotion to write.
*/
-int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
+int Objecter::_get_session(int osd, OSDSession **session,
+ shunique_lock<ceph::shared_mutex>& sul)
{
- assert(sul && sul.mutex() == &rwlock);
+ ceph_assert(sul && sul.mutex() == &rwlock);
if (osd < 0) {
*session = homeless_session;
return 0;
}
- map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
+ auto p = osd_sessions.find(osd);
if (p != osd_sessions.end()) {
- OSDSession *s = p->second;
+ auto s = p->second;
s->get();
*session = s;
ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
if (!sul.owns_lock()) {
return -EAGAIN;
}
- OSDSession *s = new OSDSession(cct, osd);
+ auto s = new OSDSession(cct, osd);
osd_sessions[osd] = s;
- s->con = messenger->get_connection(osdmap->get_inst(osd));
- s->con->set_priv(s->get());
+ s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
+ s->con->set_priv(RefCountedPtr{s});
logger->inc(l_osdc_osd_session_open);
logger->set(l_osdc_osd_sessions, osd_sessions.size());
s->get();
void Objecter::get_session(Objecter::OSDSession *s)
{
- assert(s != NULL);
+ ceph_assert(s != NULL);
if (!s->is_homeless()) {
ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
void Objecter::_reopen_session(OSDSession *s)
{
+ // rwlock is locked unique
// s->lock is locked
- entity_inst_t inst = osdmap->get_inst(s->osd);
+ auto addrs = osdmap->get_addrs(s->osd);
ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
- << inst << dendl;
+ << addrs << dendl;
if (s->con) {
s->con->set_priv(NULL);
s->con->mark_down();
logger->inc(l_osdc_osd_session_close);
}
- s->con = messenger->get_connection(inst);
- s->con->set_priv(s->get());
+ s->con = messenger->connect_to_osd(addrs);
+ s->con->set_priv(RefCountedPtr{s});
s->incarnation++;
logger->inc(l_osdc_osd_session_open);
}
s->con->mark_down();
logger->inc(l_osdc_osd_session_close);
}
- OSDSession::unique_lock sl(s->lock);
+ unique_lock sl(s->lock);
std::list<LingerOp*> homeless_lingers;
std::list<CommandOp*> homeless_commands;
std::list<Op*> homeless_ops;
while (!s->linger_ops.empty()) {
- std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
+ auto i = s->linger_ops.begin();
ldout(cct, 10) << " linger_op " << i->first << dendl;
homeless_lingers.push_back(i->second);
_session_linger_op_remove(s, i->second);
}
while (!s->ops.empty()) {
- std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
+ auto i = s->ops.begin();
ldout(cct, 10) << " op " << i->first << dendl;
homeless_ops.push_back(i->second);
_session_op_remove(s, i->second);
}
while (!s->command_ops.empty()) {
- std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
+ auto i = s->command_ops.begin();
ldout(cct, 10) << " command_op " << i->first << dendl;
homeless_commands.push_back(i->second);
_session_command_op_remove(s, i->second);
// Assign any leftover ops to the homeless session
{
- OSDSession::unique_lock hsl(homeless_session->lock);
- for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
+ unique_lock hsl(homeless_session->lock);
+ for (auto i = homeless_lingers.begin();
i != homeless_lingers.end(); ++i) {
_session_linger_op_assign(homeless_session, *i);
}
- for (std::list<Op*>::iterator i = homeless_ops.begin();
+ for (auto i = homeless_ops.begin();
i != homeless_ops.end(); ++i) {
_session_op_assign(homeless_session, *i);
}
- for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
+ for (auto i = homeless_commands.begin();
i != homeless_commands.end(); ++i) {
_session_command_op_assign(homeless_session, *i);
}
logger->set(l_osdc_osd_sessions, osd_sessions.size());
}
-void Objecter::wait_for_osd_map()
+void Objecter::wait_for_osd_map(epoch_t e)
{
unique_lock l(rwlock);
- if (osdmap->get_epoch()) {
+ if (osdmap->get_epoch() >= e) {
l.unlock();
return;
}
- // Leave this since it goes with C_SafeCond
- Mutex lock("");
- Cond cond;
- bool done;
- lock.Lock();
- C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL);
- waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
+ ca::waiter<bs::error_code> w;
+ waiting_for_map[e].emplace_back(OpCompletion::create(
+ service.get_executor(),
+ w.ref()),
+ bs::error_code{});
l.unlock();
- while (!done)
- cond.Wait(lock);
- lock.Unlock();
-}
-
-struct C_Objecter_GetVersion : public Context {
- Objecter *objecter;
- uint64_t oldest, newest;
- Context *fin;
- C_Objecter_GetVersion(Objecter *o, Context *c)
- : objecter(o), oldest(0), newest(0), fin(c) {}
- void finish(int r) override {
- if (r >= 0) {
- objecter->get_latest_version(oldest, newest, fin);
- } else if (r == -EAGAIN) { // try again as instructed
- objecter->wait_for_latest_osdmap(fin);
- } else {
- // it doesn't return any other error codes!
- ceph_abort();
- }
- }
-};
-
-void Objecter::wait_for_latest_osdmap(Context *fin)
-{
- ldout(cct, 10) << __func__ << dendl;
- C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
- monc->get_version("osdmap", &c->newest, &c->oldest, c);
-}
-
-void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
-{
- unique_lock wl(rwlock);
- _get_latest_version(oldest, newest, fin);
+ w.wait();
}
void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
- Context *fin)
+ std::unique_ptr<OpCompletion> fin,
+ std::unique_lock<ceph::shared_mutex>&& l)
{
- // rwlock is locked unique
+ ceph_assert(fin);
if (osdmap->get_epoch() >= newest) {
- ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
- if (fin)
- fin->complete(0);
- return;
+ ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
+ l.unlock();
+ ca::defer(std::move(fin), bs::error_code{});
+ } else {
+ ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
+ _wait_for_new_map(std::move(fin), newest, bs::error_code{});
+ l.unlock();
}
-
- ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
- _wait_for_new_map(fin, newest, 0);
}
void Objecter::maybe_request_map()
}
}
-void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
+void Objecter::_wait_for_new_map(std::unique_ptr<OpCompletion> c, epoch_t epoch,
+ bs::error_code ec)
{
// rwlock is locked unique
- waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
+ waiting_for_map[epoch].emplace_back(std::move(c), ec);
_maybe_request_map();
}
}
}
-bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
-{
- unique_lock wl(rwlock);
- if (osdmap->get_epoch() >= epoch) {
- return true;
- }
- _wait_for_new_map(c, epoch, err);
- return false;
-}
-
-void Objecter::kick_requests(OSDSession *session)
-{
- ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
-
- map<uint64_t, LingerOp *> lresend;
- unique_lock wl(rwlock);
-
- OSDSession::unique_lock sl(session->lock);
- _kick_requests(session, lresend);
- sl.unlock();
-
- _linger_ops_resend(lresend, wl);
-}
-
void Objecter::_kick_requests(OSDSession *session,
map<uint64_t, LingerOp *>& lresend)
{
// resend ops
map<ceph_tid_t,Op*> resend; // resend in tid order
- for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
- p != session->ops.end();) {
+ for (auto p = session->ops.begin(); p != session->ops.end();) {
Op *op = p->second;
++p;
- logger->inc(l_osdc_op_resend);
if (op->should_resend) {
if (!op->target.paused)
resend[op->tid] = op;
}
}
+ logger->inc(l_osdc_op_resend, resend.size());
while (!resend.empty()) {
_send_op(resend.begin()->second);
resend.erase(resend.begin());
}
// resend lingers
- for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
+ logger->inc(l_osdc_linger_resend, session->linger_ops.size());
+ for (auto j = session->linger_ops.begin();
j != session->linger_ops.end(); ++j) {
LingerOp *op = j->second;
op->get();
- logger->inc(l_osdc_linger_resend);
- assert(lresend.count(j->first) == 0);
+ ceph_assert(lresend.count(j->first) == 0);
lresend[j->first] = op;
}
// resend commands
+ logger->inc(l_osdc_command_resend, session->command_ops.size());
map<uint64_t,CommandOp*> cresend; // resend in order
- for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
+ for (auto k = session->command_ops.begin();
k != session->command_ops.end(); ++k) {
- logger->inc(l_osdc_command_resend);
cresend[k->first] = k->second;
}
while (!cresend.empty()) {
}
void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
- unique_lock& ul)
+ unique_lock<ceph::shared_mutex>& ul)
{
- assert(ul.owns_lock());
+ ceph_assert(ul.owns_lock());
shunique_lock sul(std::move(ul));
while (!lresend.empty()) {
LingerOp *op = lresend.begin()->second;
op->put();
lresend.erase(lresend.begin());
}
- ul = unique_lock(sul.release_to_unique());
+ ul = sul.release_to_unique();
}
void Objecter::start_tick()
{
- assert(tick_event == 0);
+ ceph_assert(tick_event == 0);
tick_event =
timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
&Objecter::tick, this);
// look for laggy requests
- auto cutoff = ceph::mono_clock::now();
+ auto cutoff = ceph::coarse_mono_clock::now();
cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout); // timeout
unsigned laggy_ops = 0;
- for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
+ for (auto siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
- OSDSession *s = siter->second;
- OSDSession::lock_guard l(s->lock);
+ auto s = siter->second;
+ scoped_lock l(s->lock);
bool found = false;
- for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
- p != s->ops.end();
- ++p) {
- Op *op = p->second;
- assert(op->session);
+ for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
+ auto op = p->second;
+ ceph_assert(op->session);
if (op->stamp < cutoff) {
ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
<< " is laggy" << dendl;
++laggy_ops;
}
}
- for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
+ for (auto p = s->linger_ops.begin();
p != s->linger_ops.end();
++p) {
- LingerOp *op = p->second;
- LingerOp::unique_lock wl(op->watch_lock);
- assert(op->session);
+ auto op = p->second;
+ std::unique_lock wl(op->watch_lock);
+ ceph_assert(op->session);
ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
<< " (osd." << op->session->osd << ")" << dendl;
found = true;
if (op->is_watch && op->registered && !op->last_error)
_send_linger_ping(op);
}
- for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
+ for (auto p = s->command_ops.begin();
p != s->command_ops.end();
++p) {
- CommandOp *op = p->second;
- assert(op->session);
+ auto op = p->second;
+ ceph_assert(op->session);
ldout(cct, 10) << " pinging osd that serves command tid " << p->first
<< " (osd." << op->session->osd << ")" << dendl;
found = true;
if (!toping.empty()) {
// send a ping to these osds, to ensure we detect any session resets
// (osd reply message policy is lossy)
- for (set<OSDSession*>::const_iterator i = toping.begin();
- i != toping.end();
- ++i) {
+ for (auto i = toping.begin(); i != toping.end(); ++i) {
(*i)->con->send_message(new MPing);
}
}
- // Make sure we don't resechedule if we wake up after shutdown
+ // Make sure we don't reschedule if we wake up after shutdown
if (initialized) {
tick_event = timer.reschedule_me(ceph::make_timespan(
cct->_conf->objecter_tick_interval));
ldout(cct, 10) << "resend_mon_ops" << dendl;
- for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
- p != poolstat_ops.end();
- ++p) {
+ for (auto p = poolstat_ops.begin(); p != poolstat_ops.end(); ++p) {
_poolstat_submit(p->second);
logger->inc(l_osdc_poolstat_resend);
}
- for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
- p != statfs_ops.end();
- ++p) {
+ for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
_fs_stats_submit(p->second);
logger->inc(l_osdc_statfs_resend);
}
- for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
- p != pool_ops.end();
- ++p) {
+ for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
_pool_op_submit(p->second);
logger->inc(l_osdc_poolop_resend);
}
- for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
+ for (auto p = check_latest_map_ops.begin();
p != check_latest_map_ops.end();
++p) {
- C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
- monc->get_version("osdmap", &c->latest, NULL, c);
+ monc->get_version("osdmap", CB_Op_Map_Latest(this, p->second->tid));
}
- for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
+ for (auto p = check_latest_map_lingers.begin();
p != check_latest_map_lingers.end();
++p) {
- C_Linger_Map_Latest *c
- = new C_Linger_Map_Latest(this, p->second->linger_id);
- monc->get_version("osdmap", &c->latest, NULL, c);
+ monc->get_version("osdmap", CB_Linger_Map_Latest(this, p->second->linger_id));
}
- for (map<uint64_t, CommandOp*>::iterator p
- = check_latest_map_commands.begin();
+ for (auto p = check_latest_map_commands.begin();
p != check_latest_map_commands.end();
++p) {
- C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
- monc->get_version("osdmap", &c->latest, NULL, c);
+ monc->get_version("osdmap", CB_Command_Map_Latest(this, p->second->tid));
}
}
_op_submit_with_budget(op, rl, ptid, ctx_budget);
}
-void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
+void Objecter::_op_submit_with_budget(Op *op,
+ shunique_lock<ceph::shared_mutex>& sul,
ceph_tid_t *ptid,
int *ctx_budget)
{
- assert(initialized);
+ ceph_assert(initialized);
- assert(op->ops.size() == op->out_bl.size());
- assert(op->ops.size() == op->out_rval.size());
- assert(op->ops.size() == op->out_handler.size());
+ ceph_assert(op->ops.size() == op->out_bl.size());
+ ceph_assert(op->ops.size() == op->out_rval.size());
+ ceph_assert(op->ops.size() == op->out_handler.size());
// throttle. before we look at any state, because
// _take_op_budget() may drop our lock while it blocks.
inflight_ops++;
// add to gather set(s)
- if (op->onfinish) {
+ if (op->has_completion()) {
num_in_flight++;
} else {
ldout(cct, 20) << " note: not requesting reply" << dendl;
logger->inc(l_osdc_op_active);
logger->inc(l_osdc_op);
+ logger->inc(l_osdc_oplen_avg, op->ops.size());
if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
(CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
if (op->target.flags & CEPH_OSD_FLAG_PGOP)
logger->inc(l_osdc_op_pg);
- for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
+ for (auto p = op->ops.begin(); p != op->ops.end(); ++p) {
int code = l_osdc_osdop_other;
switch (p->op.op) {
case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
- case CEPH_OSD_OP_TMAPUP: code = l_osdc_osdop_tmap_up; break;
- case CEPH_OSD_OP_TMAPPUT: code = l_osdc_osdop_tmap_put; break;
- case CEPH_OSD_OP_TMAPGET: code = l_osdc_osdop_tmap_get; break;
// OMAP read operations
case CEPH_OSD_OP_OMAPGETVALS:
}
}
-void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
+void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid)
{
// rwlock is locked
ldout(cct, 10) << __func__ << " op " << op << dendl;
// pick target
- assert(op->session == NULL);
+ ceph_assert(op->session == NULL);
OSDSession *s = NULL;
- bool check_for_latest_map = _calc_target(&op->target, nullptr)
- == RECALC_OP_TARGET_POOL_DNE;
+ bool check_for_latest_map = false;
+ int r = _calc_target(&op->target, nullptr);
+ switch(r) {
+ case RECALC_OP_TARGET_POOL_DNE:
+ check_for_latest_map = true;
+ break;
+ case RECALC_OP_TARGET_POOL_EIO:
+ if (op->has_completion()) {
+ op->complete(osdc_errc::pool_eio, -EIO);
+ }
+ return;
+ }
// Try to get a session, including a retry if we need to take write lock
- int r = _get_session(op->target.osd, &s, sul);
+ r = _get_session(op->target.osd, &s, sul);
if (r == -EAGAIN ||
- (check_for_latest_map && sul.owns_lock_shared())) {
+ (check_for_latest_map && sul.owns_lock_shared()) ||
+ cct->_conf->objecter_debug_inject_relock_delay) {
epoch_t orig_epoch = osdmap->get_epoch();
sul.unlock();
if (cct->_conf->objecter_debug_inject_relock_delay) {
}
}
if (r == -EAGAIN) {
- assert(s == NULL);
+ ceph_assert(s == NULL);
r = _get_session(op->target.osd, &s, sul);
}
- assert(r == 0);
- assert(s); // may be homeless
+ ceph_assert(r == 0);
+ ceph_assert(s); // may be homeless
_send_op_account(op);
// send?
- assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
-
- if (osdmap_full_try) {
- op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
- }
+ ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
bool need_send = false;
-
- if (osdmap->get_epoch() < epoch_barrier) {
- ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
+ if (op->target.paused) {
+ ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
<< dendl;
- op->target.paused = true;
- _maybe_request_map();
- } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
- osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
- ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
- << dendl;
- op->target.paused = true;
- _maybe_request_map();
- } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
- osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
- ldout(cct, 10) << " paused read " << op << " tid " << op->tid
- << dendl;
- op->target.paused = true;
- _maybe_request_map();
- } else if (op->respects_full() &&
- (_osdmap_full_flag() ||
- _osdmap_pool_full(op->target.base_oloc.pool))) {
- ldout(cct, 0) << " FULL, paused modify " << op << " tid "
- << op->tid << dendl;
- op->target.paused = true;
_maybe_request_map();
} else if (!s->is_homeless()) {
need_send = true;
_maybe_request_map();
}
- MOSDOp *m = NULL;
- if (need_send) {
- m = _prepare_osd_op(op);
- }
-
- OSDSession::unique_lock sl(s->lock);
+ unique_lock sl(s->lock);
if (op->tid == 0)
op->tid = ++last_tid;
_session_op_assign(s, op);
if (need_send) {
- _send_op(op, m);
+ _send_op(op);
}
// Last chance to touch Op here, after giving up session lock it can
int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
- OSDSession::unique_lock sl(s->lock);
+ unique_lock sl(s->lock);
- map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
+ auto p = s->ops.find(tid);
if (p == s->ops.end()) {
ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
<< s->osd << dendl;
return -ENOENT;
}
+#if 0
if (s->con) {
- ldout(cct, 20) << " revoking rx buffer for " << tid
+ ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
<< " on " << s->con << dendl;
s->con->revoke_rx_buffer(tid);
}
+#endif
ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
<< dendl;
Op *op = p->second;
- if (op->onfinish) {
+ if (op->has_completion()) {
num_in_flight--;
- op->onfinish->complete(r);
- op->onfinish = NULL;
+ op->complete(osdcode(r), r);
}
_op_cancel_map_check(op);
_finish_op(op, r);
return ret;
}
+// Cancel a batch of in-flight ops, completing each with result r.
+// Per-tid _op_cancel() results (e.g. -ENOENT for an unknown tid) are
+// deliberately ignored; the batch form always returns 0.
+int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
+{
+  unique_lock wl(rwlock);
+  ldout(cct,10) << __func__ << " " << tids << dendl;
+  for (auto tid : tids) {
+    _op_cancel(tid, r);
+  }
+  return 0;
+}
+
int Objecter::_op_cancel(ceph_tid_t tid, int r)
{
int ret = 0;
start:
- for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
+ for (auto siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
OSDSession *s = siter->second;
- OSDSession::shared_lock sl(s->lock);
+ shared_lock sl(s->lock);
if (s->ops.find(tid) != s->ops.end()) {
sl.unlock();
ret = op_cancel(s, tid, r);
<< " not found in live sessions" << dendl;
// Handle case where the op is in homeless session
- OSDSession::shared_lock sl(homeless_session->lock);
+ shared_lock sl(homeless_session->lock);
if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
sl.unlock();
ret = op_cancel(homeless_session, tid, r);
std::vector<ceph_tid_t> to_cancel;
bool found = false;
- for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
+ for (auto siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
OSDSession *s = siter->second;
- OSDSession::shared_lock sl(s->lock);
- for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
+ shared_lock sl(s->lock);
+ for (auto op_i = s->ops.begin();
op_i != s->ops.end(); ++op_i) {
if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
&& (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
}
sl.unlock();
- for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
- titer != to_cancel.end();
- ++titer) {
+ for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
int cancel_result = op_cancel(s, *titer, r);
// We hold rwlock across search and cancellation, so cancels
// should always succeed
- assert(cancel_result == 0);
+ ceph_assert(cancel_result == 0);
}
if (!found && to_cancel.size())
found = true;
const vector<int>& newacting,
bool any_change)
{
- if (OSDMap::primary_changed(
+ if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
oldprimary,
oldacting,
newprimary,
const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
- _osdmap_full_flag() || _osdmap_pool_full(*pi);
+ (t->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi)));
return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
(t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
bool Objecter::_osdmap_has_pool_full() const
{
- for (map<int64_t, pg_pool_t>::const_iterator it
- = osdmap->get_pools().begin();
+ for (auto it = osdmap->get_pools().begin();
it != osdmap->get_pools().end(); ++it) {
if (_osdmap_pool_full(it->second))
return true;
return false;
}
-bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
-{
- return p.has_flag(pg_pool_t::FLAG_FULL) && honor_osdmap_full;
-}
-
/**
* Wrapper around osdmap->test_flag for special handling of the FULL flag.
*/
bool Objecter::_osdmap_full_flag() const
{
- // Ignore the FULL flag if the caller has honor_osdmap_full
- return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
+ // Ignore the FULL flag if the caller does not have honor_osdmap_full
+ return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
}
void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
return p->raw_hash_to_pg(p->hash_key(key, ns));
}
+// Strip snapids that the incoming osdmap reports as removed from the
+// snap context of an in-flight op.  Looks up the op's pool in
+// new_removed_snaps; if any snap in op->snapc is in that pool's removed
+// interval set, rebuilds op->snapc.snaps keeping only the survivors.
+// No-op when the pool has no newly removed snaps or nothing matches.
+void Objecter::_prune_snapc(
+  const mempool::osdmap::map<int64_t,
+  snap_interval_set_t>& new_removed_snaps,
+  Op *op)
+{
+  bool match = false;
+  auto i = new_removed_snaps.find(op->target.base_pgid.pool());
+  if (i != new_removed_snaps.end()) {
+    for (auto s : op->snapc.snaps) {
+      if (i->second.contains(s)) {
+	match = true;
+	break;
+      }
+    }
+    if (match) {
+      vector<snapid_t> new_snaps;
+      for (auto s : op->snapc.snaps) {
+	if (!i->second.contains(s)) {
+	  new_snaps.push_back(s);
+	}
+      }
+      op->snapc.snaps.swap(new_snaps);
+      // after the swap, new_snaps holds the *previous* snap list, which
+      // is what the "(was ...)" below refers to
+      ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
+		    << " (was " << new_snaps << ")" << dendl;
+    }
+  }
+}
+
int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
// rwlock is locked
t->osd = -1;
return RECALC_OP_TARGET_POOL_DNE;
}
+
+ if (pi->has_flag(pg_pool_t::FLAG_EIO)) {
+ return RECALC_OP_TARGET_POOL_EIO;
+ }
+
ldout(cct,30) << __func__ << " base pi " << pi
<< " pg_num " << pi->get_pg_num() << dendl;
pg_t pgid;
if (t->precalc_pgid) {
- assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
- assert(t->base_oid.name.empty()); // make sure this is a pg op
- assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
+ ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
+ ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
+ ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
pgid = t->base_pgid;
} else {
int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
int size = pi->size;
int min_size = pi->min_size;
unsigned pg_num = pi->get_pg_num();
+ unsigned pg_num_mask = pi->get_pg_num_mask();
+ unsigned pg_num_pending = pi->get_pg_num_pending();
int up_primary, acting_primary;
vector<int> up, acting;
- osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
- &acting, &acting_primary);
+ ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
+ pg_t actual_pgid(actual_ps, pgid.pool());
+ if (!lookup_pg_mapping(actual_pgid, osdmap->get_epoch(), &up, &up_primary,
+ &acting, &acting_primary)) {
+ osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
+ &acting, &acting_primary);
+ pg_mapping_t pg_mapping(osdmap->get_epoch(),
+ up, up_primary, acting, acting_primary);
+ update_pg_mapping(actual_pgid, std::move(pg_mapping));
+ }
bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
min_size,
t->pg_num,
pg_num,
+ t->pg_num_pending,
+ pg_num_pending,
t->sort_bitwise,
sort_bitwise,
t->recovery_deletes,
recovery_deletes,
+ t->peering_crush_bucket_count,
+ pi->peering_crush_bucket_count,
+ t->peering_crush_bucket_target,
+ pi->peering_crush_bucket_target,
+ t->peering_crush_bucket_barrier,
+ pi->peering_crush_bucket_barrier,
+ t->peering_crush_mandatory_member,
+ pi->peering_crush_mandatory_member,
prev_pgid)) {
force_resend = true;
}
bool unpaused = false;
- if (t->paused && !target_should_be_paused(t)) {
- t->paused = false;
+ bool should_be_paused = target_should_be_paused(t);
+ if (t->paused && !should_be_paused) {
unpaused = true;
}
+ if (t->paused != should_be_paused) {
+ ldout(cct, 10) << __func__ << " paused " << t->paused
+ << " -> " << should_be_paused << dendl;
+ t->paused = should_be_paused;
+ }
bool legacy_change =
t->pgid != pgid ||
is_pg_changed(
t->acting_primary, t->acting, acting_primary, acting,
t->used_replica || any_change);
- bool split = false;
+ bool split_or_merge = false;
if (t->pg_num) {
- split = prev_pgid.is_split(t->pg_num, pg_num, nullptr);
+ split_or_merge =
+ prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
+ prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
+ prev_pgid.is_merge_target(t->pg_num, pg_num);
}
- if (legacy_change || split || force_resend) {
+ if (legacy_change || split_or_merge || force_resend) {
t->pgid = pgid;
- t->acting = acting;
+ t->acting = std::move(acting);
t->acting_primary = acting_primary;
t->up_primary = up_primary;
- t->up = up;
+ t->up = std::move(up);
t->size = size;
t->min_size = min_size;
t->pg_num = pg_num;
- t->pg_num_mask = pi->get_pg_num_mask();
- osdmap->get_primary_shard(
- pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
- &t->actual_pgid);
+ t->pg_num_mask = pg_num_mask;
+ t->pg_num_pending = pg_num_pending;
+ spg_t spgid(actual_pgid);
+ if (pi->is_erasure()) {
+ for (uint8_t i = 0; i < t->acting.size(); ++i) {
+ if (t->acting[i] == acting_primary) {
+ spgid.reset_shard(shard_id_t(i));
+ break;
+ }
+ }
+ }
+ t->actual_pgid = spgid;
t->sort_bitwise = sort_bitwise;
t->recovery_deletes = recovery_deletes;
+ t->peering_crush_bucket_count = pi->peering_crush_bucket_count;
+ t->peering_crush_bucket_target = pi->peering_crush_bucket_target;
+ t->peering_crush_bucket_barrier = pi->peering_crush_bucket_barrier;
+ t->peering_crush_mandatory_member = pi->peering_crush_mandatory_member;
ldout(cct, 10) << __func__ << " "
<< " raw pgid " << pgid << " -> actual " << t->actual_pgid
- << " acting " << acting
+ << " acting " << t->acting
<< " primary " << acting_primary << dendl;
t->used_replica = false;
- if (acting_primary == -1) {
- t->osd = -1;
- } else {
+ if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
+ CEPH_OSD_FLAG_LOCALIZE_READS)) &&
+ !is_write && pi->is_replicated() && t->acting.size() > 1) {
int osd;
- bool read = is_read && !is_write;
- if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
- int p = rand() % acting.size();
+ ceph_assert(is_read && t->acting[0] == acting_primary);
+ if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
+ int p = rand() % t->acting.size();
if (p)
t->used_replica = true;
- osd = acting[p];
- ldout(cct, 10) << " chose random osd." << osd << " of " << acting
+ osd = t->acting[p];
+ ldout(cct, 10) << " chose random osd." << osd << " of " << t->acting
<< dendl;
- } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
- acting.size() > 1) {
+ } else {
// look for a local replica. prefer the primary if the
// distance is the same.
int best = -1;
int best_locality = 0;
- for (unsigned i = 0; i < acting.size(); ++i) {
+ for (unsigned i = 0; i < t->acting.size(); ++i) {
int locality = osdmap->crush->get_common_ancestor_distance(
- cct, acting[i], crush_location);
+ cct, t->acting[i], crush_location);
ldout(cct, 20) << __func__ << " localize: rank " << i
- << " osd." << acting[i]
+ << " osd." << t->acting[i]
<< " locality " << locality << dendl;
if (i == 0 ||
(locality >= 0 && best_locality >= 0 &&
t->used_replica = true;
}
}
- assert(best >= 0);
- osd = acting[best];
- } else {
- osd = acting_primary;
+ ceph_assert(best >= 0);
+ osd = t->acting[best];
}
t->osd = osd;
+ } else {
+ t->osd = acting_primary;
}
}
if (legacy_change || unpaused || force_resend) {
return RECALC_OP_TARGET_NEED_RESEND;
}
- if (split && con && con->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT)) {
+ if (split_or_merge &&
+ (osdmap->require_osd_release >= ceph_release_t::luminous ||
+ HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
+ RESEND_ON_SPLIT))) {
return RECALC_OP_TARGET_NEED_RESEND;
}
return RECALC_OP_TARGET_NO_ACTION;
}
int Objecter::_map_session(op_target_t *target, OSDSession **s,
- shunique_lock& sul)
+ shunique_lock<ceph::shared_mutex>& sul)
{
_calc_target(target, nullptr);
return _get_session(target->osd, s, sul);
void Objecter::_session_op_assign(OSDSession *to, Op *op)
{
// to->lock is locked
- assert(op->session == NULL);
- assert(op->tid);
+ ceph_assert(op->session == NULL);
+ ceph_assert(op->tid);
get_session(to);
op->session = to;
void Objecter::_session_op_remove(OSDSession *from, Op *op)
{
- assert(op->session == from);
+ ceph_assert(op->session == from);
// from->lock is locked
if (from->is_homeless()) {
void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
{
// to lock is locked unique
- assert(op->session == NULL);
+ ceph_assert(op->session == NULL);
if (to->is_homeless()) {
num_homeless_ops++;
void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
{
- assert(from == op->session);
+ ceph_assert(from == op->session);
// from->lock is locked unique
if (from->is_homeless()) {
void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
{
- assert(from == op->session);
+ ceph_assert(from == op->session);
// from->lock is locked
if (from->is_homeless()) {
void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
{
// to->lock is locked
- assert(op->session == NULL);
- assert(op->tid);
+ ceph_assert(op->session == NULL);
+ ceph_assert(op->tid);
if (to->is_homeless()) {
num_homeless_ops++;
}
int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
- shunique_lock& sul)
+ shunique_lock<ceph::shared_mutex>& sul)
{
// rwlock is locked unique
OSDSession *s = NULL;
r = _get_session(linger_op->target.osd, &s, sul);
- assert(r == 0);
+ ceph_assert(r == 0);
if (linger_op->session != s) {
// NB locking two sessions (s and linger_op->session) at the
// same time here is only safe because we are the only one that
- // takes two, and we are holding rwlock for write. Disable
- // lockdep because it doesn't know that.
- OSDSession::unique_lock sl(s->lock);
+ // takes two, and we are holding rwlock for write. We use
+ // std::shared_mutex in OSDSession because lockdep doesn't know
+ // that.
+ unique_lock sl(s->lock);
_session_linger_op_remove(linger_op->session, linger_op);
_session_linger_op_assign(s, linger_op);
}
{
ldout(cct, 15) << "cancel_op " << op->tid << dendl;
- assert(!op->should_resend);
- if (op->onfinish) {
- delete op->onfinish;
+ ceph_assert(!op->should_resend);
+ if (op->has_completion()) {
+ op->onfinish = nullptr;
num_in_flight--;
}
void Objecter::_finish_op(Op *op, int r)
{
- ldout(cct, 15) << "finish_op " << op->tid << dendl;
+ ldout(cct, 15) << __func__ << " " << op->tid << dendl;
// op->session->lock is locked unique or op->session is null
- if (!op->ctx_budgeted && op->budgeted)
- put_op_budget(op);
+ if (!op->ctx_budgeted && op->budget >= 0) {
+ put_op_budget_bytes(op->budget);
+ op->budget = -1;
+ }
if (op->ontimeout && r != -ETIMEDOUT)
timer.cancel_event(op->ontimeout);
logger->dec(l_osdc_op_active);
- assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
+ ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
inflight_ops--;
op->put();
}
-void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
-{
- ldout(cct, 15) << "finish_op " << tid << dendl;
- shared_lock rl(rwlock);
-
- OSDSession::unique_lock wl(session->lock);
-
- map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
- if (iter == session->ops.end())
- return;
-
- Op *op = iter->second;
-
- _finish_op(op, 0);
-}
-
-MOSDOp *Objecter::_prepare_osd_op(Op *op)
+Objecter::MOSDOp *Objecter::_prepare_osd_op(Op *op)
{
// rwlock is locked
int flags = op->target.flags;
flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
+ flags |= CEPH_OSD_FLAG_SUPPORTSPOOLEIO;
// Nothing checks this any longer, but needed for compatibility with
// pre-luminous osds
flags |= CEPH_OSD_FLAG_ONDISK;
- if (!honor_osdmap_full)
+ if (!honor_pool_full)
flags |= CEPH_OSD_FLAG_FULL_FORCE;
op->target.paused = false;
- op->stamp = ceph::mono_clock::now();
+ op->stamp = ceph::coarse_mono_clock::now();
hobject_t hobj = op->target.get_hobj();
- MOSDOp *m = new MOSDOp(client_inc, op->tid,
- hobj, op->target.actual_pgid,
- osdmap->get_epoch(),
- flags, op->features);
+ auto m = new MOSDOp(client_inc, op->tid,
+ hobj, op->target.actual_pgid,
+ osdmap->get_epoch(),
+ flags, op->features);
m->set_snapid(op->snapid);
m->set_snap_seq(op->snapc.seq);
}
logger->inc(l_osdc_op_send);
- logger->inc(l_osdc_op_send_bytes, m->get_data().length());
+ ssize_t sum = 0;
+ for (unsigned i = 0; i < m->ops.size(); i++) {
+ sum += m->ops[i].indata.length();
+ }
+ logger->inc(l_osdc_op_send_bytes, sum);
return m;
}
-void Objecter::_send_op(Op *op, MOSDOp *m)
+void Objecter::_send_op(Op *op)
{
// rwlock is locked
// op->session->lock is locked
// backoff?
- hobject_t hoid = op->target.get_hobj();
auto p = op->session->backoffs.find(op->target.actual_pgid);
if (p != op->session->backoffs.end()) {
+ hobject_t hoid = op->target.get_hobj();
auto q = p->second.lower_bound(hoid);
if (q != p->second.begin()) {
--q;
}
}
- if (!m) {
- assert(op->tid > 0);
- m = _prepare_osd_op(op);
- }
+ ceph_assert(op->tid > 0);
+ MOSDOp *m = _prepare_osd_op(op);
if (op->target.actual_pgid != m->get_spg()) {
ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
<< dendl;
ConnectionRef con = op->session->con;
- assert(con);
+ ceph_assert(con);
- // preallocated rx buffer?
+#if 0
+ // preallocated rx ceph::buffer?
if (op->con) {
- ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
+ ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
<< op->con << dendl;
op->con->revoke_rx_buffer(op->tid);
}
if (op->outbl &&
op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582
op->outbl->length()) {
- ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
+ op->outbl->invalidate_crc(); // messenger writes through c_str()
+ ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
<< dendl;
op->con = con;
op->con->post_rx_buffer(op->tid, *op->outbl);
}
+#endif
op->incarnation = op->session->incarnation;
- m->set_tid(op->tid);
-
if (op->trace.valid()) {
m->trace.init("op msg", nullptr, &op->trace);
}
op->session->con->send_message(m);
}
-int Objecter::calc_op_budget(Op *op)
+int Objecter::calc_op_budget(const bc::small_vector_base<OSDOp>& ops)
{
int op_budget = 0;
- for (vector<OSDOp>::iterator i = op->ops.begin();
- i != op->ops.end();
- ++i) {
+ for (auto i = ops.begin(); i != ops.end(); ++i) {
if (i->op.op & CEPH_OSD_OP_MODE_WR) {
op_budget += i->indata.length();
} else if (ceph_osd_op_mode_read(i->op.op)) {
- if (ceph_osd_op_type_data(i->op.op)) {
- if ((int64_t)i->op.extent.length > 0)
- op_budget += (int64_t)i->op.extent.length;
+ if (ceph_osd_op_uses_extent(i->op.op)) {
+ if ((int64_t)i->op.extent.length > 0)
+ op_budget += (int64_t)i->op.extent.length;
} else if (ceph_osd_op_type_attr(i->op.op)) {
- op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
+ op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
}
}
}
}
void Objecter::_throttle_op(Op *op,
- shunique_lock& sul,
+ shunique_lock<ceph::shared_mutex>& sul,
int op_budget)
{
- assert(sul && sul.mutex() == &rwlock);
+ ceph_assert(sul && sul.mutex() == &rwlock);
bool locked_for_write = sul.owns_lock();
if (!op_budget)
- op_budget = calc_op_budget(op);
+ op_budget = calc_op_budget(op->ops);
if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
sul.unlock();
op_throttle_bytes.get(op_budget);
}
}
-void Objecter::unregister_op(Op *op)
+// Budget charged for a linger (watch/notify) op: a flat single unit,
+// independent of the op's payload.
+int Objecter::take_linger_budget(LingerOp *info)
 {
-  OSDSession::unique_lock sl(op->session->lock);
-  op->session->ops.erase(op->tid);
-  sl.unlock();
-  put_session(op->session);
-  op->session = NULL;
-
-  inflight_ops--;
+  return 1;
 }
/* This function DOES put the passed message before returning */
}
ConnectionRef con = m->get_connection();
- OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<OSDSession*>(priv.get());
if (!s || s->con != con) {
ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
- if (s) {
- s->put();
- }
m->put();
return;
}
- OSDSession::unique_lock sl(s->lock);
+ unique_lock sl(s->lock);
map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
if (iter == s->ops.end()) {
" onnvram" : " ack"))
<< " ... stray" << dendl;
sl.unlock();
- put_session(s);
m->put();
return;
}
if (retry_writes_after_first_reply && op->attempts == 1 &&
(op->target.flags & CEPH_OSD_FLAG_WRITE)) {
ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
- if (op->onfinish) {
+ if (op->has_completion()) {
num_in_flight--;
}
_session_op_remove(s, op);
sl.unlock();
- put_session(s);
_op_submit(op, sul, NULL);
m->put();
<< op->session->con->get_peer_addr() << dendl;
m->put();
sl.unlock();
- put_session(s);
return;
}
} else {
// have, but that is better than doing callbacks out of order.
}
- Context *onfinish = 0;
+ decltype(op->onfinish) onfinish;
int rc = m->get_result();
if (m->is_redirect_reply()) {
ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
- if (op->onfinish)
+ if (op->has_completion())
num_in_flight--;
_session_op_remove(s, op);
sl.unlock();
- put_session(s);
// FIXME: two redirects could race and reorder
op->tid = 0;
m->get_redirect().combine_with_locator(op->target.target_oloc,
op->target.target_oid.name);
- op->target.flags |= CEPH_OSD_FLAG_REDIRECTED;
+ op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
+ CEPH_OSD_FLAG_IGNORE_CACHE |
+ CEPH_OSD_FLAG_IGNORE_OVERLAY);
_op_submit(op, sul, NULL);
m->put();
return;
if (rc == -EAGAIN) {
ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
- if (op->onfinish)
+ if (op->has_completion())
num_in_flight--;
_session_op_remove(s, op);
sl.unlock();
- put_session(s);
op->tid = 0;
op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
// got data?
if (op->outbl) {
+#if 0
if (op->con)
op->con->revoke_rx_buffer(op->tid);
- m->claim_data(*op->outbl);
+#endif
+ auto& bl = m->get_data();
+ if (op->outbl->length() == bl.length() &&
+ bl.get_num_buffers() <= 1) {
+ // this is here to keep previous users to *relied* on getting data
+ // read into existing buffers happy. Notably,
+ // libradosstriper::RadosStriperImpl::aio_read().
+ ldout(cct,10) << __func__ << " copying resulting " << bl.length()
+ << " into existing ceph::buffer of length " << op->outbl->length()
+ << dendl;
+ cb::list t;
+ t = std::move(*op->outbl);
+ t.invalidate_crc(); // we're overwriting the raw buffers via c_str()
+ bl.begin().copy(bl.length(), t.c_str());
+ op->outbl->substr_of(t, 0, bl.length());
+ } else {
+ m->claim_data(*op->outbl);
+ }
op->outbl = 0;
}
<< " != request ops " << op->ops
<< " from " << m->get_source_inst() << dendl;
- vector<bufferlist*>::iterator pb = op->out_bl.begin();
- vector<int*>::iterator pr = op->out_rval.begin();
- vector<Context*>::iterator ph = op->out_handler.begin();
- assert(op->out_bl.size() == op->out_rval.size());
- assert(op->out_bl.size() == op->out_handler.size());
- vector<OSDOp>::iterator p = out_ops.begin();
+ ceph_assert(op->ops.size() == op->out_bl.size());
+ ceph_assert(op->ops.size() == op->out_rval.size());
+ ceph_assert(op->ops.size() == op->out_ec.size());
+ ceph_assert(op->ops.size() == op->out_handler.size());
+ auto pb = op->out_bl.begin();
+ auto pr = op->out_rval.begin();
+ auto pe = op->out_ec.begin();
+ auto ph = op->out_handler.begin();
+ ceph_assert(op->out_bl.size() == op->out_rval.size());
+ ceph_assert(op->out_bl.size() == op->out_handler.size());
+ auto p = out_ops.begin();
for (unsigned i = 0;
p != out_ops.end() && pb != op->out_bl.end();
- ++i, ++p, ++pb, ++pr, ++ph) {
+ ++i, ++p, ++pb, ++pr, ++pe, ++ph) {
ldout(cct, 10) << " op " << i << " rval " << p->rval
<< " len " << p->outdata.length() << dendl;
if (*pb)
// can change it if e.g. decoding fails
if (*pr)
**pr = ceph_to_hostos_errno(p->rval);
+ if (*pe)
+ **pe = p->rval < 0 ? bs::error_code(-p->rval, osd_category()) :
+ bs::error_code();
if (*ph) {
- ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
- (*ph)->complete(ceph_to_hostos_errno(p->rval));
- *ph = NULL;
+ std::move((*ph))(p->rval < 0 ?
+ bs::error_code(-p->rval, osd_category()) :
+ bs::error_code(),
+ p->rval, p->outdata);
}
}
// NOTE: we assume that since we only request ONDISK ever we will
// only ever get back one (type of) ack ever.
- if (op->onfinish) {
+ if (op->has_completion()) {
num_in_flight--;
- onfinish = op->onfinish;
- op->onfinish = NULL;
+ onfinish = std::move(op->onfinish);
+ op->onfinish = nullptr;
}
logger->inc(l_osdc_op_reply);
sl.unlock();
// do callbacks
- if (onfinish) {
- onfinish->complete(rc);
+ if (Op::has_completion(onfinish)) {
+ Op::complete(std::move(onfinish), osdcode(rc), rc);
}
if (completion_lock.mutex()) {
completion_lock.unlock();
}
m->put();
- put_session(s);
}
void Objecter::handle_osd_backoff(MOSDBackoff *m)
}
ConnectionRef con = m->get_connection();
- OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<OSDSession*>(priv.get());
if (!s || s->con != con) {
ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
- if (s)
- s->put();
m->put();
return;
}
get_session(s);
- s->put(); // from get_priv() above
- OSDSession::unique_lock sl(s->lock);
+ unique_lock sl(s->lock);
switch (m->op) {
case CEPH_OSD_BACKOFF_OP_BLOCK:
// ack with original backoff's epoch so that the osd can discard this if
// there was a pg split.
- Message *r = new MOSDBackoff(m->pgid,
- m->map_epoch,
- CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
- m->id, m->begin, m->end);
+ auto r = new MOSDBackoff(m->pgid, m->map_epoch,
+ CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
+ m->id, m->begin, m->end);
// this priority must match the MOSDOps from _prepare_osd_op
r->set_priority(cct->_conf->osd_client_op_priority);
con->send_message(r);
<< " [" << b->begin << "," << b->end
<< ")" << dendl;
auto spgp = s->backoffs.find(b->pgid);
- assert(spgp != s->backoffs.end());
+ ceph_assert(spgp != s->backoffs.end());
spgp->second.erase(b->begin);
if (spgp->second.empty()) {
s->backoffs.erase(spgp);
shared_lock rl(rwlock);
list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
pos, list_context->pool_id, string());
- ldout(cct, 10) << __func__ << list_context
+ ldout(cct, 10) << __func__ << " " << list_context
<< " pos " << pos << " -> " << list_context->pos << dendl;
pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
list_context->current_pg = actual.ps();
op.pg_nls(list_context->max_entries, list_context->filter,
list_context->pos, osdmap->get_epoch());
list_context->bl.clear();
- C_NList *onack = new C_NList(list_context, onfinish, this);
+ auto onack = new C_NList(list_context, onfinish, this);
object_locator_t oloc(list_context->pool_id, list_context->nspace);
// note current_pg in case we don't have (or lose) SORTBITWISE
{
ldout(cct, 10) << __func__ << " " << list_context << dendl;
- bufferlist::iterator iter = list_context->bl.begin();
+ auto iter = list_context->bl.cbegin();
pg_nls_response_t response;
- bufferlist extra_info;
- ::decode(response, iter);
+ decode(response, iter);
if (!iter.end()) {
- ::decode(extra_info, iter);
+ // we do this as legacy.
+ cb::list legacy_extra_info;
+ decode(legacy_extra_info, iter);
}
// if the osd returns 1 (newer code), or handle MAX, it means we
<< ", response.entries " << response.entries
<< ", handle " << response.handle
<< ", tentative new pos " << list_context->pos << dendl;
- list_context->extra_info.append(extra_info);
if (response_size) {
- list_context->list.splice(list_context->list.end(), response.entries);
+ std::move(response.entries.begin(), response.entries.end(),
+ std::back_inserter(list_context->list));
+ response.entries.clear();
}
if (list_context->list.size() >= list_context->max_entries) {
// snapshots
-int Objecter::create_pool_snap(int64_t pool, string& snap_name,
-			       Context *onfinish)
+// Submit a POOL_OP_CREATE_SNAP request to the monitor for `pool`.
+// Errors detectable from the local osdmap (pool does not exist,
+// snapshot already exists) complete `onfinish` immediately via
+// defer(); otherwise the completion fires when the mon reply arrives.
+void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name,
+				decltype(PoolOp::onfinish)&& onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
		 << snap_name << dendl;
   const pg_pool_t *p = osdmap->get_pg_pool(pool);
-  if (!p)
-    return -EINVAL;
-  if (p->snap_exists(snap_name.c_str()))
-    return -EEXIST;
+  if (!p) {
+    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+    return;
+  }
+  if (p->snap_exists(snap_name)) {
+    onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists,
+                    cb::list{});
+    return;
+  }
-  PoolOp *op = new PoolOp;
-  if (!op)
-    return -ENOMEM;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = pool;
   op->name = snap_name;
-  op->onfinish = onfinish;
+  op->onfinish = std::move(onfinish);
   op->pool_op = POOL_OP_CREATE_SNAP;
   pool_ops[op->tid] = op;
   pool_op_submit(op);
-
-  return 0;
 }
-struct C_SelfmanagedSnap : public Context {
-  bufferlist bl;
-  snapid_t *psnapid;
-  Context *fin;
-  C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
-  void finish(int r) override {
-    if (r == 0) {
-      bufferlist::iterator p = bl.begin();
-      ::decode(*psnapid, p);
+// Completion adapter for allocate_selfmanaged_snap(): on success,
+// decodes the new snapid from the mon reply payload (a decode failure
+// is converted into an error_code); then forwards (ec, snapid) to the
+// caller's completion.  On error the snapid passed on is 0.
+struct CB_SelfmanagedSnap {
+  std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> fin;
+  CB_SelfmanagedSnap(decltype(fin)&& fin)
+    : fin(std::move(fin)) {}
+  void operator()(bs::error_code ec, const cb::list& bl) {
+    snapid_t snapid = 0;
+    if (!ec) {
+      try {
+	auto p = bl.cbegin();
+	decode(snapid, p);
+      } catch (const cb::error& e) {
+	ec = e.code();
+      }
     }
-    fin->complete(r);
+    fin->defer(std::move(fin), ec, snapid);
   }
 };
-int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
-					Context *onfinish)
+// Ask the monitor to allocate a new selfmanaged snapid in `pool`
+// (POOL_OP_CREATE_UNMANAGED_SNAP).  The reply payload is decoded by
+// CB_SelfmanagedSnap, which hands (error_code, snapid) to `onfinish`.
+void Objecter::allocate_selfmanaged_snap(
+  int64_t pool,
+  std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
-  PoolOp *op = new PoolOp;
-  if (!op) return -ENOMEM;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = pool;
-  C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
-  op->onfinish = fin;
-  op->blp = &fin->bl;
+  op->onfinish = PoolOp::OpComp::create(
+    service.get_executor(),
+    CB_SelfmanagedSnap(std::move(onfinish)));
   op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
   pool_ops[op->tid] = op;
   pool_op_submit(op);
-  return 0;
 }
-int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
-			       Context *onfinish)
+// Submit a POOL_OP_DELETE_SNAP request for `snap_name` in `pool`.
+// Completes `onfinish` immediately with pool_dne / snapshot_dne when
+// the pool or the snapshot is unknown in the local osdmap.
+void Objecter::delete_pool_snap(
+  int64_t pool, std::string_view snap_name,
+  decltype(PoolOp::onfinish)&& onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
		 << snap_name << dendl;
   const pg_pool_t *p = osdmap->get_pg_pool(pool);
-  if (!p)
-    return -EINVAL;
-  if (!p->snap_exists(snap_name.c_str()))
-    return -ENOENT;
+  if (!p) {
+    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+    return;
+  }
+
+  if (!p->snap_exists(snap_name)) {
+    onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{});
+    return;
+  }
-  PoolOp *op = new PoolOp;
-  if (!op)
-    return -ENOMEM;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = pool;
   op->name = snap_name;
-  op->onfinish = onfinish;
+  op->onfinish = std::move(onfinish);
   op->pool_op = POOL_OP_DELETE_SNAP;
   pool_ops[op->tid] = op;
   pool_op_submit(op);
-
-  return 0;
 }
-int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
-				      Context *onfinish)
+// Submit a POOL_OP_DELETE_UNMANAGED_SNAP request for snapid `snap` in
+// `pool`.  No local validation is done; the mon reply drives `onfinish`.
+void Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
+				       decltype(PoolOp::onfinish)&& onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
		 << snap << dendl;
-  PoolOp *op = new PoolOp;
-  if (!op) return -ENOMEM;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = pool;
-  op->onfinish = onfinish;
+  op->onfinish = std::move(onfinish);
   op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
   op->snapid = snap;
   pool_ops[op->tid] = op;
   pool_op_submit(op);
-
-  return 0;
 }
-int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
-			  int crush_rule)
+// Submit a POOL_OP_CREATE request to the monitor.  Fails fast with
+// pool_exists when a pool of that name is already in the local osdmap;
+// crush_rule (if non-zero) is forwarded in the MPoolOp.
+void Objecter::create_pool(std::string_view name,
+			   decltype(PoolOp::onfinish)&& onfinish,
+			   int crush_rule)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "create_pool name=" << name << dendl;
-  if (osdmap->lookup_pg_pool_name(name) >= 0)
-    return -EEXIST;
+  if (osdmap->lookup_pg_pool_name(name) >= 0) {
+    onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{});
+    return;
+  }
-  PoolOp *op = new PoolOp;
-  if (!op)
-    return -ENOMEM;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = 0;
   op->name = name;
-  op->onfinish = onfinish;
+  op->onfinish = std::move(onfinish);
   op->pool_op = POOL_OP_CREATE;
   pool_ops[op->tid] = op;
-  op->auid = auid;
   op->crush_rule = crush_rule;
   pool_op_submit(op);
-
-  return 0;
 }
-int Objecter::delete_pool(int64_t pool, Context *onfinish)
+// Delete a pool by id.  Completes `onfinish` with pool_dne when the id
+// is not in the local osdmap; otherwise submits via _do_delete_pool().
+void Objecter::delete_pool(int64_t pool,
+			   decltype(PoolOp::onfinish)&& onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "delete_pool " << pool << dendl;
   if (!osdmap->have_pg_pool(pool))
-    return -ENOENT;
-
-  _do_delete_pool(pool, onfinish);
-  return 0;
+    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+  else
+    _do_delete_pool(pool, std::move(onfinish));
 }
-int Objecter::delete_pool(const string &pool_name, Context *onfinish)
+// Delete a pool by name.  A failed name lookup in the local osdmap is
+// mapped to pool_dne; otherwise submits via _do_delete_pool().
+void Objecter::delete_pool(std::string_view pool_name,
+			   decltype(PoolOp::onfinish)&& onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "delete_pool " << pool_name << dendl;
   int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
   if (pool < 0)
-    return pool;
-
-  _do_delete_pool(pool, onfinish);
-  return 0;
+    // This only returns one error: -ENOENT.
+    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+  else
+    _do_delete_pool(pool, std::move(onfinish));
 }
-void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
+// Common helper for both delete_pool() overloads: registers and submits
+// a POOL_OP_DELETE PoolOp.  Both callers hold rwlock when calling.
+void Objecter::_do_delete_pool(int64_t pool,
+			       decltype(PoolOp::onfinish)&& onfinish)
+
 {
-  PoolOp *op = new PoolOp;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = pool;
   op->name = "delete";
-  op->onfinish = onfinish;
+  op->onfinish = std::move(onfinish);
   op->pool_op = POOL_OP_DELETE;
   pool_ops[op->tid] = op;
   pool_op_submit(op);
 }
-/**
- * change the auid owner of a pool by contacting the monitor.
- * This requires the current connection to have write permissions
- * on both the pool's current auid and the new (parameter) auid.
- * Uses the standard Context callback when done.
- */
-int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
-{
- unique_lock wl(rwlock);
- ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
- PoolOp *op = new PoolOp;
- if (!op) return -ENOMEM;
- op->tid = ++last_tid;
- op->pool = pool;
- op->name = "change_pool_auid";
- op->onfinish = onfinish;
- op->pool_op = POOL_OP_AUID_CHANGE;
- op->auid = auid;
- pool_ops[op->tid] = op;
-
- logger->set(l_osdc_poolop_active, pool_ops.size());
-
- pool_op_submit(op);
- return 0;
-}
-
void Objecter::pool_op_submit(PoolOp *op)
{
// rwlock is locked
// rwlock is locked unique
ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
- MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
- op->name, op->pool_op,
- op->auid, last_seen_osdmap_version);
+ auto m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
+ op->name, op->pool_op,
+ last_seen_osdmap_version);
if (op->snapid) m->snapid = op->snapid;
if (op->crush_rule) m->crush_rule = op->crush_rule;
monc->send_mon_message(m);
- op->last_submit = ceph::mono_clock::now();
+ op->last_submit = ceph::coarse_mono_clock::now();
logger->inc(l_osdc_poolop_send);
}
/**
* Handle a reply to a PoolOp message. Check that we sent the message
- * and give the caller responsibility for the returned bufferlist.
+ * and give the caller responsibility for the returned cb::list.
* Then either call the finisher or stash the PoolOp, depending on if we
* have a new enough map.
* Lastly, clean up the message and PoolOp.
*/
void Objecter::handle_pool_op_reply(MPoolOpReply *m)
{
- FUNCTRACE();
+ int rc = m->replyCode;
+ auto ec = rc < 0 ? bs::error_code(-rc, mon_category()) : bs::error_code();
+ FUNCTRACE(cct);
shunique_lock sul(rwlock, acquire_shared);
if (!initialized) {
sul.unlock();
ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
ceph_tid_t tid = m->get_tid();
- map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
+ auto iter = pool_ops.find(tid);
if (iter != pool_ops.end()) {
PoolOp *op = iter->second;
ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
<< ceph_pool_op_name(op->pool_op) << dendl;
- if (op->blp)
- op->blp->claim(m->response_data);
+ cb::list bl{std::move(m->response_data)};
if (m->version > last_seen_osdmap_version)
last_seen_osdmap_version = m->version;
if (osdmap->get_epoch() < m->epoch) {
if (osdmap->get_epoch() < m->epoch) {
ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
<< " before calling back" << dendl;
- _wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
+ _wait_for_new_map(OpCompletion::create(
+ service.get_executor(),
+ [o = std::move(op->onfinish),
+ bl = std::move(bl)](
+ bs::error_code ec) mutable {
+ o->defer(std::move(o), ec, bl);
+ }),
+ m->epoch,
+ ec);
} else {
// map epoch changed, probably because a MOSDMap message
// sneaked in. Do caller-specified callback now or else
// we lose it forever.
- assert(op->onfinish);
- op->onfinish->complete(m->replyCode);
+ ceph_assert(op->onfinish);
+ op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
}
} else {
- assert(op->onfinish);
- op->onfinish->complete(m->replyCode);
+ ceph_assert(op->onfinish);
+ op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
}
- op->onfinish = NULL;
+ op->onfinish = nullptr;
if (!sul.owns_lock()) {
sul.unlock();
sul.lock();
int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
- map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
+ auto it = pool_ops.find(tid);
if (it == pool_ops.end()) {
ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
return -ENOENT;
PoolOp *op = it->second;
if (op->onfinish)
- op->onfinish->complete(r);
+ op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{});
_finish_pool_op(op, r);
return 0;
// pool stats
-void Objecter::get_pool_stats(list<string>& pools,
- map<string,pool_stat_t> *result,
- Context *onfinish)
+void Objecter::get_pool_stats(
+ const std::vector<std::string>& pools,
+ decltype(PoolStatOp::onfinish)&& onfinish)
{
ldout(cct, 10) << "get_pool_stats " << pools << dendl;
- PoolStatOp *op = new PoolStatOp;
+ auto op = new PoolStatOp;
op->tid = ++last_tid;
op->pools = pools;
- op->pool_stats = result;
- op->onfinish = onfinish;
+ op->onfinish = std::move(onfinish);
if (mon_timeout > timespan(0)) {
op->ontimeout = timer.add_event(mon_timeout,
[this, op]() {
monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
op->pools,
last_seen_pgmap_version));
- op->last_submit = ceph::mono_clock::now();
+ op->last_submit = ceph::coarse_mono_clock::now();
logger->inc(l_osdc_poolstat_send);
}
return;
}
- map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
+ auto iter = poolstat_ops.find(tid);
if (iter != poolstat_ops.end()) {
PoolStatOp *op = poolstat_ops[tid];
ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
- *op->pool_stats = m->pool_stats;
if (m->version > last_seen_pgmap_version) {
last_seen_pgmap_version = m->version;
}
- op->onfinish->complete(0);
+ op->onfinish->defer(std::move(op->onfinish), bs::error_code{},
+ std::move(m->pool_stats), m->per_pool);
_finish_pool_stat_op(op, 0);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
- map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
+ auto it = poolstat_ops.find(tid);
if (it == poolstat_ops.end()) {
ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
return -ENOENT;
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
- PoolStatOp *op = it->second;
+ auto op = it->second;
if (op->onfinish)
- op->onfinish->complete(r);
+ op->onfinish->defer(std::move(op->onfinish), osdcode(r),
+ bc::flat_map<std::string, pool_stat_t>{}, false);
_finish_pool_stat_op(op, r);
return 0;
}
delete op;
}
-void Objecter::get_fs_stats(ceph_statfs& result,
- boost::optional<int64_t> data_pool,
- Context *onfinish)
+void Objecter::get_fs_stats(std::optional<int64_t> poolid,
+ decltype(StatfsOp::onfinish)&& onfinish)
{
ldout(cct, 10) << "get_fs_stats" << dendl;
unique_lock l(rwlock);
- StatfsOp *op = new StatfsOp;
+ auto op = new StatfsOp;
op->tid = ++last_tid;
- op->stats = &result;
- op->data_pool = data_pool;
- op->onfinish = onfinish;
+ op->data_pool = poolid;
+ op->onfinish = std::move(onfinish);
if (mon_timeout > timespan(0)) {
op->ontimeout = timer.add_event(mon_timeout,
[this, op]() {
monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
op->data_pool,
last_seen_pgmap_version));
- op->last_submit = ceph::mono_clock::now();
+ op->last_submit = ceph::coarse_mono_clock::now();
logger->inc(l_osdc_statfs_send);
}
if (statfs_ops.count(tid)) {
StatfsOp *op = statfs_ops[tid];
ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
- *(op->stats) = m->h.st;
if (m->h.version > last_seen_pgmap_version)
last_seen_pgmap_version = m->h.version;
- op->onfinish->complete(0);
+ op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st);
_finish_statfs_op(op, 0);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
- map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
+ auto it = statfs_ops.find(tid);
if (it == statfs_ops.end()) {
ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
return -ENOENT;
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
- StatfsOp *op = it->second;
+ auto op = it->second;
if (op->onfinish)
- op->onfinish->complete(r);
+ op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{});
_finish_statfs_op(op, r);
return 0;
}
// scatter/gather
void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
- vector<bufferlist>& resultbl,
- bufferlist *bl, Context *onfinish)
+ vector<cb::list>& resultbl,
+ cb::list *bl, Context *onfinish)
{
// all done
ldout(cct, 15) << "_sg_read_finish" << dendl;
if (extents.size() > 1) {
Striper::StripedReadResult r;
- vector<bufferlist>::iterator bit = resultbl.begin();
- for (vector<ObjectExtent>::iterator eit = extents.begin();
+ auto bit = resultbl.begin();
+ for (auto eit = extents.begin();
eit != extents.end();
++eit, ++bit) {
r.add_partial_result(cct, *bit, eit->buffer_extents);
r.assemble_result(cct, *bl, false);
} else {
ldout(cct, 15) << " only one frag" << dendl;
- bl->claim(resultbl[0]);
+ *bl = std::move(resultbl[0]);
}
// done
if (!initialized)
return false;
if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
- OSDSession *session = static_cast<OSDSession*>(con->get_priv());
+ unique_lock wl(rwlock);
+
+ auto priv = con->get_priv();
+ auto session = static_cast<OSDSession*>(priv.get());
if (session) {
ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
<< " osd." << session->osd << dendl;
- unique_lock wl(rwlock);
- if (!initialized) {
+      // The session may already have been closed if a newly handled
+      // osdmap marked this osd down.
+ if (!(initialized && osdmap->is_up(session->osd))) {
+ ldout(cct, 1) << "ms_handle_reset aborted,initialized=" << initialized << dendl;
wl.unlock();
return false;
}
map<uint64_t, LingerOp *> lresend;
- OSDSession::unique_lock sl(session->lock);
+ unique_lock sl(session->lock);
_reopen_session(session);
_kick_requests(session, lresend);
sl.unlock();
_linger_ops_resend(lresend, wl);
wl.unlock();
maybe_request_map();
- session->put();
}
return true;
}
return false;
}
-bool Objecter::ms_get_authorizer(int dest_type,
- AuthAuthorizer **authorizer,
- bool force_new)
-{
- if (!initialized)
- return false;
- if (dest_type == CEPH_ENTITY_TYPE_MON)
- return true;
- *authorizer = monc->build_authorizer(dest_type);
- return *authorizer != NULL;
-}
-
void Objecter::op_target_t::dump(Formatter *f) const
{
f->dump_stream("pg") << pgid;
void Objecter::_dump_active(OSDSession *s)
{
- for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
- p != s->ops.end();
- ++p) {
+ for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
Op *op = p->second;
ldout(cct, 20) << op->tid << "\t" << op->target.pgid
<< "\tosd." << (op->session ? op->session->osd : -1)
{
ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
<< dendl;
- for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
+ for (auto siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
- OSDSession *s = siter->second;
- OSDSession::shared_lock sl(s->lock);
+ auto s = siter->second;
+ shared_lock sl(s->lock);
_dump_active(s);
sl.unlock();
}
void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
{
- for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
- p != s->ops.end();
- ++p) {
+ for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
Op *op = p->second;
+ auto age = std::chrono::duration<double>(ceph::coarse_mono_clock::now() - op->stamp);
fmt->open_object_section("op");
fmt->dump_unsigned("tid", op->tid);
op->target.dump(fmt);
fmt->dump_stream("last_sent") << op->stamp;
+ fmt->dump_float("age", age.count());
fmt->dump_int("attempts", op->attempts);
fmt->dump_stream("snapid") << op->snapid;
fmt->dump_stream("snap_context") << op->snapc;
fmt->dump_stream("mtime") << op->mtime;
fmt->open_array_section("osd_ops");
- for (vector<OSDOp>::const_iterator it = op->ops.begin();
- it != op->ops.end();
- ++it) {
+ for (auto it = op->ops.begin(); it != op->ops.end(); ++it) {
fmt->dump_stream("osd_op") << *it;
}
fmt->close_section(); // osd_ops array
{
// Read-lock on Objecter held
fmt->open_array_section("ops");
- for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
+ for (auto siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
OSDSession *s = siter->second;
- OSDSession::shared_lock sl(s->lock);
+ shared_lock sl(s->lock);
_dump_ops(s, fmt);
sl.unlock();
}
void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
{
- for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
- p != s->linger_ops.end();
- ++p) {
- LingerOp *op = p->second;
+ for (auto p = s->linger_ops.begin(); p != s->linger_ops.end(); ++p) {
+ auto op = p->second;
fmt->open_object_section("linger_op");
fmt->dump_unsigned("linger_id", op->linger_id);
op->target.dump(fmt);
{
// We have a read-lock on the objecter
fmt->open_array_section("linger_ops");
- for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
+ for (auto siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
- OSDSession *s = siter->second;
- OSDSession::shared_lock sl(s->lock);
+ auto s = siter->second;
+ shared_lock sl(s->lock);
_dump_linger_ops(s, fmt);
sl.unlock();
}
void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
{
- for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
- p != s->command_ops.end();
- ++p) {
- CommandOp *op = p->second;
+ for (auto p = s->command_ops.begin(); p != s->command_ops.end(); ++p) {
+ auto op = p->second;
fmt->open_object_section("command_op");
fmt->dump_unsigned("command_id", op->tid);
fmt->dump_int("osd", op->session ? op->session->osd : -1);
fmt->open_array_section("command");
- for (vector<string>::const_iterator q = op->cmd.begin();
- q != op->cmd.end(); ++q)
+ for (auto q = op->cmd.begin(); q != op->cmd.end(); ++q)
fmt->dump_string("word", *q);
fmt->close_section();
if (op->target_osd >= 0)
{
// We have a read-lock on the Objecter here
fmt->open_array_section("command_ops");
- for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
+ for (auto siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
- OSDSession *s = siter->second;
- OSDSession::shared_lock sl(s->lock);
+ auto s = siter->second;
+ shared_lock sl(s->lock);
_dump_command_ops(s, fmt);
sl.unlock();
}
void Objecter::dump_pool_ops(Formatter *fmt) const
{
fmt->open_array_section("pool_ops");
- for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
- p != pool_ops.end();
- ++p) {
- PoolOp *op = p->second;
+ for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
+ auto op = p->second;
fmt->open_object_section("pool_op");
fmt->dump_unsigned("tid", op->tid);
fmt->dump_int("pool", op->pool);
fmt->dump_string("name", op->name);
fmt->dump_int("operation_type", op->pool_op);
- fmt->dump_unsigned("auid", op->auid);
fmt->dump_unsigned("crush_rule", op->crush_rule);
fmt->dump_stream("snapid") << op->snapid;
fmt->dump_stream("last_sent") << op->last_submit;
void Objecter::dump_pool_stat_ops(Formatter *fmt) const
{
fmt->open_array_section("pool_stat_ops");
- for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
+ for (auto p = poolstat_ops.begin();
p != poolstat_ops.end();
++p) {
PoolStatOp *op = p->second;
fmt->dump_stream("last_sent") << op->last_submit;
fmt->open_array_section("pools");
- for (list<string>::const_iterator it = op->pools.begin();
- it != op->pools.end();
- ++it) {
- fmt->dump_string("pool", *it);
+ for (const auto& it : op->pools) {
+ fmt->dump_string("pool", it);
}
fmt->close_section(); // pools array
void Objecter::dump_statfs_ops(Formatter *fmt) const
{
fmt->open_array_section("statfs_ops");
- for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
- p != statfs_ops.end();
- ++p) {
- StatfsOp *op = p->second;
+ for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
+ auto op = p->second;
fmt->open_object_section("statfs_op");
fmt->dump_unsigned("tid", op->tid);
fmt->dump_stream("last_sent") << op->last_submit;
{
}
-bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap,
- std::string format, bufferlist& out)
+int Objecter::RequestStateHook::call(std::string_view command,
+ const cmdmap_t& cmdmap,
+ const bufferlist&,
+ Formatter *f,
+ std::ostream& ss,
+ cb::list& out)
{
- Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
shared_lock rl(m_objecter->rwlock);
m_objecter->dump_requests(f);
- f->flush(out);
- delete f;
- return true;
+ return 0;
}
-void Objecter::blacklist_self(bool set)
+void Objecter::blocklist_self(bool set)
{
- ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
+ ldout(cct, 10) << "blocklist_self " << (set ? "add" : "rm") << dendl;
vector<string> cmd;
- cmd.push_back("{\"prefix\":\"osd blacklist\", ");
+ cmd.push_back("{\"prefix\":\"osd blocklist\", ");
if (set)
- cmd.push_back("\"blacklistop\":\"add\",");
+ cmd.push_back("\"blocklistop\":\"add\",");
else
- cmd.push_back("\"blacklistop\":\"rm\",");
+ cmd.push_back("\"blocklistop\":\"rm\",");
stringstream ss;
- ss << messenger->get_myaddr();
+ // this is somewhat imprecise in that we are blocklisting our first addr only
+ ss << messenger->get_myaddrs().front().get_legacy_str();
cmd.push_back("\"addr\":\"" + ss.str() + "\"");
- MMonCommand *m = new MMonCommand(monc->get_fsid());
+ auto m = new MMonCommand(monc->get_fsid());
m->cmd = cmd;
+ // NOTE: no fallback to legacy blacklist command implemented here
+ // since this is only used for test code.
+
monc->send_mon_message(m);
}
}
ConnectionRef con = m->get_connection();
- OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<OSDSession*>(priv.get());
if (!s || s->con != con) {
ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
m->put();
- if (s)
- s->put();
return;
}
- OSDSession::shared_lock sl(s->lock);
- map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
+ shared_lock sl(s->lock);
+ auto p = s->command_ops.find(m->get_tid());
if (p == s->command_ops.end()) {
ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
<< " not found" << dendl;
m->put();
sl.unlock();
- if (s)
- s->put();
return;
}
<< dendl;
m->put();
sl.unlock();
- if (s)
- s->put();
return;
}
- if (c->poutbl) {
- c->poutbl->claim(m->get_data());
+
+ if (m->r == -EAGAIN) {
+ ldout(cct,10) << __func__ << " tid " << m->get_tid()
+ << " got EAGAIN, requesting map and resending" << dendl;
+ // NOTE: This might resend twice... once now, and once again when
+ // we get an updated osdmap and the PG is found to have moved.
+ _maybe_request_map();
+ _send_command(c);
+ m->put();
+ sl.unlock();
+ return;
}
sl.unlock();
+ unique_lock sul(s->lock);
+ _finish_command(c, m->r < 0 ? bs::error_code(-m->r, osd_category()) :
+ bs::error_code(), std::move(m->rs),
+ std::move(m->get_data()));
+ sul.unlock();
- _finish_command(c, m->r, m->rs);
m->put();
- if (s)
- s->put();
}
+Objecter::LingerOp::LingerOp(Objecter *o, uint64_t linger_id)
+ : objecter(o),
+ linger_id(linger_id),
+ watch_lock(ceph::make_shared_mutex(
+ fmt::format("LingerOp::watch_lock #{}", linger_id)))
+{}
+
void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
{
shunique_lock sul(rwlock, ceph::acquire_unique);
c->tid = tid;
{
- OSDSession::unique_lock hs_wl(homeless_session->lock);
+ unique_lock hs_wl(homeless_session->lock);
_session_command_op_assign(homeless_session, c);
}
if (osd_timeout > timespan(0)) {
c->ontimeout = timer.add_event(osd_timeout,
[this, c, tid]() {
- command_op_cancel(c->session, tid,
- -ETIMEDOUT); });
+ command_op_cancel(
+ c->session, tid,
+ osdc_errc::timed_out); });
}
if (!c->session->is_homeless()) {
}
if (c->map_check_error)
_send_command_map_check(c);
- *ptid = tid;
+ if (ptid)
+ *ptid = tid;
logger->inc(l_osdc_command_active);
}
-int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
+int Objecter::_calc_command_target(CommandOp *c,
+ shunique_lock<ceph::shared_mutex>& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
c->map_check_error = 0;
OSDSession *s;
int r = _get_session(c->target.osd, &s, sul);
- assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+ ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
if (c->session != s) {
put_session(s);
}
void Objecter::_assign_command_session(CommandOp *c,
- shunique_lock& sul)
+ shunique_lock<ceph::shared_mutex>& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
OSDSession *s;
int r = _get_session(c->target.osd, &s, sul);
- assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+ ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
if (c->session != s) {
if (c->session) {
OSDSession *cs = c->session;
- OSDSession::unique_lock csl(cs->lock);
+ unique_lock csl(cs->lock);
_session_command_op_remove(c->session, c);
csl.unlock();
}
- OSDSession::unique_lock sl(s->lock);
+ unique_lock sl(s->lock);
_session_command_op_assign(s, c);
}
void Objecter::_send_command(CommandOp *c)
{
ldout(cct, 10) << "_send_command " << c->tid << dendl;
- assert(c->session);
- assert(c->session->con);
- MCommand *m = new MCommand(monc->monmap.fsid);
+ ceph_assert(c->session);
+ ceph_assert(c->session->con);
+ auto m = new MCommand(monc->monmap.fsid);
m->cmd = c->cmd;
m->set_data(c->inbl);
m->set_tid(c->tid);
logger->inc(l_osdc_command_send);
}
-int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
+int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid,
+ bs::error_code ec)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
- map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
+ auto it = s->command_ops.find(tid);
if (it == s->command_ops.end()) {
ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
return -ENOENT;
CommandOp *op = it->second;
_command_cancel_map_check(op);
- _finish_command(op, r, "");
+ unique_lock sl(op->session->lock);
+ _finish_command(op, ec, {}, {});
+ sl.unlock();
return 0;
}
-void Objecter::_finish_command(CommandOp *c, int r, string rs)
+void Objecter::_finish_command(CommandOp *c, bs::error_code ec,
+ string&& rs, cb::list&& bl)
{
// rwlock is locked unique
+ // session lock is locked
- ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
+ ldout(cct, 10) << "_finish_command " << c->tid << " = " << ec << " "
<< rs << dendl;
- if (c->prs)
- *c->prs = rs;
+
if (c->onfinish)
- c->onfinish->complete(r);
+ c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl));
- if (c->ontimeout && r != -ETIMEDOUT)
+ if (c->ontimeout && ec != bs::errc::timed_out)
timer.cancel_event(c->ontimeout);
- OSDSession *s = c->session;
- OSDSession::unique_lock sl(s->lock);
_session_command_op_remove(c->session, c);
- sl.unlock();
c->put();
{
// Caller is responsible for re-assigning or
// destroying any ops that were assigned to us
- assert(ops.empty());
- assert(linger_ops.empty());
- assert(command_ops.empty());
+ ceph_assert(ops.empty());
+ ceph_assert(linger_ops.empty());
+ ceph_assert(command_ops.empty());
}
-Objecter::~Objecter()
+Objecter::Objecter(CephContext *cct,
+ Messenger *m, MonClient *mc,
+ boost::asio::io_context& service) :
+ Dispatcher(cct), messenger(m), monc(mc), service(service)
{
- delete osdmap;
+ mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+ osd_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
+}
- assert(homeless_session->get_nref() == 1);
- assert(num_homeless_ops == 0);
+Objecter::~Objecter()
+{
+ ceph_assert(homeless_session->get_nref() == 1);
+ ceph_assert(num_homeless_ops == 0);
homeless_session->put();
- assert(osd_sessions.empty());
- assert(poolstat_ops.empty());
- assert(statfs_ops.empty());
- assert(pool_ops.empty());
- assert(waiting_for_map.empty());
- assert(linger_ops.empty());
- assert(check_latest_map_lingers.empty());
- assert(check_latest_map_ops.empty());
- assert(check_latest_map_commands.empty());
+ ceph_assert(osd_sessions.empty());
+ ceph_assert(poolstat_ops.empty());
+ ceph_assert(statfs_ops.empty());
+ ceph_assert(pool_ops.empty());
+ ceph_assert(waiting_for_map.empty());
+ ceph_assert(linger_ops.empty());
+ ceph_assert(check_latest_map_lingers.empty());
+ ceph_assert(check_latest_map_ops.empty());
+ ceph_assert(check_latest_map_commands.empty());
- assert(!m_request_state_hook);
- assert(!logger);
+ ceph_assert(!m_request_state_hook);
+ ceph_assert(!logger);
}
/**
* sending any more operations to OSDs. Use this
* when it is known that the client can't trust
* anything from before this epoch (e.g. due to
- * client blacklist at this epoch).
+ * client blocklist at this epoch).
*/
void Objecter::set_epoch_barrier(epoch_t epoch)
{
return hobject_t::get_max();
}
-struct C_EnumerateReply : public Context {
- bufferlist bl;
-
- Objecter *objecter;
- hobject_t *next;
- std::list<librados::ListObjectImpl> *result;
+template<typename T>
+struct EnumerationContext {
+ Objecter* objecter;
const hobject_t end;
- const int64_t pool_id;
- Context *on_finish;
+ const cb::list filter;
+ uint32_t max;
+ const object_locator_t oloc;
+ std::vector<T> ls;
+private:
+ fu2::unique_function<void(bs::error_code,
+ std::vector<T>,
+ hobject_t) &&> on_finish;
+public:
+ epoch_t epoch = 0;
+ int budget = -1;
+
+ EnumerationContext(Objecter* objecter,
+ hobject_t end, cb::list filter,
+ uint32_t max, object_locator_t oloc,
+ decltype(on_finish) on_finish)
+ : objecter(objecter), end(std::move(end)), filter(std::move(filter)),
+ max(max), oloc(std::move(oloc)), on_finish(std::move(on_finish)) {}
+
+ void operator()(bs::error_code ec,
+ std::vector<T> v,
+ hobject_t h) && {
+ if (budget >= 0) {
+ objecter->put_op_budget_bytes(budget);
+ budget = -1;
+ }
- epoch_t epoch;
- int budget;
+ std::move(on_finish)(ec, std::move(v), std::move(h));
+ }
+};
+
+template<typename T>
+struct CB_EnumerateReply {
+ cb::list bl;
+
+ Objecter* objecter;
+ std::unique_ptr<EnumerationContext<T>> ctx;
- C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
- std::list<librados::ListObjectImpl> *result_,
- const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
- objecter(objecter_), next(next_), result(result_),
- end(end_), pool_id(pool_id_), on_finish(on_finish_),
- epoch(0), budget(0)
- {}
+ CB_EnumerateReply(Objecter* objecter,
+ std::unique_ptr<EnumerationContext<T>>&& ctx) :
+ objecter(objecter), ctx(std::move(ctx)) {}
- void finish(int r) override {
- objecter->_enumerate_reply(
- bl, r, end, pool_id, budget, epoch, result, next, on_finish);
+ void operator()(bs::error_code ec) {
+ objecter->_enumerate_reply(std::move(bl), ec, std::move(ctx));
}
};
+template<typename T>
void Objecter::enumerate_objects(
- int64_t pool_id,
- const std::string &ns,
- const hobject_t &start,
- const hobject_t &end,
- const uint32_t max,
- const bufferlist &filter_bl,
- std::list<librados::ListObjectImpl> *result,
- hobject_t *next,
- Context *on_finish)
-{
- assert(result);
-
+ int64_t pool_id,
+ std::string_view ns,
+ hobject_t start,
+ hobject_t end,
+ const uint32_t max,
+ const cb::list& filter_bl,
+ fu2::unique_function<void(bs::error_code,
+ std::vector<T>,
+ hobject_t) &&> on_finish) {
if (!end.is_max() && start > end) {
lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
- on_finish->complete(-EINVAL);
+ std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
return;
}
if (max < 1) {
lderr(cct) << __func__ << ": result size may not be zero" << dendl;
- on_finish->complete(-EINVAL);
+ std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
return;
}
if (start.is_max()) {
- on_finish->complete(0);
+ std::move(on_finish)({}, {}, {});
return;
}
shared_lock rl(rwlock);
- assert(osdmap->get_epoch());
+ ceph_assert(osdmap->get_epoch());
if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
rl.unlock();
lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
- on_finish->complete(-EOPNOTSUPP);
+ std::move(on_finish)(osdc_errc::not_supported, {}, {});
return;
}
- const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
+ const pg_pool_t* p = osdmap->get_pg_pool(pool_id);
if (!p) {
lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
<< osdmap->get_epoch() << dendl;
rl.unlock();
- on_finish->complete(-ENOENT);
+ std::move(on_finish)(osdc_errc::pool_dne, {}, {});
return;
} else {
rl.unlock();
}
- ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;
-
- // Stash completion state
- C_EnumerateReply *on_ack = new C_EnumerateReply(
- this, next, result, end, pool_id, on_finish);
-
+ _issue_enumerate(start,
+ std::make_unique<EnumerationContext<T>>(
+ this, std::move(end), filter_bl,
+ max, object_locator_t{pool_id, ns},
+ std::move(on_finish)));
+}
+
+template
+void Objecter::enumerate_objects<librados::ListObjectImpl>(
+ int64_t pool_id,
+ std::string_view ns,
+ hobject_t start,
+ hobject_t end,
+ const uint32_t max,
+ const cb::list& filter_bl,
+ fu2::unique_function<void(bs::error_code,
+ std::vector<librados::ListObjectImpl>,
+ hobject_t) &&> on_finish);
+
+template
+void Objecter::enumerate_objects<neorados::Entry>(
+ int64_t pool_id,
+ std::string_view ns,
+ hobject_t start,
+ hobject_t end,
+ const uint32_t max,
+ const cb::list& filter_bl,
+ fu2::unique_function<void(bs::error_code,
+ std::vector<neorados::Entry>,
+ hobject_t) &&> on_finish);
+
+
+
+template<typename T>
+void Objecter::_issue_enumerate(hobject_t start,
+ std::unique_ptr<EnumerationContext<T>> ctx) {
ObjectOperation op;
- op.pg_nls(max, filter_bl, start, 0);
+ auto c = ctx.get();
+ op.pg_nls(c->max, c->filter, start, osdmap->get_epoch());
+ auto on_ack = std::make_unique<CB_EnumerateReply<T>>(this, std::move(ctx));
+  // We must grab raw pointers into ctx/on_ack before std::move()-ing
+  // ownership into the completion below, because pg_read() takes them
+  // as out-parameters.  TODO: find a cleaner way to plumb these through.
+ auto epoch = &c->epoch;
+ auto budget = &c->budget;
+ auto pbl = &on_ack->bl;
// Issue. See you later in _enumerate_reply
- object_locator_t oloc(pool_id, ns);
- pg_read(start.get_hash(), oloc, op,
- &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
-}
-
+ pg_read(start.get_hash(),
+ c->oloc, op, pbl, 0,
+ Op::OpComp::create(service.get_executor(),
+ [c = std::move(on_ack)]
+ (bs::error_code ec) mutable {
+ (*c)(ec);
+ }), epoch, budget);
+}
+
+template
+void Objecter::_issue_enumerate<librados::ListObjectImpl>(
+ hobject_t start,
+ std::unique_ptr<EnumerationContext<librados::ListObjectImpl>> ctx);
+template
+void Objecter::_issue_enumerate<neorados::Entry>(
+ hobject_t start, std::unique_ptr<EnumerationContext<neorados::Entry>> ctx);
+
+template<typename T>
void Objecter::_enumerate_reply(
- bufferlist &bl,
- int r,
- const hobject_t &end,
- const int64_t pool_id,
- int budget,
- epoch_t reply_epoch,
- std::list<librados::ListObjectImpl> *result,
- hobject_t *next,
- Context *on_finish)
-{
- if (budget > 0) {
- put_op_budget_bytes(budget);
- }
-
- if (r < 0) {
- ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
- on_finish->complete(r);
+ cb::list&& bl,
+ bs::error_code ec,
+ std::unique_ptr<EnumerationContext<T>>&& ctx)
+{
+ if (ec) {
+ std::move(*ctx)(ec, {}, {});
return;
}
- assert(next != NULL);
-
// Decode the results
- bufferlist::iterator iter = bl.begin();
- pg_nls_response_t response;
+ auto iter = bl.cbegin();
+ pg_nls_response_template<T> response;
+
+ try {
+ response.decode(iter);
+ if (!iter.end()) {
+ // extra_info isn't used anywhere. We do this solely to preserve
+ // backward compatibility
+ cb::list legacy_extra_info;
+ decode(legacy_extra_info, iter);
+ }
+ } catch (const bs::system_error& e) {
+ std::move(*ctx)(e.code(), {}, {});
+ return;
+ }
- // XXX extra_info doesn't seem used anywhere?
- bufferlist extra_info;
- ::decode(response, iter);
- if (!iter.end()) {
- ::decode(extra_info, iter);
+ shared_lock rl(rwlock);
+ auto pool = osdmap->get_pg_pool(ctx->oloc.get_pool());
+ rl.unlock();
+ if (!pool) {
+ // pool is gone, drop any results which are now meaningless.
+ std::move(*ctx)(osdc_errc::pool_dne, {}, {});
+ return;
}
- ldout(cct, 10) << __func__ << ": got " << response.entries.size()
- << " handle " << response.handle
- << " reply_epoch " << reply_epoch << dendl;
- ldout(cct, 20) << __func__ << ": response.entries.size "
- << response.entries.size() << ", response.entries "
- << response.entries << dendl;
- if (response.handle <= end) {
- *next = response.handle;
+ hobject_t next;
+ if ((response.handle <= ctx->end)) {
+ next = response.handle;
} else {
- ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
- << dendl;
- *next = end;
+ next = ctx->end;
// drop anything after 'end'
- shared_lock rl(rwlock);
- const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
- if (!pool) {
- // pool is gone, drop any results which are now meaningless.
- rl.unlock();
- on_finish->complete(-ENOENT);
- return;
- }
    while (!response.entries.empty()) {
      uint32_t hash = response.entries.back().locator.empty() ?
	pool->hash_key(response.entries.back().oid,
		       response.entries.back().nspace) :
	pool->hash_key(response.entries.back().locator,
		       response.entries.back().nspace);
      hobject_t last(response.entries.back().oid,
		     response.entries.back().locator,
		     CEPH_NOSNAP,
		     hash,
-		     pool_id,
+		     ctx->oloc.get_pool(),
		     response.entries.back().nspace);
- if (last < end)
+ if (last < ctx->end)
break;
- ldout(cct, 20) << __func__ << " dropping item " << last
- << " >= end " << end << dendl;
response.entries.pop_back();
}
- rl.unlock();
}
- if (!response.entries.empty()) {
- result->merge(response.entries);
+
+ if (response.entries.size() <= ctx->max) {
+ ctx->max -= response.entries.size();
+ std::move(response.entries.begin(), response.entries.end(),
+ std::back_inserter(ctx->ls));
+ } else {
+ auto i = response.entries.begin();
+ while (ctx->max > 0) {
+ ctx->ls.push_back(std::move(*i));
+ --(ctx->max);
+ ++i;
+ }
+ uint32_t hash =
+ i->locator.empty() ?
+ pool->hash_key(i->oid, i->nspace) :
+ pool->hash_key(i->locator, i->nspace);
+
+ next = hobject_t{i->oid, i->locator,
+ CEPH_NOSNAP,
+ hash,
+ ctx->oloc.get_pool(),
+ i->nspace};
}
- // release the listing context's budget once all
- // OPs (in the session) are finished
-#if 0
- put_nlist_context_budget(list_context);
-#endif
- on_finish->complete(r);
- return;
+ if (next == ctx->end || ctx->max == 0) {
+ std::move(*ctx)(ec, std::move(ctx->ls), std::move(next));
+ } else {
+ _issue_enumerate(next, std::move(ctx));
+ }
}
+template
+void Objecter::_enumerate_reply<librados::ListObjectImpl>(
+ cb::list&& bl,
+ bs::error_code ec,
+ std::unique_ptr<EnumerationContext<librados::ListObjectImpl>>&& ctx);
+
+template
+void Objecter::_enumerate_reply<neorados::Entry>(
+ cb::list&& bl,
+ bs::error_code ec,
+ std::unique_ptr<EnumerationContext<neorados::Entry>>&& ctx);
+
namespace {
using namespace librados;
template <typename T>
- void do_decode(std::vector<T>& items, std::vector<bufferlist>& bls)
+ void do_decode(std::vector<T>& items, std::vector<cb::list>& bls)
{
for (auto bl : bls) {
- auto p = bl.begin();
+ auto p = bl.cbegin();
T t;
decode(t, p);
items.push_back(t);
}
struct C_ObjectOperation_scrub_ls : public Context {
- bufferlist bl;
- uint32_t *interval;
+ cb::list bl;
+ uint32_t* interval;
std::vector<inconsistent_obj_t> *objects = nullptr;
std::vector<inconsistent_snapset_t> *snapsets = nullptr;
- int *rval;
+ int* rval;
- C_ObjectOperation_scrub_ls(uint32_t *interval,
- std::vector<inconsistent_obj_t> *objects,
- int *rval)
+ C_ObjectOperation_scrub_ls(uint32_t* interval,
+ std::vector<inconsistent_obj_t>* objects,
+ int* rval)
: interval(interval), objects(objects), rval(rval) {}
- C_ObjectOperation_scrub_ls(uint32_t *interval,
- std::vector<inconsistent_snapset_t> *snapsets,
- int *rval)
+ C_ObjectOperation_scrub_ls(uint32_t* interval,
+ std::vector<inconsistent_snapset_t>* snapsets,
+ int* rval)
: interval(interval), snapsets(snapsets), rval(rval) {}
void finish(int r) override {
if (r < 0 && r != -EAGAIN) {
try {
decode();
- } catch (buffer::error&) {
+ } catch (cb::error&) {
if (rval)
*rval = -EIO;
}
private:
void decode() {
scrub_ls_result_t result;
- auto p = bl.begin();
+ auto p = bl.cbegin();
result.decode(p);
*interval = result.interval;
if (objects) {
};
template <typename T>
- void do_scrub_ls(::ObjectOperation *op,
+ void do_scrub_ls(::ObjectOperation* op,
const scrub_ls_arg_t& arg,
std::vector<T> *items,
- uint32_t *interval,
- int *rval)
+ uint32_t* interval,
+ int* rval)
{
OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
op->flags |= CEPH_OSD_FLAG_PGOP;
- assert(interval);
+ ceph_assert(interval);
arg.encode(osd_op.indata);
unsigned p = op->ops.size() - 1;
- auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
- op->out_handler[p] = h;
+ auto h = new C_ObjectOperation_scrub_ls{interval, items, rval};
+ op->set_handler(h);
op->out_bl[p] = &h->bl;
op->out_rval[p] = rval;
}
void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
uint64_t max_to_get,
- std::vector<librados::inconsistent_obj_t> *objects,
- uint32_t *interval,
- int *rval)
+ std::vector<librados::inconsistent_obj_t>* objects,
+ uint32_t* interval,
+ int* rval)
{
scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
do_scrub_ls(this, arg, objects, interval, rval);