]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPH_JOURNAL_JOURNAL_METADATA_H | |
5 | #define CEPH_JOURNAL_JOURNAL_METADATA_H | |
6 | ||
7 | #include "include/int_types.h" | |
8 | #include "include/Context.h" | |
9 | #include "include/rados/librados.hpp" | |
10 | #include "common/AsyncOpTracker.h" | |
11 | #include "common/Cond.h" | |
12 | #include "common/Mutex.h" | |
13 | #include "common/RefCountedObj.h" | |
14 | #include "common/WorkQueue.h" | |
15 | #include "cls/journal/cls_journal_types.h" | |
16 | #include "journal/JournalMetadataListener.h" | |
17 | #include "journal/Settings.h" | |
18 | #include <boost/intrusive_ptr.hpp> | |
19 | #include <boost/noncopyable.hpp> | |
20 | #include <boost/optional.hpp> | |
21 | #include <functional> | |
22 | #include <list> | |
23 | #include <map> | |
24 | #include <string> | |
11fdf7f2 | 25 | #include "include/ceph_assert.h" |
7c673cae FG |
26 | |
27 | class SafeTimer; | |
28 | ||
29 | namespace journal { | |
30 | ||
31 | class JournalMetadata; | |
32 | typedef boost::intrusive_ptr<JournalMetadata> JournalMetadataPtr; | |
33 | ||
34 | class JournalMetadata : public RefCountedObject, boost::noncopyable { | |
35 | public: | |
36 | typedef std::function<Context*()> CreateContext; | |
37 | typedef cls::journal::ObjectPosition ObjectPosition; | |
38 | typedef cls::journal::ObjectPositions ObjectPositions; | |
39 | typedef cls::journal::ObjectSetPosition ObjectSetPosition; | |
40 | typedef cls::journal::Client Client; | |
41 | typedef cls::journal::Tag Tag; | |
42 | ||
43 | typedef std::set<Client> RegisteredClients; | |
44 | typedef std::list<Tag> Tags; | |
45 | ||
46 | JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock, | |
47 | librados::IoCtx &ioctx, const std::string &oid, | |
48 | const std::string &client_id, const Settings &settings); | |
49 | ~JournalMetadata() override; | |
50 | ||
51 | void init(Context *on_init); | |
52 | void shut_down(Context *on_finish); | |
53 | ||
54 | bool is_initialized() const { return m_initialized; } | |
55 | ||
56 | void get_immutable_metadata(uint8_t *order, uint8_t *splay_width, | |
57 | int64_t *pool_id, Context *on_finish); | |
58 | ||
59 | void get_mutable_metadata(uint64_t *minimum_set, uint64_t *active_set, | |
60 | RegisteredClients *clients, Context *on_finish); | |
61 | ||
62 | void add_listener(JournalMetadataListener *listener); | |
63 | void remove_listener(JournalMetadataListener *listener); | |
64 | ||
65 | void register_client(const bufferlist &data, Context *on_finish); | |
66 | void update_client(const bufferlist &data, Context *on_finish); | |
67 | void unregister_client(Context *on_finish); | |
68 | void get_client(const std::string &client_id, cls::journal::Client *client, | |
69 | Context *on_finish); | |
70 | ||
71 | void allocate_tag(uint64_t tag_class, const bufferlist &data, | |
72 | Tag *tag, Context *on_finish); | |
73 | void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish); | |
74 | void get_tags(uint64_t start_after_tag_tid, | |
75 | const boost::optional<uint64_t> &tag_class, Tags *tags, | |
76 | Context *on_finish); | |
77 | ||
78 | inline const Settings &get_settings() const { | |
79 | return m_settings; | |
80 | } | |
81 | inline const std::string &get_client_id() const { | |
82 | return m_client_id; | |
83 | } | |
84 | inline uint8_t get_order() const { | |
85 | return m_order; | |
86 | } | |
87 | inline uint64_t get_object_size() const { | |
88 | return 1 << m_order; | |
89 | } | |
90 | inline uint8_t get_splay_width() const { | |
91 | return m_splay_width; | |
92 | } | |
93 | inline int64_t get_pool_id() const { | |
94 | return m_pool_id; | |
95 | } | |
96 | ||
97 | inline void queue(Context *on_finish, int r) { | |
98 | m_work_queue->queue(on_finish, r); | |
99 | } | |
100 | ||
101 | inline ContextWQ *get_work_queue() { | |
102 | return m_work_queue; | |
103 | } | |
104 | ||
105 | inline SafeTimer &get_timer() { | |
106 | return *m_timer; | |
107 | } | |
108 | inline Mutex &get_timer_lock() { | |
109 | return *m_timer_lock; | |
110 | } | |
111 | ||
112 | void set_minimum_set(uint64_t object_set); | |
113 | inline uint64_t get_minimum_set() const { | |
114 | Mutex::Locker locker(m_lock); | |
115 | return m_minimum_set; | |
116 | } | |
117 | ||
118 | int set_active_set(uint64_t object_set); | |
119 | void set_active_set(uint64_t object_set, Context *on_finish); | |
120 | inline uint64_t get_active_set() const { | |
121 | Mutex::Locker locker(m_lock); | |
122 | return m_active_set; | |
123 | } | |
124 | ||
125 | void assert_active_tag(uint64_t tag_tid, Context *on_finish); | |
126 | ||
127 | void flush_commit_position(); | |
128 | void flush_commit_position(Context *on_safe); | |
129 | void get_commit_position(ObjectSetPosition *commit_position) const { | |
130 | Mutex::Locker locker(m_lock); | |
131 | *commit_position = m_client.commit_position; | |
132 | } | |
133 | ||
134 | void get_registered_clients(RegisteredClients *registered_clients) { | |
135 | Mutex::Locker locker(m_lock); | |
136 | *registered_clients = m_registered_clients; | |
137 | } | |
138 | ||
139 | inline uint64_t allocate_entry_tid(uint64_t tag_tid) { | |
140 | Mutex::Locker locker(m_lock); | |
141 | return m_allocated_entry_tids[tag_tid]++; | |
142 | } | |
143 | void reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid); | |
144 | bool get_last_allocated_entry_tid(uint64_t tag_tid, uint64_t *entry_tid) const; | |
145 | ||
146 | uint64_t allocate_commit_tid(uint64_t object_num, uint64_t tag_tid, | |
147 | uint64_t entry_tid); | |
148 | void overflow_commit_tid(uint64_t commit_tid, uint64_t object_num); | |
149 | void get_commit_entry(uint64_t commit_tid, uint64_t *object_num, | |
150 | uint64_t *tag_tid, uint64_t *entry_tid); | |
151 | void committed(uint64_t commit_tid, const CreateContext &create_context); | |
152 | ||
153 | void notify_update(); | |
154 | void async_notify_update(Context *on_safe); | |
155 | ||
156 | void wait_for_ops(); | |
157 | ||
158 | private: | |
159 | typedef std::map<uint64_t, uint64_t> AllocatedEntryTids; | |
160 | typedef std::list<JournalMetadataListener*> Listeners; | |
94b18763 | 161 | typedef std::list<Context*> Contexts; |
7c673cae FG |
162 | |
163 | struct CommitEntry { | |
164 | uint64_t object_num; | |
165 | uint64_t tag_tid; | |
166 | uint64_t entry_tid; | |
167 | bool committed; | |
168 | ||
169 | CommitEntry() : object_num(0), tag_tid(0), entry_tid(0), committed(false) { | |
170 | } | |
171 | CommitEntry(uint64_t _object_num, uint64_t _tag_tid, uint64_t _entry_tid) | |
172 | : object_num(_object_num), tag_tid(_tag_tid), entry_tid(_entry_tid), | |
173 | committed(false) { | |
174 | } | |
175 | }; | |
176 | typedef std::map<uint64_t, CommitEntry> CommitTids; | |
177 | ||
178 | struct C_WatchCtx : public librados::WatchCtx2 { | |
179 | JournalMetadata *journal_metadata; | |
180 | ||
181 | C_WatchCtx(JournalMetadata *_journal_metadata) | |
182 | : journal_metadata(_journal_metadata) {} | |
183 | ||
184 | void handle_notify(uint64_t notify_id, uint64_t cookie, | |
185 | uint64_t notifier_id, bufferlist& bl) override { | |
186 | journal_metadata->handle_watch_notify(notify_id, cookie); | |
187 | } | |
188 | void handle_error(uint64_t cookie, int err) override { | |
189 | journal_metadata->handle_watch_error(err); | |
190 | } | |
191 | }; | |
192 | ||
193 | struct C_WatchReset : public Context { | |
194 | JournalMetadata *journal_metadata; | |
195 | ||
196 | C_WatchReset(JournalMetadata *_journal_metadata) | |
197 | : journal_metadata(_journal_metadata) { | |
198 | journal_metadata->m_async_op_tracker.start_op(); | |
199 | } | |
200 | ~C_WatchReset() override { | |
201 | journal_metadata->m_async_op_tracker.finish_op(); | |
202 | } | |
203 | void finish(int r) override { | |
204 | journal_metadata->handle_watch_reset(); | |
205 | } | |
206 | }; | |
207 | ||
208 | struct C_CommitPositionTask : public Context { | |
209 | JournalMetadata *journal_metadata; | |
210 | ||
211 | C_CommitPositionTask(JournalMetadata *_journal_metadata) | |
212 | : journal_metadata(_journal_metadata) { | |
213 | journal_metadata->m_async_op_tracker.start_op(); | |
214 | } | |
215 | ~C_CommitPositionTask() override { | |
216 | journal_metadata->m_async_op_tracker.finish_op(); | |
217 | } | |
218 | void finish(int r) override { | |
219 | Mutex::Locker locker(journal_metadata->m_lock); | |
220 | journal_metadata->handle_commit_position_task(); | |
221 | }; | |
222 | }; | |
223 | ||
224 | struct C_AioNotify : public Context { | |
225 | JournalMetadata* journal_metadata; | |
226 | Context *on_safe; | |
227 | ||
228 | C_AioNotify(JournalMetadata *_journal_metadata, Context *_on_safe) | |
229 | : journal_metadata(_journal_metadata), on_safe(_on_safe) { | |
230 | journal_metadata->m_async_op_tracker.start_op(); | |
231 | } | |
232 | ~C_AioNotify() override { | |
233 | journal_metadata->m_async_op_tracker.finish_op(); | |
234 | } | |
235 | void finish(int r) override { | |
236 | journal_metadata->handle_notified(r); | |
237 | if (on_safe != nullptr) { | |
238 | on_safe->complete(0); | |
239 | } | |
240 | } | |
241 | }; | |
242 | ||
243 | struct C_NotifyUpdate : public Context { | |
244 | JournalMetadata* journal_metadata; | |
245 | Context *on_safe; | |
246 | ||
247 | C_NotifyUpdate(JournalMetadata *_journal_metadata, Context *_on_safe = NULL) | |
248 | : journal_metadata(_journal_metadata), on_safe(_on_safe) { | |
249 | journal_metadata->m_async_op_tracker.start_op(); | |
250 | } | |
251 | ~C_NotifyUpdate() override { | |
252 | journal_metadata->m_async_op_tracker.finish_op(); | |
253 | } | |
254 | void finish(int r) override { | |
255 | if (r == 0) { | |
256 | journal_metadata->async_notify_update(on_safe); | |
257 | return; | |
258 | } | |
259 | if (on_safe != NULL) { | |
260 | on_safe->complete(r); | |
261 | } | |
262 | } | |
263 | }; | |
264 | ||
265 | struct C_ImmutableMetadata : public Context { | |
266 | JournalMetadata* journal_metadata; | |
267 | Context *on_finish; | |
268 | ||
269 | C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish) | |
270 | : journal_metadata(_journal_metadata), on_finish(_on_finish) { | |
271 | Mutex::Locker locker(journal_metadata->m_lock); | |
272 | journal_metadata->m_async_op_tracker.start_op(); | |
273 | } | |
274 | ~C_ImmutableMetadata() override { | |
275 | journal_metadata->m_async_op_tracker.finish_op(); | |
276 | } | |
277 | void finish(int r) override { | |
278 | journal_metadata->handle_immutable_metadata(r, on_finish); | |
279 | } | |
280 | }; | |
281 | ||
282 | struct C_Refresh : public Context { | |
283 | JournalMetadata* journal_metadata; | |
284 | uint64_t minimum_set; | |
285 | uint64_t active_set; | |
286 | RegisteredClients registered_clients; | |
7c673cae | 287 | |
94b18763 FG |
288 | C_Refresh(JournalMetadata *_journal_metadata) |
289 | : journal_metadata(_journal_metadata), minimum_set(0), active_set(0) { | |
7c673cae FG |
290 | Mutex::Locker locker(journal_metadata->m_lock); |
291 | journal_metadata->m_async_op_tracker.start_op(); | |
292 | } | |
293 | ~C_Refresh() override { | |
294 | journal_metadata->m_async_op_tracker.finish_op(); | |
295 | } | |
296 | void finish(int r) override { | |
297 | journal_metadata->handle_refresh_complete(this, r); | |
298 | } | |
299 | }; | |
300 | ||
301 | librados::IoCtx m_ioctx; | |
302 | CephContext *m_cct; | |
303 | std::string m_oid; | |
304 | std::string m_client_id; | |
305 | Settings m_settings; | |
306 | ||
307 | uint8_t m_order; | |
308 | uint8_t m_splay_width; | |
309 | int64_t m_pool_id; | |
310 | bool m_initialized; | |
311 | ||
312 | ContextWQ *m_work_queue; | |
313 | SafeTimer *m_timer; | |
314 | Mutex *m_timer_lock; | |
315 | ||
316 | mutable Mutex m_lock; | |
317 | ||
318 | uint64_t m_commit_tid; | |
319 | CommitTids m_pending_commit_tids; | |
320 | ||
321 | Listeners m_listeners; | |
322 | ||
323 | C_WatchCtx m_watch_ctx; | |
324 | uint64_t m_watch_handle; | |
325 | ||
326 | uint64_t m_minimum_set; | |
327 | uint64_t m_active_set; | |
328 | RegisteredClients m_registered_clients; | |
329 | Client m_client; | |
330 | ||
331 | AllocatedEntryTids m_allocated_entry_tids; | |
332 | ||
333 | size_t m_update_notifications; | |
334 | Cond m_update_cond; | |
335 | ||
94b18763 FG |
336 | size_t m_ignore_watch_notifies = 0; |
337 | size_t m_refreshes_in_progress = 0; | |
338 | Contexts m_refresh_ctxs; | |
339 | ||
7c673cae FG |
340 | uint64_t m_commit_position_tid = 0; |
341 | ObjectSetPosition m_commit_position; | |
342 | Context *m_commit_position_ctx; | |
343 | Context *m_commit_position_task_ctx; | |
344 | ||
94b18763 FG |
345 | size_t m_flush_commits_in_progress = 0; |
346 | Contexts m_flush_commit_position_ctxs; | |
347 | ||
7c673cae FG |
348 | AsyncOpTracker m_async_op_tracker; |
349 | ||
350 | void handle_immutable_metadata(int r, Context *on_init); | |
351 | ||
352 | void refresh(Context *on_finish); | |
353 | void handle_refresh_complete(C_Refresh *refresh, int r); | |
354 | ||
355 | void cancel_commit_task(); | |
356 | void schedule_commit_task(); | |
357 | void handle_commit_position_task(); | |
358 | ||
359 | void schedule_watch_reset(); | |
360 | void handle_watch_reset(); | |
361 | void handle_watch_notify(uint64_t notify_id, uint64_t cookie); | |
362 | void handle_watch_error(int err); | |
363 | void handle_notified(int r); | |
364 | ||
94b18763 | 365 | void schedule_laggy_clients_disconnect(Context *on_finish); |
7c673cae FG |
366 | |
367 | friend std::ostream &operator<<(std::ostream &os, | |
368 | const JournalMetadata &journal_metadata); | |
369 | }; | |
370 | ||
371 | std::ostream &operator<<(std::ostream &os, | |
372 | const JournalMetadata::RegisteredClients &clients); | |
373 | ||
374 | std::ostream &operator<<(std::ostream &os, | |
375 | const JournalMetadata &journal_metadata); | |
376 | ||
377 | } // namespace journal | |
378 | ||
379 | #endif // CEPH_JOURNAL_JOURNAL_METADATA_H |