]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPH_LIBRBD_JOURNAL_H | |
5 | #define CEPH_LIBRBD_JOURNAL_H | |
6 | ||
#include "include/int_types.h"
#include "include/Context.h"
#include "include/interval_set.h"
#include "include/rados/librados_fwd.hpp"
#include "common/AsyncOpTracker.h"
#include "common/Cond.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "journal/Future.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "librbd/Utils.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"

#include <algorithm>
#include <atomic>
#include <list>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
28 | ||
29 | class SafeTimer; | |
30 | namespace journal { | |
31 | class Journaler; | |
32 | } | |
7c673cae FG |
33 | |
34 | namespace librbd { | |
35 | ||
36 | class ImageCtx; | |
37 | ||
7c673cae FG |
38 | namespace journal { template <typename> class Replay; } |
39 | ||
40 | template <typename ImageCtxT = ImageCtx> | |
9f95a23c | 41 | class Journal : public RefCountedObject { |
7c673cae FG |
42 | public: |
43 | /** | |
44 | * @verbatim | |
45 | * | |
46 | * <start> | |
47 | * | | |
48 | * v | |
49 | * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY | |
50 | * | * . ^ * . * | | |
51 | * | * . | * . * | | |
52 | * | * . | (error) * . . . . . . . * | | |
53 | * | * . | * . * | | |
54 | * | * . | v . * | | |
55 | * | * . | FLUSHING_RESTART . * | | |
56 | * | * . | | . * | | |
57 | * | * . | | . * | | |
58 | * | * . | v . * v | |
59 | * | * . | RESTARTING < * * * * * STOPPING | |
60 | * | * . | | . | | |
61 | * | * . | | . | | |
62 | * | * * * * * * . \-------------/ . | | |
63 | * | * (error) . . | | |
64 | * | * . . . . . . . . . . . . . . . . | | |
65 | * | * . . | | |
66 | * | v v v | | |
67 | * | CLOSED <----- CLOSING <---------------------------------------/ | |
68 | * | | | |
69 | * | v | |
70 | * \---> <finish> | |
71 | * | |
72 | * @endverbatim | |
73 | */ | |
74 | enum State { | |
75 | STATE_UNINITIALIZED, | |
76 | STATE_INITIALIZING, | |
77 | STATE_REPLAYING, | |
78 | STATE_FLUSHING_RESTART, | |
79 | STATE_RESTARTING_REPLAY, | |
80 | STATE_FLUSHING_REPLAY, | |
81 | STATE_READY, | |
82 | STATE_STOPPING, | |
83 | STATE_CLOSING, | |
84 | STATE_CLOSED | |
85 | }; | |
86 | ||
87 | static const std::string IMAGE_CLIENT_ID; | |
88 | static const std::string LOCAL_MIRROR_UUID; | |
89 | static const std::string ORPHAN_MIRROR_UUID; | |
90 | ||
7c673cae FG |
91 | Journal(ImageCtxT &image_ctx); |
92 | ~Journal(); | |
93 | ||
94 | static bool is_journal_supported(ImageCtxT &image_ctx); | |
95 | static int create(librados::IoCtx &io_ctx, const std::string &image_id, | |
96 | uint8_t order, uint8_t splay_width, | |
97 | const std::string &object_pool); | |
98 | static int remove(librados::IoCtx &io_ctx, const std::string &image_id); | |
99 | static int reset(librados::IoCtx &io_ctx, const std::string &image_id); | |
100 | ||
101 | static void is_tag_owner(ImageCtxT *image_ctx, bool *is_tag_owner, | |
102 | Context *on_finish); | |
103 | static void is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id, | |
104 | bool *is_tag_owner, ContextWQ *op_work_queue, | |
105 | Context *on_finish); | |
106 | static void get_tag_owner(librados::IoCtx& io_ctx, std::string& image_id, | |
107 | std::string *mirror_uuid, | |
108 | ContextWQ *op_work_queue, Context *on_finish); | |
109 | static int request_resync(ImageCtxT *image_ctx); | |
110 | static void promote(ImageCtxT *image_ctx, Context *on_finish); | |
111 | static void demote(ImageCtxT *image_ctx, Context *on_finish); | |
112 | ||
113 | bool is_journal_ready() const; | |
114 | bool is_journal_replaying() const; | |
115 | bool is_journal_appending() const; | |
116 | ||
117 | void wait_for_journal_ready(Context *on_ready); | |
118 | ||
119 | void open(Context *on_finish); | |
120 | void close(Context *on_finish); | |
121 | ||
122 | bool is_tag_owner() const; | |
123 | uint64_t get_tag_tid() const; | |
124 | journal::TagData get_tag_data() const; | |
125 | ||
126 | void allocate_local_tag(Context *on_finish); | |
127 | void allocate_tag(const std::string &mirror_uuid, | |
128 | const journal::TagPredecessor &predecessor, | |
129 | Context *on_finish); | |
130 | ||
131 | void flush_commit_position(Context *on_finish); | |
132 | ||
494da23a TL |
133 | void user_flushed(); |
134 | ||
7c673cae FG |
135 | uint64_t append_write_event(uint64_t offset, size_t length, |
136 | const bufferlist &bl, | |
7c673cae FG |
137 | bool flush_entry); |
138 | uint64_t append_io_event(journal::EventEntry &&event_entry, | |
7c673cae | 139 | uint64_t offset, size_t length, |
b32b8144 | 140 | bool flush_entry, int filter_ret_val); |
7c673cae FG |
141 | void commit_io_event(uint64_t tid, int r); |
142 | void commit_io_event_extent(uint64_t tid, uint64_t offset, uint64_t length, | |
143 | int r); | |
144 | ||
145 | void append_op_event(uint64_t op_tid, journal::EventEntry &&event_entry, | |
146 | Context *on_safe); | |
147 | void commit_op_event(uint64_t tid, int r, Context *on_safe); | |
148 | void replay_op_ready(uint64_t op_tid, Context *on_resume); | |
149 | ||
150 | void flush_event(uint64_t tid, Context *on_safe); | |
151 | void wait_event(uint64_t tid, Context *on_safe); | |
152 | ||
153 | uint64_t allocate_op_tid() { | |
154 | uint64_t op_tid = ++m_op_tid; | |
11fdf7f2 | 155 | ceph_assert(op_tid != 0); |
7c673cae FG |
156 | return op_tid; |
157 | } | |
158 | ||
159 | void start_external_replay(journal::Replay<ImageCtxT> **journal_replay, | |
160 | Context *on_start); | |
161 | void stop_external_replay(); | |
162 | ||
163 | void add_listener(journal::Listener *listener); | |
164 | void remove_listener(journal::Listener *listener); | |
165 | ||
166 | int is_resync_requested(bool *do_resync); | |
167 | ||
168 | inline ContextWQ *get_work_queue() { | |
169 | return m_work_queue; | |
170 | } | |
171 | ||
172 | private: | |
173 | ImageCtxT &m_image_ctx; | |
174 | ||
175 | // mock unit testing support | |
176 | typedef journal::TypeTraits<ImageCtxT> TypeTraits; | |
177 | typedef typename TypeTraits::Journaler Journaler; | |
178 | typedef typename TypeTraits::Future Future; | |
179 | typedef typename TypeTraits::ReplayEntry ReplayEntry; | |
180 | ||
181 | typedef std::list<bufferlist> Bufferlists; | |
182 | typedef std::list<Context *> Contexts; | |
183 | typedef std::list<Future> Futures; | |
184 | typedef interval_set<uint64_t> ExtentInterval; | |
185 | ||
186 | struct Event { | |
187 | Futures futures; | |
7c673cae FG |
188 | Contexts on_safe_contexts; |
189 | ExtentInterval pending_extents; | |
b32b8144 | 190 | int filter_ret_val = 0; |
7c673cae FG |
191 | bool committed_io = false; |
192 | bool safe = false; | |
193 | int ret_val = 0; | |
194 | ||
195 | Event() { | |
196 | } | |
11fdf7f2 TL |
197 | Event(const Futures &_futures, uint64_t offset, size_t length, |
198 | int filter_ret_val) | |
199 | : futures(_futures), filter_ret_val(filter_ret_val) { | |
7c673cae FG |
200 | if (length > 0) { |
201 | pending_extents.insert(offset, length); | |
202 | } | |
203 | } | |
204 | }; | |
205 | ||
206 | typedef std::unordered_map<uint64_t, Event> Events; | |
207 | typedef std::unordered_map<uint64_t, Future> TidToFutures; | |
208 | ||
209 | struct C_IOEventSafe : public Context { | |
210 | Journal *journal; | |
211 | uint64_t tid; | |
212 | ||
213 | C_IOEventSafe(Journal *_journal, uint64_t _tid) | |
214 | : journal(_journal), tid(_tid) { | |
215 | } | |
216 | ||
217 | void finish(int r) override { | |
218 | journal->handle_io_event_safe(r, tid); | |
219 | } | |
220 | }; | |
221 | ||
222 | struct C_OpEventSafe : public Context { | |
223 | Journal *journal; | |
224 | uint64_t tid; | |
225 | Future op_start_future; | |
226 | Future op_finish_future; | |
227 | Context *on_safe; | |
228 | ||
229 | C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future, | |
230 | const Future &op_finish_future, Context *on_safe) | |
231 | : journal(journal), tid(tid), op_start_future(op_start_future), | |
232 | op_finish_future(op_finish_future), on_safe(on_safe) { | |
233 | } | |
234 | ||
235 | void finish(int r) override { | |
236 | journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future, | |
237 | on_safe); | |
238 | } | |
239 | }; | |
240 | ||
241 | struct C_ReplayProcessSafe : public Context { | |
242 | Journal *journal; | |
243 | ReplayEntry replay_entry; | |
244 | ||
245 | C_ReplayProcessSafe(Journal *journal, ReplayEntry &&replay_entry) : | |
246 | journal(journal), replay_entry(std::move(replay_entry)) { | |
247 | } | |
248 | void finish(int r) override { | |
249 | journal->handle_replay_process_safe(replay_entry, r); | |
250 | } | |
251 | }; | |
252 | ||
253 | struct ReplayHandler : public ::journal::ReplayHandler { | |
254 | Journal *journal; | |
255 | ReplayHandler(Journal *_journal) : journal(_journal) { | |
256 | } | |
257 | ||
7c673cae FG |
258 | void handle_entries_available() override { |
259 | journal->handle_replay_ready(); | |
260 | } | |
261 | void handle_complete(int r) override { | |
262 | journal->handle_replay_complete(r); | |
263 | } | |
264 | }; | |
265 | ||
266 | ContextWQ *m_work_queue = nullptr; | |
267 | SafeTimer *m_timer = nullptr; | |
9f95a23c | 268 | ceph::mutex *m_timer_lock = nullptr; |
7c673cae FG |
269 | |
270 | Journaler *m_journaler; | |
9f95a23c | 271 | mutable ceph::mutex m_lock = ceph::make_mutex("Journal<I>::m_lock"); |
7c673cae FG |
272 | State m_state; |
273 | uint64_t m_max_append_size = 0; | |
274 | uint64_t m_tag_class = 0; | |
275 | uint64_t m_tag_tid = 0; | |
276 | journal::ImageClientMeta m_client_meta; | |
277 | journal::TagData m_tag_data; | |
278 | ||
279 | int m_error_result; | |
280 | Contexts m_wait_for_state_contexts; | |
281 | ||
282 | ReplayHandler m_replay_handler; | |
283 | bool m_close_pending; | |
284 | ||
9f95a23c | 285 | ceph::mutex m_event_lock = ceph::make_mutex("Journal<I>::m_event_lock"); |
7c673cae FG |
286 | uint64_t m_event_tid; |
287 | Events m_events; | |
288 | ||
494da23a TL |
289 | std::atomic<bool> m_user_flushed = false; |
290 | ||
7c673cae FG |
291 | std::atomic<uint64_t> m_op_tid = { 0 }; |
292 | TidToFutures m_op_futures; | |
293 | ||
294 | bool m_processing_entry = false; | |
295 | bool m_blocking_writes; | |
296 | ||
297 | journal::Replay<ImageCtxT> *m_journal_replay; | |
298 | ||
9f95a23c | 299 | AsyncOpTracker m_async_journal_op_tracker; |
7c673cae FG |
300 | |
301 | struct MetadataListener : public ::journal::JournalMetadataListener { | |
302 | Journal<ImageCtxT> *journal; | |
303 | ||
304 | MetadataListener(Journal<ImageCtxT> *journal) : journal(journal) { } | |
305 | ||
306 | void handle_update(::journal::JournalMetadata *) override { | |
9f95a23c | 307 | auto ctx = new LambdaContext([this](int r) { |
7c673cae FG |
308 | journal->handle_metadata_updated(); |
309 | }); | |
310 | journal->m_work_queue->queue(ctx, 0); | |
311 | } | |
312 | } m_metadata_listener; | |
313 | ||
314 | typedef std::set<journal::Listener *> Listeners; | |
315 | Listeners m_listeners; | |
9f95a23c | 316 | ceph::condition_variable m_listener_cond; |
7c673cae FG |
317 | bool m_listener_notify = false; |
318 | ||
319 | uint64_t m_refresh_sequence = 0; | |
320 | ||
9f95a23c TL |
321 | bool is_journal_replaying(const ceph::mutex &) const; |
322 | bool is_tag_owner(const ceph::mutex &) const; | |
7c673cae FG |
323 | |
324 | uint64_t append_io_events(journal::EventType event_type, | |
325 | const Bufferlists &bufferlists, | |
b32b8144 FG |
326 | uint64_t offset, size_t length, bool flush_entry, |
327 | int filter_ret_val); | |
9f95a23c | 328 | Future wait_event(ceph::mutex &lock, uint64_t tid, Context *on_safe); |
7c673cae FG |
329 | |
330 | void create_journaler(); | |
331 | void destroy_journaler(int r); | |
332 | void recreate_journaler(int r); | |
333 | ||
334 | void complete_event(typename Events::iterator it, int r); | |
335 | ||
336 | void start_append(); | |
337 | ||
338 | void handle_open(int r); | |
339 | ||
340 | void handle_replay_ready(); | |
341 | void handle_replay_complete(int r); | |
342 | void handle_replay_process_ready(int r); | |
343 | void handle_replay_process_safe(ReplayEntry replay_entry, int r); | |
344 | ||
345 | void handle_start_external_replay(int r, | |
346 | journal::Replay<ImageCtxT> **journal_replay, | |
347 | Context *on_finish); | |
348 | ||
349 | void handle_flushing_restart(int r); | |
350 | void handle_flushing_replay(); | |
351 | ||
352 | void handle_recording_stopped(int r); | |
353 | ||
354 | void handle_journal_destroyed(int r); | |
355 | ||
356 | void handle_io_event_safe(int r, uint64_t tid); | |
357 | void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future, | |
358 | const Future &op_finish_future, Context *on_safe); | |
359 | ||
360 | void stop_recording(); | |
361 | ||
362 | void transition_state(State state, int r); | |
363 | ||
364 | bool is_steady_state() const; | |
365 | void wait_for_steady_state(Context *on_state); | |
366 | ||
367 | int check_resync_requested(bool *do_resync); | |
368 | ||
369 | void handle_metadata_updated(); | |
370 | void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid, | |
371 | journal::TagData tag_data, int r); | |
372 | ||
373 | }; | |
374 | ||
375 | } // namespace librbd | |
376 | ||
377 | extern template class librbd::Journal<librbd::ImageCtx>; | |
378 | ||
379 | #endif // CEPH_LIBRBD_JOURNAL_H |