]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/MDLog.cc
update source to 12.2.11
[ceph.git] / ceph / src / mds / MDLog.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "MDSRank.h"
16#include "MDLog.h"
17#include "MDCache.h"
18#include "LogEvent.h"
19#include "MDSContext.h"
20
21#include "osdc/Journaler.h"
22#include "mds/JournalPointer.h"
23
24#include "common/entity_name.h"
25#include "common/perf_counters.h"
26#include "common/Cond.h"
27
28#include "events/ESubtreeMap.h"
29
30#include "common/config.h"
31#include "common/errno.h"
32#include "include/assert.h"
33
34#define dout_context g_ceph_context
35#define dout_subsys ceph_subsys_mds
36#undef dout_prefix
37#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".log "
38
39// cons/des
40MDLog::~MDLog()
41{
42 if (journaler) { delete journaler; journaler = 0; }
43 if (logger) {
44 g_ceph_context->get_perfcounters_collection()->remove(logger);
45 delete logger;
46 logger = 0;
47 }
48}
49
50
51void MDLog::create_logger()
52{
53 PerfCountersBuilder plb(g_ceph_context, "mds_log", l_mdl_first, l_mdl_last);
54
91327a77
AA
55 plb.add_u64_counter(l_mdl_evadd, "evadd", "Events submitted", "subm",
56 PerfCountersBuilder::PRIO_INTERESTING);
57 plb.add_u64(l_mdl_ev, "ev", "Events", "evts",
58 PerfCountersBuilder::PRIO_INTERESTING);
59 plb.add_u64(l_mdl_seg, "seg", "Segments", "segs",
60 PerfCountersBuilder::PRIO_INTERESTING);
61
62 plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
7c673cae
FG
63 plb.add_u64(l_mdl_evexg, "evexg", "Expiring events");
64 plb.add_u64(l_mdl_evexd, "evexd", "Current expired events");
91327a77
AA
65 plb.add_u64(l_mdl_segexg, "segexg", "Expiring segments");
66 plb.add_u64(l_mdl_segexd, "segexd", "Current expired segments");
f64942e4
AA
67 plb.add_u64_counter(l_mdl_replayed, "replayed", "Events replayed",
68 "repl", PerfCountersBuilder::PRIO_INTERESTING);
91327a77
AA
69 plb.add_time_avg(l_mdl_jlat, "jlat", "Journaler flush latency");
70 plb.add_u64_counter(l_mdl_evex, "evex", "Total expired events");
71 plb.add_u64_counter(l_mdl_evtrm, "evtrm", "Trimmed events");
7c673cae
FG
72 plb.add_u64_counter(l_mdl_segadd, "segadd", "Segments added");
73 plb.add_u64_counter(l_mdl_segex, "segex", "Total expired segments");
74 plb.add_u64_counter(l_mdl_segtrm, "segtrm", "Trimmed segments");
7c673cae 75
91327a77 76 plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
7c673cae
FG
77 plb.add_u64(l_mdl_expos, "expos", "Journaler xpire position");
78 plb.add_u64(l_mdl_wrpos, "wrpos", "Journaler write position");
79 plb.add_u64(l_mdl_rdpos, "rdpos", "Journaler read position");
7c673cae
FG
80
81 // logger
82 logger = plb.create_perf_counters();
83 g_ceph_context->get_perfcounters_collection()->add(logger);
84}
85
// Forward RADOS write I/O hint flags (e.g. sequential-write advice) to
// the underlying Journaler.  Requires journaler to be instantiated.
void MDLog::set_write_iohint(unsigned iohint_flags)
{
  journaler->set_write_iohint(iohint_flags);
}
90
/**
 * Installed as the Journaler's write-error handler: a journal write
 * failure is either a blacklist event (respawn) or fatal (mark the
 * rank damaged and abort).  There is no retry path.
 */
class C_MDL_WriteError : public MDSIOContextBase {
  protected:
  MDLog *mdlog;
  MDSRank *get_mds() override {return mdlog->mds;}

  void finish(int r) override {
    MDSRank *mds = get_mds();
    // assume journal is reliable, so don't choose action based on
    // g_conf->mds_action_on_write_error.
    if (r == -EBLACKLISTED) {
      derr << "we have been blacklisted (fenced), respawning..." << dendl;
      mds->respawn();
    } else {
      derr << "unhandled error " << cpp_strerror(r) << ", shutting down..." << dendl;
      // Although it's possible that this could be something transient,
      // it's severe and scary, so disable this rank until an administrator
      // intervenes.
      mds->clog->error() << "Unhandled journal write error on MDS rank " <<
        mds->get_nodeid() << ": " << cpp_strerror(r) << ", shutting down.";
      mds->damaged();
      ceph_abort(); // damaged should never return
    }
  }

  public:
  // The 'false' argument presumably opts out of some MDSIOContextBase
  // default behavior -- TODO confirm against MDSContext.h.
  explicit C_MDL_WriteError(MDLog *m) :
    MDSIOContextBase(false), mdlog(m) {}
  void print(ostream& out) const override {
    out << "mdlog_write_error";
  }
};
122
123
124void MDLog::write_head(MDSInternalContextBase *c)
125{
126 Context *fin = NULL;
127 if (c != NULL) {
128 fin = new C_IO_Wrapper(mds, c);
129 }
130 journaler->write_head(fin);
131}
132
// Current journal read position (byte offset), as tracked by Journaler.
uint64_t MDLog::get_read_pos() const
{
  return journaler->get_read_pos();
}
137
// Current journal write position (byte offset), including unflushed data.
uint64_t MDLog::get_write_pos() const
{
  return journaler->get_write_pos();
}
142
// Highest journal offset known durable on RADOS (safely written).
uint64_t MDLog::get_safe_pos() const
{
  return journaler->get_write_safe_pos();
}
147
148
149
/**
 * Create a brand-new empty journal for this rank: instantiate the
 * Journaler, asynchronously write its header and the JournalPointer
 * to RADOS, and start the submit thread.  `c` completes once both
 * writes have finished.
 */
void MDLog::create(MDSInternalContextBase *c)
{
  dout(5) << "create empty log" << dendl;

  C_GatherBuilder gather(g_ceph_context);
  // This requires an OnFinisher wrapper because Journaler will call back the completion for write_head inside its own lock
  // XXX but should maybe that be handled inside Journaler?
  gather.set_finisher(new C_IO_Wrapper(mds, c));

  // The inode of the default Journaler we will create
  ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();

  // Instantiate Journaler and start async write to RADOS
  assert(journaler == NULL);
  journaler = new Journaler("mdlog", ino, mds->mdsmap->get_metadata_pool(),
                            CEPH_FS_ONDISK_MAGIC, mds->objecter, logger,
                            l_mdl_jlat, mds->finisher);
  assert(journaler->is_readonly());
  journaler->set_write_error_handler(new C_MDL_WriteError(this));
  journaler->set_writeable();
  journaler->create(&mds->mdcache->default_log_layout, g_conf->mds_journal_format);
  journaler->write_head(gather.new_sub());

  // Async write JournalPointer to RADOS; front = our new journal,
  // back = 0 (no rewrite in progress).
  JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
  jp.front = ino;
  jp.back = 0;
  jp.save(mds->objecter, gather.new_sub());

  gather.activate();

  logger->set(l_mdl_expos, journaler->get_expire_pos());
  logger->set(l_mdl_wrpos, journaler->get_write_pos());

  submit_thread.create("md_submit");
}
186
/**
 * Open an existing journal: kick off the recovery thread (which
 * resolves the JournalPointer and recovers the Journaler, completing
 * `c` when done) and start the submit thread.
 */
void MDLog::open(MDSInternalContextBase *c)
{
  dout(5) << "open discovering log bounds" << dendl;

  assert(!recovery_thread.is_started());
  recovery_thread.set_completion(c);
  recovery_thread.create("md_recov_open");

  submit_thread.create("md_submit");
  // either append() or replay() will follow.
}
198
199/**
200 * Final part of reopen() procedure, after recovery_thread
201 * has done its thing we call append()
202 */
/**
 * Final part of reopen() procedure, after recovery_thread
 * has done its thing we call append(), then fire the caller's
 * completion with the recovery result.
 */
class C_ReopenComplete : public MDSInternalContext {
  MDLog *mdlog;
  MDSInternalContextBase *on_complete;
public:
  C_ReopenComplete(MDLog *mdlog_, MDSInternalContextBase *on_complete_) : MDSInternalContext(mdlog_->mds), mdlog(mdlog_), on_complete(on_complete_) {}
  void finish(int r) override {
    mdlog->append();
    on_complete->complete(r);
  }
};
213
214/**
215 * Given that open() has been called in the past, go through the journal
216 * recovery procedure again, potentially reformatting the journal if it
217 * was in an old format.
218 */
/**
 * Given that open() has been called in the past, go through the journal
 * recovery procedure again, potentially reformatting the journal if it
 * was in an old format.  Precondition: the journal has been fully read
 * (read pos == write pos), since we will call append() on completion.
 */
void MDLog::reopen(MDSInternalContextBase *c)
{
  dout(5) << "reopen" << dendl;

  // Because we will call append() at the completion of this, check that we have already
  // read the whole journal.
  assert(journaler != NULL);
  assert(journaler->get_read_pos() == journaler->get_write_pos());

  // Discard the current Journaler; recovery_thread will build a new one.
  delete journaler;
  journaler = NULL;

  // recovery_thread was started at some point in the past.  Although
  // it has called it's completion if we made it back here, it might
  // still not have been cleaned up: join it.
  recovery_thread.join();

  recovery_thread.set_completion(new C_ReopenComplete(this, c));
  recovery_thread.create("md_recov_reopen");
}
239
240void MDLog::append()
241{
242 dout(5) << "append positioning at end and marking writeable" << dendl;
243 journaler->set_read_pos(journaler->get_write_pos());
244 journaler->set_expire_pos(journaler->get_write_pos());
245
246 journaler->set_writeable();
247
248 logger->set(l_mdl_expos, journaler->get_write_pos());
249}
250
251
252
253// -------------------------------------------------
254
/**
 * Begin a new log event (caller holds submit_mutex).  Assigns the next
 * event sequence number and, if the event carries an EMetaBlob, stamps
 * it with the sequence and the seq of the last segment (subtree map).
 * Must be paired with _submit_entry() or cancel_entry().
 */
void MDLog::_start_entry(LogEvent *e)
{
  assert(submit_mutex.is_locked_by_me());

  // Only one event may be in the started-but-not-submitted state.
  assert(cur_event == NULL);
  cur_event = e;

  event_seq++;

  EMetaBlob *metablob = e->get_metablob();
  if (metablob) {
    metablob->event_seq = event_seq;
    metablob->last_subtree_map = get_last_segment_seq();
  }
}
270
271void MDLog::cancel_entry(LogEvent *le)
272{
273 assert(le == cur_event);
274 cur_event = NULL;
275 delete le;
276}
277
/**
 * Queue the started event for the submit thread (caller holds
 * submit_mutex).  Attaches the event to the newest segment, records it
 * in pending_events, and may start a new segment when the current one
 * crosses a layout period boundary or exceeds the per-segment event
 * budget.  `c` (may be NULL) fires once the event is safely flushed.
 */
void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
{
  assert(submit_mutex.is_locked_by_me());
  assert(!mds->is_any_replay());
  assert(!capped);

  assert(le == cur_event);
  cur_event = NULL;

  // let the event register itself in the segment
  assert(!segments.empty());
  LogSegment *ls = segments.rbegin()->second;
  ls->num_events++;

  le->_segment = ls;
  le->update_segment();
  le->set_stamp(ceph_clock_now());

  // Snapshot up-features for the submit thread's encoding pass.
  mdsmap_up_features = mds->mdsmap->get_up_features();
  pending_events[ls->seq].push_back(PendingEvent(le, c));
  num_events++;

  if (logger) {
    logger->inc(l_mdl_evadd);
    logger->set(l_mdl_ev, num_events);
  }

  unflushed++;

  uint64_t period = journaler->get_layout_period();
  // start a new segment?
  if (le->get_type() == EVENT_SUBTREEMAP ||
      (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) {
    // avoid infinite loop when ESubtreeMap is very large.
    // do not insert ESubtreeMap among EImportFinish events that finish
    // disambiguate imports. Because the ESubtreeMap reflects the subtree
    // state when all EImportFinish events are replayed.
  } else if (ls->end/period != ls->offset/period ||
	     ls->num_events >= g_conf->mds_log_events_per_segment) {
    dout(10) << "submit_entry also starting new segment: last = "
	     << ls->seq  << "/" << ls->offset << ", event seq = " << event_seq << dendl;
    _start_new_segment();
  } else if (g_conf->mds_debug_subtrees &&
	     le->get_type() != EVENT_SUBTREEMAP_TEST) {
    // debug: journal this every time to catch subtree replay bugs.
    // use a different event id so it doesn't get interpreted as a
    // LogSegment boundary on replay.
    LogEvent *sle = mds->mdcache->create_subtree_map();
    sle->set_type(EVENT_SUBTREEMAP_TEST);
    // Recursive call: the test event goes through the same path but
    // cannot recurse again thanks to the type check above.
    _submit_entry(sle, NULL);
  }
}
330
331/**
332 * Invoked on the flush after each entry submitted
333 */
/**
 * Invoked on the flush after each entry submitted.  Either wraps a
 * caller-supplied completion, or just records the write position so
 * MDSLogContextBase can advance MDLog::safe_pos.
 */
class C_MDL_Flushed : public MDSLogContextBase {
protected:
  MDLog *mdlog;
  MDSRank *get_mds() override {return mdlog->mds;}
  MDSInternalContextBase *wrapped;  // may be NULL (position-only variant)

  void finish(int r) override {
    if (wrapped)
      wrapped->complete(r);
  }

public:
  C_MDL_Flushed(MDLog *m, MDSInternalContextBase *w)
    : mdlog(m), wrapped(w) {}
  C_MDL_Flushed(MDLog *m, uint64_t wp) : mdlog(m), wrapped(NULL) {
    set_write_pos(wp);
  }
};
352
/**
 * Body of the submit thread: drain pending_events, encoding each event
 * and appending it to the Journaler.  submit_mutex is held except
 * around the encode/append/flush work; it protects pending_events and
 * the unflushed counter.  Exits when the daemon is stopping.
 */
void MDLog::_submit_thread()
{
  dout(10) << "_submit_thread start" << dendl;

  submit_mutex.Lock();

  while (!mds->is_daemon_stopping()) {
    // mds_log_pause is a debug knob: park until kicked.
    if (g_conf->mds_log_pause) {
      submit_cond.Wait(submit_mutex);
      continue;
    }

    map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
    if (it == pending_events.end()) {
      submit_cond.Wait(submit_mutex);
      continue;
    }

    if (it->second.empty()) {
      pending_events.erase(it);
      continue;
    }

    // Snapshot features under the lock; used for wire encoding below.
    int64_t features = mdsmap_up_features;
    PendingEvent data = it->second.front();
    it->second.pop_front();

    // Drop the lock for the (potentially slow) encode + journal append.
    submit_mutex.Unlock();

    if (data.le) {
      LogEvent *le = data.le;
      LogSegment *ls = le->_segment;
      // encode it, with event type
      bufferlist bl;
      le->encode_with_header(bl, features);

      uint64_t write_pos = journaler->get_write_pos();

      le->set_start_off(write_pos);
      if (le->get_type() == EVENT_SUBTREEMAP)
	ls->offset = write_pos;

      dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
	      << " : " << *le << dendl;

      // journal it.
      const uint64_t new_write_pos = journaler->append_entry(bl);  // bl is destroyed.
      ls->end = new_write_pos;

      // Ensure the flush completion knows the position it certifies.
      MDSLogContextBase *fin;
      if (data.fin) {
	fin = dynamic_cast<MDSLogContextBase*>(data.fin);
	assert(fin);
	fin->set_write_pos(new_write_pos);
      } else {
	fin = new C_MDL_Flushed(this, new_write_pos);
      }

      journaler->wait_for_flush(fin);

      if (data.flush)
	journaler->flush();

      if (logger)
	logger->set(l_mdl_wrpos, ls->end);

      delete le;
    } else {
      // No event: this entry is a flush marker and/or bare completion.
      if (data.fin) {
	MDSInternalContextBase* fin =
		dynamic_cast<MDSInternalContextBase*>(data.fin);
	assert(fin);
	C_MDL_Flushed *fin2 = new C_MDL_Flushed(this, fin);
	fin2->set_write_pos(journaler->get_write_pos());
	journaler->wait_for_flush(fin2);
      }
      if (data.flush)
	journaler->flush();
    }

    submit_mutex.Lock();
    if (data.flush)
      unflushed = 0;
    else if (data.le)
      unflushed++;
  }

  submit_mutex.Unlock();
}
442
/**
 * Complete `c` once everything currently in the log is safely flushed.
 * If events are still pending submission, queue `c` behind them;
 * otherwise register directly with the Journaler's flush waiters.
 */
void MDLog::wait_for_safe(MDSInternalContextBase *c)
{
  submit_mutex.Lock();

  bool no_pending = true;
  if (!pending_events.empty()) {
    pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
    no_pending = false;
    submit_cond.Signal();
  }

  submit_mutex.Unlock();

  if (no_pending && c)
    journaler->wait_for_flush(new C_IO_Wrapper(mds, c));
}
459
/**
 * Request a journal flush.  If events are still queued for the submit
 * thread, append a flush marker so it flushes after draining them;
 * otherwise flush the Journaler directly (when there was unflushed data).
 */
void MDLog::flush()
{
  submit_mutex.Lock();

  bool do_flush = unflushed > 0;
  unflushed = 0;
  if (!pending_events.empty()) {
    // Submit thread will perform the flush when it reaches this marker.
    pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
    do_flush = false;
    submit_cond.Signal();
  }

  submit_mutex.Unlock();

  if (do_flush)
    journaler->flush();
}
477
// Wake the submit thread (e.g. after toggling mds_log_pause).
void MDLog::kick_submitter()
{
  Mutex::Locker l(submit_mutex);
  submit_cond.Signal();
}
483
// Mark the log as capped: no further events may be submitted
// (_submit_entry asserts !capped) and the final segment becomes
// eligible for expiry in _expired().
void MDLog::cap()
{
  dout(5) << "cap" << dendl;
  capped = true;
}
489
/**
 * Stop the submit, replay and recovery threads (caller holds mds_lock,
 * daemon must be stopping).  mds_lock is dropped around joins to avoid
 * deadlocking with threads that take it; this is safe because stopping
 * prevents anyone else from making progress with it.
 */
void MDLog::shutdown()
{
  assert(mds->mds_lock.is_locked_by_me());

  dout(5) << "shutdown" << dendl;
  if (submit_thread.is_started()) {
    assert(mds->is_daemon_stopping());

    if (submit_thread.am_self()) {
      // Called suicide from the thread: trust it to do no work after
      // returning from suicide, and subsequently respect mds->is_daemon_stopping()
      // and fall out of its loop.
    } else {
      mds->mds_lock.Unlock();
      // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
      // picking it up will do anything with it.

      // Wake the submit thread so it notices is_daemon_stopping().
      submit_mutex.Lock();
      submit_cond.Signal();
      submit_mutex.Unlock();

      mds->mds_lock.Lock();

      submit_thread.join();
    }
  }

  // Replay thread can be stuck inside e.g. Journaler::wait_for_readable,
  // so we need to shutdown the journaler first.
  if (journaler) {
    journaler->shutdown();
  }

  if (replay_thread.is_started() && !replay_thread.am_self()) {
    mds->mds_lock.Unlock();
    replay_thread.join();
    mds->mds_lock.Lock();
  }

  if (recovery_thread.is_started() && !recovery_thread.am_self()) {
    mds->mds_lock.Unlock();
    recovery_thread.join();
    mds->mds_lock.Lock();
  }
}
535
536
537// -----------------------------
538// segments
539
// Open a fresh segment and journal the ESubtreeMap that marks its
// start (caller holds submit_mutex).
void MDLog::_start_new_segment()
{
  _prepare_new_segment();
  _journal_segment_subtree_map(NULL);
}
545
546void MDLog::_prepare_new_segment()
547{
548 assert(submit_mutex.is_locked_by_me());
549
550 uint64_t seq = event_seq + 1;
551 dout(7) << __func__ << " seq " << seq << dendl;
552
553 segments[seq] = new LogSegment(seq);
554
555 logger->inc(l_mdl_segadd);
556 logger->set(l_mdl_seg, segments.size());
557
558 // Adjust to next stray dir
559 dout(10) << "Advancing to next stray directory on mds " << mds->get_nodeid()
560 << dendl;
561 mds->mdcache->advance_stray();
562}
563
// Journal an ESubtreeMap to anchor the new segment (caller holds
// submit_mutex).  `onsync` (may be NULL) fires after the event is
// safely flushed.
void MDLog::_journal_segment_subtree_map(MDSInternalContextBase *onsync)
{
  assert(submit_mutex.is_locked_by_me());

  dout(7) << __func__ << dendl;
  ESubtreeMap *sle = mds->mdcache->create_subtree_map();
  sle->event_seq = get_last_segment_seq();

  _submit_entry(sle, new C_MDL_Flushed(this, onsync));
}
574
/**
 * Trim the log down to the configured segment/event budgets by starting
 * expiry on old segments.  `m` >= 0 overrides mds_log_max_events.
 * Takes submit_mutex internally and releases it around try_expire();
 * _trim_expired_segments() unlocks it before returning.
 */
void MDLog::trim(int m)
{
  unsigned max_segments = g_conf->mds_log_max_segments;
  int max_events = g_conf->mds_log_max_events;
  if (m >= 0)
    max_events = m;

  if (mds->mdcache->is_readonly()) {
    dout(10) << "trim, ignoring read-only FS" <<  dendl;
    return;
  }

  // Clamp max_events to not be smaller than events per segment
  if (max_events > 0 && max_events <= g_conf->mds_log_events_per_segment) {
    max_events = g_conf->mds_log_events_per_segment + 1;
  }

  submit_mutex.Lock();

  // trim!
  dout(10) << "trim "
	   << segments.size() << " / " << max_segments << " segments, "
	   << num_events << " / " << max_events << " events"
	   << ", " << expiring_segments.size() << " (" << expiring_events << ") expiring"
	   << ", " << expired_segments.size() << " (" << expired_events << ") expired"
	   << dendl;

  if (segments.empty()) {
    submit_mutex.Unlock();
    return;
  }

  // hack: only trim for a few seconds at a time
  utime_t stop = ceph_clock_now();
  stop += 2.0;

  // Scale expiry op priority with how far behind on trimming we are.
  int op_prio = CEPH_MSG_PRIO_LOW +
		(CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) *
		expiring_segments.size() / max_segments;
  if (op_prio > CEPH_MSG_PRIO_HIGH)
    op_prio = CEPH_MSG_PRIO_HIGH;

  unsigned new_expiring_segments = 0;

  map<uint64_t,LogSegment*>::iterator p = segments.begin();
  while (p != segments.end()) {
    if (stop < ceph_clock_now())
      break;

    unsigned num_remaining_segments = (segments.size() - expired_segments.size() - expiring_segments.size());
    if ((num_remaining_segments <= max_segments) &&
	(max_events < 0 || num_events - expiring_events - expired_events <= max_events))
      break;

    // Do not trim too many segments at once for peak workload. If mds keeps creating N segments each tick,
    // the upper bound of 'num_remaining_segments - max_segments' is '2 * N'
    if (new_expiring_segments * 2 > num_remaining_segments)
      break;

    // look at first segment
    LogSegment *ls = p->second;
    assert(ls);
    ++p;

    if (pending_events.count(ls->seq) ||
	ls->end > safe_pos) {
      dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
	      << journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
      break;
    }
    if (expiring_segments.count(ls)) {
      dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
	      << ", " << ls->num_events << " events" << dendl;
    } else if (expired_segments.count(ls)) {
      dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
	      << ", " << ls->num_events << " events" << dendl;
    } else {
      assert(expiring_segments.count(ls) == 0);
      new_expiring_segments++;
      expiring_segments.insert(ls);
      expiring_events += ls->num_events;
      // try_expire may block on I/O; drop the lock around it and
      // re-validate the iterator afterwards (map may have changed).
      submit_mutex.Unlock();

      uint64_t last_seq = ls->seq;
      try_expire(ls, op_prio);

      submit_mutex.Lock();
      p = segments.lower_bound(last_seq + 1);
    }
  }

  // discard expired segments and unlock submit_mutex
  _trim_expired_segments();
}
669
// Completion for a segment's expiry gather: reports write errors to
// the MDS and re-checks the segment via _maybe_expired (which may
// retry try_expire until the segment is fully expirable).
class C_MaybeExpiredSegment : public MDSInternalContext {
  MDLog *mdlog;
  LogSegment *ls;
  int op_prio;
  public:
  C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) :
    MDSInternalContext(mdl->mds), mdlog(mdl), ls(s), op_prio(p) {}
  void finish(int res) override {
    if (res < 0)
      mdlog->mds->handle_write_error(res);
    mdlog->_maybe_expired(ls, op_prio);
  }
};
683
684/**
685 * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
686 * segment.
687 */
/**
 * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
 * segment.  Returns 0 on success, -EAGAIN if a segment still has
 * pending (unsubmitted) events.  Caller should have flushed first.
 */
int MDLog::trim_all()
{
  submit_mutex.Lock();

  dout(10) << __func__ << ": "
	   << segments.size()
	   << "/" << expiring_segments.size()
	   << "/" << expired_segments.size() << dendl;

  uint64_t last_seq = 0;
  if (!segments.empty())
    last_seq = get_last_segment_seq();

  map<uint64_t,LogSegment*>::iterator p = segments.begin();
  while (p != segments.end() &&
	 p->first < last_seq &&
	 p->second->end < safe_pos) { // next segment should have been started
    LogSegment *ls = p->second;
    ++p;

    // Caller should have flushed journaler before calling this
    if (pending_events.count(ls->seq)) {
      dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
      submit_mutex.Unlock();
      return -EAGAIN;
    }

    if (expiring_segments.count(ls)) {
      dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
	      << ", " << ls->num_events << " events" << dendl;
    } else if (expired_segments.count(ls)) {
      dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
	      << ", " << ls->num_events << " events" << dendl;
    } else {
      assert(expiring_segments.count(ls) == 0);
      expiring_segments.insert(ls);
      expiring_events += ls->num_events;
      // Drop the lock across I/O and re-validate the iterator after.
      submit_mutex.Unlock();

      uint64_t next_seq = ls->seq + 1;
      try_expire(ls, CEPH_MSG_PRIO_DEFAULT);

      submit_mutex.Lock();
      p = segments.lower_bound(next_seq);
    }
  }

  // Discards expired segments and unlocks submit_mutex.
  _trim_expired_segments();

  return 0;
}
739
740
/**
 * Ask a segment to expire itself (write back its dirty state).  If the
 * segment still has outstanding work, gather it and retry via
 * C_MaybeExpiredSegment; otherwise mark it expired immediately.
 * Called WITHOUT submit_mutex held.
 */
void MDLog::try_expire(LogSegment *ls, int op_prio)
{
  MDSGatherBuilder gather_bld(g_ceph_context);
  ls->try_to_expire(mds, gather_bld, op_prio);

  if (gather_bld.has_subs()) {
    dout(5) << "try_expire expiring segment " << ls->seq << "/" << ls->offset << dendl;
    gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
    gather_bld.activate();
  } else {
    dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
    submit_mutex.Lock();
    assert(expiring_segments.count(ls));
    expiring_segments.erase(ls);
    expiring_events -= ls->num_events;
    _expired(ls);
    submit_mutex.Unlock();
  }

  logger->set(l_mdl_segexg, expiring_segments.size());
  logger->set(l_mdl_evexg, expiring_events);
}
763
// Retry hook from C_MaybeExpiredSegment: re-attempt expiry of `ls`
// unless the filesystem has gone read-only.
void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
{
  if (mds->mdcache->is_readonly()) {
    dout(10) << "_maybe_expired, ignoring read-only FS" <<  dendl;
    return;
  }

  dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset
	   << ", " << ls->num_events << " events" << dendl;
  try_expire(ls, op_prio);
}
775
/**
 * Delete fully-expired segments from the front of the log, advancing
 * the Journaler expire position.  NOTE: caller must hold submit_mutex;
 * this function UNLOCKS it before returning (it may then write the
 * journal head outside the lock).
 */
void MDLog::_trim_expired_segments()
{
  assert(submit_mutex.is_locked_by_me());

  // trim expired segments?
  bool trimmed = false;
  while (!segments.empty()) {
    LogSegment *ls = segments.begin()->second;
    if (!expired_segments.count(ls)) {
      // Oldest segment not yet expired: everything behind it must wait.
      dout(10) << "_trim_expired_segments waiting for " << ls->seq << "/" << ls->offset
	       << " to expire" << dendl;
      break;
    }

    dout(10) << "_trim_expired_segments trimming expired "
	     << ls->seq << "/0x" << std::hex << ls->offset << std::dec << dendl;
    expired_events -= ls->num_events;
    expired_segments.erase(ls);
    num_events -= ls->num_events;

    // this was the oldest segment, adjust expire pos
    if (journaler->get_expire_pos() < ls->end) {
      journaler->set_expire_pos(ls->end);
      logger->set(l_mdl_expos, ls->end);
    } else {
      logger->set(l_mdl_expos, ls->offset);
    }

    logger->inc(l_mdl_segtrm);
    logger->inc(l_mdl_evtrm, ls->num_events);

    segments.erase(ls->seq);
    delete ls;
    trimmed = true;
  }

  submit_mutex.Unlock();

  // Persist the advanced expire position.
  if (trimmed)
    journaler->write_head(0);
}
817
// Public wrapper: takes submit_mutex, which _trim_expired_segments()
// releases before returning.
void MDLog::trim_expired_segments()
{
  submit_mutex.Lock();
  _trim_expired_segments();
}
823
824void MDLog::_expired(LogSegment *ls)
825{
826 assert(submit_mutex.is_locked_by_me());
827
828 dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
829 << ", " << ls->num_events << " events" << dendl;
830
831 if (!capped && ls == peek_current_segment()) {
832 dout(5) << "_expired not expiring " << ls->seq << "/" << ls->offset
833 << ", last one and !capped" << dendl;
834 } else {
835 // expired.
836 expired_segments.insert(ls);
837 expired_events += ls->num_events;
838
839 // Trigger all waiters
840 for (std::list<MDSInternalContextBase*>::iterator i = ls->expiry_waiters.begin();
841 i != ls->expiry_waiters.end(); ++i) {
842 (*i)->complete(0);
843 }
844 ls->expiry_waiters.clear();
845
846 logger->inc(l_mdl_evex, ls->num_events);
847 logger->inc(l_mdl_segex);
848 }
849
850 logger->set(l_mdl_ev, num_events);
851 logger->set(l_mdl_evexd, expired_events);
852 logger->set(l_mdl_seg, segments.size());
853 logger->set(l_mdl_segexd, expired_segments.size());
854}
855
856
857
/**
 * Start asynchronous journal replay.  If the journal is empty, `c`
 * completes immediately; otherwise `c` joins the replay waiters and
 * the replay thread is (re)started.
 */
void MDLog::replay(MDSInternalContextBase *c)
{
  assert(journaler->is_active());
  assert(journaler->is_readonly());

  // empty?
  if (journaler->get_read_pos() == journaler->get_write_pos()) {
    dout(10) << "replay - journal empty, done." << dendl;
    mds->mdcache->trim();
    if (mds->is_standby_replay())
      mds->update_mlogger();
    if (c) {
      c->complete(0);
    }
    return;
  }

  // add waiter
  if (c)
    waitfor_replay.push_back(c);

  // go!
  dout(10) << "replay start, from " << journaler->get_read_pos()
	   << " to " << journaler->get_write_pos() << dendl;

  // num_events > 0 implies we replayed before (standby-replay re-runs).
  assert(num_events == 0 || already_replayed);
  if (already_replayed) {
    // Ensure previous instance of ReplayThread is joined before
    // we create another one
    replay_thread.join();
  }
  already_replayed = true;

  replay_thread.create("md_log_replay");
}
893
894
895/**
896 * Resolve the JournalPointer object to a journal file, and
897 * instantiate a Journaler object. This may re-write the journal
898 * if the journal in RADOS appears to be in an old format.
899 *
900 * This is a separate thread because of the way it is initialized from inside
901 * the mds lock, which is also the global objecter lock -- rather than split
902 * it up into hard-to-read async operations linked up by contexts,
903 *
904 * When this function completes, the `journaler` attribute will be set to
905 * a Journaler instance using the latest available serialization format.
906 */
/**
 * Resolve the JournalPointer object to a journal file, and
 * instantiate a Journaler object. This may re-write the journal
 * if the journal in RADOS appears to be in an old format.
 *
 * This is a separate thread because of the way it is initialized from inside
 * the mds lock, which is also the global objecter lock -- rather than split
 * it up into hard-to-read async operations linked up by contexts,
 *
 * When this function completes, the `journaler` attribute will be set to
 * a Journaler instance using the latest available serialization format.
 *
 * Error policy throughout: -EBLACKLISTED => respawn; any other
 * unrecoverable error => mark rank damaged (both never return).
 */
void MDLog::_recovery_thread(MDSInternalContextBase *completion)
{
  assert(journaler == NULL);
  if (g_conf->mds_journal_format > JOURNAL_FORMAT_MAX) {
      dout(0) << "Configuration value for mds_journal_format is out of bounds, max is "
              << JOURNAL_FORMAT_MAX << dendl;

      // Oh dear, something unreadable in the store for this rank: require
      // operator intervention.
      mds->damaged();
      ceph_abort();  // damaged should not return
  }

  // First, read the pointer object.
  // If the pointer object is not present, then create it with
  // front = default ino and back = null
  JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
  const int read_result = jp.load(mds->objecter);
  if (read_result == -ENOENT) {
    inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
    jp.front = default_log_ino;
    int write_result = jp.save(mds->objecter);
    // Nothing graceful we can do for this
    assert(write_result >= 0);
  } else if (read_result == -EBLACKLISTED) {
    derr << "Blacklisted during JournalPointer read!  Respawning..." << dendl;
    mds->respawn();
    ceph_abort(); // Should be unreachable because respawn calls execv
  } else if (read_result != 0) {
    mds->clog->error() << "failed to read JournalPointer: " << read_result
                       << " (" << cpp_strerror(read_result) << ")";
    mds->damaged_unlocked();
    ceph_abort();  // Should be unreachable because damaged() calls respawn()
  }

  // If the back pointer is non-null, that means that a journal
  // rewrite failed part way through.  Erase the back journal
  // to clean up.
  if (jp.back) {
    if (mds->is_standby_replay()) {
      // Standby cannot clean up a half-finished rewrite; back off and
      // let an active MDS finish it.
      dout(1) << "Journal " << jp.front << " is being rewritten, "
        << "cannot replay in standby until an active MDS completes rewrite" << dendl;
      Mutex::Locker l(mds->mds_lock);
      if (mds->is_daemon_stopping()) {
        return;
      }
      completion->complete(-EAGAIN);
      return;
    }
    dout(1) << "Erasing journal " << jp.back << dendl;
    C_SaferCond erase_waiter;
    Journaler back("mdlog", jp.back, mds->mdsmap->get_metadata_pool(),
        CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat,
        mds->finisher);

    // Read all about this journal (header + extents)
    C_SaferCond recover_wait;
    back.recover(&recover_wait);
    int recovery_result = recover_wait.wait();
    if (recovery_result == -EBLACKLISTED) {
      derr << "Blacklisted during journal recovery!  Respawning..." << dendl;
      mds->respawn();
      ceph_abort(); // Should be unreachable because respawn calls execv
    } else if (recovery_result != 0) {
      // Journaler.recover succeeds if no journal objects are present: an error
      // means something worse like a corrupt header, which we can't handle here.
      mds->clog->error() << "Error recovering journal " << jp.front << ": "
        << cpp_strerror(recovery_result);
      mds->damaged_unlocked();
      assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
    }

    // We could read journal, so we can erase it.
    back.erase(&erase_waiter);
    int erase_result = erase_waiter.wait();

    // If we are successful, or find no data, we can update the JournalPointer to
    // reflect that the back journal is gone.
    if (erase_result != 0 && erase_result != -ENOENT) {
      derr << "Failed to erase journal " << jp.back << ": " << cpp_strerror(erase_result) << dendl;
    } else {
      dout(1) << "Successfully erased journal, updating journal pointer" << dendl;
      jp.back = 0;
      int write_result = jp.save(mds->objecter);
      // Nothing graceful we can do for this
      assert(write_result >= 0);
    }
  }

  /* Read the header from the front journal */
  Journaler *front_journal = new Journaler("mdlog", jp.front,
      mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter,
      logger, l_mdl_jlat, mds->finisher);

  // Assign to ::journaler so that we can be aborted by ::shutdown while
  // waiting for journaler recovery
  {
    Mutex::Locker l(mds->mds_lock);
    journaler = front_journal;
  }

  C_SaferCond recover_wait;
  front_journal->recover(&recover_wait);
  dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
  int recovery_result = recover_wait.wait();
  dout(4) << "Journal " << jp.front << " recovered." << dendl;

  if (recovery_result == -EBLACKLISTED) {
    derr << "Blacklisted during journal recovery!  Respawning..." << dendl;
    mds->respawn();
    ceph_abort(); // Should be unreachable because respawn calls execv
  } else if (recovery_result != 0) {
    mds->clog->error() << "Error recovering journal " << jp.front << ": "
      << cpp_strerror(recovery_result);
    mds->damaged_unlocked();
    assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
  }

  /* Check whether the front journal format is acceptable or needs re-write */
  if (front_journal->get_stream_format() > JOURNAL_FORMAT_MAX) {
    dout(0) << "Journal " << jp.front << " is in unknown format " << front_journal->get_stream_format()
            << ", does this MDS daemon require upgrade?" << dendl;
    {
      Mutex::Locker l(mds->mds_lock);
      if (mds->is_daemon_stopping()) {
        journaler = NULL;
        delete front_journal;
        return;
      }
      completion->complete(-EINVAL);
    }
  } else if (mds->is_standby_replay() || front_journal->get_stream_format() >= g_conf->mds_journal_format) {
    /* The journal is of configured format, or we are in standbyreplay and will
     * tolerate replaying old journals until we have to go active. Use front_journal as
     * our journaler attribute and complete */
    dout(4) << "Recovered journal " << jp.front << " in format " << front_journal->get_stream_format() << dendl;
    journaler->set_write_error_handler(new C_MDL_WriteError(this));
    {
      Mutex::Locker l(mds->mds_lock);
      if (mds->is_daemon_stopping()) {
        return;
      }
      completion->complete(0);
    }
  } else {
    /* Hand off to reformat routine, which will ultimately set the
     * completion when it has done its thing */
    dout(1) << "Journal " << jp.front << " has old format "
      << front_journal->get_stream_format() << ", it will now be updated" << dendl;
    _reformat_journal(jp, front_journal, completion);
  }
}
1059
1060/**
1061 * Blocking rewrite of the journal to a new file, followed by
1062 * swap of journal pointer to point to the new one.
1063 *
1064 * We write the new journal to the 'back' journal from the JournalPointer,
1065 * swapping pointers to make that one the front journal only when we have
1066 * safely completed.
1067 */
void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, MDSInternalContextBase *completion)
{
  assert(!jp_in.is_null());
  assert(completion != NULL);
  assert(old_journal != NULL);

  // Work on a local copy so the caller's JournalPointer is untouched.
  JournalPointer jp = jp_in;

  /* Set JournalPointer.back to the location we will write the new journal */
  inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
  inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid();
  // Write the rewritten journal to whichever of the two inos is NOT
  // currently the front journal.
  jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino);
  int write_result = jp.save(mds->objecter);
  // Nothing graceful we can do if the pointer can't be persisted.
  assert(write_result == 0);

  /* Create the new Journaler file */
  Journaler *new_journal = new Journaler("mdlog", jp.back,
      mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, mds->finisher);
  dout(4) << "Writing new journal header " << jp.back << dendl;
  // Reuse the old journal's layout; only the stream format changes.
  file_layout_t new_layout = old_journal->get_layout();
  new_journal->set_writeable();
  new_journal->create(&new_layout, g_conf->mds_journal_format);

  /* Write the new journal header to RADOS */
  C_SaferCond write_head_wait;
  new_journal->write_head(&write_head_wait);
  write_head_wait.wait();

  // Read in the old journal, and whenever we have readable events,
  // write them to the new journal.
  int r = 0;

  // In old format journals before event_seq was introduced, the serialized
  // offset of a SubtreeMap message in the log is used as the unique ID for
  // a log segment. Because we change serialization, this will end up changing
  // for us, so we have to explicitly update the fields that point back to that
  // log segment.
  std::map<log_segment_seq_t, log_segment_seq_t> segment_pos_rewrite;

  // The logic in here borrowed from replay_thread expects mds_lock to be held,
  // e.g. between checking readable and doing wait_for_readable so that journaler
  // state doesn't change in between.
  uint32_t events_transcribed = 0;
  while (1) {
    // Block (synchronously) until the old journal has a readable entry,
    // hits an error, or the read cursor catches the write cursor.
    while (!old_journal->is_readable() &&
	   old_journal->get_read_pos() < old_journal->get_write_pos() &&
	   !old_journal->get_error()) {

      // Issue a journal prefetch
      C_SaferCond readable_waiter;
      old_journal->wait_for_readable(&readable_waiter);

      // Wait for a journal prefetch to complete
      readable_waiter.wait();
    }
    if (old_journal->get_error()) {
      r = old_journal->get_error();
      dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
      break;
    }

    // Clean termination: every entry has been consumed.
    if (!old_journal->is_readable() &&
	old_journal->get_read_pos() == old_journal->get_write_pos())
      break;

    // Read one serialized LogEvent
    assert(old_journal->is_readable());
    bufferlist bl;
    uint64_t le_pos = old_journal->get_read_pos();
    // NOTE(review): this bool shadows the outer `int r` error accumulator.
    bool r = old_journal->try_read_entry(bl);
    if (!r && old_journal->get_error())
      continue;
    assert(r);

    // Update segment_pos_rewrite
    LogEvent *le = LogEvent::decode(bl);
    if (le) {
      bool modified = false;

      if (le->get_type() == EVENT_SUBTREEMAP ||
          le->get_type() == EVENT_RESETJOURNAL) {
        ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
        if (sle == NULL || sle->event_seq == 0) {
          // A non-explicit event seq: the effective sequence number
          // of this segment is its position in the old journal and
          // the new effective sequence number will be its position
          // in the new journal.
          segment_pos_rewrite[le_pos] = new_journal->get_write_pos();
          dout(20) << __func__ << " discovered segment seq mapping "
            << le_pos << " -> " << new_journal->get_write_pos() << dendl;
        }
      } else {
        // Non-segment-boundary events advance the event sequence counter,
        // mirroring the accounting done during normal replay.
        event_seq++;
      }

      // Rewrite segment references if necessary
      EMetaBlob *blob = le->get_metablob();
      if (blob) {
        modified = blob->rewrite_truncate_finish(mds, segment_pos_rewrite);
      }

      // Zero-out expire_pos in subtreemap because offsets have changed
      // (expire_pos is just an optimization so it's safe to eliminate it)
      if (le->get_type() == EVENT_SUBTREEMAP
          || le->get_type() == EVENT_SUBTREEMAP_TEST) {
        ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
        assert(sle != NULL);
        dout(20) << __func__ << " zeroing expire_pos in subtreemap event at "
          << le_pos << " seq=" << sle->event_seq << dendl;
        sle->expire_pos = 0;
        modified = true;
      }

      // Only re-encode if we actually changed something; otherwise the
      // original bytes in `bl` are appended unchanged.
      if (modified) {
        bl.clear();
        le->encode_with_header(bl, mds->mdsmap->get_up_features());
      }

      delete le;
    } else {
      // Failure from LogEvent::decode, our job is to change the journal wrapper,
      // not validate the contents, so pass it through.
      dout(1) << __func__ << " transcribing un-decodable LogEvent at old position "
        << old_journal->get_read_pos() << ", new position " << new_journal->get_write_pos()
        << dendl;
    }

    // Write (buffered, synchronous) one serialized LogEvent
    events_transcribed += 1;
    new_journal->append_entry(bl);
  }

  dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl;
  C_SaferCond flush_waiter;
  new_journal->flush(&flush_waiter);
  flush_waiter.wait();

  // If failed to rewrite journal, leave the part written journal
  // as garbage to be cleaned up next startup.
  assert(r == 0);

  /* Now that the new journal is safe, we can flip the pointers */
  inodeno_t const tmp = jp.front;
  jp.front = jp.back;
  jp.back = tmp;
  write_result = jp.save(mds->objecter);
  assert(write_result == 0);

  /* Delete the old journal to free space */
  dout(1) << "New journal flushed, erasing old journal" << dendl;
  C_SaferCond erase_waiter;
  old_journal->erase(&erase_waiter);
  int erase_result = erase_waiter.wait();
  assert(erase_result == 0);
  {
    // mds_lock guards ::journaler and the daemon-stopping check; bail out
    // without touching shared state if we are shutting down.
    Mutex::Locker l(mds->mds_lock);
    if (mds->is_daemon_stopping()) {
      delete new_journal;
      return;
    }
    assert(journaler == old_journal);
    journaler = NULL;
    delete old_journal;
  }

  /* Update the pointer to reflect we're back in clean single journal state. */
  jp.back = 0;
  write_result = jp.save(mds->objecter);
  assert(write_result == 0);

  /* Reset the Journaler object to its default state */
  dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl;
  {
    Mutex::Locker l(mds->mds_lock);
    if (mds->is_daemon_stopping()) {
      delete new_journal;
      return;
    }
    journaler = new_journal;
    journaler->set_readonly();
    journaler->set_write_error_handler(new C_MDL_WriteError(this));
  }

  /* Trigger completion */
  {
    Mutex::Locker l(mds->mds_lock);
    if (mds->is_daemon_stopping()) {
      return;
    }
    completion->complete(0);
  }
}
1260
1261
// i am a separate thread: read journal entries one at a time and apply
// each via LogEvent::replay().  mds_lock is taken only around the replay
// of each event and around waiter completion, so journal I/O waits do not
// hold the main MDS lock.
void MDLog::_replay_thread()
{
  dout(10) << "_replay_thread start" << dendl;

  // loop
  int r = 0;  // overall result, handed to waitfor_replay contexts at the end
  while (1) {
    // wait for read?
    while (!journaler->is_readable() &&
	   journaler->get_read_pos() < journaler->get_write_pos() &&
	   !journaler->get_error()) {
      C_SaferCond readable_waiter;
      journaler->wait_for_readable(&readable_waiter);
      r = readable_waiter.wait();
    }
    if (journaler->get_error()) {
      r = journaler->get_error();
      dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
      if (r == -ENOENT) {
        if (mds->is_standby_replay()) {
          // journal has been trimmed by somebody else
          r = -EAGAIN;
        } else {
          mds->clog->error() << "missing journal object";
          mds->damaged_unlocked();
          ceph_abort(); // Should be unreachable because damaged() calls respawn()
        }
      } else if (r == -EINVAL) {
        if (journaler->get_read_pos() < journaler->get_expire_pos()) {
          // this should only happen if you're following somebody else
          if(journaler->is_readonly()) {
            dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl;
            r = -EAGAIN;
          } else {
            mds->clog->error() << "invalid journaler offsets";
            mds->damaged_unlocked();
            ceph_abort(); // Should be unreachable because damaged() calls respawn()
          }
        } else {
          /* re-read head and check it
           * Given that replay happens in a separate thread and
           * the MDS is going to either shut down or restart when
           * we return this error, doing it synchronously is fine
           * -- as long as we drop the main mds lock--. */
          C_SaferCond reread_fin;
          journaler->reread_head(&reread_fin);
          int err = reread_fin.wait();
          if (err) {
            if (err == -ENOENT && mds->is_standby_replay()) {
              r = -EAGAIN;
              dout(1) << "Journal header went away while in standby replay, journal rewritten?"
                      << dendl;
              break;
            } else {
              dout(0) << "got error while reading head: " << cpp_strerror(err)
                      << dendl;

              mds->clog->error() << "error reading journal header";
              mds->damaged_unlocked();
              ceph_abort(); // Should be unreachable because damaged() calls
                            // respawn()
            }
          }
          // Drop segments the active MDS has already expired, then re-check
          // whether our read position is still valid.
          standby_trim_segments();
          if (journaler->get_read_pos() < journaler->get_expire_pos()) {
            dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl;
            r = -EAGAIN;
          }
        }
      }
      break;
    }

    // Clean termination: the whole journal has been consumed.
    if (!journaler->is_readable() &&
	journaler->get_read_pos() == journaler->get_write_pos())
      break;

    assert(journaler->is_readable() || mds->is_daemon_stopping());

    // read it
    uint64_t pos = journaler->get_read_pos();
    bufferlist bl;
    // NOTE(review): this bool shadows the outer `int r` result variable.
    bool r = journaler->try_read_entry(bl);
    if (!r && journaler->get_error())
      continue;
    assert(r);

    // unpack event
    LogEvent *le = LogEvent::decode(bl);
    if (!le) {
      dout(0) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
	      << " -- unable to decode event" << dendl;
      dout(0) << "dump of unknown or corrupt event:\n";
      bl.hexdump(*_dout);
      *_dout << dendl;

      mds->clog->error() << "corrupt journal event at " << pos << "~"
                         << bl.length() << " / "
                         << journaler->get_write_pos();
      if (g_conf->mds_log_skip_corrupt_events) {
        continue;
      } else {
        mds->damaged_unlocked();
        ceph_abort(); // Should be unreachable because damaged() calls
                      // respawn()
      }

    }
    le->set_start_off(pos);

    // new segment?
    if (le->get_type() == EVENT_SUBTREEMAP ||
	le->get_type() == EVENT_RESETJOURNAL) {
      ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
      // Segments carrying an explicit event_seq use it; old-format segments
      // are identified by their journal offset instead.
      if (sle && sle->event_seq > 0)
	event_seq = sle->event_seq;
      else
	event_seq = pos;
      segments[event_seq] = new LogSegment(event_seq, pos);
      logger->set(l_mdl_seg, segments.size());
    } else {
      event_seq++;
    }

    // have we seen an import map yet?
    if (segments.empty()) {
      // Events before the first subtree map cannot be attached to a
      // segment, so they are skipped rather than replayed.
      dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
	       << " " << le->get_stamp() << " -- waiting for subtree_map. (skipping " << *le << ")" << dendl;
    } else {
      dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
	       << " " << le->get_stamp() << ": " << *le << dendl;
      le->_segment = get_current_segment(); // replay may need this
      le->_segment->num_events++;
      le->_segment->end = journaler->get_read_pos();
      num_events++;

      {
        // Hold mds_lock only while actually mutating MDS state.
        Mutex::Locker l(mds->mds_lock);
        if (mds->is_daemon_stopping()) {
          return;
        }
        logger->inc(l_mdl_replayed);
        le->replay(mds);
      }
    }
    delete le;

    logger->set(l_mdl_rdpos, pos);
  }

  // done!
  if (r == 0) {
    // Clean completion: we must have consumed the journal fully.
    assert(journaler->get_read_pos() == journaler->get_write_pos());
    dout(10) << "_replay - complete, " << num_events
	     << " events" << dendl;

    logger->set(l_mdl_expos, journaler->get_expire_pos());
  }

  safe_pos = journaler->get_write_safe_pos();

  dout(10) << "_replay_thread kicking waiters" << dendl;
  {
    Mutex::Locker l(mds->mds_lock);
    if (mds->is_daemon_stopping()) {
      return;
    }
    finish_contexts(g_ceph_context, waitfor_replay, r);
  }

  dout(10) << "_replay_thread finish" << dendl;
}
1435
1436void MDLog::standby_trim_segments()
1437{
1438 dout(10) << "standby_trim_segments" << dendl;
1439 uint64_t expire_pos = journaler->get_expire_pos();
1440 dout(10) << " expire_pos=" << expire_pos << dendl;
1441 bool removed_segment = false;
1442 while (have_any_segments()) {
1443 LogSegment *seg = get_oldest_segment();
1444 dout(10) << " segment seq=" << seg->seq << " " << seg->offset <<
1445 "~" << seg->end - seg->offset << dendl;
1446
1447 if (seg->end > expire_pos) {
1448 dout(10) << " won't remove, not expired!" << dendl;
1449 break;
1450 }
1451
1452 if (segments.size() == 1) {
1453 dout(10) << " won't remove, last segment!" << dendl;
1454 break;
1455 }
1456
1457 dout(10) << " removing segment" << dendl;
1458 mds->mdcache->standby_trim_segment(seg);
1459 remove_oldest_segment();
1460 removed_segment = true;
1461 }
1462
1463 if (removed_segment) {
1464 dout(20) << " calling mdcache->trim!" << dendl;
181888fb 1465 mds->mdcache->trim();
7c673cae
FG
1466 } else {
1467 dout(20) << " removed no segments!" << dendl;
1468 }
1469}
1470
1471void MDLog::dump_replay_status(Formatter *f) const
1472{
1473 f->open_object_section("replay_status");
1474 f->dump_unsigned("journal_read_pos", journaler ? journaler->get_read_pos() : 0);
1475 f->dump_unsigned("journal_write_pos", journaler ? journaler->get_write_pos() : 0);
1476 f->dump_unsigned("journal_expire_pos", journaler ? journaler->get_expire_pos() : 0);
1477 f->dump_unsigned("num_events", get_num_events());
1478 f->dump_unsigned("num_segments", get_num_segments());
1479 f->close_section();
1480}