*/
void Objecter::init()
{
- assert(!initialized.read());
+ assert(!initialized);
if (!logger) {
PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
PerfCountersBuilder::PRIO_CRITICAL);
pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
- pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data");
+ pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(BYTES));
pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
cct->_conf->add_observer(this);
- initialized.set(1);
+ initialized = true;
}
/*
void Objecter::shutdown()
{
- assert(initialized.read());
+ assert(initialized);
unique_lock wl(rwlock);
- initialized.set(0);
+ initialized = false;
cct->_conf->remove_observer(this);
o->mtime = info->mtime;
o->target = info->target;
- o->tid = last_tid.inc();
+ o->tid = ++last_tid;
// do not resend this; we will send a new op to reregister
o->should_resend = false;
info->on_reg_commit->complete(r);
info->on_reg_commit = NULL;
}
+ if (r < 0 && info->on_notify_finish) {
+ info->on_notify_finish->complete(r);
+ info->on_notify_finish = nullptr;
+ }
// only tell the user the first time we do this
info->registered = true;
o->should_resend = false;
_send_op_account(o);
MOSDOp *m = _prepare_osd_op(o);
- o->tid = last_tid.inc();
+ o->tid = ++last_tid;
_session_op_assign(info->session, o);
_send_op(o, m);
info->ping_tid = o->tid;
void Objecter::handle_watch_notify(MWatchNotify *m)
{
shared_lock l(rwlock);
- if (!initialized.read()) {
+ if (!initialized) {
return;
}
ldout(cct, 10) << __func__ << " " << *m << dendl;
shared_lock l(rwlock);
- assert(initialized.read());
+ assert(initialized);
if (info->canceled) {
l.unlock();
bool Objecter::ms_dispatch(Message *m)
{
ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
- if (!initialized.read())
+ if (!initialized)
return false;
switch (m->get_type()) {
void Objecter::handle_osd_map(MOSDMap *m)
{
shunique_lock sul(rwlock, acquire_unique);
- if (!initialized.read())
+ if (!initialized)
return;
assert(osdmap);
<< dendl;
OSDMap::Incremental inc(m->incremental_maps[e]);
osdmap->apply_incremental(inc);
+
+ emit_blacklist_events(inc);
+
logger->inc(l_osdc_map_inc);
}
else if (m->maps.count(e)) {
ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
- osdmap->decode(m->maps[e]);
+ OSDMap *new_osdmap = new OSDMap();
+ new_osdmap->decode(m->maps[e]);
+
+ emit_blacklist_events(*osdmap, *new_osdmap);
+
+ osdmap = new_osdmap;
+
logger->inc(l_osdc_map_full);
}
else {
if (!op->session) {
_calc_target(&op->target, nullptr);
OSDSession *s = NULL;
- int const r = _get_session(op->target.osd, &s, sul);
+ const int r = _get_session(op->target.osd, &s, sul);
assert(r == 0);
assert(s != NULL);
op->session = s;
}
}
+void Objecter::enable_blacklist_events()
+{
+ unique_lock wl(rwlock);
+
+ blacklist_events_enabled = true;
+}
+
+void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
+{
+ unique_lock wl(rwlock);
+
+ if (events->empty()) {
+ events->swap(blacklist_events);
+ } else {
+ for (const auto &i : blacklist_events) {
+ events->insert(i);
+ }
+ blacklist_events.clear();
+ }
+}
+
+void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
+{
+ if (!blacklist_events_enabled) {
+ return;
+ }
+
+ for (const auto &i : inc.new_blacklist) {
+ blacklist_events.insert(i.first);
+ }
+}
+
+void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
+ const OSDMap &new_osd_map)
+{
+ if (!blacklist_events_enabled) {
+ return;
+ }
+
+ std::set<entity_addr_t> old_set;
+ std::set<entity_addr_t> new_set;
+
+ old_osd_map.get_blacklist(&old_set);
+ new_osd_map.get_blacklist(&new_set);
+
+ std::set<entity_addr_t> delta_set;
+ std::set_difference(
+ new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
+ std::inserter(delta_set, delta_set.begin()));
+ blacklist_events.insert(delta_set.begin(), delta_set.end());
+}
+
// op pool check
void Objecter::C_Op_Map_Latest::finish(int r)
}
if (op->map_dne_bound > 0) {
if (osdmap->get_epoch() >= op->map_dne_bound) {
+ LingerOp::unique_lock wl{op->watch_lock};
if (op->on_reg_commit) {
op->on_reg_commit->complete(-ENOENT);
+ op->on_reg_commit = nullptr;
+ }
+ if (op->on_notify_finish) {
+ op->on_notify_finish->complete(-ENOENT);
+ op->on_notify_finish = nullptr;
}
*need_unregister = true;
}
if (c->map_dne_bound == 0)
c->map_dne_bound = latest;
+ OSDSession::unique_lock sul(c->session->lock);
objecter->_check_command_map_dne(c);
+ sul.unlock();
c->put();
}
void Objecter::_check_command_map_dne(CommandOp *c)
{
// rwlock is locked unique
+ // session is locked unique
ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
<< " current " << osdmap->get_epoch()
void Objecter::_send_command_map_check(CommandOp *c)
{
// rwlock is locked unique
+ // session is locked unique
// ask the monitor
if (check_latest_map_commands.count(c->tid) == 0) {
// we are only called by C_Tick
tick_event = 0;
- if (!initialized.read()) {
+ if (!initialized) {
// we raced with shutdown
ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
return;
if (found)
toping.insert(s);
}
- if (num_homeless_ops.read() || !toping.empty()) {
+ if (num_homeless_ops || !toping.empty()) {
_maybe_request_map();
}
}
// Make sure we don't resechedule if we wake up after shutdown
- if (initialized.read()) {
+ if (initialized) {
tick_event = timer.reschedule_me(ceph::make_timespan(
cct->_conf->objecter_tick_interval));
}
ceph_tid_t tid = 0;
if (!ptid)
ptid = &tid;
+ op->trace.event("op submit");
_op_submit_with_budget(op, rl, ptid, ctx_budget);
}
ceph_tid_t *ptid,
int *ctx_budget)
{
- assert(initialized.read());
+ assert(initialized);
assert(op->ops.size() == op->out_bl.size());
assert(op->ops.size() == op->out_rval.size());
if (osd_timeout > timespan(0)) {
if (op->tid == 0)
- op->tid = last_tid.inc();
+ op->tid = ++last_tid;
auto tid = op->tid;
op->ontimeout = timer.add_event(osd_timeout,
[this, tid]() {
void Objecter::_send_op_account(Op *op)
{
- inflight_ops.inc();
+ inflight_ops++;
// add to gather set(s)
if (op->onfinish) {
- num_in_flight.inc();
+ num_in_flight++;
} else {
ldout(cct, 20) << " note: not requesting reply" << dendl;
}
OSDSession::unique_lock sl(s->lock);
if (op->tid == 0)
- op->tid = last_tid.inc();
+ op->tid = ++last_tid;
ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
<< " '" << op->target.base_oloc << "' '"
sl.unlock();
put_session(s);
- ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl;
+ ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}
int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
- assert(initialized.read());
+ assert(initialized);
OSDSession::unique_lock sl(s->lock);
<< dendl;
Op *op = p->second;
if (op->onfinish) {
- num_in_flight.dec();
+ num_in_flight--;
op->onfinish->complete(r);
op->onfinish = NULL;
}
return ret;
}
+int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
+{
+ unique_lock wl(rwlock);
+ ldout(cct,10) << __func__ << " " << tids << dendl;
+ for (auto tid : tids) {
+ _op_cancel(tid, r);
+ }
+ return 0;
+}
+
int Objecter::_op_cancel(ceph_tid_t tid, int r)
{
int ret = 0;
osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
&acting, &acting_primary);
bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
+ bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
pg_t prev_pgid(prev_seed, pgid.pool());
if (any_change && PastIntervals::is_new_interval(
pg_num,
t->sort_bitwise,
sort_bitwise,
+ t->recovery_deletes,
+ recovery_deletes,
prev_pgid)) {
force_resend = true;
}
pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
&t->actual_pgid);
t->sort_bitwise = sort_bitwise;
+ t->recovery_deletes = recovery_deletes;
ldout(cct, 10) << __func__ << " "
<< " raw pgid " << pgid << " -> actual " << t->actual_pgid
<< " acting " << acting
to->ops[op->tid] = op;
if (to->is_homeless()) {
- num_homeless_ops.inc();
+ num_homeless_ops++;
}
ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
// from->lock is locked
if (from->is_homeless()) {
- num_homeless_ops.dec();
+ num_homeless_ops--;
}
from->ops.erase(op->tid);
assert(op->session == NULL);
if (to->is_homeless()) {
- num_homeless_ops.inc();
+ num_homeless_ops++;
}
get_session(to);
// from->lock is locked unique
if (from->is_homeless()) {
- num_homeless_ops.dec();
+ num_homeless_ops--;
}
from->linger_ops.erase(op->linger_id);
// from->lock is locked
if (from->is_homeless()) {
- num_homeless_ops.dec();
+ num_homeless_ops--;
}
from->command_ops.erase(op->tid);
assert(op->tid);
if (to->is_homeless()) {
- num_homeless_ops.inc();
+ num_homeless_ops++;
}
get_session(to);
assert(!op->should_resend);
if (op->onfinish) {
delete op->onfinish;
- num_in_flight.dec();
+ num_in_flight--;
}
_finish_op(op, 0);
assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
- inflight_ops.dec();
+ inflight_ops--;
op->put();
}
op->stamp = ceph::mono_clock::now();
hobject_t hobj = op->target.get_hobj();
- MOSDOp *m = new MOSDOp(client_inc.read(), op->tid,
+ MOSDOp *m = new MOSDOp(client_inc, op->tid,
hobj, op->target.actual_pgid,
osdmap->get_epoch(),
flags, op->features);
m->ops = op->ops;
m->set_mtime(op->mtime);
m->set_retry_attempt(op->attempts++);
- m->trace = op->trace;
- if (!m->trace && cct->_conf->osdc_blkin_trace_all)
- m->trace.init("objecter op", &trace_endpoint);
+
+ if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
+ op->trace.init("op", &trace_endpoint);
+ }
if (op->priority)
m->set_priority(op->priority);
}
logger->inc(l_osdc_op_send);
- logger->inc(l_osdc_op_send_bytes, m->get_data().length());
+ ssize_t sum = 0;
+ for (unsigned i = 0; i < m->ops.size(); i++) {
+ sum += m->ops[i].indata.length();
+ }
+ logger->inc(l_osdc_op_send_bytes, sum);
return m;
}
// op->session->lock is locked
// backoff?
- hobject_t hoid = op->target.get_hobj();
auto p = op->session->backoffs.find(op->target.actual_pgid);
if (p != op->session->backoffs.end()) {
+ hobject_t hoid = op->target.get_hobj();
auto q = p->second.lower_bound(hoid);
if (q != p->second.begin()) {
--q;
m->set_tid(op->tid);
+ if (op->trace.valid()) {
+ m->trace.init("op msg", nullptr, &op->trace);
+ }
op->session->con->send_message(m);
}
put_session(op->session);
op->session = NULL;
- inflight_ops.dec();
+ inflight_ops--;
}
/* This function DOES put the passed message before returning */
ceph_tid_t tid = m->get_tid();
shunique_lock sul(rwlock, ceph::acquire_shared);
- if (!initialized.read()) {
+ if (!initialized) {
m->put();
return;
}
<< " attempt " << m->get_retry_attempt()
<< dendl;
Op *op = iter->second;
+ op->trace.event("osd op reply");
if (retry_writes_after_first_reply && op->attempts == 1 &&
(op->target.flags & CEPH_OSD_FLAG_WRITE)) {
ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
if (op->onfinish) {
- num_in_flight.dec();
+ num_in_flight--;
}
_session_op_remove(s, op);
sl.unlock();
if (m->is_redirect_reply()) {
ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
if (op->onfinish)
- num_in_flight.dec();
+ num_in_flight--;
_session_op_remove(s, op);
sl.unlock();
put_session(s);
op->tid = 0;
m->get_redirect().combine_with_locator(op->target.target_oloc,
op->target.target_oid.name);
- op->target.flags |= CEPH_OSD_FLAG_REDIRECTED;
+ op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED | CEPH_OSD_FLAG_IGNORE_OVERLAY);
_op_submit(op, sul, NULL);
m->put();
return;
if (rc == -EAGAIN) {
ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
-
- // new tid
- s->ops.erase(op->tid);
- op->tid = last_tid.inc();
-
- _send_op(op);
+ if (op->onfinish)
+ num_in_flight--;
+ _session_op_remove(s, op);
sl.unlock();
put_session(s);
+
+ op->tid = 0;
+ op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
+ CEPH_OSD_FLAG_LOCALIZE_READS);
+ op->target.pgid = pg_t();
+ _op_submit(op, sul, NULL);
m->put();
return;
}
// set rval before running handlers so that handlers
// can change it if e.g. decoding fails
if (*pr)
- **pr = ceph_to_host_errno(p->rval);
+ **pr = ceph_to_hostos_errno(p->rval);
if (*ph) {
ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
- (*ph)->complete(ceph_to_host_errno(p->rval));
+ (*ph)->complete(ceph_to_hostos_errno(p->rval));
*ph = NULL;
}
}
// only ever get back one (type of) ack ever.
if (op->onfinish) {
- num_in_flight.dec();
+ num_in_flight--;
onfinish = op->onfinish;
op->onfinish = NULL;
}
ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
_finish_op(op, 0);
- ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl;
+ ldout(cct, 5) << num_in_flight << " in flight" << dendl;
// serialize completions
if (completion_lock.mutex()) {
{
ldout(cct, 10) << __func__ << " " << *m << dendl;
shunique_lock sul(rwlock, ceph::acquire_shared);
- if (!initialized.read()) {
+ if (!initialized) {
m->put();
return;
}
PoolOp *op = new PoolOp;
if (!op)
return -ENOMEM;
- op->tid = last_tid.inc();
+ op->tid = ++last_tid;
op->pool = pool;
op->name = snap_name;
op->onfinish = onfinish;
ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
PoolOp *op = new PoolOp;
if (!op) return -ENOMEM;
- op->tid = last_tid.inc();
+ op->tid = ++last_tid;
op->pool = pool;
C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
op->onfinish = fin;
PoolOp *op = new PoolOp;
if (!op)
return -ENOMEM;
- op->tid = last_tid.inc();
+ op->tid = ++last_tid;
op->pool = pool;
op->name = snap_name;
op->onfinish = onfinish;
<< snap << dendl;
PoolOp *op = new PoolOp;
if (!op) return -ENOMEM;
- op->tid = last_tid.inc();
+ op->tid = ++last_tid;
op->pool = pool;
op->onfinish = onfinish;
op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
PoolOp *op = new PoolOp;
if (!op)
return -ENOMEM;
- op->tid = last_tid.inc();
+ op->tid = ++last_tid;
op->pool = 0;
op->name = name;
op->onfinish = onfinish;
void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
{
PoolOp *op = new PoolOp;
- op->tid = last_tid.inc();
+ op->tid = ++last_tid;
op->pool = pool;
op->name = "delete";
op->onfinish = onfinish;
ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
PoolOp *op = new PoolOp;
if (!op) return -ENOMEM;
- op->tid = last_tid.inc();
+ op->tid = ++last_tid;
op->pool = pool;
op->name = "change_pool_auid";
op->onfinish = onfinish;
{
FUNCTRACE();
shunique_lock sul(rwlock, acquire_shared);
- if (!initialized.read()) {
+ if (!initialized) {
sul.unlock();
m->put();
return;
int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized.read());
+ assert(initialized);
unique_lock wl(rwlock);
ldout(cct, 10) << "get_pool_stats " << pools << dendl;
PoolStatOp *op = new PoolStatOp;
- op->tid = last_tid.inc();
+ op->tid = ++last_tid;
op->pools = pools;
op->pool_stats = result;
op->onfinish = onfinish;
ceph_tid_t tid = m->get_tid();
unique_lock wl(rwlock);
- if (!initialized.read()) {
+ if (!initialized) {
m->put();
return;
}
int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized.read());
+ assert(initialized);
unique_lock wl(rwlock);
delete op;
}
-void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
+void Objecter::get_fs_stats(ceph_statfs& result,
+ boost::optional<int64_t> data_pool,
+ Context *onfinish)
{
ldout(cct, 10) << "get_fs_stats" << dendl;
unique_lock l(rwlock);
StatfsOp *op = new StatfsOp;
- op->tid = last_tid.inc();
+ op->tid = ++last_tid;
op->stats = &result;
+ op->data_pool = data_pool;
op->onfinish = onfinish;
if (mon_timeout > timespan(0)) {
op->ontimeout = timer.add_event(mon_timeout,
ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
+ op->data_pool,
last_seen_pgmap_version));
op->last_submit = ceph::mono_clock::now();
void Objecter::handle_fs_stats_reply(MStatfsReply *m)
{
unique_lock wl(rwlock);
- if (!initialized.read()) {
+ if (!initialized) {
m->put();
return;
}
int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
{
- assert(initialized.read());
+ assert(initialized);
unique_lock wl(rwlock);
void Objecter::ms_handle_connect(Connection *con)
{
ldout(cct, 10) << "ms_handle_connect " << con << dendl;
- if (!initialized.read())
+ if (!initialized)
return;
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
bool Objecter::ms_handle_reset(Connection *con)
{
- if (!initialized.read())
+ if (!initialized)
return false;
if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
OSDSession *session = static_cast<OSDSession*>(con->get_priv());
ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
<< " osd." << session->osd << dendl;
unique_lock wl(rwlock);
- if (!initialized.read()) {
+ if (!initialized) {
wl.unlock();
return false;
}
AuthAuthorizer **authorizer,
bool force_new)
{
- if (!initialized.read())
+ if (!initialized)
return false;
if (dest_type == CEPH_ENTITY_TYPE_MON)
return true;
void Objecter::_dump_active()
{
- ldout(cct, 20) << "dump_active .. " << num_homeless_ops.read() << " homeless"
+ ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
<< dendl;
for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
siter != osd_sessions.end(); ++siter) {
void Objecter::handle_command_reply(MCommandReply *m)
{
unique_lock wl(rwlock);
- if (!initialized.read()) {
+ if (!initialized) {
m->put();
return;
}
sl.unlock();
-
+ OSDSession::unique_lock sul(s->lock);
_finish_command(c, m->r, m->rs);
+ sul.unlock();
+
m->put();
if (s)
s->put();
{
shunique_lock sul(rwlock, ceph::acquire_unique);
- ceph_tid_t tid = last_tid.inc();
+ ceph_tid_t tid = ++last_tid;
ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
c->tid = tid;
int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
- assert(initialized.read());
+ assert(initialized);
unique_lock wl(rwlock);
CommandOp *op = it->second;
_command_cancel_map_check(op);
+ OSDSession::unique_lock sl(op->session->lock);
_finish_command(op, r, "");
+ sl.unlock();
return 0;
}
void Objecter::_finish_command(CommandOp *c, int r, string rs)
{
// rwlock is locked unique
+ // session lock is locked
ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
<< rs << dendl;
timer.cancel_event(c->ontimeout);
OSDSession *s = c->session;
- OSDSession::unique_lock sl(s->lock);
_session_command_op_remove(c->session, c);
- sl.unlock();
c->put();
delete osdmap;
assert(homeless_session->get_nref() == 1);
- assert(num_homeless_ops.read() == 0);
+ assert(num_homeless_ops == 0);
homeless_session->put();
assert(osd_sessions.empty());