* Foundation. See file COPYING.
*
*/
-
-
#ifndef CEPH_MDLOG_H
#define CEPH_MDLOG_H
+#include "include/common_fwd.h"
+
enum {
l_mdl_first = 5000,
l_mdl_evadd,
#include "include/Context.h"
#include "MDSContext.h"
-#include "common/Thread.h"
#include "common/Cond.h"
+#include "common/Finisher.h"
+#include "common/Thread.h"
#include "LogSegment.h"
#include <list>
+#include <map>
class Journaler;
class JournalPointer;
class LogSegment;
class ESubtreeMap;
-class PerfCounters;
-
-#include <map>
-using std::map;
-
-#include "common/Finisher.h"
-
-
class MDLog {
public:
- MDSRank *mds;
-protected:
- int num_events; // in events
-
- int unflushed;
-
- bool capped;
-
- // Log position which is persistent *and* for which
- // submit_entry wait_for_safe callbacks have already
- // been called.
- uint64_t safe_pos;
-
- inodeno_t ino;
- Journaler *journaler;
-
- PerfCounters *logger;
-
-
- // -- replay --
- class ReplayThread : public Thread {
- MDLog *log;
- public:
- explicit ReplayThread(MDLog *l) : log(l) {}
- void* entry() override {
- log->_replay_thread();
- return 0;
- }
- } replay_thread;
- bool already_replayed;
-
- friend class ReplayThread;
- friend class C_MDL_Replay;
-
- MDSContext::vec waitfor_replay;
-
- void _replay(); // old way
- void _replay_thread(); // new way
-
- // Journal recovery/rewrite logic
- class RecoveryThread : public Thread {
- MDLog *log;
- MDSContext *completion;
- public:
- void set_completion(MDSContext *c) {completion = c;}
- explicit RecoveryThread(MDLog *l) : log(l), completion(NULL) {}
- void* entry() override {
- log->_recovery_thread(completion);
- return 0;
- }
- } recovery_thread;
- void _recovery_thread(MDSContext *completion);
- void _reformat_journal(JournalPointer const &jp, Journaler *old_journal, MDSContext *completion);
-
- // -- segments --
- map<uint64_t,LogSegment*> segments;
- set<LogSegment*> expiring_segments;
- set<LogSegment*> expired_segments;
- std::size_t pre_segments_size = 0; // the num of segments when the mds finished replay-journal, to calc the num of segments growing
- uint64_t event_seq;
- int expiring_events;
- int expired_events;
-
- struct PendingEvent {
- LogEvent *le;
- MDSContext *fin;
- bool flush;
- PendingEvent(LogEvent *e, MDSContext *c, bool f=false) : le(e), fin(c), flush(f) {}
- };
-
- int64_t mdsmap_up_features;
- map<uint64_t,list<PendingEvent> > pending_events; // log segment -> event list
- Mutex submit_mutex;
- Cond submit_cond;
-
- void set_safe_pos(uint64_t pos)
- {
- std::lock_guard l(submit_mutex);
- ceph_assert(pos >= safe_pos);
- safe_pos = pos;
- }
- friend class MDSLogContextBase;
-
- void _submit_thread();
- class SubmitThread : public Thread {
- MDLog *log;
- public:
- explicit SubmitThread(MDLog *l) : log(l) {}
- void* entry() override {
- log->_submit_thread();
- return 0;
- }
- } submit_thread;
- friend class SubmitThread;
+ explicit MDLog(MDSRank *m) : mds(m),
+ replay_thread(this),
+ recovery_thread(this),
+ submit_thread(this) {}
+ ~MDLog();
-public:
const std::set<LogSegment*> &get_expiring_segments() const
{
return expiring_segments;
}
-protected:
- // -- subtreemaps --
- friend class ESubtreeMap;
- friend class MDCache;
-
- uint64_t get_last_segment_seq() const {
- ceph_assert(!segments.empty());
- return segments.rbegin()->first;
- }
- LogSegment *get_oldest_segment() {
- return segments.begin()->second;
- }
- void remove_oldest_segment() {
- map<uint64_t, LogSegment*>::iterator p = segments.begin();
- delete p->second;
- segments.erase(p);
- }
-
-public:
void create_logger();
-
- // replay state
- map<inodeno_t, set<inodeno_t> > pending_exports;
-
void set_write_iohint(unsigned iohint_flags);
-public:
- explicit MDLog(MDSRank *m) : mds(m),
- num_events(0),
- unflushed(0),
- capped(false),
- safe_pos(0),
- journaler(0),
- logger(0),
- replay_thread(this),
- already_replayed(false),
- recovery_thread(this),
- event_seq(0), expiring_events(0), expired_events(0),
- mdsmap_up_features(0),
- submit_mutex("MDLog::submit_mutex"),
- submit_thread(this),
- cur_event(NULL) { }
- ~MDLog();
-
-
-private:
- // -- segments --
- void _start_new_segment();
- void _prepare_new_segment();
- void _journal_segment_subtree_map(MDSContext *onsync);
-public:
void start_new_segment() {
std::lock_guard l(submit_mutex);
_start_new_segment();
_prepare_new_segment();
}
void journal_segment_subtree_map(MDSContext *onsync=NULL) {
- submit_mutex.Lock();
- _journal_segment_subtree_map(onsync);
- submit_mutex.Unlock();
+ {
+ std::lock_guard l{submit_mutex};
+ _journal_segment_subtree_map(onsync);
+ }
if (onsync)
flush();
}
return segments.rbegin()->second;
}
- LogSegment *get_segment(log_segment_seq_t seq) {
+ LogSegment *get_segment(LogSegment::seq_t seq) {
if (segments.count(seq))
return segments[seq];
return NULL;
void kick_submitter();
void shutdown();
- // -- events --
-private:
- LogEvent *cur_event;
-public:
void _start_entry(LogEvent *e);
void start_entry(LogEvent *e) {
std::lock_guard l(submit_mutex);
void submit_entry(LogEvent *e, MDSLogContextBase *c = 0) {
std::lock_guard l(submit_mutex);
_submit_entry(e, c);
- submit_cond.Signal();
+ submit_cond.notify_all();
}
void start_submit_entry(LogEvent *e, MDSLogContextBase *c = 0) {
std::lock_guard l(submit_mutex);
_start_entry(e);
_submit_entry(e, c);
- submit_cond.Signal();
+ submit_cond.notify_all();
}
bool entry_is_open() const { return cur_event != NULL; }
return unflushed == 0;
}
-private:
- void try_expire(LogSegment *ls, int op_prio);
- void _maybe_expired(LogSegment *ls, int op_prio);
- void _expired(LogSegment *ls);
- void _trim_expired_segments();
-
- friend class C_MaybeExpiredSegment;
- friend class C_MDL_Flushed;
- friend class C_OFT_Committed;
-
-public:
void trim_expired_segments();
void trim(int max=-1);
int trim_all();
return expiring_segments.empty() && expired_segments.empty();
};
-private:
- void write_head(MDSContext *onfinish);
-
-public:
void create(MDSContext *onfinish); // fresh, empty log!
void open(MDSContext *onopen); // append() or replay() to follow!
void reopen(MDSContext *onopen);
void standby_trim_segments();
void dump_replay_status(Formatter *f) const;
-};
+ MDSRank *mds;
+ // replay state
+ std::map<inodeno_t, set<inodeno_t>> pending_exports;
+
+protected:
+ struct PendingEvent {
+ PendingEvent(LogEvent *e, MDSContext *c, bool f=false) : le(e), fin(c), flush(f) {}
+ LogEvent *le;
+ MDSContext *fin;
+ bool flush;
+ };
+
+ // -- replay --
+ class ReplayThread : public Thread {
+ public:
+ explicit ReplayThread(MDLog *l) : log(l) {}
+ void* entry() override {
+ log->_replay_thread();
+ return 0;
+ }
+ private:
+ MDLog *log;
+ } replay_thread;
+
+ // Journal recovery/rewrite logic
+ class RecoveryThread : public Thread {
+ public:
+ explicit RecoveryThread(MDLog *l) : log(l) {}
+ void set_completion(MDSContext *c) {completion = c;}
+ void* entry() override {
+ log->_recovery_thread(completion);
+ return 0;
+ }
+ private:
+ MDLog *log;
+ MDSContext *completion = nullptr;
+ } recovery_thread;
+
+ class SubmitThread : public Thread {
+ public:
+ explicit SubmitThread(MDLog *l) : log(l) {}
+ void* entry() override {
+ log->_submit_thread();
+ return 0;
+ }
+ private:
+ MDLog *log;
+ } submit_thread;
+
+ friend class ReplayThread;
+ friend class C_MDL_Replay;
+ friend class MDSLogContextBase;
+ friend class SubmitThread;
+ // -- subtreemaps --
+ friend class ESubtreeMap;
+ friend class MDCache;
+
+ void _replay(); // old way
+ void _replay_thread(); // new way
+
+ void _recovery_thread(MDSContext *completion);
+ void _reformat_journal(JournalPointer const &jp, Journaler *old_journal, MDSContext *completion);
+
+ void set_safe_pos(uint64_t pos)
+ {
+ std::lock_guard l(submit_mutex);
+ ceph_assert(pos >= safe_pos);
+ safe_pos = pos;
+ }
+
+ void _submit_thread();
+
+ uint64_t get_last_segment_seq() const {
+ ceph_assert(!segments.empty());
+ return segments.rbegin()->first;
+ }
+ LogSegment *get_oldest_segment() {
+ return segments.begin()->second;
+ }
+ void remove_oldest_segment() {
+ std::map<uint64_t, LogSegment*>::iterator p = segments.begin();
+ delete p->second;
+ segments.erase(p);
+ }
+
+ int num_events = 0; // in events
+ int unflushed = 0;
+ bool capped = false;
+
+ // Log position which is persistent *and* for which
+ // submit_entry wait_for_safe callbacks have already
+ // been called.
+ uint64_t safe_pos = 0;
+
+ inodeno_t ino;
+ Journaler *journaler = nullptr;
+
+ PerfCounters *logger = nullptr;
+
+ bool already_replayed = false;
+
+ MDSContext::vec waitfor_replay;
+
+ // -- segments --
+ std::map<uint64_t,LogSegment*> segments;
+ set<LogSegment*> expiring_segments;
+ set<LogSegment*> expired_segments;
+ std::size_t pre_segments_size = 0; // the num of segments when the mds finished replay-journal, to calc the num of segments growing
+ uint64_t event_seq = 0;
+ int expiring_events = 0;
+ int expired_events = 0;
+
+ int64_t mdsmap_up_features = 0;
+ std::map<uint64_t,list<PendingEvent> > pending_events; // log segment -> event list
+ ceph::mutex submit_mutex = ceph::make_mutex("MDLog::submit_mutex");
+ ceph::condition_variable submit_cond;
+
+private:
+ friend class C_MaybeExpiredSegment;
+ friend class C_MDL_Flushed;
+ friend class C_OFT_Committed;
+
+ // -- segments --
+ void _start_new_segment();
+ void _prepare_new_segment();
+ void _journal_segment_subtree_map(MDSContext *onsync);
+
+ void try_expire(LogSegment *ls, int op_prio);
+ void _maybe_expired(LogSegment *ls, int op_prio);
+ void _expired(LogSegment *ls);
+ void _trim_expired_segments();
+ void write_head(MDSContext *onfinish);
+
+ // -- events --
+ LogEvent *cur_event = nullptr;
+};
#endif