#include "include/lru.h"
#include "include/Context.h"
#include "include/xlist.h"
+#include "include/common_fwd.h"
#include "common/Cond.h"
#include "common/Finisher.h"
#include "Objecter.h"
#include "Striper.h"
-class CephContext;
class WritebackHandler;
-class PerfCounters;
enum {
l_objectcacher_first = 25000,
// read scatter/gather
struct OSDRead {
- vector<ObjectExtent> extents;
+ std::vector<ObjectExtent> extents;
snapid_t snap;
- bufferlist *bl;
+ ceph::buffer::list *bl;
int fadvise_flags;
- OSDRead(snapid_t s, bufferlist *b, int f)
+ OSDRead(snapid_t s, ceph::buffer::list *b, int f)
: snap(s), bl(b), fadvise_flags(f) {}
};
- OSDRead *prepare_read(snapid_t snap, bufferlist *b, int f) const {
+ OSDRead *prepare_read(snapid_t snap, ceph::buffer::list *b, int f) const {
return new OSDRead(snap, b, f);
}
// write scatter/gather
struct OSDWrite {
- vector<ObjectExtent> extents;
+ std::vector<ObjectExtent> extents;
SnapContext snapc;
- bufferlist bl;
+ ceph::buffer::list bl;
ceph::real_time mtime;
int fadvise_flags;
ceph_tid_t journal_tid;
- OSDWrite(const SnapContext& sc, const bufferlist& b, ceph::real_time mt,
+ OSDWrite(const SnapContext& sc, const ceph::buffer::list& b, ceph::real_time mt,
int f, ceph_tid_t _journal_tid)
: snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
journal_tid(_journal_tid) {}
};
OSDWrite *prepare_write(const SnapContext& sc,
- const bufferlist &b,
+ const ceph::buffer::list &b,
ceph::real_time mt,
int f,
ceph_tid_t journal_tid) const {
public:
Object *ob;
- bufferlist bl;
+ ceph::buffer::list bl;
ceph_tid_t last_write_tid; // version of bh (if non-zero)
ceph_tid_t last_read_tid; // tid of last read op (if any)
ceph::real_time last_write;
ceph_tid_t journal_tid;
int error; // holds return value for failed reads
- map<loff_t, list<Context*> > waitfor_read;
+ std::map<loff_t, std::list<Context*> > waitfor_read;
// cons
explicit BufferHead(Object *o) :
}
int get_state() const { return state; }
+ inline int get_error() const {
+ return error;
+ }
+ inline void set_error(int _error) {
+ error = _error;
+ }
+
inline ceph_tid_t get_journal_tid() const {
return journal_tid;
}
// reference counting
int get() {
- assert(ref >= 0);
+ ceph_assert(ref >= 0);
if (ref == 0) lru_pin();
return ++ref;
}
int put() {
- assert(ref > 0);
+ ceph_assert(ref > 0);
if (ref == 1) lru_unpin();
--ref;
return ref;
bool complete;
bool exists;
- map<loff_t, BufferHead*> data;
+ std::map<loff_t, BufferHead*> data;
ceph_tid_t last_write_tid; // version of bh (if non-zero)
- ceph_tid_t last_commit_tid; // last update commited.
+ ceph_tid_t last_commit_tid; // last update committed.
int dirty_or_tx;
- map< ceph_tid_t, list<Context*> > waitfor_commit;
+ std::map< ceph_tid_t, std::list<Context*> > waitfor_commit;
xlist<C_ReadFinish*> reads;
Object(const Object&) = delete;
}
~Object() {
reads.clear();
- assert(ref == 0);
- assert(data.empty());
- assert(dirty_or_tx == 0);
+ ceph_assert(ref == 0);
+ ceph_assert(data.empty());
+ ceph_assert(dirty_or_tx == 0);
set_item.remove_myself();
}
object_t get_oid() { return oid.oid; }
snapid_t get_snap() { return oid.snap; }
ObjectSet *get_object_set() const { return oset; }
- string get_namespace() { return oloc.nspace; }
+ std::string get_namespace() { return oloc.nspace; }
uint64_t get_object_number() const { return object_no; }
const object_locator_t& get_oloc() const { return oloc; }
bool can_close() const {
if (lru_is_expireable()) {
- assert(data.empty());
- assert(waitfor_commit.empty());
+ ceph_assert(data.empty());
+ ceph_assert(waitfor_commit.empty());
return true;
}
return false;
* @param offset object byte offset
* @return iterator pointing to buffer, or data.end()
*/
- map<loff_t,BufferHead*>::const_iterator data_lower_bound(loff_t offset) const {
- map<loff_t,BufferHead*>::const_iterator p = data.lower_bound(offset);
+ std::map<loff_t,BufferHead*>::const_iterator data_lower_bound(loff_t offset) const {
+ auto p = data.lower_bound(offset);
if (p != data.begin() &&
(p == data.end() || p->first > offset)) {
--p; // might overlap!
void add_bh(BufferHead *bh) {
if (data.empty())
get();
- assert(data.count(bh->start()) == 0);
+ ceph_assert(data.count(bh->start()) == 0);
data[bh->start()] = bh;
}
void remove_bh(BufferHead *bh) {
- assert(data.count(bh->start()));
+ ceph_assert(data.count(bh->start()));
data.erase(bh->start());
if (data.empty())
put();
// mid-level
BufferHead *split(BufferHead *bh, loff_t off);
void merge_left(BufferHead *left, BufferHead *right);
+ bool can_merge_bh(BufferHead *left, BufferHead *right);
void try_merge_bh(BufferHead *bh);
+ void maybe_rebuild_buffer(BufferHead *bh);
bool is_cached(loff_t off, loff_t len) const;
bool include_all_cached_data(loff_t off, loff_t len);
int map_read(ObjectExtent &ex,
- map<loff_t, BufferHead*>& hits,
- map<loff_t, BufferHead*>& missing,
- map<loff_t, BufferHead*>& rx,
- map<loff_t, BufferHead*>& errors);
+ std::map<loff_t, BufferHead*>& hits,
+ std::map<loff_t, BufferHead*>& missing,
+ std::map<loff_t, BufferHead*>& rx,
+ std::map<loff_t, BufferHead*>& errors);
BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid);
void replace_journal_tid(BufferHead *bh, ceph_tid_t tid);
void truncate(loff_t s);
- void discard(loff_t off, loff_t len);
+ void discard(loff_t off, loff_t len, C_GatherBuilder* commit_gather);
// reference counting
int get() {
- assert(ref >= 0);
+ ceph_assert(ref >= 0);
if (ref == 0) lru_pin();
return ++ref;
}
int put() {
- assert(ref > 0);
+ ceph_assert(ref > 0);
if (ref == 1) lru_unpin();
--ref;
return ref;
WritebackHandler& writeback_handler;
bool scattered_write;
- string name;
- Mutex& lock;
+ std::string name;
+ ceph::mutex& lock;
uint64_t max_dirty, target_dirty, max_size, max_objects;
ceph::timespan max_dirty_age;
void *flush_set_callback_arg;
// indexed by pool_id
- vector<ceph::unordered_map<sobject_t, Object*> > objects;
+ std::vector<ceph::unordered_map<sobject_t, Object*> > objects;
- list<Context*> waitfor_read;
+ std::list<Context*> waitfor_read;
ceph_tid_t last_read_tid;
- set<BufferHead*, BufferHead::ptr_lt> dirty_or_tx_bh;
+ std::set<BufferHead*, BufferHead::ptr_lt> dirty_or_tx_bh;
LRU bh_lru_dirty, bh_lru_rest;
LRU ob_lru;
- Cond flusher_cond;
+ ceph::condition_variable flusher_cond;
bool flusher_stop;
void flusher_entry();
class FlusherThread : public Thread {
void close_object(Object *ob);
// bh stats
- Cond stat_cond;
+ ceph::condition_variable stat_cond;
loff_t stat_clean;
loff_t stat_zero;
loff_t stat_error;
loff_t stat_dirty_waiting; // bytes that writers are waiting on to write
+ size_t stat_nr_dirty_waiters;
+
void verify_stats() const;
void bh_stat_add(BufferHead *bh);
loff_t get_stat_tx() const { return stat_tx; }
loff_t get_stat_rx() const { return stat_rx; }
loff_t get_stat_dirty() const { return stat_dirty; }
- loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; }
loff_t get_stat_clean() const { return stat_clean; }
loff_t get_stat_zero() const { return stat_zero; }
+ loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; }
+ size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters; }
void touch_bh(BufferHead *bh) {
if (bh->is_dirty())
void bh_read(BufferHead *bh, int op_flags,
const ZTracer::Trace &parent_trace);
void bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace);
- void bh_write_scattered(list<BufferHead*>& blist);
+ void bh_write_scattered(std::list<BufferHead*>& blist);
void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
int64_t *amount, int *max_count);
void purge(Object *o);
int64_t reads_outstanding;
- Cond read_cond;
+ ceph::condition_variable read_cond;
int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
bool external_call, ZTracer::Trace *trace);
public:
void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
loff_t offset, uint64_t length,
- bufferlist &bl, int r,
+ ceph::buffer::list &bl, int r,
bool trust_enoent);
void bh_write_commit(int64_t poolid, sobject_t oid,
- vector<pair<loff_t, uint64_t> >& ranges,
+ std::vector<std::pair<loff_t, uint64_t> >& ranges,
ceph_tid_t t, int r);
class C_WriteCommit;
- ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb, Mutex& l,
+ ObjectCacher(CephContext *cct_, std::string name, WritebackHandler& wb, ceph::mutex& l,
flush_set_callback_t flush_callback,
void *flush_callback_arg,
uint64_t max_bytes, uint64_t max_objects,
flusher_thread.create("flusher");
}
void stop() {
- assert(flusher_thread.is_started());
- lock.Lock(); // hmm.. watch out for deadlock!
+ ceph_assert(flusher_thread.is_started());
+ lock.lock(); // hmm.. watch out for deadlock!
flusher_stop = true;
- flusher_cond.Signal();
- lock.Unlock();
+ flusher_cond.notify_all();
+ lock.unlock();
flusher_thread.join();
}
ZTracer::Trace *parent_trace = nullptr);
int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
ZTracer::Trace *parent_trace = nullptr);
- bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
+ bool is_cached(ObjectSet *oset, std::vector<ObjectExtent>& extents,
snapid_t snapid);
private:
// write blocking
int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
ZTracer::Trace *trace, Context *onfreespace);
- void maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace);
+ void _maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace);
bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish);
+ void _discard(ObjectSet *oset, const std::vector<ObjectExtent>& exls,
+ C_GatherBuilder* gather);
+ void _discard_finish(ObjectSet *oset, bool was_dirty, Context* on_finish);
+
public:
bool set_is_empty(ObjectSet *oset);
bool set_is_cached(ObjectSet *oset);
bool set_is_dirty_or_committing(ObjectSet *oset);
bool flush_set(ObjectSet *oset, Context *onfinish=0);
- bool flush_set(ObjectSet *oset, vector<ObjectExtent>& ex,
+ bool flush_set(ObjectSet *oset, std::vector<ObjectExtent>& ex,
ZTracer::Trace *trace, Context *onfinish = 0);
bool flush_all(Context *onfinish = 0);
loff_t release_set(ObjectSet *oset);
uint64_t release_all();
- void discard_set(ObjectSet *oset, const vector<ObjectExtent>& ex);
+ void discard_set(ObjectSet *oset, const std::vector<ObjectExtent>& ex);
+ void discard_writeback(ObjectSet *oset, const std::vector<ObjectExtent>& ex,
+ Context* on_finish);
/**
* Retry any in-flight reads that get -ENOENT instead of marking
max_size = v;
}
void set_max_dirty_age(double a) {
- max_dirty_age = make_timespan(a);
+ max_dirty_age = ceph::make_timespan(a);
}
void set_max_objects(int64_t v) {
max_objects = v;
/*** async+caching (non-blocking) file interface ***/
int file_is_cached(ObjectSet *oset, file_layout_t *layout,
snapid_t snapid, loff_t offset, uint64_t len) {
- vector<ObjectExtent> extents;
+ std::vector<ObjectExtent> extents;
Striper::file_to_extents(cct, oset->ino, layout, offset, len,
oset->truncate_size, extents);
return is_cached(oset, extents, snapid);
}
int file_read(ObjectSet *oset, file_layout_t *layout, snapid_t snapid,
- loff_t offset, uint64_t len, bufferlist *bl, int flags,
+ loff_t offset, uint64_t len, ceph::buffer::list *bl, int flags,
Context *onfinish) {
OSDRead *rd = prepare_read(snapid, bl, flags);
Striper::file_to_extents(cct, oset->ino, layout, offset, len,
int file_write(ObjectSet *oset, file_layout_t *layout,
const SnapContext& snapc, loff_t offset, uint64_t len,
- bufferlist& bl, ceph::real_time mtime, int flags) {
+ ceph::buffer::list& bl, ceph::real_time mtime, int flags) {
OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0);
Striper::file_to_extents(cct, oset->ino, layout, offset, len,
oset->truncate_size, wr->extents);
- return writex(wr, oset, NULL);
+ return writex(wr, oset, nullptr);
}
bool file_flush(ObjectSet *oset, file_layout_t *layout,
const SnapContext& snapc, loff_t offset, uint64_t len,
Context *onfinish) {
- vector<ObjectExtent> extents;
+ std::vector<ObjectExtent> extents;
Striper::file_to_extents(cct, oset->ino, layout, offset, len,
oset->truncate_size, extents);
ZTracer::Trace trace;
};
-inline ostream& operator<<(ostream &out, const ObjectCacher::BufferHead &bh)
+inline std::ostream& operator<<(std::ostream &out,
+ const ObjectCacher::BufferHead &bh)
{
out << "bh[ " << &bh << " "
<< bh.start() << "~" << bh.length()
if (bh.error) out << " error=" << bh.error;
out << "]";
out << " waiters = {";
- for (map<loff_t, list<Context*> >::const_iterator it
- = bh.waitfor_read.begin();
- it != bh.waitfor_read.end(); ++it) {
+ for (auto it = bh.waitfor_read.begin(); it != bh.waitfor_read.end(); ++it) {
out << " " << it->first << "->[";
- for (list<Context*>::const_iterator lit = it->second.begin();
+ for (auto lit = it->second.begin();
lit != it->second.end(); ++lit) {
out << *lit << ", ";
}
return out;
}
-inline ostream& operator<<(ostream &out, const ObjectCacher::ObjectSet &os)
+inline std::ostream& operator<<(std::ostream &out,
+ const ObjectCacher::ObjectSet &os)
{
return out << "objectset[" << os.ino
<< " ts " << os.truncate_seq << "/" << os.truncate_size
<< "]";
}
-inline ostream& operator<<(ostream &out, const ObjectCacher::Object &ob)
+inline std::ostream& operator<<(std::ostream &out,
+ const ObjectCacher::Object &ob)
{
out << "object["
- << ob.get_soid() << " oset " << ob.oset << dec
+ << ob.get_soid() << " oset " << ob.oset << std::dec
<< " wr " << ob.last_write_tid << "/" << ob.last_commit_tid;
if (ob.complete)