timer->shutdown();
}
delete timer;
+ timer = nullptr;
work_queue->drain();
delete work_queue;
+ work_queue = nullptr;
thread_pool->stop();
delete thread_pool;
+ thread_pool = nullptr;
}
Journaler::Journaler(librados::IoCtx &header_ioctx,
Journaler::~Journaler() {
if (m_metadata != nullptr) {
- assert(!m_metadata->is_initialized());
+ ceph_assert(!m_metadata->is_initialized());
if (!m_initialized) {
// never initialized -- ensure any in-flight ops are complete
// since we wouldn't expect shut_down to be invoked
m_metadata->put();
m_metadata = nullptr;
}
- assert(m_trimmer == nullptr);
- assert(m_player == nullptr);
- assert(m_recorder == nullptr);
+ ceph_assert(m_trimmer == nullptr);
+ ceph_assert(m_player == nullptr);
+ ceph_assert(m_recorder == nullptr);
delete m_threads;
+ m_threads = nullptr;
}
void Journaler::exists(Context *on_finish) const {
librados::ObjectReadOperation op;
- op.stat(NULL, NULL, NULL);
+ op.stat(nullptr, nullptr, nullptr);
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback);
- int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op, NULL);
- assert(r == 0);
+ int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op, nullptr);
+ ceph_assert(r == 0);
comp->release();
}
}
void Journaler::shut_down(Context *on_finish) {
- assert(m_player == nullptr);
- assert(m_recorder == nullptr);
+ ceph_assert(m_player == nullptr);
+ ceph_assert(m_recorder == nullptr);
JournalMetadata *metadata = nullptr;
+ ceph_assert(m_metadata != nullptr);
std::swap(metadata, m_metadata);
- assert(metadata != nullptr);
+ ceph_assert(metadata != nullptr);
on_finish = new FunctionContext([metadata, on_finish](int r) {
metadata->put();
JournalTrimmer *trimmer = nullptr;
std::swap(trimmer, m_trimmer);
- if (trimmer == nullptr) {
+ if (!trimmer) {
metadata->shut_down(on_finish);
return;
}
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback);
int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
librados::AioCompletion *comp = librados::Rados::aio_create_completion(
on_finish, nullptr, utils::rados_ctx_callback);
r = m_header_ioctx.aio_remove(m_header_oid, comp);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
});
bool Journaler::try_pop_front(ReplayEntry *replay_entry,
uint64_t *tag_tid) {
- assert(m_player != NULL);
+ ceph_assert(m_player != nullptr);
Entry entry;
uint64_t commit_tid;
void Journaler::stop_replay(Context *on_finish) {
JournalPlayer *player = nullptr;
+ ceph_assert(m_player != nullptr);
std::swap(player, m_player);
- assert(player != nullptr);
+ ceph_assert(player != nullptr);
on_finish = new FunctionContext([player, on_finish](int r) {
delete player;
}
void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
- double flush_age) {
- assert(m_recorder == NULL);
+ double flush_age, uint64_t max_in_flight_appends) {
+ ceph_assert(m_recorder == nullptr);
// TODO verify active object set >= current replay object set
m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
m_metadata, flush_interval, flush_bytes,
- flush_age);
+ flush_age, max_in_flight_appends);
}
void Journaler::stop_append(Context *on_safe) {
JournalRecorder *recorder = nullptr;
+ ceph_assert(m_recorder != nullptr);
std::swap(recorder, m_recorder);
- assert(recorder != nullptr);
+ ceph_assert(recorder != nullptr);
on_safe = new FunctionContext([recorder, on_safe](int r) {
delete recorder;
uint64_t max_payload_size = m_metadata->get_object_size() -
Entry::get_fixed_size();
if (m_metadata->get_settings().max_payload_bytes > 0) {
- max_payload_size = MIN(max_payload_size,
- m_metadata->get_settings().max_payload_bytes);
+ max_payload_size = std::min(max_payload_size,
+ m_metadata->get_settings().max_payload_bytes);
}
return max_payload_size;
}
}
void Journaler::create_player(ReplayHandler *replay_handler) {
- assert(m_player == NULL);
+ ceph_assert(m_player == nullptr);
m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata,
replay_handler);
}
void Journaler::get_metadata(uint8_t *order, uint8_t *splay_width,
int64_t *pool_id) {
- assert(m_metadata != NULL);
+ ceph_assert(m_metadata != nullptr);
*order = m_metadata->get_order();
*splay_width = m_metadata->get_splay_width();
std::ostream &operator<<(std::ostream &os,
const Journaler &journaler) {
os << "[metadata=";
- if (journaler.m_metadata != NULL) {
+ if (journaler.m_metadata) {
os << *journaler.m_metadata;
} else {
os << "NULL";