]> git.proxmox.com Git - ceph.git/blame - ceph/src/librbd/Journal.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / librbd / Journal.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef CEPH_LIBRBD_JOURNAL_H
5#define CEPH_LIBRBD_JOURNAL_H
6
#include "include/int_types.h"
#include "include/Context.h"
#include "include/interval_set.h"
#include "include/rados/librados_fwd.hpp"
#include "common/AsyncOpTracker.h"
#include "common/Cond.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "journal/Future.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "librbd/Utils.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"

#include <algorithm>
#include <atomic>
#include <list>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
28
// Forward declarations so this header does not need the full definitions
// of these types.
namespace journal {
class Journaler;
} // namespace journal
class SafeTimer;
7c673cae
FG
33
34namespace librbd {
35
36class ImageCtx;
37
7c673cae
FG
38namespace journal { template <typename> class Replay; }
39
40template <typename ImageCtxT = ImageCtx>
9f95a23c 41class Journal : public RefCountedObject {
7c673cae
FG
42public:
43 /**
44 * @verbatim
45 *
46 * <start>
47 * |
48 * v
49 * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY
50 * | * . ^ * . * |
51 * | * . | * . * |
52 * | * . | (error) * . . . . . . . * |
53 * | * . | * . * |
54 * | * . | v . * |
55 * | * . | FLUSHING_RESTART . * |
56 * | * . | | . * |
57 * | * . | | . * |
58 * | * . | v . * v
59 * | * . | RESTARTING < * * * * * STOPPING
60 * | * . | | . |
61 * | * . | | . |
62 * | * * * * * * . \-------------/ . |
63 * | * (error) . . |
64 * | * . . . . . . . . . . . . . . . . |
65 * | * . . |
66 * | v v v |
67 * | CLOSED <----- CLOSING <---------------------------------------/
68 * | |
69 * | v
70 * \---> <finish>
71 *
72 * @endverbatim
73 */
74 enum State {
75 STATE_UNINITIALIZED,
76 STATE_INITIALIZING,
77 STATE_REPLAYING,
78 STATE_FLUSHING_RESTART,
79 STATE_RESTARTING_REPLAY,
80 STATE_FLUSHING_REPLAY,
81 STATE_READY,
82 STATE_STOPPING,
83 STATE_CLOSING,
84 STATE_CLOSED
85 };
86
87 static const std::string IMAGE_CLIENT_ID;
88 static const std::string LOCAL_MIRROR_UUID;
89 static const std::string ORPHAN_MIRROR_UUID;
90
7c673cae
FG
91 Journal(ImageCtxT &image_ctx);
92 ~Journal();
93
94 static bool is_journal_supported(ImageCtxT &image_ctx);
95 static int create(librados::IoCtx &io_ctx, const std::string &image_id,
96 uint8_t order, uint8_t splay_width,
97 const std::string &object_pool);
98 static int remove(librados::IoCtx &io_ctx, const std::string &image_id);
99 static int reset(librados::IoCtx &io_ctx, const std::string &image_id);
100
101 static void is_tag_owner(ImageCtxT *image_ctx, bool *is_tag_owner,
102 Context *on_finish);
103 static void is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
104 bool *is_tag_owner, ContextWQ *op_work_queue,
105 Context *on_finish);
106 static void get_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
107 std::string *mirror_uuid,
108 ContextWQ *op_work_queue, Context *on_finish);
109 static int request_resync(ImageCtxT *image_ctx);
110 static void promote(ImageCtxT *image_ctx, Context *on_finish);
111 static void demote(ImageCtxT *image_ctx, Context *on_finish);
112
113 bool is_journal_ready() const;
114 bool is_journal_replaying() const;
115 bool is_journal_appending() const;
116
117 void wait_for_journal_ready(Context *on_ready);
118
119 void open(Context *on_finish);
120 void close(Context *on_finish);
121
122 bool is_tag_owner() const;
123 uint64_t get_tag_tid() const;
124 journal::TagData get_tag_data() const;
125
126 void allocate_local_tag(Context *on_finish);
127 void allocate_tag(const std::string &mirror_uuid,
128 const journal::TagPredecessor &predecessor,
129 Context *on_finish);
130
131 void flush_commit_position(Context *on_finish);
132
494da23a
TL
133 void user_flushed();
134
7c673cae
FG
135 uint64_t append_write_event(uint64_t offset, size_t length,
136 const bufferlist &bl,
7c673cae
FG
137 bool flush_entry);
138 uint64_t append_io_event(journal::EventEntry &&event_entry,
7c673cae 139 uint64_t offset, size_t length,
b32b8144 140 bool flush_entry, int filter_ret_val);
7c673cae
FG
141 void commit_io_event(uint64_t tid, int r);
142 void commit_io_event_extent(uint64_t tid, uint64_t offset, uint64_t length,
143 int r);
144
145 void append_op_event(uint64_t op_tid, journal::EventEntry &&event_entry,
146 Context *on_safe);
147 void commit_op_event(uint64_t tid, int r, Context *on_safe);
148 void replay_op_ready(uint64_t op_tid, Context *on_resume);
149
150 void flush_event(uint64_t tid, Context *on_safe);
151 void wait_event(uint64_t tid, Context *on_safe);
152
153 uint64_t allocate_op_tid() {
154 uint64_t op_tid = ++m_op_tid;
11fdf7f2 155 ceph_assert(op_tid != 0);
7c673cae
FG
156 return op_tid;
157 }
158
159 void start_external_replay(journal::Replay<ImageCtxT> **journal_replay,
160 Context *on_start);
161 void stop_external_replay();
162
163 void add_listener(journal::Listener *listener);
164 void remove_listener(journal::Listener *listener);
165
166 int is_resync_requested(bool *do_resync);
167
168 inline ContextWQ *get_work_queue() {
169 return m_work_queue;
170 }
171
172private:
173 ImageCtxT &m_image_ctx;
174
175 // mock unit testing support
176 typedef journal::TypeTraits<ImageCtxT> TypeTraits;
177 typedef typename TypeTraits::Journaler Journaler;
178 typedef typename TypeTraits::Future Future;
179 typedef typename TypeTraits::ReplayEntry ReplayEntry;
180
181 typedef std::list<bufferlist> Bufferlists;
182 typedef std::list<Context *> Contexts;
183 typedef std::list<Future> Futures;
184 typedef interval_set<uint64_t> ExtentInterval;
185
186 struct Event {
187 Futures futures;
7c673cae
FG
188 Contexts on_safe_contexts;
189 ExtentInterval pending_extents;
b32b8144 190 int filter_ret_val = 0;
7c673cae
FG
191 bool committed_io = false;
192 bool safe = false;
193 int ret_val = 0;
194
195 Event() {
196 }
11fdf7f2
TL
197 Event(const Futures &_futures, uint64_t offset, size_t length,
198 int filter_ret_val)
199 : futures(_futures), filter_ret_val(filter_ret_val) {
7c673cae
FG
200 if (length > 0) {
201 pending_extents.insert(offset, length);
202 }
203 }
204 };
205
206 typedef std::unordered_map<uint64_t, Event> Events;
207 typedef std::unordered_map<uint64_t, Future> TidToFutures;
208
209 struct C_IOEventSafe : public Context {
210 Journal *journal;
211 uint64_t tid;
212
213 C_IOEventSafe(Journal *_journal, uint64_t _tid)
214 : journal(_journal), tid(_tid) {
215 }
216
217 void finish(int r) override {
218 journal->handle_io_event_safe(r, tid);
219 }
220 };
221
222 struct C_OpEventSafe : public Context {
223 Journal *journal;
224 uint64_t tid;
225 Future op_start_future;
226 Future op_finish_future;
227 Context *on_safe;
228
229 C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future,
230 const Future &op_finish_future, Context *on_safe)
231 : journal(journal), tid(tid), op_start_future(op_start_future),
232 op_finish_future(op_finish_future), on_safe(on_safe) {
233 }
234
235 void finish(int r) override {
236 journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future,
237 on_safe);
238 }
239 };
240
241 struct C_ReplayProcessSafe : public Context {
242 Journal *journal;
243 ReplayEntry replay_entry;
244
245 C_ReplayProcessSafe(Journal *journal, ReplayEntry &&replay_entry) :
246 journal(journal), replay_entry(std::move(replay_entry)) {
247 }
248 void finish(int r) override {
249 journal->handle_replay_process_safe(replay_entry, r);
250 }
251 };
252
253 struct ReplayHandler : public ::journal::ReplayHandler {
254 Journal *journal;
255 ReplayHandler(Journal *_journal) : journal(_journal) {
256 }
257
7c673cae
FG
258 void handle_entries_available() override {
259 journal->handle_replay_ready();
260 }
261 void handle_complete(int r) override {
262 journal->handle_replay_complete(r);
263 }
264 };
265
266 ContextWQ *m_work_queue = nullptr;
267 SafeTimer *m_timer = nullptr;
9f95a23c 268 ceph::mutex *m_timer_lock = nullptr;
7c673cae
FG
269
270 Journaler *m_journaler;
9f95a23c 271 mutable ceph::mutex m_lock = ceph::make_mutex("Journal<I>::m_lock");
7c673cae
FG
272 State m_state;
273 uint64_t m_max_append_size = 0;
274 uint64_t m_tag_class = 0;
275 uint64_t m_tag_tid = 0;
276 journal::ImageClientMeta m_client_meta;
277 journal::TagData m_tag_data;
278
279 int m_error_result;
280 Contexts m_wait_for_state_contexts;
281
282 ReplayHandler m_replay_handler;
283 bool m_close_pending;
284
9f95a23c 285 ceph::mutex m_event_lock = ceph::make_mutex("Journal<I>::m_event_lock");
7c673cae
FG
286 uint64_t m_event_tid;
287 Events m_events;
288
494da23a
TL
289 std::atomic<bool> m_user_flushed = false;
290
7c673cae
FG
291 std::atomic<uint64_t> m_op_tid = { 0 };
292 TidToFutures m_op_futures;
293
294 bool m_processing_entry = false;
295 bool m_blocking_writes;
296
297 journal::Replay<ImageCtxT> *m_journal_replay;
298
9f95a23c 299 AsyncOpTracker m_async_journal_op_tracker;
7c673cae
FG
300
301 struct MetadataListener : public ::journal::JournalMetadataListener {
302 Journal<ImageCtxT> *journal;
303
304 MetadataListener(Journal<ImageCtxT> *journal) : journal(journal) { }
305
306 void handle_update(::journal::JournalMetadata *) override {
9f95a23c 307 auto ctx = new LambdaContext([this](int r) {
7c673cae
FG
308 journal->handle_metadata_updated();
309 });
310 journal->m_work_queue->queue(ctx, 0);
311 }
312 } m_metadata_listener;
313
314 typedef std::set<journal::Listener *> Listeners;
315 Listeners m_listeners;
9f95a23c 316 ceph::condition_variable m_listener_cond;
7c673cae
FG
317 bool m_listener_notify = false;
318
319 uint64_t m_refresh_sequence = 0;
320
9f95a23c
TL
321 bool is_journal_replaying(const ceph::mutex &) const;
322 bool is_tag_owner(const ceph::mutex &) const;
7c673cae
FG
323
324 uint64_t append_io_events(journal::EventType event_type,
325 const Bufferlists &bufferlists,
b32b8144
FG
326 uint64_t offset, size_t length, bool flush_entry,
327 int filter_ret_val);
9f95a23c 328 Future wait_event(ceph::mutex &lock, uint64_t tid, Context *on_safe);
7c673cae
FG
329
330 void create_journaler();
331 void destroy_journaler(int r);
332 void recreate_journaler(int r);
333
334 void complete_event(typename Events::iterator it, int r);
335
336 void start_append();
337
338 void handle_open(int r);
339
340 void handle_replay_ready();
341 void handle_replay_complete(int r);
342 void handle_replay_process_ready(int r);
343 void handle_replay_process_safe(ReplayEntry replay_entry, int r);
344
345 void handle_start_external_replay(int r,
346 journal::Replay<ImageCtxT> **journal_replay,
347 Context *on_finish);
348
349 void handle_flushing_restart(int r);
350 void handle_flushing_replay();
351
352 void handle_recording_stopped(int r);
353
354 void handle_journal_destroyed(int r);
355
356 void handle_io_event_safe(int r, uint64_t tid);
357 void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future,
358 const Future &op_finish_future, Context *on_safe);
359
360 void stop_recording();
361
362 void transition_state(State state, int r);
363
364 bool is_steady_state() const;
365 void wait_for_steady_state(Context *on_state);
366
367 int check_resync_requested(bool *do_resync);
368
369 void handle_metadata_updated();
370 void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid,
371 journal::TagData tag_data, int r);
372
373};
374
375} // namespace librbd
376
377extern template class librbd::Journal<librbd::ImageCtx>;
378
379#endif // CEPH_LIBRBD_JOURNAL_H