]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPH_LIBRBD_JOURNAL_H | |
5 | #define CEPH_LIBRBD_JOURNAL_H | |
6 | ||
#include "include/int_types.h"
#include "include/Context.h"
#include "include/interval_set.h"
#include "include/rados/librados_fwd.hpp"
#include "common/AsyncOpTracker.h"
#include "common/Cond.h"
#include "common/Timer.h"
#include "common/RefCountedObj.h"
#include "journal/Future.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "librbd/Utils.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"

#include <algorithm>
#include <atomic>
#include <list>
#include <set>
#include <string>
#include <unordered_map>

class ContextWQ;
namespace journal { class Journaler; }

33 | namespace librbd { | |
34 | ||
35 | class ImageCtx; | |
36 | ||
7c673cae FG |
37 | namespace journal { template <typename> class Replay; } |
38 | ||
39 | template <typename ImageCtxT = ImageCtx> | |
9f95a23c | 40 | class Journal : public RefCountedObject { |
7c673cae FG |
41 | public: |
42 | /** | |
43 | * @verbatim | |
44 | * | |
45 | * <start> | |
46 | * | | |
47 | * v | |
48 | * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY | |
49 | * | * . ^ * . * | | |
50 | * | * . | * . * | | |
51 | * | * . | (error) * . . . . . . . * | | |
52 | * | * . | * . * | | |
53 | * | * . | v . * | | |
54 | * | * . | FLUSHING_RESTART . * | | |
55 | * | * . | | . * | | |
56 | * | * . | | . * | | |
57 | * | * . | v . * v | |
58 | * | * . | RESTARTING < * * * * * STOPPING | |
59 | * | * . | | . | | |
60 | * | * . | | . | | |
61 | * | * * * * * * . \-------------/ . | | |
62 | * | * (error) . . | | |
63 | * | * . . . . . . . . . . . . . . . . | | |
64 | * | * . . | | |
65 | * | v v v | | |
66 | * | CLOSED <----- CLOSING <---------------------------------------/ | |
67 | * | | | |
68 | * | v | |
69 | * \---> <finish> | |
70 | * | |
71 | * @endverbatim | |
72 | */ | |
73 | enum State { | |
74 | STATE_UNINITIALIZED, | |
75 | STATE_INITIALIZING, | |
76 | STATE_REPLAYING, | |
77 | STATE_FLUSHING_RESTART, | |
78 | STATE_RESTARTING_REPLAY, | |
79 | STATE_FLUSHING_REPLAY, | |
80 | STATE_READY, | |
81 | STATE_STOPPING, | |
82 | STATE_CLOSING, | |
83 | STATE_CLOSED | |
84 | }; | |
85 | ||
86 | static const std::string IMAGE_CLIENT_ID; | |
87 | static const std::string LOCAL_MIRROR_UUID; | |
88 | static const std::string ORPHAN_MIRROR_UUID; | |
89 | ||
7c673cae FG |
90 | Journal(ImageCtxT &image_ctx); |
91 | ~Journal(); | |
92 | ||
f67539c2 TL |
93 | static void get_work_queue(CephContext *cct, ContextWQ **work_queue); |
94 | ||
7c673cae FG |
95 | static bool is_journal_supported(ImageCtxT &image_ctx); |
96 | static int create(librados::IoCtx &io_ctx, const std::string &image_id, | |
97 | uint8_t order, uint8_t splay_width, | |
98 | const std::string &object_pool); | |
99 | static int remove(librados::IoCtx &io_ctx, const std::string &image_id); | |
100 | static int reset(librados::IoCtx &io_ctx, const std::string &image_id); | |
101 | ||
102 | static void is_tag_owner(ImageCtxT *image_ctx, bool *is_tag_owner, | |
103 | Context *on_finish); | |
104 | static void is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id, | |
f67539c2 | 105 | bool *is_tag_owner, asio::ContextWQ *op_work_queue, |
7c673cae FG |
106 | Context *on_finish); |
107 | static void get_tag_owner(librados::IoCtx& io_ctx, std::string& image_id, | |
108 | std::string *mirror_uuid, | |
f67539c2 | 109 | asio::ContextWQ *op_work_queue, Context *on_finish); |
7c673cae FG |
110 | static int request_resync(ImageCtxT *image_ctx); |
111 | static void promote(ImageCtxT *image_ctx, Context *on_finish); | |
112 | static void demote(ImageCtxT *image_ctx, Context *on_finish); | |
113 | ||
114 | bool is_journal_ready() const; | |
115 | bool is_journal_replaying() const; | |
116 | bool is_journal_appending() const; | |
117 | ||
118 | void wait_for_journal_ready(Context *on_ready); | |
119 | ||
120 | void open(Context *on_finish); | |
121 | void close(Context *on_finish); | |
122 | ||
123 | bool is_tag_owner() const; | |
124 | uint64_t get_tag_tid() const; | |
125 | journal::TagData get_tag_data() const; | |
126 | ||
127 | void allocate_local_tag(Context *on_finish); | |
128 | void allocate_tag(const std::string &mirror_uuid, | |
129 | const journal::TagPredecessor &predecessor, | |
130 | Context *on_finish); | |
131 | ||
132 | void flush_commit_position(Context *on_finish); | |
133 | ||
494da23a TL |
134 | void user_flushed(); |
135 | ||
7c673cae FG |
136 | uint64_t append_write_event(uint64_t offset, size_t length, |
137 | const bufferlist &bl, | |
7c673cae FG |
138 | bool flush_entry); |
139 | uint64_t append_io_event(journal::EventEntry &&event_entry, | |
7c673cae | 140 | uint64_t offset, size_t length, |
b32b8144 | 141 | bool flush_entry, int filter_ret_val); |
7c673cae FG |
142 | void commit_io_event(uint64_t tid, int r); |
143 | void commit_io_event_extent(uint64_t tid, uint64_t offset, uint64_t length, | |
144 | int r); | |
145 | ||
146 | void append_op_event(uint64_t op_tid, journal::EventEntry &&event_entry, | |
147 | Context *on_safe); | |
148 | void commit_op_event(uint64_t tid, int r, Context *on_safe); | |
149 | void replay_op_ready(uint64_t op_tid, Context *on_resume); | |
150 | ||
151 | void flush_event(uint64_t tid, Context *on_safe); | |
152 | void wait_event(uint64_t tid, Context *on_safe); | |
153 | ||
154 | uint64_t allocate_op_tid() { | |
155 | uint64_t op_tid = ++m_op_tid; | |
11fdf7f2 | 156 | ceph_assert(op_tid != 0); |
7c673cae FG |
157 | return op_tid; |
158 | } | |
159 | ||
160 | void start_external_replay(journal::Replay<ImageCtxT> **journal_replay, | |
161 | Context *on_start); | |
162 | void stop_external_replay(); | |
163 | ||
164 | void add_listener(journal::Listener *listener); | |
165 | void remove_listener(journal::Listener *listener); | |
166 | ||
167 | int is_resync_requested(bool *do_resync); | |
168 | ||
169 | inline ContextWQ *get_work_queue() { | |
170 | return m_work_queue; | |
171 | } | |
172 | ||
173 | private: | |
174 | ImageCtxT &m_image_ctx; | |
175 | ||
176 | // mock unit testing support | |
177 | typedef journal::TypeTraits<ImageCtxT> TypeTraits; | |
178 | typedef typename TypeTraits::Journaler Journaler; | |
179 | typedef typename TypeTraits::Future Future; | |
180 | typedef typename TypeTraits::ReplayEntry ReplayEntry; | |
181 | ||
182 | typedef std::list<bufferlist> Bufferlists; | |
183 | typedef std::list<Context *> Contexts; | |
184 | typedef std::list<Future> Futures; | |
185 | typedef interval_set<uint64_t> ExtentInterval; | |
186 | ||
187 | struct Event { | |
188 | Futures futures; | |
7c673cae FG |
189 | Contexts on_safe_contexts; |
190 | ExtentInterval pending_extents; | |
b32b8144 | 191 | int filter_ret_val = 0; |
7c673cae FG |
192 | bool committed_io = false; |
193 | bool safe = false; | |
194 | int ret_val = 0; | |
195 | ||
196 | Event() { | |
197 | } | |
11fdf7f2 TL |
198 | Event(const Futures &_futures, uint64_t offset, size_t length, |
199 | int filter_ret_val) | |
200 | : futures(_futures), filter_ret_val(filter_ret_val) { | |
7c673cae FG |
201 | if (length > 0) { |
202 | pending_extents.insert(offset, length); | |
203 | } | |
204 | } | |
205 | }; | |
206 | ||
207 | typedef std::unordered_map<uint64_t, Event> Events; | |
208 | typedef std::unordered_map<uint64_t, Future> TidToFutures; | |
209 | ||
210 | struct C_IOEventSafe : public Context { | |
211 | Journal *journal; | |
212 | uint64_t tid; | |
213 | ||
214 | C_IOEventSafe(Journal *_journal, uint64_t _tid) | |
215 | : journal(_journal), tid(_tid) { | |
216 | } | |
217 | ||
218 | void finish(int r) override { | |
219 | journal->handle_io_event_safe(r, tid); | |
220 | } | |
221 | }; | |
222 | ||
223 | struct C_OpEventSafe : public Context { | |
224 | Journal *journal; | |
225 | uint64_t tid; | |
226 | Future op_start_future; | |
227 | Future op_finish_future; | |
228 | Context *on_safe; | |
229 | ||
230 | C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future, | |
231 | const Future &op_finish_future, Context *on_safe) | |
232 | : journal(journal), tid(tid), op_start_future(op_start_future), | |
233 | op_finish_future(op_finish_future), on_safe(on_safe) { | |
234 | } | |
235 | ||
236 | void finish(int r) override { | |
237 | journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future, | |
238 | on_safe); | |
239 | } | |
240 | }; | |
241 | ||
242 | struct C_ReplayProcessSafe : public Context { | |
243 | Journal *journal; | |
244 | ReplayEntry replay_entry; | |
245 | ||
246 | C_ReplayProcessSafe(Journal *journal, ReplayEntry &&replay_entry) : | |
247 | journal(journal), replay_entry(std::move(replay_entry)) { | |
248 | } | |
249 | void finish(int r) override { | |
250 | journal->handle_replay_process_safe(replay_entry, r); | |
251 | } | |
252 | }; | |
253 | ||
254 | struct ReplayHandler : public ::journal::ReplayHandler { | |
255 | Journal *journal; | |
256 | ReplayHandler(Journal *_journal) : journal(_journal) { | |
257 | } | |
258 | ||
7c673cae FG |
259 | void handle_entries_available() override { |
260 | journal->handle_replay_ready(); | |
261 | } | |
262 | void handle_complete(int r) override { | |
263 | journal->handle_replay_complete(r); | |
264 | } | |
265 | }; | |
266 | ||
267 | ContextWQ *m_work_queue = nullptr; | |
268 | SafeTimer *m_timer = nullptr; | |
9f95a23c | 269 | ceph::mutex *m_timer_lock = nullptr; |
7c673cae FG |
270 | |
271 | Journaler *m_journaler; | |
9f95a23c | 272 | mutable ceph::mutex m_lock = ceph::make_mutex("Journal<I>::m_lock"); |
7c673cae FG |
273 | State m_state; |
274 | uint64_t m_max_append_size = 0; | |
275 | uint64_t m_tag_class = 0; | |
276 | uint64_t m_tag_tid = 0; | |
277 | journal::ImageClientMeta m_client_meta; | |
278 | journal::TagData m_tag_data; | |
279 | ||
280 | int m_error_result; | |
281 | Contexts m_wait_for_state_contexts; | |
282 | ||
283 | ReplayHandler m_replay_handler; | |
284 | bool m_close_pending; | |
285 | ||
9f95a23c | 286 | ceph::mutex m_event_lock = ceph::make_mutex("Journal<I>::m_event_lock"); |
7c673cae FG |
287 | uint64_t m_event_tid; |
288 | Events m_events; | |
289 | ||
494da23a TL |
290 | std::atomic<bool> m_user_flushed = false; |
291 | ||
7c673cae FG |
292 | std::atomic<uint64_t> m_op_tid = { 0 }; |
293 | TidToFutures m_op_futures; | |
294 | ||
295 | bool m_processing_entry = false; | |
296 | bool m_blocking_writes; | |
297 | ||
298 | journal::Replay<ImageCtxT> *m_journal_replay; | |
299 | ||
9f95a23c | 300 | AsyncOpTracker m_async_journal_op_tracker; |
7c673cae FG |
301 | |
302 | struct MetadataListener : public ::journal::JournalMetadataListener { | |
303 | Journal<ImageCtxT> *journal; | |
304 | ||
305 | MetadataListener(Journal<ImageCtxT> *journal) : journal(journal) { } | |
306 | ||
f67539c2 | 307 | void handle_update(::journal::JournalMetadata *) override; |
7c673cae FG |
308 | } m_metadata_listener; |
309 | ||
310 | typedef std::set<journal::Listener *> Listeners; | |
311 | Listeners m_listeners; | |
9f95a23c | 312 | ceph::condition_variable m_listener_cond; |
7c673cae FG |
313 | bool m_listener_notify = false; |
314 | ||
315 | uint64_t m_refresh_sequence = 0; | |
316 | ||
9f95a23c TL |
317 | bool is_journal_replaying(const ceph::mutex &) const; |
318 | bool is_tag_owner(const ceph::mutex &) const; | |
7c673cae FG |
319 | |
320 | uint64_t append_io_events(journal::EventType event_type, | |
321 | const Bufferlists &bufferlists, | |
b32b8144 FG |
322 | uint64_t offset, size_t length, bool flush_entry, |
323 | int filter_ret_val); | |
9f95a23c | 324 | Future wait_event(ceph::mutex &lock, uint64_t tid, Context *on_safe); |
7c673cae FG |
325 | |
326 | void create_journaler(); | |
327 | void destroy_journaler(int r); | |
328 | void recreate_journaler(int r); | |
329 | ||
330 | void complete_event(typename Events::iterator it, int r); | |
331 | ||
332 | void start_append(); | |
333 | ||
334 | void handle_open(int r); | |
335 | ||
336 | void handle_replay_ready(); | |
337 | void handle_replay_complete(int r); | |
338 | void handle_replay_process_ready(int r); | |
339 | void handle_replay_process_safe(ReplayEntry replay_entry, int r); | |
340 | ||
341 | void handle_start_external_replay(int r, | |
342 | journal::Replay<ImageCtxT> **journal_replay, | |
343 | Context *on_finish); | |
344 | ||
345 | void handle_flushing_restart(int r); | |
346 | void handle_flushing_replay(); | |
347 | ||
348 | void handle_recording_stopped(int r); | |
349 | ||
350 | void handle_journal_destroyed(int r); | |
351 | ||
352 | void handle_io_event_safe(int r, uint64_t tid); | |
353 | void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future, | |
354 | const Future &op_finish_future, Context *on_safe); | |
355 | ||
356 | void stop_recording(); | |
357 | ||
358 | void transition_state(State state, int r); | |
359 | ||
360 | bool is_steady_state() const; | |
361 | void wait_for_steady_state(Context *on_state); | |
362 | ||
363 | int check_resync_requested(bool *do_resync); | |
364 | ||
365 | void handle_metadata_updated(); | |
366 | void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid, | |
367 | journal::TagData tag_data, int r); | |
368 | ||
369 | }; | |
370 | ||
371 | } // namespace librbd | |
372 | ||
373 | extern template class librbd::Journal<librbd::ImageCtx>; | |
374 | ||
375 | #endif // CEPH_LIBRBD_JOURNAL_H |