]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/MDLog.cc
update sources to v12.2.3
[ceph.git] / ceph / src / mds / MDLog.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "MDSRank.h"
16#include "MDLog.h"
17#include "MDCache.h"
18#include "LogEvent.h"
19#include "MDSContext.h"
20
21#include "osdc/Journaler.h"
22#include "mds/JournalPointer.h"
23
24#include "common/entity_name.h"
25#include "common/perf_counters.h"
26#include "common/Cond.h"
27
28#include "events/ESubtreeMap.h"
29
30#include "common/config.h"
31#include "common/errno.h"
32#include "include/assert.h"
33
34#define dout_context g_ceph_context
35#define dout_subsys ceph_subsys_mds
36#undef dout_prefix
37#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".log "
38
39// cons/des
40MDLog::~MDLog()
41{
42 if (journaler) { delete journaler; journaler = 0; }
43 if (logger) {
44 g_ceph_context->get_perfcounters_collection()->remove(logger);
45 delete logger;
46 logger = 0;
47 }
48}
49
50
51void MDLog::create_logger()
52{
53 PerfCountersBuilder plb(g_ceph_context, "mds_log", l_mdl_first, l_mdl_last);
54
55 plb.add_u64_counter(l_mdl_evadd, "evadd",
c07f9fc5 56 "Events submitted", "subm", PerfCountersBuilder::PRIO_INTERESTING);
7c673cae
FG
57 plb.add_u64_counter(l_mdl_evex, "evex", "Total expired events");
58 plb.add_u64_counter(l_mdl_evtrm, "evtrm", "Trimmed events");
59 plb.add_u64(l_mdl_ev, "ev",
c07f9fc5 60 "Events", "evts", PerfCountersBuilder::PRIO_INTERESTING);
7c673cae
FG
61 plb.add_u64(l_mdl_evexg, "evexg", "Expiring events");
62 plb.add_u64(l_mdl_evexd, "evexd", "Current expired events");
63
64 plb.add_u64_counter(l_mdl_segadd, "segadd", "Segments added");
65 plb.add_u64_counter(l_mdl_segex, "segex", "Total expired segments");
66 plb.add_u64_counter(l_mdl_segtrm, "segtrm", "Trimmed segments");
67 plb.add_u64(l_mdl_seg, "seg",
c07f9fc5 68 "Segments", "segs", PerfCountersBuilder::PRIO_INTERESTING);
7c673cae
FG
69 plb.add_u64(l_mdl_segexg, "segexg", "Expiring segments");
70 plb.add_u64(l_mdl_segexd, "segexd", "Current expired segments");
71
72 plb.add_u64(l_mdl_expos, "expos", "Journaler xpire position");
73 plb.add_u64(l_mdl_wrpos, "wrpos", "Journaler write position");
74 plb.add_u64(l_mdl_rdpos, "rdpos", "Journaler read position");
75 plb.add_time_avg(l_mdl_jlat, "jlat", "Journaler flush latency");
76
77 plb.add_u64_counter(l_mdl_replayed, "replayed", "Events replayed");
78
79 // logger
80 logger = plb.create_perf_counters();
81 g_ceph_context->get_perfcounters_collection()->add(logger);
82}
83
84void MDLog::set_write_iohint(unsigned iohint_flags)
85{
86 journaler->set_write_iohint(iohint_flags);
87}
88
89class C_MDL_WriteError : public MDSIOContextBase {
90 protected:
91 MDLog *mdlog;
92 MDSRank *get_mds() override {return mdlog->mds;}
93
94 void finish(int r) override {
95 MDSRank *mds = get_mds();
96 // assume journal is reliable, so don't choose action based on
97 // g_conf->mds_action_on_write_error.
98 if (r == -EBLACKLISTED) {
99 derr << "we have been blacklisted (fenced), respawning..." << dendl;
100 mds->respawn();
101 } else {
102 derr << "unhandled error " << cpp_strerror(r) << ", shutting down..." << dendl;
103 // Although it's possible that this could be something transient,
104 // it's severe and scary, so disable this rank until an administrator
105 // intervenes.
106 mds->clog->error() << "Unhandled journal write error on MDS rank " <<
107 mds->get_nodeid() << ": " << cpp_strerror(r) << ", shutting down.";
108 mds->damaged();
109 ceph_abort(); // damaged should never return
110 }
111 }
112
113 public:
114 explicit C_MDL_WriteError(MDLog *m) : mdlog(m) {}
115};
116
117
118void MDLog::write_head(MDSInternalContextBase *c)
119{
120 Context *fin = NULL;
121 if (c != NULL) {
122 fin = new C_IO_Wrapper(mds, c);
123 }
124 journaler->write_head(fin);
125}
126
127uint64_t MDLog::get_read_pos() const
128{
129 return journaler->get_read_pos();
130}
131
132uint64_t MDLog::get_write_pos() const
133{
134 return journaler->get_write_pos();
135}
136
137uint64_t MDLog::get_safe_pos() const
138{
139 return journaler->get_write_safe_pos();
140}
141
142
143
144void MDLog::create(MDSInternalContextBase *c)
145{
146 dout(5) << "create empty log" << dendl;
147
148 C_GatherBuilder gather(g_ceph_context);
149 // This requires an OnFinisher wrapper because Journaler will call back the completion for write_head inside its own lock
150 // XXX but should maybe that be handled inside Journaler?
151 gather.set_finisher(new C_IO_Wrapper(mds, c));
152
153 // The inode of the default Journaler we will create
154 ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
155
156 // Instantiate Journaler and start async write to RADOS
157 assert(journaler == NULL);
158 journaler = new Journaler("mdlog", ino, mds->mdsmap->get_metadata_pool(),
159 CEPH_FS_ONDISK_MAGIC, mds->objecter, logger,
160 l_mdl_jlat, mds->finisher);
161 assert(journaler->is_readonly());
162 journaler->set_write_error_handler(new C_MDL_WriteError(this));
163 journaler->set_writeable();
164 journaler->create(&mds->mdcache->default_log_layout, g_conf->mds_journal_format);
165 journaler->write_head(gather.new_sub());
166
167 // Async write JournalPointer to RADOS
168 JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
169 jp.front = ino;
170 jp.back = 0;
171 jp.save(mds->objecter, gather.new_sub());
172
173 gather.activate();
174
175 logger->set(l_mdl_expos, journaler->get_expire_pos());
176 logger->set(l_mdl_wrpos, journaler->get_write_pos());
177
178 submit_thread.create("md_submit");
179}
180
181void MDLog::open(MDSInternalContextBase *c)
182{
183 dout(5) << "open discovering log bounds" << dendl;
184
185 assert(!recovery_thread.is_started());
186 recovery_thread.set_completion(c);
187 recovery_thread.create("md_recov_open");
188
189 submit_thread.create("md_submit");
190 // either append() or replay() will follow.
191}
192
193/**
194 * Final part of reopen() procedure, after recovery_thread
195 * has done its thing we call append()
196 */
197class C_ReopenComplete : public MDSInternalContext {
198 MDLog *mdlog;
199 MDSInternalContextBase *on_complete;
200public:
201 C_ReopenComplete(MDLog *mdlog_, MDSInternalContextBase *on_complete_) : MDSInternalContext(mdlog_->mds), mdlog(mdlog_), on_complete(on_complete_) {}
202 void finish(int r) override {
203 mdlog->append();
204 on_complete->complete(r);
205 }
206};
207
208/**
209 * Given that open() has been called in the past, go through the journal
210 * recovery procedure again, potentially reformatting the journal if it
211 * was in an old format.
212 */
213void MDLog::reopen(MDSInternalContextBase *c)
214{
215 dout(5) << "reopen" << dendl;
216
217 // Because we will call append() at the completion of this, check that we have already
218 // read the whole journal.
219 assert(journaler != NULL);
220 assert(journaler->get_read_pos() == journaler->get_write_pos());
221
222 delete journaler;
223 journaler = NULL;
224
225 // recovery_thread was started at some point in the past. Although
226 // it has called it's completion if we made it back here, it might
227 // still not have been cleaned up: join it.
228 recovery_thread.join();
229
230 recovery_thread.set_completion(new C_ReopenComplete(this, c));
231 recovery_thread.create("md_recov_reopen");
232}
233
234void MDLog::append()
235{
236 dout(5) << "append positioning at end and marking writeable" << dendl;
237 journaler->set_read_pos(journaler->get_write_pos());
238 journaler->set_expire_pos(journaler->get_write_pos());
239
240 journaler->set_writeable();
241
242 logger->set(l_mdl_expos, journaler->get_write_pos());
243}
244
245
246
247// -------------------------------------------------
248
249void MDLog::_start_entry(LogEvent *e)
250{
251 assert(submit_mutex.is_locked_by_me());
252
253 assert(cur_event == NULL);
254 cur_event = e;
255
256 event_seq++;
257
258 EMetaBlob *metablob = e->get_metablob();
259 if (metablob) {
260 metablob->event_seq = event_seq;
261 metablob->last_subtree_map = get_last_segment_seq();
262 }
263}
264
265void MDLog::cancel_entry(LogEvent *le)
266{
267 assert(le == cur_event);
268 cur_event = NULL;
269 delete le;
270}
271
272void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
273{
274 assert(submit_mutex.is_locked_by_me());
275 assert(!mds->is_any_replay());
276 assert(!capped);
277
278 assert(le == cur_event);
279 cur_event = NULL;
280
281 // let the event register itself in the segment
282 assert(!segments.empty());
283 LogSegment *ls = segments.rbegin()->second;
284 ls->num_events++;
285
286 le->_segment = ls;
287 le->update_segment();
288 le->set_stamp(ceph_clock_now());
289
290 mdsmap_up_features = mds->mdsmap->get_up_features();
291 pending_events[ls->seq].push_back(PendingEvent(le, c));
292 num_events++;
293
294 if (logger) {
295 logger->inc(l_mdl_evadd);
296 logger->set(l_mdl_ev, num_events);
297 }
298
299 unflushed++;
300
301 uint64_t period = journaler->get_layout_period();
302 // start a new segment?
303 if (le->get_type() == EVENT_SUBTREEMAP ||
304 (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) {
305 // avoid infinite loop when ESubtreeMap is very large.
306 // do not insert ESubtreeMap among EImportFinish events that finish
307 // disambiguate imports. Because the ESubtreeMap reflects the subtree
308 // state when all EImportFinish events are replayed.
309 } else if (ls->end/period != ls->offset/period ||
310 ls->num_events >= g_conf->mds_log_events_per_segment) {
311 dout(10) << "submit_entry also starting new segment: last = "
312 << ls->seq << "/" << ls->offset << ", event seq = " << event_seq << dendl;
313 _start_new_segment();
314 } else if (g_conf->mds_debug_subtrees &&
315 le->get_type() != EVENT_SUBTREEMAP_TEST) {
316 // debug: journal this every time to catch subtree replay bugs.
317 // use a different event id so it doesn't get interpreted as a
318 // LogSegment boundary on replay.
319 LogEvent *sle = mds->mdcache->create_subtree_map();
320 sle->set_type(EVENT_SUBTREEMAP_TEST);
321 _submit_entry(sle, NULL);
322 }
323}
324
325/**
326 * Invoked on the flush after each entry submitted
327 */
328class C_MDL_Flushed : public MDSLogContextBase {
329protected:
330 MDLog *mdlog;
331 MDSRank *get_mds() override {return mdlog->mds;}
332 MDSInternalContextBase *wrapped;
333
334 void finish(int r) override {
335 if (wrapped)
336 wrapped->complete(r);
337 }
338
339public:
340 C_MDL_Flushed(MDLog *m, MDSInternalContextBase *w)
341 : mdlog(m), wrapped(w) {}
342 C_MDL_Flushed(MDLog *m, uint64_t wp) : mdlog(m), wrapped(NULL) {
343 set_write_pos(wp);
344 }
345};
346
347void MDLog::_submit_thread()
348{
349 dout(10) << "_submit_thread start" << dendl;
350
351 submit_mutex.Lock();
352
353 while (!mds->is_daemon_stopping()) {
354 if (g_conf->mds_log_pause) {
355 submit_cond.Wait(submit_mutex);
356 continue;
357 }
358
359 map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
360 if (it == pending_events.end()) {
361 submit_cond.Wait(submit_mutex);
362 continue;
363 }
364
365 if (it->second.empty()) {
366 pending_events.erase(it);
367 continue;
368 }
369
370 int64_t features = mdsmap_up_features;
371 PendingEvent data = it->second.front();
372 it->second.pop_front();
373
374 submit_mutex.Unlock();
375
376 if (data.le) {
377 LogEvent *le = data.le;
378 LogSegment *ls = le->_segment;
379 // encode it, with event type
380 bufferlist bl;
381 le->encode_with_header(bl, features);
382
383 uint64_t write_pos = journaler->get_write_pos();
384
385 le->set_start_off(write_pos);
386 if (le->get_type() == EVENT_SUBTREEMAP)
387 ls->offset = write_pos;
388
389 dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
390 << " : " << *le << dendl;
391
392 // journal it.
393 const uint64_t new_write_pos = journaler->append_entry(bl); // bl is destroyed.
394 ls->end = new_write_pos;
395
396 MDSLogContextBase *fin;
397 if (data.fin) {
398 fin = dynamic_cast<MDSLogContextBase*>(data.fin);
399 assert(fin);
400 fin->set_write_pos(new_write_pos);
401 } else {
402 fin = new C_MDL_Flushed(this, new_write_pos);
403 }
404
405 journaler->wait_for_flush(fin);
406
407 if (data.flush)
408 journaler->flush();
409
410 if (logger)
411 logger->set(l_mdl_wrpos, ls->end);
412
413 delete le;
414 } else {
415 if (data.fin) {
416 MDSInternalContextBase* fin =
417 dynamic_cast<MDSInternalContextBase*>(data.fin);
418 assert(fin);
419 C_MDL_Flushed *fin2 = new C_MDL_Flushed(this, fin);
420 fin2->set_write_pos(journaler->get_write_pos());
421 journaler->wait_for_flush(fin2);
422 }
423 if (data.flush)
424 journaler->flush();
425 }
426
427 submit_mutex.Lock();
428 if (data.flush)
429 unflushed = 0;
430 else if (data.le)
431 unflushed++;
432 }
433
434 submit_mutex.Unlock();
435}
436
437void MDLog::wait_for_safe(MDSInternalContextBase *c)
438{
439 submit_mutex.Lock();
440
441 bool no_pending = true;
442 if (!pending_events.empty()) {
443 pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
444 no_pending = false;
445 submit_cond.Signal();
446 }
447
448 submit_mutex.Unlock();
449
450 if (no_pending && c)
451 journaler->wait_for_flush(new C_IO_Wrapper(mds, c));
452}
453
454void MDLog::flush()
455{
456 submit_mutex.Lock();
457
458 bool do_flush = unflushed > 0;
459 unflushed = 0;
460 if (!pending_events.empty()) {
461 pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
462 do_flush = false;
463 submit_cond.Signal();
464 }
465
466 submit_mutex.Unlock();
467
468 if (do_flush)
469 journaler->flush();
470}
471
472void MDLog::kick_submitter()
473{
474 Mutex::Locker l(submit_mutex);
475 submit_cond.Signal();
476}
477
478void MDLog::cap()
479{
480 dout(5) << "cap" << dendl;
481 capped = true;
482}
483
484void MDLog::shutdown()
485{
486 assert(mds->mds_lock.is_locked_by_me());
487
488 dout(5) << "shutdown" << dendl;
489 if (submit_thread.is_started()) {
490 assert(mds->is_daemon_stopping());
491
492 if (submit_thread.am_self()) {
493 // Called suicide from the thread: trust it to do no work after
494 // returning from suicide, and subsequently respect mds->is_daemon_stopping()
495 // and fall out of its loop.
496 } else {
497 mds->mds_lock.Unlock();
498 // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
499 // picking it up will do anything with it.
500
501 submit_mutex.Lock();
502 submit_cond.Signal();
503 submit_mutex.Unlock();
504
505 mds->mds_lock.Lock();
506
507 submit_thread.join();
508 }
509 }
510
511 // Replay thread can be stuck inside e.g. Journaler::wait_for_readable,
512 // so we need to shutdown the journaler first.
513 if (journaler) {
514 journaler->shutdown();
515 }
516
517 if (replay_thread.is_started() && !replay_thread.am_self()) {
518 mds->mds_lock.Unlock();
519 replay_thread.join();
520 mds->mds_lock.Lock();
521 }
522
523 if (recovery_thread.is_started() && !recovery_thread.am_self()) {
524 mds->mds_lock.Unlock();
525 recovery_thread.join();
526 mds->mds_lock.Lock();
527 }
528}
529
530
531// -----------------------------
532// segments
533
534void MDLog::_start_new_segment()
535{
536 _prepare_new_segment();
537 _journal_segment_subtree_map(NULL);
538}
539
540void MDLog::_prepare_new_segment()
541{
542 assert(submit_mutex.is_locked_by_me());
543
544 uint64_t seq = event_seq + 1;
545 dout(7) << __func__ << " seq " << seq << dendl;
546
547 segments[seq] = new LogSegment(seq);
548
549 logger->inc(l_mdl_segadd);
550 logger->set(l_mdl_seg, segments.size());
551
552 // Adjust to next stray dir
553 dout(10) << "Advancing to next stray directory on mds " << mds->get_nodeid()
554 << dendl;
555 mds->mdcache->advance_stray();
556}
557
558void MDLog::_journal_segment_subtree_map(MDSInternalContextBase *onsync)
559{
560 assert(submit_mutex.is_locked_by_me());
561
562 dout(7) << __func__ << dendl;
563 ESubtreeMap *sle = mds->mdcache->create_subtree_map();
564 sle->event_seq = get_last_segment_seq();
565
566 _submit_entry(sle, new C_MDL_Flushed(this, onsync));
567}
568
569void MDLog::trim(int m)
570{
571 unsigned max_segments = g_conf->mds_log_max_segments;
572 int max_events = g_conf->mds_log_max_events;
573 if (m >= 0)
574 max_events = m;
575
576 if (mds->mdcache->is_readonly()) {
577 dout(10) << "trim, ignoring read-only FS" << dendl;
578 return;
579 }
580
581 // Clamp max_events to not be smaller than events per segment
582 if (max_events > 0 && max_events <= g_conf->mds_log_events_per_segment) {
583 max_events = g_conf->mds_log_events_per_segment + 1;
584 }
585
586 submit_mutex.Lock();
587
588 // trim!
589 dout(10) << "trim "
590 << segments.size() << " / " << max_segments << " segments, "
591 << num_events << " / " << max_events << " events"
592 << ", " << expiring_segments.size() << " (" << expiring_events << ") expiring"
593 << ", " << expired_segments.size() << " (" << expired_events << ") expired"
594 << dendl;
595
596 if (segments.empty()) {
597 submit_mutex.Unlock();
598 return;
599 }
600
601 // hack: only trim for a few seconds at a time
602 utime_t stop = ceph_clock_now();
603 stop += 2.0;
604
b32b8144
FG
605 int op_prio = CEPH_MSG_PRIO_LOW +
606 (CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) *
607 expiring_segments.size() / max_segments;
608 if (op_prio > CEPH_MSG_PRIO_HIGH)
609 op_prio = CEPH_MSG_PRIO_HIGH;
610
611 unsigned new_expiring_segments = 0;
612
7c673cae 613 map<uint64_t,LogSegment*>::iterator p = segments.begin();
b32b8144 614 while (p != segments.end()) {
7c673cae
FG
615 if (stop < ceph_clock_now())
616 break;
617
b32b8144
FG
618 unsigned num_remaining_segments = (segments.size() - expired_segments.size() - expiring_segments.size());
619 if ((num_remaining_segments <= max_segments) &&
620 (max_events < 0 || num_events - expiring_events - expired_events <= max_events))
7c673cae
FG
621 break;
622
b32b8144
FG
623 // Do not trim too many segments at once for peak workload. If mds keeps creating N segments each tick,
624 // the upper bound of 'num_remaining_segments - max_segments' is '2 * N'
625 if (new_expiring_segments * 2 > num_remaining_segments)
626 break;
7c673cae
FG
627
628 // look at first segment
629 LogSegment *ls = p->second;
630 assert(ls);
631 ++p;
632
633 if (pending_events.count(ls->seq) ||
634 ls->end > safe_pos) {
635 dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
636 << journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
637 break;
638 }
639 if (expiring_segments.count(ls)) {
640 dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
641 << ", " << ls->num_events << " events" << dendl;
642 } else if (expired_segments.count(ls)) {
643 dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
644 << ", " << ls->num_events << " events" << dendl;
645 } else {
646 assert(expiring_segments.count(ls) == 0);
b32b8144 647 new_expiring_segments++;
7c673cae
FG
648 expiring_segments.insert(ls);
649 expiring_events += ls->num_events;
650 submit_mutex.Unlock();
651
652 uint64_t last_seq = ls->seq;
653 try_expire(ls, op_prio);
654
655 submit_mutex.Lock();
656 p = segments.lower_bound(last_seq + 1);
657 }
658 }
659
660 // discard expired segments and unlock submit_mutex
661 _trim_expired_segments();
662}
663
664class C_MaybeExpiredSegment : public MDSInternalContext {
665 MDLog *mdlog;
666 LogSegment *ls;
667 int op_prio;
668 public:
669 C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) :
670 MDSInternalContext(mdl->mds), mdlog(mdl), ls(s), op_prio(p) {}
671 void finish(int res) override {
672 if (res < 0)
673 mdlog->mds->handle_write_error(res);
674 mdlog->_maybe_expired(ls, op_prio);
675 }
676};
677
678/**
679 * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
680 * segment.
681 */
682int MDLog::trim_all()
683{
684 submit_mutex.Lock();
685
686 dout(10) << __func__ << ": "
687 << segments.size()
688 << "/" << expiring_segments.size()
689 << "/" << expired_segments.size() << dendl;
690
691 uint64_t last_seq = 0;
692 if (!segments.empty())
693 last_seq = get_last_segment_seq();
694
695 map<uint64_t,LogSegment*>::iterator p = segments.begin();
696 while (p != segments.end() &&
697 p->first < last_seq && p->second->end <= safe_pos) {
698 LogSegment *ls = p->second;
699 ++p;
700
701 // Caller should have flushed journaler before calling this
702 if (pending_events.count(ls->seq)) {
703 dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
704 submit_mutex.Unlock();
705 return -EAGAIN;
706 }
707
708 if (expiring_segments.count(ls)) {
709 dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
710 << ", " << ls->num_events << " events" << dendl;
711 } else if (expired_segments.count(ls)) {
712 dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
713 << ", " << ls->num_events << " events" << dendl;
714 } else {
715 assert(expiring_segments.count(ls) == 0);
716 expiring_segments.insert(ls);
717 expiring_events += ls->num_events;
718 submit_mutex.Unlock();
719
720 uint64_t next_seq = ls->seq + 1;
721 try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
722
723 submit_mutex.Lock();
724 p = segments.lower_bound(next_seq);
725 }
726 }
727
728 _trim_expired_segments();
729
730 return 0;
731}
732
733
734void MDLog::try_expire(LogSegment *ls, int op_prio)
735{
736 MDSGatherBuilder gather_bld(g_ceph_context);
737 ls->try_to_expire(mds, gather_bld, op_prio);
738
739 if (gather_bld.has_subs()) {
740 dout(5) << "try_expire expiring segment " << ls->seq << "/" << ls->offset << dendl;
741 gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
742 gather_bld.activate();
743 } else {
744 dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
745 submit_mutex.Lock();
746 assert(expiring_segments.count(ls));
747 expiring_segments.erase(ls);
748 expiring_events -= ls->num_events;
749 _expired(ls);
750 submit_mutex.Unlock();
751 }
752
753 logger->set(l_mdl_segexg, expiring_segments.size());
754 logger->set(l_mdl_evexg, expiring_events);
755}
756
757void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
758{
759 if (mds->mdcache->is_readonly()) {
760 dout(10) << "_maybe_expired, ignoring read-only FS" << dendl;
761 return;
762 }
763
764 dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset
765 << ", " << ls->num_events << " events" << dendl;
766 try_expire(ls, op_prio);
767}
768
769void MDLog::_trim_expired_segments()
770{
771 assert(submit_mutex.is_locked_by_me());
772
773 // trim expired segments?
774 bool trimmed = false;
775 while (!segments.empty()) {
776 LogSegment *ls = segments.begin()->second;
777 if (!expired_segments.count(ls)) {
778 dout(10) << "_trim_expired_segments waiting for " << ls->seq << "/" << ls->offset
779 << " to expire" << dendl;
780 break;
781 }
782
783 dout(10) << "_trim_expired_segments trimming expired "
784 << ls->seq << "/0x" << std::hex << ls->offset << std::dec << dendl;
785 expired_events -= ls->num_events;
786 expired_segments.erase(ls);
787 num_events -= ls->num_events;
788
789 // this was the oldest segment, adjust expire pos
790 if (journaler->get_expire_pos() < ls->end) {
791 journaler->set_expire_pos(ls->end);
31f18b77
FG
792 logger->set(l_mdl_expos, ls->end);
793 } else {
794 logger->set(l_mdl_expos, ls->offset);
7c673cae
FG
795 }
796
7c673cae
FG
797 logger->inc(l_mdl_segtrm);
798 logger->inc(l_mdl_evtrm, ls->num_events);
799
800 segments.erase(ls->seq);
801 delete ls;
802 trimmed = true;
803 }
804
805 submit_mutex.Unlock();
806
807 if (trimmed)
808 journaler->write_head(0);
809}
810
811void MDLog::trim_expired_segments()
812{
813 submit_mutex.Lock();
814 _trim_expired_segments();
815}
816
817void MDLog::_expired(LogSegment *ls)
818{
819 assert(submit_mutex.is_locked_by_me());
820
821 dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
822 << ", " << ls->num_events << " events" << dendl;
823
824 if (!capped && ls == peek_current_segment()) {
825 dout(5) << "_expired not expiring " << ls->seq << "/" << ls->offset
826 << ", last one and !capped" << dendl;
827 } else {
828 // expired.
829 expired_segments.insert(ls);
830 expired_events += ls->num_events;
831
832 // Trigger all waiters
833 for (std::list<MDSInternalContextBase*>::iterator i = ls->expiry_waiters.begin();
834 i != ls->expiry_waiters.end(); ++i) {
835 (*i)->complete(0);
836 }
837 ls->expiry_waiters.clear();
838
839 logger->inc(l_mdl_evex, ls->num_events);
840 logger->inc(l_mdl_segex);
841 }
842
843 logger->set(l_mdl_ev, num_events);
844 logger->set(l_mdl_evexd, expired_events);
845 logger->set(l_mdl_seg, segments.size());
846 logger->set(l_mdl_segexd, expired_segments.size());
847}
848
849
850
851void MDLog::replay(MDSInternalContextBase *c)
852{
853 assert(journaler->is_active());
854 assert(journaler->is_readonly());
855
856 // empty?
857 if (journaler->get_read_pos() == journaler->get_write_pos()) {
858 dout(10) << "replay - journal empty, done." << dendl;
31f18b77 859 mds->mdcache->trim();
7c673cae
FG
860 if (c) {
861 c->complete(0);
862 }
863 return;
864 }
865
866 // add waiter
867 if (c)
868 waitfor_replay.push_back(c);
869
870 // go!
871 dout(10) << "replay start, from " << journaler->get_read_pos()
872 << " to " << journaler->get_write_pos() << dendl;
873
874 assert(num_events == 0 || already_replayed);
875 if (already_replayed) {
876 // Ensure previous instance of ReplayThread is joined before
877 // we create another one
878 replay_thread.join();
879 }
880 already_replayed = true;
881
882 replay_thread.create("md_log_replay");
883}
884
885
886/**
887 * Resolve the JournalPointer object to a journal file, and
888 * instantiate a Journaler object. This may re-write the journal
889 * if the journal in RADOS appears to be in an old format.
890 *
891 * This is a separate thread because of the way it is initialized from inside
892 * the mds lock, which is also the global objecter lock -- rather than split
893 * it up into hard-to-read async operations linked up by contexts,
894 *
895 * When this function completes, the `journaler` attribute will be set to
896 * a Journaler instance using the latest available serialization format.
897 */
898void MDLog::_recovery_thread(MDSInternalContextBase *completion)
899{
900 assert(journaler == NULL);
901 if (g_conf->mds_journal_format > JOURNAL_FORMAT_MAX) {
902 dout(0) << "Configuration value for mds_journal_format is out of bounds, max is "
903 << JOURNAL_FORMAT_MAX << dendl;
904
905 // Oh dear, something unreadable in the store for this rank: require
906 // operator intervention.
907 mds->damaged();
908 ceph_abort(); // damaged should not return
909 }
910
911 // First, read the pointer object.
912 // If the pointer object is not present, then create it with
913 // front = default ino and back = null
914 JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
d2e6a577 915 const int read_result = jp.load(mds->objecter);
7c673cae
FG
916 if (read_result == -ENOENT) {
917 inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
918 jp.front = default_log_ino;
919 int write_result = jp.save(mds->objecter);
920 // Nothing graceful we can do for this
921 assert(write_result >= 0);
922 } else if (read_result == -EBLACKLISTED) {
923 derr << "Blacklisted during JournalPointer read! Respawning..." << dendl;
924 mds->respawn();
925 ceph_abort(); // Should be unreachable because respawn calls execv
926 } else if (read_result != 0) {
927 mds->clog->error() << "failed to read JournalPointer: " << read_result
928 << " (" << cpp_strerror(read_result) << ")";
929 mds->damaged_unlocked();
930 ceph_abort(); // Should be unreachable because damaged() calls respawn()
931 }
932
933 // If the back pointer is non-null, that means that a journal
934 // rewrite failed part way through. Erase the back journal
935 // to clean up.
936 if (jp.back) {
937 if (mds->is_standby_replay()) {
938 dout(1) << "Journal " << jp.front << " is being rewritten, "
939 << "cannot replay in standby until an active MDS completes rewrite" << dendl;
940 Mutex::Locker l(mds->mds_lock);
941 if (mds->is_daemon_stopping()) {
942 return;
943 }
944 completion->complete(-EAGAIN);
945 return;
946 }
947 dout(1) << "Erasing journal " << jp.back << dendl;
948 C_SaferCond erase_waiter;
949 Journaler back("mdlog", jp.back, mds->mdsmap->get_metadata_pool(),
950 CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat,
951 mds->finisher);
952
953 // Read all about this journal (header + extents)
954 C_SaferCond recover_wait;
955 back.recover(&recover_wait);
956 int recovery_result = recover_wait.wait();
957 if (recovery_result == -EBLACKLISTED) {
958 derr << "Blacklisted during journal recovery! Respawning..." << dendl;
959 mds->respawn();
960 ceph_abort(); // Should be unreachable because respawn calls execv
961 } else if (recovery_result != 0) {
962 // Journaler.recover succeeds if no journal objects are present: an error
963 // means something worse like a corrupt header, which we can't handle here.
964 mds->clog->error() << "Error recovering journal " << jp.front << ": "
965 << cpp_strerror(recovery_result);
966 mds->damaged_unlocked();
967 assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
968 }
969
970 // We could read journal, so we can erase it.
971 back.erase(&erase_waiter);
972 int erase_result = erase_waiter.wait();
973
974 // If we are successful, or find no data, we can update the JournalPointer to
975 // reflect that the back journal is gone.
976 if (erase_result != 0 && erase_result != -ENOENT) {
977 derr << "Failed to erase journal " << jp.back << ": " << cpp_strerror(erase_result) << dendl;
978 } else {
979 dout(1) << "Successfully erased journal, updating journal pointer" << dendl;
980 jp.back = 0;
981 int write_result = jp.save(mds->objecter);
982 // Nothing graceful we can do for this
983 assert(write_result >= 0);
984 }
985 }
986
987 /* Read the header from the front journal */
988 Journaler *front_journal = new Journaler("mdlog", jp.front,
989 mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter,
990 logger, l_mdl_jlat, mds->finisher);
991
992 // Assign to ::journaler so that we can be aborted by ::shutdown while
993 // waiting for journaler recovery
994 {
995 Mutex::Locker l(mds->mds_lock);
996 journaler = front_journal;
997 }
998
999 C_SaferCond recover_wait;
1000 front_journal->recover(&recover_wait);
1001 dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
1002 int recovery_result = recover_wait.wait();
1003 dout(4) << "Journal " << jp.front << " recovered." << dendl;
1004
1005 if (recovery_result == -EBLACKLISTED) {
1006 derr << "Blacklisted during journal recovery! Respawning..." << dendl;
1007 mds->respawn();
1008 ceph_abort(); // Should be unreachable because respawn calls execv
1009 } else if (recovery_result != 0) {
1010 mds->clog->error() << "Error recovering journal " << jp.front << ": "
1011 << cpp_strerror(recovery_result);
1012 mds->damaged_unlocked();
1013 assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
1014 }
1015
1016 /* Check whether the front journal format is acceptable or needs re-write */
1017 if (front_journal->get_stream_format() > JOURNAL_FORMAT_MAX) {
1018 dout(0) << "Journal " << jp.front << " is in unknown format " << front_journal->get_stream_format()
1019 << ", does this MDS daemon require upgrade?" << dendl;
1020 {
1021 Mutex::Locker l(mds->mds_lock);
1022 if (mds->is_daemon_stopping()) {
1023 journaler = NULL;
1024 delete front_journal;
1025 return;
1026 }
1027 completion->complete(-EINVAL);
1028 }
1029 } else if (mds->is_standby_replay() || front_journal->get_stream_format() >= g_conf->mds_journal_format) {
1030 /* The journal is of configured format, or we are in standbyreplay and will
1031 * tolerate replaying old journals until we have to go active. Use front_journal as
1032 * our journaler attribute and complete */
1033 dout(4) << "Recovered journal " << jp.front << " in format " << front_journal->get_stream_format() << dendl;
1034 journaler->set_write_error_handler(new C_MDL_WriteError(this));
1035 {
1036 Mutex::Locker l(mds->mds_lock);
1037 if (mds->is_daemon_stopping()) {
1038 return;
1039 }
1040 completion->complete(0);
1041 }
1042 } else {
1043 /* Hand off to reformat routine, which will ultimately set the
1044 * completion when it has done its thing */
1045 dout(1) << "Journal " << jp.front << " has old format "
1046 << front_journal->get_stream_format() << ", it will now be updated" << dendl;
1047 _reformat_journal(jp, front_journal, completion);
1048 }
1049}
1050
1051/**
1052 * Blocking rewrite of the journal to a new file, followed by
1053 * swap of journal pointer to point to the new one.
1054 *
1055 * We write the new journal to the 'back' journal from the JournalPointer,
1056 * swapping pointers to make that one the front journal only when we have
1057 * safely completed.
1058 */
1059void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, MDSInternalContextBase *completion)
1060{
1061 assert(!jp_in.is_null());
1062 assert(completion != NULL);
1063 assert(old_journal != NULL);
1064
1065 JournalPointer jp = jp_in;
1066
1067 /* Set JournalPointer.back to the location we will write the new journal */
1068 inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
1069 inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid();
1070 jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino);
1071 int write_result = jp.save(mds->objecter);
1072 assert(write_result == 0);
1073
1074 /* Create the new Journaler file */
1075 Journaler *new_journal = new Journaler("mdlog", jp.back,
1076 mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, mds->finisher);
1077 dout(4) << "Writing new journal header " << jp.back << dendl;
1078 file_layout_t new_layout = old_journal->get_layout();
1079 new_journal->set_writeable();
1080 new_journal->create(&new_layout, g_conf->mds_journal_format);
1081
1082 /* Write the new journal header to RADOS */
1083 C_SaferCond write_head_wait;
1084 new_journal->write_head(&write_head_wait);
1085 write_head_wait.wait();
1086
1087 // Read in the old journal, and whenever we have readable events,
1088 // write them to the new journal.
1089 int r = 0;
1090
1091 // In old format journals before event_seq was introduced, the serialized
1092 // offset of a SubtreeMap message in the log is used as the unique ID for
1093 // a log segment. Because we change serialization, this will end up changing
1094 // for us, so we have to explicitly update the fields that point back to that
1095 // log segment.
1096 std::map<log_segment_seq_t, log_segment_seq_t> segment_pos_rewrite;
1097
1098 // The logic in here borrowed from replay_thread expects mds_lock to be held,
1099 // e.g. between checking readable and doing wait_for_readable so that journaler
1100 // state doesn't change in between.
1101 uint32_t events_transcribed = 0;
1102 while (1) {
1103 while (!old_journal->is_readable() &&
1104 old_journal->get_read_pos() < old_journal->get_write_pos() &&
1105 !old_journal->get_error()) {
1106
1107 // Issue a journal prefetch
1108 C_SaferCond readable_waiter;
1109 old_journal->wait_for_readable(&readable_waiter);
1110
1111 // Wait for a journal prefetch to complete
1112 readable_waiter.wait();
1113 }
1114 if (old_journal->get_error()) {
1115 r = old_journal->get_error();
1116 dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1117 break;
1118 }
1119
1120 if (!old_journal->is_readable() &&
1121 old_journal->get_read_pos() == old_journal->get_write_pos())
1122 break;
1123
1124 // Read one serialized LogEvent
1125 assert(old_journal->is_readable());
1126 bufferlist bl;
1127 uint64_t le_pos = old_journal->get_read_pos();
1128 bool r = old_journal->try_read_entry(bl);
1129 if (!r && old_journal->get_error())
1130 continue;
1131 assert(r);
1132
1133 // Update segment_pos_rewrite
1134 LogEvent *le = LogEvent::decode(bl);
1135 if (le) {
1136 bool modified = false;
1137
1138 if (le->get_type() == EVENT_SUBTREEMAP ||
1139 le->get_type() == EVENT_RESETJOURNAL) {
1140 ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1141 if (sle == NULL || sle->event_seq == 0) {
1142 // A non-explicit event seq: the effective sequence number
1143 // of this segment is it's position in the old journal and
1144 // the new effective sequence number will be its position
1145 // in the new journal.
1146 segment_pos_rewrite[le_pos] = new_journal->get_write_pos();
1147 dout(20) << __func__ << " discovered segment seq mapping "
1148 << le_pos << " -> " << new_journal->get_write_pos() << dendl;
1149 }
1150 } else {
1151 event_seq++;
1152 }
1153
1154 // Rewrite segment references if necessary
1155 EMetaBlob *blob = le->get_metablob();
1156 if (blob) {
1157 modified = blob->rewrite_truncate_finish(mds, segment_pos_rewrite);
1158 }
1159
1160 // Zero-out expire_pos in subtreemap because offsets have changed
1161 // (expire_pos is just an optimization so it's safe to eliminate it)
1162 if (le->get_type() == EVENT_SUBTREEMAP
1163 || le->get_type() == EVENT_SUBTREEMAP_TEST) {
1164 ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1165 assert(sle != NULL);
1166 dout(20) << __func__ << " zeroing expire_pos in subtreemap event at "
1167 << le_pos << " seq=" << sle->event_seq << dendl;
1168 sle->expire_pos = 0;
1169 modified = true;
1170 }
1171
1172 if (modified) {
1173 bl.clear();
1174 le->encode_with_header(bl, mds->mdsmap->get_up_features());
1175 }
1176
1177 delete le;
1178 } else {
1179 // Failure from LogEvent::decode, our job is to change the journal wrapper,
1180 // not validate the contents, so pass it through.
1181 dout(1) << __func__ << " transcribing un-decodable LogEvent at old position "
1182 << old_journal->get_read_pos() << ", new position " << new_journal->get_write_pos()
1183 << dendl;
1184 }
1185
1186 // Write (buffered, synchronous) one serialized LogEvent
1187 events_transcribed += 1;
1188 new_journal->append_entry(bl);
1189 }
1190
1191 dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl;
1192 C_SaferCond flush_waiter;
1193 new_journal->flush(&flush_waiter);
1194 flush_waiter.wait();
1195
1196 // If failed to rewrite journal, leave the part written journal
1197 // as garbage to be cleaned up next startup.
1198 assert(r == 0);
1199
1200 /* Now that the new journal is safe, we can flip the pointers */
1201 inodeno_t const tmp = jp.front;
1202 jp.front = jp.back;
1203 jp.back = tmp;
1204 write_result = jp.save(mds->objecter);
1205 assert(write_result == 0);
1206
1207 /* Delete the old journal to free space */
1208 dout(1) << "New journal flushed, erasing old journal" << dendl;
1209 C_SaferCond erase_waiter;
1210 old_journal->erase(&erase_waiter);
1211 int erase_result = erase_waiter.wait();
1212 assert(erase_result == 0);
1213 {
1214 Mutex::Locker l(mds->mds_lock);
1215 if (mds->is_daemon_stopping()) {
1216 delete new_journal;
1217 return;
1218 }
1219 assert(journaler == old_journal);
1220 journaler = NULL;
1221 delete old_journal;
1222 }
1223
1224 /* Update the pointer to reflect we're back in clean single journal state. */
1225 jp.back = 0;
1226 write_result = jp.save(mds->objecter);
1227 assert(write_result == 0);
1228
1229 /* Reset the Journaler object to its default state */
1230 dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl;
1231 {
1232 Mutex::Locker l(mds->mds_lock);
1233 if (mds->is_daemon_stopping()) {
1234 delete new_journal;
1235 return;
1236 }
1237 journaler = new_journal;
1238 journaler->set_readonly();
1239 journaler->set_write_error_handler(new C_MDL_WriteError(this));
1240 }
1241
1242 /* Trigger completion */
1243 {
1244 Mutex::Locker l(mds->mds_lock);
1245 if (mds->is_daemon_stopping()) {
1246 return;
1247 }
1248 completion->complete(0);
1249 }
1250}
1251
1252
1253// i am a separate thread
1254void MDLog::_replay_thread()
1255{
1256 dout(10) << "_replay_thread start" << dendl;
1257
1258 // loop
1259 int r = 0;
1260 while (1) {
1261 // wait for read?
1262 while (!journaler->is_readable() &&
1263 journaler->get_read_pos() < journaler->get_write_pos() &&
1264 !journaler->get_error()) {
1265 C_SaferCond readable_waiter;
1266 journaler->wait_for_readable(&readable_waiter);
1267 r = readable_waiter.wait();
1268 }
1269 if (journaler->get_error()) {
1270 r = journaler->get_error();
1271 dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1272 if (r == -ENOENT) {
1273 if (mds->is_standby_replay()) {
1274 // journal has been trimmed by somebody else
1275 r = -EAGAIN;
1276 } else {
1277 mds->clog->error() << "missing journal object";
1278 mds->damaged_unlocked();
1279 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1280 }
1281 } else if (r == -EINVAL) {
1282 if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1283 // this should only happen if you're following somebody else
1284 if(journaler->is_readonly()) {
1285 dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl;
1286 r = -EAGAIN;
1287 } else {
1288 mds->clog->error() << "invalid journaler offsets";
1289 mds->damaged_unlocked();
1290 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1291 }
1292 } else {
1293 /* re-read head and check it
1294 * Given that replay happens in a separate thread and
1295 * the MDS is going to either shut down or restart when
1296 * we return this error, doing it synchronously is fine
1297 * -- as long as we drop the main mds lock--. */
1298 C_SaferCond reread_fin;
1299 journaler->reread_head(&reread_fin);
1300 int err = reread_fin.wait();
1301 if (err) {
1302 if (err == -ENOENT && mds->is_standby_replay()) {
1303 r = -EAGAIN;
1304 dout(1) << "Journal header went away while in standby replay, journal rewritten?"
1305 << dendl;
1306 break;
1307 } else {
1308 dout(0) << "got error while reading head: " << cpp_strerror(err)
1309 << dendl;
1310
1311 mds->clog->error() << "error reading journal header";
1312 mds->damaged_unlocked();
1313 ceph_abort(); // Should be unreachable because damaged() calls
1314 // respawn()
1315 }
1316 }
1317 standby_trim_segments();
1318 if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1319 dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl;
1320 r = -EAGAIN;
1321 }
1322 }
1323 }
1324 break;
1325 }
1326
1327 if (!journaler->is_readable() &&
1328 journaler->get_read_pos() == journaler->get_write_pos())
1329 break;
1330
1331 assert(journaler->is_readable() || mds->is_daemon_stopping());
1332
1333 // read it
1334 uint64_t pos = journaler->get_read_pos();
1335 bufferlist bl;
1336 bool r = journaler->try_read_entry(bl);
1337 if (!r && journaler->get_error())
1338 continue;
1339 assert(r);
1340
1341 // unpack event
1342 LogEvent *le = LogEvent::decode(bl);
1343 if (!le) {
1344 dout(0) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1345 << " -- unable to decode event" << dendl;
1346 dout(0) << "dump of unknown or corrupt event:\n";
1347 bl.hexdump(*_dout);
1348 *_dout << dendl;
1349
1350 mds->clog->error() << "corrupt journal event at " << pos << "~"
1351 << bl.length() << " / "
1352 << journaler->get_write_pos();
1353 if (g_conf->mds_log_skip_corrupt_events) {
1354 continue;
1355 } else {
1356 mds->damaged_unlocked();
1357 ceph_abort(); // Should be unreachable because damaged() calls
1358 // respawn()
1359 }
1360
1361 }
1362 le->set_start_off(pos);
1363
1364 // new segment?
1365 if (le->get_type() == EVENT_SUBTREEMAP ||
1366 le->get_type() == EVENT_RESETJOURNAL) {
1367 ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1368 if (sle && sle->event_seq > 0)
1369 event_seq = sle->event_seq;
1370 else
1371 event_seq = pos;
1372 segments[event_seq] = new LogSegment(event_seq, pos);
1373 logger->set(l_mdl_seg, segments.size());
1374 } else {
1375 event_seq++;
1376 }
1377
1378 // have we seen an import map yet?
1379 if (segments.empty()) {
1380 dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1381 << " " << le->get_stamp() << " -- waiting for subtree_map. (skipping " << *le << ")" << dendl;
1382 } else {
1383 dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1384 << " " << le->get_stamp() << ": " << *le << dendl;
1385 le->_segment = get_current_segment(); // replay may need this
1386 le->_segment->num_events++;
1387 le->_segment->end = journaler->get_read_pos();
1388 num_events++;
1389
1390 {
1391 Mutex::Locker l(mds->mds_lock);
1392 if (mds->is_daemon_stopping()) {
1393 return;
1394 }
1395 logger->inc(l_mdl_replayed);
1396 le->replay(mds);
1397 }
1398 }
1399 delete le;
1400
1401 logger->set(l_mdl_rdpos, pos);
1402 }
1403
1404 // done!
1405 if (r == 0) {
1406 assert(journaler->get_read_pos() == journaler->get_write_pos());
1407 dout(10) << "_replay - complete, " << num_events
1408 << " events" << dendl;
1409
1410 logger->set(l_mdl_expos, journaler->get_expire_pos());
1411 }
1412
1413 safe_pos = journaler->get_write_safe_pos();
1414
1415 dout(10) << "_replay_thread kicking waiters" << dendl;
1416 {
1417 Mutex::Locker l(mds->mds_lock);
1418 if (mds->is_daemon_stopping()) {
1419 return;
1420 }
1421 finish_contexts(g_ceph_context, waitfor_replay, r);
1422 }
1423
1424 dout(10) << "_replay_thread finish" << dendl;
1425}
1426
1427void MDLog::standby_trim_segments()
1428{
1429 dout(10) << "standby_trim_segments" << dendl;
1430 uint64_t expire_pos = journaler->get_expire_pos();
1431 dout(10) << " expire_pos=" << expire_pos << dendl;
1432 bool removed_segment = false;
1433 while (have_any_segments()) {
1434 LogSegment *seg = get_oldest_segment();
1435 dout(10) << " segment seq=" << seg->seq << " " << seg->offset <<
1436 "~" << seg->end - seg->offset << dendl;
1437
1438 if (seg->end > expire_pos) {
1439 dout(10) << " won't remove, not expired!" << dendl;
1440 break;
1441 }
1442
1443 if (segments.size() == 1) {
1444 dout(10) << " won't remove, last segment!" << dendl;
1445 break;
1446 }
1447
1448 dout(10) << " removing segment" << dendl;
1449 mds->mdcache->standby_trim_segment(seg);
1450 remove_oldest_segment();
1451 removed_segment = true;
1452 }
1453
1454 if (removed_segment) {
1455 dout(20) << " calling mdcache->trim!" << dendl;
181888fb 1456 mds->mdcache->trim();
7c673cae
FG
1457 } else {
1458 dout(20) << " removed no segments!" << dendl;
1459 }
1460}
1461
1462void MDLog::dump_replay_status(Formatter *f) const
1463{
1464 f->open_object_section("replay_status");
1465 f->dump_unsigned("journal_read_pos", journaler ? journaler->get_read_pos() : 0);
1466 f->dump_unsigned("journal_write_pos", journaler ? journaler->get_write_pos() : 0);
1467 f->dump_unsigned("journal_expire_pos", journaler ? journaler->get_expire_pos() : 0);
1468 f->dump_unsigned("num_events", get_num_events());
1469 f->dump_unsigned("num_segments", get_num_segments());
1470 f->close_section();
1471}