]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "journal/Journaler.h" | |
5 | #include "include/stringify.h" | |
6 | #include "common/errno.h" | |
7 | #include "common/Timer.h" | |
8 | #include "common/WorkQueue.h" | |
9 | #include "journal/Entry.h" | |
10 | #include "journal/FutureImpl.h" | |
11 | #include "journal/JournalMetadata.h" | |
12 | #include "journal/JournalPlayer.h" | |
13 | #include "journal/JournalRecorder.h" | |
14 | #include "journal/JournalTrimmer.h" | |
15 | #include "journal/ReplayEntry.h" | |
16 | #include "journal/ReplayHandler.h" | |
17 | #include "cls/journal/cls_journal_client.h" | |
18 | #include "cls/journal/cls_journal_types.h" | |
19 | #include "Utils.h" | |
20 | ||
21 | #define dout_subsys ceph_subsys_journaler | |
22 | #undef dout_prefix | |
23 | #define dout_prefix *_dout << "Journaler: " << this << " " | |
24 | ||
25 | namespace journal { | |
26 | ||
namespace {

// Object-name prefixes for the journal header object and the journal data
// objects. The anonymous namespace already gives these internal linkage, so
// no `static` is needed.
const std::string JOURNAL_HEADER_PREFIX = "journal.";
const std::string JOURNAL_OBJECT_PREFIX = "journal_data.";

} // anonymous namespace
33 | ||
34 | using namespace cls::journal; | |
35 | using utils::rados_ctx_callback; | |
36 | ||
37 | std::string Journaler::header_oid(const std::string &journal_id) { | |
38 | return JOURNAL_HEADER_PREFIX + journal_id; | |
39 | } | |
40 | ||
41 | std::string Journaler::object_oid_prefix(int pool_id, | |
42 | const std::string &journal_id) { | |
43 | return JOURNAL_OBJECT_PREFIX + stringify(pool_id) + "." + journal_id + "."; | |
44 | } | |
45 | ||
46 | Journaler::Threads::Threads(CephContext *cct) | |
47 | : timer_lock("Journaler::timer_lock") { | |
48 | thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1); | |
49 | thread_pool->start(); | |
50 | ||
51 | work_queue = new ContextWQ("Journaler::work_queue", 60, thread_pool); | |
52 | ||
53 | timer = new SafeTimer(cct, timer_lock, true); | |
54 | timer->init(); | |
55 | } | |
56 | ||
57 | Journaler::Threads::~Threads() { | |
58 | { | |
59 | Mutex::Locker timer_locker(timer_lock); | |
60 | timer->shutdown(); | |
61 | } | |
62 | delete timer; | |
63 | ||
64 | work_queue->drain(); | |
65 | delete work_queue; | |
66 | ||
67 | thread_pool->stop(); | |
68 | delete thread_pool; | |
69 | } | |
70 | ||
71 | Journaler::Journaler(librados::IoCtx &header_ioctx, | |
72 | const std::string &journal_id, | |
73 | const std::string &client_id, const Settings &settings) | |
74 | : m_threads(new Threads(reinterpret_cast<CephContext*>(header_ioctx.cct()))), | |
75 | m_client_id(client_id) { | |
76 | set_up(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock, | |
77 | header_ioctx, journal_id, settings); | |
78 | } | |
79 | ||
80 | Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer, | |
81 | Mutex *timer_lock, librados::IoCtx &header_ioctx, | |
82 | const std::string &journal_id, | |
83 | const std::string &client_id, const Settings &settings) | |
84 | : m_client_id(client_id) { | |
85 | set_up(work_queue, timer, timer_lock, header_ioctx, journal_id, | |
86 | settings); | |
87 | } | |
88 | ||
89 | void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer, | |
90 | Mutex *timer_lock, librados::IoCtx &header_ioctx, | |
91 | const std::string &journal_id, | |
92 | const Settings &settings) { | |
93 | m_header_ioctx.dup(header_ioctx); | |
94 | m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct()); | |
95 | ||
96 | m_header_oid = header_oid(journal_id); | |
97 | m_object_oid_prefix = object_oid_prefix(m_header_ioctx.get_id(), journal_id); | |
98 | ||
99 | m_metadata = new JournalMetadata(work_queue, timer, timer_lock, | |
100 | m_header_ioctx, m_header_oid, m_client_id, | |
101 | settings); | |
102 | m_metadata->get(); | |
103 | } | |
104 | ||
105 | Journaler::~Journaler() { | |
106 | if (m_metadata != nullptr) { | |
107 | assert(!m_metadata->is_initialized()); | |
108 | if (!m_initialized) { | |
109 | // never initialized -- ensure any in-flight ops are complete | |
110 | // since we wouldn't expect shut_down to be invoked | |
111 | m_metadata->wait_for_ops(); | |
112 | } | |
113 | m_metadata->put(); | |
114 | m_metadata = nullptr; | |
115 | } | |
116 | assert(m_trimmer == nullptr); | |
117 | assert(m_player == nullptr); | |
118 | assert(m_recorder == nullptr); | |
119 | ||
120 | delete m_threads; | |
121 | } | |
122 | ||
123 | void Journaler::exists(Context *on_finish) const { | |
124 | librados::ObjectReadOperation op; | |
125 | op.stat(NULL, NULL, NULL); | |
126 | ||
127 | librados::AioCompletion *comp = | |
128 | librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback); | |
129 | int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op, NULL); | |
130 | assert(r == 0); | |
131 | comp->release(); | |
132 | } | |
133 | ||
134 | void Journaler::init(Context *on_init) { | |
135 | m_initialized = true; | |
136 | m_metadata->init(new C_InitJournaler(this, on_init)); | |
137 | } | |
138 | ||
139 | int Journaler::init_complete() { | |
140 | int64_t pool_id = m_metadata->get_pool_id(); | |
141 | ||
142 | if (pool_id < 0 || pool_id == m_header_ioctx.get_id()) { | |
143 | ldout(m_cct, 20) << "using image pool for journal data" << dendl; | |
144 | m_data_ioctx.dup(m_header_ioctx); | |
145 | } else { | |
146 | ldout(m_cct, 20) << "using pool id=" << pool_id << " for journal data" | |
147 | << dendl; | |
148 | librados::Rados rados(m_header_ioctx); | |
149 | int r = rados.ioctx_create2(pool_id, m_data_ioctx); | |
150 | if (r < 0) { | |
151 | if (r == -ENOENT) { | |
152 | ldout(m_cct, 1) << "pool id=" << pool_id << " no longer exists" | |
153 | << dendl; | |
154 | } | |
155 | return r; | |
156 | } | |
157 | } | |
158 | m_trimmer = new JournalTrimmer(m_data_ioctx, m_object_oid_prefix, | |
159 | m_metadata); | |
160 | return 0; | |
161 | } | |
162 | ||
163 | void Journaler::shut_down() { | |
164 | C_SaferCond ctx; | |
165 | shut_down(&ctx); | |
166 | ctx.wait(); | |
167 | } | |
168 | ||
169 | void Journaler::shut_down(Context *on_finish) { | |
170 | assert(m_player == nullptr); | |
171 | assert(m_recorder == nullptr); | |
172 | ||
173 | JournalMetadata *metadata = nullptr; | |
174 | std::swap(metadata, m_metadata); | |
175 | assert(metadata != nullptr); | |
176 | ||
177 | on_finish = new FunctionContext([metadata, on_finish](int r) { | |
178 | metadata->put(); | |
179 | on_finish->complete(0); | |
180 | }); | |
181 | ||
182 | JournalTrimmer *trimmer = nullptr; | |
183 | std::swap(trimmer, m_trimmer); | |
184 | if (trimmer == nullptr) { | |
185 | metadata->shut_down(on_finish); | |
186 | return; | |
187 | } | |
188 | ||
189 | on_finish = new FunctionContext([trimmer, metadata, on_finish](int r) { | |
190 | delete trimmer; | |
191 | metadata->shut_down(on_finish); | |
192 | }); | |
193 | trimmer->shut_down(on_finish); | |
194 | } | |
195 | ||
196 | bool Journaler::is_initialized() const { | |
197 | return m_metadata->is_initialized(); | |
198 | } | |
199 | ||
200 | void Journaler::get_immutable_metadata(uint8_t *order, uint8_t *splay_width, | |
201 | int64_t *pool_id, Context *on_finish) { | |
202 | m_metadata->get_immutable_metadata(order, splay_width, pool_id, on_finish); | |
203 | } | |
204 | ||
205 | void Journaler::get_mutable_metadata(uint64_t *minimum_set, | |
206 | uint64_t *active_set, | |
207 | RegisteredClients *clients, | |
208 | Context *on_finish) { | |
209 | m_metadata->get_mutable_metadata(minimum_set, active_set, clients, on_finish); | |
210 | } | |
211 | ||
212 | void Journaler::create(uint8_t order, uint8_t splay_width, | |
213 | int64_t pool_id, Context *on_finish) { | |
214 | if (order > 64 || order < 12) { | |
215 | lderr(m_cct) << "order must be in the range [12, 64]" << dendl; | |
216 | on_finish->complete(-EDOM); | |
217 | return; | |
218 | } | |
219 | if (splay_width == 0) { | |
220 | on_finish->complete(-EINVAL); | |
221 | return; | |
222 | } | |
223 | ||
224 | ldout(m_cct, 5) << "creating new journal: " << m_header_oid << dendl; | |
225 | ||
226 | librados::ObjectWriteOperation op; | |
227 | client::create(&op, order, splay_width, pool_id); | |
228 | ||
229 | librados::AioCompletion *comp = | |
230 | librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback); | |
231 | int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op); | |
232 | assert(r == 0); | |
233 | comp->release(); | |
234 | } | |
235 | ||
236 | void Journaler::remove(bool force, Context *on_finish) { | |
237 | // chain journal removal (reverse order) | |
238 | on_finish = new FunctionContext([this, on_finish](int r) { | |
239 | librados::AioCompletion *comp = librados::Rados::aio_create_completion( | |
240 | on_finish, nullptr, utils::rados_ctx_callback); | |
241 | r = m_header_ioctx.aio_remove(m_header_oid, comp); | |
242 | assert(r == 0); | |
243 | comp->release(); | |
244 | }); | |
245 | ||
246 | on_finish = new FunctionContext([this, force, on_finish](int r) { | |
247 | m_trimmer->remove_objects(force, on_finish); | |
248 | }); | |
249 | ||
250 | m_metadata->shut_down(on_finish); | |
251 | } | |
252 | ||
253 | void Journaler::flush_commit_position(Context *on_safe) { | |
254 | m_metadata->flush_commit_position(on_safe); | |
255 | } | |
256 | ||
257 | void Journaler::add_listener(JournalMetadataListener *listener) { | |
258 | m_metadata->add_listener(listener); | |
259 | } | |
260 | ||
261 | void Journaler::remove_listener(JournalMetadataListener *listener) { | |
262 | m_metadata->remove_listener(listener); | |
263 | } | |
264 | ||
265 | int Journaler::register_client(const bufferlist &data) { | |
266 | C_SaferCond cond; | |
267 | register_client(data, &cond); | |
268 | return cond.wait(); | |
269 | } | |
270 | ||
271 | int Journaler::unregister_client() { | |
272 | C_SaferCond cond; | |
273 | unregister_client(&cond); | |
274 | return cond.wait(); | |
275 | } | |
276 | ||
277 | void Journaler::register_client(const bufferlist &data, Context *on_finish) { | |
278 | return m_metadata->register_client(data, on_finish); | |
279 | } | |
280 | ||
281 | void Journaler::update_client(const bufferlist &data, Context *on_finish) { | |
282 | return m_metadata->update_client(data, on_finish); | |
283 | } | |
284 | ||
285 | void Journaler::unregister_client(Context *on_finish) { | |
286 | return m_metadata->unregister_client(on_finish); | |
287 | } | |
288 | ||
289 | void Journaler::get_client(const std::string &client_id, | |
290 | cls::journal::Client *client, | |
291 | Context *on_finish) { | |
292 | m_metadata->get_client(client_id, client, on_finish); | |
293 | } | |
294 | ||
295 | int Journaler::get_cached_client(const std::string &client_id, | |
296 | cls::journal::Client *client) { | |
297 | RegisteredClients clients; | |
298 | m_metadata->get_registered_clients(&clients); | |
299 | ||
300 | auto it = clients.find({client_id, {}}); | |
301 | if (it == clients.end()) { | |
302 | return -ENOENT; | |
303 | } | |
304 | ||
305 | *client = *it; | |
306 | return 0; | |
307 | } | |
308 | ||
309 | void Journaler::allocate_tag(const bufferlist &data, cls::journal::Tag *tag, | |
310 | Context *on_finish) { | |
311 | m_metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, data, tag, | |
312 | on_finish); | |
313 | } | |
314 | ||
315 | void Journaler::allocate_tag(uint64_t tag_class, const bufferlist &data, | |
316 | cls::journal::Tag *tag, Context *on_finish) { | |
317 | m_metadata->allocate_tag(tag_class, data, tag, on_finish); | |
318 | } | |
319 | ||
320 | void Journaler::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) { | |
321 | m_metadata->get_tag(tag_tid, tag, on_finish); | |
322 | } | |
323 | ||
324 | void Journaler::get_tags(uint64_t tag_class, Tags *tags, Context *on_finish) { | |
325 | m_metadata->get_tags(0, tag_class, tags, on_finish); | |
326 | } | |
327 | ||
328 | void Journaler::get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, | |
329 | Tags *tags, Context *on_finish) { | |
330 | m_metadata->get_tags(start_after_tag_tid, tag_class, tags, on_finish); | |
331 | } | |
332 | ||
333 | void Journaler::start_replay(ReplayHandler *replay_handler) { | |
334 | create_player(replay_handler); | |
335 | m_player->prefetch(); | |
336 | } | |
337 | ||
338 | void Journaler::start_live_replay(ReplayHandler *replay_handler, | |
339 | double interval) { | |
340 | create_player(replay_handler); | |
341 | m_player->prefetch_and_watch(interval); | |
342 | } | |
343 | ||
344 | bool Journaler::try_pop_front(ReplayEntry *replay_entry, | |
345 | uint64_t *tag_tid) { | |
346 | assert(m_player != NULL); | |
347 | ||
348 | Entry entry; | |
349 | uint64_t commit_tid; | |
350 | if (!m_player->try_pop_front(&entry, &commit_tid)) { | |
351 | return false; | |
352 | } | |
353 | ||
354 | *replay_entry = ReplayEntry(entry.get_data(), commit_tid); | |
355 | if (tag_tid != nullptr) { | |
356 | *tag_tid = entry.get_tag_tid(); | |
357 | } | |
358 | return true; | |
359 | } | |
360 | ||
361 | void Journaler::stop_replay() { | |
362 | C_SaferCond ctx; | |
363 | stop_replay(&ctx); | |
364 | ctx.wait(); | |
365 | } | |
366 | ||
367 | void Journaler::stop_replay(Context *on_finish) { | |
368 | JournalPlayer *player = nullptr; | |
369 | std::swap(player, m_player); | |
370 | assert(player != nullptr); | |
371 | ||
372 | on_finish = new FunctionContext([player, on_finish](int r) { | |
373 | delete player; | |
374 | on_finish->complete(r); | |
375 | }); | |
376 | player->shut_down(on_finish); | |
377 | } | |
378 | ||
379 | void Journaler::committed(const ReplayEntry &replay_entry) { | |
380 | m_trimmer->committed(replay_entry.get_commit_tid()); | |
381 | } | |
382 | ||
383 | void Journaler::committed(const Future &future) { | |
384 | FutureImplPtr future_impl = future.get_future_impl(); | |
385 | m_trimmer->committed(future_impl->get_commit_tid()); | |
386 | } | |
387 | ||
388 | void Journaler::start_append(int flush_interval, uint64_t flush_bytes, | |
389 | double flush_age) { | |
390 | assert(m_recorder == NULL); | |
391 | ||
392 | // TODO verify active object set >= current replay object set | |
393 | ||
394 | m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix, | |
395 | m_metadata, flush_interval, flush_bytes, | |
396 | flush_age); | |
397 | } | |
398 | ||
399 | void Journaler::stop_append(Context *on_safe) { | |
400 | JournalRecorder *recorder = nullptr; | |
401 | std::swap(recorder, m_recorder); | |
402 | assert(recorder != nullptr); | |
403 | ||
404 | on_safe = new FunctionContext([recorder, on_safe](int r) { | |
405 | delete recorder; | |
406 | on_safe->complete(r); | |
407 | }); | |
408 | recorder->flush(on_safe); | |
409 | } | |
410 | ||
411 | uint64_t Journaler::get_max_append_size() const { | |
412 | uint64_t max_payload_size = m_metadata->get_object_size() - | |
413 | Entry::get_fixed_size(); | |
414 | if (m_metadata->get_settings().max_payload_bytes > 0) { | |
415 | max_payload_size = MIN(max_payload_size, | |
416 | m_metadata->get_settings().max_payload_bytes); | |
417 | } | |
418 | return max_payload_size; | |
419 | } | |
420 | ||
421 | Future Journaler::append(uint64_t tag_tid, const bufferlist &payload_bl) { | |
422 | return m_recorder->append(tag_tid, payload_bl); | |
423 | } | |
424 | ||
425 | void Journaler::flush_append(Context *on_safe) { | |
426 | m_recorder->flush(on_safe); | |
427 | } | |
428 | ||
429 | void Journaler::create_player(ReplayHandler *replay_handler) { | |
430 | assert(m_player == NULL); | |
431 | m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata, | |
432 | replay_handler); | |
433 | } | |
434 | ||
435 | void Journaler::get_metadata(uint8_t *order, uint8_t *splay_width, | |
436 | int64_t *pool_id) { | |
437 | assert(m_metadata != NULL); | |
438 | ||
439 | *order = m_metadata->get_order(); | |
440 | *splay_width = m_metadata->get_splay_width(); | |
441 | *pool_id = m_metadata->get_pool_id(); | |
442 | } | |
443 | ||
444 | std::ostream &operator<<(std::ostream &os, | |
445 | const Journaler &journaler) { | |
446 | os << "[metadata="; | |
447 | if (journaler.m_metadata != NULL) { | |
448 | os << *journaler.m_metadata; | |
449 | } else { | |
450 | os << "NULL"; | |
451 | } | |
452 | os << "]"; | |
453 | return os; | |
454 | } | |
455 | ||
456 | } // namespace journal |