]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/Journal.h
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / librbd / Journal.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_LIBRBD_JOURNAL_H
5 #define CEPH_LIBRBD_JOURNAL_H
6
#include "include/int_types.h"
#include "include/Context.h"
#include "include/interval_set.h"
#include "include/rados/librados_fwd.hpp"
#include "common/AsyncOpTracker.h"
#include "common/Cond.h"
#include "common/Timer.h"
#include "common/RefCountedObj.h"
#include "journal/Future.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "librbd/Utils.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"

#include <algorithm>
#include <atomic>
#include <list>
#include <set>
#include <string>
#include <unordered_map>
29
// Forward declarations of external types. ::ContextWQ is only used through
// pointers in this header, so its full definition is not required here.
class ContextWQ;

namespace journal {
class Journaler;
} // namespace journal
32
33 namespace librbd {
34
35 class ImageCtx;
36
37 namespace journal { template <typename> class Replay; }
38
39 template <typename ImageCtxT = ImageCtx>
40 class Journal : public RefCountedObject {
41 public:
42 /**
43 * @verbatim
44 *
45 * <start>
46 * |
47 * v
48 * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY
49 * | * . ^ * . * |
50 * | * . | * . * |
51 * | * . | (error) * . . . . . . . * |
52 * | * . | * . * |
53 * | * . | v . * |
54 * | * . | FLUSHING_RESTART . * |
55 * | * . | | . * |
56 * | * . | | . * |
57 * | * . | v . * v
58 * | * . | RESTARTING < * * * * * STOPPING
59 * | * . | | . |
60 * | * . | | . |
61 * | * * * * * * . \-------------/ . |
62 * | * (error) . . |
63 * | * . . . . . . . . . . . . . . . . |
64 * | * . . |
65 * | v v v |
66 * | CLOSED <----- CLOSING <---------------------------------------/
67 * | |
68 * | v
69 * \---> <finish>
70 *
71 * @endverbatim
72 */
73 enum State {
74 STATE_UNINITIALIZED,
75 STATE_INITIALIZING,
76 STATE_REPLAYING,
77 STATE_FLUSHING_RESTART,
78 STATE_RESTARTING_REPLAY,
79 STATE_FLUSHING_REPLAY,
80 STATE_READY,
81 STATE_STOPPING,
82 STATE_CLOSING,
83 STATE_CLOSED
84 };
85
86 static const std::string IMAGE_CLIENT_ID;
87 static const std::string LOCAL_MIRROR_UUID;
88 static const std::string ORPHAN_MIRROR_UUID;
89
90 Journal(ImageCtxT &image_ctx);
91 ~Journal();
92
93 static void get_work_queue(CephContext *cct, ContextWQ **work_queue);
94
95 static bool is_journal_supported(ImageCtxT &image_ctx);
96 static int create(librados::IoCtx &io_ctx, const std::string &image_id,
97 uint8_t order, uint8_t splay_width,
98 const std::string &object_pool);
99 static int remove(librados::IoCtx &io_ctx, const std::string &image_id);
100 static int reset(librados::IoCtx &io_ctx, const std::string &image_id);
101
102 static void is_tag_owner(ImageCtxT *image_ctx, bool *is_tag_owner,
103 Context *on_finish);
104 static void is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
105 bool *is_tag_owner, asio::ContextWQ *op_work_queue,
106 Context *on_finish);
107 static void get_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
108 std::string *mirror_uuid,
109 asio::ContextWQ *op_work_queue, Context *on_finish);
110 static int request_resync(ImageCtxT *image_ctx);
111 static void promote(ImageCtxT *image_ctx, Context *on_finish);
112 static void demote(ImageCtxT *image_ctx, Context *on_finish);
113
114 bool is_journal_ready() const;
115 bool is_journal_replaying() const;
116 bool is_journal_appending() const;
117
118 void wait_for_journal_ready(Context *on_ready);
119
120 void open(Context *on_finish);
121 void close(Context *on_finish);
122
123 bool is_tag_owner() const;
124 uint64_t get_tag_tid() const;
125 journal::TagData get_tag_data() const;
126
127 void allocate_local_tag(Context *on_finish);
128 void allocate_tag(const std::string &mirror_uuid,
129 const journal::TagPredecessor &predecessor,
130 Context *on_finish);
131
132 void flush_commit_position(Context *on_finish);
133
134 void user_flushed();
135
136 uint64_t append_write_event(uint64_t offset, size_t length,
137 const bufferlist &bl,
138 bool flush_entry);
139 uint64_t append_compare_and_write_event(uint64_t offset,
140 size_t length,
141 const bufferlist &cmp_bl,
142 const bufferlist &write_bl,
143 bool flush_entry);
144 uint64_t append_io_event(journal::EventEntry &&event_entry,
145 uint64_t offset, size_t length,
146 bool flush_entry, int filter_ret_val);
147 void commit_io_event(uint64_t tid, int r);
148 void commit_io_event_extent(uint64_t tid, uint64_t offset, uint64_t length,
149 int r);
150
151 void append_op_event(uint64_t op_tid, journal::EventEntry &&event_entry,
152 Context *on_safe);
153 void commit_op_event(uint64_t tid, int r, Context *on_safe);
154 void replay_op_ready(uint64_t op_tid, Context *on_resume);
155
156 void flush_event(uint64_t tid, Context *on_safe);
157 void wait_event(uint64_t tid, Context *on_safe);
158
159 uint64_t allocate_op_tid() {
160 uint64_t op_tid = ++m_op_tid;
161 ceph_assert(op_tid != 0);
162 return op_tid;
163 }
164
165 void start_external_replay(journal::Replay<ImageCtxT> **journal_replay,
166 Context *on_start);
167 void stop_external_replay();
168
169 void add_listener(journal::Listener *listener);
170 void remove_listener(journal::Listener *listener);
171
172 int is_resync_requested(bool *do_resync);
173
174 inline ContextWQ *get_work_queue() {
175 return m_work_queue;
176 }
177
178 private:
179 ImageCtxT &m_image_ctx;
180
181 // mock unit testing support
182 typedef journal::TypeTraits<ImageCtxT> TypeTraits;
183 typedef typename TypeTraits::Journaler Journaler;
184 typedef typename TypeTraits::Future Future;
185 typedef typename TypeTraits::ReplayEntry ReplayEntry;
186
187 typedef std::list<bufferlist> Bufferlists;
188 typedef std::list<Context *> Contexts;
189 typedef std::list<Future> Futures;
190 typedef interval_set<uint64_t> ExtentInterval;
191
192 struct Event {
193 Futures futures;
194 Contexts on_safe_contexts;
195 ExtentInterval pending_extents;
196 int filter_ret_val = 0;
197 bool committed_io = false;
198 bool safe = false;
199 int ret_val = 0;
200
201 Event() {
202 }
203 Event(const Futures &_futures, uint64_t offset, size_t length,
204 int filter_ret_val)
205 : futures(_futures), filter_ret_val(filter_ret_val) {
206 if (length > 0) {
207 pending_extents.insert(offset, length);
208 }
209 }
210 };
211
212 typedef std::unordered_map<uint64_t, Event> Events;
213 typedef std::unordered_map<uint64_t, Future> TidToFutures;
214
215 struct C_IOEventSafe : public Context {
216 Journal *journal;
217 uint64_t tid;
218
219 C_IOEventSafe(Journal *_journal, uint64_t _tid)
220 : journal(_journal), tid(_tid) {
221 }
222
223 void finish(int r) override {
224 journal->handle_io_event_safe(r, tid);
225 }
226 };
227
228 struct C_OpEventSafe : public Context {
229 Journal *journal;
230 uint64_t tid;
231 Future op_start_future;
232 Future op_finish_future;
233 Context *on_safe;
234
235 C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future,
236 const Future &op_finish_future, Context *on_safe)
237 : journal(journal), tid(tid), op_start_future(op_start_future),
238 op_finish_future(op_finish_future), on_safe(on_safe) {
239 }
240
241 void finish(int r) override {
242 journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future,
243 on_safe);
244 }
245 };
246
247 struct C_ReplayProcessSafe : public Context {
248 Journal *journal;
249 ReplayEntry replay_entry;
250
251 C_ReplayProcessSafe(Journal *journal, ReplayEntry &&replay_entry) :
252 journal(journal), replay_entry(std::move(replay_entry)) {
253 }
254 void finish(int r) override {
255 journal->handle_replay_process_safe(replay_entry, r);
256 }
257 };
258
259 struct ReplayHandler : public ::journal::ReplayHandler {
260 Journal *journal;
261 ReplayHandler(Journal *_journal) : journal(_journal) {
262 }
263
264 void handle_entries_available() override {
265 journal->handle_replay_ready();
266 }
267 void handle_complete(int r) override {
268 journal->handle_replay_complete(r);
269 }
270 };
271
272 ContextWQ *m_work_queue = nullptr;
273 SafeTimer *m_timer = nullptr;
274 ceph::mutex *m_timer_lock = nullptr;
275
276 Journaler *m_journaler;
277 mutable ceph::mutex m_lock = ceph::make_mutex("Journal<I>::m_lock");
278 State m_state;
279 uint64_t m_max_append_size = 0;
280 uint64_t m_tag_class = 0;
281 uint64_t m_tag_tid = 0;
282 journal::ImageClientMeta m_client_meta;
283 journal::TagData m_tag_data;
284
285 int m_error_result;
286 Contexts m_wait_for_state_contexts;
287
288 ReplayHandler m_replay_handler;
289 bool m_close_pending;
290
291 ceph::mutex m_event_lock = ceph::make_mutex("Journal<I>::m_event_lock");
292 uint64_t m_event_tid;
293 Events m_events;
294
295 std::atomic<bool> m_user_flushed = false;
296
297 std::atomic<uint64_t> m_op_tid = { 0 };
298 TidToFutures m_op_futures;
299
300 bool m_processing_entry = false;
301 bool m_blocking_writes;
302
303 journal::Replay<ImageCtxT> *m_journal_replay;
304
305 AsyncOpTracker m_async_journal_op_tracker;
306
307 struct MetadataListener : public ::journal::JournalMetadataListener {
308 Journal<ImageCtxT> *journal;
309
310 MetadataListener(Journal<ImageCtxT> *journal) : journal(journal) { }
311
312 void handle_update(::journal::JournalMetadata *) override;
313 } m_metadata_listener;
314
315 typedef std::set<journal::Listener *> Listeners;
316 Listeners m_listeners;
317 ceph::condition_variable m_listener_cond;
318 bool m_listener_notify = false;
319
320 uint64_t m_refresh_sequence = 0;
321
322 bool is_journal_replaying(const ceph::mutex &) const;
323 bool is_tag_owner(const ceph::mutex &) const;
324
325 uint64_t append_io_events(journal::EventType event_type,
326 const Bufferlists &bufferlists,
327 uint64_t offset, size_t length, bool flush_entry,
328 int filter_ret_val);
329 Future wait_event(ceph::mutex &lock, uint64_t tid, Context *on_safe);
330
331 void create_journaler();
332 void destroy_journaler(int r);
333 void recreate_journaler(int r);
334
335 void complete_event(typename Events::iterator it, int r);
336
337 void start_append();
338
339 void handle_open(int r);
340
341 void handle_replay_ready();
342 void handle_replay_complete(int r);
343 void handle_replay_process_ready(int r);
344 void handle_replay_process_safe(ReplayEntry replay_entry, int r);
345
346 void handle_start_external_replay(int r,
347 journal::Replay<ImageCtxT> **journal_replay,
348 Context *on_finish);
349
350 void handle_flushing_restart(int r);
351 void handle_flushing_replay();
352
353 void handle_recording_stopped(int r);
354
355 void handle_journal_destroyed(int r);
356
357 void handle_io_event_safe(int r, uint64_t tid);
358 void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future,
359 const Future &op_finish_future, Context *on_safe);
360
361 void stop_recording();
362
363 void transition_state(State state, int r);
364
365 bool is_steady_state() const;
366 void wait_for_steady_state(Context *on_state);
367
368 int check_resync_requested(bool *do_resync);
369
370 void handle_metadata_updated();
371 void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid,
372 journal::TagData tag_data, int r);
373
374 };
375
376 } // namespace librbd
377
378 extern template class librbd::Journal<librbd::ImageCtx>;
379
380 #endif // CEPH_LIBRBD_JOURNAL_H