#define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") "
KStore::Collection::Collection(KStore *ns, coll_t cid)
- : CollectionImpl(cid),
+ : CollectionImpl(ns->cct, cid),
store(ns),
- lock("KStore::Collection::lock", true, false),
osr(new OpSequencer()),
onode_map(store->cct)
{
const ghobject_t& oid,
bool create)
{
- ceph_assert(create ? lock.is_wlocked() : lock.is_locked());
+ ceph_assert(create ? ceph_mutex_is_wlocked(lock) : ceph_mutex_is_locked(lock));
spg_t pgid;
if (cid.is_pg(&pgid)) {
path_fd(-1),
fsid_fd(-1),
mounted(false),
- coll_lock("KStore::coll_lock"),
nid_last(0),
nid_max(0),
throttle_ops(cct, "kstore_max_ops", cct->_conf->kstore_max_ops),
it->next()) {
coll_t cid;
if (cid.parse(it->key())) {
- CollectionRef c(new Collection(this, cid));
+ auto c = ceph::make_ref<Collection>(this, cid);
bufferlist bl = it->value();
auto p = bl.cbegin();
try {
ObjectStore::CollectionHandle KStore::create_new_collection(const coll_t& cid)
{
- auto *c = new Collection(this, cid);
- RWLock::WLocker l(coll_lock);
+ auto c = ceph::make_ref<Collection>(this, cid);
+ std::unique_lock l{coll_lock};
new_coll_map[cid] = c;
return c;
}
-int KStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf)
+int KStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
+ bool *per_pool_omap)
{
return -ENOTSUP;
}
KStore::CollectionRef KStore::_get_collection(coll_t cid)
{
- RWLock::RLocker l(coll_lock);
+ std::shared_lock l{coll_lock};
ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
if (cp == coll_map.end())
return CollectionRef();
{
dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists)
return false;
{
dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists)
return -ENOENT;
<< dendl;
bl.clear();
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r;
if (offset == length && offset == 0)
length = o->onode.size;
- r = _do_read(o, offset, length, bl, op_flags);
+ r = _do_read(o, offset, length, bl, false, op_flags);
out:
dout(10) << __func__ << " " << ch->cid << " " << oid
uint64_t offset,
size_t length,
bufferlist& bl,
+ bool do_cache,
uint32_t op_flags)
{
int r = 0;
stripe_off = offset % stripe_size;
while (length > 0) {
bufferlist stripe;
- _do_read_stripe(o, offset - stripe_off, &stripe);
+ _do_read_stripe(o, offset - stripe_off, &stripe, do_cache);
dout(30) << __func__ << " stripe " << offset - stripe_off << " got "
<< stripe.length() << dendl;
unsigned swant = std::min<unsigned>(stripe_size - stripe_off, length);
CollectionRef c = static_cast<Collection*>(ch.get());
if (!c)
return -ENOENT;
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
{
dout(15) << __func__ << " " << ch->cid << " " << oid << " " << name << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r;
string k(name);
{
dout(15) << __func__ << " " << ch->cid << " " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r;
OnodeRef o = c->get_onode(oid, false);
int KStore::list_collections(vector<coll_t>& ls)
{
- RWLock::RLocker l(coll_lock);
+ std::shared_lock l{coll_lock};
for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
p != coll_map.end();
++p)
bool KStore::collection_exists(const coll_t& c)
{
- RWLock::RLocker l(coll_lock);
+ std::shared_lock l{coll_lock};
return coll_map.count(c);
}
{
dout(15) << __func__ << " " << ch->cid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
dout(10) << __func__ << " " << ch->cid << " = " << c->cnode.bits << dendl;
return c->cnode.bits;
}
<< " start " << start << " end " << end << " max " << max << dendl;
int r;
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
r = _collection_list(c, start, end, max, ls, pnext);
}
CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
: c(c), o(o), it(it)
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head) {
get_omap_key(o->onode.omap_head, string(), &head);
get_omap_tail(o->onode.omap_head, &tail);
int KStore::OmapIteratorImpl::seek_to_first()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head) {
it->lower_bound(head);
} else {
int KStore::OmapIteratorImpl::upper_bound(const string& after)
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head) {
string key;
get_omap_key(o->onode.omap_head, after, &key);
int KStore::OmapIteratorImpl::lower_bound(const string& to)
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head) {
string key;
get_omap_key(o->onode.omap_head, to, &key);
bool KStore::OmapIteratorImpl::valid()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head && it->valid() && it->raw_key().second <= tail) {
return true;
} else {
int KStore::OmapIteratorImpl::next()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
if (o->onode.omap_head) {
it->next();
return 0;
string KStore::OmapIteratorImpl::key()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
ceph_assert(it->valid());
string db_key = it->raw_key().second;
string user_key;
bufferlist KStore::OmapIteratorImpl::value()
{
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
ceph_assert(it->valid());
return it->value();
}
{
dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
{
dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
{
dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
{
dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
{
dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
int r = 0;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
Collection *c = static_cast<Collection*>(ch.get());
- RWLock::RLocker l(c->lock);
+ std::shared_lock l{c->lock};
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
dout(10) << __func__ << " " << oid << "doesn't exist" <<dendl;
}
// object operations
- RWLock::WLocker l(c->lock);
+ std::unique_lock l{c->lock};
OnodeRef &o = ovec[op->oid];
if (!o) {
// these operations implicity create the object
bool create = false;
if (op->op == Transaction::OP_TOUCH ||
+ op->op == Transaction::OP_CREATE ||
op->op == Transaction::OP_WRITE ||
op->op == Transaction::OP_ZERO) {
create = true;
switch (op->op) {
case Transaction::OP_TOUCH:
+ case Transaction::OP_CREATE:
r = _touch(txc, c, o);
break;
}
}
-void KStore::_do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl)
+void KStore::_do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl, bool do_cache)
{
+ if (!do_cache) {
+ string key;
+ get_data_key(o->onode.nid, offset, &key);
+ db->get(PREFIX_DATA, key, pbl);
+ return;
+ }
+
map<uint64_t,bufferlist>::iterator p = o->pending_stripes.find(offset);
if (p == o->pending_stripes.end()) {
string key;
}
uint64_t stripe_off = offset - offset_rem;
bufferlist prev;
- _do_read_stripe(o, stripe_off, &prev);
+ _do_read_stripe(o, stripe_off, &prev, true);
dout(20) << __func__ << " read previous stripe " << stripe_off
<< ", got " << prev.length() << dendl;
bufferlist bl;
while (pos < offset + length) {
if (stripe_off || end - pos < stripe_size) {
bufferlist stripe;
- _do_read_stripe(o, pos - stripe_off, &stripe);
+ _do_read_stripe(o, pos - stripe_off, &stripe, true);
dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
<< stripe.length() << dendl;
bufferlist bl;
while (pos < o->onode.size) {
if (stripe_off) {
bufferlist stripe;
- _do_read_stripe(o, pos - stripe_off, &stripe);
+ _do_read_stripe(o, pos - stripe_off, &stripe, true);
dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
<< stripe.length() << dendl;
bufferlist t;
// data
oldo->flush();
- r = _do_read(oldo, 0, oldo->onode.size, bl, 0);
+ r = _do_read(oldo, 0, oldo->onode.size, bl, true, 0);
if (r < 0)
goto out;
newo->exists = true;
_assign_nid(txc, newo);
- r = _do_read(oldo, srcoff, length, bl, 0);
+ r = _do_read(oldo, srcoff, length, bl, true, 0);
if (r < 0)
goto out;
bufferlist bl;
{
- RWLock::WLocker l(coll_lock);
+ std::unique_lock l{coll_lock};
if (*c) {
r = -EEXIST;
goto out;
int r;
{
- RWLock::WLocker l(coll_lock);
+ std::unique_lock l{coll_lock};
if (!*c) {
r = -ENOENT;
goto out;
dout(15) << __func__ << " " << c->cid << " to " << d->cid << " "
<< " bits " << bits << dendl;
int r;
- RWLock::WLocker l(c->lock);
- RWLock::WLocker l2(d->lock);
+ std::unique_lock l{c->lock};
+ std::unique_lock l2{d->lock};
c->onode_map.clear();
d->onode_map.clear();
c->cnode.bits = bits;
dout(15) << __func__ << " " << (*c)->cid << " to " << d->cid << " "
<< " bits " << bits << dendl;
int r;
- RWLock::WLocker l((*c)->lock);
- RWLock::WLocker l2(d->lock);
+ std::scoped_lock l{(*c)->lock, d->lock};
(*c)->onode_map.clear();
d->onode_map.clear();
d->cnode.bits = bits;