]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/CInode.cc
update source to 12.2.11
[ceph.git] / ceph / src / mds / CInode.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "include/int_types.h"
16 #include "common/errno.h"
17
18 #include <string>
19 #include <stdio.h>
20
21 #include "CInode.h"
22 #include "CDir.h"
23 #include "CDentry.h"
24
25 #include "MDSRank.h"
26 #include "MDCache.h"
27 #include "MDLog.h"
28 #include "Locker.h"
29 #include "Mutation.h"
30
31 #include "events/EUpdate.h"
32
33 #include "osdc/Objecter.h"
34
35 #include "snap.h"
36
37 #include "LogSegment.h"
38
39 #include "common/Clock.h"
40
41 #include "messages/MLock.h"
42 #include "messages/MClientCaps.h"
43
44 #include "common/config.h"
45 #include "global/global_context.h"
46 #include "include/assert.h"
47
48 #include "mds/MDSContinuation.h"
49 #include "mds/InoTable.h"
50
51 #define dout_context g_ceph_context
52 #define dout_subsys ceph_subsys_mds
53 #undef dout_prefix
54 #define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") "
55
56
// Base class for CInode-related I/O completion callbacks: binds the
// target inode and resolves the owning MDSRank through its cache.
class CInodeIOContext : public MDSIOContextBase
{
protected:
  CInode *in;   // inode this I/O belongs to; never NULL (asserted below)
  MDSRank *get_mds() override {return in->mdcache->mds;}
public:
  explicit CInodeIOContext(CInode *in_) : in(in_) {
    assert(in != NULL);
  }
};
67
68
// Static lock-type descriptors shared by every CInode; each per-inode
// SimpleLock/ScatterLock member references one of these to identify
// which lock it is (CEPH_LOCK_* id).
LockType CInode::versionlock_type(CEPH_LOCK_IVERSION);
LockType CInode::authlock_type(CEPH_LOCK_IAUTH);
LockType CInode::linklock_type(CEPH_LOCK_ILINK);
LockType CInode::dirfragtreelock_type(CEPH_LOCK_IDFT);
LockType CInode::filelock_type(CEPH_LOCK_IFILE);
LockType CInode::xattrlock_type(CEPH_LOCK_IXATTR);
LockType CInode::snaplock_type(CEPH_LOCK_ISNAP);
LockType CInode::nestlock_type(CEPH_LOCK_INEST);
LockType CInode::flocklock_type(CEPH_LOCK_IFLOCK);
LockType CInode::policylock_type(CEPH_LOCK_IPOLICY);
79
//int cinode_pins[CINODE_NUM_PINS]; // counts
// Debug-line prefix: timestamp, mds rank, and this inode's number.
ostream& CInode::print_db_line_prefix(ostream& out)
{
  return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") ";
}
85
/*
 * write caps and lock ids
 *
 * Table pairing each lock type with the client cap bits that cover
 * writes to the state it protects.
 */
struct cinode_lock_info_t cinode_lock_info[] = {
  { CEPH_LOCK_IFILE, CEPH_CAP_ANY_FILE_WR },
  { CEPH_LOCK_IAUTH, CEPH_CAP_AUTH_EXCL },
  { CEPH_LOCK_ILINK, CEPH_CAP_LINK_EXCL },
  { CEPH_LOCK_IXATTR, CEPH_CAP_XATTR_EXCL },
};
int num_cinode_locks = sizeof(cinode_lock_info) / sizeof(cinode_lock_info[0]);
96
97
98
/*
 * Debug pretty-printer for a CInode.  Emits a single "[inode ...]"
 * record containing: ino, [first,last] snapid range, path, auth/replica
 * info, versions, pins, state flags, stats, lock states, client caps,
 * refs, and the object address.  The exact field order is relied on by
 * log readers, so it must not be rearranged.
 */
ostream& operator<<(ostream& out, const CInode& in)
{
  string path;
  in.make_path_string(path, true);

  out << "[inode " << in.inode.ino;
  out << " ["
      << (in.is_multiversion() ? "...":"")
      << in.first << "," << in.last << "]";
  out << " " << path << (in.is_dir() ? "/":"");

  // authority: either "auth" (plus replica set) or "rep@<who>.<nonce>"
  if (in.is_auth()) {
    out << " auth";
    if (in.is_replicated())
      out << in.get_replicas();
  } else {
    mds_authority_t a = in.authority();
    out << " rep@" << a.first;
    if (a.second != CDIR_AUTH_UNKNOWN)
      out << "," << a.second;
    out << "." << in.get_replica_nonce();
  }

  if (in.is_symlink())
    out << " symlink='" << in.symlink << "'";
  if (in.is_dir() && !in.dirfragtree.empty())
    out << " " << in.dirfragtree;

  // version, and projected version when one is pending
  out << " v" << in.get_version();
  if (in.get_projected_version() > in.get_version())
    out << " pv" << in.get_projected_version();

  if (in.is_auth_pinned()) {
    out << " ap=" << in.get_num_auth_pins() << "+" << in.get_num_nested_auth_pins();
#ifdef MDS_AUTHPIN_SET
    out << "(" << in.auth_pin_set << ")";
#endif
  }

  if (in.snaprealm)
    out << " snaprealm=" << in.snaprealm;

  // assorted state flags
  if (in.state_test(CInode::STATE_AMBIGUOUSAUTH)) out << " AMBIGAUTH";
  if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " needsrecover";
  if (in.state_test(CInode::STATE_RECOVERING)) out << " recovering";
  if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent";
  if (in.state_test(CInode::STATE_MISSINGOBJS)) out << " missingobjs";
  if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance;
  if (in.is_frozen_inode()) out << " FROZEN";
  if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN";

  const CInode::mempool_inode *pi = in.get_projected_inode();
  if (pi->is_truncating())
    out << " truncating(" << pi->truncate_from << " to " << pi->truncate_size << ")";

  // size/link info (dirs show dirstat; files show size and nlink)
  if (in.inode.is_dir()) {
    out << " " << in.inode.dirstat;
    if (g_conf->mds_debug_scatterstat && in.is_projected()) {
      const CInode::mempool_inode *pi = in.get_projected_inode();
      out << "->" << pi->dirstat;
    }
  } else {
    out << " s=" << in.inode.size;
    if (in.inode.nlink != 1)
      out << " nl=" << in.inode.nlink;
  }

  // rstat
  out << " " << in.inode.rstat;
  if (!(in.inode.rstat == in.inode.accounted_rstat))
    out << "/" << in.inode.accounted_rstat;
  if (g_conf->mds_debug_scatterstat && in.is_projected()) {
    const CInode::mempool_inode *pi = in.get_projected_inode();
    out << "->" << pi->rstat;
    if (!(pi->rstat == pi->accounted_rstat))
      out << "/" << pi->accounted_rstat;
  }

  if (!in.client_need_snapflush.empty())
    out << " need_snapflush=" << in.client_need_snapflush;


  // locks (only those not in the default sync+unlocked state)
  if (!in.authlock.is_sync_and_unlocked())
    out << " " << in.authlock;
  if (!in.linklock.is_sync_and_unlocked())
    out << " " << in.linklock;
  if (in.inode.is_dir()) {
    if (!in.dirfragtreelock.is_sync_and_unlocked())
      out << " " << in.dirfragtreelock;
    if (!in.snaplock.is_sync_and_unlocked())
      out << " " << in.snaplock;
    if (!in.nestlock.is_sync_and_unlocked())
      out << " " << in.nestlock;
    if (!in.policylock.is_sync_and_unlocked())
      out << " " << in.policylock;
  } else {
    if (!in.flocklock.is_sync_and_unlocked())
      out << " " << in.flocklock;
  }
  if (!in.filelock.is_sync_and_unlocked())
    out << " " << in.filelock;
  if (!in.xattrlock.is_sync_and_unlocked())
    out << " " << in.xattrlock;
  if (!in.versionlock.is_sync_and_unlocked())
    out << " " << in.versionlock;

  // hack: spit out crap on which clients have caps
  if (in.inode.client_ranges.size())
    out << " cr=" << in.inode.client_ranges;

  if (!in.get_client_caps().empty()) {
    out << " caps={";
    for (map<client_t,Capability*>::const_iterator it = in.get_client_caps().begin();
         it != in.get_client_caps().end();
         ++it) {
      if (it != in.get_client_caps().begin()) out << ",";
      // per client: pending[/issued]/wanted@last_sent
      out << it->first << "="
          << ccap_string(it->second->pending());
      if (it->second->issued() != it->second->pending())
        out << "/" << ccap_string(it->second->issued());
      out << "/" << ccap_string(it->second->wanted())
          << "@" << it->second->get_last_sent();
    }
    out << "}";
    if (in.get_loner() >= 0 || in.get_wanted_loner() >= 0) {
      out << ",l=" << in.get_loner();
      if (in.get_loner() != in.get_wanted_loner())
        out << "(" << in.get_wanted_loner() << ")";
    }
  }
  if (!in.get_mds_caps_wanted().empty()) {
    out << " mcw={";
    bool first = true;
    for (const auto &p : in.get_mds_caps_wanted()) {
      if (!first)
        out << ',';
      out << p.first << '=' << ccap_string(p.second);
      first = false;
    }
    out << '}';
  }

  if (in.get_num_ref()) {
    out << " |";
    in.print_pin_set(out);
  }

  if (in.inode.export_pin != MDS_RANK_NONE) {
    out << " export_pin=" << in.inode.export_pin;
  }

  out << " " << &in;
  out << "]";
  return out;
}
255
256 ostream& operator<<(ostream& out, const CInode::scrub_stamp_info_t& si)
257 {
258 out << "{scrub_start_version: " << si.scrub_start_version
259 << ", scrub_start_stamp: " << si.scrub_start_stamp
260 << ", last_scrub_version: " << si.last_scrub_version
261 << ", last_scrub_stamp: " << si.last_scrub_stamp;
262 return out;
263 }
264
265
266
// Virtual print hook: delegates to operator<<(ostream&, const CInode&).
void CInode::print(ostream& out)
{
  out << *this;
}
271
272
273
/*
 * Record that 'client' still owes a snap flush for 'snapid' against the
 * old (snapped) inode 'snapin'.  The head inode is auth-pinned while any
 * flush is outstanding, and each snapped inode is auth-pinned while it
 * has pending clients; remove_need_snapflush() undoes both.
 */
void CInode::add_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
{
  dout(10) << "add_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;

  if (client_need_snapflush.empty()) {
    get(CInode::PIN_NEEDSNAPFLUSH);

    // FIXME: this is non-optimal, as we'll block freezes/migrations for potentially
    // long periods waiting for clients to flush their snaps.
    auth_pin(this); // pin head inode...
  }

  auto &clients = client_need_snapflush[snapid];
  if (clients.empty())
    snapin->auth_pin(this); // ...and pin snapped/old inode!

  clients.insert(client);
}
292
/*
 * Drop a pending snapflush registration for (client, snapid).  Releases
 * the per-snap auth pin on 'snapin' when its last client is removed, and
 * the head-inode pin/ref when no flushes remain at all.  No-op (with a
 * debug message) if the snapid or client is not registered.
 */
void CInode::remove_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
{
  dout(10) << __func__ << " client." << client << " snapid " << snapid << " on " << snapin << dendl;
  auto it = client_need_snapflush.find(snapid);
  if (it == client_need_snapflush.end()) {
    dout(10) << " snapid not found" << dendl;
    return;
  }
  size_t n = it->second.erase(client);
  if (n == 0) {
    dout(10) << " client not found" << dendl;
    return;
  }
  if (it->second.empty()) {
    // last client for this snapid: drop the snapped inode's pin
    client_need_snapflush.erase(it);
    snapin->auth_unpin(this);

    if (client_need_snapflush.empty()) {
      // no snapflushes outstanding at all: drop head-inode ref + pin
      put(CInode::PIN_NEEDSNAPFLUSH);
      auth_unpin(this);
    }
  }
}
316
/*
 * After an old inode is split (COW) into 'cowin' covering
 * [cowin->first, cowin->last] and 'in' starting at in->first, re-home
 * the pending snapflush entries in that range.  Returns true if any
 * flushes now belong to cowin (which gets auth-pinned per entry here).
 *
 * NOTE(review): in->auth_unpin() runs once per visited snapid, whether
 * the entry was kept for cowin or erased — presumably matching pins
 * taken against 'in' when the flushes were registered; verify against
 * add_need_snapflush().
 */
bool CInode::split_need_snapflush(CInode *cowin, CInode *in)
{
  dout(10) << "split_need_snapflush [" << cowin->first << "," << cowin->last << "] for " << *cowin << dendl;
  bool need_flush = false;
  for (auto it = client_need_snapflush.lower_bound(cowin->first);
       it != client_need_snapflush.end() && it->first < in->first; ) {
    assert(!it->second.empty());
    if (cowin->last >= it->first) {
      // snapid falls inside cowin's range: keep it, pin cowin
      cowin->auth_pin(this);
      need_flush = true;
      ++it;
    } else {
      // snapid is covered by neither inode any more: drop it
      it = client_need_snapflush.erase(it);
    }
    in->auth_unpin(this);
  }
  return need_flush;
}
335
/*
 * Flag this inode's rstat as dirty: pin it, queue it on the parent
 * dirfrag's dirty_rstat list, and poke the parent's nestlock so the
 * scatter-gather machinery will propagate the change.
 */
void CInode::mark_dirty_rstat()
{
  if (!state_test(STATE_DIRTYRSTAT)) {
    dout(10) << "mark_dirty_rstat" << dendl;
    state_set(STATE_DIRTYRSTAT);
    get(PIN_DIRTYRSTAT);
    CDentry *pdn = get_projected_parent_dn();
    if (pdn->is_auth()) {
      CDir *pdir = pdn->dir;
      pdir->dirty_rstat_inodes.push_back(&dirty_rstat_item);
      mdcache->mds->locker->mark_updated_scatterlock(&pdir->inode->nestlock);
    } else {
      // under cross-MDS rename.
      // DIRTYRSTAT flag will get cleared when rename finishes
      assert(state_test(STATE_AMBIGUOUSAUTH));
    }
  }
}
// Undo mark_dirty_rstat(): clear the flag, drop the pin, and unlink
// from the parent dirfrag's dirty_rstat list.
void CInode::clear_dirty_rstat()
{
  if (state_test(STATE_DIRTYRSTAT)) {
    dout(10) << "clear_dirty_rstat" << dendl;
    state_clear(STATE_DIRTYRSTAT);
    put(PIN_DIRTYRSTAT);
    dirty_rstat_item.remove_myself();
  }
}
363
/* Ideally this function would be subsumed by project_inode but it is also
 * needed by CInode::project_past_snaprealm_parent so we keep it.
 *
 * Attach a projected srnode to 'pi': a copy of the current projected
 * srnode if one exists, otherwise a fresh sr_t seeded from the oldest
 * snap.  Increments num_projected_srnodes; the projection is applied by
 * pop_projected_snaprealm().
 */
sr_t &CInode::project_snaprealm(projected_inode &pi)
{
  const sr_t *cur_srnode = get_projected_srnode();

  assert(!pi.snapnode);
  if (cur_srnode) {
    pi.snapnode.reset(new sr_t(*cur_srnode));
  } else {
    // no realm yet: start a new one
    pi.snapnode.reset(new sr_t());
    pi.snapnode->created = 0;
    pi.snapnode->current_parent_since = get_oldest_snap();
  }
  ++num_projected_srnodes;

  dout(10) << __func__ << " " << pi.snapnode.get() << dendl;
  return *pi.snapnode.get();
}
384
/*
 * Push a new projected inode, copied from the most recent projection
 * (or from the stable inode if none is pending).  Optionally projects
 * xattrs and the snaprealm as well.  Any pending last_scrub results are
 * folded into the new projection here so they get persisted with it.
 * Returns a reference to the new projected node for the caller to edit.
 */
CInode::projected_inode &CInode::project_inode(bool xattr, bool snap)
{
  if (projected_nodes.empty()) {
    projected_nodes.emplace_back(inode);
  } else {
    projected_nodes.emplace_back(projected_nodes.back().inode);
  }
  auto &pi = projected_nodes.back();

  if (scrub_infop && scrub_infop->last_scrub_dirty) {
    // carry the pending scrub stamp/version into this projection
    pi.inode.last_scrub_stamp = scrub_infop->last_scrub_stamp;
    pi.inode.last_scrub_version = scrub_infop->last_scrub_version;
    scrub_infop->last_scrub_dirty = false;
    scrub_maybe_delete_info();
  }

  if (xattr) {
    pi.xattrs.reset(new mempool_xattr_map(*get_projected_xattrs()));
    ++num_projected_xattrs;
  }

  if (snap) {
    project_snaprealm(pi);
  }

  dout(15) << __func__ << " " << pi.inode.ino << dendl;
  return pi;
}
413
/*
 * Commit the oldest projected inode: mark this inode dirty in log
 * segment 'ls' at the projected version, copy the projection into the
 * stable inode, and apply any projected xattrs/snaprealm that were
 * carried along.  Also re-evaluates the export pin and schedules a
 * backtrace rewrite if the layout's pool changed.
 */
void CInode::pop_and_dirty_projected_inode(LogSegment *ls)
{
  assert(!projected_nodes.empty());
  auto &front = projected_nodes.front();
  dout(15) << __func__ << " " << front.inode.ino
	   << " v" << front.inode.version << dendl;
  // remember the old pool so we can detect a pool move below
  int64_t old_pool = inode.layout.pool_id;

  mark_dirty(front.inode.version, ls);
  bool new_export_pin = inode.export_pin != front.inode.export_pin;
  inode = front.inode;
  if (new_export_pin)
    maybe_export_pin(true);

  // a pool change means the backtrace must be rewritten (dirty parent)
  if (inode.is_backtrace_updated())
    mark_dirty_parent(ls, old_pool != inode.layout.pool_id);

  if (front.xattrs) {
    --num_projected_xattrs;
    xattrs = *front.xattrs;
  }

  auto &snapnode = front.snapnode;
  if (snapnode) {
    pop_projected_snaprealm(snapnode.get());
    --num_projected_srnodes;
  }

  projected_nodes.pop_front();
}
444
/* if newparent != parent, add parent to past_parents
 if parent DNE, we need to find what the parent actually is and fill that in */
void CInode::project_past_snaprealm_parent(SnapRealm *newparent)
{
  assert(!projected_nodes.empty());
  sr_t &new_snap = project_snaprealm(projected_nodes.back());
  SnapRealm *oldparent;
  if (!snaprealm) {
    // no realm of our own: the effective parent is the nearest
    // ancestor's realm
    oldparent = find_snaprealm();
    new_snap.seq = oldparent->get_newest_seq();
  }
  else
    oldparent = snaprealm->parent;

  if (newparent != oldparent) {
    snapid_t oldparentseq = oldparent->get_newest_seq();
    if (oldparentseq + 1 > new_snap.current_parent_since) {
      // remember the old parent for snapids taken while it applied
      new_snap.past_parents[oldparentseq].ino = oldparent->inode->ino();
      new_snap.past_parents[oldparentseq].first = new_snap.current_parent_since;
    }
    new_snap.current_parent_since = std::max(oldparentseq, newparent->get_last_created()) + 1;
  }
}
468
/*
 * Apply a projected srnode to the live snaprealm, opening a realm if
 * this inode doesn't have one yet.  If the number of past parents
 * changed, previously cached snap sets are invalidated and the past
 * parents are closed and re-opened.
 */
void CInode::pop_projected_snaprealm(sr_t *next_snaprealm)
{
  assert(next_snaprealm);
  dout(10) << "pop_projected_snaprealm " << next_snaprealm
	   << " seq" << next_snaprealm->seq << dendl;
  bool invalidate_cached_snaps = false;
  if (!snaprealm) {
    open_snaprealm();
  } else if (next_snaprealm->past_parents.size() !=
	     snaprealm->srnode.past_parents.size()) {
    invalidate_cached_snaps = true;
    // re-open past parents
    snaprealm->_close_parents();

    dout(10) << " realm " << *snaprealm << " past_parents " << snaprealm->srnode.past_parents
	     << " -> " << next_snaprealm->past_parents << dendl;
  }
  snaprealm->srnode = *next_snaprealm;

  // we should be able to open these up (or have them already be open).
  bool ok = snaprealm->_open_parents(NULL);
  assert(ok);

  if (invalidate_cached_snaps)
    snaprealm->invalidate_cached_snaps();

  if (snaprealm->parent)
    dout(10) << " realm " << *snaprealm << " parent " << *snaprealm->parent << dendl;
}
498
499
500 // ====== CInode =======
501
502 // dirfrags
503
504 __u32 InodeStoreBase::hash_dentry_name(boost::string_view dn)
505 {
506 int which = inode.dir_layout.dl_dir_hash;
507 if (!which)
508 which = CEPH_STR_HASH_LINUX;
509 assert(ceph_str_hash_valid(which));
510 return ceph_str_hash(which, dn.data(), dn.length());
511 }
512
513 frag_t InodeStoreBase::pick_dirfrag(boost::string_view dn)
514 {
515 if (dirfragtree.empty())
516 return frag_t(); // avoid the string hash if we can.
517
518 __u32 h = hash_dentry_name(dn);
519 return dirfragtree[h];
520 }
521
/*
 * Collect into 'ls' the open dirfrags that fall under 'fg'.  Returns
 * true if the entire range of 'fg' is covered by open dirfrags, false
 * if part of it is not in cache.
 */
bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
{
  // fast path: walk the leaves of our own fragtree under fg
  bool all = true;
  std::list<frag_t> fglist;
  dirfragtree.get_leaves_under(fg, fglist);
  for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
    if (dirfrags.count(*p))
      ls.push_back(dirfrags[*p]);
    else
      all = false;

  if (all)
    return all;

  // slow path: the fragtree and the open dirfrags disagree (e.g. a
  // split/merge in progress).  Build a temporary tree forced to leaves
  // at fg and at every open frag, then re-check coverage against it.
  fragtree_t tmpdft;
  tmpdft.force_to_leaf(g_ceph_context, fg);
  for (auto &p : dirfrags) {
    tmpdft.force_to_leaf(g_ceph_context, p.first);
    // NOTE(review): only open frags that are *not* fragtree leaves are
    // added here — leaf frags were already handled above; confirm this
    // cannot double-add or miss a frag during refragmentation.
    if (fg.contains(p.first) && !dirfragtree.is_leaf(p.first))
      ls.push_back(p.second);
  }

  all = true;
  tmpdft.get_leaves_under(fg, fglist);
  for (const auto &p : fglist) {
    if (!dirfrags.count(p)) {
      all = false;
      break;
    }
  }

  return all;
}
555
556 void CInode::verify_dirfrags()
557 {
558 bool bad = false;
559 for (const auto &p : dirfrags) {
560 if (!dirfragtree.is_leaf(p.first)) {
561 dout(0) << "have open dirfrag " << p.first << " but not leaf in " << dirfragtree
562 << ": " << *p.second << dendl;
563 bad = true;
564 }
565 }
566 assert(!bad);
567 }
568
/*
 * Reconcile the open dirfrags with the fragtree: if any open dirfrag is
 * no longer a fragtree leaf, force-open every leaf frag through the
 * cache, then verify the result (asserts on failure).
 */
void CInode::force_dirfrags()
{
  bool bad = false;
  for (auto &p : dirfrags) {
    if (!dirfragtree.is_leaf(p.first)) {
      dout(0) << "have open dirfrag " << p.first << " but not leaf in " << dirfragtree
	      << ": " << *p.second << dendl;
      bad = true;
    }
  }

  if (bad) {
    // refresh: force-open every current leaf frag
    list<frag_t> leaves;
    dirfragtree.get_leaves(leaves);
    for (list<frag_t>::iterator p = leaves.begin(); p != leaves.end(); ++p)
      mdcache->get_force_dirfrag(dirfrag_t(ino(),*p), true);
  }

  verify_dirfrags();
}
589
590 CDir *CInode::get_approx_dirfrag(frag_t fg)
591 {
592 CDir *dir = get_dirfrag(fg);
593 if (dir) return dir;
594
595 // find a child?
596 list<CDir*> ls;
597 get_dirfrags_under(fg, ls);
598 if (!ls.empty())
599 return ls.front();
600
601 // try parents?
602 while (fg.bits() > 0) {
603 fg = fg.parent();
604 dir = get_dirfrag(fg);
605 if (dir) return dir;
606 }
607 return NULL;
608 }
609
// Return the dirfrag 'fg', creating a new CDir for it if it is not
// already open.  Only the auth MDS (or any rank during replay) may
// create dirfrags.
CDir *CInode::get_or_open_dirfrag(MDCache *mdcache, frag_t fg)
{
  assert(is_dir());

  // have it?
  CDir *dir = get_dirfrag(fg);
  if (!dir) {
    // create it.
    assert(is_auth() || mdcache->mds->is_any_replay());
    dir = new CDir(this, fg, mdcache, is_auth());
    add_dirfrag(dir);
  }
  return dir;
}
624
625 CDir *CInode::add_dirfrag(CDir *dir)
626 {
627 assert(dirfrags.count(dir->dirfrag().frag) == 0);
628 dirfrags[dir->dirfrag().frag] = dir;
629
630 if (stickydir_ref > 0) {
631 dir->state_set(CDir::STATE_STICKY);
632 dir->get(CDir::PIN_STICKY);
633 }
634
635 maybe_export_pin();
636
637 return dir;
638 }
639
/*
 * Tear down and delete the open dirfrag 'fg'.  The frag must exist, and
 * after null dentries, dirty state, and sticky pins are cleared it must
 * end up with zero refs (asserted) before being deleted.
 */
void CInode::close_dirfrag(frag_t fg)
{
  dout(14) << "close_dirfrag " << fg << dendl;
  assert(dirfrags.count(fg));

  CDir *dir = dirfrags[fg];
  dir->remove_null_dentries();

  // clear dirty flag
  if (dir->is_dirty())
    dir->mark_clean();

  // drop the sticky pin this inode holds on the frag, if any
  if (stickydir_ref > 0) {
    dir->state_clear(CDir::STATE_STICKY);
    dir->put(CDir::PIN_STICKY);
  }

  if (dir->is_subtree_root())
    num_subtree_roots--;

  // dump any remaining dentries, for debugging purposes
  for (const auto &p : dir->items)
    dout(14) << __func__ << " LEFTOVER dn " << *p.second << dendl;

  assert(dir->get_num_ref() == 0);
  delete dir;
  dirfrags.erase(fg);
}
668
// Close every open dirfrag on this inode (close_dirfrag erases from the
// map, so we repeatedly close the first remaining entry).
void CInode::close_dirfrags()
{
  while (!dirfrags.empty())
    close_dirfrag(dirfrags.begin()->first);
}
674
675 bool CInode::has_subtree_root_dirfrag(int auth)
676 {
677 if (num_subtree_roots > 0) {
678 if (auth == -1)
679 return true;
680 for (const auto &p : dirfrags) {
681 if (p.second->is_subtree_root() &&
682 p.second->dir_auth.first == auth)
683 return true;
684 }
685 }
686 return false;
687 }
688
689 bool CInode::has_subtree_or_exporting_dirfrag()
690 {
691 if (num_subtree_roots > 0 || num_exporting_dirs > 0)
692 return true;
693 return false;
694 }
695
/*
 * Take a "sticky dirs" reference.  While at least one is held, all of
 * this inode's dirfrags (including ones opened later — see add_dirfrag)
 * are pinned and flagged STICKY.  Balanced by put_stickydirs().
 */
void CInode::get_stickydirs()
{
  if (stickydir_ref == 0) {
    // first ref: pin the inode and stick every currently open frag
    get(PIN_STICKYDIRS);
    for (const auto &p : dirfrags) {
      p.second->state_set(CDir::STATE_STICKY);
      p.second->get(CDir::PIN_STICKY);
    }
  }
  stickydir_ref++;
}
707
// Release a "sticky dirs" reference; the last one unpins the inode and
// clears the STICKY state/pin from every open dirfrag.
void CInode::put_stickydirs()
{
  assert(stickydir_ref > 0);
  stickydir_ref--;
  if (stickydir_ref == 0) {
    put(PIN_STICKYDIRS);
    for (const auto &p : dirfrags) {
      p.second->state_clear(CDir::STATE_STICKY);
      p.second->put(CDir::PIN_STICKY);
    }
  }
}
720
721
722
723
724
725 // pins
726
// Ref-count hook: the first reference on the inode also pins its
// primary dentry.
void CInode::first_get()
{
  // pin my dentry?
  if (parent)
    parent->get(CDentry::PIN_INODEPIN);
}

// Ref-count hook: dropping the last reference releases the dentry pin.
void CInode::last_put()
{
  // unpin my dentry?
  if (parent)
    parent->put(CDentry::PIN_INODEPIN);
}

// Ref-count hook: if only the dirty/dirty-parent pins remain, ask the
// cache to re-evaluate this inode as a potential stray.
void CInode::_put()
{
  if (get_num_ref() == (int)is_dirty() + (int)is_dirty_parent())
    mdcache->maybe_eval_stray(this, true);
}
746
// Track a remote (hard-link) parent dentry; the first one pins the inode.
void CInode::add_remote_parent(CDentry *p)
{
  if (remote_parents.empty())
    get(PIN_REMOTEPARENT);
  remote_parents.insert(p);
}
// Untrack a remote parent dentry; the last one drops the pin.
void CInode::remove_remote_parent(CDentry *p)
{
  remote_parents.erase(p);
  if (remote_parents.empty())
    put(PIN_REMOTEPARENT);
}
759
760
761
762
763 CDir *CInode::get_parent_dir()
764 {
765 if (parent)
766 return parent->dir;
767 return NULL;
768 }
769 CDir *CInode::get_projected_parent_dir()
770 {
771 CDentry *p = get_projected_parent_dn();
772 if (p)
773 return p->dir;
774 return NULL;
775 }
776 CInode *CInode::get_parent_inode()
777 {
778 if (parent)
779 return parent->dir->inode;
780 return NULL;
781 }
782
783 bool CInode::is_projected_ancestor_of(CInode *other)
784 {
785 while (other) {
786 if (other == this)
787 return true;
788 if (!other->get_projected_parent_dn())
789 break;
790 other = other->get_projected_parent_dn()->get_dir()->get_inode();
791 }
792 return false;
793 }
794
795 /*
796 * Because a non-directory inode may have multiple links, the use_parent
797 * argument allows selecting which parent to use for path construction. This
798 * argument is only meaningful for the final component (i.e. the first of the
799 * nested calls) because directories cannot have multiple hard links. If
800 * use_parent is NULL and projected is true, the primary parent's projected
801 * inode is used all the way up the path chain. Otherwise the primary parent
802 * stable inode is used.
803 */
void CInode::make_path_string(string& s, bool projected, const CDentry *use_parent) const
{
  if (!use_parent) {
    use_parent = projected ? get_projected_parent_dn() : parent;
  }

  if (use_parent) {
    // normal case: recurse up through the parent dentry
    use_parent->make_path_string(s, projected);
  } else if (is_root()) {
    s = "";   // root is the empty string
  } else if (is_mdsdir()) {
    // per-mds directory: "~mds<rank>"
    char t[40];
    uint64_t eino(ino());
    eino -= MDS_INO_MDSDIR_OFFSET;
    snprintf(t, sizeof(t), "~mds%" PRId64, eino);
    s = t;
  } else {
    // no parent in cache: fall back to "#<ino in hex>"
    char n[40];
    uint64_t eino(ino());
    snprintf(n, sizeof(n), "#%" PRIx64, eino);
    s += n;
  }
}
827
// Build a filepath for this inode by recursing up the (projected or
// stable) parent dentry chain; base inodes are expressed as a bare ino.
void CInode::make_path(filepath& fp, bool projected) const
{
  const CDentry *use_parent = projected ? get_projected_parent_dn() : parent;
  if (use_parent) {
    assert(!is_base());
    use_parent->make_path(fp, projected);
  } else {
    fp = filepath(ino());
  }
}
838
839 void CInode::name_stray_dentry(string& dname)
840 {
841 char s[20];
842 snprintf(s, sizeof(s), "%llx", (unsigned long long)inode.ino.val);
843 dname = s;
844 }
845
/*
 * Reserve the version a pending change will be journaled with: via the
 * projected parent dentry when one exists, otherwise (base inodes) by
 * bumping the projected version directly.
 */
version_t CInode::pre_dirty()
{
  version_t pv;
  CDentry* _cdentry = get_projected_parent_dn();
  if (_cdentry) {
    pv = _cdentry->pre_dirty(get_projected_version());
    dout(10) << "pre_dirty " << pv << " (current v " << inode.version << ")" << dendl;
  } else {
    assert(is_base());
    pv = get_projected_version() + 1;
  }
  // force update backtrace for old format inode (see mempool_inode::decode)
  if (inode.backtrace_version == 0 && !projected_nodes.empty()) {
    mempool_inode &pi = projected_nodes.back().inode;
    if (pi.backtrace_version == 0)
      pi.update_backtrace(pv);
  }
  return pv;
}
865
// Internal helper: set the DIRTY flag/pin on first dirtying, and
// (re)queue this inode on the given log segment's dirty list.
void CInode::_mark_dirty(LogSegment *ls)
{
  if (!state_test(STATE_DIRTY)) {
    state_set(STATE_DIRTY);
    get(PIN_DIRTY);
    // a newly dirtied inode must belong to some log segment
    assert(ls);
  }

  // move myself to this segment's dirty list
  if (ls)
    ls->dirty_inodes.push_back(&item_dirty);
}
878
// Advance this inode to version 'pv' (obtained from pre_dirty()), mark
// it dirty in log segment 'ls', and propagate dirtiness to the parent
// dentry.  Auth-only.
void CInode::mark_dirty(version_t pv, LogSegment *ls) {

  dout(10) << "mark_dirty " << *this << dendl;

  /*
    NOTE: I may already be dirty, but this fn _still_ needs to be called so that
    the directory is (perhaps newly) dirtied, and so that parent_dir_version is
    updated below.
  */

  // only auth can get dirty.  "dirty" async data in replicas is relative to
  // filelock state, not the dirty flag.
  assert(is_auth());

  // touch my private version
  assert(inode.version < pv);
  inode.version = pv;
  _mark_dirty(ls);

  // mark dentry too
  if (parent)
    parent->mark_dirty(pv, ls);
}
902
903
// Clear the DIRTY flag/pin and remove this inode from its log segment's
// dirty list.  No-op if already clean.
void CInode::mark_clean()
{
  dout(10) << " mark_clean " << *this << dendl;
  if (state_test(STATE_DIRTY)) {
    state_clear(STATE_DIRTY);
    put(PIN_DIRTY);

    // remove myself from ls dirty list
    item_dirty.remove_myself();
  }
}
915
916
917 // --------------
918 // per-inode storage
919 // (currently for root inode only)
920
// Completion callback for CInode::store(): forwards the write result to
// CInode::_stored().
struct C_IO_Inode_Stored : public CInodeIOContext {
  version_t version;   // inode version captured when the write was issued
  Context *fin;        // caller's completion, finished by _stored()
  C_IO_Inode_Stored(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
  void finish(int r) override {
    in->_stored(r, version, fin);
  }
  void print(ostream& out) const override {
    out << "inode_store(" << in->ino() << ")";
  }
};
932
933 object_t InodeStoreBase::get_object_name(inodeno_t ino, frag_t fg, const char *suffix)
934 {
935 char n[60];
936 snprintf(n, sizeof(n), "%llx.%08llx%s", (long long unsigned)ino, (long long unsigned)fg, suffix ? suffix : "");
937 return object_t(n);
938 }
939
/*
 * Persist this (base) inode to its own ".inode" object in the metadata
 * pool: magic string followed by the encoded store.  'fin' is completed
 * by _stored() once the write returns.
 */
void CInode::store(MDSInternalContextBase *fin)
{
  dout(10) << "store " << get_version() << dendl;
  assert(is_base());

  // drop snap data for snaps that no longer exist before writing
  if (snaprealm)
    purge_stale_snap_data(snaprealm->get_snaps());

  // encode
  bufferlist bl;
  string magic = CEPH_FS_ONDISK_MAGIC;
  ::encode(magic, bl);
  encode_store(bl, mdcache->mds->mdsmap->get_up_features());

  // write it.
  SnapContext snapc;
  ObjectOperation m;
  m.write_full(bl);

  object_t oid = CInode::get_object_name(ino(), frag_t(), ".inode");
  object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());

  Context *newfin =
    new C_OnFinisher(new C_IO_Inode_Stored(this, get_version(), fin),
		     mdcache->mds->finisher);
  mdcache->mds->objecter->mutate(oid, oloc, m, snapc,
				 ceph::real_clock::now(), 0,
				 newfin);
}
969
// Completion of store(): on error, log to the cluster log and invoke
// the MDS write-error handler; on success, mark the inode clean if the
// stored version is still current, then complete the caller's context.
void CInode::_stored(int r, version_t v, Context *fin)
{
  if (r < 0) {
    dout(1) << "store error " << r << " v " << v << " on " << *this << dendl;
    mdcache->mds->clog->error() << "failed to store inode " << ino()
                                << " object: " << cpp_strerror(r);
    mdcache->mds->handle_write_error(r);
    fin->complete(r);
    return;
  }

  dout(10) << "_stored " << v << " on " << *this << dendl;
  // only mark clean if nothing newer was projected meanwhile
  if (v == get_projected_version())
    mark_clean();

  fin->complete(0);
}
987
/*
 * Flush this inode's dirty state: the backtrace if dirty-parent, and
 * either the base-inode object (base inodes) or the containing dirfrag.
 * 'fin' fires when all issued writes complete, or immediately if there
 * was nothing to do.  Caller must be auth and auth-pinnable.
 */
void CInode::flush(MDSInternalContextBase *fin)
{
  dout(10) << "flush " << *this << dendl;
  assert(is_auth() && can_auth_pin());

  MDSGatherBuilder gather(g_ceph_context);

  if (is_dirty_parent()) {
    store_backtrace(gather.new_sub());
  }
  if (is_dirty()) {
    if (is_base()) {
      store(gather.new_sub());
    } else {
      // non-base inodes are persisted as part of their dirfrag
      parent->dir->commit(0, gather.new_sub());
    }
  }

  if (gather.has_subs()) {
    gather.set_finisher(fin);
    gather.activate();
  } else {
    fin->complete(0);
  }
}
1013
// Completion callback for CInode::fetch(): collects both read buffers
// (legacy xattr format and current .inode object) and hands them to
// CInode::_fetched().
struct C_IO_Inode_Fetched : public CInodeIOContext {
  bufferlist bl, bl2;   // bl: legacy xattr read; bl2: .inode object read
  Context *fin;
  C_IO_Inode_Fetched(CInode *i, Context *f) : CInodeIOContext(i), fin(f) {}
  void finish(int r) override {
    // Ignore 'r', because we fetch from two places, so r is usually ENOENT
    in->_fetched(bl, bl2, fin);
  }
  void print(ostream& out) const override {
    out << "inode_fetch(" << in->ino() << ")";
  }
};
1026
/*
 * Read this inode back from RADOS.  Two reads are issued in parallel —
 * the legacy "inode" xattr on the dirfrag object and the current
 * ".inode" object — and _fetched() picks whichever returned data.
 */
void CInode::fetch(MDSInternalContextBase *fin)
{
  dout(10) << "fetch" << dendl;

  C_IO_Inode_Fetched *c = new C_IO_Inode_Fetched(this, fin);
  C_GatherBuilder gather(g_ceph_context, new C_OnFinisher(c, mdcache->mds->finisher));

  object_t oid = CInode::get_object_name(ino(), frag_t(), "");
  object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());

  // Old on-disk format: inode stored in xattr of a dirfrag
  ObjectOperation rd;
  rd.getxattr("inode", &c->bl, NULL);
  mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, (bufferlist*)NULL, 0, gather.new_sub());

  // Current on-disk format: inode stored in a .inode object
  object_t oid2 = CInode::get_object_name(ino(), frag_t(), ".inode");
  mdcache->mds->objecter->read(oid2, oloc, 0, 0, CEPH_NOSNAP, &c->bl2, 0, gather.new_sub());

  gather.activate();
}
1048
/*
 * Completion of fetch(): prefer the new-format buffer (bl2, the .inode
 * object) over the legacy xattr buffer (bl), validate the on-disk magic
 * string, decode, and complete 'fin' with 0, -ENOENT (no data at all),
 * or -EINVAL (bad magic / corrupt encoding).
 */
void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin)
{
  dout(10) << "_fetched got " << bl.length() << " and " << bl2.length() << dendl;
  bufferlist::iterator p;
  if (bl2.length()) {
    p = bl2.begin();
  } else if (bl.length()) {
    p = bl.begin();
  } else {
    derr << "No data while reading inode " << ino() << dendl;
    fin->complete(-ENOENT);
    return;
  }

  // Attempt decode
  try {
    string magic;
    ::decode(magic, p);
    dout(10) << " magic is '" << magic << "' (expecting '"
	     << CEPH_FS_ONDISK_MAGIC << "')" << dendl;
    if (magic != CEPH_FS_ONDISK_MAGIC) {
      dout(0) << "on disk magic '" << magic << "' != my magic '" << CEPH_FS_ONDISK_MAGIC
	      << "'" << dendl;
      fin->complete(-EINVAL);
    } else {
      decode_store(p);
      dout(10) << "_fetched " << *this << dendl;
      fin->complete(0);
    }
  } catch (buffer::error &err) {
    derr << "Corrupt inode " << ino() << ": " << err << dendl;
    fin->complete(-EINVAL);
    return;
  }
}
1084
/*
 * Fill 'bt' with this inode's backtrace: the chain of
 * (ancestor ino, dentry name, version) entries up to the root, the
 * current data pool, and any old pools the inode previously lived in.
 */
void CInode::build_backtrace(int64_t pool, inode_backtrace_t& bt)
{
  bt.ino = inode.ino;
  bt.ancestors.clear();
  bt.pool = pool;

  // walk the parent dentry chain to the root
  CInode *in = this;
  CDentry *pdn = get_parent_dn();
  while (pdn) {
    CInode *diri = pdn->get_dir()->get_inode();
    bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->get_name(), in->inode.version));
    in = diri;
    pdn = in->get_parent_dn();
  }
  for (auto &p : inode.old_pools) {
    // don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
    if (p != pool)
      bt.old_pools.insert(p);
  }
}
1105
// Completion callback for CInode::store_backtrace(): forwards the write
// result to CInode::_stored_backtrace().
struct C_IO_Inode_StoredBacktrace : public CInodeIOContext {
  version_t version;   // backtrace_version captured when the write was issued
  Context *fin;        // caller's completion
  C_IO_Inode_StoredBacktrace(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
  void finish(int r) override {
    in->_stored_backtrace(r, version, fin);
  }
  void print(ostream& out) const override {
    out << "backtrace_store(" << in->ino() << ")";
  }
};
1117
/*
 * Write this inode's backtrace ("parent" xattr, plus the "layout"
 * xattr) to its object in the backtrace pool.  If STATE_DIRTYPOOL is
 * set and old pools exist, the "parent" xattr is also rewritten in
 * every old pool so stale readers are redirected (via
 * inode_backtrace_t::pool) to the current pool.  Completion runs
 * _stored_backtrace().
 */
void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
{
  dout(10) << "store_backtrace on " << *this << dendl;
  assert(is_dirty_parent());

  if (op_prio < 0)
    op_prio = CEPH_MSG_PRIO_DEFAULT;

  // pin across the async write — presumably released by
  // _stored_backtrace() (not visible in this chunk); verify there
  auth_pin(this);

  const int64_t pool = get_backtrace_pool();
  inode_backtrace_t bt;
  build_backtrace(pool, bt);
  bufferlist parent_bl;
  ::encode(bt, parent_bl);

  ObjectOperation op;
  op.priority = op_prio;
  op.create(false);
  op.setxattr("parent", parent_bl);

  bufferlist layout_bl;
  ::encode(inode.layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
  op.setxattr("layout", layout_bl);

  SnapContext snapc;
  object_t oid = get_object_name(ino(), frag_t(), "");
  object_locator_t oloc(pool);
  Context *fin2 = new C_OnFinisher(
    new C_IO_Inode_StoredBacktrace(this, inode.backtrace_version, fin),
    mdcache->mds->finisher);

  if (!state_test(STATE_DIRTYPOOL) || inode.old_pools.empty()) {
    // common case: a single write to the current pool
    dout(20) << __func__ << ": no dirtypool or no old pools" << dendl;
    mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
				   ceph::real_clock::now(),
				   0, fin2);
    return;
  }

  C_GatherBuilder gather(g_ceph_context, fin2);
  mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
				 ceph::real_clock::now(),
				 0, gather.new_sub());

  // In the case where DIRTYPOOL is set, we update all old pools backtraces
  // such that anyone reading them will see the new pool ID in
  // inode_backtrace_t::pool and go read everything else from there.
  for (const auto &p : inode.old_pools) {
    if (p == pool)
      continue;

    dout(20) << __func__ << ": updating old pool " << p << dendl;

    ObjectOperation op;
    op.priority = op_prio;
    op.create(false);
    op.setxattr("parent", parent_bl);

    object_locator_t oloc(p);
    mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
				   ceph::real_clock::now(),
				   0, gather.new_sub());
  }
  gather.activate();
}
1184
// Completion for store_backtrace(): r is the RADOS result, v is the
// backtrace_version that was written, fin the caller's context.
void CInode::_stored_backtrace(int r, version_t v, Context *fin)
{
  if (r == -ENOENT) {
    const int64_t pool = get_backtrace_pool();
    bool exists = mdcache->mds->objecter->with_osdmap(
      [pool](const OSDMap &osd_map) {
        return osd_map.have_pg_pool(pool);
      });

    // This ENOENT is because the pool doesn't exist (the user deleted it
    // out from under us), so the backtrace can never be written, so pretend
    // to succeed so that the user can proceed to e.g. delete the file.
    if (!exists) {
      dout(4) << "store_backtrace got ENOENT: a data pool was deleted "
                 "beneath us!" << dendl;
      r = 0;
    }
  }

  if (r < 0) {
    // genuine write failure: log, report, and escalate via the MDS's
    // generic write-error handling (which typically goes read-only or
    // respawns).
    // NOTE(review): this path returns without auth_unpin(); presumably
    // acceptable because handle_write_error does not resume normal
    // operation — confirm before relying on pin balance here.
    dout(1) << "store backtrace error " << r << " v " << v << dendl;
    mdcache->mds->clog->error() << "failed to store backtrace on ino "
                                << ino() << " object"
                                << ", pool " << get_backtrace_pool()
                                << ", errno " << r;
    mdcache->mds->handle_write_error(r);
    if (fin)
      fin->complete(r);
    return;
  }

  dout(10) << "_stored_backtrace v " << v << dendl;

  // drop the pin taken in store_backtrace()
  auth_unpin(this);
  // only clear the dirty flag if no newer backtrace was queued while
  // this write was in flight
  if (v == inode.backtrace_version)
    clear_dirty_parent();
  if (fin)
    fin->complete(0);
}
1224
// Asynchronously read this inode's raw backtrace blob ("parent" xattr)
// from its object in the backtrace pool into *backtrace; fin is
// completed when the read finishes.
void CInode::fetch_backtrace(Context *fin, bufferlist *backtrace)
{
  mdcache->fetch_backtrace(inode.ino, get_backtrace_pool(), *backtrace, fin);
}
1229
// Flag this inode as having a stale on-disk backtrace (and optionally a
// stale pool id), and register it on the log segment's dirty-parent
// list so the backtrace gets rewritten when the segment is trimmed.
void CInode::mark_dirty_parent(LogSegment *ls, bool dirty_pool)
{
  if (!state_test(STATE_DIRTYPARENT)) {
    dout(10) << "mark_dirty_parent" << dendl;
    state_set(STATE_DIRTYPARENT);
    get(PIN_DIRTYPARENT);  // pinned until clear_dirty_parent()
    assert(ls);            // first transition must supply a segment
  }
  if (dirty_pool)
    state_set(STATE_DIRTYPOOL);
  // re-registering on a later segment is fine; the item moves lists
  if (ls)
    ls->dirty_parent_inodes.push_back(&item_dirty_parent);
}
1243
// Inverse of mark_dirty_parent(): drop both dirty flags, the pin, and
// the log-segment list membership. No-op if not currently dirty.
void CInode::clear_dirty_parent()
{
  if (state_test(STATE_DIRTYPARENT)) {
    dout(10) << "clear_dirty_parent" << dendl;
    state_clear(STATE_DIRTYPARENT);
    state_clear(STATE_DIRTYPOOL);
    put(PIN_DIRTYPARENT);
    item_dirty_parent.remove_myself();
  }
}
1254
// Cross-check a directory inode's on-disk backtrace (bl, read with
// result err) against its in-memory parent dentry. On mismatch or read
// error, log a cluster error and re-mark the backtrace dirty so it gets
// rewritten.
void CInode::verify_diri_backtrace(bufferlist &bl, int err)
{
  // skip roots (no parent), inodes already queued for rewrite, and
  // replicas (only the auth copy is authoritative)
  if (is_base() || is_dirty_parent() || !is_auth())
    return;

  dout(10) << "verify_diri_backtrace" << dendl;

  if (err == 0) {
    inode_backtrace_t backtrace;
    ::decode(backtrace, bl);
    CDentry *pdn = get_parent_dn();
    // the first ancestor entry must match our current link
    if (backtrace.ancestors.empty() ||
	backtrace.ancestors[0].dname != pdn->get_name() ||
	backtrace.ancestors[0].dirino != pdn->get_dir()->ino())
      err = -EINVAL;
  }

  if (err) {
    MDSRank *mds = mdcache->mds;
    mds->clog->error() << "bad backtrace on directory inode " << ino();
    // !"bad backtrace" is 0, so this asserts only when
    // mds_verify_backtrace > 1 (strict verification mode)
    assert(!"bad backtrace" == (g_conf->mds_verify_backtrace > 1));

    // schedule a rewrite of the correct backtrace
    mark_dirty_parent(mds->mdlog->get_current_segment(), false);
    mds->mdlog->flush();
  }
}
1281
1282 // ------------------
1283 // parent dir
1284
1285
// Encode the unversioned payload of an InodeStore. Field order is the
// on-disk wire format and must match decode_bare() exactly.
void InodeStoreBase::encode_bare(bufferlist &bl, uint64_t features,
				 const bufferlist *snap_blob) const
{
  ::encode(inode, bl, features);
  if (is_symlink())
    ::encode(symlink, bl);
  ::encode(dirfragtree, bl);
  ::encode(xattrs, bl);
  // snaprealm state is pre-encoded by the caller; an empty blob is
  // written when there is none so the decode side stays in sync
  if (snap_blob)
    ::encode(*snap_blob, bl);
  else
    ::encode(bufferlist(), bl);
  ::encode(old_inodes, bl, features);
  ::encode(oldest_snap, bl);
  ::encode(damage_flags, bl);
}
1302
// Versioned wrapper around encode_bare(): current struct version 6,
// compatible back to version 4.
void InodeStoreBase::encode(bufferlist &bl, uint64_t features,
			    const bufferlist *snap_blob) const
{
  ENCODE_START(6, 4, bl);
  encode_bare(bl, features, snap_blob);
  ENCODE_FINISH(bl);
}
1310
// Serialize this inode (including its snaprealm blob) for storage.
// NOTE(review): the 'features' parameter is ignored; the encoding is
// always performed with the MDSMap's up-features instead — presumably
// so stored inodes use the cluster-wide feature set. Confirm before
// passing anything meaningful here.
void CInode::encode_store(bufferlist& bl, uint64_t features)
{
  bufferlist snap_blob;
  encode_snap_blob(snap_blob);
  InodeStoreBase::encode(bl, mdcache->mds->mdsmap->get_up_features(),
			 &snap_blob);
}
1318
// Decode the unversioned InodeStore payload; mirror of encode_bare().
// struct_v selects compatibility handling for old formats.
void InodeStoreBase::decode_bare(bufferlist::iterator &bl,
			      bufferlist& snap_blob, __u8 struct_v)
{
  ::decode(inode, bl);
  if (is_symlink()) {
    // decode into a std::string, then move into the mempool-allocated
    // string type used by the MDS cache
    std::string tmp;
    ::decode(tmp, bl);
    symlink = mempool::mds_co::string(boost::string_view(tmp));
  }
  ::decode(dirfragtree, bl);
  ::decode(xattrs, bl);
  // raw snaprealm blob; caller interprets it (see decode_snap_blob)
  ::decode(snap_blob, bl);

  ::decode(old_inodes, bl);
  if (struct_v == 2 && inode.is_dir()) {
    // v2 directories could carry an optional default_file_layout
    bool default_layout_exists;
    ::decode(default_layout_exists, bl);
    if (default_layout_exists) {
      ::decode(struct_v, bl); // this was a default_file_layout
      ::decode(inode.layout, bl); // but we only care about the layout portion
    }
  }

  if (struct_v >= 5) {
    // InodeStore is embedded in dentries without proper versioning, so
    // we consume up to the end of the buffer
    if (!bl.end()) {
      ::decode(oldest_snap, bl);
    }

    if (!bl.end()) {
      ::decode(damage_flags, bl);
    }
  }
}
1354
1355
// Versioned wrapper around decode_bare(); accepts legacy (pre-versioned)
// encodings back to struct version 4.
void InodeStoreBase::decode(bufferlist::iterator &bl, bufferlist& snap_blob)
{
  DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
  decode_bare(bl, snap_blob, struct_v);
  DECODE_FINISH(bl);
}
1362
// Deserialize this inode from storage, then rebuild its snaprealm from
// the embedded snap blob. Inverse of encode_store().
void CInode::decode_store(bufferlist::iterator& bl)
{
  bufferlist snap_blob;
  InodeStoreBase::decode(bl, snap_blob);
  decode_snap_blob(snap_blob);
}
1369
1370 // ------------------
1371 // locking
1372
// Identify this cache object for lock messages: inode number plus the
// last snapid of the [first,last] range this CInode instance covers.
void CInode::set_object_info(MDSCacheObjectInfo &info)
{
  info.ino = ino();
  info.snapid = last;
}
1378
1379 void CInode::encode_lock_state(int type, bufferlist& bl)
1380 {
1381 ::encode(first, bl);
1382
1383 switch (type) {
1384 case CEPH_LOCK_IAUTH:
1385 ::encode(inode.version, bl);
1386 ::encode(inode.ctime, bl);
1387 ::encode(inode.mode, bl);
1388 ::encode(inode.uid, bl);
1389 ::encode(inode.gid, bl);
1390 break;
1391
1392 case CEPH_LOCK_ILINK:
1393 ::encode(inode.version, bl);
1394 ::encode(inode.ctime, bl);
1395 ::encode(inode.nlink, bl);
1396 break;
1397
1398 case CEPH_LOCK_IDFT:
1399 if (is_auth()) {
1400 ::encode(inode.version, bl);
1401 } else {
1402 // treat flushing as dirty when rejoining cache
1403 bool dirty = dirfragtreelock.is_dirty_or_flushing();
1404 ::encode(dirty, bl);
1405 }
1406 {
1407 // encode the raw tree
1408 ::encode(dirfragtree, bl);
1409
1410 // also specify which frags are mine
1411 set<frag_t> myfrags;
1412 list<CDir*> dfls;
1413 get_dirfrags(dfls);
1414 for (list<CDir*>::iterator p = dfls.begin(); p != dfls.end(); ++p)
1415 if ((*p)->is_auth()) {
1416 frag_t fg = (*p)->get_frag();
1417 myfrags.insert(fg);
1418 }
1419 ::encode(myfrags, bl);
1420 }
1421 break;
1422
1423 case CEPH_LOCK_IFILE:
1424 if (is_auth()) {
1425 ::encode(inode.version, bl);
1426 ::encode(inode.ctime, bl);
1427 ::encode(inode.mtime, bl);
1428 ::encode(inode.atime, bl);
1429 ::encode(inode.time_warp_seq, bl);
1430 if (!is_dir()) {
1431 ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1432 ::encode(inode.size, bl);
1433 ::encode(inode.truncate_seq, bl);
1434 ::encode(inode.truncate_size, bl);
1435 ::encode(inode.client_ranges, bl);
1436 ::encode(inode.inline_data, bl);
1437 }
1438 } else {
1439 // treat flushing as dirty when rejoining cache
1440 bool dirty = filelock.is_dirty_or_flushing();
1441 ::encode(dirty, bl);
1442 }
1443
1444 {
1445 dout(15) << "encode_lock_state inode.dirstat is " << inode.dirstat << dendl;
1446 ::encode(inode.dirstat, bl); // only meaningful if i am auth.
1447 bufferlist tmp;
1448 __u32 n = 0;
1449 for (const auto &p : dirfrags) {
1450 frag_t fg = p.first;
1451 CDir *dir = p.second;
1452 if (is_auth() || dir->is_auth()) {
1453 fnode_t *pf = dir->get_projected_fnode();
1454 dout(15) << fg << " " << *dir << dendl;
1455 dout(20) << fg << " fragstat " << pf->fragstat << dendl;
1456 dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
1457 ::encode(fg, tmp);
1458 ::encode(dir->first, tmp);
1459 ::encode(pf->fragstat, tmp);
1460 ::encode(pf->accounted_fragstat, tmp);
1461 n++;
1462 }
1463 }
1464 ::encode(n, bl);
1465 bl.claim_append(tmp);
1466 }
1467 break;
1468
1469 case CEPH_LOCK_INEST:
1470 if (is_auth()) {
1471 ::encode(inode.version, bl);
1472 } else {
1473 // treat flushing as dirty when rejoining cache
1474 bool dirty = nestlock.is_dirty_or_flushing();
1475 ::encode(dirty, bl);
1476 }
1477 {
1478 dout(15) << "encode_lock_state inode.rstat is " << inode.rstat << dendl;
1479 ::encode(inode.rstat, bl); // only meaningful if i am auth.
1480 bufferlist tmp;
1481 __u32 n = 0;
1482 for (const auto &p : dirfrags) {
1483 frag_t fg = p.first;
1484 CDir *dir = p.second;
1485 if (is_auth() || dir->is_auth()) {
1486 fnode_t *pf = dir->get_projected_fnode();
1487 dout(10) << fg << " " << *dir << dendl;
1488 dout(10) << fg << " " << pf->rstat << dendl;
1489 dout(10) << fg << " " << pf->rstat << dendl;
1490 dout(10) << fg << " " << dir->dirty_old_rstat << dendl;
1491 ::encode(fg, tmp);
1492 ::encode(dir->first, tmp);
1493 ::encode(pf->rstat, tmp);
1494 ::encode(pf->accounted_rstat, tmp);
1495 ::encode(dir->dirty_old_rstat, tmp);
1496 n++;
1497 }
1498 }
1499 ::encode(n, bl);
1500 bl.claim_append(tmp);
1501 }
1502 break;
1503
1504 case CEPH_LOCK_IXATTR:
1505 ::encode(inode.version, bl);
1506 ::encode(inode.ctime, bl);
1507 ::encode(xattrs, bl);
1508 break;
1509
1510 case CEPH_LOCK_ISNAP:
1511 ::encode(inode.version, bl);
1512 ::encode(inode.ctime, bl);
1513 encode_snap(bl);
1514 break;
1515
1516 case CEPH_LOCK_IFLOCK:
1517 ::encode(inode.version, bl);
1518 _encode_file_locks(bl);
1519 break;
1520
1521 case CEPH_LOCK_IPOLICY:
1522 if (inode.is_dir()) {
1523 ::encode(inode.version, bl);
1524 ::encode(inode.ctime, bl);
1525 ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1526 ::encode(inode.quota, bl);
1527 ::encode(inode.export_pin, bl);
1528 }
1529 break;
1530
1531 default:
1532 ceph_abort();
1533 }
1534 }
1535
1536
1537 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1538
1539 void CInode::decode_lock_state(int type, bufferlist& bl)
1540 {
1541 bufferlist::iterator p = bl.begin();
1542 utime_t tm;
1543
1544 snapid_t newfirst;
1545 ::decode(newfirst, p);
1546
1547 if (!is_auth() && newfirst != first) {
1548 dout(10) << "decode_lock_state first " << first << " -> " << newfirst << dendl;
1549 assert(newfirst > first);
1550 if (!is_multiversion() && parent) {
1551 assert(parent->first == first);
1552 parent->first = newfirst;
1553 }
1554 first = newfirst;
1555 }
1556
1557 switch (type) {
1558 case CEPH_LOCK_IAUTH:
1559 ::decode(inode.version, p);
1560 ::decode(tm, p);
1561 if (inode.ctime < tm) inode.ctime = tm;
1562 ::decode(inode.mode, p);
1563 ::decode(inode.uid, p);
1564 ::decode(inode.gid, p);
1565 break;
1566
1567 case CEPH_LOCK_ILINK:
1568 ::decode(inode.version, p);
1569 ::decode(tm, p);
1570 if (inode.ctime < tm) inode.ctime = tm;
1571 ::decode(inode.nlink, p);
1572 break;
1573
1574 case CEPH_LOCK_IDFT:
1575 if (is_auth()) {
1576 bool replica_dirty;
1577 ::decode(replica_dirty, p);
1578 if (replica_dirty) {
1579 dout(10) << "decode_lock_state setting dftlock dirty flag" << dendl;
1580 dirfragtreelock.mark_dirty(); // ok bc we're auth and caller will handle
1581 }
1582 } else {
1583 ::decode(inode.version, p);
1584 }
1585 {
1586 fragtree_t temp;
1587 ::decode(temp, p);
1588 set<frag_t> authfrags;
1589 ::decode(authfrags, p);
1590 if (is_auth()) {
1591 // auth. believe replica's auth frags only.
1592 for (set<frag_t>::iterator p = authfrags.begin(); p != authfrags.end(); ++p)
1593 if (!dirfragtree.is_leaf(*p)) {
1594 dout(10) << " forcing frag " << *p << " to leaf (split|merge)" << dendl;
1595 dirfragtree.force_to_leaf(g_ceph_context, *p);
1596 dirfragtreelock.mark_dirty(); // ok bc we're auth and caller will handle
1597 }
1598 } else {
1599 // replica. take the new tree, BUT make sure any open
1600 // dirfrags remain leaves (they may have split _after_ this
1601 // dft was scattered, or we may still be be waiting on the
1602 // notify from the auth)
1603 dirfragtree.swap(temp);
1604 for (const auto &p : dirfrags) {
1605 if (!dirfragtree.is_leaf(p.first)) {
1606 dout(10) << " forcing open dirfrag " << p.first << " to leaf (racing with split|merge)" << dendl;
1607 dirfragtree.force_to_leaf(g_ceph_context, p.first);
1608 }
1609 if (p.second->is_auth())
1610 p.second->state_clear(CDir::STATE_DIRTYDFT);
1611 }
1612 }
1613 if (g_conf->mds_debug_frag)
1614 verify_dirfrags();
1615 }
1616 break;
1617
1618 case CEPH_LOCK_IFILE:
1619 if (!is_auth()) {
1620 ::decode(inode.version, p);
1621 ::decode(tm, p);
1622 if (inode.ctime < tm) inode.ctime = tm;
1623 ::decode(inode.mtime, p);
1624 ::decode(inode.atime, p);
1625 ::decode(inode.time_warp_seq, p);
1626 if (!is_dir()) {
1627 ::decode(inode.layout, p);
1628 ::decode(inode.size, p);
1629 ::decode(inode.truncate_seq, p);
1630 ::decode(inode.truncate_size, p);
1631 ::decode(inode.client_ranges, p);
1632 ::decode(inode.inline_data, p);
1633 }
1634 } else {
1635 bool replica_dirty;
1636 ::decode(replica_dirty, p);
1637 if (replica_dirty) {
1638 dout(10) << "decode_lock_state setting filelock dirty flag" << dendl;
1639 filelock.mark_dirty(); // ok bc we're auth and caller will handle
1640 }
1641 }
1642 {
1643 frag_info_t dirstat;
1644 ::decode(dirstat, p);
1645 if (!is_auth()) {
1646 dout(10) << " taking inode dirstat " << dirstat << " for " << *this << dendl;
1647 inode.dirstat = dirstat; // take inode summation if replica
1648 }
1649 __u32 n;
1650 ::decode(n, p);
1651 dout(10) << " ...got " << n << " fragstats on " << *this << dendl;
1652 while (n--) {
1653 frag_t fg;
1654 snapid_t fgfirst;
1655 frag_info_t fragstat;
1656 frag_info_t accounted_fragstat;
1657 ::decode(fg, p);
1658 ::decode(fgfirst, p);
1659 ::decode(fragstat, p);
1660 ::decode(accounted_fragstat, p);
1661 dout(10) << fg << " [" << fgfirst << ",head] " << dendl;
1662 dout(10) << fg << " fragstat " << fragstat << dendl;
1663 dout(20) << fg << " accounted_fragstat " << accounted_fragstat << dendl;
1664
1665 CDir *dir = get_dirfrag(fg);
1666 if (is_auth()) {
1667 assert(dir); // i am auth; i had better have this dir open
1668 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1669 << " on " << *dir << dendl;
1670 dir->first = fgfirst;
1671 dir->fnode.fragstat = fragstat;
1672 dir->fnode.accounted_fragstat = accounted_fragstat;
1673 dir->first = fgfirst;
1674 if (!(fragstat == accounted_fragstat)) {
1675 dout(10) << fg << " setting filelock updated flag" << dendl;
1676 filelock.mark_dirty(); // ok bc we're auth and caller will handle
1677 }
1678 } else {
1679 if (dir && dir->is_auth()) {
1680 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1681 << " on " << *dir << dendl;
1682 dir->first = fgfirst;
1683 fnode_t *pf = dir->get_projected_fnode();
1684 finish_scatter_update(&filelock, dir,
1685 inode.dirstat.version, pf->accounted_fragstat.version);
1686 }
1687 }
1688 }
1689 }
1690 break;
1691
1692 case CEPH_LOCK_INEST:
1693 if (is_auth()) {
1694 bool replica_dirty;
1695 ::decode(replica_dirty, p);
1696 if (replica_dirty) {
1697 dout(10) << "decode_lock_state setting nestlock dirty flag" << dendl;
1698 nestlock.mark_dirty(); // ok bc we're auth and caller will handle
1699 }
1700 } else {
1701 ::decode(inode.version, p);
1702 }
1703 {
1704 nest_info_t rstat;
1705 ::decode(rstat, p);
1706 if (!is_auth()) {
1707 dout(10) << " taking inode rstat " << rstat << " for " << *this << dendl;
1708 inode.rstat = rstat; // take inode summation if replica
1709 }
1710 __u32 n;
1711 ::decode(n, p);
1712 while (n--) {
1713 frag_t fg;
1714 snapid_t fgfirst;
1715 nest_info_t rstat;
1716 nest_info_t accounted_rstat;
1717 decltype(CDir::dirty_old_rstat) dirty_old_rstat;
1718 ::decode(fg, p);
1719 ::decode(fgfirst, p);
1720 ::decode(rstat, p);
1721 ::decode(accounted_rstat, p);
1722 ::decode(dirty_old_rstat, p);
1723 dout(10) << fg << " [" << fgfirst << ",head]" << dendl;
1724 dout(10) << fg << " rstat " << rstat << dendl;
1725 dout(10) << fg << " accounted_rstat " << accounted_rstat << dendl;
1726 dout(10) << fg << " dirty_old_rstat " << dirty_old_rstat << dendl;
1727
1728 CDir *dir = get_dirfrag(fg);
1729 if (is_auth()) {
1730 assert(dir); // i am auth; i had better have this dir open
1731 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1732 << " on " << *dir << dendl;
1733 dir->first = fgfirst;
1734 dir->fnode.rstat = rstat;
1735 dir->fnode.accounted_rstat = accounted_rstat;
1736 dir->dirty_old_rstat.swap(dirty_old_rstat);
1737 if (!(rstat == accounted_rstat) || !dir->dirty_old_rstat.empty()) {
1738 dout(10) << fg << " setting nestlock updated flag" << dendl;
1739 nestlock.mark_dirty(); // ok bc we're auth and caller will handle
1740 }
1741 } else {
1742 if (dir && dir->is_auth()) {
1743 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1744 << " on " << *dir << dendl;
1745 dir->first = fgfirst;
1746 fnode_t *pf = dir->get_projected_fnode();
1747 finish_scatter_update(&nestlock, dir,
1748 inode.rstat.version, pf->accounted_rstat.version);
1749 }
1750 }
1751 }
1752 }
1753 break;
1754
1755 case CEPH_LOCK_IXATTR:
1756 ::decode(inode.version, p);
1757 ::decode(tm, p);
1758 if (inode.ctime < tm) inode.ctime = tm;
1759 ::decode(xattrs, p);
1760 break;
1761
1762 case CEPH_LOCK_ISNAP:
1763 {
1764 ::decode(inode.version, p);
1765 ::decode(tm, p);
1766 if (inode.ctime < tm) inode.ctime = tm;
1767 snapid_t seq = 0;
1768 if (snaprealm)
1769 seq = snaprealm->srnode.seq;
1770 decode_snap(p);
1771 if (snaprealm && snaprealm->srnode.seq != seq)
1772 mdcache->do_realm_invalidate_and_update_notify(this, seq ? CEPH_SNAP_OP_UPDATE:CEPH_SNAP_OP_SPLIT);
1773 }
1774 break;
1775
1776 case CEPH_LOCK_IFLOCK:
1777 ::decode(inode.version, p);
1778 _decode_file_locks(p);
1779 break;
1780
1781 case CEPH_LOCK_IPOLICY:
1782 if (inode.is_dir()) {
1783 ::decode(inode.version, p);
1784 ::decode(tm, p);
1785 if (inode.ctime < tm) inode.ctime = tm;
1786 ::decode(inode.layout, p);
1787 ::decode(inode.quota, p);
1788 mds_rank_t old_pin = inode.export_pin;
1789 ::decode(inode.export_pin, p);
1790 maybe_export_pin(old_pin != inode.export_pin);
1791 }
1792 break;
1793
1794 default:
1795 ceph_abort();
1796 }
1797 }
1798
1799
1800 bool CInode::is_dirty_scattered()
1801 {
1802 return
1803 filelock.is_dirty_or_flushing() ||
1804 nestlock.is_dirty_or_flushing() ||
1805 dirfragtreelock.is_dirty_or_flushing();
1806 }
1807
// Drop the dirty state on all three scatterlocks at once.
void CInode::clear_scatter_dirty()
{
  filelock.remove_dirty();
  nestlock.remove_dirty();
  dirfragtreelock.remove_dirty();
}
1814
// Remove this inode from the log segment's dirty-dirfrag list that
// corresponds to the given scatterlock type.
void CInode::clear_dirty_scattered(int type)
{
  dout(10) << "clear_dirty_scattered " << type << " on " << *this << dendl;
  assert(is_dir());  // scatterlock dirfrag lists only apply to directories
  switch (type) {
  case CEPH_LOCK_IFILE:
    item_dirty_dirfrag_dir.remove_myself();
    break;

  case CEPH_LOCK_INEST:
    item_dirty_dirfrag_nest.remove_myself();
    break;

  case CEPH_LOCK_IDFT:
    item_dirty_dirfrag_dirfragtree.remove_myself();
    break;

  default:
    ceph_abort();
  }
}
1836
1837
1838 /*
1839 * when we initially scatter a lock, we need to check if any of the dirfrags
1840 * have out of date accounted_rstat/fragstat. if so, mark the lock stale.
1841 */
1842 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
// Called when a scatterlock transitions to scattered state: for each
// auth dirfrag, check whether its accounted stats are up to date with
// the inode and journal an update if not (see finish_scatter_update).
void CInode::start_scatter(ScatterLock *lock)
{
  dout(10) << "start_scatter " << *lock << " on " << *this << dendl;
  assert(is_auth());
  mempool_inode *pi = get_projected_inode();

  for (const auto &p : dirfrags) {
    frag_t fg = p.first;
    CDir *dir = p.second;
    fnode_t *pf = dir->get_projected_fnode();
    dout(20) << fg << " " << *dir << dendl;

    // only our own frags can be updated here
    if (!dir->is_auth())
      continue;

    switch (lock->get_type()) {
    case CEPH_LOCK_IFILE:
      finish_scatter_update(lock, dir, pi->dirstat.version, pf->accounted_fragstat.version);
      break;

    case CEPH_LOCK_INEST:
      finish_scatter_update(lock, dir, pi->rstat.version, pf->accounted_rstat.version);
      break;

    case CEPH_LOCK_IDFT:
      dir->state_clear(CDir::STATE_DIRTYDFT);
      break;
    }
  }
}
1873
1874
// Journal-completion context for finish_scatter_update(): once the
// EUpdate is safely logged, apply the mutation via _finish_frag_update.
class C_Inode_FragUpdate : public MDSLogContextBase {
protected:
  CInode *in;
  CDir *dir;
  MutationRef mut;
  MDSRank *get_mds() override {return in->mdcache->mds;}
  void finish(int r) override {
    in->_finish_frag_update(dir, mut);
  }

public:
  C_Inode_FragUpdate(CInode *i, CDir *d, MutationRef& m) : in(i), dir(d), mut(m) {}
};
1888
// If this dirfrag's accounted stat version lags the inode's stat
// version, journal an update that brings accounted_fragstat/rstat up to
// date; otherwise do nothing. Frozen or not-yet-loaded frags are left
// stale (the lock cycle will retry later).
void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
				   version_t inode_version, version_t dir_accounted_version)
{
  frag_t fg = dir->get_frag();
  assert(dir->is_auth());

  if (dir->is_frozen()) {
    dout(10) << "finish_scatter_update " << fg << " frozen, marking " << *lock << " stale " << *dir << dendl;
  } else if (dir->get_version() == 0) {
    dout(10) << "finish_scatter_update " << fg << " not loaded, marking " << *lock << " stale " << *dir << dendl;
  } else {
    if (dir_accounted_version != inode_version) {
      dout(10) << "finish_scatter_update " << fg << " journaling accounted scatterstat update v" << inode_version << dendl;

      MDLog *mdlog = mdcache->mds->mdlog;
      MutationRef mut(new MutationImpl());
      mut->ls = mdlog->get_current_segment();

      mempool_inode *pi = get_projected_inode();
      fnode_t *pf = dir->project_fnode();

      const char *ename = 0;  // journal event description
      switch (lock->get_type()) {
      case CEPH_LOCK_IFILE:
	// stamp the frag with the inode's dirstat version and settle
	// the accounted copy
	pf->fragstat.version = pi->dirstat.version;
	pf->accounted_fragstat = pf->fragstat;
	ename = "lock ifile accounted scatter stat update";
	break;
      case CEPH_LOCK_INEST:
	pf->rstat.version = pi->rstat.version;
	pf->accounted_rstat = pf->rstat;
	ename = "lock inest accounted scatter stat update";

	if (!is_auth() && lock->get_state() == LOCK_MIX) {
	  dout(10) << "finish_scatter_update try to assimilate dirty rstat on "
		   << *dir << dendl;
	  dir->assimilate_dirty_rstat_inodes();
	}

	break;
      default:
	ceph_abort();
      }

      pf->version = dir->pre_dirty();
      mut->add_projected_fnode(dir);

      EUpdate *le = new EUpdate(mdlog, ename);
      mdlog->start_entry(le);
      le->metablob.add_dir_context(dir);
      le->metablob.add_dir(dir, true);

      assert(!dir->is_frozen());
      mut->auth_pin(dir);

      if (lock->get_type() == CEPH_LOCK_INEST &&
	  !is_auth() && lock->get_state() == LOCK_MIX) {
	dout(10) << "finish_scatter_update finish assimilating dirty rstat on "
		 << *dir << dendl;
	dir->assimilate_dirty_rstat_inodes_finish(mut, &le->metablob);

	// if assimilation left rstat != accounted_rstat, propagate that
	// dirtiness via the nestlock
	if (!(pf->rstat == pf->accounted_rstat)) {
	  if (mut->wrlocks.count(&nestlock) == 0) {
	    mdcache->mds->locker->wrlock_force(&nestlock, mut);
	  }

	  mdcache->mds->locker->mark_updated_scatterlock(&nestlock);
	  mut->ls->dirty_dirfrag_nest.push_back(&item_dirty_dirfrag_nest);
	}
      }

      mdlog->submit_entry(le, new C_Inode_FragUpdate(this, dir, mut));
    } else {
      dout(10) << "finish_scatter_update " << fg << " accounted " << *lock
	       << " scatter stat unchanged at v" << dir_accounted_version << dendl;
    }
  }
}
1967
// Journal completion for finish_scatter_update(): apply the projected
// fnode, release any locks the mutation took, and clean it up.
void CInode::_finish_frag_update(CDir *dir, MutationRef& mut)
{
  dout(10) << "_finish_frag_update on " << *dir << dendl;
  mut->apply();
  mdcache->mds->locker->drop_locks(mut.get());
  mut->cleanup();
}
1975
1976
1977 /*
1978 * when we gather a lock, we need to assimilate dirfrag changes into the inode
1979 * state. it's possible we can't update the dirfrag accounted_rstat/fragstat
1980 * because the frag is auth and frozen, or that the replica couldn't for the same
1981 * reason. hopefully it will get updated the next time the lock cycles.
1982 *
1983 * we have two dimensions of behavior:
1984 * - we may be (auth and !frozen), and able to update, or not.
1985 * - the frag may be stale, or not.
1986 *
1987 * if the frag is non-stale, we want to assimilate the diff into the
1988 * inode, regardless of whether it's auth or updateable.
1989 *
1990 * if we update the frag, we want to set accounted_fragstat = frag,
1991 * both if we took the diff or it was stale and we are making it
1992 * un-stale.
1993 */
1994 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
// Gather phase of a scatterlock cycle: fold each dirfrag's
// fragstat/rstat delta into the inode's projected dirstat/rstat, settle
// the accounted copies on updateable (auth, unfrozen, loaded) frags,
// and cross-check the sums, repairing or complaining on mismatch.
void CInode::finish_scatter_gather_update(int type)
{
  LogChannelRef clog = mdcache->mds->clog;

  dout(10) << "finish_scatter_gather_update " << type << " on " << *this << dendl;
  assert(is_auth());

  switch (type) {
  case CEPH_LOCK_IFILE:
    {
      fragtree_t tmpdft = dirfragtree;
      struct frag_info_t dirstat;   // independently-recomputed sum
      bool dirstat_valid = true;    // false if any frag was unreadable

      // adjust summation
      assert(is_auth());
      mempool_inode *pi = get_projected_inode();

      bool touched_mtime = false, touched_chattr = false;
      dout(20) << " orig dirstat " << pi->dirstat << dendl;
      pi->dirstat.version++;
      for (const auto &p : dirfrags) {
	frag_t fg = p.first;
	CDir *dir = p.second;
	dout(20) << fg << " " << *dir << dendl;

	// can we write the settled stats back to this frag?
	bool update;
	if (dir->get_version() != 0) {
	  update = dir->is_auth() && !dir->is_frozen();
	} else {
	  update = false;
	  dirstat_valid = false;  // unloaded frag: sum can't be trusted
	}

	fnode_t *pf = dir->get_projected_fnode();
	if (update)
	  pf = dir->project_fnode();

	// only fold in the delta if the frag is exactly one version
	// behind; otherwise its accounted stats are stale
	if (pf->accounted_fragstat.version == pi->dirstat.version - 1) {
	  dout(20) << fg << " fragstat " << pf->fragstat << dendl;
	  dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
	  pi->dirstat.add_delta(pf->fragstat, pf->accounted_fragstat, &touched_mtime, &touched_chattr);
	} else {
	  dout(20) << fg << " skipping STALE accounted_fragstat " << pf->accounted_fragstat << dendl;
	}

	// clamp corrupt negative counts rather than propagate them
	if (pf->fragstat.nfiles < 0 ||
	    pf->fragstat.nsubdirs < 0) {
	  clog->error() << "bad/negative dir size on "
			<< dir->dirfrag() << " " << pf->fragstat;
	  assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);

	  if (pf->fragstat.nfiles < 0)
	    pf->fragstat.nfiles = 0;
	  if (pf->fragstat.nsubdirs < 0)
	    pf->fragstat.nsubdirs = 0;
	}

	if (update) {
	  pf->accounted_fragstat = pf->fragstat;
	  pf->fragstat.version = pf->accounted_fragstat.version = pi->dirstat.version;
	  dout(10) << fg << " updated accounted_fragstat " << pf->fragstat << " on " << *dir << dendl;
	}

	tmpdft.force_to_leaf(g_ceph_context, fg);
	dirstat.add(pf->fragstat);
      }
      if (touched_mtime)
	pi->mtime = pi->ctime = pi->dirstat.mtime;
      if (touched_chattr)
	pi->change_attr = pi->dirstat.change_attr;
      dout(20) << " final dirstat " << pi->dirstat << dendl;

      // sanity check: the recomputed sum should match; only meaningful
      // if every leaf of the (forced) fragtree is actually in cache
      if (dirstat_valid && !dirstat.same_sums(pi->dirstat)) {
	list<frag_t> ls;
	tmpdft.get_leaves_under(frag_t(), ls);
	for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
	  if (!dirfrags.count(*p)) {
	    dirstat_valid = false;
	    break;
	  }
	if (dirstat_valid) {
	  if (state_test(CInode::STATE_REPAIRSTATS)) {
	    dout(20) << " dirstat mismatch, fixing" << dendl;
	  } else {
	    clog->error() << "unmatched fragstat on " << ino() << ", inode has "
			  << pi->dirstat << ", dirfrags have " << dirstat;
	    assert(!"unmatched fragstat" == g_conf->mds_verify_scatter);
	  }
	  // trust the dirfrags for now
	  version_t v = pi->dirstat.version;
	  if (pi->dirstat.mtime > dirstat.mtime)
	    dirstat.mtime = pi->dirstat.mtime;
	  if (pi->dirstat.change_attr > dirstat.change_attr)
	    dirstat.change_attr = pi->dirstat.change_attr;
	  pi->dirstat = dirstat;
	  pi->dirstat.version = v;
	}
      }

      if (pi->dirstat.nfiles < 0 || pi->dirstat.nsubdirs < 0)
      {
	std::string path;
	make_path_string(path);
	clog->error() << "Inconsistent statistics detected: fragstat on inode "
		      << ino() << " (" << path << "), inode has " << pi->dirstat;
	assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);

	if (pi->dirstat.nfiles < 0)
	  pi->dirstat.nfiles = 0;
	if (pi->dirstat.nsubdirs < 0)
	  pi->dirstat.nsubdirs = 0;
      }
    }
    break;

  case CEPH_LOCK_INEST:
    {
      // same structure as IFILE above, but for recursive stats (rstat)
      fragtree_t tmpdft = dirfragtree;
      nest_info_t rstat;
      rstat.rsubdirs = 1;  // account for this directory itself
      bool rstat_valid = true;

      // adjust summation
      assert(is_auth());
      mempool_inode *pi = get_projected_inode();
      dout(20) << " orig rstat " << pi->rstat << dendl;
      pi->rstat.version++;
      for (const auto &p : dirfrags) {
	frag_t fg = p.first;
	CDir *dir = p.second;
	dout(20) << fg << " " << *dir << dendl;

	bool update;
	if (dir->get_version() != 0) {
	  update = dir->is_auth() && !dir->is_frozen();
	} else {
	  update = false;
	  rstat_valid = false;
	}

	fnode_t *pf = dir->get_projected_fnode();
	if (update)
	  pf = dir->project_fnode();

	if (pf->accounted_rstat.version == pi->rstat.version-1) {
	  // only pull this frag's dirty rstat inodes into the frag if
	  // the frag is non-stale and updateable.  if it's stale,
	  // that info will just get thrown out!
	  if (update)
	    dir->assimilate_dirty_rstat_inodes();

	  dout(20) << fg << " rstat " << pf->rstat << dendl;
	  dout(20) << fg << " accounted_rstat " << pf->accounted_rstat << dendl;
	  dout(20) << fg << " dirty_old_rstat " << dir->dirty_old_rstat << dendl;
	  mdcache->project_rstat_frag_to_inode(pf->rstat, pf->accounted_rstat,
					       dir->first, CEPH_NOSNAP, this, true);
	  for (auto &p : dir->dirty_old_rstat) {
	    mdcache->project_rstat_frag_to_inode(p.second.rstat, p.second.accounted_rstat,
						 p.second.first, p.first, this, true);
	  }
	  if (update)  // dir contents not valid if frozen or non-auth
	    dir->check_rstats();
	} else {
	  dout(20) << fg << " skipping STALE accounted_rstat " << pf->accounted_rstat << dendl;
	}
	if (update) {
	  pf->accounted_rstat = pf->rstat;
	  dir->dirty_old_rstat.clear();
	  pf->rstat.version = pf->accounted_rstat.version = pi->rstat.version;
	  dir->check_rstats();
	  dout(10) << fg << " updated accounted_rstat " << pf->rstat << " on " << *dir << dendl;
	}

	tmpdft.force_to_leaf(g_ceph_context, fg);
	rstat.add(pf->rstat);
      }
      dout(20) << " final rstat " << pi->rstat << dendl;

      if (rstat_valid && !rstat.same_sums(pi->rstat)) {
	list<frag_t> ls;
	tmpdft.get_leaves_under(frag_t(), ls);
	for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
	  if (!dirfrags.count(*p)) {
	    rstat_valid = false;
	    break;
	  }
	if (rstat_valid) {
	  if (state_test(CInode::STATE_REPAIRSTATS)) {
	    dout(20) << " rstat mismatch, fixing" << dendl;
	  } else {
	    clog->error() << "inconsistent rstat on inode " << ino()
			  << ", inode has " << pi->rstat
			  << ", directory fragments have " << rstat;
	    assert(!"unmatched rstat" == g_conf->mds_verify_scatter);
	  }
	  // trust the dirfrag for now
	  version_t v = pi->rstat.version;
	  if (pi->rstat.rctime > rstat.rctime)
	    rstat.rctime = pi->rstat.rctime;
	  pi->rstat = rstat;
	  pi->rstat.version = v;
	}
      }

      mdcache->broadcast_quota_to_client(this);
    }
    break;

  case CEPH_LOCK_IDFT:
    break;  // nothing to gather for the fragtree lock

  default:
    ceph_abort();
  }
}
2211
2212 void CInode::finish_scatter_gather_update_accounted(int type, MutationRef& mut, EMetaBlob *metablob)
2213 {
2214 dout(10) << "finish_scatter_gather_update_accounted " << type << " on " << *this << dendl;
2215 assert(is_auth());
2216
2217 for (const auto &p : dirfrags) {
2218 CDir *dir = p.second;
2219 if (!dir->is_auth() || dir->get_version() == 0 || dir->is_frozen())
2220 continue;
2221
2222 if (type == CEPH_LOCK_IDFT)
2223 continue; // nothing to do.
2224
2225 dout(10) << " journaling updated frag accounted_ on " << *dir << dendl;
2226 assert(dir->is_projected());
2227 fnode_t *pf = dir->get_projected_fnode();
2228 pf->version = dir->pre_dirty();
2229 mut->add_projected_fnode(dir);
2230 metablob->add_dir(dir, true);
2231 mut->auth_pin(dir);
2232
2233 if (type == CEPH_LOCK_INEST)
2234 dir->assimilate_dirty_rstat_inodes_finish(mut, metablob);
2235 }
2236 }
2237
2238 // waiting
2239
2240 bool CInode::is_frozen() const
2241 {
2242 if (is_frozen_inode()) return true;
2243 if (parent && parent->dir->is_frozen()) return true;
2244 return false;
2245 }
2246
2247 bool CInode::is_frozen_dir() const
2248 {
2249 if (parent && parent->dir->is_frozen_dir()) return true;
2250 return false;
2251 }
2252
2253 bool CInode::is_freezing() const
2254 {
2255 if (is_freezing_inode()) return true;
2256 if (parent && parent->dir->is_freezing()) return true;
2257 return false;
2258 }
2259
2260 void CInode::add_dir_waiter(frag_t fg, MDSInternalContextBase *c)
2261 {
2262 if (waiting_on_dir.empty())
2263 get(PIN_DIRWAITER);
2264 waiting_on_dir[fg].push_back(c);
2265 dout(10) << "add_dir_waiter frag " << fg << " " << c << " on " << *this << dendl;
2266 }
2267
2268 void CInode::take_dir_waiting(frag_t fg, list<MDSInternalContextBase*>& ls)
2269 {
2270 if (waiting_on_dir.empty())
2271 return;
2272
2273 auto it = waiting_on_dir.find(fg);
2274 if (it != waiting_on_dir.end()) {
2275 dout(10) << __func__ << " frag " << fg << " on " << *this << dendl;
2276 ls.splice(ls.end(), it->second);
2277 waiting_on_dir.erase(it);
2278
2279 if (waiting_on_dir.empty())
2280 put(PIN_DIRWAITER);
2281 }
2282 }
2283
2284 void CInode::add_waiter(uint64_t tag, MDSInternalContextBase *c)
2285 {
2286 dout(10) << "add_waiter tag " << std::hex << tag << std::dec << " " << c
2287 << " !ambig " << !state_test(STATE_AMBIGUOUSAUTH)
2288 << " !frozen " << !is_frozen_inode()
2289 << " !freezing " << !is_freezing_inode()
2290 << dendl;
2291 // wait on the directory?
2292 // make sure its not the inode that is explicitly ambiguous|freezing|frozen
2293 if (((tag & WAIT_SINGLEAUTH) && !state_test(STATE_AMBIGUOUSAUTH)) ||
2294 ((tag & WAIT_UNFREEZE) &&
2295 !is_frozen_inode() && !is_freezing_inode() && !is_frozen_auth_pin())) {
2296 dout(15) << "passing waiter up tree" << dendl;
2297 parent->dir->add_waiter(tag, c);
2298 return;
2299 }
2300 dout(15) << "taking waiter here" << dendl;
2301 MDSCacheObject::add_waiter(tag, c);
2302 }
2303
2304 void CInode::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
2305 {
2306 if ((mask & WAIT_DIR) && !waiting_on_dir.empty()) {
2307 // take all dentry waiters
2308 while (!waiting_on_dir.empty()) {
2309 auto it = waiting_on_dir.begin();
2310 dout(10) << __func__ << " dirfrag " << it->first << " on " << *this << dendl;
2311 ls.splice(ls.end(), it->second);
2312 waiting_on_dir.erase(it);
2313 }
2314 put(PIN_DIRWAITER);
2315 }
2316
2317 // waiting
2318 MDSCacheObject::take_waiting(mask, ls);
2319 }
2320
2321 bool CInode::freeze_inode(int auth_pin_allowance)
2322 {
2323 assert(auth_pin_allowance > 0); // otherwise we need to adjust parent's nested_auth_pins
2324 assert(auth_pins >= auth_pin_allowance);
2325 if (auth_pins > auth_pin_allowance) {
2326 dout(10) << "freeze_inode - waiting for auth_pins to drop to " << auth_pin_allowance << dendl;
2327 auth_pin_freeze_allowance = auth_pin_allowance;
2328 get(PIN_FREEZING);
2329 state_set(STATE_FREEZING);
2330 return false;
2331 }
2332
2333 dout(10) << "freeze_inode - frozen" << dendl;
2334 assert(auth_pins == auth_pin_allowance);
2335 if (!state_test(STATE_FROZEN)) {
2336 get(PIN_FROZEN);
2337 state_set(STATE_FROZEN);
2338 }
2339 return true;
2340 }
2341
2342 void CInode::unfreeze_inode(list<MDSInternalContextBase*>& finished)
2343 {
2344 dout(10) << "unfreeze_inode" << dendl;
2345 if (state_test(STATE_FREEZING)) {
2346 state_clear(STATE_FREEZING);
2347 put(PIN_FREEZING);
2348 } else if (state_test(STATE_FROZEN)) {
2349 state_clear(STATE_FROZEN);
2350 put(PIN_FROZEN);
2351 } else
2352 ceph_abort();
2353 take_waiting(WAIT_UNFREEZE, finished);
2354 }
2355
2356 void CInode::unfreeze_inode()
2357 {
2358 list<MDSInternalContextBase*> finished;
2359 unfreeze_inode(finished);
2360 mdcache->mds->queue_waiters(finished);
2361 }
2362
2363 void CInode::freeze_auth_pin()
2364 {
2365 assert(state_test(CInode::STATE_FROZEN));
2366 state_set(CInode::STATE_FROZENAUTHPIN);
2367 }
2368
2369 void CInode::unfreeze_auth_pin()
2370 {
2371 assert(state_test(CInode::STATE_FROZENAUTHPIN));
2372 state_clear(CInode::STATE_FROZENAUTHPIN);
2373 if (!state_test(STATE_FREEZING|STATE_FROZEN)) {
2374 list<MDSInternalContextBase*> finished;
2375 take_waiting(WAIT_UNFREEZE, finished);
2376 mdcache->mds->queue_waiters(finished);
2377 }
2378 }
2379
2380 void CInode::clear_ambiguous_auth(list<MDSInternalContextBase*>& finished)
2381 {
2382 assert(state_test(CInode::STATE_AMBIGUOUSAUTH));
2383 state_clear(CInode::STATE_AMBIGUOUSAUTH);
2384 take_waiting(CInode::WAIT_SINGLEAUTH, finished);
2385 }
2386
2387 void CInode::clear_ambiguous_auth()
2388 {
2389 list<MDSInternalContextBase*> finished;
2390 clear_ambiguous_auth(finished);
2391 mdcache->mds->queue_waiters(finished);
2392 }
2393
2394 // auth_pins
2395 bool CInode::can_auth_pin(int *err_ret) const {
2396 int err;
2397 if (!is_auth()) {
2398 err = ERR_NOT_AUTH;
2399 } else if (is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin()) {
2400 err = ERR_EXPORTING_INODE;
2401 } else {
2402 if (parent)
2403 return parent->can_auth_pin(err_ret);
2404 err = 0;
2405 }
2406 if (err && err_ret)
2407 *err_ret = err;
2408 return !err;
2409 }
2410
void CInode::auth_pin(void *by)
{
  // first auth_pin also takes a cache reference so the inode can't be trimmed
  if (auth_pins == 0)
    get(PIN_AUTHPIN);
  auth_pins++;

#ifdef MDS_AUTHPIN_SET
  // debug builds track who holds each pin
  auth_pin_set.insert(by);
#endif

  dout(10) << "auth_pin by " << by << " on " << *this
	   << " now " << auth_pins << "+" << nested_auth_pins
	   << dendl;

  // bubble the pin count up through the containing dentry/dirfrag chain
  if (parent)
    parent->adjust_nested_auth_pins(1, 1, this);
}
2428
void CInode::auth_unpin(void *by)
{
  auth_pins--;

#ifdef MDS_AUTHPIN_SET
  // debug builds: the unpinner must have a matching auth_pin()
  assert(auth_pin_set.count(by));
  auth_pin_set.erase(auth_pin_set.find(by));
#endif

  // last pin gone: drop the cache reference taken by the first auth_pin()
  if (auth_pins == 0)
    put(PIN_AUTHPIN);

  dout(10) << "auth_unpin by " << by << " on " << *this
	   << " now " << auth_pins << "+" << nested_auth_pins
	   << dendl;

  assert(auth_pins >= 0);

  // bubble the count change up through the containing dentry/dirfrag chain
  if (parent)
    parent->adjust_nested_auth_pins(-1, -1, by);

  // a pending freeze_inode() completes once the pin count drains down to
  // the recorded allowance
  if (is_freezing_inode() &&
      auth_pins == auth_pin_freeze_allowance) {
    dout(10) << "auth_unpin freezing!" << dendl;
    get(PIN_FROZEN);
    put(PIN_FREEZING);
    state_clear(STATE_FREEZING);
    state_set(STATE_FROZEN);
    finish_waiting(WAIT_FROZEN);
  }
}
2460
void CInode::adjust_nested_auth_pins(int a, void *by)
{
  assert(a);
  nested_auth_pins += a;
  dout(35) << "adjust_nested_auth_pins by " << by
	   << " change " << a << " yields "
	   << auth_pins << "+" << nested_auth_pins << dendl;
  assert(nested_auth_pins >= 0);

  if (g_conf->mds_debug_auth_pins) {
    // audit: nested count must equal the number of non-subtree-root
    // dirfrags that still carry cumulative pins
    int s = 0;
    for (const auto &p : dirfrags) {
      CDir *dir = p.second;
      if (!dir->is_subtree_root() && dir->get_cum_auth_pins())
	s++;
    }
    assert(s == nested_auth_pins);
  }

  // propagate upward; the middle argument is 0 because our own direct
  // auth_pin count did not change
  if (parent)
    parent->adjust_nested_auth_pins(a, 0, by);
}
2484
2485
2486 // authority
2487
2488 mds_authority_t CInode::authority() const
2489 {
2490 if (inode_auth.first >= 0)
2491 return inode_auth;
2492
2493 if (parent)
2494 return parent->dir->authority();
2495
2496 // new items that are not yet linked in (in the committed plane) belong
2497 // to their first parent.
2498 if (!projected_parent.empty())
2499 return projected_parent.front()->dir->authority();
2500
2501 return CDIR_AUTH_UNDEF;
2502 }
2503
2504
2505 // SNAP
2506
2507 snapid_t CInode::get_oldest_snap()
2508 {
2509 snapid_t t = first;
2510 if (!old_inodes.empty())
2511 t = old_inodes.begin()->second.first;
2512 return MIN(t, oldest_snap);
2513 }
2514
CInode::mempool_old_inode& CInode::cow_old_inode(snapid_t follows, bool cow_head)
{
  assert(follows >= first);

  // copy either the head (projected) values or the previous projected ones
  mempool_inode *pi = cow_head ? get_projected_inode() : get_previous_projected_inode();
  mempool_xattr_map *px = cow_head ? get_projected_xattrs() : get_previous_projected_xattrs();

  // record the interval [first, follows], keyed by its last snapid
  mempool_old_inode &old = old_inodes[follows];
  old.first = first;
  old.inode = *pi;
  old.xattrs = *px;

  if (first < oldest_snap)
    oldest_snap = first;

  dout(10) << " " << px->size() << " xattrs cowed, " << *px << dendl;

  old.inode.trim_client_ranges(follows);

  // remember intervals whose rstat has not yet been accounted
  if (g_conf->mds_snap_rstat &&
      !(old.inode.rstat == old.inode.accounted_rstat))
    dirty_old_rstats.insert(follows);

  // the live inode now begins just after the cowed interval
  first = follows+1;

  dout(10) << "cow_old_inode " << (cow_head ? "head" : "previous_head" )
	   << " to [" << old.first << "," << follows << "] on "
	   << *this << dendl;

  return old;
}
2546
2547 void CInode::split_old_inode(snapid_t snap)
2548 {
2549 auto it = old_inodes.lower_bound(snap);
2550 assert(it != old_inodes.end() && it->second.first < snap);
2551
2552 mempool_old_inode &old = old_inodes[snap - 1];
2553 old = it->second;
2554
2555 it->second.first = snap;
2556 dout(10) << __func__ << " " << "[" << old.first << "," << it->first
2557 << "] to [" << snap << "," << it->first << "] on " << *this << dendl;
2558 }
2559
2560 void CInode::pre_cow_old_inode()
2561 {
2562 snapid_t follows = find_snaprealm()->get_newest_seq();
2563 if (first <= follows)
2564 cow_old_inode(follows, true);
2565 }
2566
2567 void CInode::purge_stale_snap_data(const set<snapid_t>& snaps)
2568 {
2569 dout(10) << "purge_stale_snap_data " << snaps << dendl;
2570
2571 for (auto it = old_inodes.begin(); it != old_inodes.end(); ) {
2572 const snapid_t &id = it->first;
2573 const auto &s = snaps.lower_bound(it->second.first);
2574 if (s == snaps.end() || *s > id) {
2575 dout(10) << " purging old_inode [" << it->second.first << "," << id << "]" << dendl;
2576 it = old_inodes.erase(it);
2577 } else {
2578 ++it;
2579 }
2580 }
2581 }
2582
2583 /*
2584 * pick/create an old_inode
2585 */
2586 CInode::mempool_old_inode * CInode::pick_old_inode(snapid_t snap)
2587 {
2588 auto it = old_inodes.lower_bound(snap); // p is first key >= to snap
2589 if (it != old_inodes.end() && it->second.first <= snap) {
2590 dout(10) << __func__ << " snap " << snap << " -> [" << it->second.first << "," << it->first << "]" << dendl;
2591 return &it->second;
2592 }
2593 dout(10) << "pick_old_inode snap " << snap << " -> nothing" << dendl;
2594 return NULL;
2595 }
2596
2597 void CInode::open_snaprealm(bool nosplit)
2598 {
2599 if (!snaprealm) {
2600 SnapRealm *parent = find_snaprealm();
2601 snaprealm = new SnapRealm(mdcache, this);
2602 if (parent) {
2603 dout(10) << "open_snaprealm " << snaprealm
2604 << " parent is " << parent
2605 << dendl;
2606 dout(30) << " siblings are " << parent->open_children << dendl;
2607 snaprealm->parent = parent;
2608 if (!nosplit)
2609 parent->split_at(snaprealm);
2610 parent->open_children.insert(snaprealm);
2611 }
2612 }
2613 }
void CInode::close_snaprealm(bool nojoin)
{
  if (snaprealm) {
    dout(15) << "close_snaprealm " << *snaprealm << dendl;
    snaprealm->close_parents();
    // unhook from the parent realm before destroying ours
    if (snaprealm->parent) {
      snaprealm->parent->open_children.erase(snaprealm);
      // NOTE: rejoining the parent realm is intentionally disabled;
      // 'nojoin' is currently unused.
      //if (!nojoin)
      //snaprealm->parent->join(snaprealm);
    }
    delete snaprealm;
    snaprealm = 0;
  }
}
2628
2629 SnapRealm *CInode::find_snaprealm() const
2630 {
2631 const CInode *cur = this;
2632 while (!cur->snaprealm) {
2633 if (cur->get_parent_dn())
2634 cur = cur->get_parent_dn()->get_dir()->get_inode();
2635 else if (get_projected_parent_dn())
2636 cur = cur->get_projected_parent_dn()->get_dir()->get_inode();
2637 else
2638 break;
2639 }
2640 return cur->snaprealm;
2641 }
2642
2643 void CInode::encode_snap_blob(bufferlist &snapbl)
2644 {
2645 if (snaprealm) {
2646 ::encode(snaprealm->srnode, snapbl);
2647 dout(20) << "encode_snap_blob " << *snaprealm << dendl;
2648 }
2649 }
2650 void CInode::decode_snap_blob(bufferlist& snapbl)
2651 {
2652 if (snapbl.length()) {
2653 open_snaprealm();
2654 bufferlist::iterator p = snapbl.begin();
2655 ::decode(snaprealm->srnode, p);
2656 if (is_base()) {
2657 bool ok = snaprealm->_open_parents(NULL);
2658 assert(ok);
2659 }
2660 dout(20) << "decode_snap_blob " << *snaprealm << dendl;
2661 }
2662 }
2663
2664 void CInode::encode_snap(bufferlist& bl)
2665 {
2666 bufferlist snapbl;
2667 encode_snap_blob(snapbl);
2668 ::encode(snapbl, bl);
2669 ::encode(oldest_snap, bl);
2670 }
2671
2672 void CInode::decode_snap(bufferlist::iterator& p)
2673 {
2674 bufferlist snapbl;
2675 ::decode(snapbl, p);
2676 ::decode(oldest_snap, p);
2677 decode_snap_blob(snapbl);
2678 }
2679
2680 // =============================================
2681
2682 client_t CInode::calc_ideal_loner()
2683 {
2684 if (mdcache->is_readonly())
2685 return -1;
2686 if (!mds_caps_wanted.empty())
2687 return -1;
2688
2689 int n = 0;
2690 client_t loner = -1;
2691 for (map<client_t,Capability*>::iterator it = client_caps.begin();
2692 it != client_caps.end();
2693 ++it)
2694 if (!it->second->is_stale() &&
2695 ((it->second->wanted() & (CEPH_CAP_ANY_WR|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_RD)) ||
2696 (inode.is_dir() && !has_subtree_root_dirfrag()))) {
2697 if (n)
2698 return -1;
2699 n++;
2700 loner = it->first;
2701 }
2702 return loner;
2703 }
2704
2705 bool CInode::choose_ideal_loner()
2706 {
2707 want_loner_cap = calc_ideal_loner();
2708 int changed = false;
2709 if (loner_cap >= 0 && loner_cap != want_loner_cap) {
2710 if (!try_drop_loner())
2711 return false;
2712 changed = true;
2713 }
2714
2715 if (want_loner_cap >= 0) {
2716 if (loner_cap < 0) {
2717 set_loner_cap(want_loner_cap);
2718 changed = true;
2719 } else
2720 assert(loner_cap == want_loner_cap);
2721 }
2722 return changed;
2723 }
2724
2725 bool CInode::try_set_loner()
2726 {
2727 assert(want_loner_cap >= 0);
2728 if (loner_cap >= 0 && loner_cap != want_loner_cap)
2729 return false;
2730 set_loner_cap(want_loner_cap);
2731 return true;
2732 }
2733
2734 void CInode::set_loner_cap(client_t l)
2735 {
2736 loner_cap = l;
2737 authlock.set_excl_client(loner_cap);
2738 filelock.set_excl_client(loner_cap);
2739 linklock.set_excl_client(loner_cap);
2740 xattrlock.set_excl_client(loner_cap);
2741 }
2742
2743 bool CInode::try_drop_loner()
2744 {
2745 if (loner_cap < 0)
2746 return true;
2747
2748 int other_allowed = get_caps_allowed_by_type(CAP_ANY);
2749 Capability *cap = get_client_cap(loner_cap);
2750 if (!cap ||
2751 (cap->issued() & ~other_allowed) == 0) {
2752 set_loner_cap(-1);
2753 return true;
2754 }
2755 return false;
2756 }
2757
2758
2759 // choose new lock state during recovery, based on issued caps
void CInode::choose_lock_state(SimpleLock *lock, int allissued)
{
  // project the issued cap bits into this lock's cap field
  int shift = lock->get_cap_shift();
  int issued = (allissued >> shift) & lock->get_cap_mask();
  if (is_auth()) {
    if (lock->is_xlocked()) {
      // do nothing here
    } else if (lock->get_state() != LOCK_MIX) {
      // pick the least restrictive state consistent with the issued caps
      if (issued & (CEPH_CAP_GEXCL | CEPH_CAP_GBUFFER))
	lock->set_state(LOCK_EXCL);
      else if (issued & CEPH_CAP_GWR)
	lock->set_state(LOCK_MIX);
      else if (lock->is_dirty()) {
	if (is_replicated())
	  lock->set_state(LOCK_MIX);
	else
	  lock->set_state(LOCK_LOCK);
      } else
	lock->set_state(LOCK_SYNC);
    }
  } else {
    // our states have already been chosen during rejoin.
    if (lock->is_xlocked())
      assert(lock->get_state() == LOCK_LOCK);
  }
}
2786
2787 void CInode::choose_lock_states(int dirty_caps)
2788 {
2789 int issued = get_caps_issued() | dirty_caps;
2790 if (is_auth() && (issued & (CEPH_CAP_ANY_EXCL|CEPH_CAP_ANY_WR)))
2791 choose_ideal_loner();
2792 choose_lock_state(&filelock, issued);
2793 choose_lock_state(&nestlock, issued);
2794 choose_lock_state(&dirfragtreelock, issued);
2795 choose_lock_state(&authlock, issued);
2796 choose_lock_state(&xattrlock, issued);
2797 choose_lock_state(&linklock, issued);
2798 }
2799
2800 Capability *CInode::add_client_cap(client_t client, Session *session, SnapRealm *conrealm)
2801 {
2802 if (client_caps.empty()) {
2803 get(PIN_CAPS);
2804 if (conrealm)
2805 containing_realm = conrealm;
2806 else
2807 containing_realm = find_snaprealm();
2808 containing_realm->inodes_with_caps.push_back(&item_caps);
2809 dout(10) << "add_client_cap first cap, joining realm " << *containing_realm << dendl;
2810 }
2811
2812 if (client_caps.empty())
2813 mdcache->num_inodes_with_caps++;
2814
2815 Capability *cap = new Capability(this, ++mdcache->last_cap_id, client);
2816 assert(client_caps.count(client) == 0);
2817 client_caps[client] = cap;
2818
2819 session->add_cap(cap);
2820 if (session->is_stale())
2821 cap->mark_stale();
2822
2823 cap->client_follows = first-1;
2824
2825 containing_realm->add_cap(client, cap);
2826
2827 return cap;
2828 }
2829
void CInode::remove_client_cap(client_t client)
{
  assert(client_caps.count(client) == 1);
  Capability *cap = client_caps[client];

  // detach from all tracking lists before destroying the cap
  cap->item_session_caps.remove_myself();
  cap->item_revoking_caps.remove_myself();
  cap->item_client_revoking_caps.remove_myself();
  containing_realm->remove_cap(client, cap);

  // the departing client can no longer be the loner
  if (client == loner_cap)
    loner_cap = -1;

  delete cap;
  client_caps.erase(client);
  if (client_caps.empty()) {
    // last cap gone: undo the pin/realm/log-segment state taken when the
    // first cap was added
    dout(10) << "remove_client_cap last cap, leaving realm " << *containing_realm << dendl;
    put(PIN_CAPS);
    item_caps.remove_myself();
    containing_realm = NULL;
    item_open_file.remove_myself(); // unpin logsegment
    mdcache->num_inodes_with_caps--;
  }

  //clean up advisory locks
  bool fcntl_removed = fcntl_locks ? fcntl_locks->remove_all_from(client) : false;
  bool flock_removed = flock_locks ? flock_locks->remove_all_from(client) : false;
  if (fcntl_removed || flock_removed) {
    // dropping this client's locks may unblock other clients' lock waiters
    list<MDSInternalContextBase*> waiters;
    take_waiting(CInode::WAIT_FLOCK, waiters);
    mdcache->mds->queue_waiters(waiters);
  }
}
2863
2864 void CInode::move_to_realm(SnapRealm *realm)
2865 {
2866 dout(10) << "move_to_realm joining realm " << *realm
2867 << ", leaving realm " << *containing_realm << dendl;
2868 for (map<client_t,Capability*>::iterator q = client_caps.begin();
2869 q != client_caps.end();
2870 ++q) {
2871 containing_realm->remove_cap(q->first, q->second);
2872 realm->add_cap(q->first, q->second);
2873 }
2874 item_caps.remove_myself();
2875 realm->inodes_with_caps.push_back(&item_caps);
2876 containing_realm = realm;
2877 }
2878
2879 Capability *CInode::reconnect_cap(client_t client, const cap_reconnect_t& icr, Session *session)
2880 {
2881 Capability *cap = get_client_cap(client);
2882 if (cap) {
2883 // FIXME?
2884 cap->merge(icr.capinfo.wanted, icr.capinfo.issued);
2885 } else {
2886 cap = add_client_cap(client, session);
2887 cap->set_cap_id(icr.capinfo.cap_id);
2888 cap->set_wanted(icr.capinfo.wanted);
2889 cap->issue_norevoke(icr.capinfo.issued);
2890 cap->reset_seq();
2891 }
2892 cap->set_last_issue_stamp(ceph_clock_now());
2893 return cap;
2894 }
2895
2896 void CInode::clear_client_caps_after_export()
2897 {
2898 while (!client_caps.empty())
2899 remove_client_cap(client_caps.begin()->first);
2900 loner_cap = -1;
2901 want_loner_cap = -1;
2902 mds_caps_wanted.clear();
2903 }
2904
2905 void CInode::export_client_caps(map<client_t,Capability::Export>& cl)
2906 {
2907 for (map<client_t,Capability*>::iterator it = client_caps.begin();
2908 it != client_caps.end();
2909 ++it) {
2910 cl[it->first] = it->second->make_export();
2911 }
2912 }
2913
2914 // caps allowed
2915 int CInode::get_caps_liked() const
2916 {
2917 if (is_dir())
2918 return CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED; // but not, say, FILE_RD|WR|WRBUFFER
2919 else
2920 return CEPH_CAP_ANY & ~CEPH_CAP_FILE_LAZYIO;
2921 }
2922
2923 int CInode::get_caps_allowed_ever() const
2924 {
2925 int allowed;
2926 if (is_dir())
2927 allowed = CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED;
2928 else
2929 allowed = CEPH_CAP_ANY;
2930 return allowed &
2931 (CEPH_CAP_PIN |
2932 (filelock.gcaps_allowed_ever() << filelock.get_cap_shift()) |
2933 (authlock.gcaps_allowed_ever() << authlock.get_cap_shift()) |
2934 (xattrlock.gcaps_allowed_ever() << xattrlock.get_cap_shift()) |
2935 (linklock.gcaps_allowed_ever() << linklock.get_cap_shift()));
2936 }
2937
2938 int CInode::get_caps_allowed_by_type(int type) const
2939 {
2940 return
2941 CEPH_CAP_PIN |
2942 (filelock.gcaps_allowed(type) << filelock.get_cap_shift()) |
2943 (authlock.gcaps_allowed(type) << authlock.get_cap_shift()) |
2944 (xattrlock.gcaps_allowed(type) << xattrlock.get_cap_shift()) |
2945 (linklock.gcaps_allowed(type) << linklock.get_cap_shift());
2946 }
2947
2948 int CInode::get_caps_careful() const
2949 {
2950 return
2951 (filelock.gcaps_careful() << filelock.get_cap_shift()) |
2952 (authlock.gcaps_careful() << authlock.get_cap_shift()) |
2953 (xattrlock.gcaps_careful() << xattrlock.get_cap_shift()) |
2954 (linklock.gcaps_careful() << linklock.get_cap_shift());
2955 }
2956
2957 int CInode::get_xlocker_mask(client_t client) const
2958 {
2959 return
2960 (filelock.gcaps_xlocker_mask(client) << filelock.get_cap_shift()) |
2961 (authlock.gcaps_xlocker_mask(client) << authlock.get_cap_shift()) |
2962 (xattrlock.gcaps_xlocker_mask(client) << xattrlock.get_cap_shift()) |
2963 (linklock.gcaps_xlocker_mask(client) << linklock.get_cap_shift());
2964 }
2965
2966 int CInode::get_caps_allowed_for_client(Session *session, mempool_inode *file_i) const
2967 {
2968 client_t client = session->info.inst.name.num();
2969 int allowed;
2970 if (client == get_loner()) {
2971 // as the loner, we get the loner_caps AND any xlocker_caps for things we have xlocked
2972 allowed =
2973 get_caps_allowed_by_type(CAP_LONER) |
2974 (get_caps_allowed_by_type(CAP_XLOCKER) & get_xlocker_mask(client));
2975 } else {
2976 allowed = get_caps_allowed_by_type(CAP_ANY);
2977 }
2978
2979 if (!is_dir()) {
2980 if ((file_i->inline_data.version != CEPH_INLINE_NONE &&
2981 !session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) ||
2982 (!file_i->layout.pool_ns.empty() &&
2983 !session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)))
2984 allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
2985 }
2986 return allowed;
2987 }
2988
2989 // caps issued, wanted
2990 int CInode::get_caps_issued(int *ploner, int *pother, int *pxlocker,
2991 int shift, int mask)
2992 {
2993 int c = 0;
2994 int loner = 0, other = 0, xlocker = 0;
2995 if (!is_auth()) {
2996 loner_cap = -1;
2997 }
2998
2999 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3000 it != client_caps.end();
3001 ++it) {
3002 int i = it->second->issued();
3003 c |= i;
3004 if (it->first == loner_cap)
3005 loner |= i;
3006 else
3007 other |= i;
3008 xlocker |= get_xlocker_mask(it->first) & i;
3009 }
3010 if (ploner) *ploner = (loner >> shift) & mask;
3011 if (pother) *pother = (other >> shift) & mask;
3012 if (pxlocker) *pxlocker = (xlocker >> shift) & mask;
3013 return (c >> shift) & mask;
3014 }
3015
3016 bool CInode::is_any_caps_wanted() const
3017 {
3018 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3019 it != client_caps.end();
3020 ++it)
3021 if (it->second->wanted())
3022 return true;
3023 return false;
3024 }
3025
3026 int CInode::get_caps_wanted(int *ploner, int *pother, int shift, int mask) const
3027 {
3028 int w = 0;
3029 int loner = 0, other = 0;
3030 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3031 it != client_caps.end();
3032 ++it) {
3033 if (!it->second->is_stale()) {
3034 int t = it->second->wanted();
3035 w |= t;
3036 if (it->first == loner_cap)
3037 loner |= t;
3038 else
3039 other |= t;
3040 }
3041 //cout << " get_caps_wanted client " << it->first << " " << cap_string(it->second.wanted()) << endl;
3042 }
3043 if (is_auth())
3044 for (const auto &p : mds_caps_wanted) {
3045 w |= p.second;
3046 other |= p.second;
3047 //cout << " get_caps_wanted mds " << it->first << " " << cap_string(it->second) << endl;
3048 }
3049 if (ploner) *ploner = (loner >> shift) & mask;
3050 if (pother) *pother = (other >> shift) & mask;
3051 return (w >> shift) & mask;
3052 }
3053
3054 bool CInode::issued_caps_need_gather(SimpleLock *lock)
3055 {
3056 int loner_issued, other_issued, xlocker_issued;
3057 get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
3058 lock->get_cap_shift(), lock->get_cap_mask());
3059 if ((loner_issued & ~lock->gcaps_allowed(CAP_LONER)) ||
3060 (other_issued & ~lock->gcaps_allowed(CAP_ANY)) ||
3061 (xlocker_issued & ~lock->gcaps_allowed(CAP_XLOCKER)))
3062 return true;
3063 return false;
3064 }
3065
3066 void CInode::replicate_relax_locks()
3067 {
3068 //dout(10) << " relaxing locks on " << *this << dendl;
3069 assert(is_auth());
3070 assert(!is_replicated());
3071
3072 authlock.replicate_relax();
3073 linklock.replicate_relax();
3074 dirfragtreelock.replicate_relax();
3075 filelock.replicate_relax();
3076 xattrlock.replicate_relax();
3077 snaplock.replicate_relax();
3078 nestlock.replicate_relax();
3079 flocklock.replicate_relax();
3080 policylock.replicate_relax();
3081 }
3082
3083
3084
3085 // =============================================
3086
3087 int CInode::encode_inodestat(bufferlist& bl, Session *session,
3088 SnapRealm *dir_realm,
3089 snapid_t snapid,
3090 unsigned max_bytes,
3091 int getattr_caps)
3092 {
3093 client_t client = session->info.inst.name.num();
3094 assert(snapid);
3095 assert(session->connection);
3096
3097 bool valid = true;
3098
3099 // pick a version!
3100 mempool_inode *oi = &inode;
3101 mempool_inode *pi = get_projected_inode();
3102
3103 CInode::mempool_xattr_map *pxattrs = nullptr;
3104
3105 if (snapid != CEPH_NOSNAP) {
3106
3107 // for now at least, old_inodes is only defined/valid on the auth
3108 if (!is_auth())
3109 valid = false;
3110
3111 if (is_multiversion()) {
3112 auto it = old_inodes.lower_bound(snapid);
3113 if (it != old_inodes.end()) {
3114 if (it->second.first > snapid) {
3115 if (it != old_inodes.begin())
3116 --it;
3117 }
3118 if (it->second.first <= snapid && snapid <= it->first) {
3119 dout(15) << __func__ << " snapid " << snapid
3120 << " to old_inode [" << it->second.first << "," << it->first << "]"
3121 << " " << it->second.inode.rstat
3122 << dendl;
3123 auto &p = it->second;
3124 pi = oi = &p.inode;
3125 pxattrs = &p.xattrs;
3126 } else {
3127 // snapshoted remote dentry can result this
3128 dout(0) << "encode_inodestat old_inode for snapid " << snapid
3129 << " not found" << dendl;
3130 }
3131 }
3132 } else if (snapid < first || snapid > last) {
3133 // snapshoted remote dentry can result this
3134 dout(0) << "encode_inodestat [" << first << "," << last << "]"
3135 << " not match snapid " << snapid << dendl;
3136 }
3137 }
3138
3139 SnapRealm *realm = find_snaprealm();
3140
3141 bool no_caps = !valid ||
3142 session->is_stale() ||
3143 (dir_realm && realm != dir_realm) ||
3144 is_frozen() ||
3145 state_test(CInode::STATE_EXPORTINGCAPS);
3146 if (no_caps)
3147 dout(20) << "encode_inodestat no caps"
3148 << (!valid?", !valid":"")
3149 << (session->is_stale()?", session stale ":"")
3150 << ((dir_realm && realm != dir_realm)?", snaprealm differs ":"")
3151 << (is_frozen()?", frozen inode":"")
3152 << (state_test(CInode::STATE_EXPORTINGCAPS)?", exporting caps":"")
3153 << dendl;
3154
3155
3156 // "fake" a version that is old (stable) version, +1 if projected.
3157 version_t version = (oi->version * 2) + is_projected();
3158
3159 Capability *cap = get_client_cap(client);
3160 bool pfile = filelock.is_xlocked_by_client(client) || get_loner() == client;
3161 //(cap && (cap->issued() & CEPH_CAP_FILE_EXCL));
3162 bool pauth = authlock.is_xlocked_by_client(client) || get_loner() == client;
3163 bool plink = linklock.is_xlocked_by_client(client) || get_loner() == client;
3164 bool pxattr = xattrlock.is_xlocked_by_client(client) || get_loner() == client;
3165
3166 bool plocal = versionlock.get_last_wrlock_client() == client;
3167 bool ppolicy = policylock.is_xlocked_by_client(client) || get_loner()==client;
3168
3169 mempool_inode *any_i = (pfile|pauth|plink|pxattr|plocal) ? pi : oi;
3170
3171 dout(20) << " pfile " << pfile << " pauth " << pauth
3172 << " plink " << plink << " pxattr " << pxattr
3173 << " plocal " << plocal
3174 << " ctime " << any_i->ctime
3175 << " valid=" << valid << dendl;
3176
3177 // file
3178 mempool_inode *file_i = pfile ? pi:oi;
3179 file_layout_t layout;
3180 if (is_dir()) {
3181 layout = (ppolicy ? pi : oi)->layout;
3182 } else {
3183 layout = file_i->layout;
3184 }
3185
3186 // max_size is min of projected, actual
3187 uint64_t max_size =
3188 MIN(oi->client_ranges.count(client) ?
3189 oi->client_ranges[client].range.last : 0,
3190 pi->client_ranges.count(client) ?
3191 pi->client_ranges[client].range.last : 0);
3192
3193 // inline data
3194 version_t inline_version = 0;
3195 bufferlist inline_data;
3196 if (file_i->inline_data.version == CEPH_INLINE_NONE) {
3197 inline_version = CEPH_INLINE_NONE;
3198 } else if ((!cap && !no_caps) ||
3199 (cap && cap->client_inline_version < file_i->inline_data.version) ||
3200 (getattr_caps & CEPH_CAP_FILE_RD)) { // client requests inline data
3201 inline_version = file_i->inline_data.version;
3202 if (file_i->inline_data.length() > 0)
3203 inline_data = file_i->inline_data.get_data();
3204 }
3205
3206 // nest (do same as file... :/)
3207 if (cap) {
3208 cap->last_rbytes = file_i->rstat.rbytes;
3209 cap->last_rsize = file_i->rstat.rsize();
3210 }
3211
3212 // auth
3213 mempool_inode *auth_i = pauth ? pi:oi;
3214
3215 // link
3216 mempool_inode *link_i = plink ? pi:oi;
3217
3218 // xattr
3219 mempool_inode *xattr_i = pxattr ? pi:oi;
3220
3221 // xattr
3222 bufferlist xbl;
3223 version_t xattr_version;
3224 if ((!cap && !no_caps) ||
3225 (cap && cap->client_xattr_version < xattr_i->xattr_version) ||
3226 (getattr_caps & CEPH_CAP_XATTR_SHARED)) { // client requests xattrs
3227 if (!pxattrs)
3228 pxattrs = pxattr ? get_projected_xattrs() : &xattrs;
3229 ::encode(*pxattrs, xbl);
3230 xattr_version = xattr_i->xattr_version;
3231 } else {
3232 xattr_version = 0;
3233 }
3234
3235 // do we have room?
3236 if (max_bytes) {
3237 unsigned bytes = 8 + 8 + 4 + 8 + 8 + sizeof(ceph_mds_reply_cap) +
3238 sizeof(struct ceph_file_layout) + 4 + layout.pool_ns.size() +
3239 sizeof(struct ceph_timespec) * 3 +
3240 4 + 8 + 8 + 8 + 4 + 4 + 4 + 4 + 4 +
3241 8 + 8 + 8 + 8 + 8 + sizeof(struct ceph_timespec) +
3242 4;
3243 bytes += sizeof(__u32);
3244 bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
3245 bytes += sizeof(__u32) + symlink.length();
3246 bytes += sizeof(__u32) + xbl.length();
3247 bytes += sizeof(version_t) + sizeof(__u32) + inline_data.length();
3248 if (bytes > max_bytes)
3249 return -ENOSPC;
3250 }
3251
3252
3253 // encode caps
3254 struct ceph_mds_reply_cap ecap;
3255 if (snapid != CEPH_NOSNAP) {
3256 /*
3257 * snapped inodes (files or dirs) only get read-only caps. always
3258 * issue everything possible, since it is read only.
3259 *
3260 * if a snapped inode has caps, limit issued caps based on the
3261 * lock state.
3262 *
3263 * if it is a live inode, limit issued caps based on the lock
3264 * state.
3265 *
3266 * do NOT adjust cap issued state, because the client always
3267 * tracks caps per-snap and the mds does either per-interval or
3268 * multiversion.
3269 */
3270 ecap.caps = valid ? get_caps_allowed_by_type(CAP_ANY) : CEPH_STAT_CAP_INODE;
3271 if (last == CEPH_NOSNAP || is_any_caps())
3272 ecap.caps = ecap.caps & get_caps_allowed_for_client(session, file_i);
3273 ecap.seq = 0;
3274 ecap.mseq = 0;
3275 ecap.realm = 0;
3276 } else {
3277 if (!no_caps && !cap) {
3278 // add a new cap
3279 cap = add_client_cap(client, session, realm);
3280 if (is_auth())
3281 choose_ideal_loner();
3282 }
3283
3284 int issue = 0;
3285 if (!no_caps && cap) {
3286 int likes = get_caps_liked();
3287 int allowed = get_caps_allowed_for_client(session, file_i);
3288 issue = (cap->wanted() | likes) & allowed;
3289 cap->issue_norevoke(issue);
3290 issue = cap->pending();
3291 dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3292 << " seq " << cap->get_last_seq() << dendl;
3293 } else if (cap && cap->is_new() && !dir_realm) {
3294 // alway issue new caps to client, otherwise the caps get lost
3295 assert(cap->is_stale());
3296 issue = cap->pending() | CEPH_CAP_PIN;
3297 cap->issue_norevoke(issue);
3298 dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3299 << " seq " << cap->get_last_seq()
3300 << "(stale|new caps)" << dendl;
3301 }
3302
3303 if (issue) {
3304 cap->set_last_issue();
3305 cap->set_last_issue_stamp(ceph_clock_now());
3306 cap->clear_new();
3307 ecap.caps = issue;
3308 ecap.wanted = cap->wanted();
3309 ecap.cap_id = cap->get_cap_id();
3310 ecap.seq = cap->get_last_seq();
3311 ecap.mseq = cap->get_mseq();
3312 ecap.realm = realm->inode->ino();
3313 } else {
3314 ecap.cap_id = 0;
3315 ecap.caps = 0;
3316 ecap.seq = 0;
3317 ecap.mseq = 0;
3318 ecap.realm = 0;
3319 ecap.wanted = 0;
3320 }
3321 }
3322 ecap.flags = is_auth() ? CEPH_CAP_FLAG_AUTH : 0;
3323 dout(10) << "encode_inodestat caps " << ccap_string(ecap.caps)
3324 << " seq " << ecap.seq << " mseq " << ecap.mseq
3325 << " xattrv " << xattr_version << " len " << xbl.length()
3326 << dendl;
3327
3328 if (inline_data.length() && cap) {
3329 if ((cap->pending() | getattr_caps) & CEPH_CAP_FILE_SHARED) {
3330 dout(10) << "including inline version " << inline_version << dendl;
3331 cap->client_inline_version = inline_version;
3332 } else {
3333 dout(10) << "dropping inline version " << inline_version << dendl;
3334 inline_version = 0;
3335 inline_data.clear();
3336 }
3337 }
3338
3339 // include those xattrs?
3340 if (xbl.length() && cap) {
3341 if ((cap->pending() | getattr_caps) & CEPH_CAP_XATTR_SHARED) {
3342 dout(10) << "including xattrs version " << xattr_i->xattr_version << dendl;
3343 cap->client_xattr_version = xattr_i->xattr_version;
3344 } else {
3345 dout(10) << "dropping xattrs version " << xattr_i->xattr_version << dendl;
3346 xbl.clear(); // no xattrs .. XXX what's this about?!?
3347 xattr_version = 0;
3348 }
3349 }
3350
3351 /*
3352 * note: encoding matches MClientReply::InodeStat
3353 */
3354 ::encode(oi->ino, bl);
3355 ::encode(snapid, bl);
3356 ::encode(oi->rdev, bl);
3357 ::encode(version, bl);
3358
3359 ::encode(xattr_version, bl);
3360
3361 ::encode(ecap, bl);
3362 {
3363 ceph_file_layout legacy_layout;
3364 layout.to_legacy(&legacy_layout);
3365 ::encode(legacy_layout, bl);
3366 }
3367 ::encode(any_i->ctime, bl);
3368 ::encode(file_i->mtime, bl);
3369 ::encode(file_i->atime, bl);
3370 ::encode(file_i->time_warp_seq, bl);
3371 ::encode(file_i->size, bl);
3372 ::encode(max_size, bl);
3373 ::encode(file_i->truncate_size, bl);
3374 ::encode(file_i->truncate_seq, bl);
3375
3376 ::encode(auth_i->mode, bl);
3377 ::encode((uint32_t)auth_i->uid, bl);
3378 ::encode((uint32_t)auth_i->gid, bl);
3379
3380 ::encode(link_i->nlink, bl);
3381
3382 ::encode(file_i->dirstat.nfiles, bl);
3383 ::encode(file_i->dirstat.nsubdirs, bl);
3384 ::encode(file_i->rstat.rbytes, bl);
3385 ::encode(file_i->rstat.rfiles, bl);
3386 ::encode(file_i->rstat.rsubdirs, bl);
3387 ::encode(file_i->rstat.rctime, bl);
3388
3389 dirfragtree.encode(bl);
3390
3391 ::encode(symlink, bl);
3392 if (session->connection->has_feature(CEPH_FEATURE_DIRLAYOUTHASH)) {
3393 ::encode(file_i->dir_layout, bl);
3394 }
3395 ::encode(xbl, bl);
3396 if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
3397 ::encode(inline_version, bl);
3398 ::encode(inline_data, bl);
3399 }
3400 if (session->connection->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
3401 mempool_inode *policy_i = ppolicy ? pi : oi;
3402 ::encode(policy_i->quota, bl);
3403 }
3404 if (session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
3405 ::encode(layout.pool_ns, bl);
3406 }
3407 if (session->connection->has_feature(CEPH_FEATURE_FS_BTIME)) {
3408 ::encode(any_i->btime, bl);
3409 ::encode(any_i->change_attr, bl);
3410 }
3411
3412 return valid;
3413 }
3414
/*
 * Fill an MClientCaps message with this inode's metadata for the cap's
 * client.
 *
 * For each metadata group (file, auth, link, xattr) we send the
 * *projected* inode values when this client holds an xlock on the
 * corresponding lock (or, for the file group, has CEPH_CAP_FILE_EXCL
 * issued) -- that client is driving the pending change and should see
 * it.  Otherwise the stable (head) inode values are sent.
 *
 * Side effects: advances cap->client_inline_version and
 * cap->client_xattr_version when newer inline data / xattrs are
 * included in the message.
 *
 * @param m   message to populate in place
 * @param cap this client's capability (must be non-null)
 */
void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
{
  assert(cap);

  client_t client = cap->get_client();

  // per-group: does this client get projected values?
  bool pfile = filelock.is_xlocked_by_client(client) || (cap->issued() & CEPH_CAP_FILE_EXCL);
  bool pauth = authlock.is_xlocked_by_client(client);
  bool plink = linklock.is_xlocked_by_client(client);
  bool pxattr = xattrlock.is_xlocked_by_client(client);

  mempool_inode *oi = &inode;
  mempool_inode *pi = get_projected_inode();
  // 'i' is re-pointed below for each metadata group; this first value is
  // only used for the debug print
  mempool_inode *i = (pfile|pauth|plink|pxattr) ? pi : oi;

  dout(20) << "encode_cap_message pfile " << pfile
           << " pauth " << pauth << " plink " << plink << " pxattr " << pxattr
           << " ctime " << i->ctime << dendl;

  // file group: layout, sizes, times, dir stats
  i = pfile ? pi:oi;
  m->set_layout(i->layout);
  m->size = i->size;
  m->truncate_seq = i->truncate_seq;
  m->truncate_size = i->truncate_size;
  m->mtime = i->mtime;
  m->atime = i->atime;
  m->ctime = i->ctime;
  m->change_attr = i->change_attr;
  m->time_warp_seq = i->time_warp_seq;
  m->nfiles = i->dirstat.nfiles;
  m->nsubdirs = i->dirstat.nsubdirs;

  // include inline data only if the client's copy is older
  if (cap->client_inline_version < i->inline_data.version) {
    m->inline_version = cap->client_inline_version = i->inline_data.version;
    if (i->inline_data.length() > 0)
      m->inline_data = i->inline_data.get_data();
  } else {
    m->inline_version = 0;
  }

  // max_size is min of projected, actual.
  uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
  uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
  m->max_size = MIN(oldms, newms);

  // auth group: ownership and mode
  i = pauth ? pi:oi;
  m->head.mode = i->mode;
  m->head.uid = i->uid;
  m->head.gid = i->gid;

  // link group
  i = plink ? pi:oi;
  m->head.nlink = i->nlink;

  // xattr group: only ship the xattr map if the client holds
  // XATTR_SHARED and its cached version is stale
  i = pxattr ? pi:oi;
  auto ix = pxattr ? get_projected_xattrs() : &xattrs;
  if ((cap->pending() & CEPH_CAP_XATTR_SHARED) &&
      i->xattr_version > cap->client_xattr_version) {
    dout(10) << " including xattrs v " << i->xattr_version << dendl;
    ::encode(*ix, m->xattrbl);
    m->head.xattr_version = i->xattr_version;
    cap->client_xattr_version = i->xattr_version;
  }
}
3478
3479
3480
/*
 * Serialize the core inode state shared by replication and export:
 * [first, inode, symlink, dirfragtree, xattrs, old_inodes,
 * damage_flags, snap state].  Field order must match _decode_base().
 *
 * @param bl       output buffer
 * @param features peer feature bits, forwarded to the
 *                 feature-dependent encoders (inode, old_inodes)
 */
void CInode::_encode_base(bufferlist& bl, uint64_t features)
{
  ::encode(first, bl);
  ::encode(inode, bl, features);
  ::encode(symlink, bl);
  ::encode(dirfragtree, bl);
  ::encode(xattrs, bl);
  ::encode(old_inodes, bl, features);
  ::encode(damage_flags, bl);
  encode_snap(bl);
}
/*
 * Inverse of _encode_base(): consume the fields in exactly the same
 * order they were encoded.
 */
void CInode::_decode_base(bufferlist::iterator& p)
{
  ::decode(first, p);
  ::decode(inode, p);
  {
    // symlink lives in mds_co mempool storage; decode into a plain
    // std::string first, then copy it into the pool-backed string
    std::string tmp;
    ::decode(tmp, p);
    symlink = mempool::mds_co::string(boost::string_view(tmp));
  }
  ::decode(dirfragtree, p);
  ::decode(xattrs, p);
  ::decode(old_inodes, p);
  ::decode(damage_flags, p);
  decode_snap(p);
}
3507
/*
 * Serialize the full state of every inode lock plus the current loner
 * cap.  Used when transferring authority for this inode (see
 * encode_export()).  Order must match _decode_locks_full().
 */
void CInode::_encode_locks_full(bufferlist& bl)
{
  ::encode(authlock, bl);
  ::encode(linklock, bl);
  ::encode(dirfragtreelock, bl);
  ::encode(filelock, bl);
  ::encode(xattrlock, bl);
  ::encode(snaplock, bl);
  ::encode(nestlock, bl);
  ::encode(flocklock, bl);
  ::encode(policylock, bl);

  ::encode(loner_cap, bl);
}
/*
 * Inverse of _encode_locks_full().  Besides restoring each lock's
 * state it adopts the sender's loner choice: set_loner_cap() updates
 * the loner bookkeeping, and want_loner_cap is seeded with the same
 * value pending a later eval().
 */
void CInode::_decode_locks_full(bufferlist::iterator& p)
{
  ::decode(authlock, p);
  ::decode(linklock, p);
  ::decode(dirfragtreelock, p);
  ::decode(filelock, p);
  ::decode(xattrlock, p);
  ::decode(snaplock, p);
  ::decode(nestlock, p);
  ::decode(flocklock, p);
  ::decode(policylock, p);

  ::decode(loner_cap, p);
  set_loner_cap(loner_cap);
  want_loner_cap = loner_cap; // for now, we'll eval() shortly.
}
3538
/*
 * Encode replica-visible lock state for all inode locks, followed by a
 * need_recover flag telling the replica whether this (auth) MDS is
 * still recovering.  Consumed by _decode_locks_state(); keep the lock
 * order in sync with it.
 */
void CInode::_encode_locks_state_for_replica(bufferlist& bl, bool need_recover)
{
  authlock.encode_state_for_replica(bl);
  linklock.encode_state_for_replica(bl);
  dirfragtreelock.encode_state_for_replica(bl);
  filelock.encode_state_for_replica(bl);
  nestlock.encode_state_for_replica(bl);
  xattrlock.encode_state_for_replica(bl);
  snaplock.encode_state_for_replica(bl);
  flocklock.encode_state_for_replica(bl);
  policylock.encode_state_for_replica(bl);
  ::encode(need_recover, bl);
}
3552
/*
 * Encode lock state for a rejoining replica 'rep'.  The scatterlocks
 * (dirfragtree/file/nest) use the rejoin-specific encoding; the rest
 * use the plain replica encoding.  Consumed by _decode_locks_rejoin();
 * keep the lock order in sync with it.  Note: no need_recover flag is
 * appended on this path (unlike _encode_locks_state_for_replica()).
 */
void CInode::_encode_locks_state_for_rejoin(bufferlist& bl, int rep)
{
  authlock.encode_state_for_replica(bl);
  linklock.encode_state_for_replica(bl);
  dirfragtreelock.encode_state_for_rejoin(bl, rep);
  filelock.encode_state_for_rejoin(bl, rep);
  nestlock.encode_state_for_rejoin(bl, rep);
  xattrlock.encode_state_for_replica(bl);
  snaplock.encode_state_for_replica(bl);
  flocklock.encode_state_for_replica(bl);
  policylock.encode_state_for_replica(bl);
}
3565
/*
 * Decode replica lock state produced by _encode_locks_state_for_replica().
 *
 * @param p      input iterator
 * @param is_new true if this replica object was just created (as
 *               opposed to refreshing an existing replica)
 */
void CInode::_decode_locks_state(bufferlist::iterator& p, bool is_new)
{
  authlock.decode_state(p, is_new);
  linklock.decode_state(p, is_new);
  dirfragtreelock.decode_state(p, is_new);
  filelock.decode_state(p, is_new);
  nestlock.decode_state(p, is_new);
  xattrlock.decode_state(p, is_new);
  snaplock.decode_state(p, is_new);
  flocklock.decode_state(p, is_new);
  policylock.decode_state(p, is_new);

  bool need_recover;
  ::decode(need_recover, p);
  if (need_recover && is_new) {
    // Auth mds replicated this inode while it's recovering. Auth mds may take xlock on the lock
    // and change the object when replaying unsafe requests.
    authlock.mark_need_recover();
    linklock.mark_need_recover();
    dirfragtreelock.mark_need_recover();
    filelock.mark_need_recover();
    nestlock.mark_need_recover();
    xattrlock.mark_need_recover();
    snaplock.mark_need_recover();
    flocklock.mark_need_recover();
    policylock.mark_need_recover();
  }
}
/*
 * Decode rejoin lock state produced by _encode_locks_state_for_rejoin().
 *
 * @param p          input iterator
 * @param waiters    out: contexts to wake once rejoin completes
 * @param eval_locks out: scatterlocks left unstable (and not wrlocked)
 *                   that the caller should re-evaluate
 * @param survivor   true if we are a surviving MDS (vs one that is
 *                   itself rejoining)
 */
void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<MDSInternalContextBase*>& waiters,
                                  list<SimpleLock*>& eval_locks, bool survivor)
{
  authlock.decode_state_rejoin(p, waiters, survivor);
  linklock.decode_state_rejoin(p, waiters, survivor);
  dirfragtreelock.decode_state_rejoin(p, waiters, survivor);
  filelock.decode_state_rejoin(p, waiters, survivor);
  nestlock.decode_state_rejoin(p, waiters, survivor);
  xattrlock.decode_state_rejoin(p, waiters, survivor);
  snaplock.decode_state_rejoin(p, waiters, survivor);
  flocklock.decode_state_rejoin(p, waiters, survivor);
  policylock.decode_state_rejoin(p, waiters, survivor);

  // queue the scatterlocks that came back unstable for re-evaluation
  if (!dirfragtreelock.is_stable() && !dirfragtreelock.is_wrlocked())
    eval_locks.push_back(&dirfragtreelock);
  if (!filelock.is_stable() && !filelock.is_wrlocked())
    eval_locks.push_back(&filelock);
  if (!nestlock.is_stable() && !nestlock.is_wrlocked())
    eval_locks.push_back(&nestlock);
}
3614
3615
3616 // IMPORT/EXPORT
3617
/*
 * Serialize this inode for a subtree export to another MDS (versioned
 * wire format, currently v5/compat v4).  Includes: base state, state
 * bits, popularity, replica map, fragstat/rstat for EXPORTBOUND
 * dirfrags (so the importer can seed its scatterlock data), full lock
 * state and advisory file locks.  Pairs with decode_import().
 *
 * Also takes a temporary PIN_TEMPEXPORTING reference which
 * finish_export() releases once the export commits.
 */
void CInode::encode_export(bufferlist& bl)
{
  ENCODE_START(5, 4, bl);
  _encode_base(bl, mdcache->mds->mdsmap->get_up_features());

  ::encode(state, bl);

  ::encode(pop, bl);

  ::encode(get_replicas(), bl);

  // include scatterlock info for any bounding CDirs
  bufferlist bounding;
  if (inode.is_dir())
    for (const auto &p : dirfrags) {
      CDir *dir = p.second;
      if (dir->state_test(CDir::STATE_EXPORTBOUND)) {
        ::encode(p.first, bounding);
        ::encode(dir->fnode.fragstat, bounding);
        ::encode(dir->fnode.accounted_fragstat, bounding);
        ::encode(dir->fnode.rstat, bounding);
        ::encode(dir->fnode.accounted_rstat, bounding);
        dout(10) << " encoded fragstat/rstat info for " << *dir << dendl;
      }
    }
  ::encode(bounding, bl);

  _encode_locks_full(bl);

  _encode_file_locks(bl);

  ENCODE_FINISH(bl);

  get(PIN_TEMPEXPORTING);
}
3653
3654 void CInode::finish_export(utime_t now)
3655 {
3656 state &= MASK_STATE_EXPORT_KEPT;
3657
3658 pop.zero(now);
3659
3660 // just in case!
3661 //dirlock.clear_updated();
3662
3663 loner_cap = -1;
3664
3665 put(PIN_TEMPEXPORTING);
3666 }
3667
/*
 * Inverse of encode_export(): absorb an inode migrated from another
 * MDS.  Marks us auth, restores dirty/dirty-parent pins, replica map
 * and popularity, then selectively adopts the sender's fragstat/rstat
 * for bounding dirfrags, and finally the lock and file-lock state.
 *
 * @param p  input iterator positioned at the exported blob
 * @param ls log segment to attach dirty state to
 */
void CInode::decode_import(bufferlist::iterator& p,
                           LogSegment *ls)
{
  DECODE_START(5, p);

  _decode_base(p);

  unsigned s;
  ::decode(s, p);
  // we are now auth; keep only the exported state bits
  state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));

  if (is_dirty()) {
    get(PIN_DIRTY);
    _mark_dirty(ls);
  }
  if (is_dirty_parent()) {
    get(PIN_DIRTYPARENT);
    mark_dirty_parent(ls);
  }

  ::decode(pop, ceph_clock_now(), p);

  ::decode(get_replicas(), p);
  if (is_replicated())
    get(PIN_REPLICATED);
  replica_nonce = 0;

  // decode fragstat info on bounding cdirs
  bufferlist bounding;
  ::decode(bounding, p);
  bufferlist::iterator q = bounding.begin();
  while (!q.end()) {
    frag_t fg;
    ::decode(fg, q);
    CDir *dir = get_dirfrag(fg);
    assert(dir);  // we should have all bounds open

    // Only take the remote's fragstat/rstat if we are non-auth for
    // this dirfrag AND the lock is NOT in a scattered (MIX) state.
    // We know lock is stable, and MIX is the only state in which
    // the inode auth (who sent us this data) may not have the best
    // info.

    // HMM: Are there cases where dir->is_auth() is an insufficient
    // check because the dirfrag is under migration?  That implies
    // it is frozen (and in a SYNC or LOCK state).  FIXME.

    if (dir->is_auth() ||
        filelock.get_state() == LOCK_MIX) {
      dout(10) << " skipped fragstat info for " << *dir << dendl;
      // still must consume both encoded frag_info_t values to stay in sync
      frag_info_t f;
      ::decode(f, q);
      ::decode(f, q);
    } else {
      ::decode(dir->fnode.fragstat, q);
      ::decode(dir->fnode.accounted_fragstat, q);
      dout(10) << " took fragstat info for " << *dir << dendl;
    }
    if (dir->is_auth() ||
        nestlock.get_state() == LOCK_MIX) {
      dout(10) << " skipped rstat info for " << *dir << dendl;
      // consume both encoded nest_info_t values even when ignored
      nest_info_t n;
      ::decode(n, q);
      ::decode(n, q);
    } else {
      ::decode(dir->fnode.rstat, q);
      ::decode(dir->fnode.accounted_rstat, q);
      dout(10) << " took rstat info for " << *dir << dendl;
    }
  }

  _decode_locks_full(p);

  _decode_file_locks(p);

  DECODE_FINISH(p);
}
3745
3746
3747 void InodeStoreBase::dump(Formatter *f) const
3748 {
3749 inode.dump(f);
3750 f->dump_string("symlink", symlink);
3751 f->open_array_section("old_inodes");
3752 for (const auto &p : old_inodes) {
3753 f->open_object_section("old_inode");
3754 // The key is the last snapid, the first is in the mempool_old_inode
3755 f->dump_int("last", p.first);
3756 p.second.dump(f);
3757 f->close_section(); // old_inode
3758 }
3759 f->close_section(); // old_inodes
3760
3761 f->open_object_section("dirfragtree");
3762 dirfragtree.dump(f);
3763 f->close_section(); // dirfragtree
3764 }
3765
3766
3767 void InodeStore::generate_test_instances(list<InodeStore*> &ls)
3768 {
3769 InodeStore *populated = new InodeStore;
3770 populated->inode.ino = 0xdeadbeef;
3771 populated->symlink = "rhubarb";
3772 ls.push_back(populated);
3773 }
3774
/*
 * Asynchronously validate this inode's on-disk state against its
 * in-memory state (used by scrub).  The work is expressed as a
 * ValidationContinuation with four stages:
 *
 *   START     - bail out for symlinks; otherwise kick off an async
 *               read of the on-disk backtrace (optionally tagging the
 *               object with the scrub tag in the same pass)
 *   BACKTRACE - compare on-disk vs freshly-built backtrace, optionally
 *               repair it and a wrongly-freed InoTable entry
 *   INODE     - (base dirs only) fetch a shadow copy of the on-disk
 *               inode and compare versions
 *   DIRFRAGS  - sum accounted fragstat/rstat over all dirfrags and
 *               compare against the inode's dirstat/rstat
 *
 * Results accumulate into *results; fin (if non-null) is completed
 * when validation finishes.  The continuation deletes itself via the
 * Continuation machinery.
 *
 * @param results out-param populated as stages run
 * @param fin     completion to fire when done (may be NULL)
 */
void CInode::validate_disk_state(CInode::validated_data *results,
                                 MDSInternalContext *fin)
{
  class ValidationContinuation : public MDSContinuation {
  public:
    MDSInternalContext *fin;             // caller's completion (may be NULL)
    CInode *in;                          // inode under validation
    CInode::validated_data *results;     // where stage results accumulate
    bufferlist bl;                       // raw on-disk backtrace bytes
    CInode *shadow_in;                   // on-disk copy of a base inode (INODE stage)

    enum {
      START = 0,
      BACKTRACE,
      INODE,
      DIRFRAGS
    };

    ValidationContinuation(CInode *i,
                           CInode::validated_data *data_r,
                           MDSInternalContext *fin_) :
                             MDSContinuation(i->mdcache->mds->server),
                             fin(fin_),
                             in(i),
                             results(data_r),
                             shadow_in(NULL) {
      set_callback(START, static_cast<Continuation::stagePtr>(&ValidationContinuation::_start));
      set_callback(BACKTRACE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_backtrace));
      set_callback(INODE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_inode_disk));
      set_callback(DIRFRAGS, static_cast<Continuation::stagePtr>(&ValidationContinuation::_dirfrags));
    }

    ~ValidationContinuation() override {
      if (shadow_in) {
        delete shadow_in;
        in->mdcache->num_shadow_inodes--;
      }
    }

    /**
     * Fetch backtrace and set tag if tag is non-empty
     *
     * Reads the "parent" xattr of the inode's backtrace object into
     * *bt (retval into *bt_r) and completes 'fin'.  If 'tag' is
     * non-empty, a second (fire-and-forget) op stamps the object with
     * a "scrub_tag" xattr.
     */
    void fetch_backtrace_and_tag(CInode *in, boost::string_view tag,
                                 Context *fin, int *bt_r, bufferlist *bt)
    {
      const int64_t pool = in->get_backtrace_pool();
      object_t oid = CInode::get_object_name(in->ino(), frag_t(), "");

      ObjectOperation fetch;
      fetch.getxattr("parent", bt, bt_r);
      in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP,
                                       NULL, 0, fin);
      if (!tag.empty()) {
        ObjectOperation scrub_tag;
        bufferlist tag_bl;
        ::encode(tag, tag_bl);
        scrub_tag.setxattr("scrub_tag", tag_bl);
        SnapContext snapc;
        // no completion: tagging is best-effort
        in->mdcache->mds->objecter->mutate(oid, object_locator_t(pool), scrub_tag, snapc,
                                           ceph::real_clock::now(),
                                           0, NULL);
      }
    }

    bool _start(int rval) {
      if (in->is_dirty()) {
        // locals exist for the dout_prefix macro's benefit
        MDCache *mdcache = in->mdcache;
        mempool_inode& inode = in->inode;
        dout(20) << "validating a dirty CInode; results will be inconclusive"
                 << dendl;
      }
      if (in->is_symlink()) {
        // there's nothing to do for symlinks!
        return true;
      }

      C_OnFinisher *conf = new C_OnFinisher(get_io_callback(BACKTRACE),
                                            in->mdcache->mds->finisher);

      // Whether we have a tag to apply depends on ScrubHeader (if one is
      // present)
      if (in->scrub_infop) {
        // I'm a non-orphan, so look up my ScrubHeader via my linkage
        boost::string_view tag = in->scrub_infop->header->get_tag();
        // Rather than using the usual CInode::fetch_backtrace,
        // use a special variant that optionally writes a tag in the same
        // operation.
        fetch_backtrace_and_tag(in, tag, conf,
                                &results->backtrace.ondisk_read_retval, &bl);
      } else {
        // When we're invoked outside of ScrubStack we might be called
        // on an orphaned inode like /
        fetch_backtrace_and_tag(in, {}, conf,
                                &results->backtrace.ondisk_read_retval, &bl);
      }
      return false;  // wait for the async read
    }

    bool _backtrace(int rval) {
      // set up basic result reporting and make sure we got the data
      results->performed_validation = true; // at least, some of it!
      results->backtrace.checked = true;

      const int64_t pool = in->get_backtrace_pool();
      inode_backtrace_t& memory_backtrace = results->backtrace.memory_value;
      in->build_backtrace(pool, memory_backtrace);
      bool equivalent, divergent;
      int memory_newer;

      MDCache *mdcache = in->mdcache;  // For the benefit of dout
      const mempool_inode& inode = in->inode;  // For the benefit of dout

      // Ignore rval because it's the result of a FAILOK operation
      // from fetch_backtrace_and_tag: the real result is in
      // backtrace.ondisk_read_retval
      dout(20) << "ondisk_read_retval: " << results->backtrace.ondisk_read_retval << dendl;
      if (results->backtrace.ondisk_read_retval != 0) {
        results->backtrace.error_str << "failed to read off disk; see retval";
        goto next;
      }

      // extract the backtrace, and compare it to a newly-constructed one
      try {
        bufferlist::iterator p = bl.begin();
        ::decode(results->backtrace.ondisk_value, p);
        dout(10) << "decoded " << bl.length() << " bytes of backtrace successfully" << dendl;
      } catch (buffer::error&) {
        if (results->backtrace.ondisk_read_retval == 0 && rval != 0) {
          // Cases where something has clearly gone wrong with the overall
          // fetch op, though we didn't get a nonzero rc from the getxattr
          // operation. e.g. object missing.
          results->backtrace.ondisk_read_retval = rval;
        }
        results->backtrace.error_str << "failed to decode on-disk backtrace ("
                                     << bl.length() << " bytes)!";
        goto next;
      }

      memory_newer = memory_backtrace.compare(results->backtrace.ondisk_value,
                                              &equivalent, &divergent);

      if (divergent || memory_newer < 0) {
        // we're divergent, or on-disk version is newer
        results->backtrace.error_str << "On-disk backtrace is divergent or newer";
      } else {
        results->backtrace.passed = true;
      }
next:

      // NOTE(review): unlike _start(), this dereferences in->scrub_infop
      // without a null check -- confirm scrub_infop is always set by the
      // time a backtrace check can fail (e.g. for orphan inodes)
      if (!results->backtrace.passed && in->scrub_infop->header->get_repair()) {
        std::string path;
        in->make_path_string(path);
        in->mdcache->mds->clog->warn() << "bad backtrace on inode " << in->ino()
                                       << "(" << path << "), rewriting it";
        in->mark_dirty_parent(in->mdcache->mds->mdlog->get_current_segment(),
                              false);
        // Flag that we repaired this BT so that it won't go into damagetable
        results->backtrace.repaired = true;

        // Flag that we did some repair work so that our repair operation
        // can be flushed at end of scrub
        in->scrub_infop->header->set_repaired();
      }

      // If the inode's number was free in the InoTable, fix that
      // (#15619)
      {
        InoTable *inotable = mdcache->mds->inotable;

        dout(10) << "scrub: inotable ino = " << inode.ino << dendl;
        dout(10) << "scrub: inotable free says "
                 << inotable->is_marked_free(inode.ino) << dendl;

        if (inotable->is_marked_free(inode.ino)) {
          LogChannelRef clog = in->mdcache->mds->clog;
          clog->error() << "scrub: inode wrongly marked free: 0x" << std::hex
                        << inode.ino;

          if (in->scrub_infop->header->get_repair()) {
            bool repaired = inotable->repair(inode.ino);
            if (repaired) {
              clog->error() << "inode table repaired for inode: 0x" << std::hex
                            << inode.ino;

              inotable->save();
            } else {
              clog->error() << "Cannot repair inotable while other operations"
                " are in progress";
            }
          }
        }
      }

      // quit if we're a file, or kick off directory checks otherwise
      // TODO: validate on-disk inode for non-base directories
      if (!in->is_dir()) {
        return true;
      }

      return validate_directory_data();
    }

    bool validate_directory_data() {
      assert(in->is_dir());

      if (in->is_base()) {
        // base inodes (root, mdsdir) have a stored copy we can fetch
        // into a shadow inode and compare against
        if (!shadow_in) {
          shadow_in = new CInode(in->mdcache);
          in->mdcache->create_unlinked_system_inode(shadow_in, in->inode.ino, in->inode.mode);
          in->mdcache->num_shadow_inodes++;
        }
        shadow_in->fetch(get_internal_callback(INODE));
        return false;  // wait for the fetch
      } else {
        // non-base dirs: skip the inode comparison stage
        results->inode.passed = true;
        return check_dirfrag_rstats();
      }
    }

    bool _inode_disk(int rval) {
      results->inode.checked = true;
      results->inode.ondisk_read_retval = rval;
      results->inode.ondisk_value = shadow_in->inode;
      results->inode.memory_value = in->inode;

      mempool_inode& si = shadow_in->inode;
      mempool_inode& i = in->inode;
      if (si.version > i.version) {
        // uh, what?
        results->inode.error_str << "On-disk inode is newer than in-memory one!";
        goto next;
      } else {
        bool divergent = false;
        int r = i.compare(si, &divergent);
        results->inode.passed = !divergent && r >= 0;
        if (!results->inode.passed) {
          results->inode.error_str <<
            "On-disk inode is divergent or newer than in-memory one!";
          goto next;
        }
      }
next:
      return check_dirfrag_rstats();
    }

    bool check_dirfrag_rstats() {
      // ensure every leaf dirfrag is complete (fetching if needed),
      // then move on to the DIRFRAGS stage
      MDSGatherBuilder gather(g_ceph_context);
      std::list<frag_t> frags;
      in->dirfragtree.get_leaves(frags);
      for (list<frag_t>::iterator p = frags.begin();
           p != frags.end();
           ++p) {
        CDir *dir = in->get_or_open_dirfrag(in->mdcache, *p);
        dir->scrub_info();
        if (!dir->scrub_infop->header)
          dir->scrub_infop->header = in->scrub_infop->header;
        if (dir->is_complete()) {
          dir->scrub_local();
        } else {
          dir->scrub_infop->need_scrub_local = true;
          dir->fetch(gather.new_sub(), false);
        }
      }
      if (gather.has_subs()) {
        gather.set_finisher(get_internal_callback(DIRFRAGS));
        gather.activate();
        return false;  // wait for fetches
      } else {
        return immediate(DIRFRAGS, 0);
      }
    }

    bool _dirfrags(int rval) {
      int frags_errors = 0;
      // basic reporting setup
      results->raw_stats.checked = true;
      results->raw_stats.ondisk_read_retval = rval;

      results->raw_stats.memory_value.dirstat = in->inode.dirstat;
      results->raw_stats.memory_value.rstat = in->inode.rstat;
      frag_info_t& dir_info = results->raw_stats.ondisk_value.dirstat;
      nest_info_t& nest_info = results->raw_stats.ondisk_value.rstat;

      if (rval != 0) {
        results->raw_stats.error_str << "Failed to read dirfrags off disk";
        goto next;
      }

      // check each dirfrag...
      for (const auto &p : in->dirfrags) {
        CDir *dir = p.second;
        assert(dir->get_version() > 0);
        nest_info.add(dir->fnode.accounted_rstat);
        dir_info.add(dir->fnode.accounted_fragstat);
        if (dir->scrub_infop &&
            dir->scrub_infop->pending_scrub_error) {
          dir->scrub_infop->pending_scrub_error = false;
          if (dir->scrub_infop->header->get_repair()) {
            results->raw_stats.repaired = true;
            results->raw_stats.error_str
              << "dirfrag(" << p.first << ") has bad stats (will be fixed); ";
          } else {
            results->raw_stats.error_str
              << "dirfrag(" << p.first << ") has bad stats; ";
          }
          frags_errors++;
        }
      }
      nest_info.rsubdirs++; // it gets one to account for self
      // ...and that their sum matches our inode settings
      if (!dir_info.same_sums(in->inode.dirstat) ||
          !nest_info.same_sums(in->inode.rstat)) {
        if (in->scrub_infop &&
            in->scrub_infop->header->get_repair()) {
          results->raw_stats.error_str
            << "freshly-calculated rstats don't match existing ones (will be fixed)";
          in->mdcache->repair_inode_stats(in);
          results->raw_stats.repaired = true;
        } else {
          results->raw_stats.error_str
            << "freshly-calculated rstats don't match existing ones";
        }
        goto next;
      }
      if (frags_errors > 0)
        goto next;

      results->raw_stats.passed = true;
next:
      return true;  // DIRFRAGS is the final stage
    }

    void _done() override {
      // a section counts as OK if it was skipped or it passed
      if ((!results->raw_stats.checked || results->raw_stats.passed) &&
          (!results->backtrace.checked || results->backtrace.passed) &&
          (!results->inode.checked || results->inode.passed))
        results->passed_validation = true;
      if (fin) {
        fin->complete(get_rval());
      }
    }
  };


  dout(10) << "scrub starting validate_disk_state on " << *this << dendl;
  ValidationContinuation *vc = new ValidationContinuation(this,
                                                          results,
                                                          fin);
  vc->begin();
}
4125
/*
 * Dump scrub validation results in Formatter form.  Section and key
 * names (including the historical "memoryvalue" spelling) are part of
 * the external output format and must not change.
 */
void CInode::validated_data::dump(Formatter *f) const
{
  f->open_object_section("results");
  {
    f->dump_bool("performed_validation", performed_validation);
    f->dump_bool("passed_validation", passed_validation);
    f->open_object_section("backtrace");
    {
      f->dump_bool("checked", backtrace.checked);
      f->dump_bool("passed", backtrace.passed);
      f->dump_int("read_ret_val", backtrace.ondisk_read_retval);
      f->dump_stream("ondisk_value") << backtrace.ondisk_value;
      f->dump_stream("memoryvalue") << backtrace.memory_value;
      f->dump_string("error_str", backtrace.error_str.str());
    }
    f->close_section(); // backtrace
    f->open_object_section("raw_stats");
    {
      f->dump_bool("checked", raw_stats.checked);
      f->dump_bool("passed", raw_stats.passed);
      f->dump_int("read_ret_val", raw_stats.ondisk_read_retval);
      f->dump_stream("ondisk_value.dirstat") << raw_stats.ondisk_value.dirstat;
      f->dump_stream("ondisk_value.rstat") << raw_stats.ondisk_value.rstat;
      f->dump_stream("memory_value.dirrstat") << raw_stats.memory_value.dirstat;
      f->dump_stream("memory_value.rstat") << raw_stats.memory_value.rstat;
      f->dump_string("error_str", raw_stats.error_str.str());
    }
    f->close_section(); // raw_stats
    // dump failure return code
    // (later assignments win: raw_stats > inode > backtrace)
    int rc = 0;
    if (backtrace.checked && backtrace.ondisk_read_retval)
      rc = backtrace.ondisk_read_retval;
    if (inode.checked && inode.ondisk_read_retval)
      rc = inode.ondisk_read_retval;
    if (raw_stats.checked && raw_stats.ondisk_read_retval)
      rc = raw_stats.ondisk_read_retval;
    f->dump_int("return_code", rc);
  }
  f->close_section(); // results
}
4166
4167 bool CInode::validated_data::all_damage_repaired() const
4168 {
4169 bool unrepaired =
4170 (raw_stats.checked && !raw_stats.passed && !raw_stats.repaired)
4171 ||
4172 (backtrace.checked && !backtrace.passed && !backtrace.repaired)
4173 ||
4174 (inode.checked && !inode.passed && !inode.repaired);
4175
4176 return !unrepaired;
4177 }
4178
4179 void CInode::dump(Formatter *f) const
4180 {
4181 InodeStoreBase::dump(f);
4182
4183 MDSCacheObject::dump(f);
4184
4185 f->open_object_section("versionlock");
4186 versionlock.dump(f);
4187 f->close_section();
4188
4189 f->open_object_section("authlock");
4190 authlock.dump(f);
4191 f->close_section();
4192
4193 f->open_object_section("linklock");
4194 linklock.dump(f);
4195 f->close_section();
4196
4197 f->open_object_section("dirfragtreelock");
4198 dirfragtreelock.dump(f);
4199 f->close_section();
4200
4201 f->open_object_section("filelock");
4202 filelock.dump(f);
4203 f->close_section();
4204
4205 f->open_object_section("xattrlock");
4206 xattrlock.dump(f);
4207 f->close_section();
4208
4209 f->open_object_section("snaplock");
4210 snaplock.dump(f);
4211 f->close_section();
4212
4213 f->open_object_section("nestlock");
4214 nestlock.dump(f);
4215 f->close_section();
4216
4217 f->open_object_section("flocklock");
4218 flocklock.dump(f);
4219 f->close_section();
4220
4221 f->open_object_section("policylock");
4222 policylock.dump(f);
4223 f->close_section();
4224
4225 f->open_array_section("states");
4226 MDSCacheObject::dump_states(f);
4227 if (state_test(STATE_EXPORTING))
4228 f->dump_string("state", "exporting");
4229 if (state_test(STATE_OPENINGDIR))
4230 f->dump_string("state", "openingdir");
4231 if (state_test(STATE_FREEZING))
4232 f->dump_string("state", "freezing");
4233 if (state_test(STATE_FROZEN))
4234 f->dump_string("state", "frozen");
4235 if (state_test(STATE_AMBIGUOUSAUTH))
4236 f->dump_string("state", "ambiguousauth");
4237 if (state_test(STATE_EXPORTINGCAPS))
4238 f->dump_string("state", "exportingcaps");
4239 if (state_test(STATE_NEEDSRECOVER))
4240 f->dump_string("state", "needsrecover");
4241 if (state_test(STATE_PURGING))
4242 f->dump_string("state", "purging");
4243 if (state_test(STATE_DIRTYPARENT))
4244 f->dump_string("state", "dirtyparent");
4245 if (state_test(STATE_DIRTYRSTAT))
4246 f->dump_string("state", "dirtyrstat");
4247 if (state_test(STATE_STRAYPINNED))
4248 f->dump_string("state", "straypinned");
4249 if (state_test(STATE_FROZENAUTHPIN))
4250 f->dump_string("state", "frozenauthpin");
4251 if (state_test(STATE_DIRTYPOOL))
4252 f->dump_string("state", "dirtypool");
4253 if (state_test(STATE_ORPHAN))
4254 f->dump_string("state", "orphan");
4255 if (state_test(STATE_MISSINGOBJS))
4256 f->dump_string("state", "missingobjs");
4257 f->close_section();
4258
4259 f->open_array_section("client_caps");
4260 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
4261 it != client_caps.end(); ++it) {
4262 f->open_object_section("client_cap");
4263 f->dump_int("client_id", it->first.v);
4264 f->dump_string("pending", ccap_string(it->second->pending()));
4265 f->dump_string("issued", ccap_string(it->second->issued()));
4266 f->dump_string("wanted", ccap_string(it->second->wanted()));
4267 f->dump_int("last_sent", it->second->get_last_sent());
4268 f->close_section();
4269 }
4270 f->close_section();
4271
4272 f->dump_int("loner", loner_cap.v);
4273 f->dump_int("want_loner", want_loner_cap.v);
4274
4275 f->open_array_section("mds_caps_wanted");
4276 for (const auto &p : mds_caps_wanted) {
4277 f->open_object_section("mds_cap_wanted");
4278 f->dump_int("rank", p.first);
4279 f->dump_string("cap", ccap_string(p.second));
4280 f->close_section();
4281 }
4282 f->close_section();
4283 }
4284
4285 /****** Scrub Stuff *****/
4286 void CInode::scrub_info_create() const
4287 {
4288 dout(25) << __func__ << dendl;
4289 assert(!scrub_infop);
4290
4291 // break out of const-land to set up implicit initial state
4292 CInode *me = const_cast<CInode*>(this);
4293 mempool_inode *in = me->get_projected_inode();
4294
4295 scrub_info_t *si = new scrub_info_t();
4296 si->scrub_start_stamp = si->last_scrub_stamp = in->last_scrub_stamp;
4297 si->scrub_start_version = si->last_scrub_version = in->last_scrub_version;
4298
4299 me->scrub_infop = si;
4300 }
4301
4302 void CInode::scrub_maybe_delete_info()
4303 {
4304 if (scrub_infop &&
4305 !scrub_infop->scrub_in_progress &&
4306 !scrub_infop->last_scrub_dirty) {
4307 delete scrub_infop;
4308 scrub_infop = NULL;
4309 }
4310 }
4311
4312 void CInode::scrub_initialize(CDentry *scrub_parent,
4313 ScrubHeaderRef& header,
4314 MDSInternalContextBase *f)
4315 {
4316 dout(20) << __func__ << " with scrub_version " << get_version() << dendl;
4317 if (scrub_is_in_progress()) {
4318 dout(20) << __func__ << " inode moved during scrub, reinitializing "
4319 << dendl;
4320 assert(scrub_infop->scrub_parent);
4321 CDentry *dn = scrub_infop->scrub_parent;
4322 CDir *dir = dn->dir;
4323 dn->put(CDentry::PIN_SCRUBPARENT);
4324 assert(dir->scrub_infop && dir->scrub_infop->directory_scrubbing);
4325 dir->scrub_infop->directories_scrubbing.erase(dn->key());
4326 dir->scrub_infop->others_scrubbing.erase(dn->key());
4327 }
4328 scrub_info();
4329 if (!scrub_infop)
4330 scrub_infop = new scrub_info_t();
4331
4332 if (get_projected_inode()->is_dir()) {
4333 // fill in dirfrag_stamps with initial state
4334 std::list<frag_t> frags;
4335 dirfragtree.get_leaves(frags);
4336 for (std::list<frag_t>::iterator i = frags.begin();
4337 i != frags.end();
4338 ++i) {
4339 if (header->get_force())
4340 scrub_infop->dirfrag_stamps[*i].reset();
4341 else
4342 scrub_infop->dirfrag_stamps[*i];
4343 }
4344 }
4345
4346 if (scrub_parent)
4347 scrub_parent->get(CDentry::PIN_SCRUBPARENT);
4348 scrub_infop->scrub_parent = scrub_parent;
4349 scrub_infop->on_finish = f;
4350 scrub_infop->scrub_in_progress = true;
4351 scrub_infop->children_scrubbed = false;
4352 scrub_infop->header = header;
4353
4354 scrub_infop->scrub_start_version = get_version();
4355 scrub_infop->scrub_start_stamp = ceph_clock_now();
4356 // right now we don't handle remote inodes
4357 }
4358
4359 int CInode::scrub_dirfrag_next(frag_t* out_dirfrag)
4360 {
4361 dout(20) << __func__ << dendl;
4362 assert(scrub_is_in_progress());
4363
4364 if (!is_dir()) {
4365 return -ENOTDIR;
4366 }
4367
4368 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4369 scrub_infop->dirfrag_stamps.begin();
4370
4371 while (i != scrub_infop->dirfrag_stamps.end()) {
4372 if (i->second.scrub_start_version < scrub_infop->scrub_start_version) {
4373 i->second.scrub_start_version = get_projected_version();
4374 i->second.scrub_start_stamp = ceph_clock_now();
4375 *out_dirfrag = i->first;
4376 dout(20) << " return frag " << *out_dirfrag << dendl;
4377 return 0;
4378 }
4379 ++i;
4380 }
4381
4382 dout(20) << " no frags left, ENOENT " << dendl;
4383 return ENOENT;
4384 }
4385
4386 void CInode::scrub_dirfrags_scrubbing(list<frag_t>* out_dirfrags)
4387 {
4388 assert(out_dirfrags != NULL);
4389 assert(scrub_infop != NULL);
4390
4391 out_dirfrags->clear();
4392 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4393 scrub_infop->dirfrag_stamps.begin();
4394
4395 while (i != scrub_infop->dirfrag_stamps.end()) {
4396 if (i->second.scrub_start_version >= scrub_infop->scrub_start_version) {
4397 if (i->second.last_scrub_version < scrub_infop->scrub_start_version)
4398 out_dirfrags->push_back(i->first);
4399 } else {
4400 return;
4401 }
4402
4403 ++i;
4404 }
4405 }
4406
4407 void CInode::scrub_dirfrag_finished(frag_t dirfrag)
4408 {
4409 dout(20) << __func__ << " on frag " << dirfrag << dendl;
4410 assert(scrub_is_in_progress());
4411
4412 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4413 scrub_infop->dirfrag_stamps.find(dirfrag);
4414 assert(i != scrub_infop->dirfrag_stamps.end());
4415
4416 scrub_stamp_info_t &si = i->second;
4417 si.last_scrub_stamp = si.scrub_start_stamp;
4418 si.last_scrub_version = si.scrub_start_version;
4419 }
4420
4421 void CInode::scrub_finished(MDSInternalContextBase **c) {
4422 dout(20) << __func__ << dendl;
4423 assert(scrub_is_in_progress());
4424 for (std::map<frag_t, scrub_stamp_info_t>::iterator i =
4425 scrub_infop->dirfrag_stamps.begin();
4426 i != scrub_infop->dirfrag_stamps.end();
4427 ++i) {
4428 if(i->second.last_scrub_version != i->second.scrub_start_version) {
4429 derr << i->second.last_scrub_version << " != "
4430 << i->second.scrub_start_version << dendl;
4431 }
4432 assert(i->second.last_scrub_version == i->second.scrub_start_version);
4433 }
4434
4435 scrub_infop->last_scrub_version = scrub_infop->scrub_start_version;
4436 scrub_infop->last_scrub_stamp = scrub_infop->scrub_start_stamp;
4437 scrub_infop->last_scrub_dirty = true;
4438 scrub_infop->scrub_in_progress = false;
4439
4440 if (scrub_infop->scrub_parent) {
4441 CDentry *dn = scrub_infop->scrub_parent;
4442 scrub_infop->scrub_parent = NULL;
4443 dn->dir->scrub_dentry_finished(dn);
4444 dn->put(CDentry::PIN_SCRUBPARENT);
4445 }
4446
4447 *c = scrub_infop->on_finish;
4448 scrub_infop->on_finish = NULL;
4449
4450 if (scrub_infop->header->get_origin() == this) {
4451 // We are at the point that a tagging scrub was initiated
4452 LogChannelRef clog = mdcache->mds->clog;
4453 if (scrub_infop->header->get_tag().empty()) {
4454 clog->info() << "scrub complete";
4455 } else {
4456 clog->info() << "scrub complete with tag '"
4457 << scrub_infop->header->get_tag() << "'";
4458 }
4459 }
4460 }
4461
4462 int64_t CInode::get_backtrace_pool() const
4463 {
4464 if (is_dir()) {
4465 return mdcache->mds->mdsmap->get_metadata_pool();
4466 } else {
4467 // Files are required to have an explicit layout that specifies
4468 // a pool
4469 assert(inode.layout.pool_id != -1);
4470 return inode.layout.pool_id;
4471 }
4472 }
4473
4474 void CInode::maybe_export_pin(bool update)
4475 {
4476 if (!g_conf->mds_bal_export_pin)
4477 return;
4478 if (!is_dir() || !is_normal())
4479 return;
4480
4481 mds_rank_t export_pin = get_export_pin(false);
4482 if (export_pin == MDS_RANK_NONE && !update)
4483 return;
4484
4485 if (state_test(CInode::STATE_QUEUEDEXPORTPIN))
4486 return;
4487
4488 bool queue = false;
4489 for (auto p = dirfrags.begin(); p != dirfrags.end(); p++) {
4490 CDir *dir = p->second;
4491 if (!dir->is_auth())
4492 continue;
4493 if (export_pin != MDS_RANK_NONE) {
4494 if (dir->is_subtree_root()) {
4495 // set auxsubtree bit or export it
4496 if (!dir->state_test(CDir::STATE_AUXSUBTREE) ||
4497 export_pin != dir->get_dir_auth().first)
4498 queue = true;
4499 } else {
4500 // create aux subtree or export it
4501 queue = true;
4502 }
4503 } else {
4504 // clear aux subtrees ?
4505 queue = dir->state_test(CDir::STATE_AUXSUBTREE);
4506 }
4507 if (queue) {
4508 state_set(CInode::STATE_QUEUEDEXPORTPIN);
4509 mdcache->export_pin_queue.insert(this);
4510 break;
4511 }
4512 }
4513 }
4514
void CInode::set_export_pin(mds_rank_t rank)
{
  // Record the requested export pin on the projected inode.
  // Preconditions: this is a directory inode and a projection already
  // exists (the caller is in the middle of an update).
  assert(is_dir());
  assert(is_projected());
  get_projected_inode()->export_pin = rank;
}
4521
4522 mds_rank_t CInode::get_export_pin(bool inherit) const
4523 {
4524 /* An inode that is export pinned may not necessarily be a subtree root, we
4525 * need to traverse the parents. A base or system inode cannot be pinned.
4526 * N.B. inodes not yet linked into a dir (i.e. anonymous inodes) will not
4527 * have a parent yet.
4528 */
4529 const CInode *in = this;
4530 while (true) {
4531 if (in->is_system())
4532 break;
4533 const CDentry *pdn = in->get_parent_dn();
4534 if (!pdn)
4535 break;
4536 // ignore export pin for unlinked directory
4537 if (in->get_inode().nlink == 0)
4538 break;
4539 if (in->get_inode().export_pin >= 0)
4540 return in->get_inode().export_pin;
4541
4542 if (!inherit)
4543 break;
4544 in = pdn->get_dir()->inode;
4545 }
4546 return MDS_RANK_NONE;
4547 }
4548
4549 bool CInode::is_exportable(mds_rank_t dest) const
4550 {
4551 mds_rank_t pin = get_export_pin();
4552 if (pin == dest) {
4553 return true;
4554 } else if (pin >= 0) {
4555 return false;
4556 } else {
4557 return true;
4558 }
4559 }
4560
4561 MEMPOOL_DEFINE_OBJECT_FACTORY(CInode, co_inode, mds_co);