]>
Commit | Line | Data |
---|---|---|
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPH_LIBRBD_JOURNAL_H | |
5 | #define CEPH_LIBRBD_JOURNAL_H | |
6 | ||
#include "include/int_types.h"
#include "include/Context.h"
#include "include/interval_set.h"
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/WorkQueue.h"
#include "journal/Future.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "librbd/Utils.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"

#include <algorithm>
#include <atomic>
#include <list>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
27 | ||
// Forward declarations: none of these types is needed by value in this
// header, so their full definitions are not pulled in here.
class SafeTimer;

namespace journal { class Journaler; }
namespace librados { class IoCtx; }
35 | ||
36 | namespace librbd { | |
37 | ||
// Forward declarations of the librbd types this header refers to.
class ImageCtx;

namespace io {
struct ObjectRequestHandle;
} // namespace io

namespace journal {
template <typename> class Replay;
} // namespace journal
42 | ||
43 | template <typename ImageCtxT = ImageCtx> | |
44 | class Journal { | |
45 | public: | |
46 | /** | |
47 | * @verbatim | |
48 | * | |
49 | * <start> | |
50 | * | | |
51 | * v | |
52 | * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY | |
53 | * | * . ^ * . * | | |
54 | * | * . | * . * | | |
55 | * | * . | (error) * . . . . . . . * | | |
56 | * | * . | * . * | | |
57 | * | * . | v . * | | |
58 | * | * . | FLUSHING_RESTART . * | | |
59 | * | * . | | . * | | |
60 | * | * . | | . * | | |
61 | * | * . | v . * v | |
62 | * | * . | RESTARTING < * * * * * STOPPING | |
63 | * | * . | | . | | |
64 | * | * . | | . | | |
65 | * | * * * * * * . \-------------/ . | | |
66 | * | * (error) . . | | |
67 | * | * . . . . . . . . . . . . . . . . | | |
68 | * | * . . | | |
69 | * | v v v | | |
70 | * | CLOSED <----- CLOSING <---------------------------------------/ | |
71 | * | | | |
72 | * | v | |
73 | * \---> <finish> | |
74 | * | |
75 | * @endverbatim | |
76 | */ | |
77 | enum State { | |
78 | STATE_UNINITIALIZED, | |
79 | STATE_INITIALIZING, | |
80 | STATE_REPLAYING, | |
81 | STATE_FLUSHING_RESTART, | |
82 | STATE_RESTARTING_REPLAY, | |
83 | STATE_FLUSHING_REPLAY, | |
84 | STATE_READY, | |
85 | STATE_STOPPING, | |
86 | STATE_CLOSING, | |
87 | STATE_CLOSED | |
88 | }; | |
89 | ||
90 | static const std::string IMAGE_CLIENT_ID; | |
91 | static const std::string LOCAL_MIRROR_UUID; | |
92 | static const std::string ORPHAN_MIRROR_UUID; | |
93 | ||
94 | typedef std::list<io::ObjectRequestHandle *> IOObjectRequests; | |
95 | ||
96 | Journal(ImageCtxT &image_ctx); | |
97 | ~Journal(); | |
98 | ||
99 | static bool is_journal_supported(ImageCtxT &image_ctx); | |
100 | static int create(librados::IoCtx &io_ctx, const std::string &image_id, | |
101 | uint8_t order, uint8_t splay_width, | |
102 | const std::string &object_pool); | |
103 | static int remove(librados::IoCtx &io_ctx, const std::string &image_id); | |
104 | static int reset(librados::IoCtx &io_ctx, const std::string &image_id); | |
105 | ||
106 | static void is_tag_owner(ImageCtxT *image_ctx, bool *is_tag_owner, | |
107 | Context *on_finish); | |
108 | static void is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id, | |
109 | bool *is_tag_owner, ContextWQ *op_work_queue, | |
110 | Context *on_finish); | |
111 | static void get_tag_owner(librados::IoCtx& io_ctx, std::string& image_id, | |
112 | std::string *mirror_uuid, | |
113 | ContextWQ *op_work_queue, Context *on_finish); | |
114 | static int request_resync(ImageCtxT *image_ctx); | |
115 | static void promote(ImageCtxT *image_ctx, Context *on_finish); | |
116 | static void demote(ImageCtxT *image_ctx, Context *on_finish); | |
117 | ||
118 | bool is_journal_ready() const; | |
119 | bool is_journal_replaying() const; | |
120 | bool is_journal_appending() const; | |
121 | ||
122 | void wait_for_journal_ready(Context *on_ready); | |
123 | ||
124 | void open(Context *on_finish); | |
125 | void close(Context *on_finish); | |
126 | ||
127 | bool is_tag_owner() const; | |
128 | uint64_t get_tag_tid() const; | |
129 | journal::TagData get_tag_data() const; | |
130 | ||
131 | void allocate_local_tag(Context *on_finish); | |
132 | void allocate_tag(const std::string &mirror_uuid, | |
133 | const journal::TagPredecessor &predecessor, | |
134 | Context *on_finish); | |
135 | ||
136 | void flush_commit_position(Context *on_finish); | |
137 | ||
138 | uint64_t append_write_event(uint64_t offset, size_t length, | |
139 | const bufferlist &bl, | |
140 | const IOObjectRequests &requests, | |
141 | bool flush_entry); | |
142 | uint64_t append_io_event(journal::EventEntry &&event_entry, | |
143 | const IOObjectRequests &requests, | |
144 | uint64_t offset, size_t length, | |
145 | bool flush_entry, int filter_ret_val); | |
146 | void commit_io_event(uint64_t tid, int r); | |
147 | void commit_io_event_extent(uint64_t tid, uint64_t offset, uint64_t length, | |
148 | int r); | |
149 | ||
150 | void append_op_event(uint64_t op_tid, journal::EventEntry &&event_entry, | |
151 | Context *on_safe); | |
152 | void commit_op_event(uint64_t tid, int r, Context *on_safe); | |
153 | void replay_op_ready(uint64_t op_tid, Context *on_resume); | |
154 | ||
155 | void flush_event(uint64_t tid, Context *on_safe); | |
156 | void wait_event(uint64_t tid, Context *on_safe); | |
157 | ||
158 | uint64_t allocate_op_tid() { | |
159 | uint64_t op_tid = ++m_op_tid; | |
160 | assert(op_tid != 0); | |
161 | return op_tid; | |
162 | } | |
163 | ||
164 | void start_external_replay(journal::Replay<ImageCtxT> **journal_replay, | |
165 | Context *on_start); | |
166 | void stop_external_replay(); | |
167 | ||
168 | void add_listener(journal::Listener *listener); | |
169 | void remove_listener(journal::Listener *listener); | |
170 | ||
171 | int is_resync_requested(bool *do_resync); | |
172 | ||
173 | inline ContextWQ *get_work_queue() { | |
174 | return m_work_queue; | |
175 | } | |
176 | ||
177 | private: | |
178 | ImageCtxT &m_image_ctx; | |
179 | ||
180 | // mock unit testing support | |
181 | typedef journal::TypeTraits<ImageCtxT> TypeTraits; | |
182 | typedef typename TypeTraits::Journaler Journaler; | |
183 | typedef typename TypeTraits::Future Future; | |
184 | typedef typename TypeTraits::ReplayEntry ReplayEntry; | |
185 | ||
186 | typedef std::list<bufferlist> Bufferlists; | |
187 | typedef std::list<Context *> Contexts; | |
188 | typedef std::list<Future> Futures; | |
189 | typedef interval_set<uint64_t> ExtentInterval; | |
190 | ||
191 | struct Event { | |
192 | Futures futures; | |
193 | IOObjectRequests aio_object_requests; | |
194 | Contexts on_safe_contexts; | |
195 | ExtentInterval pending_extents; | |
196 | int filter_ret_val = 0; | |
197 | bool committed_io = false; | |
198 | bool safe = false; | |
199 | int ret_val = 0; | |
200 | ||
201 | Event() { | |
202 | } | |
203 | Event(const Futures &_futures, const IOObjectRequests &_requests, | |
204 | uint64_t offset, size_t length, int filter_ret_val) | |
205 | : futures(_futures), aio_object_requests(_requests), | |
206 | filter_ret_val(filter_ret_val) { | |
207 | if (length > 0) { | |
208 | pending_extents.insert(offset, length); | |
209 | } | |
210 | } | |
211 | }; | |
212 | ||
213 | typedef std::unordered_map<uint64_t, Event> Events; | |
214 | typedef std::unordered_map<uint64_t, Future> TidToFutures; | |
215 | ||
216 | struct C_IOEventSafe : public Context { | |
217 | Journal *journal; | |
218 | uint64_t tid; | |
219 | ||
220 | C_IOEventSafe(Journal *_journal, uint64_t _tid) | |
221 | : journal(_journal), tid(_tid) { | |
222 | } | |
223 | ||
224 | void finish(int r) override { | |
225 | journal->handle_io_event_safe(r, tid); | |
226 | } | |
227 | }; | |
228 | ||
229 | struct C_OpEventSafe : public Context { | |
230 | Journal *journal; | |
231 | uint64_t tid; | |
232 | Future op_start_future; | |
233 | Future op_finish_future; | |
234 | Context *on_safe; | |
235 | ||
236 | C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future, | |
237 | const Future &op_finish_future, Context *on_safe) | |
238 | : journal(journal), tid(tid), op_start_future(op_start_future), | |
239 | op_finish_future(op_finish_future), on_safe(on_safe) { | |
240 | } | |
241 | ||
242 | void finish(int r) override { | |
243 | journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future, | |
244 | on_safe); | |
245 | } | |
246 | }; | |
247 | ||
248 | struct C_ReplayProcessSafe : public Context { | |
249 | Journal *journal; | |
250 | ReplayEntry replay_entry; | |
251 | ||
252 | C_ReplayProcessSafe(Journal *journal, ReplayEntry &&replay_entry) : | |
253 | journal(journal), replay_entry(std::move(replay_entry)) { | |
254 | } | |
255 | void finish(int r) override { | |
256 | journal->handle_replay_process_safe(replay_entry, r); | |
257 | } | |
258 | }; | |
259 | ||
260 | struct ReplayHandler : public ::journal::ReplayHandler { | |
261 | Journal *journal; | |
262 | ReplayHandler(Journal *_journal) : journal(_journal) { | |
263 | } | |
264 | ||
265 | void get() override { | |
266 | // TODO | |
267 | } | |
268 | void put() override { | |
269 | // TODO | |
270 | } | |
271 | ||
272 | void handle_entries_available() override { | |
273 | journal->handle_replay_ready(); | |
274 | } | |
275 | void handle_complete(int r) override { | |
276 | journal->handle_replay_complete(r); | |
277 | } | |
278 | }; | |
279 | ||
280 | ContextWQ *m_work_queue = nullptr; | |
281 | SafeTimer *m_timer = nullptr; | |
282 | Mutex *m_timer_lock = nullptr; | |
283 | ||
284 | Journaler *m_journaler; | |
285 | mutable Mutex m_lock; | |
286 | State m_state; | |
287 | uint64_t m_max_append_size = 0; | |
288 | uint64_t m_tag_class = 0; | |
289 | uint64_t m_tag_tid = 0; | |
290 | journal::ImageClientMeta m_client_meta; | |
291 | journal::TagData m_tag_data; | |
292 | ||
293 | int m_error_result; | |
294 | Contexts m_wait_for_state_contexts; | |
295 | ||
296 | ReplayHandler m_replay_handler; | |
297 | bool m_close_pending; | |
298 | ||
299 | Mutex m_event_lock; | |
300 | uint64_t m_event_tid; | |
301 | Events m_events; | |
302 | ||
303 | std::atomic<uint64_t> m_op_tid = { 0 }; | |
304 | TidToFutures m_op_futures; | |
305 | ||
306 | bool m_processing_entry = false; | |
307 | bool m_blocking_writes; | |
308 | ||
309 | journal::Replay<ImageCtxT> *m_journal_replay; | |
310 | ||
311 | util::AsyncOpTracker m_async_journal_op_tracker; | |
312 | ||
313 | struct MetadataListener : public ::journal::JournalMetadataListener { | |
314 | Journal<ImageCtxT> *journal; | |
315 | ||
316 | MetadataListener(Journal<ImageCtxT> *journal) : journal(journal) { } | |
317 | ||
318 | void handle_update(::journal::JournalMetadata *) override { | |
319 | FunctionContext *ctx = new FunctionContext([this](int r) { | |
320 | journal->handle_metadata_updated(); | |
321 | }); | |
322 | journal->m_work_queue->queue(ctx, 0); | |
323 | } | |
324 | } m_metadata_listener; | |
325 | ||
326 | typedef std::set<journal::Listener *> Listeners; | |
327 | Listeners m_listeners; | |
328 | Cond m_listener_cond; | |
329 | bool m_listener_notify = false; | |
330 | ||
331 | uint64_t m_refresh_sequence = 0; | |
332 | ||
333 | bool is_journal_replaying(const Mutex &) const; | |
334 | bool is_tag_owner(const Mutex &) const; | |
335 | ||
336 | uint64_t append_io_events(journal::EventType event_type, | |
337 | const Bufferlists &bufferlists, | |
338 | const IOObjectRequests &requests, | |
339 | uint64_t offset, size_t length, bool flush_entry, | |
340 | int filter_ret_val); | |
341 | Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe); | |
342 | ||
343 | void create_journaler(); | |
344 | void destroy_journaler(int r); | |
345 | void recreate_journaler(int r); | |
346 | ||
347 | void complete_event(typename Events::iterator it, int r); | |
348 | ||
349 | void start_append(); | |
350 | ||
351 | void handle_open(int r); | |
352 | ||
353 | void handle_replay_ready(); | |
354 | void handle_replay_complete(int r); | |
355 | void handle_replay_process_ready(int r); | |
356 | void handle_replay_process_safe(ReplayEntry replay_entry, int r); | |
357 | ||
358 | void handle_start_external_replay(int r, | |
359 | journal::Replay<ImageCtxT> **journal_replay, | |
360 | Context *on_finish); | |
361 | ||
362 | void handle_flushing_restart(int r); | |
363 | void handle_flushing_replay(); | |
364 | ||
365 | void handle_recording_stopped(int r); | |
366 | ||
367 | void handle_journal_destroyed(int r); | |
368 | ||
369 | void handle_io_event_safe(int r, uint64_t tid); | |
370 | void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future, | |
371 | const Future &op_finish_future, Context *on_safe); | |
372 | ||
373 | void stop_recording(); | |
374 | ||
375 | void transition_state(State state, int r); | |
376 | ||
377 | bool is_steady_state() const; | |
378 | void wait_for_steady_state(Context *on_state); | |
379 | ||
380 | int check_resync_requested(bool *do_resync); | |
381 | ||
382 | void handle_metadata_updated(); | |
383 | void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid, | |
384 | journal::TagData tag_data, int r); | |
385 | ||
386 | }; | |
387 | ||
388 | } // namespace librbd | |
389 | ||
390 | extern template class librbd::Journal<librbd::ImageCtx>; | |
391 | ||
392 | #endif // CEPH_LIBRBD_JOURNAL_H |