1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_LIBRBD_JOURNAL_H
5 #define CEPH_LIBRBD_JOURNAL_H
7 #include "include/int_types.h"
8 #include "include/Context.h"
9 #include "include/interval_set.h"
10 #include "include/rados/librados_fwd.hpp"
11 #include "common/AsyncOpTracker.h"
12 #include "common/Cond.h"
13 #include "common/RefCountedObj.h"
14 #include "journal/Future.h"
15 #include "journal/JournalMetadataListener.h"
16 #include "journal/ReplayEntry.h"
17 #include "journal/ReplayHandler.h"
18 #include "librbd/Utils.h"
19 #include "librbd/asio/ContextWQ.h"
20 #include "librbd/journal/Types.h"
21 #include "librbd/journal/TypeTraits.h"
27 #include <unordered_map>
// Forward declaration of journal::Journaler. Only the name is needed at this
// point in the header; no complete type is required to declare pointers to it.
namespace journal {
class Journaler;
} // namespace journal
// Forward declaration of the journal::Replay class template (single type
// parameter). Lets the header name journal::Replay<ImageCtxT> pointers without
// pulling in the template's definition.
namespace journal {
template <typename> class Replay;
} // namespace journal
39 template <typename ImageCtxT
= ImageCtx
>
40 class Journal
: public RefCountedObject
{
48 * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY
51 * | * . | (error) * . . . . . . . * |
54 * | * . | FLUSHING_RESTART . * |
58 * | * . | RESTARTING < * * * * * STOPPING
61 * | * * * * * * . \-------------/ . |
63 * | * . . . . . . . . . . . . . . . . |
66 * | CLOSED <----- CLOSING <---------------------------------------/
77 STATE_FLUSHING_RESTART
,
78 STATE_RESTARTING_REPLAY
,
79 STATE_FLUSHING_REPLAY
,
86 static const std::string IMAGE_CLIENT_ID
;
87 static const std::string LOCAL_MIRROR_UUID
;
88 static const std::string ORPHAN_MIRROR_UUID
;
90 Journal(ImageCtxT
&image_ctx
);
93 static void get_work_queue(CephContext
*cct
, ContextWQ
**work_queue
);
95 static bool is_journal_supported(ImageCtxT
&image_ctx
);
96 static int create(librados::IoCtx
&io_ctx
, const std::string
&image_id
,
97 uint8_t order
, uint8_t splay_width
,
98 const std::string
&object_pool
);
99 static int remove(librados::IoCtx
&io_ctx
, const std::string
&image_id
);
100 static int reset(librados::IoCtx
&io_ctx
, const std::string
&image_id
);
102 static void is_tag_owner(ImageCtxT
*image_ctx
, bool *is_tag_owner
,
104 static void is_tag_owner(librados::IoCtx
& io_ctx
, std::string
& image_id
,
105 bool *is_tag_owner
, asio::ContextWQ
*op_work_queue
,
107 static void get_tag_owner(librados::IoCtx
& io_ctx
, std::string
& image_id
,
108 std::string
*mirror_uuid
,
109 asio::ContextWQ
*op_work_queue
, Context
*on_finish
);
110 static int request_resync(ImageCtxT
*image_ctx
);
111 static void promote(ImageCtxT
*image_ctx
, Context
*on_finish
);
112 static void demote(ImageCtxT
*image_ctx
, Context
*on_finish
);
114 bool is_journal_ready() const;
115 bool is_journal_replaying() const;
116 bool is_journal_appending() const;
118 void wait_for_journal_ready(Context
*on_ready
);
120 void open(Context
*on_finish
);
121 void close(Context
*on_finish
);
123 bool is_tag_owner() const;
124 uint64_t get_tag_tid() const;
125 journal::TagData
get_tag_data() const;
127 void allocate_local_tag(Context
*on_finish
);
128 void allocate_tag(const std::string
&mirror_uuid
,
129 const journal::TagPredecessor
&predecessor
,
132 void flush_commit_position(Context
*on_finish
);
136 uint64_t append_write_event(uint64_t offset
, size_t length
,
137 const bufferlist
&bl
,
139 uint64_t append_io_event(journal::EventEntry
&&event_entry
,
140 uint64_t offset
, size_t length
,
141 bool flush_entry
, int filter_ret_val
);
142 void commit_io_event(uint64_t tid
, int r
);
143 void commit_io_event_extent(uint64_t tid
, uint64_t offset
, uint64_t length
,
146 void append_op_event(uint64_t op_tid
, journal::EventEntry
&&event_entry
,
148 void commit_op_event(uint64_t tid
, int r
, Context
*on_safe
);
149 void replay_op_ready(uint64_t op_tid
, Context
*on_resume
);
151 void flush_event(uint64_t tid
, Context
*on_safe
);
152 void wait_event(uint64_t tid
, Context
*on_safe
);
154 uint64_t allocate_op_tid() {
155 uint64_t op_tid
= ++m_op_tid
;
156 ceph_assert(op_tid
!= 0);
160 void start_external_replay(journal::Replay
<ImageCtxT
> **journal_replay
,
162 void stop_external_replay();
164 void add_listener(journal::Listener
*listener
);
165 void remove_listener(journal::Listener
*listener
);
167 int is_resync_requested(bool *do_resync
);
169 inline ContextWQ
*get_work_queue() {
174 ImageCtxT
&m_image_ctx
;
176 // mock unit testing support
177 typedef journal::TypeTraits
<ImageCtxT
> TypeTraits
;
178 typedef typename
TypeTraits::Journaler Journaler
;
179 typedef typename
TypeTraits::Future Future
;
180 typedef typename
TypeTraits::ReplayEntry ReplayEntry
;
182 typedef std::list
<bufferlist
> Bufferlists
;
183 typedef std::list
<Context
*> Contexts
;
184 typedef std::list
<Future
> Futures
;
185 typedef interval_set
<uint64_t> ExtentInterval
;
189 Contexts on_safe_contexts
;
190 ExtentInterval pending_extents
;
191 int filter_ret_val
= 0;
192 bool committed_io
= false;
198 Event(const Futures
&_futures
, uint64_t offset
, size_t length
,
200 : futures(_futures
), filter_ret_val(filter_ret_val
) {
202 pending_extents
.insert(offset
, length
);
207 typedef std::unordered_map
<uint64_t, Event
> Events
;
208 typedef std::unordered_map
<uint64_t, Future
> TidToFutures
;
210 struct C_IOEventSafe
: public Context
{
214 C_IOEventSafe(Journal
*_journal
, uint64_t _tid
)
215 : journal(_journal
), tid(_tid
) {
218 void finish(int r
) override
{
219 journal
->handle_io_event_safe(r
, tid
);
223 struct C_OpEventSafe
: public Context
{
226 Future op_start_future
;
227 Future op_finish_future
;
230 C_OpEventSafe(Journal
*journal
, uint64_t tid
, const Future
&op_start_future
,
231 const Future
&op_finish_future
, Context
*on_safe
)
232 : journal(journal
), tid(tid
), op_start_future(op_start_future
),
233 op_finish_future(op_finish_future
), on_safe(on_safe
) {
236 void finish(int r
) override
{
237 journal
->handle_op_event_safe(r
, tid
, op_start_future
, op_finish_future
,
242 struct C_ReplayProcessSafe
: public Context
{
244 ReplayEntry replay_entry
;
246 C_ReplayProcessSafe(Journal
*journal
, ReplayEntry
&&replay_entry
) :
247 journal(journal
), replay_entry(std::move(replay_entry
)) {
249 void finish(int r
) override
{
250 journal
->handle_replay_process_safe(replay_entry
, r
);
254 struct ReplayHandler
: public ::journal::ReplayHandler
{
256 ReplayHandler(Journal
*_journal
) : journal(_journal
) {
259 void handle_entries_available() override
{
260 journal
->handle_replay_ready();
262 void handle_complete(int r
) override
{
263 journal
->handle_replay_complete(r
);
267 ContextWQ
*m_work_queue
= nullptr;
268 SafeTimer
*m_timer
= nullptr;
269 ceph::mutex
*m_timer_lock
= nullptr;
271 Journaler
*m_journaler
;
272 mutable ceph::mutex m_lock
= ceph::make_mutex("Journal<I>::m_lock");
274 uint64_t m_max_append_size
= 0;
275 uint64_t m_tag_class
= 0;
276 uint64_t m_tag_tid
= 0;
277 journal::ImageClientMeta m_client_meta
;
278 journal::TagData m_tag_data
;
281 Contexts m_wait_for_state_contexts
;
283 ReplayHandler m_replay_handler
;
284 bool m_close_pending
;
286 ceph::mutex m_event_lock
= ceph::make_mutex("Journal<I>::m_event_lock");
287 uint64_t m_event_tid
;
290 std::atomic
<bool> m_user_flushed
= false;
292 std::atomic
<uint64_t> m_op_tid
= { 0 };
293 TidToFutures m_op_futures
;
295 bool m_processing_entry
= false;
296 bool m_blocking_writes
;
298 journal::Replay
<ImageCtxT
> *m_journal_replay
;
300 AsyncOpTracker m_async_journal_op_tracker
;
302 struct MetadataListener
: public ::journal::JournalMetadataListener
{
303 Journal
<ImageCtxT
> *journal
;
305 MetadataListener(Journal
<ImageCtxT
> *journal
) : journal(journal
) { }
307 void handle_update(::journal::JournalMetadata
*) override
;
308 } m_metadata_listener
;
310 typedef std::set
<journal::Listener
*> Listeners
;
311 Listeners m_listeners
;
312 ceph::condition_variable m_listener_cond
;
313 bool m_listener_notify
= false;
315 uint64_t m_refresh_sequence
= 0;
317 bool is_journal_replaying(const ceph::mutex
&) const;
318 bool is_tag_owner(const ceph::mutex
&) const;
320 uint64_t append_io_events(journal::EventType event_type
,
321 const Bufferlists
&bufferlists
,
322 uint64_t offset
, size_t length
, bool flush_entry
,
324 Future
wait_event(ceph::mutex
&lock
, uint64_t tid
, Context
*on_safe
);
326 void create_journaler();
327 void destroy_journaler(int r
);
328 void recreate_journaler(int r
);
330 void complete_event(typename
Events::iterator it
, int r
);
334 void handle_open(int r
);
336 void handle_replay_ready();
337 void handle_replay_complete(int r
);
338 void handle_replay_process_ready(int r
);
339 void handle_replay_process_safe(ReplayEntry replay_entry
, int r
);
341 void handle_start_external_replay(int r
,
342 journal::Replay
<ImageCtxT
> **journal_replay
,
345 void handle_flushing_restart(int r
);
346 void handle_flushing_replay();
348 void handle_recording_stopped(int r
);
350 void handle_journal_destroyed(int r
);
352 void handle_io_event_safe(int r
, uint64_t tid
);
353 void handle_op_event_safe(int r
, uint64_t tid
, const Future
&op_start_future
,
354 const Future
&op_finish_future
, Context
*on_safe
);
356 void stop_recording();
358 void transition_state(State state
, int r
);
360 bool is_steady_state() const;
361 void wait_for_steady_state(Context
*on_state
);
363 int check_resync_requested(bool *do_resync
);
365 void handle_metadata_updated();
366 void handle_refresh_metadata(uint64_t refresh_sequence
, uint64_t tag_tid
,
367 journal::TagData tag_data
, int r
);
371 } // namespace librbd
373 extern template class librbd::Journal
<librbd::ImageCtx
>;
375 #endif // CEPH_LIBRBD_JOURNAL_H