#include <boost/thread/shared_mutex.hpp>
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#include "include/buffer.h"
#include "include/types.h"
#include "include/rados/rados_types.hpp"
#include "common/admin_socket.h"
#include "common/ceph_time.h"
#include "common/ceph_timer.h"
-#include "common/Finisher.h"
+#include "common/config_obs.h"
#include "common/shunique_lock.h"
#include "common/zipkin_trace.h"
+#include "common/Finisher.h"
+#include "common/Throttle.h"
#include "messages/MOSDOp.h"
+#include "msg/Dispatcher.h"
#include "osd/OSDMap.h"
-using namespace std;
class Context;
class Messenger;
}
void set_last_op_flags(int flags) {
- assert(!ops.empty());
+ ceph_assert(!ops.empty());
ops.rbegin()->op.flags = flags;
}
osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
osd_op.op.xattr.value_len = data.length();
if (name)
- osd_op.indata.append(name);
+ osd_op.indata.append(name, osd_op.op.xattr.name_len);
osd_op.indata.append(data);
}
void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
osd_op.op.xattr.cmp_op = cmp_op;
osd_op.op.xattr.cmp_mode = cmp_mode;
if (name)
- osd_op.indata.append(name);
+ osd_op.indata.append(name, osd_op.op.xattr.name_len);
osd_op.indata.append(data);
}
void add_call(int op, const char *cname, const char *method,
OSDOp& osd_op = add_op(op);
osd_op.op.pgls.count = count;
osd_op.op.pgls.start_epoch = start_epoch;
- ::encode(cookie, osd_op.indata);
+ encode(cookie, osd_op.indata);
}
void add_pgls_filter(int op, uint64_t count, const bufferlist& filter,
collection_list_handle_t cookie, epoch_t start_epoch) {
osd_op.op.pgls.start_epoch = start_epoch;
string cname = "pg";
string mname = "filter";
- ::encode(cname, osd_op.indata);
- ::encode(mname, osd_op.indata);
+ encode(cname, osd_op.indata);
+ encode(mname, osd_op.indata);
osd_op.indata.append(filter);
- ::encode(cookie, osd_op.indata);
+ encode(cookie, osd_op.indata);
}
void add_alloc_hint(int op, uint64_t expected_object_size,
uint64_t expected_write_size,
: psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {}
void finish(int r) override {
if (r >= 0) {
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
try {
uint64_t size;
ceph::real_time mtime;
- ::decode(size, p);
- ::decode(mtime, p);
+ decode(size, p);
+ decode(mtime, p);
if (psize)
*psize = size;
if (pmtime)
// object cmpext
struct C_ObjectOperation_cmpext : public Context {
int *prval;
- C_ObjectOperation_cmpext(int *prval)
+ explicit C_ObjectOperation_cmpext(int *prval)
: prval(prval) {}
void finish(int r) {
int *prval)
: data_bl(data_bl), extents(extents), prval(prval) {}
void finish(int r) override {
- bufferlist::iterator iter = bl.begin();
+ auto iter = bl.cbegin();
if (r >= 0) {
// NOTE: it's possible the sub-op has not been executed but the result
// code remains zeroed. Avoid the costly exception handling on a
// potential IO path.
if (bl.length() > 0) {
try {
- ::decode(*extents, iter);
- ::decode(*data_bl, iter);
+ decode(*extents, iter);
+ decode(*data_bl, iter);
} catch (buffer::error& e) {
if (prval)
*prval = -EIO;
}
void finish(int r) override {
if (r >= 0) {
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
try {
if (pattrs)
- ::decode(*pattrs, p);
+ decode(*pattrs, p);
if (ptruncated) {
std::map<std::string,bufferlist> ignore;
if (!pattrs) {
- ::decode(ignore, p);
+ decode(ignore, p);
pattrs = &ignore;
}
if (!p.end()) {
- ::decode(*ptruncated, p);
+ decode(*ptruncated, p);
} else {
// the OSD did not provide this. since old OSDs do not
- // enfoce omap result limits either, we can infer it from
+ // enforce omap result limits either, we can infer it from
}
void finish(int r) override {
if (r >= 0) {
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
try {
if (pattrs)
- ::decode(*pattrs, p);
+ decode(*pattrs, p);
if (ptruncated) {
std::set<std::string> ignore;
if (!pattrs) {
- ::decode(ignore, p);
+ decode(ignore, p);
pattrs = &ignore;
}
if (!p.end()) {
- ::decode(*ptruncated, p);
+ decode(*ptruncated, p);
} else {
// the OSD did not provide this. since old OSDs do not
- // enfoce omap result limits either, we can infer it from
+ // enforce omap result limits either, we can infer it from
// the size of the result
*ptruncated = (pattrs->size() == max_entries);
}
: pwatchers(pw), prval(pr) {}
void finish(int r) override {
if (r >= 0) {
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
try {
obj_list_watch_response_t resp;
- ::decode(resp, p);
+ decode(resp, p);
if (pwatchers) {
for (list<watch_item_t>::iterator i = resp.entries.begin() ;
i != resp.entries.end() ; ++i) {
obj_watch_t ow;
- ostringstream sa;
- sa << i->addr;
- strncpy(ow.addr, sa.str().c_str(), 256);
+ string sa = i->addr.get_legacy_str();
+ strncpy(ow.addr, sa.c_str(), 256);
ow.watcher_id = i->name.num();
ow.cookie = i->cookie;
ow.timeout_seconds = i->timeout_seconds;
: psnaps(ps), prval(pr) {}
void finish(int r) override {
if (r >= 0) {
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
try {
obj_list_snap_response_t resp;
- ::decode(resp, p);
+ decode(resp, p);
if (psnaps) {
psnaps->clones.clear();
for (vector<clone_info>::iterator ci = resp.clones.begin();
}
void setxattrs(map<string, bufferlist>& attrs) {
bufferlist bl;
- ::encode(attrs, bl);
+ encode(attrs, bl);
add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
}
void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {
bufferlist bl;
- ::encode(attrs, bl);
+ encode(attrs, bl);
add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
}
void tmap_update(bufferlist& bl) {
add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
}
- void tmap_put(bufferlist& bl) {
- add_data(CEPH_OSD_OP_TMAPPUT, 0, bl.length(), bl);
- }
- void tmap_get(bufferlist *pbl, int *prval) {
- add_op(CEPH_OSD_OP_TMAPGET);
- unsigned p = ops.size() - 1;
- out_bl[p] = pbl;
- out_rval[p] = prval;
- }
- void tmap_get() {
- add_op(CEPH_OSD_OP_TMAPGET);
- }
- void tmap_to_omap(bool nullok=false) {
- OSDOp& osd_op = add_op(CEPH_OSD_OP_TMAP2OMAP);
- if (nullok)
- osd_op.op.tmap2omap.flags = CEPH_OSD_TMAP2OMAP_NULLOK;
- }
// objectmap
void omap_get_keys(const string &start_after,
int *prval) {
OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
bufferlist bl;
- ::encode(start_after, bl);
- ::encode(max_to_get, bl);
+ encode(start_after, bl);
+ encode(max_to_get, bl);
op.op.extent.offset = 0;
op.op.extent.length = bl.length();
op.indata.claim_append(bl);
int *prval) {
OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
bufferlist bl;
- ::encode(start_after, bl);
- ::encode(max_to_get, bl);
- ::encode(filter_prefix, bl);
+ encode(start_after, bl);
+ encode(max_to_get, bl);
+ encode(filter_prefix, bl);
op.op.extent.offset = 0;
op.op.extent.length = bl.length();
op.indata.claim_append(bl);
int *prval) {
OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
bufferlist bl;
- ::encode(to_get, bl);
+ encode(to_get, bl);
op.op.extent.offset = 0;
op.op.extent.length = bl.length();
op.indata.claim_append(bl);
int *prval) {
OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
bufferlist bl;
- ::encode(assertions, bl);
+ encode(assertions, bl);
op.op.extent.offset = 0;
op.op.extent.length = bl.length();
op.indata.claim_append(bl);
uint32_t *out_data_digest;
uint32_t *out_omap_digest;
mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids;
+ mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes;
uint64_t *out_truncate_seq;
uint64_t *out_truncate_size;
int *prval;
uint32_t *dd,
uint32_t *od,
mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *oreqids,
+ mempool::osd_pglog::map<uint32_t, int> *oreqid_return_codes,
uint64_t *otseq,
uint64_t *otsize,
int *r)
out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
out_flags(flags), out_data_digest(dd), out_omap_digest(od),
out_reqids(oreqids),
+ out_reqid_return_codes(oreqid_return_codes),
out_truncate_seq(otseq),
out_truncate_size(otsize),
prval(r) {}
if (r < 0 && r != -ENOENT)
return;
try {
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
object_copy_data_t copy_reply;
- ::decode(copy_reply, p);
+ decode(copy_reply, p);
if (r == -ENOENT) {
if (out_reqids)
*out_reqids = copy_reply.reqids;
*out_omap_digest = copy_reply.omap_digest;
if (out_reqids)
*out_reqids = copy_reply.reqids;
+ if (out_reqid_return_codes)
+ *out_reqid_return_codes = copy_reply.reqid_return_codes;
if (out_truncate_seq)
*out_truncate_seq = copy_reply.truncate_seq;
if (out_truncate_size)
uint32_t *out_data_digest,
uint32_t *out_omap_digest,
mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids,
+ mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes,
uint64_t *truncate_seq,
uint64_t *truncate_size,
int *prval) {
OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
osd_op.op.copy_get.max = max;
- ::encode(*cursor, osd_op.indata);
- ::encode(max, osd_op.indata);
+ encode(*cursor, osd_op.indata);
+ encode(max, osd_op.indata);
unsigned p = ops.size() - 1;
out_rval[p] = prval;
C_ObjectOperation_copyget *h =
out_attrs, out_data, out_omap_header,
out_omap_data, out_snaps, out_snap_seq,
out_flags, out_data_digest,
- out_omap_digest, out_reqids, truncate_seq,
+ out_omap_digest, out_reqids,
+ out_reqid_return_codes, truncate_seq,
truncate_size, prval);
out_bl[p] = &h->bl;
out_handler[p] = h;
if (r < 0)
return;
try {
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
bool isdirty;
- ::decode(isdirty, p);
+ decode(isdirty, p);
if (pisdirty)
*pisdirty = isdirty;
} catch (buffer::error& e) {
if (r < 0)
return;
try {
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
- ::decode(ls, p);
+ decode(ls, p);
if (ptls) {
ptls->clear();
for (auto p = ls.begin(); p != ls.end(); ++p)
void omap_set(const map<string, bufferlist> &map) {
bufferlist bl;
- ::encode(map, bl);
+ encode(map, bl);
add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
}
void omap_rm_keys(const std::set<std::string> &to_remove) {
bufferlist bl;
- ::encode(to_remove, bl);
+ encode(to_remove, bl);
add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
}
bufferlist &bl, bufferlist *inbl) {
OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
osd_op.op.notify.cookie = cookie;
- ::encode(prot_ver, *inbl);
- ::encode(timeout, *inbl);
- ::encode(bl, *inbl);
+ encode(prot_ver, *inbl);
+ encode(timeout, *inbl);
+ encode(bl, *inbl);
osd_op.indata.append(*inbl);
}
bufferlist& reply_bl) {
OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
bufferlist bl;
- ::encode(notify_id, bl);
- ::encode(cookie, bl);
- ::encode(reply_bl, bl);
+ encode(notify_id, bl);
+ encode(cookie, bl);
+ encode(reply_bl, bl);
osd_op.indata.append(bl);
}
osd_op.op.copy_from.src_version = src_version;
osd_op.op.copy_from.flags = flags;
osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
- ::encode(src, osd_op.indata);
- ::encode(src_oloc, osd_op.indata);
+ encode(src, osd_op.indata);
+ encode(src_oloc, osd_op.indata);
}
/**
* Extensible tier
*/
void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc,
- version_t tgt_version) {
+ version_t tgt_version, int flag) {
OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
osd_op.op.copy_from.snapid = snapid;
osd_op.op.copy_from.src_version = tgt_version;
- ::encode(tgt, osd_op.indata);
- ::encode(tgt_oloc, osd_op.indata);
+ encode(tgt, osd_op.indata);
+ encode(tgt_oloc, osd_op.indata);
+ set_last_op_flags(flag);
+ }
+
+ void set_chunk(uint64_t src_offset, uint64_t src_length, object_locator_t tgt_oloc,
+ object_t tgt_oid, uint64_t tgt_offset, int flag) {
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_CHUNK);
+ encode(src_offset, osd_op.indata);
+ encode(src_length, osd_op.indata);
+ encode(tgt_oloc, osd_op.indata);
+ encode(tgt_oid, osd_op.indata);
+ encode(tgt_offset, osd_op.indata);
+ set_last_op_flags(flag);
+ }
+
+ void tier_promote() {
+ add_op(CEPH_OSD_OP_TIER_PROMOTE);
+ }
+
+ void unset_manifest() {
+ add_op(CEPH_OSD_OP_UNSET_MANIFEST);
}
void set_alloc_hint(uint64_t expected_object_size,
public:
// config observer bits
const char** get_tracked_conf_keys() const override;
- void handle_conf_change(const struct md_config_t *conf,
+ void handle_conf_change(const ConfigProxy& conf,
const std::set <std::string> &changed) override;
public:
version_t last_seen_osdmap_version;
version_t last_seen_pgmap_version;
- mutable boost::shared_mutex rwlock;
- using lock_guard = std::unique_lock<decltype(rwlock)>;
+ mutable std::shared_mutex rwlock;
+ using lock_guard = std::lock_guard<decltype(rwlock)>;
using unique_lock = std::unique_lock<decltype(rwlock)>;
using shared_lock = boost::shared_lock<decltype(rwlock)>;
using shunique_lock = ceph::shunique_lock<decltype(rwlock)>;
- ceph::timer<ceph::mono_clock> timer;
+ ceph::timer<ceph::coarse_mono_clock> timer;
PerfCounters *logger;
public:
/*** track pending operations ***/
// read
- public:
struct OSDSession;
spg_t actual_pgid; ///< last (actual) spg_t we mapped to
unsigned pg_num = 0; ///< last pg_num we mapped to
unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
+ unsigned pg_num_pending = 0; ///< last pg_num_pending we mapped to
vector<int> up; ///< set of up osds for last pg we mapped to
vector<int> acting; ///< set of acting osds for last pg we mapped to
int up_primary = -1; ///< last up_primary we mapped to
base_oloc(oloc)
{}
- op_target_t(pg_t pgid)
+ explicit op_target_t(pg_t pgid)
: base_oloc(pgid.pool(), pgid.ps()),
precalc_pgid(true),
base_pgid(pgid)
version_t *objver;
epoch_t *reply_epoch;
- ceph::mono_time stamp;
+ ceph::coarse_mono_time stamp;
epoch_t map_dne_bound;
- bool budgeted;
+ int budget;
/// true if we should resend this message on failure
bool should_resend;
objver(ov),
reply_epoch(NULL),
map_dne_bound(0),
- budgeted(false),
+ budget(-1),
should_resend(true),
ctx_budgeted(false),
data_offset(offset) {
psize(ps), pmtime(pm), fin(c) {}
void finish(int r) override {
if (r >= 0) {
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
uint64_t s;
ceph::real_time m;
- ::decode(s, p);
- ::decode(m, p);
+ decode(s, p);
+ decode(m, p);
if (psize)
*psize = s;
if (pmtime)
fin(c) {}
void finish(int r) override {
if (r >= 0) {
- bufferlist::iterator p = bl.begin();
- ::decode(attrset, p);
+ auto p = bl.cbegin();
+ decode(attrset, p);
}
fin->complete(r);
}
Context *onfinish;
uint64_t ontimeout;
- ceph::mono_time last_submit;
+ ceph::coarse_mono_time last_submit;
};
struct StatfsOp {
Context *onfinish;
uint64_t ontimeout;
- ceph::mono_time last_submit;
+ ceph::coarse_mono_time last_submit;
};
struct PoolOp {
Context *onfinish;
uint64_t ontimeout;
int pool_op;
- uint64_t auid;
int16_t crush_rule;
snapid_t snapid;
bufferlist *blp;
- ceph::mono_time last_submit;
+ ceph::coarse_mono_time last_submit;
PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0),
- auid(0), crush_rule(0), snapid(0), blp(NULL) {}
+ crush_rule(0), snapid(0), blp(NULL) {}
};
// -- osd commands --
Context *onfinish = nullptr;
uint64_t ontimeout = 0;
- ceph::mono_time last_submit;
+ ceph::coarse_mono_time last_submit;
CommandOp(
int target_osd,
version_t *pobjver;
bool is_watch;
- ceph::mono_time watch_valid_thru; ///< send time for last acked ping
+ ceph::coarse_mono_time watch_valid_thru; ///< send time for last acked ping
int last_error; ///< error from last failed ping|reconnect, if any
- boost::shared_mutex watch_lock;
+ std::shared_mutex watch_lock;
using lock_guard = std::unique_lock<decltype(watch_lock)>;
using unique_lock = std::unique_lock<decltype(watch_lock)>;
using shared_lock = boost::shared_lock<decltype(watch_lock)>;
// queue of pending async operations, with the timestamp of
// when they were queued.
- list<ceph::mono_time> watch_pending_async;
+ list<ceph::coarse_mono_time> watch_pending_async;
uint32_t register_gen;
bool registered;
OSDSession *session;
+ Objecter *objecter;
+ int ctx_budget;
ceph_tid_t register_tid;
ceph_tid_t ping_tid;
epoch_t map_dne_bound;
void _queued_async() {
- // watch_lock ust be locked unique
+ // watch_lock must be locked unique
- watch_pending_async.push_back(ceph::mono_clock::now());
+ watch_pending_async.push_back(ceph::coarse_mono_clock::now());
}
void finished_async() {
unique_lock l(watch_lock);
- assert(!watch_pending_async.empty());
+ ceph_assert(!watch_pending_async.empty());
watch_pending_async.pop_front();
}
- LingerOp() : linger_id(0),
- target(object_t(), object_locator_t(), 0),
- snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
- is_watch(false), last_error(0),
- register_gen(0),
- registered(false),
- canceled(false),
- on_reg_commit(NULL),
- on_notify_finish(NULL),
- notify_result_bl(NULL),
- notify_id(0),
- watch_context(NULL),
- session(NULL),
- register_tid(0),
- ping_tid(0),
- map_dne_bound(0) {}
-
- // no copy!
- const LingerOp &operator=(const LingerOp& r);
- LingerOp(const LingerOp& o);
+ explicit LingerOp(Objecter *o) : linger_id(0),
+ target(object_t(), object_locator_t(), 0),
+ snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
+ is_watch(false), last_error(0),
+ register_gen(0),
+ registered(false),
+ canceled(false),
+ on_reg_commit(NULL),
+ on_notify_finish(NULL),
+ notify_result_bl(NULL),
+ notify_id(0),
+ watch_context(NULL),
+ session(NULL),
+ objecter(o),
+ ctx_budget(-1),
+ register_tid(0),
+ ping_tid(0),
+ map_dne_bound(0) {}
+
+ const LingerOp &operator=(const LingerOp& r) = delete;
+ LingerOp(const LingerOp& o) = delete;
uint64_t get_cookie() {
return reinterpret_cast<uint64_t>(this);
struct C_Linger_Ping : public Context {
Objecter *objecter;
LingerOp *info;
- ceph::mono_time sent;
+ ceph::coarse_mono_time sent;
uint32_t register_gen;
C_Linger_Ping(Objecter *o, LingerOp *l)
: objecter(o), info(l), register_gen(info->register_gen) {
};
struct OSDSession : public RefCountedObject {
- boost::shared_mutex lock;
+ std::shared_mutex lock;
using lock_guard = std::lock_guard<decltype(lock)>;
using unique_lock = std::unique_lock<decltype(lock)>;
using shared_lock = boost::shared_lock<decltype(lock)>;
ceph::timespan osd_timeout;
MOSDOp *_prepare_osd_op(Op *op);
- void _send_op(Op *op, MOSDOp *m = NULL);
+ void _send_op(Op *op);
void _send_op_account(Op *op);
void _cancel_linger_op(Op *op);
- void finish_op(OSDSession *session, ceph_tid_t tid);
void _finish_op(Op *op, int r);
static bool is_pg_changed(
int oldprimary,
};
bool _osdmap_full_flag() const;
bool _osdmap_has_pool_full() const;
+ void _prune_snapc(
+ const mempool::osdmap::map<int64_t, OSDMap::snap_interval_set_t>& new_removed_snaps,
+ Op *op);
bool target_should_be_paused(op_target_t *op);
int _calc_target(op_target_t *t, Connection *con,
void _linger_commit(LingerOp *info, int r, bufferlist& outbl);
void _linger_reconnect(LingerOp *info, int r);
void _send_linger_ping(LingerOp *info);
- void _linger_ping(LingerOp *info, int r, ceph::mono_time sent,
+ void _linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
uint32_t register_gen);
int _normalize_watch_error(int r);
void _send_command_map_check(CommandOp *op);
void _command_cancel_map_check(CommandOp *op);
- void kick_requests(OSDSession *session);
void _kick_requests(OSDSession *session, map<uint64_t, LingerOp *>& lresend);
void _linger_ops_resend(map<uint64_t, LingerOp *>& lresend, unique_lock& ul);
* and returned whenever an op is removed from the map
* If throttle_op needs to throttle it will unlock client_lock.
*/
- int calc_op_budget(Op *op);
+ int calc_op_budget(const vector<OSDOp>& ops);
void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
int _take_op_budget(Op *op, shunique_lock& sul) {
- assert(sul && sul.mutex() == &rwlock);
- int op_budget = calc_op_budget(op);
+ ceph_assert(sul && sul.mutex() == &rwlock);
+ int op_budget = calc_op_budget(op->ops);
if (keep_balanced_budget) {
_throttle_op(op, sul, op_budget);
- } else {
+ } else { // update take_linger_budget to match this!
op_throttle_bytes.take(op_budget);
op_throttle_ops.take(1);
}
- op->budgeted = true;
+ op->budget = op_budget;
return op_budget;
}
+ int take_linger_budget(LingerOp *info);
+ friend class WatchContext; // to invoke put_up_budget_bytes
void put_op_budget_bytes(int op_budget) {
- assert(op_budget >= 0);
+ ceph_assert(op_budget >= 0);
op_throttle_bytes.put(op_budget);
op_throttle_ops.put(1);
}
- void put_op_budget(Op *op) {
- assert(op->budgeted);
- int op_budget = calc_op_budget(op);
- put_op_budget_bytes(op_budget);
- }
void put_nlist_context_budget(NListContext *list_context);
Throttle op_throttle_bytes, op_throttle_ops;
void set_osdmap_full_try() { osdmap_full_try = true; }
void unset_osdmap_full_try() { osdmap_full_try = false; }
- void _scan_requests(OSDSession *s,
- bool force_resend,
- bool cluster_full,
- map<int64_t, bool> *pool_full_map,
- map<ceph_tid_t, Op*>& need_resend,
- list<LingerOp*>& need_resend_linger,
- map<ceph_tid_t, CommandOp*>& need_resend_command,
- shunique_lock& sul);
+ void _scan_requests(
+ OSDSession *s,
+ bool skipped_map,
+ bool cluster_full,
+ map<int64_t, bool> *pool_full_map,
+ map<ceph_tid_t, Op*>& need_resend,
+ list<LingerOp*>& need_resend_linger,
+ map<ceph_tid_t, CommandOp*>& need_resend_command,
+ shunique_lock& sul,
+ const mempool::osdmap::map<int64_t,OSDMap::snap_interval_set_t> *gap_removed_snaps);
int64_t get_object_hash_position(int64_t pool, const string& key,
const string& ns);
void _op_submit_with_budget(Op *op, shunique_lock& lc,
ceph_tid_t *ptid,
int *ctx_budget = NULL);
- inline void unregister_op(Op *op);
-
// public interface
public:
void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
void wait_for_latest_osdmap(Context *fin);
void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
- void _get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
/** Get the current set of global op flags */
int get_global_op_flags() const { return global_op_flags; }
void osd_command(int osd, const std::vector<string>& cmd,
const bufferlist& inbl, ceph_tid_t *ptid,
bufferlist *poutbl, string *prs, Context *onfinish) {
- assert(osd >= 0);
+ ceph_assert(osd >= 0);
CommandOp *c = new CommandOp(
osd,
cmd,
ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
ops[i].op.xattr.value_len = 0;
if (name)
- ops[i].indata.append(name);
+ ops[i].indata.append(name, ops[i].op.xattr.name_len);
Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_READ, onfinish, objver);
o->snapid = snap;
ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
ops[i].op.xattr.value_len = bl.length();
if (name)
- ops[i].indata.append(name);
+ ops[i].indata.append(name, ops[i].op.xattr.name_len);
ops[i].indata.append(bl);
Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
ops[i].op.xattr.value_len = 0;
if (name)
- ops[i].indata.append(name);
+ ops[i].indata.append(name, ops[i].op.xattr.name_len);
Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
int delete_pool_snap(int64_t pool, string& snapName, Context *onfinish);
int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish);
- int create_pool(string& name, Context *onfinish, uint64_t auid=0,
+ int create_pool(string& name, Context *onfinish,
int crush_rule=-1);
int delete_pool(int64_t pool, Context *onfinish);
int delete_pool(const string& name, Context *onfinish);
- int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid);
void handle_pool_op_reply(MPoolOpReply *m);
int pool_op_cancel(ceph_tid_t tid, int r);
bit != p->buffer_extents.end();
++bit)
bl.copy(bit->first, bit->second, cur);
- assert(cur.length() == p->length);
+ ceph_assert(cur.length() == p->length);
write_trunc(p->oid, p->oloc, p->offset, p->length,
snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
oncommit ? gcom.new_sub():0,
void ms_handle_remote_reset(Connection *con) override;
bool ms_handle_refused(Connection *con) override;
bool ms_get_authorizer(int dest_type,
- AuthAuthorizer **authorizer,
- bool force_new) override;
+ AuthAuthorizer **authorizer) override;
void blacklist_self(bool set);