1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_LIBRBD_JOURNAL_H
5 #define CEPH_LIBRBD_JOURNAL_H
7 #include "include/int_types.h"
8 #include "include/Context.h"
9 #include "include/interval_set.h"
10 #include "include/rados/librados_fwd.hpp"
11 #include "common/AsyncOpTracker.h"
12 #include "common/Cond.h"
13 #include "common/Timer.h"
14 #include "common/RefCountedObj.h"
15 #include "journal/Future.h"
16 #include "journal/JournalMetadataListener.h"
17 #include "journal/ReplayEntry.h"
18 #include "journal/ReplayHandler.h"
19 #include "librbd/Utils.h"
20 #include "librbd/asio/ContextWQ.h"
21 #include "librbd/journal/Types.h"
22 #include "librbd/journal/TypeTraits.h"
28 #include <unordered_map>
// Forward declaration of class journal::Journaler.
31 namespace journal
{ class Journaler
; }
// Forward declaration of the class template journal::Replay; it is referenced
// below as journal::Replay<ImageCtxT> (e.g. start_external_replay() and the
// m_journal_replay member).
37 namespace journal
{ template <typename
> class Replay
; }
39 template <typename ImageCtxT
= ImageCtx
>
40 class Journal
: public RefCountedObject
{
48 * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY
51 * | * . | (error) * . . . . . . . * |
54 * | * . | FLUSHING_RESTART . * |
58 * | * . | RESTARTING < * * * * * STOPPING
61 * | * * * * * * . \-------------/ . |
63 * | * . . . . . . . . . . . . . . . . |
66 * | CLOSED <----- CLOSING <---------------------------------------/
77 STATE_FLUSHING_RESTART
,
78 STATE_RESTARTING_REPLAY
,
79 STATE_FLUSHING_REPLAY
,
86 static const std::string IMAGE_CLIENT_ID
;
87 static const std::string LOCAL_MIRROR_UUID
;
88 static const std::string ORPHAN_MIRROR_UUID
;
90 Journal(ImageCtxT
&image_ctx
);
93 static void get_work_queue(CephContext
*cct
, ContextWQ
**work_queue
);
95 static bool is_journal_supported(ImageCtxT
&image_ctx
);
96 static int create(librados::IoCtx
&io_ctx
, const std::string
&image_id
,
97 uint8_t order
, uint8_t splay_width
,
98 const std::string
&object_pool
);
99 static int remove(librados::IoCtx
&io_ctx
, const std::string
&image_id
);
100 static int reset(librados::IoCtx
&io_ctx
, const std::string
&image_id
);
102 static void is_tag_owner(ImageCtxT
*image_ctx
, bool *is_tag_owner
,
104 static void is_tag_owner(librados::IoCtx
& io_ctx
, std::string
& image_id
,
105 bool *is_tag_owner
, asio::ContextWQ
*op_work_queue
,
107 static void get_tag_owner(librados::IoCtx
& io_ctx
, std::string
& image_id
,
108 std::string
*mirror_uuid
,
109 asio::ContextWQ
*op_work_queue
, Context
*on_finish
);
110 static int request_resync(ImageCtxT
*image_ctx
);
111 static void promote(ImageCtxT
*image_ctx
, Context
*on_finish
);
112 static void demote(ImageCtxT
*image_ctx
, Context
*on_finish
);
114 bool is_journal_ready() const;
115 bool is_journal_replaying() const;
116 bool is_journal_appending() const;
118 void wait_for_journal_ready(Context
*on_ready
);
120 void open(Context
*on_finish
);
121 void close(Context
*on_finish
);
123 bool is_tag_owner() const;
124 uint64_t get_tag_tid() const;
125 journal::TagData
get_tag_data() const;
127 void allocate_local_tag(Context
*on_finish
);
128 void allocate_tag(const std::string
&mirror_uuid
,
129 const journal::TagPredecessor
&predecessor
,
132 void flush_commit_position(Context
*on_finish
);
136 uint64_t append_write_event(uint64_t offset
, size_t length
,
137 const bufferlist
&bl
,
139 uint64_t append_compare_and_write_event(uint64_t offset
,
141 const bufferlist
&cmp_bl
,
142 const bufferlist
&write_bl
,
144 uint64_t append_io_event(journal::EventEntry
&&event_entry
,
145 uint64_t offset
, size_t length
,
146 bool flush_entry
, int filter_ret_val
);
147 void commit_io_event(uint64_t tid
, int r
);
148 void commit_io_event_extent(uint64_t tid
, uint64_t offset
, uint64_t length
,
151 void append_op_event(uint64_t op_tid
, journal::EventEntry
&&event_entry
,
153 void commit_op_event(uint64_t tid
, int r
, Context
*on_safe
);
154 void replay_op_ready(uint64_t op_tid
, Context
*on_resume
);
156 void flush_event(uint64_t tid
, Context
*on_safe
);
157 void wait_event(uint64_t tid
, Context
*on_safe
);
159 uint64_t allocate_op_tid() {
160 uint64_t op_tid
= ++m_op_tid
;
161 ceph_assert(op_tid
!= 0);
165 void start_external_replay(journal::Replay
<ImageCtxT
> **journal_replay
,
167 void stop_external_replay();
169 void add_listener(journal::Listener
*listener
);
170 void remove_listener(journal::Listener
*listener
);
172 int is_resync_requested(bool *do_resync
);
174 inline ContextWQ
*get_work_queue() {
179 ImageCtxT
&m_image_ctx
;
181 // mock unit testing support
182 typedef journal::TypeTraits
<ImageCtxT
> TypeTraits
;
183 typedef typename
TypeTraits::Journaler Journaler
;
184 typedef typename
TypeTraits::Future Future
;
185 typedef typename
TypeTraits::ReplayEntry ReplayEntry
;
187 typedef std::list
<bufferlist
> Bufferlists
;
188 typedef std::list
<Context
*> Contexts
;
189 typedef std::list
<Future
> Futures
;
190 typedef interval_set
<uint64_t> ExtentInterval
;
194 Contexts on_safe_contexts
;
195 ExtentInterval pending_extents
;
196 int filter_ret_val
= 0;
197 bool committed_io
= false;
203 Event(const Futures
&_futures
, uint64_t offset
, size_t length
,
205 : futures(_futures
), filter_ret_val(filter_ret_val
) {
207 pending_extents
.insert(offset
, length
);
212 typedef std::unordered_map
<uint64_t, Event
> Events
;
213 typedef std::unordered_map
<uint64_t, Future
> TidToFutures
;
215 struct C_IOEventSafe
: public Context
{
219 C_IOEventSafe(Journal
*_journal
, uint64_t _tid
)
220 : journal(_journal
), tid(_tid
) {
223 void finish(int r
) override
{
224 journal
->handle_io_event_safe(r
, tid
);
228 struct C_OpEventSafe
: public Context
{
231 Future op_start_future
;
232 Future op_finish_future
;
235 C_OpEventSafe(Journal
*journal
, uint64_t tid
, const Future
&op_start_future
,
236 const Future
&op_finish_future
, Context
*on_safe
)
237 : journal(journal
), tid(tid
), op_start_future(op_start_future
),
238 op_finish_future(op_finish_future
), on_safe(on_safe
) {
241 void finish(int r
) override
{
242 journal
->handle_op_event_safe(r
, tid
, op_start_future
, op_finish_future
,
247 struct C_ReplayProcessSafe
: public Context
{
249 ReplayEntry replay_entry
;
251 C_ReplayProcessSafe(Journal
*journal
, ReplayEntry
&&replay_entry
) :
252 journal(journal
), replay_entry(std::move(replay_entry
)) {
254 void finish(int r
) override
{
255 journal
->handle_replay_process_safe(replay_entry
, r
);
259 struct ReplayHandler
: public ::journal::ReplayHandler
{
261 ReplayHandler(Journal
*_journal
) : journal(_journal
) {
264 void handle_entries_available() override
{
265 journal
->handle_replay_ready();
267 void handle_complete(int r
) override
{
268 journal
->handle_replay_complete(r
);
272 ContextWQ
*m_work_queue
= nullptr;
273 SafeTimer
*m_timer
= nullptr;
274 ceph::mutex
*m_timer_lock
= nullptr;
276 Journaler
*m_journaler
;
277 mutable ceph::mutex m_lock
= ceph::make_mutex("Journal<I>::m_lock");
279 uint64_t m_max_append_size
= 0;
280 uint64_t m_tag_class
= 0;
281 uint64_t m_tag_tid
= 0;
282 journal::ImageClientMeta m_client_meta
;
283 journal::TagData m_tag_data
;
286 Contexts m_wait_for_state_contexts
;
288 ReplayHandler m_replay_handler
;
289 bool m_close_pending
;
291 ceph::mutex m_event_lock
= ceph::make_mutex("Journal<I>::m_event_lock");
292 uint64_t m_event_tid
;
295 std::atomic
<bool> m_user_flushed
= false;
297 std::atomic
<uint64_t> m_op_tid
= { 0 };
298 TidToFutures m_op_futures
;
300 bool m_processing_entry
= false;
301 bool m_blocking_writes
;
303 journal::Replay
<ImageCtxT
> *m_journal_replay
;
305 AsyncOpTracker m_async_journal_op_tracker
;
// Listener for journal metadata updates. Derives from
// ::journal::JournalMetadataListener and keeps a pointer to a Journal; the
// single instance of this type is the m_metadata_listener member declared
// right after the struct. No destructor is declared, so the stored pointer is
// never deleted by this struct.
307 struct MetadataListener
: public ::journal::JournalMetadataListener
{
// Journal pointer supplied to the constructor.
308 Journal
<ImageCtxT
> *journal
;
// Stores the Journal pointer. The parameter shadows the member, but in the
// mem-initializer `journal(journal)` the outer name denotes the member and
// the argument denotes the parameter, so the member is initialized correctly.
310 MetadataListener(Journal
<ImageCtxT
> *journal
) : journal(journal
) { }
// JournalMetadataListener callback; defined out of line (not in this header).
// NOTE(review): presumably forwards to Journal::handle_metadata_updated() —
// confirm in the implementation file.
312 void handle_update(::journal::JournalMetadata
*) override
;
313 } m_metadata_listener
;
315 typedef std::set
<journal::Listener
*> Listeners
;
316 Listeners m_listeners
;
317 ceph::condition_variable m_listener_cond
;
318 bool m_listener_notify
= false;
320 uint64_t m_refresh_sequence
= 0;
322 bool is_journal_replaying(const ceph::mutex
&) const;
323 bool is_tag_owner(const ceph::mutex
&) const;
325 uint64_t append_io_events(journal::EventType event_type
,
326 const Bufferlists
&bufferlists
,
327 uint64_t offset
, size_t length
, bool flush_entry
,
329 Future
wait_event(ceph::mutex
&lock
, uint64_t tid
, Context
*on_safe
);
331 void create_journaler();
332 void destroy_journaler(int r
);
333 void recreate_journaler(int r
);
335 void complete_event(typename
Events::iterator it
, int r
);
339 void handle_open(int r
);
341 void handle_replay_ready();
342 void handle_replay_complete(int r
);
343 void handle_replay_process_ready(int r
);
344 void handle_replay_process_safe(ReplayEntry replay_entry
, int r
);
346 void handle_start_external_replay(int r
,
347 journal::Replay
<ImageCtxT
> **journal_replay
,
350 void handle_flushing_restart(int r
);
351 void handle_flushing_replay();
353 void handle_recording_stopped(int r
);
355 void handle_journal_destroyed(int r
);
357 void handle_io_event_safe(int r
, uint64_t tid
);
358 void handle_op_event_safe(int r
, uint64_t tid
, const Future
&op_start_future
,
359 const Future
&op_finish_future
, Context
*on_safe
);
361 void stop_recording();
363 void transition_state(State state
, int r
);
365 bool is_steady_state() const;
366 void wait_for_steady_state(Context
*on_state
);
368 int check_resync_requested(bool *do_resync
);
370 void handle_metadata_updated();
371 void handle_refresh_metadata(uint64_t refresh_sequence
, uint64_t tag_tid
,
372 journal::TagData tag_data
, int r
);
376 } // namespace librbd
// Explicit instantiation declaration: suppresses implicit instantiation of the
// (non-inline) members of Journal<ImageCtx> in every translation unit that
// includes this header. A matching explicit instantiation definition is
// expected elsewhere (not visible here).
378 extern template class librbd::Journal
<librbd::ImageCtx
>;
380 #endif // CEPH_LIBRBD_JOURNAL_H