uint32_t *out_flags;
uint32_t *out_data_digest;
uint32_t *out_omap_digest;
- vector<pair<osd_reqid_t, version_t> > *out_reqids;
+ mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids;
uint64_t *out_truncate_seq;
uint64_t *out_truncate_size;
int *prval;
uint32_t *flags,
uint32_t *dd,
uint32_t *od,
- vector<pair<osd_reqid_t, version_t> > *oreqids,
+ mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *oreqids,
uint64_t *otseq,
uint64_t *otsize,
int *r)
uint32_t *out_flags,
uint32_t *out_data_digest,
uint32_t *out_omap_digest,
- vector<pair<osd_reqid_t, version_t> > *out_reqids,
+ mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids,
uint64_t *truncate_seq,
uint64_t *truncate_size,
int *prval) {
add_op(CEPH_OSD_OP_CACHE_EVICT);
}
+ /*
+ * Extensible tier
+ *
+ * set_redirect: append a CEPH_OSD_OP_SET_REDIRECT op to this operation.
+ *
+ * snapid and tgt_version are stored in the op's copy_from.snapid and
+ * copy_from.src_version fields; tgt and then tgt_oloc are encoded, in
+ * that order, into the op's indata payload.
+ *
+ * NOTE(review): this reuses the copy_from member of the op to carry the
+ * redirect target's snapid/version -- presumably intentional (no
+ * dedicated redirect fields); confirm against the OSD-side decoder,
+ * which must also decode tgt before tgt_oloc.
+ */
+ void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc,
+ version_t tgt_version) {
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
+ osd_op.op.copy_from.snapid = snapid;
+ osd_op.op.copy_from.src_version = tgt_version;
+ ::encode(tgt, osd_op.indata);
+ ::encode(tgt_oloc, osd_op.indata);
+ }
+
void set_alloc_hint(uint64_t expected_object_size,
uint64_t expected_write_size,
uint32_t flags) {
using Dispatcher::cct;
std::multimap<string,string> crush_location;
- atomic_t initialized;
+ std::atomic<bool> initialized{false};
private:
- atomic64_t last_tid;
- atomic_t inflight_ops;
- atomic_t client_inc;
+ std::atomic<uint64_t> last_tid{0};
+ std::atomic<unsigned> inflight_ops{0};
+ std::atomic<int> client_inc{-1};
uint64_t max_linger_id;
- atomic_t num_in_flight;
- atomic_t global_op_flags; // flags which are applied to each IO op
+ std::atomic<unsigned> num_in_flight{0};
+ std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op
bool keep_balanced_budget;
bool honor_osdmap_full;
bool osdmap_full_try;
+ // If this is true, accumulate a set of blacklisted entities
+ // to be drained by consume_blacklist_events.
+ bool blacklist_events_enabled;
+ std::set<entity_addr_t> blacklist_events;
+
public:
void maybe_request_map();
+
+ void enable_blacklist_events();
private:
void _maybe_request_map();
if (target.base_oloc.key == o)
target.base_oloc.key.clear();
- if (parent_trace && parent_trace->valid())
+ if (parent_trace && parent_trace->valid()) {
trace.init("op", nullptr, parent_trace);
+ trace.event("start");
+ }
}
bool operator<(const Op& other) const {
delete out_handler.back();
out_handler.pop_back();
}
+ trace.event("finish");
}
};
map<ceph_tid_t,PoolStatOp*> poolstat_ops;
map<ceph_tid_t,StatfsOp*> statfs_ops;
map<ceph_tid_t,PoolOp*> pool_ops;
- atomic_t num_homeless_ops;
+ std::atomic<unsigned> num_homeless_ops{0};
OSDSession *homeless_session;
double osd_timeout) :
Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
trace_endpoint("0.0.0.0", 0, "Objecter"),
- osdmap(new OSDMap), initialized(0), last_tid(0), client_inc(-1),
- max_linger_id(0), num_in_flight(0), global_op_flags(0),
+ osdmap(new OSDMap),
+ max_linger_id(0),
keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
+ blacklist_events_enabled(false),
last_seen_osdmap_version(0), last_seen_pgmap_version(0),
logger(NULL), tick_event(0), m_request_state_hook(NULL),
- num_homeless_ops(0),
homeless_session(new OSDSession(cct, -1)),
mon_timeout(ceph::make_timespan(mon_timeout)),
osd_timeout(ceph::make_timespan(osd_timeout)),
void handle_osd_map(class MOSDMap *m);
void wait_for_osd_map();
+ /**
+ * Get list of entities blacklisted since this was last called,
+ * and reset the list.
+ *
+ * Uses a std::set because typical use case is to compare some
+ * other list of clients to see which overlap with the blacklisted
+ * addrs.
+ *
+ */
+ void consume_blacklist_events(std::set<entity_addr_t> *events);
+
int pool_snap_by_name(int64_t poolid,
const char *snap_name,
snapid_t *snap) const;
int pool_snap_list(int64_t poolid, vector<uint64_t> *snaps);
private:
+ void emit_blacklist_events(const OSDMap::Incremental &inc);
+ void emit_blacklist_events(const OSDMap &old_osd_map,
+ const OSDMap &new_osd_map);
+
// low-level
void _op_submit(Op *op, shunique_lock& lc, ceph_tid_t *ptid);
void _op_submit_with_budget(Op *op, shunique_lock& lc,
void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
/**
 * Report whether there is still outstanding work.
 *
 * Holds a shared_lock on rwlock while checking. Returns true if
 * inflight_ops is non-zero, or if any of linger_ops, poolstat_ops or
 * statfs_ops is non-empty; returns false only when all four are idle.
 */
bool is_active() {
shared_lock l(rwlock);
- return !((!inflight_ops.read()) && linger_ops.empty() &&
+ return !((!inflight_ops) && linger_ops.empty() &&
poolstat_ops.empty() && statfs_ops.empty());
}
void dump_pool_stat_ops(Formatter *fmt) const;
void dump_statfs_ops(Formatter *fmt) const;
// Accessors for client_inc. On a std::atomic<int> the plain read and the
// plain assignment below are an atomic load and an atomic store.
- int get_client_incarnation() const { return client_inc.read(); }
- void set_client_incarnation(int inc) { client_inc.set(inc); }
+ int get_client_incarnation() const { return client_inc; }
+ void set_client_incarnation(int inc) { client_inc = inc; }
bool have_map(epoch_t epoch);
/// wait for epoch; true if we already have it
void _get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
/** Get the current set of global op flags (an atomic load of
    global_op_flags, the flags which are applied to each IO op) */
- int get_global_op_flags() { return global_op_flags.read(); }
+ int get_global_op_flags() const { return global_op_flags; }
/** Add a flag to the global op flags (a single atomic read-modify-write
    via fetch_or) */
void add_global_op_flags(int flag) {
- global_op_flags.set(global_op_flags.read() | flag);
+ global_op_flags.fetch_or(flag);
}
- /** Clear the passed flags from the global op flag set, not really
- atomic operation */
+ /** Clear the passed flags from the global op flag set (a single atomic
+ read-modify-write via fetch_and) */
void clear_global_op_flag(int flags) {
- global_op_flags.set(global_op_flags.read() & ~flags);
+ global_op_flags.fetch_and(~flags);
}
/// cancel an in-progress request with the given return code
Context *oncommit, version_t *objver = NULL,
osd_reqid_t reqid = osd_reqid_t(),
ZTracer::Trace *parent_trace = nullptr) {
- Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver, nullptr, parent_trace);
o->priority = op.priority;
o->mtime = mtime;
int *data_offset = NULL,
uint64_t features = 0,
ZTracer::Trace *parent_trace = nullptr) {
- Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
CEPH_OSD_FLAG_READ, onack, objver, data_offset, parent_trace);
o->priority = op.priority;
o->snapid = snapid;
int *ctx_budget) {
Op *o = new Op(object_t(), oloc,
op.ops,
- flags | global_op_flags.read() | CEPH_OSD_FLAG_READ |
+ flags | global_op_flags | CEPH_OSD_FLAG_READ |
CEPH_OSD_FLAG_IGNORE_OVERLAY,
onack, NULL);
o->target.precalc_pgid = true;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_STAT;
C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_READ, fin, objver);
o->snapid = snap;
o->outbl = &fin->bl;
ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0;
ops[i].op.flags = op_flags;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_READ, onfinish, objver, nullptr, parent_trace);
o->snapid = snap;
o->outbl = pbl;
ops[i].op.extent.truncate_seq = 0;
ops[i].indata = cmp_bl;
ops[i].op.flags = op_flags;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_READ, onfinish, objver);
o->snapid = snap;
return o;
ops[i].op.extent.truncate_size = trunc_size;
ops[i].op.extent.truncate_seq = trunc_seq;
ops[i].op.flags = op_flags;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_READ, onfinish, objver);
o->snapid = snap;
o->outbl = pbl;
ops[i].op.extent.length = len;
ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_READ, onfinish, objver);
o->snapid = snap;
o->outbl = pbl;
ops[i].op.xattr.value_len = 0;
if (name)
ops[i].indata.append(name);
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_READ, onfinish, objver);
o->snapid = snap;
o->outbl = pbl;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_READ, fin, objver);
o->snapid = snap;
o->outbl = &fin->bl;
snapid_t snap, bufferlist *pbl, int flags,
Context *onfinish, version_t *objver = NULL,
ObjectOperation *extra_ops = NULL) {
- return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags.read() |
+ return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags |
CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
}
const SnapContext& snapc, int flags,
Context *oncommit,
version_t *objver = NULL) {
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ops[i].op.extent.truncate_seq = 0;
ops[i].indata = bl;
ops[i].op.flags = op_flags;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver,
nullptr, parent_trace);
o->mtime = mtime;
ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0;
ops[i].indata = bl;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ops[i].op.extent.truncate_seq = trunc_seq;
ops[i].indata = bl;
ops[i].op.flags = op_flags;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ops[i].op.extent.length = bl.length();
ops[i].indata = bl;
ops[i].op.flags = op_flags;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ops[i].op.writesame.data_length = bl.length();
ops[i].indata = bl;
ops[i].op.flags = op_flags;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ops[i].op.extent.offset = trunc_size;
ops[i].op.extent.truncate_size = trunc_size;
ops[i].op.extent.truncate_seq = trunc_seq;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ops[i].op.op = CEPH_OSD_OP_ZERO;
ops[i].op.extent.offset = off;
ops[i].op.extent.length = len;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_CREATE;
ops[i].op.flags = create_flags;
- Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_DELETE;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
if (name)
ops[i].indata.append(name);
ops[i].indata.append(bl);
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ops[i].op.xattr.value_len = 0;
if (name)
ops[i].indata.append(name);
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;