]> git.proxmox.com Git - ceph.git/blame - ceph/src/journal/JournalMetadata.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / journal / JournalMetadata.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef CEPH_JOURNAL_JOURNAL_METADATA_H
5#define CEPH_JOURNAL_JOURNAL_METADATA_H
6
7#include "include/int_types.h"
8#include "include/Context.h"
9#include "include/rados/librados.hpp"
10#include "common/AsyncOpTracker.h"
11#include "common/Cond.h"
12#include "common/Mutex.h"
13#include "common/RefCountedObj.h"
14#include "common/WorkQueue.h"
15#include "cls/journal/cls_journal_types.h"
16#include "journal/JournalMetadataListener.h"
17#include "journal/Settings.h"
18#include <boost/intrusive_ptr.hpp>
19#include <boost/noncopyable.hpp>
20#include <boost/optional.hpp>
21#include <functional>
22#include <list>
23#include <map>
24#include <string>
11fdf7f2 25#include "include/ceph_assert.h"
7c673cae
FG
26
27class SafeTimer;
28
29namespace journal {
30
31class JournalMetadata;
32typedef boost::intrusive_ptr<JournalMetadata> JournalMetadataPtr;
33
34class JournalMetadata : public RefCountedObject, boost::noncopyable {
35public:
36 typedef std::function<Context*()> CreateContext;
37 typedef cls::journal::ObjectPosition ObjectPosition;
38 typedef cls::journal::ObjectPositions ObjectPositions;
39 typedef cls::journal::ObjectSetPosition ObjectSetPosition;
40 typedef cls::journal::Client Client;
41 typedef cls::journal::Tag Tag;
42
43 typedef std::set<Client> RegisteredClients;
44 typedef std::list<Tag> Tags;
45
46 JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
47 librados::IoCtx &ioctx, const std::string &oid,
48 const std::string &client_id, const Settings &settings);
49 ~JournalMetadata() override;
50
51 void init(Context *on_init);
52 void shut_down(Context *on_finish);
53
54 bool is_initialized() const { return m_initialized; }
55
56 void get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
57 int64_t *pool_id, Context *on_finish);
58
59 void get_mutable_metadata(uint64_t *minimum_set, uint64_t *active_set,
60 RegisteredClients *clients, Context *on_finish);
61
62 void add_listener(JournalMetadataListener *listener);
63 void remove_listener(JournalMetadataListener *listener);
64
65 void register_client(const bufferlist &data, Context *on_finish);
66 void update_client(const bufferlist &data, Context *on_finish);
67 void unregister_client(Context *on_finish);
68 void get_client(const std::string &client_id, cls::journal::Client *client,
69 Context *on_finish);
70
71 void allocate_tag(uint64_t tag_class, const bufferlist &data,
72 Tag *tag, Context *on_finish);
73 void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish);
74 void get_tags(uint64_t start_after_tag_tid,
75 const boost::optional<uint64_t> &tag_class, Tags *tags,
76 Context *on_finish);
77
78 inline const Settings &get_settings() const {
79 return m_settings;
80 }
81 inline const std::string &get_client_id() const {
82 return m_client_id;
83 }
84 inline uint8_t get_order() const {
85 return m_order;
86 }
87 inline uint64_t get_object_size() const {
88 return 1 << m_order;
89 }
90 inline uint8_t get_splay_width() const {
91 return m_splay_width;
92 }
93 inline int64_t get_pool_id() const {
94 return m_pool_id;
95 }
96
97 inline void queue(Context *on_finish, int r) {
98 m_work_queue->queue(on_finish, r);
99 }
100
101 inline ContextWQ *get_work_queue() {
102 return m_work_queue;
103 }
104
105 inline SafeTimer &get_timer() {
106 return *m_timer;
107 }
108 inline Mutex &get_timer_lock() {
109 return *m_timer_lock;
110 }
111
112 void set_minimum_set(uint64_t object_set);
113 inline uint64_t get_minimum_set() const {
114 Mutex::Locker locker(m_lock);
115 return m_minimum_set;
116 }
117
118 int set_active_set(uint64_t object_set);
119 void set_active_set(uint64_t object_set, Context *on_finish);
120 inline uint64_t get_active_set() const {
121 Mutex::Locker locker(m_lock);
122 return m_active_set;
123 }
124
125 void assert_active_tag(uint64_t tag_tid, Context *on_finish);
126
127 void flush_commit_position();
128 void flush_commit_position(Context *on_safe);
129 void get_commit_position(ObjectSetPosition *commit_position) const {
130 Mutex::Locker locker(m_lock);
131 *commit_position = m_client.commit_position;
132 }
133
134 void get_registered_clients(RegisteredClients *registered_clients) {
135 Mutex::Locker locker(m_lock);
136 *registered_clients = m_registered_clients;
137 }
138
139 inline uint64_t allocate_entry_tid(uint64_t tag_tid) {
140 Mutex::Locker locker(m_lock);
141 return m_allocated_entry_tids[tag_tid]++;
142 }
143 void reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid);
144 bool get_last_allocated_entry_tid(uint64_t tag_tid, uint64_t *entry_tid) const;
145
146 uint64_t allocate_commit_tid(uint64_t object_num, uint64_t tag_tid,
147 uint64_t entry_tid);
148 void overflow_commit_tid(uint64_t commit_tid, uint64_t object_num);
149 void get_commit_entry(uint64_t commit_tid, uint64_t *object_num,
150 uint64_t *tag_tid, uint64_t *entry_tid);
151 void committed(uint64_t commit_tid, const CreateContext &create_context);
152
153 void notify_update();
154 void async_notify_update(Context *on_safe);
155
156 void wait_for_ops();
157
158private:
159 typedef std::map<uint64_t, uint64_t> AllocatedEntryTids;
160 typedef std::list<JournalMetadataListener*> Listeners;
94b18763 161 typedef std::list<Context*> Contexts;
7c673cae
FG
162
163 struct CommitEntry {
164 uint64_t object_num;
165 uint64_t tag_tid;
166 uint64_t entry_tid;
167 bool committed;
168
169 CommitEntry() : object_num(0), tag_tid(0), entry_tid(0), committed(false) {
170 }
171 CommitEntry(uint64_t _object_num, uint64_t _tag_tid, uint64_t _entry_tid)
172 : object_num(_object_num), tag_tid(_tag_tid), entry_tid(_entry_tid),
173 committed(false) {
174 }
175 };
176 typedef std::map<uint64_t, CommitEntry> CommitTids;
177
178 struct C_WatchCtx : public librados::WatchCtx2 {
179 JournalMetadata *journal_metadata;
180
181 C_WatchCtx(JournalMetadata *_journal_metadata)
182 : journal_metadata(_journal_metadata) {}
183
184 void handle_notify(uint64_t notify_id, uint64_t cookie,
185 uint64_t notifier_id, bufferlist& bl) override {
186 journal_metadata->handle_watch_notify(notify_id, cookie);
187 }
188 void handle_error(uint64_t cookie, int err) override {
189 journal_metadata->handle_watch_error(err);
190 }
191 };
192
193 struct C_WatchReset : public Context {
194 JournalMetadata *journal_metadata;
195
196 C_WatchReset(JournalMetadata *_journal_metadata)
197 : journal_metadata(_journal_metadata) {
198 journal_metadata->m_async_op_tracker.start_op();
199 }
200 ~C_WatchReset() override {
201 journal_metadata->m_async_op_tracker.finish_op();
202 }
203 void finish(int r) override {
204 journal_metadata->handle_watch_reset();
205 }
206 };
207
208 struct C_CommitPositionTask : public Context {
209 JournalMetadata *journal_metadata;
210
211 C_CommitPositionTask(JournalMetadata *_journal_metadata)
212 : journal_metadata(_journal_metadata) {
213 journal_metadata->m_async_op_tracker.start_op();
214 }
215 ~C_CommitPositionTask() override {
216 journal_metadata->m_async_op_tracker.finish_op();
217 }
218 void finish(int r) override {
219 Mutex::Locker locker(journal_metadata->m_lock);
220 journal_metadata->handle_commit_position_task();
221 };
222 };
223
224 struct C_AioNotify : public Context {
225 JournalMetadata* journal_metadata;
226 Context *on_safe;
227
228 C_AioNotify(JournalMetadata *_journal_metadata, Context *_on_safe)
229 : journal_metadata(_journal_metadata), on_safe(_on_safe) {
230 journal_metadata->m_async_op_tracker.start_op();
231 }
232 ~C_AioNotify() override {
233 journal_metadata->m_async_op_tracker.finish_op();
234 }
235 void finish(int r) override {
236 journal_metadata->handle_notified(r);
237 if (on_safe != nullptr) {
238 on_safe->complete(0);
239 }
240 }
241 };
242
243 struct C_NotifyUpdate : public Context {
244 JournalMetadata* journal_metadata;
245 Context *on_safe;
246
247 C_NotifyUpdate(JournalMetadata *_journal_metadata, Context *_on_safe = NULL)
248 : journal_metadata(_journal_metadata), on_safe(_on_safe) {
249 journal_metadata->m_async_op_tracker.start_op();
250 }
251 ~C_NotifyUpdate() override {
252 journal_metadata->m_async_op_tracker.finish_op();
253 }
254 void finish(int r) override {
255 if (r == 0) {
256 journal_metadata->async_notify_update(on_safe);
257 return;
258 }
259 if (on_safe != NULL) {
260 on_safe->complete(r);
261 }
262 }
263 };
264
265 struct C_ImmutableMetadata : public Context {
266 JournalMetadata* journal_metadata;
267 Context *on_finish;
268
269 C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish)
270 : journal_metadata(_journal_metadata), on_finish(_on_finish) {
271 Mutex::Locker locker(journal_metadata->m_lock);
272 journal_metadata->m_async_op_tracker.start_op();
273 }
274 ~C_ImmutableMetadata() override {
275 journal_metadata->m_async_op_tracker.finish_op();
276 }
277 void finish(int r) override {
278 journal_metadata->handle_immutable_metadata(r, on_finish);
279 }
280 };
281
282 struct C_Refresh : public Context {
283 JournalMetadata* journal_metadata;
284 uint64_t minimum_set;
285 uint64_t active_set;
286 RegisteredClients registered_clients;
7c673cae 287
94b18763
FG
288 C_Refresh(JournalMetadata *_journal_metadata)
289 : journal_metadata(_journal_metadata), minimum_set(0), active_set(0) {
7c673cae
FG
290 Mutex::Locker locker(journal_metadata->m_lock);
291 journal_metadata->m_async_op_tracker.start_op();
292 }
293 ~C_Refresh() override {
294 journal_metadata->m_async_op_tracker.finish_op();
295 }
296 void finish(int r) override {
297 journal_metadata->handle_refresh_complete(this, r);
298 }
299 };
300
301 librados::IoCtx m_ioctx;
302 CephContext *m_cct;
303 std::string m_oid;
304 std::string m_client_id;
305 Settings m_settings;
306
307 uint8_t m_order;
308 uint8_t m_splay_width;
309 int64_t m_pool_id;
310 bool m_initialized;
311
312 ContextWQ *m_work_queue;
313 SafeTimer *m_timer;
314 Mutex *m_timer_lock;
315
316 mutable Mutex m_lock;
317
318 uint64_t m_commit_tid;
319 CommitTids m_pending_commit_tids;
320
321 Listeners m_listeners;
322
323 C_WatchCtx m_watch_ctx;
324 uint64_t m_watch_handle;
325
326 uint64_t m_minimum_set;
327 uint64_t m_active_set;
328 RegisteredClients m_registered_clients;
329 Client m_client;
330
331 AllocatedEntryTids m_allocated_entry_tids;
332
333 size_t m_update_notifications;
334 Cond m_update_cond;
335
94b18763
FG
336 size_t m_ignore_watch_notifies = 0;
337 size_t m_refreshes_in_progress = 0;
338 Contexts m_refresh_ctxs;
339
7c673cae
FG
340 uint64_t m_commit_position_tid = 0;
341 ObjectSetPosition m_commit_position;
342 Context *m_commit_position_ctx;
343 Context *m_commit_position_task_ctx;
344
94b18763
FG
345 size_t m_flush_commits_in_progress = 0;
346 Contexts m_flush_commit_position_ctxs;
347
7c673cae
FG
348 AsyncOpTracker m_async_op_tracker;
349
350 void handle_immutable_metadata(int r, Context *on_init);
351
352 void refresh(Context *on_finish);
353 void handle_refresh_complete(C_Refresh *refresh, int r);
354
355 void cancel_commit_task();
356 void schedule_commit_task();
357 void handle_commit_position_task();
358
359 void schedule_watch_reset();
360 void handle_watch_reset();
361 void handle_watch_notify(uint64_t notify_id, uint64_t cookie);
362 void handle_watch_error(int err);
363 void handle_notified(int r);
364
94b18763 365 void schedule_laggy_clients_disconnect(Context *on_finish);
7c673cae
FG
366
367 friend std::ostream &operator<<(std::ostream &os,
368 const JournalMetadata &journal_metadata);
369};
370
371std::ostream &operator<<(std::ostream &os,
372 const JournalMetadata::RegisteredClients &clients);
373
374std::ostream &operator<<(std::ostream &os,
375 const JournalMetadata &journal_metadata);
376
377} // namespace journal
378
379#endif // CEPH_JOURNAL_JOURNAL_METADATA_H