l_osdc_osdop_cmpxattr,
l_osdc_osdop_rmxattr,
l_osdc_osdop_resetxattrs,
- l_osdc_osdop_tmap_up,
- l_osdc_osdop_tmap_put,
- l_osdc_osdop_tmap_get,
l_osdc_osdop_call,
l_osdc_osdop_watch,
l_osdc_osdop_notify,
Objecter *m_objecter;
public:
explicit RequestStateHook(Objecter *objecter);
- bool call(std::string command, cmdmap_t& cmdmap, std::string format,
- bufferlist& out) override;
+ bool call(std::string_view command, const cmdmap_t& cmdmap,
+ std::string_view format, bufferlist& out) override;
};
/**
}
-void Objecter::handle_conf_change(const struct md_config_t *conf,
+void Objecter::handle_conf_change(const ConfigProxy& conf,
const std::set <std::string> &changed)
{
if (changed.count("crush_location")) {
*/
void Objecter::init()
{
- assert(!initialized);
+ ceph_assert(!initialized);
if (!logger) {
PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
PerfCountersBuilder::PRIO_CRITICAL);
pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
- pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(BYTES));
+ pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
"Remove xattr operations");
pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
"Reset xattr operations");
- pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up",
- "TMAP update operations");
- pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put",
- "TMAP put operations");
- pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get",
- "TMAP get operations");
pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
"Call (execute) operations");
pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
update_crush_location();
- cct->_conf->add_observer(this);
+ cct->_conf.add_observer(this);
initialized = true;
}
void Objecter::shutdown()
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
initialized = false;
wl.unlock();
- cct->_conf->remove_observer(this);
+ cct->_conf.remove_observer(this);
wl.lock();
map<int,OSDSession*>::iterator p;
void Objecter::_send_linger(LingerOp *info,
shunique_lock& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
vector<OSDOp> opv;
Context *oncommit = NULL;
// do not resend this; we will send a new op to reregister
o->should_resend = false;
+ o->ctx_budgeted = true;
if (info->register_tid) {
- // repeat send. cancel old registeration op, if any.
+ // repeat send. cancel old registration op, if any.
OSDSession::unique_lock sl(info->session->lock);
if (info->session->ops.count(info->register_tid)) {
Op *o = info->session->ops[info->register_tid];
_cancel_linger_op(o);
}
sl.unlock();
-
- _op_submit(o, sul, &info->register_tid);
- } else {
- // first send
- _op_submit_with_budget(o, sul, &info->register_tid);
}
+ _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);
+
logger->inc(l_osdc_linger_send);
}
if (!info->is_watch) {
// make note of the notify_id
- bufferlist::iterator p = outbl.begin();
+ auto p = outbl.cbegin();
try {
- ::decode(info->notify_id, p);
+ decode(info->notify_id, p);
ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
<< dendl;
}
int Objecter::_normalize_watch_error(int r)
{
// translate ENOENT -> ENOTCONN so that a delete->disconnection
- // notification and a failure to reconnect becuase we raced with
+ // notification and a failure to reconnect because we raced with
// the delete appear the same to the user.
if (r == -ENOENT)
r = -ENOTCONN;
return;
}
- ceph::mono_time now = ceph::mono_clock::now();
+ ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
<< dendl;
o->target = info->target;
o->should_resend = false;
_send_op_account(o);
- MOSDOp *m = _prepare_osd_op(o);
o->tid = ++last_tid;
_session_op_assign(info->session, o);
- _send_op(o, m);
+ _send_op(o);
info->ping_tid = o->tid;
onack->sent = now;
logger->inc(l_osdc_linger_ping);
}
-void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
+void Objecter::_linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
uint32_t register_gen)
{
LingerOp::unique_lock l(info->watch_lock);
{
LingerOp::shared_lock l(info->watch_lock);
- mono_time stamp = info->watch_valid_thru;
+ ceph::coarse_mono_time stamp = info->watch_valid_thru;
if (!info->watch_pending_async.empty())
- stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
- auto age = mono_clock::now() - stamp;
+ stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
+ auto age = ceph::coarse_mono_clock::now() - stamp;
ldout(cct, 10) << __func__ << " " << info->linger_id
<< " err " << info->last_error
linger_ops.erase(info->linger_id);
linger_ops_set.erase(info);
- assert(linger_ops.size() == linger_ops_set.size());
+ ceph_assert(linger_ops.size() == linger_ops_set.size());
info->canceled = true;
info->put();
const object_locator_t& oloc,
int flags)
{
- LingerOp *info = new LingerOp;
+ LingerOp *info = new LingerOp(this);
info->target.base_oid = oid;
info->target.base_oloc = oloc;
if (info->target.base_oloc.key == oid)
info->target.base_oloc.key.clear();
info->target.flags = flags;
- info->watch_valid_thru = mono_clock::now();
+ info->watch_valid_thru = ceph::coarse_mono_clock::now();
unique_lock l(rwlock);
<< dendl;
linger_ops[info->linger_id] = info;
linger_ops_set.insert(info);
- assert(linger_ops.size() == linger_ops_set.size());
+ ceph_assert(linger_ops.size() == linger_ops_set.size());
info->get(); // for the caller
return info;
info->pobjver = objver;
info->on_reg_commit = oncommit;
+ info->ctx_budget = take_linger_budget(info);
+
shunique_lock sul(rwlock, ceph::acquire_unique);
_linger_submit(info, sul);
logger->inc(l_osdc_linger_active);
info->pobjver = objver;
info->on_reg_commit = onfinish;
+ info->ctx_budget = take_linger_budget(info);
+
shunique_lock sul(rwlock, ceph::acquire_unique);
_linger_submit(info, sul);
logger->inc(l_osdc_linger_active);
void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
- assert(info->linger_id);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(info->linger_id);
+ ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!
// Populate Op::target
OSDSession *s = NULL;
// Create LingerOp<->OSDSession relation
int r = _get_session(info->target.osd, &s, sul);
- assert(r == 0);
+ ceph_assert(r == 0);
OSDSession::unique_lock sl(s->lock);
_session_linger_op_assign(s, info);
sl.unlock();
ldout(cct, 10) << __func__ << " " << *m << dendl;
shared_lock l(rwlock);
- assert(initialized);
+ ceph_assert(initialized);
if (info->canceled) {
l.unlock();
}
// notify completion?
- assert(info->is_watch);
- assert(info->watch_context);
- assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
+ ceph_assert(info->is_watch);
+ ceph_assert(info->watch_context);
+ ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
l.unlock();
bool Objecter::ms_dispatch(Message *m)
{
ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
- if (!initialized)
- return false;
-
switch (m->get_type()) {
// these we exlusively handle
case CEPH_MSG_OSD_OPREPLY:
return false;
}
-void Objecter::_scan_requests(OSDSession *s,
- bool force_resend,
- bool cluster_full,
- map<int64_t, bool> *pool_full_map,
- map<ceph_tid_t, Op*>& need_resend,
- list<LingerOp*>& need_resend_linger,
- map<ceph_tid_t, CommandOp*>& need_resend_command,
- shunique_lock& sul)
+void Objecter::_scan_requests(
+ OSDSession *s,
+ bool skipped_map,
+ bool cluster_full,
+ map<int64_t, bool> *pool_full_map,
+ map<ceph_tid_t, Op*>& need_resend,
+ list<LingerOp*>& need_resend_linger,
+ map<ceph_tid_t, CommandOp*>& need_resend_command,
+ shunique_lock& sul,
+ const mempool::osdmap::map<int64_t,OSDMap::snap_interval_set_t> *gap_removed_snaps)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
list<LingerOp*> unregister_lingers;
map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
while (lp != s->linger_ops.end()) {
LingerOp *op = lp->second;
- assert(op->session == s);
+ ceph_assert(op->session == s);
// check_linger_pool_dne() may touch linger_ops; prevent iterator
// invalidation
++lp;
(*pool_full_map)[op->target.base_oloc.pool];
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
- if (!force_resend && !force_resend_writes)
+ if (!skipped_map && !force_resend_writes)
break;
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
Op *op = p->second;
++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
ldout(cct, 10) << " checking op " << op->tid << dendl;
+ _prune_snapc(osdmap->get_new_removed_snaps(), op);
+ if (skipped_map) {
+ _prune_snapc(*gap_removed_snaps, op);
+ }
bool force_resend_writes = cluster_full;
if (pool_full_map)
force_resend_writes = force_resend_writes ||
op->session ? op->session->con.get() : nullptr);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
- if (!force_resend && !(force_resend_writes && op->respects_full()))
+ if (!skipped_map && !(force_resend_writes && op->respects_full()))
break;
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
- if (op->session) {
- _session_op_remove(op->session, op);
- }
+ _session_op_remove(op->session, op);
need_resend[op->tid] = op;
_op_cancel_map_check(op);
break;
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
// resend if skipped map; otherwise do nothing.
- if (!force_resend && !force_resend_writes)
+ if (!skipped_map && !force_resend_writes)
break;
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
need_resend_command[c->tid] = c;
- if (c->session) {
- _session_command_op_remove(c->session, c);
- }
+ _session_command_op_remove(c->session, c);
_command_cancel_map_check(c);
break;
case RECALC_OP_TARGET_POOL_DNE:
if (!initialized)
return;
- assert(osdmap);
+ ceph_assert(osdmap);
if (m->fsid != monc->get_fsid()) {
ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
update_pool_full_map(pool_full_map);
// check all outstanding requests on every epoch
+ for (auto& i : need_resend) {
+ _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
+ if (skipped_map) {
+ _prune_snapc(m->gap_removed_snaps, i.second);
+ }
+ }
_scan_requests(homeless_session, skipped_map, cluster_full,
&pool_full_map, need_resend,
- need_resend_linger, need_resend_command, sul);
+ need_resend_linger, need_resend_command, sul,
+ &m->gap_removed_snaps);
for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
p != osd_sessions.end(); ) {
OSDSession *s = p->second;
_scan_requests(s, skipped_map, cluster_full,
&pool_full_map, need_resend,
- need_resend_linger, need_resend_command, sul);
+ need_resend_linger, need_resend_command, sul,
+ &m->gap_removed_snaps);
++p;
// osd down or addr change?
if (!osdmap->is_up(s->osd) ||
(s->con &&
- s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
+ s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
close_session(s);
}
}
- assert(e == osdmap->get_epoch());
+ ceph_assert(e == osdmap->get_epoch());
}
} else {
p != osd_sessions.end(); ++p) {
OSDSession *s = p->second;
_scan_requests(s, false, false, NULL, need_resend,
- need_resend_linger, need_resend_command, sul);
+ need_resend_linger, need_resend_command, sul,
+ nullptr);
}
ldout(cct, 3) << "handle_osd_map decoding full epoch "
<< m->get_last() << dendl;
_scan_requests(homeless_session, false, false, NULL,
need_resend, need_resend_linger,
- need_resend_command, sul);
+ need_resend_command, sul, nullptr);
} else {
ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
<< dendl;
bool mapped_session = false;
if (!s) {
int r = _map_session(&op->target, &s, sul);
- assert(r == 0);
+ ceph_assert(r == 0);
mapped_session = true;
} else {
get_session(s);
for (list<LingerOp*>::iterator p = need_resend_linger.begin();
p != need_resend_linger.end(); ++p) {
LingerOp *op = *p;
- if (!op->session) {
- _calc_target(&op->target, nullptr);
- OSDSession *s = NULL;
- const int r = _get_session(op->target.osd, &s, sul);
- assert(r == 0);
- assert(s != NULL);
- op->session = s;
- put_session(s);
- }
+ ceph_assert(op->session);
if (!op->session->is_homeless()) {
logger->inc(l_osdc_linger_resend);
_send_linger(op, sul);
<< " concluding pool " << op->target.base_pgid.pool()
<< " dne" << dendl;
if (op->onfinish) {
+ num_in_flight--;
op->onfinish->complete(-ENOENT);
}
OSDSession *s = op->session;
if (s) {
- assert(s != NULL);
- assert(sl->mutex() == &s->lock);
+ ceph_assert(s != NULL);
+ ceph_assert(sl->mutex() == &s->lock);
bool session_locked = sl->owns_lock();
if (!session_locked) {
sl->lock();
*/
int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
{
- assert(sul && sul.mutex() == &rwlock);
+ ceph_assert(sul && sul.mutex() == &rwlock);
if (osd < 0) {
*session = homeless_session;
}
OSDSession *s = new OSDSession(cct, osd);
osd_sessions[osd] = s;
- s->con = messenger->get_connection(osdmap->get_inst(osd));
- s->con->set_priv(s->get());
+ s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
+ s->con->set_priv(RefCountedPtr{s});
logger->inc(l_osdc_osd_session_open);
logger->set(l_osdc_osd_sessions, osd_sessions.size());
s->get();
void Objecter::get_session(Objecter::OSDSession *s)
{
- assert(s != NULL);
+ ceph_assert(s != NULL);
if (!s->is_homeless()) {
ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
// rwlock is locked unique
// s->lock is locked
- entity_inst_t inst = osdmap->get_inst(s->osd);
+ auto addrs = osdmap->get_addrs(s->osd);
ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
- << inst << dendl;
+ << addrs << dendl;
if (s->con) {
s->con->set_priv(NULL);
s->con->mark_down();
logger->inc(l_osdc_osd_session_close);
}
- s->con = messenger->get_connection(inst);
- s->con->set_priv(s->get());
+ s->con = messenger->connect_to_osd(addrs);
+ s->con->set_priv(RefCountedPtr{s});
s->incarnation++;
logger->inc(l_osdc_osd_session_open);
}
void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
{
unique_lock wl(rwlock);
- _get_latest_version(oldest, newest, fin);
-}
-
-void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
- Context *fin)
-{
- // rwlock is locked unique
if (osdmap->get_epoch() >= newest) {
- ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
+ ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
+ wl.unlock();
if (fin)
fin->complete(0);
return;
return false;
}
-void Objecter::kick_requests(OSDSession *session)
-{
- ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
-
- map<uint64_t, LingerOp *> lresend;
- unique_lock wl(rwlock);
-
- OSDSession::unique_lock sl(session->lock);
- _kick_requests(session, lresend);
- sl.unlock();
-
- _linger_ops_resend(lresend, wl);
-}
-
void Objecter::_kick_requests(OSDSession *session,
map<uint64_t, LingerOp *>& lresend)
{
p != session->ops.end();) {
Op *op = p->second;
++p;
- logger->inc(l_osdc_op_resend);
if (op->should_resend) {
if (!op->target.paused)
resend[op->tid] = op;
}
}
+ logger->inc(l_osdc_op_resend, resend.size());
while (!resend.empty()) {
_send_op(resend.begin()->second);
resend.erase(resend.begin());
}
// resend lingers
+ logger->inc(l_osdc_linger_resend, session->linger_ops.size());
for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
j != session->linger_ops.end(); ++j) {
LingerOp *op = j->second;
op->get();
- logger->inc(l_osdc_linger_resend);
- assert(lresend.count(j->first) == 0);
+ ceph_assert(lresend.count(j->first) == 0);
lresend[j->first] = op;
}
// resend commands
+ logger->inc(l_osdc_command_resend, session->command_ops.size());
map<uint64_t,CommandOp*> cresend; // resend in order
for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
k != session->command_ops.end(); ++k) {
- logger->inc(l_osdc_command_resend);
cresend[k->first] = k->second;
}
while (!cresend.empty()) {
void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
unique_lock& ul)
{
- assert(ul.owns_lock());
+ ceph_assert(ul.owns_lock());
shunique_lock sul(std::move(ul));
while (!lresend.empty()) {
LingerOp *op = lresend.begin()->second;
op->put();
lresend.erase(lresend.begin());
}
- ul = unique_lock(sul.release_to_unique());
+ ul = sul.release_to_unique();
}
void Objecter::start_tick()
{
- assert(tick_event == 0);
+ ceph_assert(tick_event == 0);
tick_event =
timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
&Objecter::tick, this);
// look for laggy requests
- auto cutoff = ceph::mono_clock::now();
+ auto cutoff = ceph::coarse_mono_clock::now();
cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout); // timeout
unsigned laggy_ops = 0;
p != s->ops.end();
++p) {
Op *op = p->second;
- assert(op->session);
+ ceph_assert(op->session);
if (op->stamp < cutoff) {
ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
<< " is laggy" << dendl;
++p) {
LingerOp *op = p->second;
LingerOp::unique_lock wl(op->watch_lock);
- assert(op->session);
+ ceph_assert(op->session);
ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
<< " (osd." << op->session->osd << ")" << dendl;
found = true;
p != s->command_ops.end();
++p) {
CommandOp *op = p->second;
- assert(op->session);
+ ceph_assert(op->session);
ldout(cct, 10) << " pinging osd that serves command tid " << p->first
<< " (osd." << op->session->osd << ")" << dendl;
found = true;
}
}
- // Make sure we don't resechedule if we wake up after shutdown
+ // Make sure we don't reschedule if we wake up after shutdown
if (initialized) {
tick_event = timer.reschedule_me(ceph::make_timespan(
cct->_conf->objecter_tick_interval));
ceph_tid_t *ptid,
int *ctx_budget)
{
- assert(initialized);
+ ceph_assert(initialized);
- assert(op->ops.size() == op->out_bl.size());
- assert(op->ops.size() == op->out_rval.size());
- assert(op->ops.size() == op->out_handler.size());
+ ceph_assert(op->ops.size() == op->out_bl.size());
+ ceph_assert(op->ops.size() == op->out_rval.size());
+ ceph_assert(op->ops.size() == op->out_handler.size());
// throttle. before we look at any state, because
// _take_op_budget() may drop our lock while it blocks.
case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
- case CEPH_OSD_OP_TMAPUP: code = l_osdc_osdop_tmap_up; break;
- case CEPH_OSD_OP_TMAPPUT: code = l_osdc_osdop_tmap_put; break;
- case CEPH_OSD_OP_TMAPGET: code = l_osdc_osdop_tmap_get; break;
// OMAP read operations
case CEPH_OSD_OP_OMAPGETVALS:
ldout(cct, 10) << __func__ << " op " << op << dendl;
// pick target
- assert(op->session == NULL);
+ ceph_assert(op->session == NULL);
OSDSession *s = NULL;
bool check_for_latest_map = _calc_target(&op->target, nullptr)
// Try to get a session, including a retry if we need to take write lock
int r = _get_session(op->target.osd, &s, sul);
if (r == -EAGAIN ||
- (check_for_latest_map && sul.owns_lock_shared())) {
+ (check_for_latest_map && sul.owns_lock_shared()) ||
+ cct->_conf->objecter_debug_inject_relock_delay) {
epoch_t orig_epoch = osdmap->get_epoch();
sul.unlock();
if (cct->_conf->objecter_debug_inject_relock_delay) {
}
}
if (r == -EAGAIN) {
- assert(s == NULL);
+ ceph_assert(s == NULL);
r = _get_session(op->target.osd, &s, sul);
}
- assert(r == 0);
- assert(s); // may be homeless
+ ceph_assert(r == 0);
+ ceph_assert(s); // may be homeless
_send_op_account(op);
// send?
- assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
+ ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
if (osdmap_full_try) {
op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
_maybe_request_map();
}
- MOSDOp *m = NULL;
- if (need_send) {
- m = _prepare_osd_op(op);
- }
-
OSDSession::unique_lock sl(s->lock);
if (op->tid == 0)
op->tid = ++last_tid;
_session_op_assign(s, op);
if (need_send) {
- _send_op(op, m);
+ _send_op(op);
}
// Last chance to touch Op here, after giving up session lock it can
int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
OSDSession::unique_lock sl(s->lock);
return -ENOENT;
}
+#if 0
if (s->con) {
ldout(cct, 20) << " revoking rx buffer for " << tid
<< " on " << s->con << dendl;
s->con->revoke_rx_buffer(tid);
}
+#endif
ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
<< dendl;
int cancel_result = op_cancel(s, *titer, r);
// We hold rwlock across search and cancellation, so cancels
// should always succeed
- assert(cancel_result == 0);
+ ceph_assert(cancel_result == 0);
}
if (!found && to_cancel.size())
found = true;
*/
bool Objecter::_osdmap_full_flag() const
{
- // Ignore the FULL flag if the caller has honor_osdmap_full
+ // Ignore the FULL flag if the caller does not have honor_osdmap_full
return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
}
return p->raw_hash_to_pg(p->hash_key(key, ns));
}
+void Objecter::_prune_snapc(
+ const mempool::osdmap::map<int64_t,
+ OSDMap::snap_interval_set_t>& new_removed_snaps,
+ Op *op)
+{
+ bool match = false;
+ auto i = new_removed_snaps.find(op->target.base_pgid.pool());
+ if (i != new_removed_snaps.end()) {
+ for (auto s : op->snapc.snaps) {
+ if (i->second.contains(s)) {
+ match = true;
+ break;
+ }
+ }
+ if (match) {
+ vector<snapid_t> new_snaps;
+ for (auto s : op->snapc.snaps) {
+ if (!i->second.contains(s)) {
+ new_snaps.push_back(s);
+ }
+ }
+ op->snapc.snaps.swap(new_snaps);
+ ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
+ << " (was " << new_snaps << ")" << dendl;
+ }
+ }
+}
+
int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
// rwlock is locked
pg_t pgid;
if (t->precalc_pgid) {
- assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
- assert(t->base_oid.name.empty()); // make sure this is a pg op
- assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
+ ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
+ ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
+ ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
pgid = t->base_pgid;
} else {
int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
int size = pi->size;
int min_size = pi->min_size;
unsigned pg_num = pi->get_pg_num();
+ unsigned pg_num_pending = pi->get_pg_num_pending();
int up_primary, acting_primary;
vector<int> up, acting;
osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
min_size,
t->pg_num,
pg_num,
+ t->pg_num_pending,
+ pg_num_pending,
t->sort_bitwise,
sort_bitwise,
t->recovery_deletes,
is_pg_changed(
t->acting_primary, t->acting, acting_primary, acting,
t->used_replica || any_change);
- bool split = false;
+ bool split_or_merge = false;
if (t->pg_num) {
- split = prev_pgid.is_split(t->pg_num, pg_num, nullptr);
+ split_or_merge =
+ prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
+ prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
+ prev_pgid.is_merge_target(t->pg_num, pg_num);
}
- if (legacy_change || split || force_resend) {
+ if (legacy_change || split_or_merge || force_resend) {
t->pgid = pgid;
t->acting = acting;
t->acting_primary = acting_primary;
t->min_size = min_size;
t->pg_num = pg_num;
t->pg_num_mask = pi->get_pg_num_mask();
+ t->pg_num_pending = pg_num_pending;
osdmap->get_primary_shard(
pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
&t->actual_pgid);
t->used_replica = true;
}
}
- assert(best >= 0);
+ ceph_assert(best >= 0);
osd = acting[best];
} else {
osd = acting_primary;
if (legacy_change || unpaused || force_resend) {
return RECALC_OP_TARGET_NEED_RESEND;
}
- if (split &&
+ if (split_or_merge &&
(osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS ||
HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
RESEND_ON_SPLIT))) {
void Objecter::_session_op_assign(OSDSession *to, Op *op)
{
// to->lock is locked
- assert(op->session == NULL);
- assert(op->tid);
+ ceph_assert(op->session == NULL);
+ ceph_assert(op->tid);
get_session(to);
op->session = to;
void Objecter::_session_op_remove(OSDSession *from, Op *op)
{
- assert(op->session == from);
+ ceph_assert(op->session == from);
// from->lock is locked
if (from->is_homeless()) {
void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
{
// to lock is locked unique
- assert(op->session == NULL);
+ ceph_assert(op->session == NULL);
if (to->is_homeless()) {
num_homeless_ops++;
void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
{
- assert(from == op->session);
+ ceph_assert(from == op->session);
// from->lock is locked unique
if (from->is_homeless()) {
void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
{
- assert(from == op->session);
+ ceph_assert(from == op->session);
// from->lock is locked
if (from->is_homeless()) {
void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
{
// to->lock is locked
- assert(op->session == NULL);
- assert(op->tid);
+ ceph_assert(op->session == NULL);
+ ceph_assert(op->tid);
if (to->is_homeless()) {
num_homeless_ops++;
OSDSession *s = NULL;
r = _get_session(linger_op->target.osd, &s, sul);
- assert(r == 0);
+ ceph_assert(r == 0);
if (linger_op->session != s) {
// NB locking two sessions (s and linger_op->session) at the
{
ldout(cct, 15) << "cancel_op " << op->tid << dendl;
- assert(!op->should_resend);
+ ceph_assert(!op->should_resend);
if (op->onfinish) {
delete op->onfinish;
num_in_flight--;
void Objecter::_finish_op(Op *op, int r)
{
- ldout(cct, 15) << "finish_op " << op->tid << dendl;
+ ldout(cct, 15) << __func__ << " " << op->tid << dendl;
// op->session->lock is locked unique or op->session is null
- if (!op->ctx_budgeted && op->budgeted)
- put_op_budget(op);
+ if (!op->ctx_budgeted && op->budget >= 0) {
+ put_op_budget_bytes(op->budget);
+ op->budget = -1;
+ }
if (op->ontimeout && r != -ETIMEDOUT)
timer.cancel_event(op->ontimeout);
logger->dec(l_osdc_op_active);
- assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
+ ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
inflight_ops--;
op->put();
}
-void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
-{
- ldout(cct, 15) << "finish_op " << tid << dendl;
- shared_lock rl(rwlock);
-
- OSDSession::unique_lock wl(session->lock);
-
- map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
- if (iter == session->ops.end())
- return;
-
- Op *op = iter->second;
-
- _finish_op(op, 0);
-}
-
MOSDOp *Objecter::_prepare_osd_op(Op *op)
{
// rwlock is locked
flags |= CEPH_OSD_FLAG_FULL_FORCE;
op->target.paused = false;
- op->stamp = ceph::mono_clock::now();
+ op->stamp = ceph::coarse_mono_clock::now();
hobject_t hobj = op->target.get_hobj();
MOSDOp *m = new MOSDOp(client_inc, op->tid,
return m;
}
-void Objecter::_send_op(Op *op, MOSDOp *m)
+void Objecter::_send_op(Op *op)
{
// rwlock is locked
// op->session->lock is locked
}
}
- if (!m) {
- assert(op->tid > 0);
- m = _prepare_osd_op(op);
- }
+ ceph_assert(op->tid > 0);
+ MOSDOp *m = _prepare_osd_op(op);
if (op->target.actual_pgid != m->get_spg()) {
ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
<< dendl;
ConnectionRef con = op->session->con;
- assert(con);
+ ceph_assert(con);
+#if 0
// preallocated rx buffer?
if (op->con) {
ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
if (op->outbl &&
op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582
op->outbl->length()) {
+ op->outbl->invalidate_crc(); // messenger writes through c_str()
ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
<< dendl;
op->con = con;
op->con->post_rx_buffer(op->tid, *op->outbl);
}
+#endif
op->incarnation = op->session->incarnation;
- m->set_tid(op->tid);
-
if (op->trace.valid()) {
m->trace.init("op msg", nullptr, &op->trace);
}
op->session->con->send_message(m);
}
-int Objecter::calc_op_budget(Op *op)
+int Objecter::calc_op_budget(const vector<OSDOp>& ops)
{
int op_budget = 0;
- for (vector<OSDOp>::iterator i = op->ops.begin();
- i != op->ops.end();
+ for (vector<OSDOp>::const_iterator i = ops.begin();
+ i != ops.end();
++i) {
if (i->op.op & CEPH_OSD_OP_MODE_WR) {
op_budget += i->indata.length();
shunique_lock& sul,
int op_budget)
{
- assert(sul && sul.mutex() == &rwlock);
+ ceph_assert(sul && sul.mutex() == &rwlock);
bool locked_for_write = sul.owns_lock();
if (!op_budget)
- op_budget = calc_op_budget(op);
+ op_budget = calc_op_budget(op->ops);
if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
sul.unlock();
op_throttle_bytes.get(op_budget);
}
}
-void Objecter::unregister_op(Op *op)
+int Objecter::take_linger_budget(LingerOp *info)
{
- OSDSession::unique_lock sl(op->session->lock);
- op->session->ops.erase(op->tid);
- sl.unlock();
- put_session(op->session);
- op->session = NULL;
-
- inflight_ops--;
+ return 1;
}
/* This function DOES put the passed message before returning */
}
ConnectionRef con = m->get_connection();
- OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<OSDSession*>(priv.get());
if (!s || s->con != con) {
ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
- if (s) {
- s->put();
- }
m->put();
return;
}
" onnvram" : " ack"))
<< " ... stray" << dendl;
sl.unlock();
- put_session(s);
m->put();
return;
}
}
_session_op_remove(s, op);
sl.unlock();
- put_session(s);
_op_submit(op, sul, NULL);
m->put();
<< op->session->con->get_peer_addr() << dendl;
m->put();
sl.unlock();
- put_session(s);
return;
}
} else {
num_in_flight--;
_session_op_remove(s, op);
sl.unlock();
- put_session(s);
// FIXME: two redirects could race and reorder
num_in_flight--;
_session_op_remove(s, op);
sl.unlock();
- put_session(s);
op->tid = 0;
op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
// got data?
if (op->outbl) {
+#if 0
if (op->con)
op->con->revoke_rx_buffer(op->tid);
- m->claim_data(*op->outbl);
+#endif
+ auto& bl = m->get_data();
+ if (op->outbl->length() == bl.length() &&
+ bl.get_num_buffers() <= 1) {
+ // this is here to keep previous users to *relied* on getting data
+ // read into existing buffers happy. Notably,
+ // libradosstriper::RadosStriperImpl::aio_read().
+ ldout(cct,10) << __func__ << " copying resulting " << bl.length()
+ << " into existing buffer of length " << op->outbl->length()
+ << dendl;
+ bufferlist t;
+ t.claim(*op->outbl);
+ t.invalidate_crc(); // we're overwriting the raw buffers via c_str()
+ bl.copy(0, bl.length(), t.c_str());
+ op->outbl->substr_of(t, 0, bl.length());
+ } else {
+ m->claim_data(*op->outbl);
+ }
op->outbl = 0;
}
vector<bufferlist*>::iterator pb = op->out_bl.begin();
vector<int*>::iterator pr = op->out_rval.begin();
vector<Context*>::iterator ph = op->out_handler.begin();
- assert(op->out_bl.size() == op->out_rval.size());
- assert(op->out_bl.size() == op->out_handler.size());
+ ceph_assert(op->out_bl.size() == op->out_rval.size());
+ ceph_assert(op->out_bl.size() == op->out_handler.size());
vector<OSDOp>::iterator p = out_ops.begin();
for (unsigned i = 0;
p != out_ops.end() && pb != op->out_bl.end();
}
m->put();
- put_session(s);
}
void Objecter::handle_osd_backoff(MOSDBackoff *m)
}
ConnectionRef con = m->get_connection();
- OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<OSDSession*>(priv.get());
if (!s || s->con != con) {
ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
- if (s)
- s->put();
m->put();
return;
}
get_session(s);
- s->put(); // from get_priv() above
OSDSession::unique_lock sl(s->lock);
<< " [" << b->begin << "," << b->end
<< ")" << dendl;
auto spgp = s->backoffs.find(b->pgid);
- assert(spgp != s->backoffs.end());
+ ceph_assert(spgp != s->backoffs.end());
spgp->second.erase(b->begin);
if (spgp->second.empty()) {
s->backoffs.erase(spgp);
shared_lock rl(rwlock);
list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
pos, list_context->pool_id, string());
- ldout(cct, 10) << __func__ << list_context
+ ldout(cct, 10) << __func__ << " " << list_context
<< " pos " << pos << " -> " << list_context->pos << dendl;
pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
list_context->current_pg = actual.ps();
{
ldout(cct, 10) << __func__ << " " << list_context << dendl;
- bufferlist::iterator iter = list_context->bl.begin();
+ auto iter = list_context->bl.cbegin();
pg_nls_response_t response;
bufferlist extra_info;
- ::decode(response, iter);
+ decode(response, iter);
if (!iter.end()) {
- ::decode(extra_info, iter);
+ decode(extra_info, iter);
}
// if the osd returns 1 (newer code), or handle MAX, it means we
C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
void finish(int r) override {
if (r == 0) {
- bufferlist::iterator p = bl.begin();
- ::decode(*psnapid, p);
+ try {
+ auto p = bl.cbegin();
+ decode(*psnapid, p);
+ } catch (buffer::error&) {
+ r = -EIO;
+ }
}
fin->complete(r);
}
return 0;
}
-int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
+int Objecter::create_pool(string& name, Context *onfinish,
int crush_rule)
{
unique_lock wl(rwlock);
op->onfinish = onfinish;
op->pool_op = POOL_OP_CREATE;
pool_ops[op->tid] = op;
- op->auid = auid;
op->crush_rule = crush_rule;
pool_op_submit(op);
pool_op_submit(op);
}
-/**
- * change the auid owner of a pool by contacting the monitor.
- * This requires the current connection to have write permissions
- * on both the pool's current auid and the new (parameter) auid.
- * Uses the standard Context callback when done.
- */
-int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
-{
- unique_lock wl(rwlock);
- ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
- PoolOp *op = new PoolOp;
- if (!op) return -ENOMEM;
- op->tid = ++last_tid;
- op->pool = pool;
- op->name = "change_pool_auid";
- op->onfinish = onfinish;
- op->pool_op = POOL_OP_AUID_CHANGE;
- op->auid = auid;
- pool_ops[op->tid] = op;
-
- logger->set(l_osdc_poolop_active, pool_ops.size());
-
- pool_op_submit(op);
- return 0;
-}
-
void Objecter::pool_op_submit(PoolOp *op)
{
// rwlock is locked
ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
op->name, op->pool_op,
- op->auid, last_seen_osdmap_version);
+ last_seen_osdmap_version);
if (op->snapid) m->snapid = op->snapid;
if (op->crush_rule) m->crush_rule = op->crush_rule;
monc->send_mon_message(m);
- op->last_submit = ceph::mono_clock::now();
+ op->last_submit = ceph::coarse_mono_clock::now();
logger->inc(l_osdc_poolop_send);
}
*/
void Objecter::handle_pool_op_reply(MPoolOpReply *m)
{
- FUNCTRACE();
+ FUNCTRACE(cct);
shunique_lock sul(rwlock, acquire_shared);
if (!initialized) {
sul.unlock();
// map epoch changed, probably because a MOSDMap message
// sneaked in. Do caller-specified callback now or else
// we lose it forever.
- assert(op->onfinish);
+ ceph_assert(op->onfinish);
op->onfinish->complete(m->replyCode);
}
} else {
- assert(op->onfinish);
+ ceph_assert(op->onfinish);
op->onfinish->complete(m->replyCode);
}
op->onfinish = NULL;
int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
op->pools,
last_seen_pgmap_version));
- op->last_submit = ceph::mono_clock::now();
+ op->last_submit = ceph::coarse_mono_clock::now();
logger->inc(l_osdc_poolstat_send);
}
int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
op->data_pool,
last_seen_pgmap_version));
- op->last_submit = ceph::mono_clock::now();
+ op->last_submit = ceph::coarse_mono_clock::now();
logger->inc(l_osdc_statfs_send);
}
int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
return false;
if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
unique_lock wl(rwlock);
-
- OSDSession *session = static_cast<OSDSession*>(con->get_priv());
+
+ auto priv = con->get_priv();
+ auto session = static_cast<OSDSession*>(priv.get());
if (session) {
ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
<< " osd." << session->osd << dendl;
_linger_ops_resend(lresend, wl);
wl.unlock();
maybe_request_map();
- session->put();
}
return true;
}
}
bool Objecter::ms_get_authorizer(int dest_type,
- AuthAuthorizer **authorizer,
- bool force_new)
+ AuthAuthorizer **authorizer)
{
if (!initialized)
return false;
fmt->dump_int("pool", op->pool);
fmt->dump_string("name", op->name);
fmt->dump_int("operation_type", op->pool_op);
- fmt->dump_unsigned("auid", op->auid);
fmt->dump_unsigned("crush_rule", op->crush_rule);
fmt->dump_stream("snapid") << op->snapid;
fmt->dump_stream("last_sent") << op->last_submit;
{
}
-bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap,
- std::string format, bufferlist& out)
+bool Objecter::RequestStateHook::call(std::string_view command,
+ const cmdmap_t& cmdmap,
+ std::string_view format,
+ bufferlist& out)
{
Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
shared_lock rl(m_objecter->rwlock);
else
cmd.push_back("\"blacklistop\":\"rm\",");
stringstream ss;
- ss << messenger->get_myaddr();
+ // this is somewhat imprecise in that we are blacklisting our first addr only
+ ss << messenger->get_myaddrs().front().get_legacy_str();
cmd.push_back("\"addr\":\"" + ss.str() + "\"");
MMonCommand *m = new MMonCommand(monc->get_fsid());
}
ConnectionRef con = m->get_connection();
- OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<OSDSession*>(priv.get());
if (!s || s->con != con) {
ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
m->put();
- if (s)
- s->put();
return;
}
<< " not found" << dendl;
m->put();
sl.unlock();
- if (s)
- s->put();
return;
}
<< dendl;
m->put();
sl.unlock();
- if (s)
- s->put();
return;
}
if (c->poutbl) {
sul.unlock();
m->put();
- if (s)
- s->put();
}
void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
c->map_check_error = 0;
OSDSession *s;
int r = _get_session(c->target.osd, &s, sul);
- assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+ ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
if (c->session != s) {
put_session(s);
void Objecter::_assign_command_session(CommandOp *c,
shunique_lock& sul)
{
- assert(sul.owns_lock() && sul.mutex() == &rwlock);
+ ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
OSDSession *s;
int r = _get_session(c->target.osd, &s, sul);
- assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+ ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
if (c->session != s) {
if (c->session) {
void Objecter::_send_command(CommandOp *c)
{
ldout(cct, 10) << "_send_command " << c->tid << dendl;
- assert(c->session);
- assert(c->session->con);
+ ceph_assert(c->session);
+ ceph_assert(c->session->con);
MCommand *m = new MCommand(monc->monmap.fsid);
m->cmd = c->cmd;
m->set_data(c->inbl);
int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
- assert(initialized);
+ ceph_assert(initialized);
unique_lock wl(rwlock);
if (c->ontimeout && r != -ETIMEDOUT)
timer.cancel_event(c->ontimeout);
- OSDSession *s = c->session;
_session_command_op_remove(c->session, c);
c->put();
{
// Caller is responsible for re-assigning or
// destroying any ops that were assigned to us
- assert(ops.empty());
- assert(linger_ops.empty());
- assert(command_ops.empty());
+ ceph_assert(ops.empty());
+ ceph_assert(linger_ops.empty());
+ ceph_assert(command_ops.empty());
}
Objecter::~Objecter()
{
delete osdmap;
- assert(homeless_session->get_nref() == 1);
- assert(num_homeless_ops == 0);
+ ceph_assert(homeless_session->get_nref() == 1);
+ ceph_assert(num_homeless_ops == 0);
homeless_session->put();
- assert(osd_sessions.empty());
- assert(poolstat_ops.empty());
- assert(statfs_ops.empty());
- assert(pool_ops.empty());
- assert(waiting_for_map.empty());
- assert(linger_ops.empty());
- assert(check_latest_map_lingers.empty());
- assert(check_latest_map_ops.empty());
- assert(check_latest_map_commands.empty());
+ ceph_assert(osd_sessions.empty());
+ ceph_assert(poolstat_ops.empty());
+ ceph_assert(statfs_ops.empty());
+ ceph_assert(pool_ops.empty());
+ ceph_assert(waiting_for_map.empty());
+ ceph_assert(linger_ops.empty());
+ ceph_assert(check_latest_map_lingers.empty());
+ ceph_assert(check_latest_map_ops.empty());
+ ceph_assert(check_latest_map_commands.empty());
- assert(!m_request_state_hook);
- assert(!logger);
+ ceph_assert(!m_request_state_hook);
+ ceph_assert(!logger);
}
/**
const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
objecter(objecter_), next(next_), result(result_),
end(end_), pool_id(pool_id_), on_finish(on_finish_),
- epoch(0), budget(0)
+ epoch(0), budget(-1)
{}
void finish(int r) override {
hobject_t *next,
Context *on_finish)
{
- assert(result);
+ ceph_assert(result);
if (!end.is_max() && start > end) {
lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
}
shared_lock rl(rwlock);
- assert(osdmap->get_epoch());
+ ceph_assert(osdmap->get_epoch());
if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
rl.unlock();
lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
hobject_t *next,
Context *on_finish)
{
- if (budget > 0) {
+ if (budget >= 0) {
put_op_budget_bytes(budget);
}
return;
}
- assert(next != NULL);
+ ceph_assert(next != NULL);
// Decode the results
- bufferlist::iterator iter = bl.begin();
+ auto iter = bl.cbegin();
pg_nls_response_t response;
// XXX extra_info doesn't seem used anywhere?
bufferlist extra_info;
- ::decode(response, iter);
+ decode(response, iter);
if (!iter.end()) {
- ::decode(extra_info, iter);
+ decode(extra_info, iter);
}
ldout(cct, 10) << __func__ << ": got " << response.entries.size()
void do_decode(std::vector<T>& items, std::vector<bufferlist>& bls)
{
for (auto bl : bls) {
- auto p = bl.begin();
+ auto p = bl.cbegin();
T t;
decode(t, p);
items.push_back(t);
private:
void decode() {
scrub_ls_result_t result;
- auto p = bl.begin();
+ auto p = bl.cbegin();
result.decode(p);
*interval = result.interval;
if (objects) {
{
OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
op->flags |= CEPH_OSD_FLAG_PGOP;
- assert(interval);
+ ceph_assert(interval);
arg.encode(osd_op.indata);
unsigned p = op->ops.size() - 1;
auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};