1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/Journaler.h"
5 #include "include/stringify.h"
6 #include "common/errno.h"
7 #include "common/Timer.h"
8 #include "common/WorkQueue.h"
9 #include "journal/Entry.h"
10 #include "journal/FutureImpl.h"
11 #include "journal/JournalMetadata.h"
12 #include "journal/JournalPlayer.h"
13 #include "journal/JournalRecorder.h"
14 #include "journal/JournalTrimmer.h"
15 #include "journal/ReplayEntry.h"
16 #include "journal/ReplayHandler.h"
17 #include "cls/journal/cls_journal_client.h"
18 #include "cls/journal/cls_journal_types.h"
21 #define dout_subsys ceph_subsys_journaler
23 #define dout_prefix *_dout << "Journaler: " << this << " "
// Object-name prefixes used when building the journal header oid and the
// journal data-object oid prefix.
static const std::string JOURNAL_HEADER_PREFIX("journal.");
static const std::string JOURNAL_OBJECT_PREFIX("journal_data.");
32 } // anonymous namespace
34 using namespace cls::journal
;
35 using utils::rados_ctx_callback
;
37 std::string
Journaler::header_oid(const std::string
&journal_id
) {
38 return JOURNAL_HEADER_PREFIX
+ journal_id
;
41 std::string
Journaler::object_oid_prefix(int pool_id
,
42 const std::string
&journal_id
) {
43 return JOURNAL_OBJECT_PREFIX
+ stringify(pool_id
) + "." + journal_id
+ ".";
// Builds the helper objects held by Threads: a ThreadPool named
// "Journaler::thread_pool", a ContextWQ named "Journaler::work_queue" bound to
// that pool, and a SafeTimer that uses timer_lock.
// NOTE(review): the literals 1 (ThreadPool) and 60 (ContextWQ) are presumably
// the thread count and a timeout in seconds -- confirm against those classes.
46 Journaler::Threads::Threads(CephContext
*cct
)
47 : timer_lock("Journaler::timer_lock") {
48 thread_pool
= new ThreadPool(cct
, "Journaler::thread_pool", "tp_journal", 1);
51 work_queue
= new ContextWQ("Journaler::work_queue", 60, thread_pool
);
53 timer
= new SafeTimer(cct
, timer_lock
, true);
// Destructor of the Threads helper. It constructs a Mutex::Locker on
// timer_lock before touching the timer.
// NOTE(review): presumably the timer, work queue and thread pool created by
// the constructor are stopped and released here -- confirm.
57 Journaler::Threads::~Threads() {
59 Mutex::Locker
timer_locker(timer_lock
);
71 Journaler::Journaler(librados::IoCtx
&header_ioctx
,
72 const std::string
&journal_id
,
73 const std::string
&client_id
, const Settings
&settings
)
74 : m_threads(new Threads(reinterpret_cast<CephContext
*>(header_ioctx
.cct()))),
75 m_client_id(client_id
) {
76 set_up(m_threads
->work_queue
, m_threads
->timer
, &m_threads
->timer_lock
,
77 header_ioctx
, journal_id
, settings
);
// Constructs a Journaler on top of a caller-supplied work queue, timer and
// timer lock. Only m_client_id is set in the initializer list (no Threads
// helper is allocated here); the rest of the setup is delegated to set_up().
80 Journaler::Journaler(ContextWQ
*work_queue
, SafeTimer
*timer
,
81 Mutex
*timer_lock
, librados::IoCtx
&header_ioctx
,
82 const std::string
&journal_id
,
83 const std::string
&client_id
, const Settings
&settings
)
84 : m_client_id(client_id
) {
85 set_up(work_queue
, timer
, timer_lock
, header_ioctx
, journal_id
,
// Common constructor helper. Duplicates header_ioctx into m_header_ioctx,
// caches its CephContext in m_cct, derives m_header_oid and
// m_object_oid_prefix from journal_id (the prefix also embeds the header
// pool id), and allocates the JournalMetadata object stored in m_metadata.
89 void Journaler::set_up(ContextWQ
*work_queue
, SafeTimer
*timer
,
90 Mutex
*timer_lock
, librados::IoCtx
&header_ioctx
,
91 const std::string
&journal_id
,
92 const Settings
&settings
) {
93 m_header_ioctx
.dup(header_ioctx
);
94 m_cct
= reinterpret_cast<CephContext
*>(m_header_ioctx
.cct());
96 m_header_oid
= header_oid(journal_id
);
97 m_object_oid_prefix
= object_oid_prefix(m_header_ioctx
.get_id(), journal_id
);
99 m_metadata
= new JournalMetadata(work_queue
, timer
, timer_lock
,
100 m_header_ioctx
, m_header_oid
, m_client_id
,
// Destructor. If m_metadata is still attached it must no longer report
// is_initialized() (asserted); when init() was never called, in-flight
// metadata operations are waited for before the pointer is cleared.
// The trimmer, player and recorder must already be null (asserted).
105 Journaler::~Journaler() {
106 if (m_metadata
!= nullptr) {
107 assert(!m_metadata
->is_initialized());
108 if (!m_initialized
) {
109 // never initialized -- ensure any in-flight ops are complete
110 // since we wouldn't expect shut_down to be invoked
111 m_metadata
->wait_for_ops();
114 m_metadata
= nullptr;
116 assert(m_trimmer
== nullptr);
117 assert(m_player
== nullptr);
118 assert(m_recorder
== nullptr);
// Issues an asynchronous stat on the journal header object (m_header_oid).
// on_finish is attached to the AioCompletion together with
// rados_ctx_callback; all three stat out-parameters are NULL, so only the
// operation's result code is of interest.
123 void Journaler::exists(Context
*on_finish
) const {
124 librados::ObjectReadOperation op
;
125 op
.stat(NULL
, NULL
, NULL
);
127 librados::AioCompletion
*comp
=
128 librados::Rados::aio_create_completion(on_finish
, nullptr, rados_ctx_callback
);
129 int r
= m_header_ioctx
.aio_operate(m_header_oid
, comp
, &op
, NULL
);
134 void Journaler::init(Context
*on_init
) {
135 m_initialized
= true;
136 m_metadata
->init(new C_InitJournaler(this, on_init
));
// Chooses the IoCtx used for journal data objects and creates the trimmer.
// When the metadata's pool id is negative or equals the header pool id, the
// header IoCtx is duplicated into m_data_ioctx; the other visible path creates
// m_data_ioctx for pool_id via Rados::ioctx_create2().
// NOTE(review): the "no longer exists" message is presumably logged when
// ioctx_create2() fails -- confirm, and confirm the returned error code.
139 int Journaler::init_complete() {
140 int64_t pool_id
= m_metadata
->get_pool_id();
142 if (pool_id
< 0 || pool_id
== m_header_ioctx
.get_id()) {
143 ldout(m_cct
, 20) << "using image pool for journal data" << dendl
;
144 m_data_ioctx
.dup(m_header_ioctx
);
146 ldout(m_cct
, 20) << "using pool id=" << pool_id
<< " for journal data"
148 librados::Rados
rados(m_header_ioctx
);
149 int r
= rados
.ioctx_create2(pool_id
, m_data_ioctx
);
152 ldout(m_cct
, 1) << "pool id=" << pool_id
<< " no longer exists"
158 m_trimmer
= new JournalTrimmer(m_data_ioctx
, m_object_oid_prefix
,
// Parameterless shut_down() overload.
// NOTE(review): presumably a waiting wrapper around shut_down(Context *) --
// confirm.
163 void Journaler::shut_down() {
// Shuts the journaler down and signals on_finish when done. Replay and append
// must already be stopped (m_player / m_recorder asserted null).
// m_metadata and m_trimmer are swapped into locals so the members become null
// right away. The completion chain is built back-to-front: the outermost
// context completes on_finish with 0 (not with the incoming r); if there is no
// trimmer the metadata is shut down directly, otherwise the trimmer is shut
// down first and its completion triggers the metadata shut-down.
169 void Journaler::shut_down(Context
*on_finish
) {
170 assert(m_player
== nullptr);
171 assert(m_recorder
== nullptr);
173 JournalMetadata
*metadata
= nullptr;
174 std::swap(metadata
, m_metadata
);
175 assert(metadata
!= nullptr);
177 on_finish
= new FunctionContext([metadata
, on_finish
](int r
) {
179 on_finish
->complete(0);
182 JournalTrimmer
*trimmer
= nullptr;
183 std::swap(trimmer
, m_trimmer
);
184 if (trimmer
== nullptr) {
185 metadata
->shut_down(on_finish
);
189 on_finish
= new FunctionContext([trimmer
, metadata
, on_finish
](int r
) {
191 metadata
->shut_down(on_finish
);
193 trimmer
->shut_down(on_finish
);
196 bool Journaler::is_initialized() const {
197 return m_metadata
->is_initialized();
200 void Journaler::get_immutable_metadata(uint8_t *order
, uint8_t *splay_width
,
201 int64_t *pool_id
, Context
*on_finish
) {
202 m_metadata
->get_immutable_metadata(order
, splay_width
, pool_id
, on_finish
);
205 void Journaler::get_mutable_metadata(uint64_t *minimum_set
,
206 uint64_t *active_set
,
207 RegisteredClients
*clients
,
208 Context
*on_finish
) {
209 m_metadata
->get_mutable_metadata(minimum_set
, active_set
, clients
, on_finish
);
// Creates the journal header object asynchronously.
// An order outside [12, 64] completes on_finish with -EDOM and a splay_width
// of 0 completes it with -EINVAL. For valid arguments, client::create() fills
// a write operation with order, splay_width and pool_id, and the operation is
// submitted against m_header_oid with on_finish attached to the AioCompletion
// via rados_ctx_callback.
212 void Journaler::create(uint8_t order
, uint8_t splay_width
,
213 int64_t pool_id
, Context
*on_finish
) {
214 if (order
> 64 || order
< 12) {
215 lderr(m_cct
) << "order must be in the range [12, 64]" << dendl
;
216 on_finish
->complete(-EDOM
);
219 if (splay_width
== 0) {
220 on_finish
->complete(-EINVAL
);
224 ldout(m_cct
, 5) << "creating new journal: " << m_header_oid
<< dendl
;
226 librados::ObjectWriteOperation op
;
227 client::create(&op
, order
, splay_width
, pool_id
);
229 librados::AioCompletion
*comp
=
230 librados::Rados::aio_create_completion(on_finish
, nullptr, rados_ctx_callback
);
231 int r
= m_header_ioctx
.aio_operate(m_header_oid
, comp
, &op
);
// Removes the journal. The contexts are chained in reverse order of
// execution: the metadata is shut down first, its completion runs
// m_trimmer->remove_objects(force, ...), and that completion finally removes
// the header object with aio_remove(), to which the caller's on_finish is
// attached. Both lambdas capture `this`.
// NOTE(review): the captured `this` presumably has to outlive the whole
// chain -- confirm against callers.
236 void Journaler::remove(bool force
, Context
*on_finish
) {
237 // chain journal removal (reverse order)
238 on_finish
= new FunctionContext([this, on_finish
](int r
) {
239 librados::AioCompletion
*comp
= librados::Rados::aio_create_completion(
240 on_finish
, nullptr, utils::rados_ctx_callback
);
241 r
= m_header_ioctx
.aio_remove(m_header_oid
, comp
);
246 on_finish
= new FunctionContext([this, force
, on_finish
](int r
) {
247 m_trimmer
->remove_objects(force
, on_finish
);
250 m_metadata
->shut_down(on_finish
);
253 void Journaler::flush_commit_position(Context
*on_safe
) {
254 m_metadata
->flush_commit_position(on_safe
);
257 void Journaler::add_listener(JournalMetadataListener
*listener
) {
258 m_metadata
->add_listener(listener
);
261 void Journaler::remove_listener(JournalMetadataListener
*listener
) {
262 m_metadata
->remove_listener(listener
);
// int-returning overload: delegates to register_client(data, Context *) with
// &cond as the completion.
// NOTE(review): cond is presumably a waitable Context whose result becomes
// the return value -- confirm.
265 int Journaler::register_client(const bufferlist
&data
) {
267 register_client(data
, &cond
);
// int-returning overload: delegates to unregister_client(Context *) with
// &cond as the completion.
// NOTE(review): cond is presumably a waitable Context whose result becomes
// the return value -- confirm.
271 int Journaler::unregister_client() {
273 unregister_client(&cond
);
277 void Journaler::register_client(const bufferlist
&data
, Context
*on_finish
) {
278 return m_metadata
->register_client(data
, on_finish
);
281 void Journaler::update_client(const bufferlist
&data
, Context
*on_finish
) {
282 return m_metadata
->update_client(data
, on_finish
);
285 void Journaler::unregister_client(Context
*on_finish
) {
286 return m_metadata
->unregister_client(on_finish
);
289 void Journaler::get_client(const std::string
&client_id
,
290 cls::journal::Client
*client
,
291 Context
*on_finish
) {
292 m_metadata
->get_client(client_id
, client
, on_finish
);
// Looks up client_id in the set of registered clients obtained from
// m_metadata->get_registered_clients(). The search key is brace-built from
// client_id and an empty second member.
// NOTE(review): presumably returns an error when the client is not found and
// copies the match into *client otherwise -- confirm.
295 int Journaler::get_cached_client(const std::string
&client_id
,
296 cls::journal::Client
*client
) {
297 RegisteredClients clients
;
298 m_metadata
->get_registered_clients(&clients
);
300 auto it
= clients
.find({client_id
, {}});
301 if (it
== clients
.end()) {
// Allocates a tag without an explicit tag class: forwards to
// JournalMetadata::allocate_tag() with cls::journal::Tag::TAG_CLASS_NEW as
// the tag class.
309 void Journaler::allocate_tag(const bufferlist
&data
, cls::journal::Tag
*tag
,
310 Context
*on_finish
) {
311 m_metadata
->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW
, data
, tag
,
315 void Journaler::allocate_tag(uint64_t tag_class
, const bufferlist
&data
,
316 cls::journal::Tag
*tag
, Context
*on_finish
) {
317 m_metadata
->allocate_tag(tag_class
, data
, tag
, on_finish
);
320 void Journaler::get_tag(uint64_t tag_tid
, Tag
*tag
, Context
*on_finish
) {
321 m_metadata
->get_tag(tag_tid
, tag
, on_finish
);
324 void Journaler::get_tags(uint64_t tag_class
, Tags
*tags
, Context
*on_finish
) {
325 m_metadata
->get_tags(0, tag_class
, tags
, on_finish
);
328 void Journaler::get_tags(uint64_t start_after_tag_tid
, uint64_t tag_class
,
329 Tags
*tags
, Context
*on_finish
) {
330 m_metadata
->get_tags(start_after_tag_tid
, tag_class
, tags
, on_finish
);
333 void Journaler::start_replay(ReplayHandler
*replay_handler
) {
334 create_player(replay_handler
);
335 m_player
->prefetch();
// Starts live replay: creates the journal player for replay_handler, then
// calls prefetch_and_watch() on it with `interval`.
// NOTE(review): interval is presumably the watch/poll period -- confirm its
// type and unit.
338 void Journaler::start_live_replay(ReplayHandler
*replay_handler
,
340 create_player(replay_handler
);
341 m_player
->prefetch_and_watch(interval
);
// Attempts to pop the next entry from the active player (asserted non-null).
// When an entry is obtained, *replay_entry is assigned a ReplayEntry built
// from the entry's data and commit tid, and *tag_tid -- if the pointer is
// non-null -- receives the entry's tag tid.
// NOTE(review): presumably returns false when the player has nothing to pop
// and true otherwise -- confirm.
344 bool Journaler::try_pop_front(ReplayEntry
*replay_entry
,
346 assert(m_player
!= NULL
);
350 if (!m_player
->try_pop_front(&entry
, &commit_tid
)) {
354 *replay_entry
= ReplayEntry(entry
.get_data(), commit_tid
);
355 if (tag_tid
!= nullptr) {
356 *tag_tid
= entry
.get_tag_tid();
// Parameterless stop_replay() overload.
// NOTE(review): presumably a waiting wrapper around stop_replay(Context *) --
// confirm.
361 void Journaler::stop_replay() {
// Stops replay. m_player is swapped into a local (and must have been
// non-null, asserted), so the member is null immediately. The player is then
// shut down with a wrapping context that captures the player and completes
// the caller's on_finish with the shut-down result r.
367 void Journaler::stop_replay(Context
*on_finish
) {
368 JournalPlayer
*player
= nullptr;
369 std::swap(player
, m_player
);
370 assert(player
!= nullptr);
372 on_finish
= new FunctionContext([player
, on_finish
](int r
) {
374 on_finish
->complete(r
);
376 player
->shut_down(on_finish
);
379 void Journaler::committed(const ReplayEntry
&replay_entry
) {
380 m_trimmer
->committed(replay_entry
.get_commit_tid());
383 void Journaler::committed(const Future
&future
) {
384 FutureImplPtr future_impl
= future
.get_future_impl();
385 m_trimmer
->committed(future_impl
->get_commit_tid());
// Starts appending. No recorder may be active (asserted); a JournalRecorder
// is allocated into m_recorder over m_data_ioctx, m_object_oid_prefix and
// m_metadata, using the supplied flush_interval and flush_bytes.
388 void Journaler::start_append(int flush_interval
, uint64_t flush_bytes
,
390 assert(m_recorder
== NULL
);
392 // TODO verify active object set >= current replay object set
394 m_recorder
= new JournalRecorder(m_data_ioctx
, m_object_oid_prefix
,
395 m_metadata
, flush_interval
, flush_bytes
,
// Stops appending. m_recorder is swapped into a local (and must have been
// non-null, asserted), so the member is null immediately. The recorder is
// then flushed with a wrapping context that captures the recorder and
// completes the caller's on_safe with the flush result r.
399 void Journaler::stop_append(Context
*on_safe
) {
400 JournalRecorder
*recorder
= nullptr;
401 std::swap(recorder
, m_recorder
);
402 assert(recorder
!= nullptr);
404 on_safe
= new FunctionContext([recorder
, on_safe
](int r
) {
406 on_safe
->complete(r
);
408 recorder
->flush(on_safe
);
411 uint64_t Journaler::get_max_append_size() const {
412 uint64_t max_payload_size
= m_metadata
->get_object_size() -
413 Entry::get_fixed_size();
414 if (m_metadata
->get_settings().max_payload_bytes
> 0) {
415 max_payload_size
= MIN(max_payload_size
,
416 m_metadata
->get_settings().max_payload_bytes
);
418 return max_payload_size
;
421 Future
Journaler::append(uint64_t tag_tid
, const bufferlist
&payload_bl
) {
422 return m_recorder
->append(tag_tid
, payload_bl
);
425 void Journaler::flush_append(Context
*on_safe
) {
426 m_recorder
->flush(on_safe
);
// Allocates the JournalPlayer stored in m_player, built over m_data_ioctx,
// m_object_oid_prefix and m_metadata. A player must not already exist
// (asserted).
429 void Journaler::create_player(ReplayHandler
*replay_handler
) {
430 assert(m_player
== NULL
);
431 m_player
= new JournalPlayer(m_data_ioctx
, m_object_oid_prefix
, m_metadata
,
// Copies the order, splay width and pool id currently held by m_metadata into
// the corresponding out-parameters. m_metadata must be non-null (asserted);
// the out-pointers are dereferenced without a null check.
435 void Journaler::get_metadata(uint8_t *order
, uint8_t *splay_width
,
437 assert(m_metadata
!= NULL
);
439 *order
= m_metadata
->get_order();
440 *splay_width
= m_metadata
->get_splay_width();
441 *pool_id
= m_metadata
->get_pool_id();
// Stream inserter for Journaler: when journaler.m_metadata is non-null, the
// metadata object itself is streamed into os.
444 std::ostream
&operator<<(std::ostream
&os
,
445 const Journaler
&journaler
) {
447 if (journaler
.m_metadata
!= NULL
) {
448 os
<< *journaler
.m_metadata
;
456 } // namespace journal