]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/journal.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mds / journal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "common/config.h"
16 #include "osdc/Journaler.h"
17 #include "events/ESubtreeMap.h"
18 #include "events/ESession.h"
19 #include "events/ESessions.h"
20
21 #include "events/EMetaBlob.h"
22 #include "events/EResetJournal.h"
23 #include "events/ENoOp.h"
24
25 #include "events/EUpdate.h"
26 #include "events/ESlaveUpdate.h"
27 #include "events/EOpen.h"
28 #include "events/ECommitted.h"
29
30 #include "events/EExport.h"
31 #include "events/EImportStart.h"
32 #include "events/EImportFinish.h"
33 #include "events/EFragment.h"
34
35 #include "events/ETableClient.h"
36 #include "events/ETableServer.h"
37
38 #include "include/stringify.h"
39
40 #include "LogSegment.h"
41
42 #include "MDSRank.h"
43 #include "MDLog.h"
44 #include "MDCache.h"
45 #include "Server.h"
46 #include "Migrator.h"
47 #include "Mutation.h"
48
49 #include "InoTable.h"
50 #include "MDSTableClient.h"
51 #include "MDSTableServer.h"
52
53 #include "Locker.h"
54
55 #define dout_context g_ceph_context
56 #define dout_subsys ceph_subsys_mds
57 #undef dout_prefix
58 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".journal "
59
60
61 // -----------------------
62 // LogSegment
63
64 void LogSegment::try_to_expire(MDSRank *mds, MDSGatherBuilder &gather_bld, int op_prio)
65 {
66 set<CDir*> commit;
67
68 dout(6) << "LogSegment(" << seq << "/" << offset << ").try_to_expire" << dendl;
69
70 ceph_assert(g_conf()->mds_kill_journal_expire_at != 1);
71
72 // commit dirs
73 for (elist<CDir*>::iterator p = new_dirfrags.begin(); !p.end(); ++p) {
74 dout(20) << " new_dirfrag " << **p << dendl;
75 ceph_assert((*p)->is_auth());
76 commit.insert(*p);
77 }
78 for (elist<CDir*>::iterator p = dirty_dirfrags.begin(); !p.end(); ++p) {
79 dout(20) << " dirty_dirfrag " << **p << dendl;
80 ceph_assert((*p)->is_auth());
81 commit.insert(*p);
82 }
83 for (elist<CDentry*>::iterator p = dirty_dentries.begin(); !p.end(); ++p) {
84 dout(20) << " dirty_dentry " << **p << dendl;
85 ceph_assert((*p)->is_auth());
86 commit.insert((*p)->get_dir());
87 }
88 for (elist<CInode*>::iterator p = dirty_inodes.begin(); !p.end(); ++p) {
89 dout(20) << " dirty_inode " << **p << dendl;
90 ceph_assert((*p)->is_auth());
91 if ((*p)->is_base()) {
92 (*p)->store(gather_bld.new_sub());
93 } else
94 commit.insert((*p)->get_parent_dn()->get_dir());
95 }
96
97 if (!commit.empty()) {
98 for (set<CDir*>::iterator p = commit.begin();
99 p != commit.end();
100 ++p) {
101 CDir *dir = *p;
102 ceph_assert(dir->is_auth());
103 if (dir->can_auth_pin()) {
104 dout(15) << "try_to_expire committing " << *dir << dendl;
105 dir->commit(0, gather_bld.new_sub(), false, op_prio);
106 } else {
107 dout(15) << "try_to_expire waiting for unfreeze on " << *dir << dendl;
108 dir->add_waiter(CDir::WAIT_UNFREEZE, gather_bld.new_sub());
109 }
110 }
111 }
112
113 // master ops with possibly uncommitted slaves
114 for (set<metareqid_t>::iterator p = uncommitted_masters.begin();
115 p != uncommitted_masters.end();
116 ++p) {
117 dout(10) << "try_to_expire waiting for slaves to ack commit on " << *p << dendl;
118 mds->mdcache->wait_for_uncommitted_master(*p, gather_bld.new_sub());
119 }
120
121 // uncommitted fragments
122 for (set<dirfrag_t>::iterator p = uncommitted_fragments.begin();
123 p != uncommitted_fragments.end();
124 ++p) {
125 dout(10) << "try_to_expire waiting for uncommitted fragment " << *p << dendl;
126 mds->mdcache->wait_for_uncommitted_fragment(*p, gather_bld.new_sub());
127 }
128
129 // nudge scatterlocks
130 for (elist<CInode*>::iterator p = dirty_dirfrag_dir.begin(); !p.end(); ++p) {
131 CInode *in = *p;
132 dout(10) << "try_to_expire waiting for dirlock flush on " << *in << dendl;
133 mds->locker->scatter_nudge(&in->filelock, gather_bld.new_sub());
134 }
135 for (elist<CInode*>::iterator p = dirty_dirfrag_dirfragtree.begin(); !p.end(); ++p) {
136 CInode *in = *p;
137 dout(10) << "try_to_expire waiting for dirfragtreelock flush on " << *in << dendl;
138 mds->locker->scatter_nudge(&in->dirfragtreelock, gather_bld.new_sub());
139 }
140 for (elist<CInode*>::iterator p = dirty_dirfrag_nest.begin(); !p.end(); ++p) {
141 CInode *in = *p;
142 dout(10) << "try_to_expire waiting for nest flush on " << *in << dendl;
143 mds->locker->scatter_nudge(&in->nestlock, gather_bld.new_sub());
144 }
145
146 ceph_assert(g_conf()->mds_kill_journal_expire_at != 2);
147
148 // open files and snap inodes
149 if (!open_files.empty()) {
150 ceph_assert(!mds->mdlog->is_capped()); // hmm FIXME
151 EOpen *le = 0;
152 LogSegment *ls = mds->mdlog->get_current_segment();
153 ceph_assert(ls != this);
154 elist<CInode*>::iterator p = open_files.begin(member_offset(CInode, item_open_file));
155 while (!p.end()) {
156 CInode *in = *p;
157 ++p;
158 if (in->last != CEPH_NOSNAP && in->is_auth() && !in->client_snap_caps.empty()) {
159 // journal snap inodes that need flush. This simplify the mds failover hanlding
160 dout(20) << "try_to_expire requeueing snap needflush inode " << *in << dendl;
161 if (!le) {
162 le = new EOpen(mds->mdlog);
163 mds->mdlog->start_entry(le);
164 }
165 le->add_clean_inode(in);
166 ls->open_files.push_back(&in->item_open_file);
167 } else {
168 // open files are tracked by open file table, no need to journal them again
169 in->item_open_file.remove_myself();
170 }
171 }
172 if (le) {
173 mds->mdlog->submit_entry(le);
174 mds->mdlog->wait_for_safe(gather_bld.new_sub());
175 dout(10) << "try_to_expire waiting for open files to rejournal" << dendl;
176 }
177 }
178
179 ceph_assert(g_conf()->mds_kill_journal_expire_at != 3);
180
181 // backtraces to be stored/updated
182 for (elist<CInode*>::iterator p = dirty_parent_inodes.begin(); !p.end(); ++p) {
183 CInode *in = *p;
184 ceph_assert(in->is_auth());
185 if (in->can_auth_pin()) {
186 dout(15) << "try_to_expire waiting for storing backtrace on " << *in << dendl;
187 in->store_backtrace(gather_bld.new_sub(), op_prio);
188 } else {
189 dout(15) << "try_to_expire waiting for unfreeze on " << *in << dendl;
190 in->add_waiter(CInode::WAIT_UNFREEZE, gather_bld.new_sub());
191 }
192 }
193
194 ceph_assert(g_conf()->mds_kill_journal_expire_at != 4);
195
196 // slave updates
197 for (elist<MDSlaveUpdate*>::iterator p = slave_updates.begin(member_offset(MDSlaveUpdate,
198 item));
199 !p.end(); ++p) {
200 MDSlaveUpdate *su = *p;
201 dout(10) << "try_to_expire waiting on slave update " << su << dendl;
202 ceph_assert(su->waiter == 0);
203 su->waiter = gather_bld.new_sub();
204 }
205
206 // idalloc
207 if (inotablev > mds->inotable->get_committed_version()) {
208 dout(10) << "try_to_expire saving inotable table, need " << inotablev
209 << ", committed is " << mds->inotable->get_committed_version()
210 << " (" << mds->inotable->get_committing_version() << ")"
211 << dendl;
212 mds->inotable->save(gather_bld.new_sub(), inotablev);
213 }
214
215 // sessionmap
216 if (sessionmapv > mds->sessionmap.get_committed()) {
217 dout(10) << "try_to_expire saving sessionmap, need " << sessionmapv
218 << ", committed is " << mds->sessionmap.get_committed()
219 << " (" << mds->sessionmap.get_committing() << ")"
220 << dendl;
221 mds->sessionmap.save(gather_bld.new_sub(), sessionmapv);
222 }
223
224 // updates to sessions for completed_requests
225 mds->sessionmap.save_if_dirty(touched_sessions, &gather_bld);
226 touched_sessions.clear();
227
228 // pending commit atids
229 for (map<int, ceph::unordered_set<version_t> >::iterator p = pending_commit_tids.begin();
230 p != pending_commit_tids.end();
231 ++p) {
232 MDSTableClient *client = mds->get_table_client(p->first);
233 ceph_assert(client);
234 for (ceph::unordered_set<version_t>::iterator q = p->second.begin();
235 q != p->second.end();
236 ++q) {
237 dout(10) << "try_to_expire " << get_mdstable_name(p->first) << " transaction " << *q
238 << " pending commit (not yet acked), waiting" << dendl;
239 ceph_assert(!client->has_committed(*q));
240 client->wait_for_ack(*q, gather_bld.new_sub());
241 }
242 }
243
244 // table servers
245 for (map<int, version_t>::iterator p = tablev.begin();
246 p != tablev.end();
247 ++p) {
248 MDSTableServer *server = mds->get_table_server(p->first);
249 ceph_assert(server);
250 if (p->second > server->get_committed_version()) {
251 dout(10) << "try_to_expire waiting for " << get_mdstable_name(p->first)
252 << " to save, need " << p->second << dendl;
253 server->save(gather_bld.new_sub());
254 }
255 }
256
257 // truncating
258 for (set<CInode*>::iterator p = truncating_inodes.begin();
259 p != truncating_inodes.end();
260 ++p) {
261 dout(10) << "try_to_expire waiting for truncate of " << **p << dendl;
262 (*p)->add_waiter(CInode::WAIT_TRUNC, gather_bld.new_sub());
263 }
264
265 if (gather_bld.has_subs()) {
266 dout(6) << "LogSegment(" << seq << "/" << offset << ").try_to_expire waiting" << dendl;
267 mds->mdlog->flush();
268 } else {
269 ceph_assert(g_conf()->mds_kill_journal_expire_at != 5);
270 dout(6) << "LogSegment(" << seq << "/" << offset << ").try_to_expire success" << dendl;
271 }
272 }
273
274
275 // -----------------------
276 // EMetaBlob
277
278 void EMetaBlob::add_dir_context(CDir *dir, int mode)
279 {
280 MDSRank *mds = dir->cache->mds;
281
282 list<CDentry*> parents;
283
284 // it may be okay not to include the maybe items, if
285 // - we journaled the maybe child inode in this segment
286 // - that subtree turns out to be unambiguously auth
287 list<CDentry*> maybe;
288 bool maybenot = false;
289
290 while (true) {
291 // already have this dir? (we must always add in order)
292 if (lump_map.count(dir->dirfrag())) {
293 dout(20) << "EMetaBlob::add_dir_context(" << dir << ") have lump " << dir->dirfrag() << dendl;
294 break;
295 }
296
297 // stop at root/stray
298 CInode *diri = dir->get_inode();
299 CDentry *parent = diri->get_projected_parent_dn();
300
301 if (mode == TO_AUTH_SUBTREE_ROOT) {
302 // subtree root?
303 if (dir->is_subtree_root()) {
304 // match logic in MDCache::create_subtree_map()
305 if (dir->get_dir_auth().first == mds->get_nodeid()) {
306 mds_authority_t parent_auth = parent ? parent->authority() : CDIR_AUTH_UNDEF;
307 if (parent_auth.first == dir->get_dir_auth().first) {
308 if (parent_auth.second == CDIR_AUTH_UNKNOWN &&
309 !dir->is_ambiguous_dir_auth() &&
310 !dir->state_test(CDir::STATE_EXPORTBOUND) &&
311 !dir->state_test(CDir::STATE_AUXSUBTREE) &&
312 !diri->state_test(CInode::STATE_AMBIGUOUSAUTH)) {
313 dout(0) << "EMetaBlob::add_dir_context unexpected subtree " << *dir << dendl;
314 ceph_abort();
315 }
316 dout(20) << "EMetaBlob::add_dir_context(" << dir << ") ambiguous or transient subtree " << dendl;
317 } else {
318 // it's an auth subtree, we don't need maybe (if any), and we're done.
319 dout(20) << "EMetaBlob::add_dir_context(" << dir << ") reached unambig auth subtree, don't need " << maybe
320 << " at " << *dir << dendl;
321 maybe.clear();
322 break;
323 }
324 } else {
325 dout(20) << "EMetaBlob::add_dir_context(" << dir << ") reached ambig or !auth subtree, need " << maybe
326 << " at " << *dir << dendl;
327 // we need the maybe list after all!
328 parents.splice(parents.begin(), maybe);
329 maybenot = false;
330 }
331 }
332
333 // was the inode journaled in this blob?
334 if (event_seq && diri->last_journaled == event_seq) {
335 dout(20) << "EMetaBlob::add_dir_context(" << dir << ") already have diri this blob " << *diri << dendl;
336 break;
337 }
338
339 // have we journaled this inode since the last subtree map?
340 if (!maybenot && last_subtree_map && diri->last_journaled >= last_subtree_map) {
341 dout(20) << "EMetaBlob::add_dir_context(" << dir << ") already have diri in this segment ("
342 << diri->last_journaled << " >= " << last_subtree_map << "), setting maybenot flag "
343 << *diri << dendl;
344 maybenot = true;
345 }
346 }
347
348 if (!parent)
349 break;
350
351 if (maybenot) {
352 dout(25) << "EMetaBlob::add_dir_context(" << dir << ") maybe " << *parent << dendl;
353 maybe.push_front(parent);
354 } else {
355 dout(25) << "EMetaBlob::add_dir_context(" << dir << ") definitely " << *parent << dendl;
356 parents.push_front(parent);
357 }
358
359 dir = parent->get_dir();
360 }
361
362 parents.splice(parents.begin(), maybe);
363
364 dout(20) << "EMetaBlob::add_dir_context final: " << parents << dendl;
365 for (list<CDentry*>::iterator p = parents.begin(); p != parents.end(); ++p) {
366 ceph_assert((*p)->get_projected_linkage()->is_primary());
367 add_dentry(*p, false);
368 }
369 }
370
371 void EMetaBlob::update_segment(LogSegment *ls)
372 {
373 // dirty inode mtimes
374 // -> handled directly by Server.cc, replay()
375
376 // alloc table update?
377 if (inotablev)
378 ls->inotablev = inotablev;
379 if (sessionmapv)
380 ls->sessionmapv = sessionmapv;
381
382 // truncated inodes
383 // -> handled directly by Server.cc
384
385 // client requests
386 // note the newest request per client
387 //if (!client_reqs.empty())
388 // ls->last_client_tid[client_reqs.rbegin()->client] = client_reqs.rbegin()->tid);
389 }
390
391 // EMetaBlob::fullbit
392
393 void EMetaBlob::fullbit::encode(bufferlist& bl, uint64_t features) const {
394 ENCODE_START(8, 5, bl);
395 encode(dn, bl);
396 encode(dnfirst, bl);
397 encode(dnlast, bl);
398 encode(dnv, bl);
399 encode(inode, bl, features);
400 encode(xattrs, bl);
401 if (inode.is_symlink())
402 encode(symlink, bl);
403 if (inode.is_dir()) {
404 encode(dirfragtree, bl);
405 encode(snapbl, bl);
406 }
407 encode(state, bl);
408 if (old_inodes.empty()) {
409 encode(false, bl);
410 } else {
411 encode(true, bl);
412 encode(old_inodes, bl, features);
413 }
414 if (!inode.is_dir())
415 encode(snapbl, bl);
416 encode(oldest_snap, bl);
417 ENCODE_FINISH(bl);
418 }
419
420 void EMetaBlob::fullbit::decode(bufferlist::const_iterator &bl) {
421 DECODE_START_LEGACY_COMPAT_LEN(7, 5, 5, bl);
422 decode(dn, bl);
423 decode(dnfirst, bl);
424 decode(dnlast, bl);
425 decode(dnv, bl);
426 decode(inode, bl);
427 decode(xattrs, bl);
428 if (inode.is_symlink())
429 decode(symlink, bl);
430 if (inode.is_dir()) {
431 decode(dirfragtree, bl);
432 decode(snapbl, bl);
433 if ((struct_v == 2) || (struct_v == 3)) {
434 bool dir_layout_exists;
435 decode(dir_layout_exists, bl);
436 if (dir_layout_exists) {
437 __u8 dir_struct_v;
438 decode(dir_struct_v, bl); // default_file_layout version
439 decode(inode.layout, bl); // and actual layout, that we care about
440 }
441 }
442 }
443 if (struct_v >= 6) {
444 decode(state, bl);
445 } else {
446 bool dirty;
447 decode(dirty, bl);
448 state = dirty ? EMetaBlob::fullbit::STATE_DIRTY : 0;
449 }
450
451 if (struct_v >= 3) {
452 bool old_inodes_present;
453 decode(old_inodes_present, bl);
454 if (old_inodes_present) {
455 decode(old_inodes, bl);
456 }
457 }
458 if (!inode.is_dir()) {
459 if (struct_v >= 7)
460 decode(snapbl, bl);
461 }
462 if (struct_v >= 8)
463 decode(oldest_snap, bl);
464 else
465 oldest_snap = CEPH_NOSNAP;
466
467 DECODE_FINISH(bl);
468 }
469
470 void EMetaBlob::fullbit::dump(Formatter *f) const
471 {
472 f->dump_string("dentry", dn);
473 f->dump_stream("snapid.first") << dnfirst;
474 f->dump_stream("snapid.last") << dnlast;
475 f->dump_int("dentry version", dnv);
476 f->open_object_section("inode");
477 inode.dump(f);
478 f->close_section(); // inode
479 f->open_object_section("xattrs");
480 for (const auto &p : xattrs) {
481 std::string s(p.second.c_str(), p.second.length());
482 f->dump_string(p.first.c_str(), s);
483 }
484 f->close_section(); // xattrs
485 if (inode.is_symlink()) {
486 f->dump_string("symlink", symlink);
487 }
488 if (inode.is_dir()) {
489 f->dump_stream("frag tree") << dirfragtree;
490 f->dump_string("has_snapbl", snapbl.length() ? "true" : "false");
491 if (inode.has_layout()) {
492 f->open_object_section("file layout policy");
493 // FIXME
494 f->dump_string("layout", "the layout exists");
495 f->close_section(); // file layout policy
496 }
497 }
498 f->dump_string("state", state_string());
499 if (!old_inodes.empty()) {
500 f->open_array_section("old inodes");
501 for (const auto &p : old_inodes) {
502 f->open_object_section("inode");
503 f->dump_int("snapid", p.first);
504 p.second.dump(f);
505 f->close_section(); // inode
506 }
507 f->close_section(); // old inodes
508 }
509 }
510
511 void EMetaBlob::fullbit::generate_test_instances(list<EMetaBlob::fullbit*>& ls)
512 {
513 CInode::mempool_inode inode;
514 fragtree_t fragtree;
515 CInode::mempool_xattr_map empty_xattrs;
516 bufferlist empty_snapbl;
517 fullbit *sample = new fullbit("/testdn", 0, 0, 0,
518 inode, fragtree, empty_xattrs, "", 0, empty_snapbl,
519 false, NULL);
520 ls.push_back(sample);
521 }
522
523 void EMetaBlob::fullbit::update_inode(MDSRank *mds, CInode *in)
524 {
525 in->inode = inode;
526 in->xattrs = xattrs;
527 in->maybe_export_pin();
528 if (in->inode.is_dir()) {
529 if (!(in->dirfragtree == dirfragtree)) {
530 dout(10) << "EMetaBlob::fullbit::update_inode dft " << in->dirfragtree << " -> "
531 << dirfragtree << " on " << *in << dendl;
532 in->dirfragtree = dirfragtree;
533 in->force_dirfrags();
534 if (in->has_dirfrags() && in->authority() == CDIR_AUTH_UNDEF) {
535 list<CDir*> ls;
536 in->get_nested_dirfrags(ls);
537 for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
538 CDir *dir = *p;
539 if (dir->get_num_any() == 0 &&
540 mds->mdcache->can_trim_non_auth_dirfrag(dir)) {
541 dout(10) << " closing empty non-auth dirfrag " << *dir << dendl;
542 in->close_dirfrag(dir->get_frag());
543 }
544 }
545 }
546 }
547 } else if (in->inode.is_symlink()) {
548 in->symlink = symlink;
549 }
550 in->old_inodes = old_inodes;
551 if (!in->old_inodes.empty()) {
552 snapid_t min_first = in->old_inodes.rbegin()->first + 1;
553 if (min_first > in->first)
554 in->first = min_first;
555 }
556
557 /*
558 * we can do this before linking hte inode bc the split_at would
559 * be a no-op.. we have no children (namely open snaprealms) to
560 * divy up
561 */
562 in->oldest_snap = oldest_snap;
563 in->decode_snap_blob(snapbl);
564
565 /*
566 * In case there was anything malformed in the journal that we are
567 * replaying, do sanity checks on the inodes we're replaying and
568 * go damaged instead of letting any trash into a live cache
569 */
570 if (in->is_file()) {
571 // Files must have valid layouts with a pool set
572 if (in->inode.layout.pool_id == -1 || !in->inode.layout.is_valid()) {
573 dout(0) << "EMetaBlob.replay invalid layout on ino " << *in
574 << ": " << in->inode.layout << dendl;
575 std::ostringstream oss;
576 oss << "Invalid layout for inode " << in->ino() << " in journal";
577 mds->clog->error() << oss.str();
578 mds->damaged();
579 ceph_abort(); // Should be unreachable because damaged() calls respawn()
580 }
581 }
582 }
583
584 // EMetaBlob::remotebit
585
586 void EMetaBlob::remotebit::encode(bufferlist& bl) const
587 {
588 ENCODE_START(2, 2, bl);
589 encode(dn, bl);
590 encode(dnfirst, bl);
591 encode(dnlast, bl);
592 encode(dnv, bl);
593 encode(ino, bl);
594 encode(d_type, bl);
595 encode(dirty, bl);
596 ENCODE_FINISH(bl);
597 }
598
599 void EMetaBlob::remotebit::decode(bufferlist::const_iterator &bl)
600 {
601 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
602 decode(dn, bl);
603 decode(dnfirst, bl);
604 decode(dnlast, bl);
605 decode(dnv, bl);
606 decode(ino, bl);
607 decode(d_type, bl);
608 decode(dirty, bl);
609 DECODE_FINISH(bl);
610 }
611
612 void EMetaBlob::remotebit::dump(Formatter *f) const
613 {
614 f->dump_string("dentry", dn);
615 f->dump_int("snapid.first", dnfirst);
616 f->dump_int("snapid.last", dnlast);
617 f->dump_int("dentry version", dnv);
618 f->dump_int("inodeno", ino);
619 uint32_t type = DTTOIF(d_type) & S_IFMT; // convert to type entries
620 string type_string;
621 switch(type) {
622 case S_IFREG:
623 type_string = "file"; break;
624 case S_IFLNK:
625 type_string = "symlink"; break;
626 case S_IFDIR:
627 type_string = "directory"; break;
628 case S_IFIFO:
629 type_string = "fifo"; break;
630 case S_IFCHR:
631 type_string = "chr"; break;
632 case S_IFBLK:
633 type_string = "blk"; break;
634 case S_IFSOCK:
635 type_string = "sock"; break;
636 default:
637 assert (0 == "unknown d_type!");
638 }
639 f->dump_string("d_type", type_string);
640 f->dump_string("dirty", dirty ? "true" : "false");
641 }
642
643 void EMetaBlob::remotebit::
644 generate_test_instances(list<EMetaBlob::remotebit*>& ls)
645 {
646 remotebit *remote = new remotebit("/test/dn", 0, 10, 15, 1, IFTODT(S_IFREG), false);
647 ls.push_back(remote);
648 }
649
650 // EMetaBlob::nullbit
651
652 void EMetaBlob::nullbit::encode(bufferlist& bl) const
653 {
654 ENCODE_START(2, 2, bl);
655 encode(dn, bl);
656 encode(dnfirst, bl);
657 encode(dnlast, bl);
658 encode(dnv, bl);
659 encode(dirty, bl);
660 ENCODE_FINISH(bl);
661 }
662
663 void EMetaBlob::nullbit::decode(bufferlist::const_iterator &bl)
664 {
665 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
666 decode(dn, bl);
667 decode(dnfirst, bl);
668 decode(dnlast, bl);
669 decode(dnv, bl);
670 decode(dirty, bl);
671 DECODE_FINISH(bl);
672 }
673
674 void EMetaBlob::nullbit::dump(Formatter *f) const
675 {
676 f->dump_string("dentry", dn);
677 f->dump_int("snapid.first", dnfirst);
678 f->dump_int("snapid.last", dnlast);
679 f->dump_int("dentry version", dnv);
680 f->dump_string("dirty", dirty ? "true" : "false");
681 }
682
683 void EMetaBlob::nullbit::generate_test_instances(list<nullbit*>& ls)
684 {
685 nullbit *sample = new nullbit("/test/dentry", 0, 10, 15, false);
686 nullbit *sample2 = new nullbit("/test/dirty", 10, 20, 25, true);
687 ls.push_back(sample);
688 ls.push_back(sample2);
689 }
690
691 // EMetaBlob::dirlump
692
693 void EMetaBlob::dirlump::encode(bufferlist& bl, uint64_t features) const
694 {
695 ENCODE_START(2, 2, bl);
696 encode(fnode, bl);
697 encode(state, bl);
698 encode(nfull, bl);
699 encode(nremote, bl);
700 encode(nnull, bl);
701 _encode_bits(features);
702 encode(dnbl, bl);
703 ENCODE_FINISH(bl);
704 }
705
706 void EMetaBlob::dirlump::decode(bufferlist::const_iterator &bl)
707 {
708 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl)
709 decode(fnode, bl);
710 decode(state, bl);
711 decode(nfull, bl);
712 decode(nremote, bl);
713 decode(nnull, bl);
714 decode(dnbl, bl);
715 dn_decoded = false; // don't decode bits unless we need them.
716 DECODE_FINISH(bl);
717 }
718
719 void EMetaBlob::dirlump::dump(Formatter *f) const
720 {
721 if (!dn_decoded) {
722 dirlump *me = const_cast<dirlump*>(this);
723 me->_decode_bits();
724 }
725 f->open_object_section("fnode");
726 fnode.dump(f);
727 f->close_section(); // fnode
728 f->dump_string("state", state_string());
729 f->dump_int("nfull", nfull);
730 f->dump_int("nremote", nremote);
731 f->dump_int("nnull", nnull);
732
733 f->open_array_section("full bits");
734 for (const auto& iter : dfull) {
735 f->open_object_section("fullbit");
736 iter.dump(f);
737 f->close_section(); // fullbit
738 }
739 f->close_section(); // full bits
740 f->open_array_section("remote bits");
741 for (const auto& iter : dremote) {
742 f->open_object_section("remotebit");
743 iter.dump(f);
744 f->close_section(); // remotebit
745 }
746 f->close_section(); // remote bits
747 f->open_array_section("null bits");
748 for (const auto& iter : dnull) {
749 f->open_object_section("null bit");
750 iter.dump(f);
751 f->close_section(); // null bit
752 }
753 f->close_section(); // null bits
754 }
755
756 void EMetaBlob::dirlump::generate_test_instances(list<dirlump*>& ls)
757 {
758 ls.push_back(new dirlump());
759 }
760
761 /**
762 * EMetaBlob proper
763 */
764 void EMetaBlob::encode(bufferlist& bl, uint64_t features) const
765 {
766 ENCODE_START(8, 5, bl);
767 encode(lump_order, bl);
768 encode(lump_map, bl, features);
769 encode(roots, bl, features);
770 encode(table_tids, bl);
771 encode(opened_ino, bl);
772 encode(allocated_ino, bl);
773 encode(used_preallocated_ino, bl);
774 encode(preallocated_inos, bl);
775 encode(client_name, bl);
776 encode(inotablev, bl);
777 encode(sessionmapv, bl);
778 encode(truncate_start, bl);
779 encode(truncate_finish, bl);
780 encode(destroyed_inodes, bl);
781 encode(client_reqs, bl);
782 encode(renamed_dirino, bl);
783 encode(renamed_dir_frags, bl);
784 {
785 // make MDSRank use v6 format happy
786 int64_t i = -1;
787 bool b = false;
788 encode(i, bl);
789 encode(b, bl);
790 }
791 encode(client_flushes, bl);
792 ENCODE_FINISH(bl);
793 }
794 void EMetaBlob::decode(bufferlist::const_iterator &bl)
795 {
796 DECODE_START_LEGACY_COMPAT_LEN(7, 5, 5, bl);
797 decode(lump_order, bl);
798 decode(lump_map, bl);
799 if (struct_v >= 4) {
800 decode(roots, bl);
801 } else {
802 bufferlist rootbl;
803 decode(rootbl, bl);
804 if (rootbl.length()) {
805 auto p = rootbl.cbegin();
806 roots.emplace_back(p);
807 }
808 }
809 decode(table_tids, bl);
810 decode(opened_ino, bl);
811 decode(allocated_ino, bl);
812 decode(used_preallocated_ino, bl);
813 decode(preallocated_inos, bl);
814 decode(client_name, bl);
815 decode(inotablev, bl);
816 decode(sessionmapv, bl);
817 decode(truncate_start, bl);
818 decode(truncate_finish, bl);
819 decode(destroyed_inodes, bl);
820 if (struct_v >= 2) {
821 decode(client_reqs, bl);
822 } else {
823 list<metareqid_t> r;
824 decode(r, bl);
825 while (!r.empty()) {
826 client_reqs.push_back(pair<metareqid_t,uint64_t>(r.front(), 0));
827 r.pop_front();
828 }
829 }
830 if (struct_v >= 3) {
831 decode(renamed_dirino, bl);
832 decode(renamed_dir_frags, bl);
833 }
834 if (struct_v >= 6) {
835 // ignore
836 int64_t i;
837 bool b;
838 decode(i, bl);
839 decode(b, bl);
840 }
841 if (struct_v >= 8) {
842 decode(client_flushes, bl);
843 }
844 DECODE_FINISH(bl);
845 }
846
847
848 /**
849 * Get all inodes touched by this metablob. Includes the 'bits' within
850 * dirlumps, and the inodes of the dirs themselves.
851 */
852 void EMetaBlob::get_inodes(
853 std::set<inodeno_t> &inodes) const
854 {
855 // For all dirlumps in this metablob
856 for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
857 // Record inode of dirlump
858 inodeno_t const dir_ino = i->first.ino;
859 inodes.insert(dir_ino);
860
861 // Decode dirlump bits
862 dirlump const &dl = i->second;
863 dl._decode_bits();
864
865 // Record inodes of fullbits
866 for (const auto& iter : dl.get_dfull()) {
867 inodes.insert(iter.inode.ino);
868 }
869
870 // Record inodes of remotebits
871 for (const auto& iter : dl.get_dremote()) {
872 inodes.insert(iter.ino);
873 }
874 }
875 }
876
877
878 /**
879 * Get a map of dirfrag to set of dentries in that dirfrag which are
880 * touched in this operation.
881 */
882 void EMetaBlob::get_dentries(std::map<dirfrag_t, std::set<std::string> > &dentries) const
883 {
884 for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
885 dirlump const &dl = i->second;
886 dirfrag_t const &df = i->first;
887
888 // Get all bits
889 dl._decode_bits();
890
891 // For all bits, store dentry
892 for (const auto& iter : dl.get_dfull()) {
893 dentries[df].insert(iter.dn);
894 }
895 for (const auto& iter : dl.get_dremote()) {
896 dentries[df].insert(iter.dn);
897 }
898 for (const auto& iter : dl.get_dnull()) {
899 dentries[df].insert(iter.dn);
900 }
901 }
902 }
903
904
905
906 /**
907 * Calculate all paths that we can infer are touched by this metablob. Only uses
908 * information local to this metablob so it may only be the path within the
909 * subtree.
910 */
911 void EMetaBlob::get_paths(
912 std::vector<std::string> &paths) const
913 {
914 // Each dentry has a 'location' which is a 2-tuple of parent inode and dentry name
915 typedef std::pair<inodeno_t, std::string> Location;
916
917 // Whenever we see a dentry within a dirlump, we remember it as a child of
918 // the dirlump's inode
919 std::map<inodeno_t, std::list<std::string> > children;
920
921 // Whenever we see a location for an inode, remember it: this allows us to
922 // build a path given an inode
923 std::map<inodeno_t, Location> ino_locations;
924
925 // Special case: operations on root inode populate roots but not dirlumps
926 if (lump_map.empty() && !roots.empty()) {
927 paths.push_back("/");
928 return;
929 }
930
931 // First pass
932 // ==========
933 // Build a tiny local metadata cache for the path structure in this metablob
934 for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
935 inodeno_t const dir_ino = i->first.ino;
936 dirlump const &dl = i->second;
937 dl._decode_bits();
938
939 for (const auto& iter : dl.get_dfull()) {
940 std::string_view dentry = iter.dn;
941 children[dir_ino].emplace_back(dentry);
942 ino_locations[iter.inode.ino] = Location(dir_ino, dentry);
943 }
944
945 for (const auto& iter : dl.get_dremote()) {
946 std::string_view dentry = iter.dn;
947 children[dir_ino].emplace_back(dentry);
948 }
949
950 for (const auto& iter : dl.get_dnull()) {
951 std::string_view dentry = iter.dn;
952 children[dir_ino].emplace_back(dentry);
953 }
954 }
955
956 std::vector<Location> leaf_locations;
957
958 // Second pass
959 // ===========
960 // Output paths for all childless nodes in the metablob
961 for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
962 inodeno_t const dir_ino = i->first.ino;
963 dirlump const &dl = i->second;
964 dl._decode_bits();
965
966 for (const auto& iter : dl.get_dfull()) {
967 std::string_view dentry = iter.dn;
968 if (children.find(iter.inode.ino) == children.end()) {
969 leaf_locations.push_back(Location(dir_ino, dentry));
970 }
971 }
972
973 for (const auto& iter : dl.get_dremote()) {
974 std::string_view dentry = iter.dn;
975 leaf_locations.push_back(Location(dir_ino, dentry));
976 }
977
978 for (const auto& iter : dl.get_dnull()) {
979 std::string_view dentry = iter.dn;
980 leaf_locations.push_back(Location(dir_ino, dentry));
981 }
982 }
983
984 // For all the leaf locations identified, generate paths
985 for (std::vector<Location>::iterator i = leaf_locations.begin(); i != leaf_locations.end(); ++i) {
986 Location const &loc = *i;
987 std::string path = loc.second;
988 inodeno_t ino = loc.first;
989 std::map<inodeno_t, Location>::iterator iter = ino_locations.find(ino);
990 while(iter != ino_locations.end()) {
991 Location const &loc = iter->second;
992 if (!path.empty()) {
993 path = loc.second + "/" + path;
994 } else {
995 path = loc.second + path;
996 }
997 iter = ino_locations.find(loc.first);
998 }
999
1000 paths.push_back(path);
1001 }
1002 }
1003
1004
1005 void EMetaBlob::dump(Formatter *f) const
1006 {
1007 f->open_array_section("lumps");
1008 for (const auto& d : lump_order) {
1009 f->open_object_section("lump");
1010 f->open_object_section("dirfrag");
1011 f->dump_stream("dirfrag") << d;
1012 f->close_section(); // dirfrag
1013 f->open_object_section("dirlump");
1014 lump_map.at(d).dump(f);
1015 f->close_section(); // dirlump
1016 f->close_section(); // lump
1017 }
1018 f->close_section(); // lumps
1019
1020 f->open_array_section("roots");
1021 for (const auto& iter : roots) {
1022 f->open_object_section("root");
1023 iter.dump(f);
1024 f->close_section(); // root
1025 }
1026 f->close_section(); // roots
1027
1028 f->open_array_section("tableclient tranactions");
1029 for (const auto& p : table_tids) {
1030 f->open_object_section("transaction");
1031 f->dump_int("tid", p.first);
1032 f->dump_int("version", p.second);
1033 f->close_section(); // transaction
1034 }
1035 f->close_section(); // tableclient transactions
1036
1037 f->dump_int("renamed directory inodeno", renamed_dirino);
1038
1039 f->open_array_section("renamed directory fragments");
1040 for (const auto& p : renamed_dir_frags) {
1041 f->dump_int("frag", p);
1042 }
1043 f->close_section(); // renamed directory fragments
1044
1045 f->dump_int("inotable version", inotablev);
1046 f->dump_int("SessionMap version", sessionmapv);
1047 f->dump_int("allocated ino", allocated_ino);
1048
1049 f->dump_stream("preallocated inos") << preallocated_inos;
1050 f->dump_int("used preallocated ino", used_preallocated_ino);
1051
1052 f->open_object_section("client name");
1053 client_name.dump(f);
1054 f->close_section(); // client name
1055
1056 f->open_array_section("inodes starting a truncate");
1057 for(const auto& ino : truncate_start) {
1058 f->dump_int("inodeno", ino);
1059 }
1060 f->close_section(); // truncate inodes
1061 f->open_array_section("inodes finishing a truncated");
1062 for(const auto& p : truncate_finish) {
1063 f->open_object_section("inode+segment");
1064 f->dump_int("inodeno", p.first);
1065 f->dump_int("truncate starting segment", p.second);
1066 f->close_section(); // truncated inode
1067 }
1068 f->close_section(); // truncate finish inodes
1069
1070 f->open_array_section("destroyed inodes");
1071 for(vector<inodeno_t>::const_iterator i = destroyed_inodes.begin();
1072 i != destroyed_inodes.end(); ++i) {
1073 f->dump_int("inodeno", *i);
1074 }
1075 f->close_section(); // destroyed inodes
1076
1077 f->open_array_section("client requests");
1078 for(const auto& p : client_reqs) {
1079 f->open_object_section("Client request");
1080 f->dump_stream("request ID") << p.first;
1081 f->dump_int("oldest request on client", p.second);
1082 f->close_section(); // request
1083 }
1084 f->close_section(); // client requests
1085 }
1086
1087 void EMetaBlob::generate_test_instances(list<EMetaBlob*>& ls)
1088 {
1089 ls.push_back(new EMetaBlob());
1090 }
1091
// Replay this metablob into the MDS cache and tables during journal replay.
//
// In order, the function:
//   1. creates or updates the root inodes listed in `roots`;
//   2. if a directory rename was journaled (renamed_dirino != 0), looks up the
//      renamed inode and asserts that at most one null dentry exists across
//      all lumps;
//   3. walks lump_order, creating/updating each dirfrag and applying its full,
//      remote and null dentries, while recording which inodes were unlinked
//      and which were (re)linked;
//   4. adjusts subtrees after a rename and opens rename import bounds;
//   5. removes inodes that stayed unlinked (or records them on slaveup);
//   6. replays table-client transactions, the opened inode, inotable and
//      sessionmap changes, truncates, destroyed inodes, client requests and
//      client flushes;
//   7. calls update_segment(logseg).
//
// mds      rank whose cache, tables and session map are updated.
// logseg   log segment that dirtied items are attached to; asserted non-null.
// slaveup  when non-null, unlinked inodes and old dirs are recorded on it
//          instead of being removed/trimmed right away.
1092 void EMetaBlob::replay(MDSRank *mds, LogSegment *logseg, MDSlaveUpdate *slaveup)
1093 {
1094 dout(10) << "EMetaBlob.replay " << lump_map.size() << " dirlumps by " << client_name << dendl;
1095
1096 ceph_assert(logseg);
1097
// The mds_kill_journal_replay_at asserts (values 1..4 in this function) fire
// when the config option equals the checkpoint number; presumably a
// fault-injection hook for testing.
1098 ceph_assert(g_conf()->mds_kill_journal_replay_at != 1);
1099
1100 for (auto& p : roots) {
1101 CInode *in = mds->mdcache->get_inode(p.inode.ino);
1102 bool isnew = in ? false:true;
1103 if (!in)
1104 in = new CInode(mds->mdcache, false, 2, CEPH_NOSNAP);
1105 p.update_inode(mds, in);
1106
1107 if (isnew)
1108 mds->mdcache->add_inode(in);
1109 if (p.is_dirty()) in->_mark_dirty(logseg);
1110 dout(10) << "EMetaBlob.replay " << (isnew ? " added root ":" updated root ") << *in << dendl;
1111 }
1112
1113 CInode *renamed_diri = 0;
1114 CDir *olddir = 0;
1115 if (renamed_dirino) {
1116 renamed_diri = mds->mdcache->get_inode(renamed_dirino);
1117 if (renamed_diri)
1118 dout(10) << "EMetaBlob.replay renamed inode is " << *renamed_diri << dendl;
1119 else
1120 dout(10) << "EMetaBlob.replay don't have renamed ino " << renamed_dirino << dendl;
1121
1122 int nnull = 0;
1123 for (const auto& lp : lump_order) {
1124 dirlump &lump = lump_map[lp];
1125 if (lump.nnull) {
1126 dout(10) << "EMetaBlob.replay found null dentry in dir " << lp << dendl;
1127 nnull += lump.nnull;
1128 }
1129 }
1130 ceph_assert(nnull <= 1);
1131 }
1132
1133 // keep track of any inodes we unlink and don't relink elsewhere
1134 map<CInode*, CDir*> unlinked;
1135 set<CInode*> linked;
1136
1137 // walk through my dirs (in order!)
1138 for (const auto& lp : lump_order) {
1139 dout(10) << "EMetaBlob.replay dir " << lp << dendl;
1140 dirlump &lump = lump_map[lp];
1141
1142 // the dir
1143 CDir *dir = mds->mdcache->get_force_dirfrag(lp, true);
1144 if (!dir) {
1145 // hmm. do i have the inode?
1146 CInode *diri = mds->mdcache->get_inode((lp).ino);
1147 if (!diri) {
1148 if (MDS_INO_IS_MDSDIR(lp.ino)) {
1149 ceph_assert(MDS_INO_MDSDIR(mds->get_nodeid()) != lp.ino);
1150 diri = mds->mdcache->create_system_inode(lp.ino, S_IFDIR|0755);
1151 diri->state_clear(CInode::STATE_AUTH);
1152 dout(10) << "EMetaBlob.replay created base " << *diri << dendl;
1153 } else {
1154 dout(0) << "EMetaBlob.replay missing dir ino " << lp.ino << dendl;
1155 mds->clog->error() << "failure replaying journal (EMetaBlob)";
1156 mds->damaged();
1157 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1158 }
1159 }
1160
1161 // create the dirfrag
1162 dir = diri->get_or_open_dirfrag(mds->mdcache, lp.frag);
1163
1164 if (MDS_INO_IS_BASE(lp.ino))
1165 mds->mdcache->adjust_subtree_auth(dir, CDIR_AUTH_UNDEF);
1166
1167 dout(10) << "EMetaBlob.replay added dir " << *dir << dendl;
1168 }
1169 dir->set_version( lump.fnode.version );
1170 dir->fnode = lump.fnode;
1171
1172 if (lump.is_importing()) {
1173 dir->state_set(CDir::STATE_AUTH);
1174 dir->state_clear(CDir::STATE_COMPLETE);
1175 }
1176 if (lump.is_dirty()) {
1177 dir->_mark_dirty(logseg);
1178
1179 if (!(dir->fnode.rstat == dir->fnode.accounted_rstat)) {
1180 dout(10) << "EMetaBlob.replay dirty nestinfo on " << *dir << dendl;
1181 mds->locker->mark_updated_scatterlock(&dir->inode->nestlock);
1182 logseg->dirty_dirfrag_nest.push_back(&dir->inode->item_dirty_dirfrag_nest);
1183 } else {
1184 dout(10) << "EMetaBlob.replay clean nestinfo on " << *dir << dendl;
1185 }
1186 if (!(dir->fnode.fragstat == dir->fnode.accounted_fragstat)) {
1187 dout(10) << "EMetaBlob.replay dirty fragstat on " << *dir << dendl;
1188 mds->locker->mark_updated_scatterlock(&dir->inode->filelock);
1189 logseg->dirty_dirfrag_dir.push_back(&dir->inode->item_dirty_dirfrag_dir);
1190 } else {
1191 dout(10) << "EMetaBlob.replay clean fragstat on " << *dir << dendl;
1192 }
1193 }
1194 if (lump.is_dirty_dft()) {
1195 dout(10) << "EMetaBlob.replay dirty dirfragtree on " << *dir << dendl;
1196 dir->state_set(CDir::STATE_DIRTYDFT);
1197 mds->locker->mark_updated_scatterlock(&dir->inode->dirfragtreelock);
1198 logseg->dirty_dirfrag_dirfragtree.push_back(&dir->inode->item_dirty_dirfrag_dirfragtree);
1199 }
1200 if (lump.is_new())
1201 dir->mark_new(logseg);
1202 if (lump.is_complete())
1203 dir->mark_complete();
1204
1205 dout(10) << "EMetaBlob.replay updated dir " << *dir << dendl;
1206
1207 // decode bits
1208 lump._decode_bits();
1209
1210 // full dentry+inode pairs
1211 for (auto& fb : lump._get_dfull()) {
1212 CDentry *dn = dir->lookup_exact_snap(fb.dn, fb.dnlast);
1213 if (!dn) {
1214 dn = dir->add_null_dentry(fb.dn, fb.dnfirst, fb.dnlast);
1215 dn->set_version(fb.dnv);
1216 if (fb.is_dirty()) dn->_mark_dirty(logseg);
1217 dout(10) << "EMetaBlob.replay added (full) " << *dn << dendl;
1218 } else {
1219 dn->set_version(fb.dnv);
1220 if (fb.is_dirty()) dn->_mark_dirty(logseg);
1221 dout(10) << "EMetaBlob.replay for [" << fb.dnfirst << "," << fb.dnlast << "] had " << *dn << dendl;
1222 dn->first = fb.dnfirst;
1223 ceph_assert(dn->last == fb.dnlast);
1224 }
1225 if (lump.is_importing())
1226 dn->state_set(CDentry::STATE_AUTH);
1227
1228 CInode *in = mds->mdcache->get_inode(fb.inode.ino, fb.dnlast);
1229 if (!in) {
1230 in = new CInode(mds->mdcache, dn->is_auth(), fb.dnfirst, fb.dnlast);
1231 fb.update_inode(mds, in);
1232 mds->mdcache->add_inode(in);
1233 if (!dn->get_linkage()->is_null()) {
1234 if (dn->get_linkage()->is_primary()) {
1235 unlinked[dn->get_linkage()->get_inode()] = dir;
1236 stringstream ss;
1237 ss << "EMetaBlob.replay FIXME had dentry linked to wrong inode " << *dn
1238 << " " << *dn->get_linkage()->get_inode() << " should be " << fb.inode.ino;
1239 dout(0) << ss.str() << dendl;
1240 mds->clog->warn(ss);
1241 }
1242 dir->unlink_inode(dn, false);
1243 }
1244 if (unlinked.count(in))
1245 linked.insert(in);
1246 dir->link_primary_inode(dn, in);
1247 dout(10) << "EMetaBlob.replay added " << *in << dendl;
1248 } else {
1249 in->first = fb.dnfirst;
1250 fb.update_inode(mds, in);
1251 if (dn->get_linkage()->get_inode() != in && in->get_parent_dn()) {
1252 dout(10) << "EMetaBlob.replay unlinking " << *in << dendl;
1253 unlinked[in] = in->get_parent_dir();
1254 in->get_parent_dir()->unlink_inode(in->get_parent_dn());
1255 }
1256 if (dn->get_linkage()->get_inode() != in) {
1257 if (!dn->get_linkage()->is_null()) { // note: might be remote. as with stray reintegration.
1258 if (dn->get_linkage()->is_primary()) {
1259 unlinked[dn->get_linkage()->get_inode()] = dir;
1260 stringstream ss;
1261 ss << "EMetaBlob.replay FIXME had dentry linked to wrong inode " << *dn
1262 << " " << *dn->get_linkage()->get_inode() << " should be " << fb.inode.ino;
1263 dout(0) << ss.str() << dendl;
1264 mds->clog->warn(ss);
1265 }
1266 dir->unlink_inode(dn, false);
1267 }
1268 if (unlinked.count(in))
1269 linked.insert(in);
1270 dir->link_primary_inode(dn, in);
1271 dout(10) << "EMetaBlob.replay linked " << *in << dendl;
1272 } else {
1273 dout(10) << "EMetaBlob.replay for [" << fb.dnfirst << "," << fb.dnlast << "] had " << *in << dendl;
1274 }
1275 ceph_assert(in->first == fb.dnfirst ||
1276 (in->is_multiversion() && in->first > fb.dnfirst));
1277 }
1278 if (fb.is_dirty())
1279 in->_mark_dirty(logseg);
1280 if (fb.is_dirty_parent())
1281 in->mark_dirty_parent(logseg, fb.is_dirty_pool());
1282 if (fb.need_snapflush())
1283 logseg->open_files.push_back(&in->item_open_file);
1284 if (dn->is_auth())
1285 in->state_set(CInode::STATE_AUTH);
1286 else
1287 in->state_clear(CInode::STATE_AUTH);
1288 ceph_assert(g_conf()->mds_kill_journal_replay_at != 2);
1289 }
1290
1291 // remote dentries
1292 for (const auto& rb : lump.get_dremote()) {
1293 CDentry *dn = dir->lookup_exact_snap(rb.dn, rb.dnlast);
1294 if (!dn) {
1295 dn = dir->add_remote_dentry(rb.dn, rb.ino, rb.d_type, rb.dnfirst, rb.dnlast);
1296 dn->set_version(rb.dnv);
1297 if (rb.dirty) dn->_mark_dirty(logseg);
1298 dout(10) << "EMetaBlob.replay added " << *dn << dendl;
1299 } else {
1300 if (!dn->get_linkage()->is_null()) {
1301 dout(10) << "EMetaBlob.replay unlinking " << *dn << dendl;
1302 if (dn->get_linkage()->is_primary()) {
1303 unlinked[dn->get_linkage()->get_inode()] = dir;
1304 stringstream ss;
1305 ss << "EMetaBlob.replay FIXME had dentry linked to wrong inode " << *dn
1306 << " " << *dn->get_linkage()->get_inode() << " should be remote " << rb.ino;
1307 dout(0) << ss.str() << dendl;
// NOTE(review): unlike the two full-dentry mismatch paths above, this path
// only logs through dout and never calls mds->clog->warn(ss) -- confirm
// whether that is intentional.
1308 }
1309 dir->unlink_inode(dn, false);
1310 }
1311 dir->link_remote_inode(dn, rb.ino, rb.d_type);
1312 dn->set_version(rb.dnv);
1313 if (rb.dirty) dn->_mark_dirty(logseg);
1314 dout(10) << "EMetaBlob.replay for [" << rb.dnfirst << "," << rb.dnlast << "] had " << *dn << dendl;
1315 dn->first = rb.dnfirst;
1316 ceph_assert(dn->last == rb.dnlast);
1317 }
1318 if (lump.is_importing())
1319 dn->state_set(CDentry::STATE_AUTH);
1320 }
1321
1322 // null dentries
1323 for (const auto& nb : lump.get_dnull()) {
1324 CDentry *dn = dir->lookup_exact_snap(nb.dn, nb.dnlast);
1325 if (!dn) {
1326 dn = dir->add_null_dentry(nb.dn, nb.dnfirst, nb.dnlast);
1327 dn->set_version(nb.dnv);
1328 if (nb.dirty) dn->_mark_dirty(logseg);
1329 dout(10) << "EMetaBlob.replay added (nullbit) " << *dn << dendl;
1330 } else {
1331 dn->first = nb.dnfirst;
1332 if (!dn->get_linkage()->is_null()) {
1333 dout(10) << "EMetaBlob.replay unlinking " << *dn << dendl;
1334 CInode *in = dn->get_linkage()->get_inode();
1335 // For renamed inode, We may call CInode::force_dirfrag() later.
1336 // CInode::force_dirfrag() doesn't work well when inode is detached
1337 // from the hierarchy.
1338 if (!renamed_diri || renamed_diri != in) {
1339 if (dn->get_linkage()->is_primary())
1340 unlinked[in] = dir;
1341 dir->unlink_inode(dn);
1342 }
1343 }
1344 dn->set_version(nb.dnv);
1345 if (nb.dirty) dn->_mark_dirty(logseg);
1346 dout(10) << "EMetaBlob.replay had " << *dn << dendl;
1347 ceph_assert(dn->last == nb.dnlast);
1348 }
// Remember the dirfrag that holds a null dentry. It is used as the rename
// source dir below unless renamed_diri was already in cache, in which case
// olddir is overwritten from unlinked[renamed_diri].
1349 olddir = dir;
1350 if (lump.is_importing())
1351 dn->state_set(CDentry::STATE_AUTH);
1352
1353 // Make null dentries the first things we trim
1354 dout(10) << "EMetaBlob.replay pushing to bottom of lru " << *dn << dendl;
// NOTE(review): only the log message is present here; no LRU call is made
// in this block -- confirm whether the trim-priority change happens elsewhere.
1355 }
1356 }
1357
1358 ceph_assert(g_conf()->mds_kill_journal_replay_at != 3);
1359
1360 if (renamed_dirino) {
1361 if (renamed_diri) {
1362 ceph_assert(unlinked.count(renamed_diri));
1363 ceph_assert(linked.count(renamed_diri));
1364 olddir = unlinked[renamed_diri];
1365 } else {
1366 // we imported a diri we haven't seen before
1367 renamed_diri = mds->mdcache->get_inode(renamed_dirino);
1368 ceph_assert(renamed_diri); // it was in the metablob
1369 }
1370
1371 if (olddir) {
1372 if (olddir->authority() != CDIR_AUTH_UNDEF &&
1373 renamed_diri->authority() == CDIR_AUTH_UNDEF) {
1374 ceph_assert(slaveup); // auth to non-auth, must be slave prepare
1375 frag_vec_t leaves;
1376 renamed_diri->dirfragtree.get_leaves(leaves);
1377 for (const auto& leaf : leaves) {
1378 CDir *dir = renamed_diri->get_dirfrag(leaf);
1379 ceph_assert(dir);
1380 if (dir->get_dir_auth() == CDIR_AUTH_UNDEF)
1381 // preserve subtree bound until slave commit
1382 slaveup->olddirs.insert(dir->inode);
1383 else
1384 dir->state_set(CDir::STATE_AUTH);
1385 }
1386 }
1387
1388 mds->mdcache->adjust_subtree_after_rename(renamed_diri, olddir, false);
1389
1390 // see if we can discard the subtree we renamed out of
1391 CDir *root = mds->mdcache->get_subtree_root(olddir);
1392 if (root->get_dir_auth() == CDIR_AUTH_UNDEF) {
1393 if (slaveup) // preserve the old dir until slave commit
1394 slaveup->olddirs.insert(olddir->inode);
1395 else
1396 mds->mdcache->try_trim_non_auth_subtree(root);
1397 }
1398 }
1399
1400 // if we are the srci importer, we'll also have some dirfrags we have to open up...
1401 if (renamed_diri->authority() != CDIR_AUTH_UNDEF) {
1402 for (const auto& p : renamed_dir_frags) {
1403 CDir *dir = renamed_diri->get_dirfrag(p);
1404 if (dir) {
1405 // we already had the inode before, and we already adjusted this subtree accordingly.
1406 dout(10) << " already had+adjusted rename import bound " << *dir << dendl;
1407 ceph_assert(olddir);
1408 continue;
1409 }
1410 dir = renamed_diri->get_or_open_dirfrag(mds->mdcache, p);
1411 dout(10) << " creating new rename import bound " << *dir << dendl;
1412 dir->state_clear(CDir::STATE_AUTH);
1413 mds->mdcache->adjust_subtree_auth(dir, CDIR_AUTH_UNDEF);
1414 }
1415 }
1416
1417 // rename may overwrite an empty directory and move it into stray dir.
1418 unlinked.erase(renamed_diri);
1419 for (map<CInode*, CDir*>::iterator p = unlinked.begin(); p != unlinked.end(); ++p) {
1420 if (!linked.count(p->first))
1421 continue;
1422 ceph_assert(p->first->is_dir());
1423 mds->mdcache->adjust_subtree_after_rename(p->first, p->second, false);
1424 }
1425 }
1426
1427 if (!unlinked.empty()) {
1428 for (set<CInode*>::iterator p = linked.begin(); p != linked.end(); ++p)
1429 unlinked.erase(*p);
1430 dout(10) << " unlinked set contains " << unlinked << dendl;
1431 for (map<CInode*, CDir*>::iterator p = unlinked.begin(); p != unlinked.end(); ++p) {
1432 CInode *in = p->first;
1433 if (slaveup) { // preserve unlinked inodes until slave commit
1434 slaveup->unlinked.insert(in);
1435 if (in->snaprealm)
1436 in->snaprealm->adjust_parent();
1437 } else
1438 mds->mdcache->remove_inode_recursive(in);
1439 }
1440 }
1441
1442 // table client transactions
1443 for (const auto& p : table_tids) {
1444 dout(10) << "EMetaBlob.replay noting " << get_mdstable_name(p.first)
1445 << " transaction " << p.second << dendl;
1446 MDSTableClient *client = mds->get_table_client(p.first);
1447 if (client)
1448 client->got_journaled_agree(p.second, logseg);
1449 }
1450
1451 // opened ino?
1452 if (opened_ino) {
1453 CInode *in = mds->mdcache->get_inode(opened_ino);
1454 ceph_assert(in);
1455 dout(10) << "EMetaBlob.replay noting opened inode " << *in << dendl;
1456 logseg->open_files.push_back(&in->item_open_file);
1457 }
1458
1459 // allocated_inos
1460 if (inotablev) {
1461 if (mds->inotable->get_version() >= inotablev) {
1462 dout(10) << "EMetaBlob.replay inotable tablev " << inotablev
1463 << " <= table " << mds->inotable->get_version() << dendl;
1464 } else {
1465 dout(10) << "EMetaBlob.replay inotable v " << inotablev
1466 << " - 1 == table " << mds->inotable->get_version()
1467 << " allocated+used " << allocated_ino
1468 << " prealloc " << preallocated_inos
1469 << dendl;
1470 if (allocated_ino)
1471 mds->inotable->replay_alloc_id(allocated_ino);
1472 if (preallocated_inos.size())
1473 mds->inotable->replay_alloc_ids(preallocated_inos);
1474
1475 // [repair bad inotable updates]
1476 if (inotablev > mds->inotable->get_version()) {
1477 mds->clog->error() << "journal replay inotablev mismatch "
1478 << mds->inotable->get_version() << " -> " << inotablev;
1479 mds->inotable->force_replay_version(inotablev);
1480 }
1481
1482 ceph_assert(inotablev == mds->inotable->get_version());
1483 }
1484 }
1485 if (sessionmapv) {
1486 if (mds->sessionmap.get_version() >= sessionmapv) {
1487 dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv
1488 << " <= table " << mds->sessionmap.get_version() << dendl;
// The journaled version may be up to two ahead of the table: the branch below
// performs at most two session-map updates (one for used_preallocated_ino, one
// for preallocated_inos); presumably each advances the version by one.
1489 } else if (mds->sessionmap.get_version() + 2 >= sessionmapv) {
1490 dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv
1491 << " -(1|2) == table " << mds->sessionmap.get_version()
1492 << " prealloc " << preallocated_inos
1493 << " used " << used_preallocated_ino
1494 << dendl;
1495 Session *session = mds->sessionmap.get_session(client_name);
1496 if (session) {
1497 dout(20) << " (session prealloc " << session->info.prealloc_inos << ")" << dendl;
1498 if (used_preallocated_ino) {
1499 if (!session->info.prealloc_inos.empty()) {
1500 inodeno_t next = session->next_ino();
1501 inodeno_t i = session->take_ino(used_preallocated_ino);
1502 if (next != i)
1503 mds->clog->warn() << " replayed op " << client_reqs << " used ino " << i
1504 << " but session next is " << next;
1505 ceph_assert(i == used_preallocated_ino);
1506 session->info.used_inos.clear();
1507 }
1508 mds->sessionmap.replay_dirty_session(session);
1509 }
1510 if (!preallocated_inos.empty()) {
1511 session->info.prealloc_inos.insert(preallocated_inos);
1512 mds->sessionmap.replay_dirty_session(session);
1513 }
1514
1515 } else {
1516 dout(10) << "EMetaBlob.replay no session for " << client_name << dendl;
1517 if (used_preallocated_ino) {
1518 mds->sessionmap.replay_advance_version();
1519 }
1520 if (!preallocated_inos.empty())
1521 mds->sessionmap.replay_advance_version();
1522 }
1523 ceph_assert(sessionmapv == mds->sessionmap.get_version());
1524 } else {
1525 mds->clog->error() << "journal replay sessionmap v " << sessionmapv
1526 << " -(1|2) > table " << mds->sessionmap.get_version();
1527 ceph_assert(g_conf()->mds_wipe_sessions);
1528 mds->sessionmap.wipe();
1529 mds->sessionmap.set_version(sessionmapv);
1530 }
1531 }
1532
1533 // truncating inodes
1534 for (const auto& ino : truncate_start) {
1535 CInode *in = mds->mdcache->get_inode(ino);
1536 ceph_assert(in);
1537 mds->mdcache->add_recovered_truncate(in, logseg);
1538 }
1539 for (const auto& p : truncate_finish) {
1540 LogSegment *ls = mds->mdlog->get_segment(p.second);
1541 if (ls) {
1542 CInode *in = mds->mdcache->get_inode(p.first);
1543 ceph_assert(in);
1544 mds->mdcache->remove_recovered_truncate(in, ls);
1545 }
1546 }
1547
1548 // destroyed inodes
1549 if (!destroyed_inodes.empty()) {
1550 for (vector<inodeno_t>::iterator p = destroyed_inodes.begin();
1551 p != destroyed_inodes.end();
1552 ++p) {
1553 CInode *in = mds->mdcache->get_inode(*p);
1554 if (in) {
1555 dout(10) << "EMetaBlob.replay destroyed " << *p << ", dropping " << *in << dendl;
1556 CDentry *parent = in->get_parent_dn();
1557 mds->mdcache->remove_inode(in);
1558 if (parent) {
1559 dout(10) << "EMetaBlob.replay unlinked from dentry " << *parent << dendl;
1560 ceph_assert(parent->get_linkage()->is_null());
1561 }
1562 } else {
1563 dout(10) << "EMetaBlob.replay destroyed " << *p << ", not in cache" << dendl;
1564 }
1565 }
1566 mds->mdcache->open_file_table.note_destroyed_inos(logseg->seq, destroyed_inodes);
1567 }
1568
1569 // client requests
1570 for (const auto& p : client_reqs) {
1571 if (p.first.name.is_client()) {
1572 dout(10) << "EMetaBlob.replay request " << p.first << " trim_to " << p.second << dendl;
// The inode created by this request: the freshly allocated one if set,
// otherwise the consumed preallocated one (either may be zero).
1573 inodeno_t created = allocated_ino ? allocated_ino : used_preallocated_ino;
1574 // if we allocated an inode, there should be exactly one client request id.
1575 ceph_assert(created == inodeno_t() || client_reqs.size() == 1);
1576
1577 Session *session = mds->sessionmap.get_session(p.first.name);
1578 if (session) {
1579 session->add_completed_request(p.first.tid, created);
1580 if (p.second)
1581 session->trim_completed_requests(p.second);
1582 }
1583 }
1584 }
1585
1586 // client flushes
1587 for (const auto& p : client_flushes) {
1588 if (p.first.name.is_client()) {
1589 dout(10) << "EMetaBlob.replay flush " << p.first << " trim_to " << p.second << dendl;
1590 Session *session = mds->sessionmap.get_session(p.first.name);
1591 if (session) {
1592 session->add_completed_flush(p.first.tid);
1593 if (p.second)
1594 session->trim_completed_flushes(p.second);
1595 }
1596 }
1597 }
1598
1599 // update segment
1600 update_segment(logseg);
1601
1602 ceph_assert(g_conf()->mds_kill_journal_replay_at != 4);
1603 }
1604
1605 // -----------------------
1606 // ESession
1607
1608 void ESession::update_segment()
1609 {
1610 get_segment()->sessionmapv = cmapv;
1611 if (inos.size() && inotablev)
1612 get_segment()->inotablev = inotablev;
1613 }
1614
1615 void ESession::replay(MDSRank *mds)
1616 {
1617 if (mds->sessionmap.get_version() >= cmapv) {
1618 dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version()
1619 << " >= " << cmapv << ", noop" << dendl;
1620 } else {
1621 dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version()
1622 << " < " << cmapv << " " << (open ? "open":"close") << " " << client_inst << dendl;
1623 Session *session;
1624 if (open) {
1625 session = mds->sessionmap.get_or_add_session(client_inst);
1626 mds->sessionmap.set_state(session, Session::STATE_OPEN);
1627 session->set_client_metadata(client_metadata);
1628 dout(10) << " opened session " << session->info.inst << dendl;
1629 } else {
1630 session = mds->sessionmap.get_session(client_inst.name);
1631 if (session) { // there always should be a session, but there's a bug
1632 if (session->get_connection() == NULL) {
1633 dout(10) << " removed session " << session->info.inst << dendl;
1634 mds->sessionmap.remove_session(session);
1635 session = NULL;
1636 } else {
1637 session->clear(); // the client has reconnected; keep the Session, but reset
1638 dout(10) << " reset session " << session->info.inst << " (they reconnected)" << dendl;
1639 }
1640 } else {
1641 mds->clog->error() << "replayed stray Session close event for " << client_inst
1642 << " from time " << stamp << ", ignoring";
1643 }
1644 }
1645 if (session) {
1646 mds->sessionmap.replay_dirty_session(session);
1647 } else {
1648 mds->sessionmap.replay_advance_version();
1649 }
1650 ceph_assert(mds->sessionmap.get_version() == cmapv);
1651 }
1652
1653 if (inos.size() && inotablev) {
1654 if (mds->inotable->get_version() >= inotablev) {
1655 dout(10) << "ESession.replay inotable " << mds->inotable->get_version()
1656 << " >= " << inotablev << ", noop" << dendl;
1657 } else {
1658 dout(10) << "ESession.replay inotable " << mds->inotable->get_version()
1659 << " < " << inotablev << " " << (open ? "add":"remove") << dendl;
1660 ceph_assert(!open); // for now
1661 mds->inotable->replay_release_ids(inos);
1662 ceph_assert(mds->inotable->get_version() == inotablev);
1663 }
1664 }
1665
1666 update_segment();
1667 }
1668
1669 void ESession::encode(bufferlist &bl, uint64_t features) const
1670 {
1671 ENCODE_START(5, 5, bl);
1672 encode(stamp, bl);
1673 encode(client_inst, bl, features);
1674 encode(open, bl);
1675 encode(cmapv, bl);
1676 encode(inos, bl);
1677 encode(inotablev, bl);
1678 encode(client_metadata, bl);
1679 ENCODE_FINISH(bl);
1680 }
1681
1682 void ESession::decode(bufferlist::const_iterator &bl)
1683 {
1684 DECODE_START_LEGACY_COMPAT_LEN(5, 3, 3, bl);
1685 if (struct_v >= 2)
1686 decode(stamp, bl);
1687 decode(client_inst, bl);
1688 decode(open, bl);
1689 decode(cmapv, bl);
1690 decode(inos, bl);
1691 decode(inotablev, bl);
1692 if (struct_v == 4) {
1693 decode(client_metadata.kv_map, bl);
1694 } else if (struct_v >= 5) {
1695 decode(client_metadata, bl);
1696 }
1697 DECODE_FINISH(bl);
1698 }
1699
1700 void ESession::dump(Formatter *f) const
1701 {
1702 f->dump_stream("client instance") << client_inst;
1703 f->dump_string("open", open ? "true" : "false");
1704 f->dump_int("client map version", cmapv);
1705 f->dump_stream("inos") << inos;
1706 f->dump_int("inotable version", inotablev);
1707 f->open_object_section("client_metadata");
1708 client_metadata.dump(f);
1709 f->close_section(); // client_metadata
1710 }
1711
1712 void ESession::generate_test_instances(list<ESession*>& ls)
1713 {
1714 ls.push_back(new ESession);
1715 }
1716
1717 // -----------------------
1718 // ESessions
1719
1720 void ESessions::encode(bufferlist &bl, uint64_t features) const
1721 {
1722 ENCODE_START(2, 1, bl);
1723 encode(client_map, bl, features);
1724 encode(cmapv, bl);
1725 encode(stamp, bl);
1726 encode(client_metadata_map, bl);
1727 ENCODE_FINISH(bl);
1728 }
1729
1730 void ESessions::decode_old(bufferlist::const_iterator &bl)
1731 {
1732 using ceph::decode;
1733 decode(client_map, bl);
1734 decode(cmapv, bl);
1735 if (!bl.end())
1736 decode(stamp, bl);
1737 }
1738
1739 void ESessions::decode_new(bufferlist::const_iterator &bl)
1740 {
1741 DECODE_START(2, bl);
1742 decode(client_map, bl);
1743 decode(cmapv, bl);
1744 decode(stamp, bl);
1745 if (struct_v >= 2)
1746 decode(client_metadata_map, bl);
1747 DECODE_FINISH(bl);
1748 }
1749
1750 void ESessions::dump(Formatter *f) const
1751 {
1752 f->dump_int("client map version", cmapv);
1753
1754 f->open_array_section("client map");
1755 for (map<client_t,entity_inst_t>::const_iterator i = client_map.begin();
1756 i != client_map.end(); ++i) {
1757 f->open_object_section("client");
1758 f->dump_int("client id", i->first.v);
1759 f->dump_stream("client entity") << i->second;
1760 f->close_section(); // client
1761 }
1762 f->close_section(); // client map
1763 }
1764
1765 void ESessions::generate_test_instances(list<ESessions*>& ls)
1766 {
1767 ls.push_back(new ESessions());
1768 }
1769
1770 void ESessions::update_segment()
1771 {
1772 get_segment()->sessionmapv = cmapv;
1773 }
1774
1775 void ESessions::replay(MDSRank *mds)
1776 {
1777 if (mds->sessionmap.get_version() >= cmapv) {
1778 dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version()
1779 << " >= " << cmapv << ", noop" << dendl;
1780 } else {
1781 dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version()
1782 << " < " << cmapv << dendl;
1783 mds->sessionmap.replay_open_sessions(client_map, client_metadata_map);
1784 ceph_assert(mds->sessionmap.get_version() == cmapv);
1785 }
1786 update_segment();
1787 }
1788
1789
1790 // -----------------------
1791 // ETableServer
1792
1793 void ETableServer::encode(bufferlist& bl, uint64_t features) const
1794 {
1795 ENCODE_START(3, 3, bl);
1796 encode(stamp, bl);
1797 encode(table, bl);
1798 encode(op, bl);
1799 encode(reqid, bl);
1800 encode(bymds, bl);
1801 encode(mutation, bl);
1802 encode(tid, bl);
1803 encode(version, bl);
1804 ENCODE_FINISH(bl);
1805 }
1806
1807 void ETableServer::decode(bufferlist::const_iterator &bl)
1808 {
1809 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
1810 if (struct_v >= 2)
1811 decode(stamp, bl);
1812 decode(table, bl);
1813 decode(op, bl);
1814 decode(reqid, bl);
1815 decode(bymds, bl);
1816 decode(mutation, bl);
1817 decode(tid, bl);
1818 decode(version, bl);
1819 DECODE_FINISH(bl);
1820 }
1821
1822 void ETableServer::dump(Formatter *f) const
1823 {
1824 f->dump_int("table id", table);
1825 f->dump_int("op", op);
1826 f->dump_int("request id", reqid);
1827 f->dump_int("by mds", bymds);
1828 f->dump_int("tid", tid);
1829 f->dump_int("version", version);
1830 }
1831
1832 void ETableServer::generate_test_instances(list<ETableServer*>& ls)
1833 {
1834 ls.push_back(new ETableServer());
1835 }
1836
1837
1838 void ETableServer::update_segment()
1839 {
1840 get_segment()->tablev[table] = version;
1841 }
1842
1843 void ETableServer::replay(MDSRank *mds)
1844 {
1845 MDSTableServer *server = mds->get_table_server(table);
1846 if (!server)
1847 return;
1848
1849 if (server->get_version() >= version) {
1850 dout(10) << "ETableServer.replay " << get_mdstable_name(table)
1851 << " " << get_mdstableserver_opname(op)
1852 << " event " << version
1853 << " <= table " << server->get_version() << dendl;
1854 return;
1855 }
1856
1857 dout(10) << " ETableServer.replay " << get_mdstable_name(table)
1858 << " " << get_mdstableserver_opname(op)
1859 << " event " << version << " - 1 == table " << server->get_version() << dendl;
1860 ceph_assert(version-1 == server->get_version());
1861
1862 switch (op) {
1863 case TABLESERVER_OP_PREPARE: {
1864 server->_note_prepare(bymds, reqid, true);
1865 bufferlist out;
1866 server->_prepare(mutation, reqid, bymds, out);
1867 mutation = std::move(out);
1868 break;
1869 }
1870 case TABLESERVER_OP_COMMIT:
1871 server->_commit(tid, MMDSTableRequest::ref());
1872 server->_note_commit(tid, true);
1873 break;
1874 case TABLESERVER_OP_ROLLBACK:
1875 server->_rollback(tid);
1876 server->_note_rollback(tid, true);
1877 break;
1878 case TABLESERVER_OP_SERVER_UPDATE:
1879 server->_server_update(mutation);
1880 server->_note_server_update(mutation, true);
1881 break;
1882 default:
1883 mds->clog->error() << "invalid tableserver op in ETableServer";
1884 mds->damaged();
1885 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1886 }
1887
1888 ceph_assert(version == server->get_version());
1889 update_segment();
1890 }
1891
1892
1893 // ---------------------
1894 // ETableClient
1895
1896 void ETableClient::encode(bufferlist& bl, uint64_t features) const
1897 {
1898 ENCODE_START(3, 3, bl);
1899 encode(stamp, bl);
1900 encode(table, bl);
1901 encode(op, bl);
1902 encode(tid, bl);
1903 ENCODE_FINISH(bl);
1904 }
1905
1906 void ETableClient::decode(bufferlist::const_iterator &bl)
1907 {
1908 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
1909 if (struct_v >= 2)
1910 decode(stamp, bl);
1911 decode(table, bl);
1912 decode(op, bl);
1913 decode(tid, bl);
1914 DECODE_FINISH(bl);
1915 }
1916
1917 void ETableClient::dump(Formatter *f) const
1918 {
1919 f->dump_int("table", table);
1920 f->dump_int("op", op);
1921 f->dump_int("tid", tid);
1922 }
1923
1924 void ETableClient::generate_test_instances(list<ETableClient*>& ls)
1925 {
1926 ls.push_back(new ETableClient());
1927 }
1928
1929 void ETableClient::replay(MDSRank *mds)
1930 {
1931 dout(10) << " ETableClient.replay " << get_mdstable_name(table)
1932 << " op " << get_mdstableserver_opname(op)
1933 << " tid " << tid << dendl;
1934
1935 MDSTableClient *client = mds->get_table_client(table);
1936 if (!client)
1937 return;
1938
1939 ceph_assert(op == TABLESERVER_OP_ACK);
1940 client->got_journaled_ack(tid);
1941 }
1942
1943
1944 // -----------------------
1945 // ESnap
1946 /*
1947 void ESnap::update_segment()
1948 {
1949 get_segment()->tablev[TABLE_SNAP] = version;
1950 }
1951
1952 void ESnap::replay(MDSRank *mds)
1953 {
1954 if (mds->snaptable->get_version() >= version) {
1955 dout(10) << "ESnap.replay event " << version
1956 << " <= table " << mds->snaptable->get_version() << dendl;
1957 return;
1958 }
1959
1960 dout(10) << " ESnap.replay event " << version
1961 << " - 1 == table " << mds->snaptable->get_version() << dendl;
1962 ceph_assert(version-1 == mds->snaptable->get_version());
1963
1964 if (create) {
1965 version_t v;
1966 snapid_t s = mds->snaptable->create(snap.dirino, snap.name, snap.stamp, &v);
1967 ceph_assert(s == snap.snapid);
1968 } else {
1969 mds->snaptable->remove(snap.snapid);
1970 }
1971
1972 ceph_assert(version == mds->snaptable->get_version());
1973 }
1974 */
1975
1976
1977
1978 // -----------------------
1979 // EUpdate
1980
1981 void EUpdate::encode(bufferlist &bl, uint64_t features) const
1982 {
1983 ENCODE_START(4, 4, bl);
1984 encode(stamp, bl);
1985 encode(type, bl);
1986 encode(metablob, bl, features);
1987 encode(client_map, bl);
1988 encode(cmapv, bl);
1989 encode(reqid, bl);
1990 encode(had_slaves, bl);
1991 ENCODE_FINISH(bl);
1992 }
1993
1994 void EUpdate::decode(bufferlist::const_iterator &bl)
1995 {
1996 DECODE_START_LEGACY_COMPAT_LEN(4, 4, 4, bl);
1997 if (struct_v >= 2)
1998 decode(stamp, bl);
1999 decode(type, bl);
2000 decode(metablob, bl);
2001 decode(client_map, bl);
2002 if (struct_v >= 3)
2003 decode(cmapv, bl);
2004 decode(reqid, bl);
2005 decode(had_slaves, bl);
2006 DECODE_FINISH(bl);
2007 }
2008
2009 void EUpdate::dump(Formatter *f) const
2010 {
2011 f->open_object_section("metablob");
2012 metablob.dump(f);
2013 f->close_section(); // metablob
2014
2015 f->dump_string("type", type);
2016 f->dump_int("client map length", client_map.length());
2017 f->dump_int("client map version", cmapv);
2018 f->dump_stream("reqid") << reqid;
2019 f->dump_string("had slaves", had_slaves ? "true" : "false");
2020 }
2021
2022 void EUpdate::generate_test_instances(list<EUpdate*>& ls)
2023 {
2024 ls.push_back(new EUpdate());
2025 }
2026
2027
2028 void EUpdate::update_segment()
2029 {
2030 auto&& segment = get_segment();
2031 metablob.update_segment(segment);
2032
2033 if (client_map.length())
2034 segment->sessionmapv = cmapv;
2035
2036 if (had_slaves)
2037 segment->uncommitted_masters.insert(reqid);
2038 }
2039
2040 void EUpdate::replay(MDSRank *mds)
2041 {
2042 auto&& segment = get_segment();
2043 metablob.replay(mds, segment);
2044
2045 if (had_slaves) {
2046 dout(10) << "EUpdate.replay " << reqid << " had slaves, expecting a matching ECommitted" << dendl;
2047 segment->uncommitted_masters.insert(reqid);
2048 set<mds_rank_t> slaves;
2049 mds->mdcache->add_uncommitted_master(reqid, segment, slaves, true);
2050 }
2051
2052 if (client_map.length()) {
2053 if (mds->sessionmap.get_version() >= cmapv) {
2054 dout(10) << "EUpdate.replay sessionmap v " << cmapv
2055 << " <= table " << mds->sessionmap.get_version() << dendl;
2056 } else {
2057 dout(10) << "EUpdate.replay sessionmap " << mds->sessionmap.get_version()
2058 << " < " << cmapv << dendl;
2059 // open client sessions?
2060 map<client_t,entity_inst_t> cm;
2061 map<client_t,client_metadata_t> cmm;
2062 auto blp = client_map.cbegin();
2063 using ceph::decode;
2064 decode(cm, blp);
2065 if (!blp.end())
2066 decode(cmm, blp);
2067 mds->sessionmap.replay_open_sessions(cm, cmm);
2068
2069 ceph_assert(mds->sessionmap.get_version() == cmapv);
2070 }
2071 }
2072 update_segment();
2073 }
2074
2075
2076 // ------------------------
2077 // EOpen
2078
2079 void EOpen::encode(bufferlist &bl, uint64_t features) const {
2080 ENCODE_START(4, 3, bl);
2081 encode(stamp, bl);
2082 encode(metablob, bl, features);
2083 encode(inos, bl);
2084 encode(snap_inos, bl);
2085 ENCODE_FINISH(bl);
2086 }
2087
2088 void EOpen::decode(bufferlist::const_iterator &bl) {
2089 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
2090 if (struct_v >= 2)
2091 decode(stamp, bl);
2092 decode(metablob, bl);
2093 decode(inos, bl);
2094 if (struct_v >= 4)
2095 decode(snap_inos, bl);
2096 DECODE_FINISH(bl);
2097 }
2098
2099 void EOpen::dump(Formatter *f) const
2100 {
2101 f->open_object_section("metablob");
2102 metablob.dump(f);
2103 f->close_section(); // metablob
2104 f->open_array_section("inos involved");
2105 for (vector<inodeno_t>::const_iterator i = inos.begin();
2106 i != inos.end(); ++i) {
2107 f->dump_int("ino", *i);
2108 }
2109 f->close_section(); // inos
2110 }
2111
2112 void EOpen::generate_test_instances(list<EOpen*>& ls)
2113 {
2114 ls.push_back(new EOpen());
2115 ls.push_back(new EOpen());
2116 ls.back()->add_ino(0);
2117 }
2118
2119 void EOpen::update_segment()
2120 {
2121 // ??
2122 }
2123
2124 void EOpen::replay(MDSRank *mds)
2125 {
2126 dout(10) << "EOpen.replay " << dendl;
2127 auto&& segment = get_segment();
2128 metablob.replay(mds, segment);
2129
2130 // note which segments inodes belong to, so we don't have to start rejournaling them
2131 for (const auto &ino : inos) {
2132 CInode *in = mds->mdcache->get_inode(ino);
2133 if (!in) {
2134 dout(0) << "EOpen.replay ino " << ino << " not in metablob" << dendl;
2135 ceph_assert(in);
2136 }
2137 segment->open_files.push_back(&in->item_open_file);
2138 }
2139 for (const auto &vino : snap_inos) {
2140 CInode *in = mds->mdcache->get_inode(vino);
2141 if (!in) {
2142 dout(0) << "EOpen.replay ino " << vino << " not in metablob" << dendl;
2143 ceph_assert(in);
2144 }
2145 segment->open_files.push_back(&in->item_open_file);
2146 }
2147 }
2148
2149
2150 // -----------------------
2151 // ECommitted
2152
2153 void ECommitted::replay(MDSRank *mds)
2154 {
2155 if (mds->mdcache->uncommitted_masters.count(reqid)) {
2156 dout(10) << "ECommitted.replay " << reqid << dendl;
2157 mds->mdcache->uncommitted_masters[reqid].ls->uncommitted_masters.erase(reqid);
2158 mds->mdcache->uncommitted_masters.erase(reqid);
2159 } else {
2160 dout(10) << "ECommitted.replay " << reqid << " -- didn't see original op" << dendl;
2161 }
2162 }
2163
2164 void ECommitted::encode(bufferlist& bl, uint64_t features) const
2165 {
2166 ENCODE_START(3, 3, bl);
2167 encode(stamp, bl);
2168 encode(reqid, bl);
2169 ENCODE_FINISH(bl);
2170 }
2171
2172 void ECommitted::decode(bufferlist::const_iterator& bl)
2173 {
2174 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
2175 if (struct_v >= 2)
2176 decode(stamp, bl);
2177 decode(reqid, bl);
2178 DECODE_FINISH(bl);
2179 }
2180
2181 void ECommitted::dump(Formatter *f) const {
2182 f->dump_stream("stamp") << stamp;
2183 f->dump_stream("reqid") << reqid;
2184 }
2185
2186 void ECommitted::generate_test_instances(list<ECommitted*>& ls)
2187 {
2188 ls.push_back(new ECommitted);
2189 ls.push_back(new ECommitted);
2190 ls.back()->stamp = utime_t(1, 2);
2191 ls.back()->reqid = metareqid_t(entity_name_t::CLIENT(123), 456);
2192 }
2193
2194 // -----------------------
2195 // ESlaveUpdate
2196
2197 void link_rollback::encode(bufferlist &bl) const
2198 {
2199 ENCODE_START(3, 2, bl);
2200 encode(reqid, bl);
2201 encode(ino, bl);
2202 encode(was_inc, bl);
2203 encode(old_ctime, bl);
2204 encode(old_dir_mtime, bl);
2205 encode(old_dir_rctime, bl);
2206 encode(snapbl, bl);
2207 ENCODE_FINISH(bl);
2208 }
2209
2210 void link_rollback::decode(bufferlist::const_iterator &bl)
2211 {
2212 DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
2213 decode(reqid, bl);
2214 decode(ino, bl);
2215 decode(was_inc, bl);
2216 decode(old_ctime, bl);
2217 decode(old_dir_mtime, bl);
2218 decode(old_dir_rctime, bl);
2219 if (struct_v >= 3)
2220 decode(snapbl, bl);
2221 DECODE_FINISH(bl);
2222 }
2223
2224 void link_rollback::dump(Formatter *f) const
2225 {
2226 f->dump_stream("metareqid") << reqid;
2227 f->dump_int("ino", ino);
2228 f->dump_string("was incremented", was_inc ? "true" : "false");
2229 f->dump_stream("old_ctime") << old_ctime;
2230 f->dump_stream("old_dir_mtime") << old_dir_mtime;
2231 f->dump_stream("old_dir_rctime") << old_dir_rctime;
2232 }
2233
2234 void link_rollback::generate_test_instances(list<link_rollback*>& ls)
2235 {
2236 ls.push_back(new link_rollback());
2237 }
2238
2239 void rmdir_rollback::encode(bufferlist& bl) const
2240 {
2241 ENCODE_START(3, 2, bl);
2242 encode(reqid, bl);
2243 encode(src_dir, bl);
2244 encode(src_dname, bl);
2245 encode(dest_dir, bl);
2246 encode(dest_dname, bl);
2247 encode(snapbl, bl);
2248 ENCODE_FINISH(bl);
2249 }
2250
2251 void rmdir_rollback::decode(bufferlist::const_iterator& bl)
2252 {
2253 DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
2254 decode(reqid, bl);
2255 decode(src_dir, bl);
2256 decode(src_dname, bl);
2257 decode(dest_dir, bl);
2258 decode(dest_dname, bl);
2259 if (struct_v >= 3)
2260 decode(snapbl, bl);
2261 DECODE_FINISH(bl);
2262 }
2263
2264 void rmdir_rollback::dump(Formatter *f) const
2265 {
2266 f->dump_stream("metareqid") << reqid;
2267 f->dump_stream("source directory") << src_dir;
2268 f->dump_string("source dname", src_dname);
2269 f->dump_stream("destination directory") << dest_dir;
2270 f->dump_string("destination dname", dest_dname);
2271 }
2272
2273 void rmdir_rollback::generate_test_instances(list<rmdir_rollback*>& ls)
2274 {
2275 ls.push_back(new rmdir_rollback());
2276 }
2277
2278 void rename_rollback::drec::encode(bufferlist &bl) const
2279 {
2280 ENCODE_START(2, 2, bl);
2281 encode(dirfrag, bl);
2282 encode(dirfrag_old_mtime, bl);
2283 encode(dirfrag_old_rctime, bl);
2284 encode(ino, bl);
2285 encode(remote_ino, bl);
2286 encode(dname, bl);
2287 encode(remote_d_type, bl);
2288 encode(old_ctime, bl);
2289 ENCODE_FINISH(bl);
2290 }
2291
2292 void rename_rollback::drec::decode(bufferlist::const_iterator &bl)
2293 {
2294 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
2295 decode(dirfrag, bl);
2296 decode(dirfrag_old_mtime, bl);
2297 decode(dirfrag_old_rctime, bl);
2298 decode(ino, bl);
2299 decode(remote_ino, bl);
2300 decode(dname, bl);
2301 decode(remote_d_type, bl);
2302 decode(old_ctime, bl);
2303 DECODE_FINISH(bl);
2304 }
2305
2306 void rename_rollback::drec::dump(Formatter *f) const
2307 {
2308 f->dump_stream("directory fragment") << dirfrag;
2309 f->dump_stream("directory old mtime") << dirfrag_old_mtime;
2310 f->dump_stream("directory old rctime") << dirfrag_old_rctime;
2311 f->dump_int("ino", ino);
2312 f->dump_int("remote ino", remote_ino);
2313 f->dump_string("dname", dname);
2314 uint32_t type = DTTOIF(remote_d_type) & S_IFMT; // convert to type entries
2315 string type_string;
2316 switch(type) {
2317 case S_IFREG:
2318 type_string = "file"; break;
2319 case S_IFLNK:
2320 type_string = "symlink"; break;
2321 case S_IFDIR:
2322 type_string = "directory"; break;
2323 default:
2324 type_string = "UNKNOWN-" + stringify((int)type); break;
2325 }
2326 f->dump_string("remote dtype", type_string);
2327 f->dump_stream("old ctime") << old_ctime;
2328 }
2329
2330 void rename_rollback::drec::generate_test_instances(list<drec*>& ls)
2331 {
2332 ls.push_back(new drec());
2333 ls.back()->remote_d_type = IFTODT(S_IFREG);
2334 }
2335
2336 void rename_rollback::encode(bufferlist &bl) const
2337 {
2338 ENCODE_START(3, 2, bl);
2339 encode(reqid, bl);
2340 encode(orig_src, bl);
2341 encode(orig_dest, bl);
2342 encode(stray, bl);
2343 encode(ctime, bl);
2344 encode(srci_snapbl, bl);
2345 encode(desti_snapbl, bl);
2346 ENCODE_FINISH(bl);
2347 }
2348
2349 void rename_rollback::decode(bufferlist::const_iterator &bl)
2350 {
2351 DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
2352 decode(reqid, bl);
2353 decode(orig_src, bl);
2354 decode(orig_dest, bl);
2355 decode(stray, bl);
2356 decode(ctime, bl);
2357 if (struct_v >= 3) {
2358 decode(srci_snapbl, bl);
2359 decode(desti_snapbl, bl);
2360 }
2361 DECODE_FINISH(bl);
2362 }
2363
2364 void rename_rollback::dump(Formatter *f) const
2365 {
2366 f->dump_stream("request id") << reqid;
2367 f->open_object_section("original src drec");
2368 orig_src.dump(f);
2369 f->close_section(); // original src drec
2370 f->open_object_section("original dest drec");
2371 orig_dest.dump(f);
2372 f->close_section(); // original dest drec
2373 f->open_object_section("stray drec");
2374 stray.dump(f);
2375 f->close_section(); // stray drec
2376 f->dump_stream("ctime") << ctime;
2377 }
2378
2379 void rename_rollback::generate_test_instances(list<rename_rollback*>& ls)
2380 {
2381 ls.push_back(new rename_rollback());
2382 ls.back()->orig_src.remote_d_type = IFTODT(S_IFREG);
2383 ls.back()->orig_dest.remote_d_type = IFTODT(S_IFREG);
2384 ls.back()->stray.remote_d_type = IFTODT(S_IFREG);
2385 }
2386
2387 void ESlaveUpdate::encode(bufferlist &bl, uint64_t features) const
2388 {
2389 ENCODE_START(3, 3, bl);
2390 encode(stamp, bl);
2391 encode(type, bl);
2392 encode(reqid, bl);
2393 encode(master, bl);
2394 encode(op, bl);
2395 encode(origop, bl);
2396 encode(commit, bl, features);
2397 encode(rollback, bl);
2398 ENCODE_FINISH(bl);
2399 }
2400
2401 void ESlaveUpdate::decode(bufferlist::const_iterator &bl)
2402 {
2403 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
2404 if (struct_v >= 2)
2405 decode(stamp, bl);
2406 decode(type, bl);
2407 decode(reqid, bl);
2408 decode(master, bl);
2409 decode(op, bl);
2410 decode(origop, bl);
2411 decode(commit, bl);
2412 decode(rollback, bl);
2413 DECODE_FINISH(bl);
2414 }
2415
2416 void ESlaveUpdate::dump(Formatter *f) const
2417 {
2418 f->open_object_section("metablob");
2419 commit.dump(f);
2420 f->close_section(); // metablob
2421
2422 f->dump_int("rollback length", rollback.length());
2423 f->dump_string("type", type);
2424 f->dump_stream("metareqid") << reqid;
2425 f->dump_int("master", master);
2426 f->dump_int("op", op);
2427 f->dump_int("original op", origop);
2428 }
2429
2430 void ESlaveUpdate::generate_test_instances(list<ESlaveUpdate*>& ls)
2431 {
2432 ls.push_back(new ESlaveUpdate());
2433 }
2434
2435
2436 void ESlaveUpdate::replay(MDSRank *mds)
2437 {
2438 MDSlaveUpdate *su;
2439 auto&& segment = get_segment();
2440 switch (op) {
2441 case ESlaveUpdate::OP_PREPARE:
2442 dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds." << master
2443 << ": applying commit, saving rollback info" << dendl;
2444 su = new MDSlaveUpdate(origop, rollback, segment->slave_updates);
2445 commit.replay(mds, segment, su);
2446 mds->mdcache->add_uncommitted_slave_update(reqid, master, su);
2447 break;
2448
2449 case ESlaveUpdate::OP_COMMIT:
2450 su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
2451 if (su) {
2452 dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master << dendl;
2453 mds->mdcache->finish_uncommitted_slave_update(reqid, master);
2454 } else {
2455 dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master
2456 << ": ignoring, no previously saved prepare" << dendl;
2457 }
2458 break;
2459
2460 case ESlaveUpdate::OP_ROLLBACK:
2461 dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds." << master
2462 << ": applying rollback commit blob" << dendl;
2463 commit.replay(mds, segment);
2464 su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
2465 if (su)
2466 mds->mdcache->finish_uncommitted_slave_update(reqid, master);
2467 break;
2468
2469 default:
2470 mds->clog->error() << "invalid op in ESlaveUpdate";
2471 mds->damaged();
2472 ceph_abort(); // Should be unreachable because damaged() calls respawn()
2473 }
2474 }
2475
2476
2477 // -----------------------
2478 // ESubtreeMap
2479
2480 void ESubtreeMap::encode(bufferlist& bl, uint64_t features) const
2481 {
2482 ENCODE_START(6, 5, bl);
2483 encode(stamp, bl);
2484 encode(metablob, bl, features);
2485 encode(subtrees, bl);
2486 encode(ambiguous_subtrees, bl);
2487 encode(expire_pos, bl);
2488 encode(event_seq, bl);
2489 ENCODE_FINISH(bl);
2490 }
2491
2492 void ESubtreeMap::decode(bufferlist::const_iterator &bl)
2493 {
2494 DECODE_START_LEGACY_COMPAT_LEN(6, 5, 5, bl);
2495 if (struct_v >= 2)
2496 decode(stamp, bl);
2497 decode(metablob, bl);
2498 decode(subtrees, bl);
2499 if (struct_v >= 4)
2500 decode(ambiguous_subtrees, bl);
2501 if (struct_v >= 3)
2502 decode(expire_pos, bl);
2503 if (struct_v >= 6)
2504 decode(event_seq, bl);
2505 DECODE_FINISH(bl);
2506 }
2507
2508 void ESubtreeMap::dump(Formatter *f) const
2509 {
2510 f->open_object_section("metablob");
2511 metablob.dump(f);
2512 f->close_section(); // metablob
2513
2514 f->open_array_section("subtrees");
2515 for(map<dirfrag_t,vector<dirfrag_t> >::const_iterator i = subtrees.begin();
2516 i != subtrees.end(); ++i) {
2517 f->open_object_section("tree");
2518 f->dump_stream("root dirfrag") << i->first;
2519 for (vector<dirfrag_t>::const_iterator j = i->second.begin();
2520 j != i->second.end(); ++j) {
2521 f->dump_stream("bound dirfrag") << *j;
2522 }
2523 f->close_section(); // tree
2524 }
2525 f->close_section(); // subtrees
2526
2527 f->open_array_section("ambiguous subtrees");
2528 for(set<dirfrag_t>::const_iterator i = ambiguous_subtrees.begin();
2529 i != ambiguous_subtrees.end(); ++i) {
2530 f->dump_stream("dirfrag") << *i;
2531 }
2532 f->close_section(); // ambiguous subtrees
2533
2534 f->dump_int("expire position", expire_pos);
2535 }
2536
2537 void ESubtreeMap::generate_test_instances(list<ESubtreeMap*>& ls)
2538 {
2539 ls.push_back(new ESubtreeMap());
2540 }
2541
2542 void ESubtreeMap::replay(MDSRank *mds)
2543 {
2544 if (expire_pos && expire_pos > mds->mdlog->journaler->get_expire_pos())
2545 mds->mdlog->journaler->set_expire_pos(expire_pos);
2546
2547 // suck up the subtree map?
2548 if (mds->mdcache->is_subtrees()) {
2549 dout(10) << "ESubtreeMap.replay -- i already have import map; verifying" << dendl;
2550 int errors = 0;
2551
2552 for (map<dirfrag_t, vector<dirfrag_t> >::iterator p = subtrees.begin();
2553 p != subtrees.end();
2554 ++p) {
2555 CDir *dir = mds->mdcache->get_dirfrag(p->first);
2556 if (!dir) {
2557 mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2558 << " subtree root " << p->first << " not in cache";
2559 ++errors;
2560 continue;
2561 }
2562
2563 if (!mds->mdcache->is_subtree(dir)) {
2564 mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2565 << " subtree root " << p->first << " not a subtree in cache";
2566 ++errors;
2567 continue;
2568 }
2569 if (dir->get_dir_auth().first != mds->get_nodeid()) {
2570 mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2571 << " subtree root " << p->first
2572 << " is not mine in cache (it's " << dir->get_dir_auth() << ")";
2573 ++errors;
2574 continue;
2575 }
2576
2577 for (vector<dirfrag_t>::iterator q = p->second.begin(); q != p->second.end(); ++q)
2578 mds->mdcache->get_force_dirfrag(*q, true);
2579
2580 set<CDir*> bounds;
2581 mds->mdcache->get_subtree_bounds(dir, bounds);
2582 for (vector<dirfrag_t>::iterator q = p->second.begin(); q != p->second.end(); ++q) {
2583 CDir *b = mds->mdcache->get_dirfrag(*q);
2584 if (!b) {
2585 mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2586 << " subtree " << p->first << " bound " << *q << " not in cache";
2587 ++errors;
2588 continue;
2589 }
2590 if (bounds.count(b) == 0) {
2591 mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2592 << " subtree " << p->first << " bound " << *q << " not a bound in cache";
2593 ++errors;
2594 continue;
2595 }
2596 bounds.erase(b);
2597 }
2598 for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q) {
2599 mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2600 << " subtree " << p->first << " has extra bound in cache " << (*q)->dirfrag();
2601 ++errors;
2602 }
2603
2604 if (ambiguous_subtrees.count(p->first)) {
2605 if (!mds->mdcache->have_ambiguous_import(p->first)) {
2606 mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2607 << " subtree " << p->first << " is ambiguous but is not in our cache";
2608 ++errors;
2609 }
2610 } else {
2611 if (mds->mdcache->have_ambiguous_import(p->first)) {
2612 mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2613 << " subtree " << p->first << " is not ambiguous but is in our cache";
2614 ++errors;
2615 }
2616 }
2617 }
2618
2619 std::vector<CDir*> dirs;
2620 mds->mdcache->get_subtrees(dirs);
2621 for (const auto& dir : dirs) {
2622 if (dir->get_dir_auth().first != mds->get_nodeid())
2623 continue;
2624 if (subtrees.count(dir->dirfrag()) == 0) {
2625 mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2626 << " does not include cache subtree " << dir->dirfrag();
2627 ++errors;
2628 }
2629 }
2630
2631 if (errors) {
2632 dout(0) << "journal subtrees: " << subtrees << dendl;
2633 dout(0) << "journal ambig_subtrees: " << ambiguous_subtrees << dendl;
2634 mds->mdcache->show_subtrees();
2635 ceph_assert(!g_conf()->mds_debug_subtrees || errors == 0);
2636 }
2637 return;
2638 }
2639
2640 dout(10) << "ESubtreeMap.replay -- reconstructing (auth) subtree spanning tree" << dendl;
2641
2642 // first, stick the spanning tree in my cache
2643 //metablob.print(*_dout);
2644 metablob.replay(mds, get_segment());
2645
2646 // restore import/export maps
2647 for (map<dirfrag_t, vector<dirfrag_t> >::iterator p = subtrees.begin();
2648 p != subtrees.end();
2649 ++p) {
2650 CDir *dir = mds->mdcache->get_dirfrag(p->first);
2651 ceph_assert(dir);
2652 if (ambiguous_subtrees.count(p->first)) {
2653 // ambiguous!
2654 mds->mdcache->add_ambiguous_import(p->first, p->second);
2655 mds->mdcache->adjust_bounded_subtree_auth(dir, p->second,
2656 mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
2657 } else {
2658 // not ambiguous
2659 mds->mdcache->adjust_bounded_subtree_auth(dir, p->second, mds->get_nodeid());
2660 }
2661 }
2662
2663 mds->mdcache->recalc_auth_bits(true);
2664
2665 mds->mdcache->show_subtrees();
2666 }
2667
2668
2669
2670 // -----------------------
2671 // EFragment
2672
2673 void EFragment::replay(MDSRank *mds)
2674 {
2675 dout(10) << "EFragment.replay " << op_name(op) << " " << ino << " " << basefrag << " by " << bits << dendl;
2676
2677 list<CDir*> resultfrags;
2678 MDSContext::vec waiters;
2679
2680 // in may be NULL if it wasn't in our cache yet. if it's a prepare
2681 // it will be once we replay the metablob , but first we need to
2682 // refragment anything we already have in the cache.
2683 CInode *in = mds->mdcache->get_inode(ino);
2684
2685 auto&& segment = get_segment();
2686 switch (op) {
2687 case OP_PREPARE:
2688 mds->mdcache->add_uncommitted_fragment(dirfrag_t(ino, basefrag), bits, orig_frags, segment, &rollback);
2689
2690 if (in)
2691 mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true);
2692 break;
2693
2694 case OP_ROLLBACK: {
2695 frag_vec_t old_frags;
2696 if (in) {
2697 in->dirfragtree.get_leaves_under(basefrag, old_frags);
2698 if (orig_frags.empty()) {
2699 // old format EFragment
2700 mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true);
2701 } else {
2702 for (const auto& fg : orig_frags)
2703 mds->mdcache->force_dir_fragment(in, fg);
2704 }
2705 }
2706 mds->mdcache->rollback_uncommitted_fragment(dirfrag_t(ino, basefrag), std::move(old_frags));
2707 break;
2708 }
2709
2710 case OP_COMMIT:
2711 case OP_FINISH:
2712 mds->mdcache->finish_uncommitted_fragment(dirfrag_t(ino, basefrag), op);
2713 break;
2714
2715 default:
2716 ceph_abort();
2717 }
2718
2719 metablob.replay(mds, segment);
2720 if (in && g_conf()->mds_debug_frag)
2721 in->verify_dirfrags();
2722 }
2723
2724 void EFragment::encode(bufferlist &bl, uint64_t features) const {
2725 ENCODE_START(5, 4, bl);
2726 encode(stamp, bl);
2727 encode(op, bl);
2728 encode(ino, bl);
2729 encode(basefrag, bl);
2730 encode(bits, bl);
2731 encode(metablob, bl, features);
2732 encode(orig_frags, bl);
2733 encode(rollback, bl);
2734 ENCODE_FINISH(bl);
2735 }
2736
2737 void EFragment::decode(bufferlist::const_iterator &bl) {
2738 DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
2739 if (struct_v >= 2)
2740 decode(stamp, bl);
2741 if (struct_v >= 3)
2742 decode(op, bl);
2743 decode(ino, bl);
2744 decode(basefrag, bl);
2745 decode(bits, bl);
2746 decode(metablob, bl);
2747 if (struct_v >= 5) {
2748 decode(orig_frags, bl);
2749 decode(rollback, bl);
2750 }
2751 DECODE_FINISH(bl);
2752 }
2753
2754 void EFragment::dump(Formatter *f) const
2755 {
2756 /*f->open_object_section("Metablob");
2757 metablob.dump(f); // sadly we don't have this; dunno if we'll get it
2758 f->close_section();*/
2759 f->dump_string("op", op_name(op));
2760 f->dump_stream("ino") << ino;
2761 f->dump_stream("base frag") << basefrag;
2762 f->dump_int("bits", bits);
2763 }
2764
2765 void EFragment::generate_test_instances(list<EFragment*>& ls)
2766 {
2767 ls.push_back(new EFragment);
2768 ls.push_back(new EFragment);
2769 ls.back()->op = OP_PREPARE;
2770 ls.back()->ino = 1;
2771 ls.back()->bits = 5;
2772 }
2773
2774 void dirfrag_rollback::encode(bufferlist &bl) const
2775 {
2776 ENCODE_START(1, 1, bl);
2777 encode(fnode, bl);
2778 ENCODE_FINISH(bl);
2779 }
2780
2781 void dirfrag_rollback::decode(bufferlist::const_iterator &bl)
2782 {
2783 DECODE_START(1, bl);
2784 decode(fnode, bl);
2785 DECODE_FINISH(bl);
2786 }
2787
2788
2789
2790 // =========================================================================
2791
2792 // -----------------------
2793 // EExport
2794
2795 void EExport::replay(MDSRank *mds)
2796 {
2797 dout(10) << "EExport.replay " << base << dendl;
2798 auto&& segment = get_segment();
2799 metablob.replay(mds, segment);
2800
2801 CDir *dir = mds->mdcache->get_dirfrag(base);
2802 ceph_assert(dir);
2803
2804 set<CDir*> realbounds;
2805 for (set<dirfrag_t>::iterator p = bounds.begin();
2806 p != bounds.end();
2807 ++p) {
2808 CDir *bd = mds->mdcache->get_dirfrag(*p);
2809 ceph_assert(bd);
2810 realbounds.insert(bd);
2811 }
2812
2813 // adjust auth away
2814 mds->mdcache->adjust_bounded_subtree_auth(dir, realbounds, CDIR_AUTH_UNDEF);
2815
2816 mds->mdcache->try_trim_non_auth_subtree(dir);
2817 }
2818
2819 void EExport::encode(bufferlist& bl, uint64_t features) const
2820 {
2821 ENCODE_START(4, 3, bl);
2822 encode(stamp, bl);
2823 encode(metablob, bl, features);
2824 encode(base, bl);
2825 encode(bounds, bl);
2826 encode(target, bl);
2827 ENCODE_FINISH(bl);
2828 }
2829
2830 void EExport::decode(bufferlist::const_iterator &bl)
2831 {
2832 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
2833 if (struct_v >= 2)
2834 decode(stamp, bl);
2835 decode(metablob, bl);
2836 decode(base, bl);
2837 decode(bounds, bl);
2838 if (struct_v >= 4)
2839 decode(target, bl);
2840 DECODE_FINISH(bl);
2841 }
2842
2843 void EExport::dump(Formatter *f) const
2844 {
2845 f->dump_float("stamp", (double)stamp);
2846 /*f->open_object_section("Metablob");
2847 metablob.dump(f); // sadly we don't have this; dunno if we'll get it
2848 f->close_section();*/
2849 f->dump_stream("base dirfrag") << base;
2850 f->open_array_section("bounds dirfrags");
2851 for (set<dirfrag_t>::const_iterator i = bounds.begin();
2852 i != bounds.end(); ++i) {
2853 f->dump_stream("dirfrag") << *i;
2854 }
2855 f->close_section(); // bounds dirfrags
2856 }
2857
2858 void EExport::generate_test_instances(list<EExport*>& ls)
2859 {
2860 EExport *sample = new EExport();
2861 ls.push_back(sample);
2862 }
2863
2864
2865 // -----------------------
2866 // EImportStart
2867
2868 void EImportStart::update_segment()
2869 {
2870 get_segment()->sessionmapv = cmapv;
2871 }
2872
2873 void EImportStart::replay(MDSRank *mds)
2874 {
2875 dout(10) << "EImportStart.replay " << base << " bounds " << bounds << dendl;
2876 //metablob.print(*_dout);
2877 auto&& segment = get_segment();
2878 metablob.replay(mds, segment);
2879
2880 // put in ambiguous import list
2881 mds->mdcache->add_ambiguous_import(base, bounds);
2882
2883 // set auth partially to us so we don't trim it
2884 CDir *dir = mds->mdcache->get_dirfrag(base);
2885 ceph_assert(dir);
2886
2887 set<CDir*> realbounds;
2888 for (vector<dirfrag_t>::iterator p = bounds.begin();
2889 p != bounds.end();
2890 ++p) {
2891 CDir *bd = mds->mdcache->get_dirfrag(*p);
2892 ceph_assert(bd);
2893 if (!bd->is_subtree_root())
2894 bd->state_clear(CDir::STATE_AUTH);
2895 realbounds.insert(bd);
2896 }
2897
2898 mds->mdcache->adjust_bounded_subtree_auth(dir, realbounds,
2899 mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
2900
2901 // open client sessions?
2902 if (mds->sessionmap.get_version() >= cmapv) {
2903 dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version()
2904 << " >= " << cmapv << ", noop" << dendl;
2905 } else {
2906 dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version()
2907 << " < " << cmapv << dendl;
2908 map<client_t,entity_inst_t> cm;
2909 map<client_t,client_metadata_t> cmm;
2910 auto blp = client_map.cbegin();
2911 using ceph::decode;
2912 decode(cm, blp);
2913 if (!blp.end())
2914 decode(cmm, blp);
2915 mds->sessionmap.replay_open_sessions(cm, cmm);
2916
2917 if (mds->sessionmap.get_version() != cmapv) {
2918 derr << "sessionmap version " << mds->sessionmap.get_version()
2919 << " != cmapv " << cmapv << dendl;
2920 mds->clog->error() << "failure replaying journal (EImportStart)";
2921 mds->damaged();
2922 ceph_abort(); // Should be unreachable because damaged() calls respawn()
2923 }
2924 }
2925 update_segment();
2926 }
2927
2928 void EImportStart::encode(bufferlist &bl, uint64_t features) const {
2929 ENCODE_START(4, 3, bl);
2930 encode(stamp, bl);
2931 encode(base, bl);
2932 encode(metablob, bl, features);
2933 encode(bounds, bl);
2934 encode(cmapv, bl);
2935 encode(client_map, bl);
2936 encode(from, bl);
2937 ENCODE_FINISH(bl);
2938 }
2939
2940 void EImportStart::decode(bufferlist::const_iterator &bl) {
2941 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
2942 if (struct_v >= 2)
2943 decode(stamp, bl);
2944 decode(base, bl);
2945 decode(metablob, bl);
2946 decode(bounds, bl);
2947 decode(cmapv, bl);
2948 decode(client_map, bl);
2949 if (struct_v >= 4)
2950 decode(from, bl);
2951 DECODE_FINISH(bl);
2952 }
2953
2954 void EImportStart::dump(Formatter *f) const
2955 {
2956 f->dump_stream("base dirfrag") << base;
2957 f->open_array_section("boundary dirfrags");
2958 for (vector<dirfrag_t>::const_iterator iter = bounds.begin();
2959 iter != bounds.end(); ++iter) {
2960 f->dump_stream("frag") << *iter;
2961 }
2962 f->close_section();
2963 }
2964
2965 void EImportStart::generate_test_instances(list<EImportStart*>& ls)
2966 {
2967 ls.push_back(new EImportStart);
2968 }
2969
2970 // -----------------------
2971 // EImportFinish
2972
2973 void EImportFinish::replay(MDSRank *mds)
2974 {
2975 if (mds->mdcache->have_ambiguous_import(base)) {
2976 dout(10) << "EImportFinish.replay " << base << " success=" << success << dendl;
2977 if (success) {
2978 mds->mdcache->finish_ambiguous_import(base);
2979 } else {
2980 CDir *dir = mds->mdcache->get_dirfrag(base);
2981 ceph_assert(dir);
2982 vector<dirfrag_t> bounds;
2983 mds->mdcache->get_ambiguous_import_bounds(base, bounds);
2984 mds->mdcache->adjust_bounded_subtree_auth(dir, bounds, CDIR_AUTH_UNDEF);
2985 mds->mdcache->cancel_ambiguous_import(dir);
2986 mds->mdcache->try_trim_non_auth_subtree(dir);
2987 }
2988 } else {
2989 // this shouldn't happen unless this is an old journal
2990 dout(10) << "EImportFinish.replay " << base << " success=" << success
2991 << " on subtree not marked as ambiguous"
2992 << dendl;
2993 mds->clog->error() << "failure replaying journal (EImportFinish)";
2994 mds->damaged();
2995 ceph_abort(); // Should be unreachable because damaged() calls respawn()
2996 }
2997 }
2998
2999 void EImportFinish::encode(bufferlist& bl, uint64_t features) const
3000 {
3001 ENCODE_START(3, 3, bl);
3002 encode(stamp, bl);
3003 encode(base, bl);
3004 encode(success, bl);
3005 ENCODE_FINISH(bl);
3006 }
3007
3008 void EImportFinish::decode(bufferlist::const_iterator &bl)
3009 {
3010 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
3011 if (struct_v >= 2)
3012 decode(stamp, bl);
3013 decode(base, bl);
3014 decode(success, bl);
3015 DECODE_FINISH(bl);
3016 }
3017
3018 void EImportFinish::dump(Formatter *f) const
3019 {
3020 f->dump_stream("base dirfrag") << base;
3021 f->dump_string("success", success ? "true" : "false");
3022 }
3023 void EImportFinish::generate_test_instances(list<EImportFinish*>& ls)
3024 {
3025 ls.push_back(new EImportFinish);
3026 ls.push_back(new EImportFinish);
3027 ls.back()->success = true;
3028 }
3029
3030
3031 // ------------------------
3032 // EResetJournal
3033
3034 void EResetJournal::encode(bufferlist& bl, uint64_t features) const
3035 {
3036 ENCODE_START(2, 2, bl);
3037 encode(stamp, bl);
3038 ENCODE_FINISH(bl);
3039 }
3040
3041 void EResetJournal::decode(bufferlist::const_iterator &bl)
3042 {
3043 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
3044 decode(stamp, bl);
3045 DECODE_FINISH(bl);
3046 }
3047
3048 void EResetJournal::dump(Formatter *f) const
3049 {
3050 f->dump_stream("timestamp") << stamp;
3051 }
3052
3053 void EResetJournal::generate_test_instances(list<EResetJournal*>& ls)
3054 {
3055 ls.push_back(new EResetJournal());
3056 }
3057
3058 void EResetJournal::replay(MDSRank *mds)
3059 {
3060 dout(1) << "EResetJournal" << dendl;
3061
3062 mds->sessionmap.wipe();
3063 mds->inotable->replay_reset();
3064
3065 if (mds->mdsmap->get_root() == mds->get_nodeid()) {
3066 CDir *rootdir = mds->mdcache->get_root()->get_or_open_dirfrag(mds->mdcache, frag_t());
3067 mds->mdcache->adjust_subtree_auth(rootdir, mds->get_nodeid());
3068 }
3069
3070 CDir *mydir = mds->mdcache->get_myin()->get_or_open_dirfrag(mds->mdcache, frag_t());
3071 mds->mdcache->adjust_subtree_auth(mydir, mds->get_nodeid());
3072
3073 mds->mdcache->recalc_auth_bits(true);
3074
3075 mds->mdcache->show_subtrees();
3076 }
3077
3078
3079 void ENoOp::encode(bufferlist &bl, uint64_t features) const
3080 {
3081 ENCODE_START(2, 2, bl);
3082 encode(pad_size, bl);
3083 uint8_t const pad = 0xff;
3084 for (unsigned int i = 0; i < pad_size; ++i) {
3085 encode(pad, bl);
3086 }
3087 ENCODE_FINISH(bl);
3088 }
3089
3090
3091 void ENoOp::decode(bufferlist::const_iterator &bl)
3092 {
3093 DECODE_START(2, bl);
3094 decode(pad_size, bl);
3095 if (bl.get_remaining() != pad_size) {
3096 // This is spiritually an assertion, but expressing in a way that will let
3097 // journal debug tools catch it and recognise a malformed entry.
3098 throw buffer::end_of_buffer();
3099 } else {
3100 bl.advance(pad_size);
3101 }
3102 DECODE_FINISH(bl);
3103 }
3104
3105
3106 void ENoOp::replay(MDSRank *mds)
3107 {
3108 dout(4) << "ENoOp::replay, " << pad_size << " bytes skipped in journal" << dendl;
3109 }
3110
3111 /**
3112 * If re-formatting an old journal that used absolute log position
3113 * references as segment sequence numbers, use this function to update
3114 * it.
3115 *
3116 * @param mds
3117 * MDSRank instance, just used for logging
3118 * @param old_to_new
3119 * Map of old journal segment sequence numbers to new journal segment sequence numbers
3120 *
3121 * @return
3122 * True if the event was modified.
3123 */
3124 bool EMetaBlob::rewrite_truncate_finish(MDSRank const *mds,
3125 std::map<log_segment_seq_t, log_segment_seq_t> const &old_to_new)
3126 {
3127 bool modified = false;
3128 map<inodeno_t, log_segment_seq_t> new_trunc_finish;
3129 for (const auto& p : truncate_finish) {
3130 auto q = old_to_new.find(p.second);
3131 if (q != old_to_new.end()) {
3132 dout(20) << __func__ << " applying segment seq mapping "
3133 << p.second << " -> " << q->second << dendl;
3134 new_trunc_finish.emplace(p.first, q->second);
3135 modified = true;
3136 } else {
3137 dout(20) << __func__ << " no segment seq mapping found for "
3138 << p.second << dendl;
3139 new_trunc_finish.insert(p);
3140 }
3141 }
3142 truncate_finish.swap(new_trunc_finish);
3143
3144 return modified;
3145 }