void MDLog::_start_entry(LogEvent *e)
{
+ // Precondition: caller holds submit_mutex (asserted) and no other event
+ // is currently being started (cur_event must be NULL).
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
ceph_assert(cur_event == NULL);
cur_event = e;
void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
{
+ // Precondition: submit_mutex held; not in replay and the log has not been
+ // capped (all asserted below).
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
ceph_assert(!mds->is_any_replay());
ceph_assert(!capped);
{
dout(10) << "_submit_thread start" << dendl;
- submit_mutex.Lock();
+ // RAII: locker owns submit_mutex for the lifetime of the loop; the
+ // condition-variable waits below require a std::unique_lock.
+ std::unique_lock locker{submit_mutex};
while (!mds->is_daemon_stopping()) {
if (g_conf()->mds_log_pause) {
- submit_cond.Wait(submit_mutex);
+ submit_cond.wait(locker);
continue;
}
map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
if (it == pending_events.end()) {
- submit_cond.Wait(submit_mutex);
+ submit_cond.wait(locker);
continue;
}
PendingEvent data = it->second.front();
it->second.pop_front();
- submit_mutex.Unlock();
+ // Release the lock while doing journal work so submitters are not blocked,
+ // then re-take it before touching shared state again.
+ locker.unlock();
if (data.le) {
LogEvent *le = data.le;
journaler->flush();
}
- submit_mutex.Lock();
+ locker.lock();
if (data.flush)
unflushed = 0;
else if (data.le)
unflushed++;
}
-
- submit_mutex.Unlock();
+ // (the explicit final Unlock is gone: locker releases submit_mutex on
+ // destruction when the thread falls out of the loop)
}
void MDLog::wait_for_safe(MDSContext *c)
{
- submit_mutex.Lock();
+ submit_mutex.lock();
bool no_pending = true;
if (!pending_events.empty()) {
+ // Queue the waiter behind the newest pending event and wake the submit
+ // thread so it gets processed.
pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
no_pending = false;
- submit_cond.Signal();
+ submit_cond.notify_all();
}
- submit_mutex.Unlock();
+ submit_mutex.unlock();
if (no_pending && c)
journaler->wait_for_flush(new C_IO_Wrapper(mds, c));
void MDLog::flush()
{
- submit_mutex.Lock();
+ submit_mutex.lock();
bool do_flush = unflushed > 0;
unflushed = 0;
if (!pending_events.empty()) {
+ // Let the submit thread perform the flush when there are queued events;
+ // otherwise (do_flush) we call the journaler directly below, outside the lock.
pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
do_flush = false;
- submit_cond.Signal();
+ submit_cond.notify_all();
}
- submit_mutex.Unlock();
+ submit_mutex.unlock();
if (do_flush)
journaler->flush();
void MDLog::kick_submitter()
{
std::lock_guard l(submit_mutex);
+ // notify_all presumably matches the broadcast behaviour of the legacy
+ // Cond::Signal() — verify against the old common/Cond.h implementation.
- submit_cond.Signal();
+ submit_cond.notify_all();
}
void MDLog::cap()
void MDLog::shutdown()
{
+ // Precondition: caller holds mds_lock (asserted). mds_lock is dropped
+ // around each thread join below to avoid blocking other users of the lock
+ // while we wait.
- ceph_assert(mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
dout(5) << "shutdown" << dendl;
if (submit_thread.is_started()) {
// returning from suicide, and subsequently respect mds->is_daemon_stopping()
// and fall out of its loop.
} else {
- mds->mds_lock.Unlock();
+ mds->mds_lock.unlock();
// Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
// picking it up will do anything with it.
- submit_mutex.Lock();
- submit_cond.Signal();
- submit_mutex.Unlock();
+ submit_mutex.lock();
+ // Wake the submit thread so it can observe is_daemon_stopping() and exit.
+ submit_cond.notify_all();
+ submit_mutex.unlock();
- mds->mds_lock.Lock();
+ mds->mds_lock.lock();
submit_thread.join();
}
}
if (replay_thread.is_started() && !replay_thread.am_self()) {
- mds->mds_lock.Unlock();
+ mds->mds_lock.unlock();
replay_thread.join();
- mds->mds_lock.Lock();
+ mds->mds_lock.lock();
}
if (recovery_thread.is_started() && !recovery_thread.am_self()) {
- mds->mds_lock.Unlock();
+ mds->mds_lock.unlock();
recovery_thread.join();
- mds->mds_lock.Lock();
+ mds->mds_lock.lock();
}
}
void MDLog::_prepare_new_segment()
{
+ // Precondition: submit_mutex held by the caller (asserted).
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
uint64_t seq = event_seq + 1;
dout(7) << __func__ << " seq " << seq << dendl;
void MDLog::_journal_segment_subtree_map(MDSContext *onsync)
{
+ // Precondition: submit_mutex held by the caller (asserted).
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
dout(7) << __func__ << dendl;
ESubtreeMap *sle = mds->mdcache->create_subtree_map();
max_events = g_conf()->mds_log_events_per_segment + 1;
}
- submit_mutex.Lock();
+ submit_mutex.lock();
// trim!
dout(10) << "trim "
<< dendl;
if (segments.empty()) {
- submit_mutex.Unlock();
+ submit_mutex.unlock();
return;
}
new_expiring_segments++;
expiring_segments.insert(ls);
expiring_events += ls->num_events;
- submit_mutex.Unlock();
+ // try_expire() must be called without submit_mutex held (it re-takes the
+ // lock itself — see try_expire); re-acquire afterwards to keep scanning.
+ submit_mutex.unlock();
uint64_t last_seq = ls->seq;
try_expire(ls, op_prio);
- submit_mutex.Lock();
+ submit_mutex.lock();
p = segments.lower_bound(last_seq + 1);
}
}
uint64_t last_seq = get_last_segment_seq();
if (mds->mdcache->open_file_table.is_any_dirty() ||
last_seq > mds->mdcache->open_file_table.get_committed_log_seq()) {
+ // open_file_table.commit() is invoked with submit_mutex released —
+ // presumably because it can block; confirm against its implementation.
- submit_mutex.Unlock();
+ submit_mutex.unlock();
mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq),
last_seq, CEPH_MSG_PRIO_HIGH);
- submit_mutex.Lock();
+ submit_mutex.lock();
}
}
}
*/
int MDLog::trim_all()
{
- submit_mutex.Lock();
+ submit_mutex.lock();
dout(10) << __func__ << ": "
<< segments.size()
if (!capped &&
!mds->mdcache->open_file_table.is_any_committing() &&
last_seq > mds->mdcache->open_file_table.get_committing_log_seq()) {
+ // As in trim(): commit the open file table with submit_mutex released,
+ // then re-take it.
- submit_mutex.Unlock();
+ submit_mutex.unlock();
mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq),
last_seq, CEPH_MSG_PRIO_DEFAULT);
- submit_mutex.Lock();
+ submit_mutex.lock();
}
}
// Caller should have flushed journaler before calling this
if (pending_events.count(ls->seq)) {
dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
- submit_mutex.Unlock();
+ submit_mutex.unlock();
return -EAGAIN;
}
ceph_assert(expiring_segments.count(ls) == 0);
expiring_segments.insert(ls);
expiring_events += ls->num_events;
- submit_mutex.Unlock();
+ // As in trim(): try_expire() re-takes submit_mutex internally, so it is
+ // called with the lock dropped.
+ submit_mutex.unlock();
uint64_t next_seq = ls->seq + 1;
try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
- submit_mutex.Lock();
+ submit_mutex.lock();
p = segments.lower_bound(next_seq);
}
}
gather_bld.activate();
} else {
dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
+ // Segment has nothing left to wait for: move it out of the expiring set
+ // and mark it expired, all under submit_mutex.
- submit_mutex.Lock();
+ submit_mutex.lock();
ceph_assert(expiring_segments.count(ls));
expiring_segments.erase(ls);
expiring_events -= ls->num_events;
_expired(ls);
- submit_mutex.Unlock();
+ submit_mutex.unlock();
}
logger->set(l_mdl_segexg, expiring_segments.size());
void MDLog::_trim_expired_segments()
{
+ // Precondition: submit_mutex held on entry (asserted). NOTE: this function
+ // releases submit_mutex before returning (see the unlock below); callers
+ // must not assume the lock is still held afterwards.
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
uint64_t oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq();
trimmed = true;
}
- submit_mutex.Unlock();
+ submit_mutex.unlock();
if (trimmed)
journaler->write_head(0);
void MDLog::trim_expired_segments()
{
- submit_mutex.Lock();
+ submit_mutex.lock();
+ // No matching unlock here on purpose: _trim_expired_segments() drops
+ // submit_mutex before it returns.
_trim_expired_segments();
}
void MDLog::_expired(LogSegment *ls)
{
+ // Precondition: submit_mutex held by the caller (asserted).
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
<< ", " << ls->num_events << " events" << dendl;
// a log segment. Because we change serialization, this will end up changing
// for us, so we have to explicitly update the fields that point back to that
// log segment.
+ // The sequence-number type now comes from LogSegment itself rather than a
+ // free-standing log_segment_seq_t typedef.
- std::map<log_segment_seq_t, log_segment_seq_t> segment_pos_rewrite;
+ std::map<LogSegment::seq_t, LogSegment::seq_t> segment_pos_rewrite;
// The logic in here borrowed from replay_thread expects mds_lock to be held,
// e.g. between checking readable and doing wait_for_readable so that journaler