1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_LIBRBD_JOURNAL_H
5 #define CEPH_LIBRBD_JOURNAL_H
7 #include "include/int_types.h"
8 #include "include/Context.h"
9 #include "include/interval_set.h"
10 #include "include/rados/librados_fwd.hpp"
11 #include "common/Cond.h"
12 #include "common/Mutex.h"
13 #include "common/Cond.h"
14 #include "common/WorkQueue.h"
15 #include "journal/Future.h"
16 #include "journal/JournalMetadataListener.h"
17 #include "journal/ReplayEntry.h"
18 #include "journal/ReplayHandler.h"
19 #include "librbd/Utils.h"
20 #include "librbd/journal/Types.h"
21 #include "librbd/journal/TypeTraits.h"
27 #include <unordered_map>
38 namespace journal
{ template <typename
> class Replay
; }
40 template <typename ImageCtxT
= ImageCtx
>
49 * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY
52 * | * . | (error) * . . . . . . . * |
55 * | * . | FLUSHING_RESTART . * |
59 * | * . | RESTARTING < * * * * * STOPPING
62 * | * * * * * * . \-------------/ . |
64 * | * . . . . . . . . . . . . . . . . |
67 * | CLOSED <----- CLOSING <---------------------------------------/
78 STATE_FLUSHING_RESTART
,
79 STATE_RESTARTING_REPLAY
,
80 STATE_FLUSHING_REPLAY
,
87 static const std::string IMAGE_CLIENT_ID
;
88 static const std::string LOCAL_MIRROR_UUID
;
89 static const std::string ORPHAN_MIRROR_UUID
;
91 Journal(ImageCtxT
&image_ctx
);
94 static bool is_journal_supported(ImageCtxT
&image_ctx
);
95 static int create(librados::IoCtx
&io_ctx
, const std::string
&image_id
,
96 uint8_t order
, uint8_t splay_width
,
97 const std::string
&object_pool
);
98 static int remove(librados::IoCtx
&io_ctx
, const std::string
&image_id
);
99 static int reset(librados::IoCtx
&io_ctx
, const std::string
&image_id
);
101 static void is_tag_owner(ImageCtxT
*image_ctx
, bool *is_tag_owner
,
103 static void is_tag_owner(librados::IoCtx
& io_ctx
, std::string
& image_id
,
104 bool *is_tag_owner
, ContextWQ
*op_work_queue
,
106 static void get_tag_owner(librados::IoCtx
& io_ctx
, std::string
& image_id
,
107 std::string
*mirror_uuid
,
108 ContextWQ
*op_work_queue
, Context
*on_finish
);
109 static int request_resync(ImageCtxT
*image_ctx
);
110 static void promote(ImageCtxT
*image_ctx
, Context
*on_finish
);
111 static void demote(ImageCtxT
*image_ctx
, Context
*on_finish
);
113 bool is_journal_ready() const;
114 bool is_journal_replaying() const;
115 bool is_journal_appending() const;
117 void wait_for_journal_ready(Context
*on_ready
);
119 void open(Context
*on_finish
);
120 void close(Context
*on_finish
);
122 bool is_tag_owner() const;
123 uint64_t get_tag_tid() const;
124 journal::TagData
get_tag_data() const;
126 void allocate_local_tag(Context
*on_finish
);
127 void allocate_tag(const std::string
&mirror_uuid
,
128 const journal::TagPredecessor
&predecessor
,
131 void flush_commit_position(Context
*on_finish
);
135 uint64_t append_write_event(uint64_t offset
, size_t length
,
136 const bufferlist
&bl
,
138 uint64_t append_io_event(journal::EventEntry
&&event_entry
,
139 uint64_t offset
, size_t length
,
140 bool flush_entry
, int filter_ret_val
);
141 void commit_io_event(uint64_t tid
, int r
);
142 void commit_io_event_extent(uint64_t tid
, uint64_t offset
, uint64_t length
,
145 void append_op_event(uint64_t op_tid
, journal::EventEntry
&&event_entry
,
147 void commit_op_event(uint64_t tid
, int r
, Context
*on_safe
);
148 void replay_op_ready(uint64_t op_tid
, Context
*on_resume
);
150 void flush_event(uint64_t tid
, Context
*on_safe
);
151 void wait_event(uint64_t tid
, Context
*on_safe
);
153 uint64_t allocate_op_tid() {
154 uint64_t op_tid
= ++m_op_tid
;
155 ceph_assert(op_tid
!= 0);
159 void start_external_replay(journal::Replay
<ImageCtxT
> **journal_replay
,
161 void stop_external_replay();
163 void add_listener(journal::Listener
*listener
);
164 void remove_listener(journal::Listener
*listener
);
166 int is_resync_requested(bool *do_resync
);
168 inline ContextWQ
*get_work_queue() {
173 ImageCtxT
&m_image_ctx
;
175 // mock unit testing support
176 typedef journal::TypeTraits
<ImageCtxT
> TypeTraits
;
177 typedef typename
TypeTraits::Journaler Journaler
;
178 typedef typename
TypeTraits::Future Future
;
179 typedef typename
TypeTraits::ReplayEntry ReplayEntry
;
181 typedef std::list
<bufferlist
> Bufferlists
;
182 typedef std::list
<Context
*> Contexts
;
183 typedef std::list
<Future
> Futures
;
184 typedef interval_set
<uint64_t> ExtentInterval
;
188 Contexts on_safe_contexts
;
189 ExtentInterval pending_extents
;
190 int filter_ret_val
= 0;
191 bool committed_io
= false;
197 Event(const Futures
&_futures
, uint64_t offset
, size_t length
,
199 : futures(_futures
), filter_ret_val(filter_ret_val
) {
201 pending_extents
.insert(offset
, length
);
206 typedef std::unordered_map
<uint64_t, Event
> Events
;
207 typedef std::unordered_map
<uint64_t, Future
> TidToFutures
;
209 struct C_IOEventSafe
: public Context
{
213 C_IOEventSafe(Journal
*_journal
, uint64_t _tid
)
214 : journal(_journal
), tid(_tid
) {
217 void finish(int r
) override
{
218 journal
->handle_io_event_safe(r
, tid
);
222 struct C_OpEventSafe
: public Context
{
225 Future op_start_future
;
226 Future op_finish_future
;
229 C_OpEventSafe(Journal
*journal
, uint64_t tid
, const Future
&op_start_future
,
230 const Future
&op_finish_future
, Context
*on_safe
)
231 : journal(journal
), tid(tid
), op_start_future(op_start_future
),
232 op_finish_future(op_finish_future
), on_safe(on_safe
) {
235 void finish(int r
) override
{
236 journal
->handle_op_event_safe(r
, tid
, op_start_future
, op_finish_future
,
241 struct C_ReplayProcessSafe
: public Context
{
243 ReplayEntry replay_entry
;
245 C_ReplayProcessSafe(Journal
*journal
, ReplayEntry
&&replay_entry
) :
246 journal(journal
), replay_entry(std::move(replay_entry
)) {
248 void finish(int r
) override
{
249 journal
->handle_replay_process_safe(replay_entry
, r
);
253 struct ReplayHandler
: public ::journal::ReplayHandler
{
255 ReplayHandler(Journal
*_journal
) : journal(_journal
) {
258 void get() override
{
261 void put() override
{
265 void handle_entries_available() override
{
266 journal
->handle_replay_ready();
268 void handle_complete(int r
) override
{
269 journal
->handle_replay_complete(r
);
273 ContextWQ
*m_work_queue
= nullptr;
274 SafeTimer
*m_timer
= nullptr;
275 Mutex
*m_timer_lock
= nullptr;
277 Journaler
*m_journaler
;
278 mutable Mutex m_lock
;
280 uint64_t m_max_append_size
= 0;
281 uint64_t m_tag_class
= 0;
282 uint64_t m_tag_tid
= 0;
283 journal::ImageClientMeta m_client_meta
;
284 journal::TagData m_tag_data
;
287 Contexts m_wait_for_state_contexts
;
289 ReplayHandler m_replay_handler
;
290 bool m_close_pending
;
293 uint64_t m_event_tid
;
296 std::atomic
<bool> m_user_flushed
= false;
298 std::atomic
<uint64_t> m_op_tid
= { 0 };
299 TidToFutures m_op_futures
;
301 bool m_processing_entry
= false;
302 bool m_blocking_writes
;
304 journal::Replay
<ImageCtxT
> *m_journal_replay
;
306 util::AsyncOpTracker m_async_journal_op_tracker
;
308 struct MetadataListener
: public ::journal::JournalMetadataListener
{
309 Journal
<ImageCtxT
> *journal
;
311 MetadataListener(Journal
<ImageCtxT
> *journal
) : journal(journal
) { }
313 void handle_update(::journal::JournalMetadata
*) override
{
314 FunctionContext
*ctx
= new FunctionContext([this](int r
) {
315 journal
->handle_metadata_updated();
317 journal
->m_work_queue
->queue(ctx
, 0);
319 } m_metadata_listener
;
321 typedef std::set
<journal::Listener
*> Listeners
;
322 Listeners m_listeners
;
323 Cond m_listener_cond
;
324 bool m_listener_notify
= false;
326 uint64_t m_refresh_sequence
= 0;
328 bool is_journal_replaying(const Mutex
&) const;
329 bool is_tag_owner(const Mutex
&) const;
331 uint64_t append_io_events(journal::EventType event_type
,
332 const Bufferlists
&bufferlists
,
333 uint64_t offset
, size_t length
, bool flush_entry
,
335 Future
wait_event(Mutex
&lock
, uint64_t tid
, Context
*on_safe
);
337 void create_journaler();
338 void destroy_journaler(int r
);
339 void recreate_journaler(int r
);
341 void complete_event(typename
Events::iterator it
, int r
);
345 void handle_open(int r
);
347 void handle_replay_ready();
348 void handle_replay_complete(int r
);
349 void handle_replay_process_ready(int r
);
350 void handle_replay_process_safe(ReplayEntry replay_entry
, int r
);
352 void handle_start_external_replay(int r
,
353 journal::Replay
<ImageCtxT
> **journal_replay
,
356 void handle_flushing_restart(int r
);
357 void handle_flushing_replay();
359 void handle_recording_stopped(int r
);
361 void handle_journal_destroyed(int r
);
363 void handle_io_event_safe(int r
, uint64_t tid
);
364 void handle_op_event_safe(int r
, uint64_t tid
, const Future
&op_start_future
,
365 const Future
&op_finish_future
, Context
*on_safe
);
367 void stop_recording();
369 void transition_state(State state
, int r
);
371 bool is_steady_state() const;
372 void wait_for_steady_state(Context
*on_state
);
374 int check_resync_requested(bool *do_resync
);
376 void handle_metadata_updated();
377 void handle_refresh_metadata(uint64_t refresh_sequence
, uint64_t tag_tid
,
378 journal::TagData tag_data
, int r
);
382 } // namespace librbd
384 extern template class librbd::Journal
<librbd::ImageCtx
>;
386 #endif // CEPH_LIBRBD_JOURNAL_H