]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/CInode.cc
update sources to 12.2.8
[ceph.git] / ceph / src / mds / CInode.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "include/int_types.h"
16#include "common/errno.h"
17
18#include <string>
19#include <stdio.h>
20
21#include "CInode.h"
22#include "CDir.h"
23#include "CDentry.h"
24
25#include "MDSRank.h"
26#include "MDCache.h"
27#include "MDLog.h"
28#include "Locker.h"
29#include "Mutation.h"
30
31#include "events/EUpdate.h"
32
33#include "osdc/Objecter.h"
34
35#include "snap.h"
36
37#include "LogSegment.h"
38
39#include "common/Clock.h"
40
41#include "messages/MLock.h"
42#include "messages/MClientCaps.h"
43
44#include "common/config.h"
45#include "global/global_context.h"
46#include "include/assert.h"
47
48#include "mds/MDSContinuation.h"
49#include "mds/InoTable.h"
50
51#define dout_context g_ceph_context
52#define dout_subsys ceph_subsys_mds
53#undef dout_prefix
54#define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") "
55
56
57class CInodeIOContext : public MDSIOContextBase
58{
59protected:
60 CInode *in;
61 MDSRank *get_mds() override {return in->mdcache->mds;}
62public:
63 explicit CInodeIOContext(CInode *in_) : in(in_) {
64 assert(in != NULL);
65 }
66};
67
68
69LockType CInode::versionlock_type(CEPH_LOCK_IVERSION);
70LockType CInode::authlock_type(CEPH_LOCK_IAUTH);
71LockType CInode::linklock_type(CEPH_LOCK_ILINK);
72LockType CInode::dirfragtreelock_type(CEPH_LOCK_IDFT);
73LockType CInode::filelock_type(CEPH_LOCK_IFILE);
74LockType CInode::xattrlock_type(CEPH_LOCK_IXATTR);
75LockType CInode::snaplock_type(CEPH_LOCK_ISNAP);
76LockType CInode::nestlock_type(CEPH_LOCK_INEST);
77LockType CInode::flocklock_type(CEPH_LOCK_IFLOCK);
78LockType CInode::policylock_type(CEPH_LOCK_IPOLICY);
79
80//int cinode_pins[CINODE_NUM_PINS]; // counts
81ostream& CInode::print_db_line_prefix(ostream& out)
82{
83 return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") ";
84}
85
86/*
87 * write caps and lock ids
88 */
89struct cinode_lock_info_t cinode_lock_info[] = {
90 { CEPH_LOCK_IFILE, CEPH_CAP_ANY_FILE_WR },
91 { CEPH_LOCK_IAUTH, CEPH_CAP_AUTH_EXCL },
92 { CEPH_LOCK_ILINK, CEPH_CAP_LINK_EXCL },
93 { CEPH_LOCK_IXATTR, CEPH_CAP_XATTR_EXCL },
94};
95int num_cinode_locks = sizeof(cinode_lock_info) / sizeof(cinode_lock_info[0]);
96
97
98
99ostream& operator<<(ostream& out, const CInode& in)
100{
101 string path;
102 in.make_path_string(path, true);
103
104 out << "[inode " << in.inode.ino;
105 out << " ["
106 << (in.is_multiversion() ? "...":"")
107 << in.first << "," << in.last << "]";
108 out << " " << path << (in.is_dir() ? "/":"");
109
110 if (in.is_auth()) {
111 out << " auth";
112 if (in.is_replicated())
113 out << in.get_replicas();
114 } else {
115 mds_authority_t a = in.authority();
116 out << " rep@" << a.first;
117 if (a.second != CDIR_AUTH_UNKNOWN)
118 out << "," << a.second;
119 out << "." << in.get_replica_nonce();
120 }
121
122 if (in.is_symlink())
123 out << " symlink='" << in.symlink << "'";
124 if (in.is_dir() && !in.dirfragtree.empty())
125 out << " " << in.dirfragtree;
126
127 out << " v" << in.get_version();
128 if (in.get_projected_version() > in.get_version())
129 out << " pv" << in.get_projected_version();
130
131 if (in.is_auth_pinned()) {
132 out << " ap=" << in.get_num_auth_pins() << "+" << in.get_num_nested_auth_pins();
133#ifdef MDS_AUTHPIN_SET
134 out << "(" << in.auth_pin_set << ")";
135#endif
136 }
137
138 if (in.snaprealm)
139 out << " snaprealm=" << in.snaprealm;
140
141 if (in.state_test(CInode::STATE_AMBIGUOUSAUTH)) out << " AMBIGAUTH";
142 if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " needsrecover";
143 if (in.state_test(CInode::STATE_RECOVERING)) out << " recovering";
144 if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent";
145 if (in.state_test(CInode::STATE_MISSINGOBJS)) out << " missingobjs";
146 if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance;
147 if (in.is_frozen_inode()) out << " FROZEN";
148 if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN";
149
94b18763 150 const CInode::mempool_inode *pi = in.get_projected_inode();
7c673cae
FG
151 if (pi->is_truncating())
152 out << " truncating(" << pi->truncate_from << " to " << pi->truncate_size << ")";
153
154 if (in.inode.is_dir()) {
155 out << " " << in.inode.dirstat;
156 if (g_conf->mds_debug_scatterstat && in.is_projected()) {
94b18763 157 const CInode::mempool_inode *pi = in.get_projected_inode();
7c673cae
FG
158 out << "->" << pi->dirstat;
159 }
160 } else {
161 out << " s=" << in.inode.size;
162 if (in.inode.nlink != 1)
163 out << " nl=" << in.inode.nlink;
164 }
165
166 // rstat
167 out << " " << in.inode.rstat;
168 if (!(in.inode.rstat == in.inode.accounted_rstat))
169 out << "/" << in.inode.accounted_rstat;
170 if (g_conf->mds_debug_scatterstat && in.is_projected()) {
94b18763 171 const CInode::mempool_inode *pi = in.get_projected_inode();
7c673cae
FG
172 out << "->" << pi->rstat;
173 if (!(pi->rstat == pi->accounted_rstat))
174 out << "/" << pi->accounted_rstat;
175 }
176
177 if (!in.client_need_snapflush.empty())
178 out << " need_snapflush=" << in.client_need_snapflush;
179
180
181 // locks
182 if (!in.authlock.is_sync_and_unlocked())
183 out << " " << in.authlock;
184 if (!in.linklock.is_sync_and_unlocked())
185 out << " " << in.linklock;
186 if (in.inode.is_dir()) {
187 if (!in.dirfragtreelock.is_sync_and_unlocked())
188 out << " " << in.dirfragtreelock;
189 if (!in.snaplock.is_sync_and_unlocked())
190 out << " " << in.snaplock;
191 if (!in.nestlock.is_sync_and_unlocked())
192 out << " " << in.nestlock;
193 if (!in.policylock.is_sync_and_unlocked())
194 out << " " << in.policylock;
195 } else {
196 if (!in.flocklock.is_sync_and_unlocked())
197 out << " " << in.flocklock;
198 }
199 if (!in.filelock.is_sync_and_unlocked())
200 out << " " << in.filelock;
201 if (!in.xattrlock.is_sync_and_unlocked())
202 out << " " << in.xattrlock;
203 if (!in.versionlock.is_sync_and_unlocked())
204 out << " " << in.versionlock;
205
206 // hack: spit out crap on which clients have caps
207 if (in.inode.client_ranges.size())
208 out << " cr=" << in.inode.client_ranges;
209
210 if (!in.get_client_caps().empty()) {
211 out << " caps={";
212 for (map<client_t,Capability*>::const_iterator it = in.get_client_caps().begin();
213 it != in.get_client_caps().end();
214 ++it) {
215 if (it != in.get_client_caps().begin()) out << ",";
216 out << it->first << "="
217 << ccap_string(it->second->pending());
218 if (it->second->issued() != it->second->pending())
219 out << "/" << ccap_string(it->second->issued());
220 out << "/" << ccap_string(it->second->wanted())
221 << "@" << it->second->get_last_sent();
222 }
223 out << "}";
224 if (in.get_loner() >= 0 || in.get_wanted_loner() >= 0) {
225 out << ",l=" << in.get_loner();
226 if (in.get_loner() != in.get_wanted_loner())
227 out << "(" << in.get_wanted_loner() << ")";
228 }
229 }
230 if (!in.get_mds_caps_wanted().empty()) {
231 out << " mcw={";
94b18763
FG
232 bool first = true;
233 for (const auto &p : in.get_mds_caps_wanted()) {
234 if (!first)
7c673cae 235 out << ',';
94b18763
FG
236 out << p.first << '=' << ccap_string(p.second);
237 first = false;
7c673cae
FG
238 }
239 out << '}';
240 }
241
242 if (in.get_num_ref()) {
243 out << " |";
244 in.print_pin_set(out);
245 }
246
247 if (in.inode.export_pin != MDS_RANK_NONE) {
248 out << " export_pin=" << in.inode.export_pin;
249 }
250
251 out << " " << &in;
252 out << "]";
253 return out;
254}
255
256ostream& operator<<(ostream& out, const CInode::scrub_stamp_info_t& si)
257{
258 out << "{scrub_start_version: " << si.scrub_start_version
259 << ", scrub_start_stamp: " << si.scrub_start_stamp
260 << ", last_scrub_version: " << si.last_scrub_version
261 << ", last_scrub_stamp: " << si.last_scrub_stamp;
262 return out;
263}
264
265
266
267void CInode::print(ostream& out)
268{
269 out << *this;
270}
271
272
273
274void CInode::add_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
275{
276 dout(10) << "add_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;
277
278 if (client_need_snapflush.empty()) {
279 get(CInode::PIN_NEEDSNAPFLUSH);
280
281 // FIXME: this is non-optimal, as we'll block freezes/migrations for potentially
282 // long periods waiting for clients to flush their snaps.
283 auth_pin(this); // pin head inode...
284 }
285
94b18763 286 auto &clients = client_need_snapflush[snapid];
7c673cae
FG
287 if (clients.empty())
288 snapin->auth_pin(this); // ...and pin snapped/old inode!
289
290 clients.insert(client);
291}
292
293void CInode::remove_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
294{
94b18763
FG
295 dout(10) << __func__ << " client." << client << " snapid " << snapid << " on " << snapin << dendl;
296 auto it = client_need_snapflush.find(snapid);
297 if (it == client_need_snapflush.end()) {
7c673cae
FG
298 dout(10) << " snapid not found" << dendl;
299 return;
300 }
94b18763
FG
301 size_t n = it->second.erase(client);
302 if (n == 0) {
7c673cae
FG
303 dout(10) << " client not found" << dendl;
304 return;
305 }
94b18763
FG
306 if (it->second.empty()) {
307 client_need_snapflush.erase(it);
7c673cae
FG
308 snapin->auth_unpin(this);
309
310 if (client_need_snapflush.empty()) {
311 put(CInode::PIN_NEEDSNAPFLUSH);
312 auth_unpin(this);
313 }
314 }
315}
316
317bool CInode::split_need_snapflush(CInode *cowin, CInode *in)
318{
319 dout(10) << "split_need_snapflush [" << cowin->first << "," << cowin->last << "] for " << *cowin << dendl;
320 bool need_flush = false;
94b18763
FG
321 for (auto it = client_need_snapflush.lower_bound(cowin->first);
322 it != client_need_snapflush.end() && it->first < in->first; ) {
323 assert(!it->second.empty());
324 if (cowin->last >= it->first) {
7c673cae
FG
325 cowin->auth_pin(this);
326 need_flush = true;
94b18763
FG
327 ++it;
328 } else {
329 it = client_need_snapflush.erase(it);
330 }
7c673cae
FG
331 in->auth_unpin(this);
332 }
333 return need_flush;
334}
335
336void CInode::mark_dirty_rstat()
337{
338 if (!state_test(STATE_DIRTYRSTAT)) {
339 dout(10) << "mark_dirty_rstat" << dendl;
340 state_set(STATE_DIRTYRSTAT);
341 get(PIN_DIRTYRSTAT);
224ce89b
WB
342 CDentry *pdn = get_projected_parent_dn();
343 if (pdn->is_auth()) {
344 CDir *pdir = pdn->dir;
345 pdir->dirty_rstat_inodes.push_back(&dirty_rstat_item);
346 mdcache->mds->locker->mark_updated_scatterlock(&pdir->inode->nestlock);
347 } else {
348 // under cross-MDS rename.
349 // DIRTYRSTAT flag will get cleared when rename finishes
350 assert(state_test(STATE_AMBIGUOUSAUTH));
351 }
7c673cae
FG
352 }
353}
354void CInode::clear_dirty_rstat()
355{
356 if (state_test(STATE_DIRTYRSTAT)) {
357 dout(10) << "clear_dirty_rstat" << dendl;
358 state_clear(STATE_DIRTYRSTAT);
359 put(PIN_DIRTYRSTAT);
360 dirty_rstat_item.remove_myself();
361 }
362}
363
94b18763
FG
364/* Ideally this function would be subsumed by project_inode but it is also
365 * needed by CInode::project_past_snaprealm_parent so we keep it.
366 */
367sr_t &CInode::project_snaprealm(projected_inode &pi)
7c673cae 368{
94b18763
FG
369 const sr_t *cur_srnode = get_projected_srnode();
370
371 assert(!pi.snapnode);
372 if (cur_srnode) {
373 pi.snapnode.reset(new sr_t(*cur_srnode));
7c673cae 374 } else {
94b18763
FG
375 pi.snapnode.reset(new sr_t());
376 pi.snapnode->created = 0;
377 pi.snapnode->current_parent_since = get_oldest_snap();
7c673cae 378 }
94b18763 379 ++num_projected_srnodes;
7c673cae 380
94b18763
FG
381 dout(10) << __func__ << " " << pi.snapnode.get() << dendl;
382 return *pi.snapnode.get();
383}
7c673cae 384
94b18763
FG
385CInode::projected_inode &CInode::project_inode(bool xattr, bool snap)
386{
387 if (projected_nodes.empty()) {
388 projected_nodes.emplace_back(inode);
389 } else {
390 projected_nodes.emplace_back(projected_nodes.back().inode);
7c673cae 391 }
94b18763 392 auto &pi = projected_nodes.back();
7c673cae
FG
393
394 if (scrub_infop && scrub_infop->last_scrub_dirty) {
94b18763
FG
395 pi.inode.last_scrub_stamp = scrub_infop->last_scrub_stamp;
396 pi.inode.last_scrub_version = scrub_infop->last_scrub_version;
7c673cae
FG
397 scrub_infop->last_scrub_dirty = false;
398 scrub_maybe_delete_info();
399 }
94b18763
FG
400
401 if (xattr) {
402 pi.xattrs.reset(new mempool_xattr_map(*get_projected_xattrs()));
403 ++num_projected_xattrs;
404 }
405
406 if (snap) {
407 project_snaprealm(pi);
408 }
409
410 dout(15) << __func__ << " " << pi.inode.ino << dendl;
411 return pi;
7c673cae
FG
412}
413
414void CInode::pop_and_dirty_projected_inode(LogSegment *ls)
415{
416 assert(!projected_nodes.empty());
94b18763
FG
417 auto &front = projected_nodes.front();
418 dout(15) << __func__ << " " << front.inode.ino
419 << " v" << front.inode.version << dendl;
7c673cae
FG
420 int64_t old_pool = inode.layout.pool_id;
421
94b18763
FG
422 mark_dirty(front.inode.version, ls);
423 inode = front.inode;
7c673cae
FG
424
425 if (inode.is_backtrace_updated())
28e407b8 426 mark_dirty_parent(ls, old_pool != inode.layout.pool_id);
7c673cae 427
94b18763 428 if (front.xattrs) {
7c673cae 429 --num_projected_xattrs;
94b18763 430 xattrs = *front.xattrs;
7c673cae
FG
431 }
432
94b18763
FG
433 auto &snapnode = front.snapnode;
434 if (snapnode) {
435 pop_projected_snaprealm(snapnode.get());
7c673cae
FG
436 --num_projected_srnodes;
437 }
438
7c673cae
FG
439 projected_nodes.pop_front();
440}
441
7c673cae
FG
442/* if newparent != parent, add parent to past_parents
443 if parent DNE, we need to find what the parent actually is and fill that in */
444void CInode::project_past_snaprealm_parent(SnapRealm *newparent)
445{
94b18763
FG
446 assert(!projected_nodes.empty());
447 sr_t &new_snap = project_snaprealm(projected_nodes.back());
7c673cae
FG
448 SnapRealm *oldparent;
449 if (!snaprealm) {
450 oldparent = find_snaprealm();
94b18763 451 new_snap.seq = oldparent->get_newest_seq();
7c673cae
FG
452 }
453 else
454 oldparent = snaprealm->parent;
455
456 if (newparent != oldparent) {
457 snapid_t oldparentseq = oldparent->get_newest_seq();
94b18763
FG
458 if (oldparentseq + 1 > new_snap.current_parent_since) {
459 new_snap.past_parents[oldparentseq].ino = oldparent->inode->ino();
460 new_snap.past_parents[oldparentseq].first = new_snap.current_parent_since;
7c673cae 461 }
94b18763 462 new_snap.current_parent_since = std::max(oldparentseq, newparent->get_last_created()) + 1;
7c673cae
FG
463 }
464}
465
466void CInode::pop_projected_snaprealm(sr_t *next_snaprealm)
467{
468 assert(next_snaprealm);
469 dout(10) << "pop_projected_snaprealm " << next_snaprealm
470 << " seq" << next_snaprealm->seq << dendl;
471 bool invalidate_cached_snaps = false;
472 if (!snaprealm) {
473 open_snaprealm();
474 } else if (next_snaprealm->past_parents.size() !=
475 snaprealm->srnode.past_parents.size()) {
476 invalidate_cached_snaps = true;
477 // re-open past parents
478 snaprealm->_close_parents();
479
480 dout(10) << " realm " << *snaprealm << " past_parents " << snaprealm->srnode.past_parents
481 << " -> " << next_snaprealm->past_parents << dendl;
482 }
483 snaprealm->srnode = *next_snaprealm;
7c673cae
FG
484
485 // we should be able to open these up (or have them already be open).
486 bool ok = snaprealm->_open_parents(NULL);
487 assert(ok);
488
489 if (invalidate_cached_snaps)
490 snaprealm->invalidate_cached_snaps();
491
492 if (snaprealm->parent)
493 dout(10) << " realm " << *snaprealm << " parent " << *snaprealm->parent << dendl;
494}
495
496
497// ====== CInode =======
498
499// dirfrags
500
94b18763 501__u32 InodeStoreBase::hash_dentry_name(boost::string_view dn)
7c673cae
FG
502{
503 int which = inode.dir_layout.dl_dir_hash;
504 if (!which)
505 which = CEPH_STR_HASH_LINUX;
506 assert(ceph_str_hash_valid(which));
507 return ceph_str_hash(which, dn.data(), dn.length());
508}
509
94b18763 510frag_t InodeStoreBase::pick_dirfrag(boost::string_view dn)
7c673cae
FG
511{
512 if (dirfragtree.empty())
513 return frag_t(); // avoid the string hash if we can.
514
515 __u32 h = hash_dentry_name(dn);
516 return dirfragtree[h];
517}
518
519bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
520{
521 bool all = true;
94b18763 522 std::list<frag_t> fglist;
7c673cae
FG
523 dirfragtree.get_leaves_under(fg, fglist);
524 for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
525 if (dirfrags.count(*p))
526 ls.push_back(dirfrags[*p]);
527 else
528 all = false;
529
530 if (all)
531 return all;
532
533 fragtree_t tmpdft;
534 tmpdft.force_to_leaf(g_ceph_context, fg);
94b18763
FG
535 for (auto &p : dirfrags) {
536 tmpdft.force_to_leaf(g_ceph_context, p.first);
537 if (fg.contains(p.first) && !dirfragtree.is_leaf(p.first))
538 ls.push_back(p.second);
7c673cae
FG
539 }
540
541 all = true;
542 tmpdft.get_leaves_under(fg, fglist);
94b18763
FG
543 for (const auto &p : fglist) {
544 if (!dirfrags.count(p)) {
7c673cae
FG
545 all = false;
546 break;
547 }
94b18763 548 }
7c673cae
FG
549
550 return all;
551}
552
553void CInode::verify_dirfrags()
554{
555 bool bad = false;
94b18763
FG
556 for (const auto &p : dirfrags) {
557 if (!dirfragtree.is_leaf(p.first)) {
558 dout(0) << "have open dirfrag " << p.first << " but not leaf in " << dirfragtree
559 << ": " << *p.second << dendl;
7c673cae
FG
560 bad = true;
561 }
562 }
563 assert(!bad);
564}
565
566void CInode::force_dirfrags()
567{
568 bool bad = false;
94b18763
FG
569 for (auto &p : dirfrags) {
570 if (!dirfragtree.is_leaf(p.first)) {
571 dout(0) << "have open dirfrag " << p.first << " but not leaf in " << dirfragtree
572 << ": " << *p.second << dendl;
7c673cae
FG
573 bad = true;
574 }
575 }
576
577 if (bad) {
578 list<frag_t> leaves;
579 dirfragtree.get_leaves(leaves);
580 for (list<frag_t>::iterator p = leaves.begin(); p != leaves.end(); ++p)
581 mdcache->get_force_dirfrag(dirfrag_t(ino(),*p), true);
582 }
583
584 verify_dirfrags();
585}
586
587CDir *CInode::get_approx_dirfrag(frag_t fg)
588{
589 CDir *dir = get_dirfrag(fg);
590 if (dir) return dir;
591
592 // find a child?
593 list<CDir*> ls;
594 get_dirfrags_under(fg, ls);
595 if (!ls.empty())
596 return ls.front();
597
598 // try parents?
599 while (fg.bits() > 0) {
600 fg = fg.parent();
601 dir = get_dirfrag(fg);
602 if (dir) return dir;
603 }
604 return NULL;
605}
606
94b18763 607void CInode::get_dirfrags(std::list<CDir*>& ls)
7c673cae
FG
608{
609 // all dirfrags
94b18763
FG
610 for (const auto &p : dirfrags) {
611 ls.push_back(p.second);
612 }
7c673cae
FG
613}
614void CInode::get_nested_dirfrags(list<CDir*>& ls)
615{
616 // dirfrags in same subtree
94b18763
FG
617 for (const auto &p : dirfrags) {
618 if (!p.second->is_subtree_root())
619 ls.push_back(p.second);
620 }
7c673cae
FG
621}
622void CInode::get_subtree_dirfrags(list<CDir*>& ls)
623{
624 // dirfrags that are roots of new subtrees
94b18763
FG
625 for (const auto &p : dirfrags) {
626 if (p.second->is_subtree_root())
627 ls.push_back(p.second);
628 }
7c673cae
FG
629}
630
631
632CDir *CInode::get_or_open_dirfrag(MDCache *mdcache, frag_t fg)
633{
634 assert(is_dir());
635
636 // have it?
637 CDir *dir = get_dirfrag(fg);
638 if (!dir) {
639 // create it.
640 assert(is_auth() || mdcache->mds->is_any_replay());
641 dir = new CDir(this, fg, mdcache, is_auth());
642 add_dirfrag(dir);
643 }
644 return dir;
645}
646
647CDir *CInode::add_dirfrag(CDir *dir)
648{
649 assert(dirfrags.count(dir->dirfrag().frag) == 0);
650 dirfrags[dir->dirfrag().frag] = dir;
651
652 if (stickydir_ref > 0) {
653 dir->state_set(CDir::STATE_STICKY);
654 dir->get(CDir::PIN_STICKY);
655 }
656
657 maybe_export_pin();
658
659 return dir;
660}
661
662void CInode::close_dirfrag(frag_t fg)
663{
664 dout(14) << "close_dirfrag " << fg << dendl;
665 assert(dirfrags.count(fg));
666
667 CDir *dir = dirfrags[fg];
668 dir->remove_null_dentries();
669
670 // clear dirty flag
671 if (dir->is_dirty())
672 dir->mark_clean();
673
674 if (stickydir_ref > 0) {
675 dir->state_clear(CDir::STATE_STICKY);
676 dir->put(CDir::PIN_STICKY);
677 }
1adf2230
AA
678
679 if (dir->is_subtree_root())
680 num_subtree_roots--;
7c673cae
FG
681
682 // dump any remaining dentries, for debugging purposes
94b18763
FG
683 for (const auto &p : dir->items)
684 dout(14) << __func__ << " LEFTOVER dn " << *p.second << dendl;
7c673cae
FG
685
686 assert(dir->get_num_ref() == 0);
687 delete dir;
688 dirfrags.erase(fg);
689}
690
691void CInode::close_dirfrags()
692{
693 while (!dirfrags.empty())
694 close_dirfrag(dirfrags.begin()->first);
695}
696
697bool CInode::has_subtree_root_dirfrag(int auth)
698{
1adf2230
AA
699 if (num_subtree_roots > 0) {
700 if (auth == -1)
7c673cae 701 return true;
1adf2230
AA
702 for (const auto &p : dirfrags) {
703 if (p.second->is_subtree_root() &&
704 p.second->dir_auth.first == auth)
705 return true;
706 }
94b18763 707 }
7c673cae
FG
708 return false;
709}
710
711bool CInode::has_subtree_or_exporting_dirfrag()
712{
1adf2230
AA
713 if (num_subtree_roots > 0 || num_exporting_dirs > 0)
714 return true;
7c673cae
FG
715 return false;
716}
717
718void CInode::get_stickydirs()
719{
720 if (stickydir_ref == 0) {
721 get(PIN_STICKYDIRS);
94b18763
FG
722 for (const auto &p : dirfrags) {
723 p.second->state_set(CDir::STATE_STICKY);
724 p.second->get(CDir::PIN_STICKY);
7c673cae
FG
725 }
726 }
727 stickydir_ref++;
728}
729
730void CInode::put_stickydirs()
731{
732 assert(stickydir_ref > 0);
733 stickydir_ref--;
734 if (stickydir_ref == 0) {
735 put(PIN_STICKYDIRS);
94b18763
FG
736 for (const auto &p : dirfrags) {
737 p.second->state_clear(CDir::STATE_STICKY);
738 p.second->put(CDir::PIN_STICKY);
7c673cae
FG
739 }
740 }
741}
742
743
744
745
746
747// pins
748
749void CInode::first_get()
750{
751 // pin my dentry?
752 if (parent)
753 parent->get(CDentry::PIN_INODEPIN);
754}
755
756void CInode::last_put()
757{
758 // unpin my dentry?
759 if (parent)
760 parent->put(CDentry::PIN_INODEPIN);
761}
762
763void CInode::_put()
764{
765 if (get_num_ref() == (int)is_dirty() + (int)is_dirty_parent())
766 mdcache->maybe_eval_stray(this, true);
767}
768
769void CInode::add_remote_parent(CDentry *p)
770{
771 if (remote_parents.empty())
772 get(PIN_REMOTEPARENT);
773 remote_parents.insert(p);
774}
775void CInode::remove_remote_parent(CDentry *p)
776{
777 remote_parents.erase(p);
778 if (remote_parents.empty())
779 put(PIN_REMOTEPARENT);
780}
781
782
783
784
785CDir *CInode::get_parent_dir()
786{
787 if (parent)
788 return parent->dir;
789 return NULL;
790}
791CDir *CInode::get_projected_parent_dir()
792{
793 CDentry *p = get_projected_parent_dn();
794 if (p)
795 return p->dir;
796 return NULL;
797}
798CInode *CInode::get_parent_inode()
799{
800 if (parent)
801 return parent->dir->inode;
802 return NULL;
803}
804
805bool CInode::is_projected_ancestor_of(CInode *other)
806{
807 while (other) {
808 if (other == this)
809 return true;
810 if (!other->get_projected_parent_dn())
811 break;
812 other = other->get_projected_parent_dn()->get_dir()->get_inode();
813 }
814 return false;
815}
816
817/*
818 * Because a non-directory inode may have multiple links, the use_parent
819 * argument allows selecting which parent to use for path construction. This
820 * argument is only meaningful for the final component (i.e. the first of the
821 * nested calls) because directories cannot have multiple hard links. If
822 * use_parent is NULL and projected is true, the primary parent's projected
823 * inode is used all the way up the path chain. Otherwise the primary parent
824 * stable inode is used.
825 */
826void CInode::make_path_string(string& s, bool projected, const CDentry *use_parent) const
827{
828 if (!use_parent) {
829 use_parent = projected ? get_projected_parent_dn() : parent;
830 }
831
832 if (use_parent) {
833 use_parent->make_path_string(s, projected);
834 } else if (is_root()) {
835 s = "";
836 } else if (is_mdsdir()) {
837 char t[40];
838 uint64_t eino(ino());
839 eino -= MDS_INO_MDSDIR_OFFSET;
840 snprintf(t, sizeof(t), "~mds%" PRId64, eino);
841 s = t;
842 } else {
843 char n[40];
844 uint64_t eino(ino());
845 snprintf(n, sizeof(n), "#%" PRIx64, eino);
846 s += n;
847 }
848}
849
850void CInode::make_path(filepath& fp, bool projected) const
851{
852 const CDentry *use_parent = projected ? get_projected_parent_dn() : parent;
853 if (use_parent) {
854 assert(!is_base());
855 use_parent->make_path(fp, projected);
856 } else {
857 fp = filepath(ino());
858 }
859}
860
861void CInode::name_stray_dentry(string& dname)
862{
863 char s[20];
864 snprintf(s, sizeof(s), "%llx", (unsigned long long)inode.ino.val);
865 dname = s;
866}
867
868version_t CInode::pre_dirty()
869{
870 version_t pv;
871 CDentry* _cdentry = get_projected_parent_dn();
872 if (_cdentry) {
873 pv = _cdentry->pre_dirty(get_projected_version());
874 dout(10) << "pre_dirty " << pv << " (current v " << inode.version << ")" << dendl;
875 } else {
876 assert(is_base());
877 pv = get_projected_version() + 1;
878 }
94b18763 879 // force update backtrace for old format inode (see mempool_inode::decode)
7c673cae 880 if (inode.backtrace_version == 0 && !projected_nodes.empty()) {
94b18763
FG
881 mempool_inode &pi = projected_nodes.back().inode;
882 if (pi.backtrace_version == 0)
883 pi.update_backtrace(pv);
7c673cae
FG
884 }
885 return pv;
886}
887
888void CInode::_mark_dirty(LogSegment *ls)
889{
890 if (!state_test(STATE_DIRTY)) {
891 state_set(STATE_DIRTY);
892 get(PIN_DIRTY);
893 assert(ls);
894 }
895
896 // move myself to this segment's dirty list
897 if (ls)
898 ls->dirty_inodes.push_back(&item_dirty);
899}
900
901void CInode::mark_dirty(version_t pv, LogSegment *ls) {
902
903 dout(10) << "mark_dirty " << *this << dendl;
904
905 /*
906 NOTE: I may already be dirty, but this fn _still_ needs to be called so that
907 the directory is (perhaps newly) dirtied, and so that parent_dir_version is
908 updated below.
909 */
910
911 // only auth can get dirty. "dirty" async data in replicas is relative to
912 // filelock state, not the dirty flag.
913 assert(is_auth());
914
915 // touch my private version
916 assert(inode.version < pv);
917 inode.version = pv;
918 _mark_dirty(ls);
919
920 // mark dentry too
921 if (parent)
922 parent->mark_dirty(pv, ls);
923}
924
925
926void CInode::mark_clean()
927{
928 dout(10) << " mark_clean " << *this << dendl;
929 if (state_test(STATE_DIRTY)) {
930 state_clear(STATE_DIRTY);
931 put(PIN_DIRTY);
932
933 // remove myself from ls dirty list
934 item_dirty.remove_myself();
935 }
936}
937
938
939// --------------
940// per-inode storage
941// (currently for root inode only)
942
943struct C_IO_Inode_Stored : public CInodeIOContext {
944 version_t version;
945 Context *fin;
946 C_IO_Inode_Stored(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
947 void finish(int r) override {
948 in->_stored(r, version, fin);
949 }
950};
951
952object_t InodeStoreBase::get_object_name(inodeno_t ino, frag_t fg, const char *suffix)
953{
954 char n[60];
955 snprintf(n, sizeof(n), "%llx.%08llx%s", (long long unsigned)ino, (long long unsigned)fg, suffix ? suffix : "");
956 return object_t(n);
957}
958
959void CInode::store(MDSInternalContextBase *fin)
960{
961 dout(10) << "store " << get_version() << dendl;
962 assert(is_base());
963
964 if (snaprealm)
965 purge_stale_snap_data(snaprealm->get_snaps());
966
967 // encode
968 bufferlist bl;
969 string magic = CEPH_FS_ONDISK_MAGIC;
970 ::encode(magic, bl);
971 encode_store(bl, mdcache->mds->mdsmap->get_up_features());
972
973 // write it.
974 SnapContext snapc;
975 ObjectOperation m;
976 m.write_full(bl);
977
978 object_t oid = CInode::get_object_name(ino(), frag_t(), ".inode");
979 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
980
981 Context *newfin =
982 new C_OnFinisher(new C_IO_Inode_Stored(this, get_version(), fin),
983 mdcache->mds->finisher);
984 mdcache->mds->objecter->mutate(oid, oloc, m, snapc,
985 ceph::real_clock::now(), 0,
986 newfin);
987}
988
989void CInode::_stored(int r, version_t v, Context *fin)
990{
991 if (r < 0) {
992 dout(1) << "store error " << r << " v " << v << " on " << *this << dendl;
d2e6a577
FG
993 mdcache->mds->clog->error() << "failed to store inode " << ino()
994 << " object: " << cpp_strerror(r);
7c673cae
FG
995 mdcache->mds->handle_write_error(r);
996 fin->complete(r);
997 return;
998 }
999
1000 dout(10) << "_stored " << v << " on " << *this << dendl;
1001 if (v == get_projected_version())
1002 mark_clean();
1003
1004 fin->complete(0);
1005}
1006
1007void CInode::flush(MDSInternalContextBase *fin)
1008{
1009 dout(10) << "flush " << *this << dendl;
1010 assert(is_auth() && can_auth_pin());
1011
1012 MDSGatherBuilder gather(g_ceph_context);
1013
1014 if (is_dirty_parent()) {
1015 store_backtrace(gather.new_sub());
1016 }
1017 if (is_dirty()) {
1018 if (is_base()) {
1019 store(gather.new_sub());
1020 } else {
1021 parent->dir->commit(0, gather.new_sub());
1022 }
1023 }
1024
1025 if (gather.has_subs()) {
1026 gather.set_finisher(fin);
1027 gather.activate();
1028 } else {
1029 fin->complete(0);
1030 }
1031}
1032
1033struct C_IO_Inode_Fetched : public CInodeIOContext {
1034 bufferlist bl, bl2;
1035 Context *fin;
1036 C_IO_Inode_Fetched(CInode *i, Context *f) : CInodeIOContext(i), fin(f) {}
1037 void finish(int r) override {
1038 // Ignore 'r', because we fetch from two places, so r is usually ENOENT
1039 in->_fetched(bl, bl2, fin);
1040 }
1041};
1042
1043void CInode::fetch(MDSInternalContextBase *fin)
1044{
1045 dout(10) << "fetch" << dendl;
1046
1047 C_IO_Inode_Fetched *c = new C_IO_Inode_Fetched(this, fin);
1048 C_GatherBuilder gather(g_ceph_context, new C_OnFinisher(c, mdcache->mds->finisher));
1049
1050 object_t oid = CInode::get_object_name(ino(), frag_t(), "");
1051 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
1052
1053 // Old on-disk format: inode stored in xattr of a dirfrag
1054 ObjectOperation rd;
1055 rd.getxattr("inode", &c->bl, NULL);
1056 mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, (bufferlist*)NULL, 0, gather.new_sub());
1057
1058 // Current on-disk format: inode stored in a .inode object
1059 object_t oid2 = CInode::get_object_name(ino(), frag_t(), ".inode");
1060 mdcache->mds->objecter->read(oid2, oloc, 0, 0, CEPH_NOSNAP, &c->bl2, 0, gather.new_sub());
1061
1062 gather.activate();
1063}
1064
1065void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin)
1066{
1067 dout(10) << "_fetched got " << bl.length() << " and " << bl2.length() << dendl;
1068 bufferlist::iterator p;
1069 if (bl2.length()) {
1070 p = bl2.begin();
1071 } else if (bl.length()) {
1072 p = bl.begin();
1073 } else {
d2e6a577 1074 derr << "No data while reading inode " << ino() << dendl;
7c673cae
FG
1075 fin->complete(-ENOENT);
1076 return;
1077 }
1078
1079 // Attempt decode
1080 try {
1081 string magic;
1082 ::decode(magic, p);
1083 dout(10) << " magic is '" << magic << "' (expecting '"
1084 << CEPH_FS_ONDISK_MAGIC << "')" << dendl;
1085 if (magic != CEPH_FS_ONDISK_MAGIC) {
1086 dout(0) << "on disk magic '" << magic << "' != my magic '" << CEPH_FS_ONDISK_MAGIC
1087 << "'" << dendl;
1088 fin->complete(-EINVAL);
1089 } else {
1090 decode_store(p);
1091 dout(10) << "_fetched " << *this << dendl;
1092 fin->complete(0);
1093 }
1094 } catch (buffer::error &err) {
d2e6a577 1095 derr << "Corrupt inode " << ino() << ": " << err << dendl;
7c673cae
FG
1096 fin->complete(-EINVAL);
1097 return;
1098 }
1099}
1100
1101void CInode::build_backtrace(int64_t pool, inode_backtrace_t& bt)
1102{
1103 bt.ino = inode.ino;
1104 bt.ancestors.clear();
1105 bt.pool = pool;
1106
1107 CInode *in = this;
1108 CDentry *pdn = get_parent_dn();
1109 while (pdn) {
1110 CInode *diri = pdn->get_dir()->get_inode();
94b18763 1111 bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->get_name(), in->inode.version));
7c673cae
FG
1112 in = diri;
1113 pdn = in->get_parent_dn();
1114 }
94b18763 1115 for (auto &p : inode.old_pools) {
7c673cae 1116 // don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
94b18763
FG
1117 if (p != pool)
1118 bt.old_pools.insert(p);
7c673cae
FG
1119 }
1120}
1121
1122struct C_IO_Inode_StoredBacktrace : public CInodeIOContext {
1123 version_t version;
1124 Context *fin;
1125 C_IO_Inode_StoredBacktrace(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
1126 void finish(int r) override {
1127 in->_stored_backtrace(r, version, fin);
1128 }
1129};
1130
1131void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
1132{
1133 dout(10) << "store_backtrace on " << *this << dendl;
1134 assert(is_dirty_parent());
1135
1136 if (op_prio < 0)
1137 op_prio = CEPH_MSG_PRIO_DEFAULT;
1138
1139 auth_pin(this);
1140
1141 const int64_t pool = get_backtrace_pool();
1142 inode_backtrace_t bt;
1143 build_backtrace(pool, bt);
1144 bufferlist parent_bl;
1145 ::encode(bt, parent_bl);
1146
1147 ObjectOperation op;
1148 op.priority = op_prio;
1149 op.create(false);
1150 op.setxattr("parent", parent_bl);
1151
1152 bufferlist layout_bl;
1153 ::encode(inode.layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
1154 op.setxattr("layout", layout_bl);
1155
1156 SnapContext snapc;
1157 object_t oid = get_object_name(ino(), frag_t(), "");
1158 object_locator_t oloc(pool);
1159 Context *fin2 = new C_OnFinisher(
1160 new C_IO_Inode_StoredBacktrace(this, inode.backtrace_version, fin),
1161 mdcache->mds->finisher);
1162
1163 if (!state_test(STATE_DIRTYPOOL) || inode.old_pools.empty()) {
1164 dout(20) << __func__ << ": no dirtypool or no old pools" << dendl;
1165 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1166 ceph::real_clock::now(),
1167 0, fin2);
1168 return;
1169 }
1170
1171 C_GatherBuilder gather(g_ceph_context, fin2);
1172 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1173 ceph::real_clock::now(),
1174 0, gather.new_sub());
1175
1176 // In the case where DIRTYPOOL is set, we update all old pools backtraces
1177 // such that anyone reading them will see the new pool ID in
1178 // inode_backtrace_t::pool and go read everything else from there.
94b18763
FG
1179 for (const auto &p : inode.old_pools) {
1180 if (p == pool)
7c673cae
FG
1181 continue;
1182
94b18763 1183 dout(20) << __func__ << ": updating old pool " << p << dendl;
7c673cae
FG
1184
1185 ObjectOperation op;
1186 op.priority = op_prio;
1187 op.create(false);
1188 op.setxattr("parent", parent_bl);
1189
94b18763 1190 object_locator_t oloc(p);
7c673cae
FG
1191 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1192 ceph::real_clock::now(),
1193 0, gather.new_sub());
1194 }
1195 gather.activate();
1196}
1197
1198void CInode::_stored_backtrace(int r, version_t v, Context *fin)
1199{
1200 if (r == -ENOENT) {
1201 const int64_t pool = get_backtrace_pool();
1202 bool exists = mdcache->mds->objecter->with_osdmap(
1203 [pool](const OSDMap &osd_map) {
1204 return osd_map.have_pg_pool(pool);
1205 });
1206
1207 // This ENOENT is because the pool doesn't exist (the user deleted it
1208 // out from under us), so the backtrace can never be written, so pretend
1209 // to succeed so that the user can proceed to e.g. delete the file.
1210 if (!exists) {
1211 dout(4) << "store_backtrace got ENOENT: a data pool was deleted "
1212 "beneath us!" << dendl;
1213 r = 0;
1214 }
1215 }
1216
1217 if (r < 0) {
1218 dout(1) << "store backtrace error " << r << " v " << v << dendl;
1219 mdcache->mds->clog->error() << "failed to store backtrace on ino "
1220 << ino() << " object"
1221 << ", pool " << get_backtrace_pool()
1222 << ", errno " << r;
1223 mdcache->mds->handle_write_error(r);
1224 if (fin)
1225 fin->complete(r);
1226 return;
1227 }
1228
1229 dout(10) << "_stored_backtrace v " << v << dendl;
1230
1231 auth_unpin(this);
1232 if (v == inode.backtrace_version)
1233 clear_dirty_parent();
1234 if (fin)
1235 fin->complete(0);
1236}
1237
1238void CInode::fetch_backtrace(Context *fin, bufferlist *backtrace)
1239{
1240 mdcache->fetch_backtrace(inode.ino, get_backtrace_pool(), *backtrace, fin);
1241}
1242
28e407b8 1243void CInode::mark_dirty_parent(LogSegment *ls, bool dirty_pool)
7c673cae
FG
1244{
1245 if (!state_test(STATE_DIRTYPARENT)) {
1246 dout(10) << "mark_dirty_parent" << dendl;
1247 state_set(STATE_DIRTYPARENT);
1248 get(PIN_DIRTYPARENT);
1249 assert(ls);
1250 }
1251 if (dirty_pool)
1252 state_set(STATE_DIRTYPOOL);
1253 if (ls)
1254 ls->dirty_parent_inodes.push_back(&item_dirty_parent);
1255}
1256
1257void CInode::clear_dirty_parent()
1258{
1259 if (state_test(STATE_DIRTYPARENT)) {
1260 dout(10) << "clear_dirty_parent" << dendl;
1261 state_clear(STATE_DIRTYPARENT);
1262 state_clear(STATE_DIRTYPOOL);
1263 put(PIN_DIRTYPARENT);
1264 item_dirty_parent.remove_myself();
1265 }
1266}
1267
1268void CInode::verify_diri_backtrace(bufferlist &bl, int err)
1269{
1270 if (is_base() || is_dirty_parent() || !is_auth())
1271 return;
1272
1273 dout(10) << "verify_diri_backtrace" << dendl;
1274
1275 if (err == 0) {
1276 inode_backtrace_t backtrace;
1277 ::decode(backtrace, bl);
1278 CDentry *pdn = get_parent_dn();
1279 if (backtrace.ancestors.empty() ||
94b18763 1280 backtrace.ancestors[0].dname != pdn->get_name() ||
7c673cae
FG
1281 backtrace.ancestors[0].dirino != pdn->get_dir()->ino())
1282 err = -EINVAL;
1283 }
1284
1285 if (err) {
1286 MDSRank *mds = mdcache->mds;
d2e6a577 1287 mds->clog->error() << "bad backtrace on directory inode " << ino();
7c673cae
FG
1288 assert(!"bad backtrace" == (g_conf->mds_verify_backtrace > 1));
1289
28e407b8 1290 mark_dirty_parent(mds->mdlog->get_current_segment(), false);
7c673cae
FG
1291 mds->mdlog->flush();
1292 }
1293}
1294
1295// ------------------
1296// parent dir
1297
1298
1299void InodeStoreBase::encode_bare(bufferlist &bl, uint64_t features,
1300 const bufferlist *snap_blob) const
1301{
1302 ::encode(inode, bl, features);
1303 if (is_symlink())
1304 ::encode(symlink, bl);
1305 ::encode(dirfragtree, bl);
1306 ::encode(xattrs, bl);
1307 if (snap_blob)
1308 ::encode(*snap_blob, bl);
1309 else
1310 ::encode(bufferlist(), bl);
1311 ::encode(old_inodes, bl, features);
1312 ::encode(oldest_snap, bl);
1313 ::encode(damage_flags, bl);
1314}
1315
1316void InodeStoreBase::encode(bufferlist &bl, uint64_t features,
1317 const bufferlist *snap_blob) const
1318{
1319 ENCODE_START(6, 4, bl);
1320 encode_bare(bl, features, snap_blob);
1321 ENCODE_FINISH(bl);
1322}
1323
1324void CInode::encode_store(bufferlist& bl, uint64_t features)
1325{
1326 bufferlist snap_blob;
1327 encode_snap_blob(snap_blob);
1328 InodeStoreBase::encode(bl, mdcache->mds->mdsmap->get_up_features(),
1329 &snap_blob);
1330}
1331
1332void InodeStoreBase::decode_bare(bufferlist::iterator &bl,
1333 bufferlist& snap_blob, __u8 struct_v)
1334{
1335 ::decode(inode, bl);
94b18763
FG
1336 if (is_symlink()) {
1337 std::string tmp;
1338 ::decode(tmp, bl);
1339 symlink = mempool::mds_co::string(boost::string_view(tmp));
1340 }
7c673cae
FG
1341 ::decode(dirfragtree, bl);
1342 ::decode(xattrs, bl);
1343 ::decode(snap_blob, bl);
1344
1345 ::decode(old_inodes, bl);
1346 if (struct_v == 2 && inode.is_dir()) {
1347 bool default_layout_exists;
1348 ::decode(default_layout_exists, bl);
1349 if (default_layout_exists) {
1350 ::decode(struct_v, bl); // this was a default_file_layout
1351 ::decode(inode.layout, bl); // but we only care about the layout portion
1352 }
1353 }
1354
1355 if (struct_v >= 5) {
1356 // InodeStore is embedded in dentries without proper versioning, so
1357 // we consume up to the end of the buffer
1358 if (!bl.end()) {
1359 ::decode(oldest_snap, bl);
1360 }
1361
1362 if (!bl.end()) {
1363 ::decode(damage_flags, bl);
1364 }
1365 }
1366}
1367
1368
1369void InodeStoreBase::decode(bufferlist::iterator &bl, bufferlist& snap_blob)
1370{
1371 DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
1372 decode_bare(bl, snap_blob, struct_v);
1373 DECODE_FINISH(bl);
1374}
1375
1376void CInode::decode_store(bufferlist::iterator& bl)
1377{
1378 bufferlist snap_blob;
1379 InodeStoreBase::decode(bl, snap_blob);
1380 decode_snap_blob(snap_blob);
1381}
1382
1383// ------------------
1384// locking
1385
1386void CInode::set_object_info(MDSCacheObjectInfo &info)
1387{
1388 info.ino = ino();
1389 info.snapid = last;
1390}
1391
1392void CInode::encode_lock_state(int type, bufferlist& bl)
1393{
1394 ::encode(first, bl);
1395
1396 switch (type) {
1397 case CEPH_LOCK_IAUTH:
1398 ::encode(inode.version, bl);
1399 ::encode(inode.ctime, bl);
1400 ::encode(inode.mode, bl);
1401 ::encode(inode.uid, bl);
1402 ::encode(inode.gid, bl);
1403 break;
1404
1405 case CEPH_LOCK_ILINK:
1406 ::encode(inode.version, bl);
1407 ::encode(inode.ctime, bl);
1408 ::encode(inode.nlink, bl);
1409 break;
1410
1411 case CEPH_LOCK_IDFT:
1412 if (is_auth()) {
1413 ::encode(inode.version, bl);
1414 } else {
1415 // treat flushing as dirty when rejoining cache
1416 bool dirty = dirfragtreelock.is_dirty_or_flushing();
1417 ::encode(dirty, bl);
1418 }
1419 {
1420 // encode the raw tree
1421 ::encode(dirfragtree, bl);
1422
1423 // also specify which frags are mine
1424 set<frag_t> myfrags;
1425 list<CDir*> dfls;
1426 get_dirfrags(dfls);
1427 for (list<CDir*>::iterator p = dfls.begin(); p != dfls.end(); ++p)
1428 if ((*p)->is_auth()) {
1429 frag_t fg = (*p)->get_frag();
1430 myfrags.insert(fg);
1431 }
1432 ::encode(myfrags, bl);
1433 }
1434 break;
1435
1436 case CEPH_LOCK_IFILE:
1437 if (is_auth()) {
1438 ::encode(inode.version, bl);
1439 ::encode(inode.ctime, bl);
1440 ::encode(inode.mtime, bl);
1441 ::encode(inode.atime, bl);
1442 ::encode(inode.time_warp_seq, bl);
1443 if (!is_dir()) {
1444 ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1445 ::encode(inode.size, bl);
1446 ::encode(inode.truncate_seq, bl);
1447 ::encode(inode.truncate_size, bl);
1448 ::encode(inode.client_ranges, bl);
1449 ::encode(inode.inline_data, bl);
1450 }
1451 } else {
1452 // treat flushing as dirty when rejoining cache
1453 bool dirty = filelock.is_dirty_or_flushing();
1454 ::encode(dirty, bl);
1455 }
1456
1457 {
1458 dout(15) << "encode_lock_state inode.dirstat is " << inode.dirstat << dendl;
1459 ::encode(inode.dirstat, bl); // only meaningful if i am auth.
1460 bufferlist tmp;
1461 __u32 n = 0;
94b18763
FG
1462 for (const auto &p : dirfrags) {
1463 frag_t fg = p.first;
1464 CDir *dir = p.second;
7c673cae
FG
1465 if (is_auth() || dir->is_auth()) {
1466 fnode_t *pf = dir->get_projected_fnode();
1467 dout(15) << fg << " " << *dir << dendl;
1468 dout(20) << fg << " fragstat " << pf->fragstat << dendl;
1469 dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
1470 ::encode(fg, tmp);
1471 ::encode(dir->first, tmp);
1472 ::encode(pf->fragstat, tmp);
1473 ::encode(pf->accounted_fragstat, tmp);
1474 n++;
1475 }
1476 }
1477 ::encode(n, bl);
1478 bl.claim_append(tmp);
1479 }
1480 break;
1481
1482 case CEPH_LOCK_INEST:
1483 if (is_auth()) {
1484 ::encode(inode.version, bl);
1485 } else {
1486 // treat flushing as dirty when rejoining cache
1487 bool dirty = nestlock.is_dirty_or_flushing();
1488 ::encode(dirty, bl);
1489 }
1490 {
1491 dout(15) << "encode_lock_state inode.rstat is " << inode.rstat << dendl;
1492 ::encode(inode.rstat, bl); // only meaningful if i am auth.
1493 bufferlist tmp;
1494 __u32 n = 0;
94b18763
FG
1495 for (const auto &p : dirfrags) {
1496 frag_t fg = p.first;
1497 CDir *dir = p.second;
7c673cae
FG
1498 if (is_auth() || dir->is_auth()) {
1499 fnode_t *pf = dir->get_projected_fnode();
1500 dout(10) << fg << " " << *dir << dendl;
1501 dout(10) << fg << " " << pf->rstat << dendl;
1502 dout(10) << fg << " " << pf->rstat << dendl;
1503 dout(10) << fg << " " << dir->dirty_old_rstat << dendl;
1504 ::encode(fg, tmp);
1505 ::encode(dir->first, tmp);
1506 ::encode(pf->rstat, tmp);
1507 ::encode(pf->accounted_rstat, tmp);
1508 ::encode(dir->dirty_old_rstat, tmp);
1509 n++;
1510 }
1511 }
1512 ::encode(n, bl);
1513 bl.claim_append(tmp);
1514 }
1515 break;
1516
1517 case CEPH_LOCK_IXATTR:
1518 ::encode(inode.version, bl);
1519 ::encode(inode.ctime, bl);
1520 ::encode(xattrs, bl);
1521 break;
1522
1523 case CEPH_LOCK_ISNAP:
1524 ::encode(inode.version, bl);
1525 ::encode(inode.ctime, bl);
1526 encode_snap(bl);
1527 break;
1528
1529 case CEPH_LOCK_IFLOCK:
1530 ::encode(inode.version, bl);
1531 _encode_file_locks(bl);
1532 break;
1533
1534 case CEPH_LOCK_IPOLICY:
1535 if (inode.is_dir()) {
1536 ::encode(inode.version, bl);
1537 ::encode(inode.ctime, bl);
1538 ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1539 ::encode(inode.quota, bl);
1540 ::encode(inode.export_pin, bl);
1541 }
1542 break;
1543
1544 default:
1545 ceph_abort();
1546 }
1547}
1548
1549
1550/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1551
1552void CInode::decode_lock_state(int type, bufferlist& bl)
1553{
1554 bufferlist::iterator p = bl.begin();
1555 utime_t tm;
1556
1557 snapid_t newfirst;
1558 ::decode(newfirst, p);
1559
1560 if (!is_auth() && newfirst != first) {
1561 dout(10) << "decode_lock_state first " << first << " -> " << newfirst << dendl;
1562 assert(newfirst > first);
1563 if (!is_multiversion() && parent) {
1564 assert(parent->first == first);
1565 parent->first = newfirst;
1566 }
1567 first = newfirst;
1568 }
1569
1570 switch (type) {
1571 case CEPH_LOCK_IAUTH:
1572 ::decode(inode.version, p);
1573 ::decode(tm, p);
1574 if (inode.ctime < tm) inode.ctime = tm;
1575 ::decode(inode.mode, p);
1576 ::decode(inode.uid, p);
1577 ::decode(inode.gid, p);
1578 break;
1579
1580 case CEPH_LOCK_ILINK:
1581 ::decode(inode.version, p);
1582 ::decode(tm, p);
1583 if (inode.ctime < tm) inode.ctime = tm;
1584 ::decode(inode.nlink, p);
1585 break;
1586
1587 case CEPH_LOCK_IDFT:
1588 if (is_auth()) {
1589 bool replica_dirty;
1590 ::decode(replica_dirty, p);
1591 if (replica_dirty) {
1592 dout(10) << "decode_lock_state setting dftlock dirty flag" << dendl;
1593 dirfragtreelock.mark_dirty(); // ok bc we're auth and caller will handle
1594 }
1595 } else {
1596 ::decode(inode.version, p);
1597 }
1598 {
1599 fragtree_t temp;
1600 ::decode(temp, p);
1601 set<frag_t> authfrags;
1602 ::decode(authfrags, p);
1603 if (is_auth()) {
1604 // auth. believe replica's auth frags only.
1605 for (set<frag_t>::iterator p = authfrags.begin(); p != authfrags.end(); ++p)
1606 if (!dirfragtree.is_leaf(*p)) {
1607 dout(10) << " forcing frag " << *p << " to leaf (split|merge)" << dendl;
1608 dirfragtree.force_to_leaf(g_ceph_context, *p);
1609 dirfragtreelock.mark_dirty(); // ok bc we're auth and caller will handle
1610 }
1611 } else {
1612 // replica. take the new tree, BUT make sure any open
1613 // dirfrags remain leaves (they may have split _after_ this
1614 // dft was scattered, or we may still be be waiting on the
1615 // notify from the auth)
1616 dirfragtree.swap(temp);
94b18763
FG
1617 for (const auto &p : dirfrags) {
1618 if (!dirfragtree.is_leaf(p.first)) {
1619 dout(10) << " forcing open dirfrag " << p.first << " to leaf (racing with split|merge)" << dendl;
1620 dirfragtree.force_to_leaf(g_ceph_context, p.first);
7c673cae 1621 }
94b18763
FG
1622 if (p.second->is_auth())
1623 p.second->state_clear(CDir::STATE_DIRTYDFT);
7c673cae
FG
1624 }
1625 }
1626 if (g_conf->mds_debug_frag)
1627 verify_dirfrags();
1628 }
1629 break;
1630
1631 case CEPH_LOCK_IFILE:
1632 if (!is_auth()) {
1633 ::decode(inode.version, p);
1634 ::decode(tm, p);
1635 if (inode.ctime < tm) inode.ctime = tm;
1636 ::decode(inode.mtime, p);
1637 ::decode(inode.atime, p);
1638 ::decode(inode.time_warp_seq, p);
1639 if (!is_dir()) {
1640 ::decode(inode.layout, p);
1641 ::decode(inode.size, p);
1642 ::decode(inode.truncate_seq, p);
1643 ::decode(inode.truncate_size, p);
1644 ::decode(inode.client_ranges, p);
1645 ::decode(inode.inline_data, p);
1646 }
1647 } else {
1648 bool replica_dirty;
1649 ::decode(replica_dirty, p);
1650 if (replica_dirty) {
1651 dout(10) << "decode_lock_state setting filelock dirty flag" << dendl;
1652 filelock.mark_dirty(); // ok bc we're auth and caller will handle
1653 }
1654 }
1655 {
1656 frag_info_t dirstat;
1657 ::decode(dirstat, p);
1658 if (!is_auth()) {
1659 dout(10) << " taking inode dirstat " << dirstat << " for " << *this << dendl;
1660 inode.dirstat = dirstat; // take inode summation if replica
1661 }
1662 __u32 n;
1663 ::decode(n, p);
1664 dout(10) << " ...got " << n << " fragstats on " << *this << dendl;
1665 while (n--) {
1666 frag_t fg;
1667 snapid_t fgfirst;
1668 frag_info_t fragstat;
1669 frag_info_t accounted_fragstat;
1670 ::decode(fg, p);
1671 ::decode(fgfirst, p);
1672 ::decode(fragstat, p);
1673 ::decode(accounted_fragstat, p);
1674 dout(10) << fg << " [" << fgfirst << ",head] " << dendl;
1675 dout(10) << fg << " fragstat " << fragstat << dendl;
1676 dout(20) << fg << " accounted_fragstat " << accounted_fragstat << dendl;
1677
1678 CDir *dir = get_dirfrag(fg);
1679 if (is_auth()) {
1680 assert(dir); // i am auth; i had better have this dir open
1681 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1682 << " on " << *dir << dendl;
1683 dir->first = fgfirst;
1684 dir->fnode.fragstat = fragstat;
1685 dir->fnode.accounted_fragstat = accounted_fragstat;
1686 dir->first = fgfirst;
1687 if (!(fragstat == accounted_fragstat)) {
1688 dout(10) << fg << " setting filelock updated flag" << dendl;
1689 filelock.mark_dirty(); // ok bc we're auth and caller will handle
1690 }
1691 } else {
1692 if (dir && dir->is_auth()) {
1693 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1694 << " on " << *dir << dendl;
1695 dir->first = fgfirst;
1696 fnode_t *pf = dir->get_projected_fnode();
1697 finish_scatter_update(&filelock, dir,
1698 inode.dirstat.version, pf->accounted_fragstat.version);
1699 }
1700 }
1701 }
1702 }
1703 break;
1704
1705 case CEPH_LOCK_INEST:
1706 if (is_auth()) {
1707 bool replica_dirty;
1708 ::decode(replica_dirty, p);
1709 if (replica_dirty) {
1710 dout(10) << "decode_lock_state setting nestlock dirty flag" << dendl;
1711 nestlock.mark_dirty(); // ok bc we're auth and caller will handle
1712 }
1713 } else {
1714 ::decode(inode.version, p);
1715 }
1716 {
1717 nest_info_t rstat;
1718 ::decode(rstat, p);
1719 if (!is_auth()) {
1720 dout(10) << " taking inode rstat " << rstat << " for " << *this << dendl;
1721 inode.rstat = rstat; // take inode summation if replica
1722 }
1723 __u32 n;
1724 ::decode(n, p);
1725 while (n--) {
1726 frag_t fg;
1727 snapid_t fgfirst;
1728 nest_info_t rstat;
1729 nest_info_t accounted_rstat;
94b18763 1730 decltype(CDir::dirty_old_rstat) dirty_old_rstat;
7c673cae
FG
1731 ::decode(fg, p);
1732 ::decode(fgfirst, p);
1733 ::decode(rstat, p);
1734 ::decode(accounted_rstat, p);
1735 ::decode(dirty_old_rstat, p);
1736 dout(10) << fg << " [" << fgfirst << ",head]" << dendl;
1737 dout(10) << fg << " rstat " << rstat << dendl;
1738 dout(10) << fg << " accounted_rstat " << accounted_rstat << dendl;
1739 dout(10) << fg << " dirty_old_rstat " << dirty_old_rstat << dendl;
1740
1741 CDir *dir = get_dirfrag(fg);
1742 if (is_auth()) {
1743 assert(dir); // i am auth; i had better have this dir open
1744 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1745 << " on " << *dir << dendl;
1746 dir->first = fgfirst;
1747 dir->fnode.rstat = rstat;
1748 dir->fnode.accounted_rstat = accounted_rstat;
1749 dir->dirty_old_rstat.swap(dirty_old_rstat);
1750 if (!(rstat == accounted_rstat) || !dir->dirty_old_rstat.empty()) {
1751 dout(10) << fg << " setting nestlock updated flag" << dendl;
1752 nestlock.mark_dirty(); // ok bc we're auth and caller will handle
1753 }
1754 } else {
1755 if (dir && dir->is_auth()) {
1756 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1757 << " on " << *dir << dendl;
1758 dir->first = fgfirst;
1759 fnode_t *pf = dir->get_projected_fnode();
1760 finish_scatter_update(&nestlock, dir,
1761 inode.rstat.version, pf->accounted_rstat.version);
1762 }
1763 }
1764 }
1765 }
1766 break;
1767
1768 case CEPH_LOCK_IXATTR:
1769 ::decode(inode.version, p);
1770 ::decode(tm, p);
1771 if (inode.ctime < tm) inode.ctime = tm;
1772 ::decode(xattrs, p);
1773 break;
1774
1775 case CEPH_LOCK_ISNAP:
1776 {
1777 ::decode(inode.version, p);
1778 ::decode(tm, p);
1779 if (inode.ctime < tm) inode.ctime = tm;
1780 snapid_t seq = 0;
1781 if (snaprealm)
1782 seq = snaprealm->srnode.seq;
1783 decode_snap(p);
1784 if (snaprealm && snaprealm->srnode.seq != seq)
1785 mdcache->do_realm_invalidate_and_update_notify(this, seq ? CEPH_SNAP_OP_UPDATE:CEPH_SNAP_OP_SPLIT);
1786 }
1787 break;
1788
1789 case CEPH_LOCK_IFLOCK:
1790 ::decode(inode.version, p);
1791 _decode_file_locks(p);
1792 break;
1793
1794 case CEPH_LOCK_IPOLICY:
1795 if (inode.is_dir()) {
1796 ::decode(inode.version, p);
1797 ::decode(tm, p);
1798 if (inode.ctime < tm) inode.ctime = tm;
1799 ::decode(inode.layout, p);
1800 ::decode(inode.quota, p);
31f18b77 1801 mds_rank_t old_pin = inode.export_pin;
7c673cae 1802 ::decode(inode.export_pin, p);
31f18b77 1803 maybe_export_pin(old_pin != inode.export_pin);
7c673cae
FG
1804 }
1805 break;
1806
1807 default:
1808 ceph_abort();
1809 }
1810}
1811
1812
1813bool CInode::is_dirty_scattered()
1814{
1815 return
1816 filelock.is_dirty_or_flushing() ||
1817 nestlock.is_dirty_or_flushing() ||
1818 dirfragtreelock.is_dirty_or_flushing();
1819}
1820
1821void CInode::clear_scatter_dirty()
1822{
1823 filelock.remove_dirty();
1824 nestlock.remove_dirty();
1825 dirfragtreelock.remove_dirty();
1826}
1827
1828void CInode::clear_dirty_scattered(int type)
1829{
1830 dout(10) << "clear_dirty_scattered " << type << " on " << *this << dendl;
b32b8144 1831 assert(is_dir());
7c673cae
FG
1832 switch (type) {
1833 case CEPH_LOCK_IFILE:
1834 item_dirty_dirfrag_dir.remove_myself();
1835 break;
1836
1837 case CEPH_LOCK_INEST:
1838 item_dirty_dirfrag_nest.remove_myself();
1839 break;
1840
1841 case CEPH_LOCK_IDFT:
1842 item_dirty_dirfrag_dirfragtree.remove_myself();
1843 break;
1844
1845 default:
1846 ceph_abort();
1847 }
1848}
1849
1850
1851/*
1852 * when we initially scatter a lock, we need to check if any of the dirfrags
1853 * have out of date accounted_rstat/fragstat. if so, mark the lock stale.
1854 */
1855/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1856void CInode::start_scatter(ScatterLock *lock)
1857{
1858 dout(10) << "start_scatter " << *lock << " on " << *this << dendl;
1859 assert(is_auth());
94b18763 1860 mempool_inode *pi = get_projected_inode();
7c673cae 1861
94b18763
FG
1862 for (const auto &p : dirfrags) {
1863 frag_t fg = p.first;
1864 CDir *dir = p.second;
7c673cae
FG
1865 fnode_t *pf = dir->get_projected_fnode();
1866 dout(20) << fg << " " << *dir << dendl;
1867
1868 if (!dir->is_auth())
1869 continue;
1870
1871 switch (lock->get_type()) {
1872 case CEPH_LOCK_IFILE:
1873 finish_scatter_update(lock, dir, pi->dirstat.version, pf->accounted_fragstat.version);
1874 break;
1875
1876 case CEPH_LOCK_INEST:
1877 finish_scatter_update(lock, dir, pi->rstat.version, pf->accounted_rstat.version);
1878 break;
1879
1880 case CEPH_LOCK_IDFT:
1881 dir->state_clear(CDir::STATE_DIRTYDFT);
1882 break;
1883 }
1884 }
1885}
1886
1887
1888class C_Inode_FragUpdate : public MDSLogContextBase {
1889protected:
1890 CInode *in;
1891 CDir *dir;
1892 MutationRef mut;
1893 MDSRank *get_mds() override {return in->mdcache->mds;}
1894 void finish(int r) override {
1895 in->_finish_frag_update(dir, mut);
1896 }
1897
1898public:
1899 C_Inode_FragUpdate(CInode *i, CDir *d, MutationRef& m) : in(i), dir(d), mut(m) {}
1900};
1901
1902void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
1903 version_t inode_version, version_t dir_accounted_version)
1904{
1905 frag_t fg = dir->get_frag();
1906 assert(dir->is_auth());
1907
1908 if (dir->is_frozen()) {
1909 dout(10) << "finish_scatter_update " << fg << " frozen, marking " << *lock << " stale " << *dir << dendl;
1910 } else if (dir->get_version() == 0) {
1911 dout(10) << "finish_scatter_update " << fg << " not loaded, marking " << *lock << " stale " << *dir << dendl;
1912 } else {
1913 if (dir_accounted_version != inode_version) {
1914 dout(10) << "finish_scatter_update " << fg << " journaling accounted scatterstat update v" << inode_version << dendl;
1915
1916 MDLog *mdlog = mdcache->mds->mdlog;
1917 MutationRef mut(new MutationImpl());
1918 mut->ls = mdlog->get_current_segment();
1919
94b18763 1920 mempool_inode *pi = get_projected_inode();
7c673cae 1921 fnode_t *pf = dir->project_fnode();
7c673cae
FG
1922
1923 const char *ename = 0;
1924 switch (lock->get_type()) {
1925 case CEPH_LOCK_IFILE:
1926 pf->fragstat.version = pi->dirstat.version;
1927 pf->accounted_fragstat = pf->fragstat;
1928 ename = "lock ifile accounted scatter stat update";
1929 break;
1930 case CEPH_LOCK_INEST:
1931 pf->rstat.version = pi->rstat.version;
1932 pf->accounted_rstat = pf->rstat;
1933 ename = "lock inest accounted scatter stat update";
c07f9fc5
FG
1934
1935 if (!is_auth() && lock->get_state() == LOCK_MIX) {
1936 dout(10) << "finish_scatter_update try to assimilate dirty rstat on "
1937 << *dir << dendl;
1938 dir->assimilate_dirty_rstat_inodes();
1939 }
1940
7c673cae
FG
1941 break;
1942 default:
1943 ceph_abort();
1944 }
1945
c07f9fc5 1946 pf->version = dir->pre_dirty();
7c673cae
FG
1947 mut->add_projected_fnode(dir);
1948
1949 EUpdate *le = new EUpdate(mdlog, ename);
1950 mdlog->start_entry(le);
1951 le->metablob.add_dir_context(dir);
1952 le->metablob.add_dir(dir, true);
1953
1954 assert(!dir->is_frozen());
1955 mut->auth_pin(dir);
c07f9fc5
FG
1956
1957 if (lock->get_type() == CEPH_LOCK_INEST &&
1958 !is_auth() && lock->get_state() == LOCK_MIX) {
1959 dout(10) << "finish_scatter_update finish assimilating dirty rstat on "
1960 << *dir << dendl;
1961 dir->assimilate_dirty_rstat_inodes_finish(mut, &le->metablob);
1962
1963 if (!(pf->rstat == pf->accounted_rstat)) {
1964 if (mut->wrlocks.count(&nestlock) == 0) {
1965 mdcache->mds->locker->wrlock_force(&nestlock, mut);
1966 }
1967
1968 mdcache->mds->locker->mark_updated_scatterlock(&nestlock);
1969 mut->ls->dirty_dirfrag_nest.push_back(&item_dirty_dirfrag_nest);
1970 }
1971 }
7c673cae
FG
1972
1973 mdlog->submit_entry(le, new C_Inode_FragUpdate(this, dir, mut));
1974 } else {
1975 dout(10) << "finish_scatter_update " << fg << " accounted " << *lock
1976 << " scatter stat unchanged at v" << dir_accounted_version << dendl;
1977 }
1978 }
1979}
1980
1981void CInode::_finish_frag_update(CDir *dir, MutationRef& mut)
1982{
1983 dout(10) << "_finish_frag_update on " << *dir << dendl;
1984 mut->apply();
c07f9fc5 1985 mdcache->mds->locker->drop_locks(mut.get());
7c673cae
FG
1986 mut->cleanup();
1987}
1988
1989
1990/*
1991 * when we gather a lock, we need to assimilate dirfrag changes into the inode
1992 * state. it's possible we can't update the dirfrag accounted_rstat/fragstat
1993 * because the frag is auth and frozen, or that the replica couldn't for the same
1994 * reason. hopefully it will get updated the next time the lock cycles.
1995 *
1996 * we have two dimensions of behavior:
1997 * - we may be (auth and !frozen), and able to update, or not.
1998 * - the frag may be stale, or not.
1999 *
2000 * if the frag is non-stale, we want to assimilate the diff into the
2001 * inode, regardless of whether it's auth or updateable.
2002 *
2003 * if we update the frag, we want to set accounted_fragstat = frag,
2004 * both if we took the diff or it was stale and we are making it
2005 * un-stale.
2006 */
2007/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
2008void CInode::finish_scatter_gather_update(int type)
2009{
2010 LogChannelRef clog = mdcache->mds->clog;
2011
2012 dout(10) << "finish_scatter_gather_update " << type << " on " << *this << dendl;
2013 assert(is_auth());
2014
2015 switch (type) {
2016 case CEPH_LOCK_IFILE:
2017 {
2018 fragtree_t tmpdft = dirfragtree;
2019 struct frag_info_t dirstat;
2020 bool dirstat_valid = true;
2021
2022 // adjust summation
2023 assert(is_auth());
94b18763 2024 mempool_inode *pi = get_projected_inode();
7c673cae
FG
2025
2026 bool touched_mtime = false, touched_chattr = false;
2027 dout(20) << " orig dirstat " << pi->dirstat << dendl;
2028 pi->dirstat.version++;
94b18763
FG
2029 for (const auto &p : dirfrags) {
2030 frag_t fg = p.first;
2031 CDir *dir = p.second;
7c673cae
FG
2032 dout(20) << fg << " " << *dir << dendl;
2033
2034 bool update;
2035 if (dir->get_version() != 0) {
2036 update = dir->is_auth() && !dir->is_frozen();
2037 } else {
2038 update = false;
2039 dirstat_valid = false;
2040 }
2041
2042 fnode_t *pf = dir->get_projected_fnode();
2043 if (update)
2044 pf = dir->project_fnode();
2045
2046 if (pf->accounted_fragstat.version == pi->dirstat.version - 1) {
2047 dout(20) << fg << " fragstat " << pf->fragstat << dendl;
2048 dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
2049 pi->dirstat.add_delta(pf->fragstat, pf->accounted_fragstat, &touched_mtime, &touched_chattr);
2050 } else {
2051 dout(20) << fg << " skipping STALE accounted_fragstat " << pf->accounted_fragstat << dendl;
2052 }
2053
2054 if (pf->fragstat.nfiles < 0 ||
2055 pf->fragstat.nsubdirs < 0) {
2056 clog->error() << "bad/negative dir size on "
2057 << dir->dirfrag() << " " << pf->fragstat;
2058 assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
2059
2060 if (pf->fragstat.nfiles < 0)
2061 pf->fragstat.nfiles = 0;
2062 if (pf->fragstat.nsubdirs < 0)
2063 pf->fragstat.nsubdirs = 0;
2064 }
2065
2066 if (update) {
2067 pf->accounted_fragstat = pf->fragstat;
2068 pf->fragstat.version = pf->accounted_fragstat.version = pi->dirstat.version;
2069 dout(10) << fg << " updated accounted_fragstat " << pf->fragstat << " on " << *dir << dendl;
2070 }
2071
2072 tmpdft.force_to_leaf(g_ceph_context, fg);
2073 dirstat.add(pf->fragstat);
2074 }
2075 if (touched_mtime)
2076 pi->mtime = pi->ctime = pi->dirstat.mtime;
2077 if (touched_chattr)
2078 pi->change_attr = pi->dirstat.change_attr;
2079 dout(20) << " final dirstat " << pi->dirstat << dendl;
2080
2081 if (dirstat_valid && !dirstat.same_sums(pi->dirstat)) {
2082 list<frag_t> ls;
2083 tmpdft.get_leaves_under(frag_t(), ls);
2084 for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
2085 if (!dirfrags.count(*p)) {
2086 dirstat_valid = false;
2087 break;
2088 }
2089 if (dirstat_valid) {
2090 if (state_test(CInode::STATE_REPAIRSTATS)) {
2091 dout(20) << " dirstat mismatch, fixing" << dendl;
2092 } else {
2093 clog->error() << "unmatched fragstat on " << ino() << ", inode has "
2094 << pi->dirstat << ", dirfrags have " << dirstat;
2095 assert(!"unmatched fragstat" == g_conf->mds_verify_scatter);
2096 }
2097 // trust the dirfrags for now
2098 version_t v = pi->dirstat.version;
2099 if (pi->dirstat.mtime > dirstat.mtime)
2100 dirstat.mtime = pi->dirstat.mtime;
2101 if (pi->dirstat.change_attr > dirstat.change_attr)
2102 dirstat.change_attr = pi->dirstat.change_attr;
2103 pi->dirstat = dirstat;
2104 pi->dirstat.version = v;
2105 }
2106 }
2107
d2e6a577
FG
2108 if (pi->dirstat.nfiles < 0 || pi->dirstat.nsubdirs < 0)
2109 {
2110 std::string path;
2111 make_path_string(path);
2112 clog->error() << "Inconsistent statistics detected: fragstat on inode "
2113 << ino() << " (" << path << "), inode has " << pi->dirstat;
7c673cae
FG
2114 assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
2115
2116 if (pi->dirstat.nfiles < 0)
2117 pi->dirstat.nfiles = 0;
2118 if (pi->dirstat.nsubdirs < 0)
2119 pi->dirstat.nsubdirs = 0;
2120 }
2121 }
2122 break;
2123
2124 case CEPH_LOCK_INEST:
2125 {
2126 fragtree_t tmpdft = dirfragtree;
2127 nest_info_t rstat;
2128 rstat.rsubdirs = 1;
2129 bool rstat_valid = true;
2130
2131 // adjust summation
2132 assert(is_auth());
94b18763 2133 mempool_inode *pi = get_projected_inode();
7c673cae
FG
2134 dout(20) << " orig rstat " << pi->rstat << dendl;
2135 pi->rstat.version++;
94b18763
FG
2136 for (const auto &p : dirfrags) {
2137 frag_t fg = p.first;
2138 CDir *dir = p.second;
7c673cae
FG
2139 dout(20) << fg << " " << *dir << dendl;
2140
2141 bool update;
2142 if (dir->get_version() != 0) {
2143 update = dir->is_auth() && !dir->is_frozen();
2144 } else {
2145 update = false;
2146 rstat_valid = false;
2147 }
2148
2149 fnode_t *pf = dir->get_projected_fnode();
2150 if (update)
2151 pf = dir->project_fnode();
2152
2153 if (pf->accounted_rstat.version == pi->rstat.version-1) {
2154 // only pull this frag's dirty rstat inodes into the frag if
2155 // the frag is non-stale and updateable. if it's stale,
2156 // that info will just get thrown out!
2157 if (update)
2158 dir->assimilate_dirty_rstat_inodes();
2159
2160 dout(20) << fg << " rstat " << pf->rstat << dendl;
2161 dout(20) << fg << " accounted_rstat " << pf->accounted_rstat << dendl;
2162 dout(20) << fg << " dirty_old_rstat " << dir->dirty_old_rstat << dendl;
2163 mdcache->project_rstat_frag_to_inode(pf->rstat, pf->accounted_rstat,
2164 dir->first, CEPH_NOSNAP, this, true);
94b18763
FG
2165 for (auto &p : dir->dirty_old_rstat) {
2166 mdcache->project_rstat_frag_to_inode(p.second.rstat, p.second.accounted_rstat,
2167 p.second.first, p.first, this, true);
2168 }
7c673cae
FG
2169 if (update) // dir contents not valid if frozen or non-auth
2170 dir->check_rstats();
2171 } else {
2172 dout(20) << fg << " skipping STALE accounted_rstat " << pf->accounted_rstat << dendl;
2173 }
2174 if (update) {
2175 pf->accounted_rstat = pf->rstat;
2176 dir->dirty_old_rstat.clear();
2177 pf->rstat.version = pf->accounted_rstat.version = pi->rstat.version;
2178 dir->check_rstats();
2179 dout(10) << fg << " updated accounted_rstat " << pf->rstat << " on " << *dir << dendl;
2180 }
2181
2182 tmpdft.force_to_leaf(g_ceph_context, fg);
2183 rstat.add(pf->rstat);
2184 }
2185 dout(20) << " final rstat " << pi->rstat << dendl;
2186
2187 if (rstat_valid && !rstat.same_sums(pi->rstat)) {
2188 list<frag_t> ls;
2189 tmpdft.get_leaves_under(frag_t(), ls);
2190 for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
2191 if (!dirfrags.count(*p)) {
2192 rstat_valid = false;
2193 break;
2194 }
2195 if (rstat_valid) {
2196 if (state_test(CInode::STATE_REPAIRSTATS)) {
2197 dout(20) << " rstat mismatch, fixing" << dendl;
2198 } else {
d2e6a577
FG
2199 clog->error() << "inconsistent rstat on inode " << ino()
2200 << ", inode has " << pi->rstat
2201 << ", directory fragments have " << rstat;
7c673cae
FG
2202 assert(!"unmatched rstat" == g_conf->mds_verify_scatter);
2203 }
2204 // trust the dirfrag for now
2205 version_t v = pi->rstat.version;
2206 if (pi->rstat.rctime > rstat.rctime)
2207 rstat.rctime = pi->rstat.rctime;
2208 pi->rstat = rstat;
2209 pi->rstat.version = v;
2210 }
2211 }
2212
2213 mdcache->broadcast_quota_to_client(this);
2214 }
2215 break;
2216
2217 case CEPH_LOCK_IDFT:
2218 break;
2219
2220 default:
2221 ceph_abort();
2222 }
2223}
2224
2225void CInode::finish_scatter_gather_update_accounted(int type, MutationRef& mut, EMetaBlob *metablob)
2226{
2227 dout(10) << "finish_scatter_gather_update_accounted " << type << " on " << *this << dendl;
2228 assert(is_auth());
2229
94b18763
FG
2230 for (const auto &p : dirfrags) {
2231 CDir *dir = p.second;
7c673cae
FG
2232 if (!dir->is_auth() || dir->get_version() == 0 || dir->is_frozen())
2233 continue;
2234
2235 if (type == CEPH_LOCK_IDFT)
2236 continue; // nothing to do.
2237
2238 dout(10) << " journaling updated frag accounted_ on " << *dir << dendl;
2239 assert(dir->is_projected());
2240 fnode_t *pf = dir->get_projected_fnode();
2241 pf->version = dir->pre_dirty();
2242 mut->add_projected_fnode(dir);
2243 metablob->add_dir(dir, true);
2244 mut->auth_pin(dir);
2245
2246 if (type == CEPH_LOCK_INEST)
2247 dir->assimilate_dirty_rstat_inodes_finish(mut, metablob);
2248 }
2249}
2250
2251// waiting
2252
2253bool CInode::is_frozen() const
2254{
2255 if (is_frozen_inode()) return true;
2256 if (parent && parent->dir->is_frozen()) return true;
2257 return false;
2258}
2259
2260bool CInode::is_frozen_dir() const
2261{
2262 if (parent && parent->dir->is_frozen_dir()) return true;
2263 return false;
2264}
2265
2266bool CInode::is_freezing() const
2267{
2268 if (is_freezing_inode()) return true;
2269 if (parent && parent->dir->is_freezing()) return true;
2270 return false;
2271}
2272
2273void CInode::add_dir_waiter(frag_t fg, MDSInternalContextBase *c)
2274{
2275 if (waiting_on_dir.empty())
2276 get(PIN_DIRWAITER);
2277 waiting_on_dir[fg].push_back(c);
2278 dout(10) << "add_dir_waiter frag " << fg << " " << c << " on " << *this << dendl;
2279}
2280
2281void CInode::take_dir_waiting(frag_t fg, list<MDSInternalContextBase*>& ls)
2282{
2283 if (waiting_on_dir.empty())
2284 return;
2285
94b18763
FG
2286 auto it = waiting_on_dir.find(fg);
2287 if (it != waiting_on_dir.end()) {
2288 dout(10) << __func__ << " frag " << fg << " on " << *this << dendl;
2289 ls.splice(ls.end(), it->second);
2290 waiting_on_dir.erase(it);
7c673cae
FG
2291
2292 if (waiting_on_dir.empty())
2293 put(PIN_DIRWAITER);
2294 }
2295}
2296
2297void CInode::add_waiter(uint64_t tag, MDSInternalContextBase *c)
2298{
2299 dout(10) << "add_waiter tag " << std::hex << tag << std::dec << " " << c
2300 << " !ambig " << !state_test(STATE_AMBIGUOUSAUTH)
2301 << " !frozen " << !is_frozen_inode()
2302 << " !freezing " << !is_freezing_inode()
2303 << dendl;
2304 // wait on the directory?
2305 // make sure its not the inode that is explicitly ambiguous|freezing|frozen
2306 if (((tag & WAIT_SINGLEAUTH) && !state_test(STATE_AMBIGUOUSAUTH)) ||
2307 ((tag & WAIT_UNFREEZE) &&
2308 !is_frozen_inode() && !is_freezing_inode() && !is_frozen_auth_pin())) {
2309 dout(15) << "passing waiter up tree" << dendl;
2310 parent->dir->add_waiter(tag, c);
2311 return;
2312 }
2313 dout(15) << "taking waiter here" << dendl;
2314 MDSCacheObject::add_waiter(tag, c);
2315}
2316
2317void CInode::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
2318{
2319 if ((mask & WAIT_DIR) && !waiting_on_dir.empty()) {
2320 // take all dentry waiters
2321 while (!waiting_on_dir.empty()) {
94b18763
FG
2322 auto it = waiting_on_dir.begin();
2323 dout(10) << __func__ << " dirfrag " << it->first << " on " << *this << dendl;
2324 ls.splice(ls.end(), it->second);
2325 waiting_on_dir.erase(it);
7c673cae
FG
2326 }
2327 put(PIN_DIRWAITER);
2328 }
2329
2330 // waiting
2331 MDSCacheObject::take_waiting(mask, ls);
2332}
2333
2334bool CInode::freeze_inode(int auth_pin_allowance)
2335{
2336 assert(auth_pin_allowance > 0); // otherwise we need to adjust parent's nested_auth_pins
2337 assert(auth_pins >= auth_pin_allowance);
2338 if (auth_pins > auth_pin_allowance) {
2339 dout(10) << "freeze_inode - waiting for auth_pins to drop to " << auth_pin_allowance << dendl;
2340 auth_pin_freeze_allowance = auth_pin_allowance;
2341 get(PIN_FREEZING);
2342 state_set(STATE_FREEZING);
2343 return false;
2344 }
2345
2346 dout(10) << "freeze_inode - frozen" << dendl;
2347 assert(auth_pins == auth_pin_allowance);
2348 if (!state_test(STATE_FROZEN)) {
2349 get(PIN_FROZEN);
2350 state_set(STATE_FROZEN);
2351 }
2352 return true;
2353}
2354
2355void CInode::unfreeze_inode(list<MDSInternalContextBase*>& finished)
2356{
2357 dout(10) << "unfreeze_inode" << dendl;
2358 if (state_test(STATE_FREEZING)) {
2359 state_clear(STATE_FREEZING);
2360 put(PIN_FREEZING);
2361 } else if (state_test(STATE_FROZEN)) {
2362 state_clear(STATE_FROZEN);
2363 put(PIN_FROZEN);
2364 } else
2365 ceph_abort();
2366 take_waiting(WAIT_UNFREEZE, finished);
2367}
2368
2369void CInode::unfreeze_inode()
2370{
2371 list<MDSInternalContextBase*> finished;
2372 unfreeze_inode(finished);
2373 mdcache->mds->queue_waiters(finished);
2374}
2375
2376void CInode::freeze_auth_pin()
2377{
2378 assert(state_test(CInode::STATE_FROZEN));
2379 state_set(CInode::STATE_FROZENAUTHPIN);
2380}
2381
2382void CInode::unfreeze_auth_pin()
2383{
2384 assert(state_test(CInode::STATE_FROZENAUTHPIN));
2385 state_clear(CInode::STATE_FROZENAUTHPIN);
2386 if (!state_test(STATE_FREEZING|STATE_FROZEN)) {
2387 list<MDSInternalContextBase*> finished;
2388 take_waiting(WAIT_UNFREEZE, finished);
2389 mdcache->mds->queue_waiters(finished);
2390 }
2391}
2392
2393void CInode::clear_ambiguous_auth(list<MDSInternalContextBase*>& finished)
2394{
2395 assert(state_test(CInode::STATE_AMBIGUOUSAUTH));
2396 state_clear(CInode::STATE_AMBIGUOUSAUTH);
2397 take_waiting(CInode::WAIT_SINGLEAUTH, finished);
2398}
2399
2400void CInode::clear_ambiguous_auth()
2401{
2402 list<MDSInternalContextBase*> finished;
2403 clear_ambiguous_auth(finished);
2404 mdcache->mds->queue_waiters(finished);
2405}
2406
2407// auth_pins
2408bool CInode::can_auth_pin() const {
2409 if (!is_auth() || is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin())
2410 return false;
2411 if (parent)
2412 return parent->can_auth_pin();
2413 return true;
2414}
2415
2416void CInode::auth_pin(void *by)
2417{
2418 if (auth_pins == 0)
2419 get(PIN_AUTHPIN);
2420 auth_pins++;
2421
2422#ifdef MDS_AUTHPIN_SET
2423 auth_pin_set.insert(by);
2424#endif
2425
2426 dout(10) << "auth_pin by " << by << " on " << *this
2427 << " now " << auth_pins << "+" << nested_auth_pins
2428 << dendl;
2429
2430 if (parent)
2431 parent->adjust_nested_auth_pins(1, 1, this);
2432}
2433
2434void CInode::auth_unpin(void *by)
2435{
2436 auth_pins--;
2437
2438#ifdef MDS_AUTHPIN_SET
2439 assert(auth_pin_set.count(by));
2440 auth_pin_set.erase(auth_pin_set.find(by));
2441#endif
2442
2443 if (auth_pins == 0)
2444 put(PIN_AUTHPIN);
2445
2446 dout(10) << "auth_unpin by " << by << " on " << *this
2447 << " now " << auth_pins << "+" << nested_auth_pins
2448 << dendl;
2449
2450 assert(auth_pins >= 0);
2451
2452 if (parent)
2453 parent->adjust_nested_auth_pins(-1, -1, by);
2454
2455 if (is_freezing_inode() &&
2456 auth_pins == auth_pin_freeze_allowance) {
2457 dout(10) << "auth_unpin freezing!" << dendl;
2458 get(PIN_FROZEN);
2459 put(PIN_FREEZING);
2460 state_clear(STATE_FREEZING);
2461 state_set(STATE_FROZEN);
2462 finish_waiting(WAIT_FROZEN);
2463 }
2464}
2465
2466void CInode::adjust_nested_auth_pins(int a, void *by)
2467{
2468 assert(a);
2469 nested_auth_pins += a;
2470 dout(35) << "adjust_nested_auth_pins by " << by
2471 << " change " << a << " yields "
2472 << auth_pins << "+" << nested_auth_pins << dendl;
2473 assert(nested_auth_pins >= 0);
2474
2475 if (g_conf->mds_debug_auth_pins) {
2476 // audit
2477 int s = 0;
94b18763
FG
2478 for (const auto &p : dirfrags) {
2479 CDir *dir = p.second;
7c673cae
FG
2480 if (!dir->is_subtree_root() && dir->get_cum_auth_pins())
2481 s++;
2482 }
2483 assert(s == nested_auth_pins);
2484 }
2485
2486 if (parent)
2487 parent->adjust_nested_auth_pins(a, 0, by);
2488}
2489
2490
2491// authority
2492
2493mds_authority_t CInode::authority() const
2494{
2495 if (inode_auth.first >= 0)
2496 return inode_auth;
2497
2498 if (parent)
2499 return parent->dir->authority();
2500
2501 // new items that are not yet linked in (in the committed plane) belong
2502 // to their first parent.
2503 if (!projected_parent.empty())
2504 return projected_parent.front()->dir->authority();
2505
2506 return CDIR_AUTH_UNDEF;
2507}
2508
2509
2510// SNAP
2511
2512snapid_t CInode::get_oldest_snap()
2513{
2514 snapid_t t = first;
2515 if (!old_inodes.empty())
2516 t = old_inodes.begin()->second.first;
2517 return MIN(t, oldest_snap);
2518}
2519
94b18763 2520CInode::mempool_old_inode& CInode::cow_old_inode(snapid_t follows, bool cow_head)
7c673cae
FG
2521{
2522 assert(follows >= first);
2523
94b18763
FG
2524 mempool_inode *pi = cow_head ? get_projected_inode() : get_previous_projected_inode();
2525 mempool_xattr_map *px = cow_head ? get_projected_xattrs() : get_previous_projected_xattrs();
7c673cae 2526
94b18763 2527 mempool_old_inode &old = old_inodes[follows];
7c673cae
FG
2528 old.first = first;
2529 old.inode = *pi;
2530 old.xattrs = *px;
2531
2532 if (first < oldest_snap)
2533 oldest_snap = first;
2534
2535 dout(10) << " " << px->size() << " xattrs cowed, " << *px << dendl;
2536
2537 old.inode.trim_client_ranges(follows);
2538
2539 if (g_conf->mds_snap_rstat &&
2540 !(old.inode.rstat == old.inode.accounted_rstat))
2541 dirty_old_rstats.insert(follows);
2542
2543 first = follows+1;
2544
2545 dout(10) << "cow_old_inode " << (cow_head ? "head" : "previous_head" )
2546 << " to [" << old.first << "," << follows << "] on "
2547 << *this << dendl;
2548
2549 return old;
2550}
2551
2552void CInode::split_old_inode(snapid_t snap)
2553{
94b18763
FG
2554 auto it = old_inodes.lower_bound(snap);
2555 assert(it != old_inodes.end() && it->second.first < snap);
7c673cae 2556
94b18763
FG
2557 mempool_old_inode &old = old_inodes[snap - 1];
2558 old = it->second;
7c673cae 2559
94b18763
FG
2560 it->second.first = snap;
2561 dout(10) << __func__ << " " << "[" << old.first << "," << it->first
2562 << "] to [" << snap << "," << it->first << "] on " << *this << dendl;
7c673cae
FG
2563}
2564
2565void CInode::pre_cow_old_inode()
2566{
2567 snapid_t follows = find_snaprealm()->get_newest_seq();
2568 if (first <= follows)
2569 cow_old_inode(follows, true);
2570}
2571
2572void CInode::purge_stale_snap_data(const set<snapid_t>& snaps)
2573{
2574 dout(10) << "purge_stale_snap_data " << snaps << dendl;
2575
94b18763
FG
2576 for (auto it = old_inodes.begin(); it != old_inodes.end(); ) {
2577 const snapid_t &id = it->first;
2578 const auto &s = snaps.lower_bound(it->second.first);
2579 if (s == snaps.end() || *s > id) {
2580 dout(10) << " purging old_inode [" << it->second.first << "," << id << "]" << dendl;
2581 it = old_inodes.erase(it);
2582 } else {
2583 ++it;
2584 }
7c673cae
FG
2585 }
2586}
2587
2588/*
2589 * pick/create an old_inode
2590 */
94b18763 2591CInode::mempool_old_inode * CInode::pick_old_inode(snapid_t snap)
7c673cae 2592{
94b18763
FG
2593 auto it = old_inodes.lower_bound(snap); // p is first key >= to snap
2594 if (it != old_inodes.end() && it->second.first <= snap) {
2595 dout(10) << __func__ << " snap " << snap << " -> [" << it->second.first << "," << it->first << "]" << dendl;
2596 return &it->second;
7c673cae
FG
2597 }
2598 dout(10) << "pick_old_inode snap " << snap << " -> nothing" << dendl;
2599 return NULL;
2600}
2601
2602void CInode::open_snaprealm(bool nosplit)
2603{
2604 if (!snaprealm) {
2605 SnapRealm *parent = find_snaprealm();
2606 snaprealm = new SnapRealm(mdcache, this);
2607 if (parent) {
2608 dout(10) << "open_snaprealm " << snaprealm
2609 << " parent is " << parent
2610 << dendl;
2611 dout(30) << " siblings are " << parent->open_children << dendl;
2612 snaprealm->parent = parent;
2613 if (!nosplit)
2614 parent->split_at(snaprealm);
2615 parent->open_children.insert(snaprealm);
2616 }
2617 }
2618}
2619void CInode::close_snaprealm(bool nojoin)
2620{
2621 if (snaprealm) {
2622 dout(15) << "close_snaprealm " << *snaprealm << dendl;
2623 snaprealm->close_parents();
2624 if (snaprealm->parent) {
2625 snaprealm->parent->open_children.erase(snaprealm);
2626 //if (!nojoin)
2627 //snaprealm->parent->join(snaprealm);
2628 }
2629 delete snaprealm;
2630 snaprealm = 0;
2631 }
2632}
2633
2634SnapRealm *CInode::find_snaprealm() const
2635{
2636 const CInode *cur = this;
2637 while (!cur->snaprealm) {
2638 if (cur->get_parent_dn())
2639 cur = cur->get_parent_dn()->get_dir()->get_inode();
2640 else if (get_projected_parent_dn())
2641 cur = cur->get_projected_parent_dn()->get_dir()->get_inode();
2642 else
2643 break;
2644 }
2645 return cur->snaprealm;
2646}
2647
2648void CInode::encode_snap_blob(bufferlist &snapbl)
2649{
2650 if (snaprealm) {
2651 ::encode(snaprealm->srnode, snapbl);
2652 dout(20) << "encode_snap_blob " << *snaprealm << dendl;
2653 }
2654}
2655void CInode::decode_snap_blob(bufferlist& snapbl)
2656{
2657 if (snapbl.length()) {
2658 open_snaprealm();
2659 bufferlist::iterator p = snapbl.begin();
2660 ::decode(snaprealm->srnode, p);
2661 if (is_base()) {
2662 bool ok = snaprealm->_open_parents(NULL);
2663 assert(ok);
2664 }
2665 dout(20) << "decode_snap_blob " << *snaprealm << dendl;
2666 }
2667}
2668
2669void CInode::encode_snap(bufferlist& bl)
2670{
2671 bufferlist snapbl;
2672 encode_snap_blob(snapbl);
2673 ::encode(snapbl, bl);
2674 ::encode(oldest_snap, bl);
2675}
2676
2677void CInode::decode_snap(bufferlist::iterator& p)
2678{
2679 bufferlist snapbl;
2680 ::decode(snapbl, p);
2681 ::decode(oldest_snap, p);
2682 decode_snap_blob(snapbl);
2683}
2684
2685// =============================================
2686
2687client_t CInode::calc_ideal_loner()
2688{
2689 if (mdcache->is_readonly())
2690 return -1;
2691 if (!mds_caps_wanted.empty())
2692 return -1;
2693
2694 int n = 0;
2695 client_t loner = -1;
2696 for (map<client_t,Capability*>::iterator it = client_caps.begin();
2697 it != client_caps.end();
2698 ++it)
2699 if (!it->second->is_stale() &&
2700 ((it->second->wanted() & (CEPH_CAP_ANY_WR|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_RD)) ||
2701 (inode.is_dir() && !has_subtree_root_dirfrag()))) {
2702 if (n)
2703 return -1;
2704 n++;
2705 loner = it->first;
2706 }
2707 return loner;
2708}
2709
b32b8144 2710bool CInode::choose_ideal_loner()
7c673cae
FG
2711{
2712 want_loner_cap = calc_ideal_loner();
b32b8144
FG
2713 int changed = false;
2714 if (loner_cap >= 0 && loner_cap != want_loner_cap) {
2715 if (!try_drop_loner())
2716 return false;
2717 changed = true;
2718 }
2719
2720 if (want_loner_cap >= 0) {
2721 if (loner_cap < 0) {
2722 set_loner_cap(want_loner_cap);
2723 changed = true;
2724 } else
2725 assert(loner_cap == want_loner_cap);
2726 }
2727 return changed;
7c673cae
FG
2728}
2729
2730bool CInode::try_set_loner()
2731{
2732 assert(want_loner_cap >= 0);
2733 if (loner_cap >= 0 && loner_cap != want_loner_cap)
2734 return false;
2735 set_loner_cap(want_loner_cap);
2736 return true;
2737}
2738
2739void CInode::set_loner_cap(client_t l)
2740{
2741 loner_cap = l;
2742 authlock.set_excl_client(loner_cap);
2743 filelock.set_excl_client(loner_cap);
2744 linklock.set_excl_client(loner_cap);
2745 xattrlock.set_excl_client(loner_cap);
2746}
2747
2748bool CInode::try_drop_loner()
2749{
2750 if (loner_cap < 0)
2751 return true;
2752
2753 int other_allowed = get_caps_allowed_by_type(CAP_ANY);
2754 Capability *cap = get_client_cap(loner_cap);
2755 if (!cap ||
2756 (cap->issued() & ~other_allowed) == 0) {
2757 set_loner_cap(-1);
2758 return true;
2759 }
2760 return false;
2761}
2762
2763
2764// choose new lock state during recovery, based on issued caps
2765void CInode::choose_lock_state(SimpleLock *lock, int allissued)
2766{
2767 int shift = lock->get_cap_shift();
2768 int issued = (allissued >> shift) & lock->get_cap_mask();
2769 if (is_auth()) {
2770 if (lock->is_xlocked()) {
2771 // do nothing here
2772 } else if (lock->get_state() != LOCK_MIX) {
2773 if (issued & (CEPH_CAP_GEXCL | CEPH_CAP_GBUFFER))
2774 lock->set_state(LOCK_EXCL);
2775 else if (issued & CEPH_CAP_GWR)
2776 lock->set_state(LOCK_MIX);
2777 else if (lock->is_dirty()) {
2778 if (is_replicated())
2779 lock->set_state(LOCK_MIX);
2780 else
2781 lock->set_state(LOCK_LOCK);
2782 } else
2783 lock->set_state(LOCK_SYNC);
2784 }
2785 } else {
2786 // our states have already been chosen during rejoin.
2787 if (lock->is_xlocked())
2788 assert(lock->get_state() == LOCK_LOCK);
2789 }
2790}
2791
2792void CInode::choose_lock_states(int dirty_caps)
2793{
2794 int issued = get_caps_issued() | dirty_caps;
b32b8144
FG
2795 if (is_auth() && (issued & (CEPH_CAP_ANY_EXCL|CEPH_CAP_ANY_WR)))
2796 choose_ideal_loner();
7c673cae
FG
2797 choose_lock_state(&filelock, issued);
2798 choose_lock_state(&nestlock, issued);
2799 choose_lock_state(&dirfragtreelock, issued);
2800 choose_lock_state(&authlock, issued);
2801 choose_lock_state(&xattrlock, issued);
2802 choose_lock_state(&linklock, issued);
2803}
2804
2805Capability *CInode::add_client_cap(client_t client, Session *session, SnapRealm *conrealm)
2806{
2807 if (client_caps.empty()) {
2808 get(PIN_CAPS);
2809 if (conrealm)
2810 containing_realm = conrealm;
2811 else
2812 containing_realm = find_snaprealm();
2813 containing_realm->inodes_with_caps.push_back(&item_caps);
2814 dout(10) << "add_client_cap first cap, joining realm " << *containing_realm << dendl;
2815 }
2816
2817 if (client_caps.empty())
2818 mdcache->num_inodes_with_caps++;
2819
2820 Capability *cap = new Capability(this, ++mdcache->last_cap_id, client);
2821 assert(client_caps.count(client) == 0);
2822 client_caps[client] = cap;
2823
2824 session->add_cap(cap);
2825 if (session->is_stale())
2826 cap->mark_stale();
2827
2828 cap->client_follows = first-1;
2829
2830 containing_realm->add_cap(client, cap);
2831
2832 return cap;
2833}
2834
2835void CInode::remove_client_cap(client_t client)
2836{
2837 assert(client_caps.count(client) == 1);
2838 Capability *cap = client_caps[client];
2839
2840 cap->item_session_caps.remove_myself();
2841 cap->item_revoking_caps.remove_myself();
2842 cap->item_client_revoking_caps.remove_myself();
2843 containing_realm->remove_cap(client, cap);
2844
2845 if (client == loner_cap)
2846 loner_cap = -1;
2847
2848 delete cap;
2849 client_caps.erase(client);
2850 if (client_caps.empty()) {
2851 dout(10) << "remove_client_cap last cap, leaving realm " << *containing_realm << dendl;
2852 put(PIN_CAPS);
2853 item_caps.remove_myself();
2854 containing_realm = NULL;
2855 item_open_file.remove_myself(); // unpin logsegment
2856 mdcache->num_inodes_with_caps--;
2857 }
2858
2859 //clean up advisory locks
2860 bool fcntl_removed = fcntl_locks ? fcntl_locks->remove_all_from(client) : false;
2861 bool flock_removed = flock_locks ? flock_locks->remove_all_from(client) : false;
2862 if (fcntl_removed || flock_removed) {
2863 list<MDSInternalContextBase*> waiters;
2864 take_waiting(CInode::WAIT_FLOCK, waiters);
2865 mdcache->mds->queue_waiters(waiters);
2866 }
2867}
2868
2869void CInode::move_to_realm(SnapRealm *realm)
2870{
2871 dout(10) << "move_to_realm joining realm " << *realm
2872 << ", leaving realm " << *containing_realm << dendl;
2873 for (map<client_t,Capability*>::iterator q = client_caps.begin();
2874 q != client_caps.end();
2875 ++q) {
2876 containing_realm->remove_cap(q->first, q->second);
2877 realm->add_cap(q->first, q->second);
2878 }
2879 item_caps.remove_myself();
2880 realm->inodes_with_caps.push_back(&item_caps);
2881 containing_realm = realm;
2882}
2883
2884Capability *CInode::reconnect_cap(client_t client, const cap_reconnect_t& icr, Session *session)
2885{
2886 Capability *cap = get_client_cap(client);
2887 if (cap) {
2888 // FIXME?
2889 cap->merge(icr.capinfo.wanted, icr.capinfo.issued);
2890 } else {
2891 cap = add_client_cap(client, session);
2892 cap->set_cap_id(icr.capinfo.cap_id);
2893 cap->set_wanted(icr.capinfo.wanted);
2894 cap->issue_norevoke(icr.capinfo.issued);
2895 cap->reset_seq();
2896 }
2897 cap->set_last_issue_stamp(ceph_clock_now());
2898 return cap;
2899}
2900
2901void CInode::clear_client_caps_after_export()
2902{
2903 while (!client_caps.empty())
2904 remove_client_cap(client_caps.begin()->first);
2905 loner_cap = -1;
2906 want_loner_cap = -1;
2907 mds_caps_wanted.clear();
2908}
2909
2910void CInode::export_client_caps(map<client_t,Capability::Export>& cl)
2911{
2912 for (map<client_t,Capability*>::iterator it = client_caps.begin();
2913 it != client_caps.end();
2914 ++it) {
2915 cl[it->first] = it->second->make_export();
2916 }
2917}
2918
2919 // caps allowed
2920int CInode::get_caps_liked() const
2921{
2922 if (is_dir())
2923 return CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED; // but not, say, FILE_RD|WR|WRBUFFER
2924 else
2925 return CEPH_CAP_ANY & ~CEPH_CAP_FILE_LAZYIO;
2926}
2927
2928int CInode::get_caps_allowed_ever() const
2929{
2930 int allowed;
2931 if (is_dir())
2932 allowed = CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED;
2933 else
2934 allowed = CEPH_CAP_ANY;
2935 return allowed &
2936 (CEPH_CAP_PIN |
2937 (filelock.gcaps_allowed_ever() << filelock.get_cap_shift()) |
2938 (authlock.gcaps_allowed_ever() << authlock.get_cap_shift()) |
2939 (xattrlock.gcaps_allowed_ever() << xattrlock.get_cap_shift()) |
2940 (linklock.gcaps_allowed_ever() << linklock.get_cap_shift()));
2941}
2942
2943int CInode::get_caps_allowed_by_type(int type) const
2944{
2945 return
2946 CEPH_CAP_PIN |
2947 (filelock.gcaps_allowed(type) << filelock.get_cap_shift()) |
2948 (authlock.gcaps_allowed(type) << authlock.get_cap_shift()) |
2949 (xattrlock.gcaps_allowed(type) << xattrlock.get_cap_shift()) |
2950 (linklock.gcaps_allowed(type) << linklock.get_cap_shift());
2951}
2952
2953int CInode::get_caps_careful() const
2954{
2955 return
2956 (filelock.gcaps_careful() << filelock.get_cap_shift()) |
2957 (authlock.gcaps_careful() << authlock.get_cap_shift()) |
2958 (xattrlock.gcaps_careful() << xattrlock.get_cap_shift()) |
2959 (linklock.gcaps_careful() << linklock.get_cap_shift());
2960}
2961
2962int CInode::get_xlocker_mask(client_t client) const
2963{
2964 return
2965 (filelock.gcaps_xlocker_mask(client) << filelock.get_cap_shift()) |
2966 (authlock.gcaps_xlocker_mask(client) << authlock.get_cap_shift()) |
2967 (xattrlock.gcaps_xlocker_mask(client) << xattrlock.get_cap_shift()) |
2968 (linklock.gcaps_xlocker_mask(client) << linklock.get_cap_shift());
2969}
2970
94b18763 2971int CInode::get_caps_allowed_for_client(Session *session, mempool_inode *file_i) const
7c673cae
FG
2972{
2973 client_t client = session->info.inst.name.num();
2974 int allowed;
2975 if (client == get_loner()) {
2976 // as the loner, we get the loner_caps AND any xlocker_caps for things we have xlocked
2977 allowed =
2978 get_caps_allowed_by_type(CAP_LONER) |
2979 (get_caps_allowed_by_type(CAP_XLOCKER) & get_xlocker_mask(client));
2980 } else {
2981 allowed = get_caps_allowed_by_type(CAP_ANY);
2982 }
2983
2984 if (!is_dir()) {
2985 if ((file_i->inline_data.version != CEPH_INLINE_NONE &&
2986 !session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) ||
2987 (!file_i->layout.pool_ns.empty() &&
2988 !session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)))
2989 allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
2990 }
2991 return allowed;
2992}
2993
2994// caps issued, wanted
2995int CInode::get_caps_issued(int *ploner, int *pother, int *pxlocker,
2996 int shift, int mask)
2997{
2998 int c = 0;
2999 int loner = 0, other = 0, xlocker = 0;
3000 if (!is_auth()) {
3001 loner_cap = -1;
3002 }
3003
3004 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3005 it != client_caps.end();
3006 ++it) {
3007 int i = it->second->issued();
3008 c |= i;
3009 if (it->first == loner_cap)
3010 loner |= i;
3011 else
3012 other |= i;
3013 xlocker |= get_xlocker_mask(it->first) & i;
3014 }
3015 if (ploner) *ploner = (loner >> shift) & mask;
3016 if (pother) *pother = (other >> shift) & mask;
3017 if (pxlocker) *pxlocker = (xlocker >> shift) & mask;
3018 return (c >> shift) & mask;
3019}
3020
3021bool CInode::is_any_caps_wanted() const
3022{
3023 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3024 it != client_caps.end();
3025 ++it)
3026 if (it->second->wanted())
3027 return true;
3028 return false;
3029}
3030
3031int CInode::get_caps_wanted(int *ploner, int *pother, int shift, int mask) const
3032{
3033 int w = 0;
3034 int loner = 0, other = 0;
3035 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3036 it != client_caps.end();
3037 ++it) {
3038 if (!it->second->is_stale()) {
3039 int t = it->second->wanted();
3040 w |= t;
3041 if (it->first == loner_cap)
3042 loner |= t;
3043 else
3044 other |= t;
3045 }
3046 //cout << " get_caps_wanted client " << it->first << " " << cap_string(it->second.wanted()) << endl;
3047 }
3048 if (is_auth())
94b18763
FG
3049 for (const auto &p : mds_caps_wanted) {
3050 w |= p.second;
3051 other |= p.second;
7c673cae
FG
3052 //cout << " get_caps_wanted mds " << it->first << " " << cap_string(it->second) << endl;
3053 }
3054 if (ploner) *ploner = (loner >> shift) & mask;
3055 if (pother) *pother = (other >> shift) & mask;
3056 return (w >> shift) & mask;
3057}
3058
3059bool CInode::issued_caps_need_gather(SimpleLock *lock)
3060{
3061 int loner_issued, other_issued, xlocker_issued;
3062 get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
3063 lock->get_cap_shift(), lock->get_cap_mask());
3064 if ((loner_issued & ~lock->gcaps_allowed(CAP_LONER)) ||
3065 (other_issued & ~lock->gcaps_allowed(CAP_ANY)) ||
3066 (xlocker_issued & ~lock->gcaps_allowed(CAP_XLOCKER)))
3067 return true;
3068 return false;
3069}
3070
3071void CInode::replicate_relax_locks()
3072{
3073 //dout(10) << " relaxing locks on " << *this << dendl;
3074 assert(is_auth());
3075 assert(!is_replicated());
3076
3077 authlock.replicate_relax();
3078 linklock.replicate_relax();
3079 dirfragtreelock.replicate_relax();
3080 filelock.replicate_relax();
3081 xattrlock.replicate_relax();
3082 snaplock.replicate_relax();
3083 nestlock.replicate_relax();
3084 flocklock.replicate_relax();
3085 policylock.replicate_relax();
3086}
3087
3088
3089
3090// =============================================
3091
3092int CInode::encode_inodestat(bufferlist& bl, Session *session,
3093 SnapRealm *dir_realm,
3094 snapid_t snapid,
3095 unsigned max_bytes,
3096 int getattr_caps)
3097{
31f18b77 3098 client_t client = session->info.inst.name.num();
7c673cae
FG
3099 assert(snapid);
3100 assert(session->connection);
3101
3102 bool valid = true;
3103
3104 // pick a version!
94b18763
FG
3105 mempool_inode *oi = &inode;
3106 mempool_inode *pi = get_projected_inode();
7c673cae 3107
94b18763 3108 CInode::mempool_xattr_map *pxattrs = nullptr;
7c673cae
FG
3109
3110 if (snapid != CEPH_NOSNAP) {
3111
3112 // for now at least, old_inodes is only defined/valid on the auth
3113 if (!is_auth())
3114 valid = false;
3115
3116 if (is_multiversion()) {
94b18763
FG
3117 auto it = old_inodes.lower_bound(snapid);
3118 if (it != old_inodes.end()) {
3119 if (it->second.first > snapid) {
3120 if (it != old_inodes.begin())
3121 --it;
7c673cae 3122 }
94b18763
FG
3123 if (it->second.first <= snapid && snapid <= it->first) {
3124 dout(15) << __func__ << " snapid " << snapid
3125 << " to old_inode [" << it->second.first << "," << it->first << "]"
3126 << " " << it->second.inode.rstat
7c673cae 3127 << dendl;
94b18763
FG
3128 auto &p = it->second;
3129 pi = oi = &p.inode;
3130 pxattrs = &p.xattrs;
7c673cae
FG
3131 } else {
3132 // snapshoted remote dentry can result this
3133 dout(0) << "encode_inodestat old_inode for snapid " << snapid
3134 << " not found" << dendl;
3135 }
3136 }
3137 } else if (snapid < first || snapid > last) {
3138 // snapshoted remote dentry can result this
3139 dout(0) << "encode_inodestat [" << first << "," << last << "]"
3140 << " not match snapid " << snapid << dendl;
3141 }
3142 }
3143
3144 SnapRealm *realm = find_snaprealm();
3145
3146 bool no_caps = !valid ||
3147 session->is_stale() ||
3148 (dir_realm && realm != dir_realm) ||
3149 is_frozen() ||
3150 state_test(CInode::STATE_EXPORTINGCAPS);
3151 if (no_caps)
3152 dout(20) << "encode_inodestat no caps"
3153 << (!valid?", !valid":"")
3154 << (session->is_stale()?", session stale ":"")
3155 << ((dir_realm && realm != dir_realm)?", snaprealm differs ":"")
3156 << (is_frozen()?", frozen inode":"")
3157 << (state_test(CInode::STATE_EXPORTINGCAPS)?", exporting caps":"")
3158 << dendl;
3159
3160
3161 // "fake" a version that is old (stable) version, +1 if projected.
3162 version_t version = (oi->version * 2) + is_projected();
3163
3164 Capability *cap = get_client_cap(client);
3165 bool pfile = filelock.is_xlocked_by_client(client) || get_loner() == client;
3166 //(cap && (cap->issued() & CEPH_CAP_FILE_EXCL));
3167 bool pauth = authlock.is_xlocked_by_client(client) || get_loner() == client;
3168 bool plink = linklock.is_xlocked_by_client(client) || get_loner() == client;
3169 bool pxattr = xattrlock.is_xlocked_by_client(client) || get_loner() == client;
3170
3171 bool plocal = versionlock.get_last_wrlock_client() == client;
3172 bool ppolicy = policylock.is_xlocked_by_client(client) || get_loner()==client;
3173
94b18763 3174 mempool_inode *any_i = (pfile|pauth|plink|pxattr|plocal) ? pi : oi;
7c673cae
FG
3175
3176 dout(20) << " pfile " << pfile << " pauth " << pauth
3177 << " plink " << plink << " pxattr " << pxattr
3178 << " plocal " << plocal
3179 << " ctime " << any_i->ctime
3180 << " valid=" << valid << dendl;
3181
3182 // file
94b18763 3183 mempool_inode *file_i = pfile ? pi:oi;
7c673cae
FG
3184 file_layout_t layout;
3185 if (is_dir()) {
3186 layout = (ppolicy ? pi : oi)->layout;
3187 } else {
3188 layout = file_i->layout;
3189 }
3190
3191 // max_size is min of projected, actual
3192 uint64_t max_size =
3193 MIN(oi->client_ranges.count(client) ?
3194 oi->client_ranges[client].range.last : 0,
3195 pi->client_ranges.count(client) ?
3196 pi->client_ranges[client].range.last : 0);
3197
3198 // inline data
3199 version_t inline_version = 0;
3200 bufferlist inline_data;
3201 if (file_i->inline_data.version == CEPH_INLINE_NONE) {
3202 inline_version = CEPH_INLINE_NONE;
3203 } else if ((!cap && !no_caps) ||
3204 (cap && cap->client_inline_version < file_i->inline_data.version) ||
3205 (getattr_caps & CEPH_CAP_FILE_RD)) { // client requests inline data
3206 inline_version = file_i->inline_data.version;
3207 if (file_i->inline_data.length() > 0)
3208 inline_data = file_i->inline_data.get_data();
3209 }
3210
3211 // nest (do same as file... :/)
3212 if (cap) {
3213 cap->last_rbytes = file_i->rstat.rbytes;
3214 cap->last_rsize = file_i->rstat.rsize();
3215 }
3216
3217 // auth
94b18763 3218 mempool_inode *auth_i = pauth ? pi:oi;
7c673cae
FG
3219
3220 // link
94b18763 3221 mempool_inode *link_i = plink ? pi:oi;
7c673cae
FG
3222
3223 // xattr
94b18763 3224 mempool_inode *xattr_i = pxattr ? pi:oi;
7c673cae
FG
3225
3226 // xattr
3227 bufferlist xbl;
3228 version_t xattr_version;
3229 if ((!cap && !no_caps) ||
3230 (cap && cap->client_xattr_version < xattr_i->xattr_version) ||
3231 (getattr_caps & CEPH_CAP_XATTR_SHARED)) { // client requests xattrs
3232 if (!pxattrs)
3233 pxattrs = pxattr ? get_projected_xattrs() : &xattrs;
3234 ::encode(*pxattrs, xbl);
3235 xattr_version = xattr_i->xattr_version;
3236 } else {
3237 xattr_version = 0;
3238 }
3239
3240 // do we have room?
3241 if (max_bytes) {
3242 unsigned bytes = 8 + 8 + 4 + 8 + 8 + sizeof(ceph_mds_reply_cap) +
3243 sizeof(struct ceph_file_layout) + 4 + layout.pool_ns.size() +
3244 sizeof(struct ceph_timespec) * 3 +
3245 4 + 8 + 8 + 8 + 4 + 4 + 4 + 4 + 4 +
3246 8 + 8 + 8 + 8 + 8 + sizeof(struct ceph_timespec) +
3247 4;
3248 bytes += sizeof(__u32);
3249 bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
3250 bytes += sizeof(__u32) + symlink.length();
3251 bytes += sizeof(__u32) + xbl.length();
3252 bytes += sizeof(version_t) + sizeof(__u32) + inline_data.length();
3253 if (bytes > max_bytes)
3254 return -ENOSPC;
3255 }
3256
3257
3258 // encode caps
3259 struct ceph_mds_reply_cap ecap;
3260 if (snapid != CEPH_NOSNAP) {
3261 /*
3262 * snapped inodes (files or dirs) only get read-only caps. always
3263 * issue everything possible, since it is read only.
3264 *
3265 * if a snapped inode has caps, limit issued caps based on the
3266 * lock state.
3267 *
3268 * if it is a live inode, limit issued caps based on the lock
3269 * state.
3270 *
3271 * do NOT adjust cap issued state, because the client always
3272 * tracks caps per-snap and the mds does either per-interval or
3273 * multiversion.
3274 */
3275 ecap.caps = valid ? get_caps_allowed_by_type(CAP_ANY) : CEPH_STAT_CAP_INODE;
3276 if (last == CEPH_NOSNAP || is_any_caps())
3277 ecap.caps = ecap.caps & get_caps_allowed_for_client(session, file_i);
3278 ecap.seq = 0;
3279 ecap.mseq = 0;
3280 ecap.realm = 0;
3281 } else {
3282 if (!no_caps && !cap) {
3283 // add a new cap
3284 cap = add_client_cap(client, session, realm);
b32b8144
FG
3285 if (is_auth())
3286 choose_ideal_loner();
7c673cae
FG
3287 }
3288
3289 int issue = 0;
3290 if (!no_caps && cap) {
3291 int likes = get_caps_liked();
3292 int allowed = get_caps_allowed_for_client(session, file_i);
3293 issue = (cap->wanted() | likes) & allowed;
3294 cap->issue_norevoke(issue);
3295 issue = cap->pending();
3296 dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3297 << " seq " << cap->get_last_seq() << dendl;
3298 } else if (cap && cap->is_new() && !dir_realm) {
3299 // alway issue new caps to client, otherwise the caps get lost
3300 assert(cap->is_stale());
3301 issue = cap->pending() | CEPH_CAP_PIN;
3302 cap->issue_norevoke(issue);
3303 dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3304 << " seq " << cap->get_last_seq()
3305 << "(stale|new caps)" << dendl;
3306 }
3307
3308 if (issue) {
3309 cap->set_last_issue();
3310 cap->set_last_issue_stamp(ceph_clock_now());
3311 cap->clear_new();
3312 ecap.caps = issue;
3313 ecap.wanted = cap->wanted();
3314 ecap.cap_id = cap->get_cap_id();
3315 ecap.seq = cap->get_last_seq();
3316 ecap.mseq = cap->get_mseq();
3317 ecap.realm = realm->inode->ino();
3318 } else {
3319 ecap.cap_id = 0;
3320 ecap.caps = 0;
3321 ecap.seq = 0;
3322 ecap.mseq = 0;
3323 ecap.realm = 0;
3324 ecap.wanted = 0;
3325 }
3326 }
3327 ecap.flags = is_auth() ? CEPH_CAP_FLAG_AUTH : 0;
3328 dout(10) << "encode_inodestat caps " << ccap_string(ecap.caps)
3329 << " seq " << ecap.seq << " mseq " << ecap.mseq
3330 << " xattrv " << xattr_version << " len " << xbl.length()
3331 << dendl;
3332
3333 if (inline_data.length() && cap) {
3334 if ((cap->pending() | getattr_caps) & CEPH_CAP_FILE_SHARED) {
3335 dout(10) << "including inline version " << inline_version << dendl;
3336 cap->client_inline_version = inline_version;
3337 } else {
3338 dout(10) << "dropping inline version " << inline_version << dendl;
3339 inline_version = 0;
3340 inline_data.clear();
3341 }
3342 }
3343
3344 // include those xattrs?
3345 if (xbl.length() && cap) {
3346 if ((cap->pending() | getattr_caps) & CEPH_CAP_XATTR_SHARED) {
3347 dout(10) << "including xattrs version " << xattr_i->xattr_version << dendl;
3348 cap->client_xattr_version = xattr_i->xattr_version;
3349 } else {
3350 dout(10) << "dropping xattrs version " << xattr_i->xattr_version << dendl;
3351 xbl.clear(); // no xattrs .. XXX what's this about?!?
3352 xattr_version = 0;
3353 }
3354 }
3355
3356 /*
3357 * note: encoding matches MClientReply::InodeStat
3358 */
3359 ::encode(oi->ino, bl);
3360 ::encode(snapid, bl);
3361 ::encode(oi->rdev, bl);
3362 ::encode(version, bl);
3363
3364 ::encode(xattr_version, bl);
3365
3366 ::encode(ecap, bl);
3367 {
3368 ceph_file_layout legacy_layout;
3369 layout.to_legacy(&legacy_layout);
3370 ::encode(legacy_layout, bl);
3371 }
3372 ::encode(any_i->ctime, bl);
3373 ::encode(file_i->mtime, bl);
3374 ::encode(file_i->atime, bl);
3375 ::encode(file_i->time_warp_seq, bl);
3376 ::encode(file_i->size, bl);
3377 ::encode(max_size, bl);
3378 ::encode(file_i->truncate_size, bl);
3379 ::encode(file_i->truncate_seq, bl);
3380
3381 ::encode(auth_i->mode, bl);
3382 ::encode((uint32_t)auth_i->uid, bl);
3383 ::encode((uint32_t)auth_i->gid, bl);
3384
3385 ::encode(link_i->nlink, bl);
3386
3387 ::encode(file_i->dirstat.nfiles, bl);
3388 ::encode(file_i->dirstat.nsubdirs, bl);
3389 ::encode(file_i->rstat.rbytes, bl);
3390 ::encode(file_i->rstat.rfiles, bl);
3391 ::encode(file_i->rstat.rsubdirs, bl);
3392 ::encode(file_i->rstat.rctime, bl);
3393
3394 dirfragtree.encode(bl);
3395
3396 ::encode(symlink, bl);
3397 if (session->connection->has_feature(CEPH_FEATURE_DIRLAYOUTHASH)) {
3398 ::encode(file_i->dir_layout, bl);
3399 }
3400 ::encode(xbl, bl);
3401 if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
3402 ::encode(inline_version, bl);
3403 ::encode(inline_data, bl);
3404 }
3405 if (session->connection->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
94b18763 3406 mempool_inode *policy_i = ppolicy ? pi : oi;
7c673cae
FG
3407 ::encode(policy_i->quota, bl);
3408 }
3409 if (session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
3410 ::encode(layout.pool_ns, bl);
3411 }
3412 if (session->connection->has_feature(CEPH_FEATURE_FS_BTIME)) {
3413 ::encode(any_i->btime, bl);
3414 ::encode(any_i->change_attr, bl);
3415 }
3416
3417 return valid;
3418}
3419
3420void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
3421{
3422 assert(cap);
3423
3424 client_t client = cap->get_client();
3425
3426 bool pfile = filelock.is_xlocked_by_client(client) || (cap->issued() & CEPH_CAP_FILE_EXCL);
3427 bool pauth = authlock.is_xlocked_by_client(client);
3428 bool plink = linklock.is_xlocked_by_client(client);
3429 bool pxattr = xattrlock.is_xlocked_by_client(client);
3430
94b18763
FG
3431 mempool_inode *oi = &inode;
3432 mempool_inode *pi = get_projected_inode();
3433 mempool_inode *i = (pfile|pauth|plink|pxattr) ? pi : oi;
7c673cae
FG
3434
3435 dout(20) << "encode_cap_message pfile " << pfile
3436 << " pauth " << pauth << " plink " << plink << " pxattr " << pxattr
3437 << " ctime " << i->ctime << dendl;
3438
3439 i = pfile ? pi:oi;
3440 m->set_layout(i->layout);
3441 m->size = i->size;
3442 m->truncate_seq = i->truncate_seq;
3443 m->truncate_size = i->truncate_size;
3444 m->mtime = i->mtime;
3445 m->atime = i->atime;
3446 m->ctime = i->ctime;
3447 m->change_attr = i->change_attr;
3448 m->time_warp_seq = i->time_warp_seq;
28e407b8
AA
3449 m->nfiles = i->dirstat.nfiles;
3450 m->nsubdirs = i->dirstat.nsubdirs;
7c673cae
FG
3451
3452 if (cap->client_inline_version < i->inline_data.version) {
3453 m->inline_version = cap->client_inline_version = i->inline_data.version;
3454 if (i->inline_data.length() > 0)
3455 m->inline_data = i->inline_data.get_data();
3456 } else {
3457 m->inline_version = 0;
3458 }
3459
3460 // max_size is min of projected, actual.
3461 uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
3462 uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
3463 m->max_size = MIN(oldms, newms);
3464
3465 i = pauth ? pi:oi;
3466 m->head.mode = i->mode;
3467 m->head.uid = i->uid;
3468 m->head.gid = i->gid;
3469
3470 i = plink ? pi:oi;
3471 m->head.nlink = i->nlink;
3472
3473 i = pxattr ? pi:oi;
94b18763 3474 auto ix = pxattr ? get_projected_xattrs() : &xattrs;
7c673cae
FG
3475 if ((cap->pending() & CEPH_CAP_XATTR_SHARED) &&
3476 i->xattr_version > cap->client_xattr_version) {
3477 dout(10) << " including xattrs v " << i->xattr_version << dendl;
3478 ::encode(*ix, m->xattrbl);
3479 m->head.xattr_version = i->xattr_version;
3480 cap->client_xattr_version = i->xattr_version;
3481 }
3482}
3483
3484
3485
3486void CInode::_encode_base(bufferlist& bl, uint64_t features)
3487{
3488 ::encode(first, bl);
3489 ::encode(inode, bl, features);
3490 ::encode(symlink, bl);
3491 ::encode(dirfragtree, bl);
3492 ::encode(xattrs, bl);
3493 ::encode(old_inodes, bl, features);
3494 ::encode(damage_flags, bl);
3495 encode_snap(bl);
3496}
3497void CInode::_decode_base(bufferlist::iterator& p)
3498{
3499 ::decode(first, p);
3500 ::decode(inode, p);
94b18763
FG
3501 {
3502 std::string tmp;
3503 ::decode(tmp, p);
3504 symlink = mempool::mds_co::string(boost::string_view(tmp));
3505 }
7c673cae
FG
3506 ::decode(dirfragtree, p);
3507 ::decode(xattrs, p);
3508 ::decode(old_inodes, p);
3509 ::decode(damage_flags, p);
3510 decode_snap(p);
3511}
3512
3513void CInode::_encode_locks_full(bufferlist& bl)
3514{
3515 ::encode(authlock, bl);
3516 ::encode(linklock, bl);
3517 ::encode(dirfragtreelock, bl);
3518 ::encode(filelock, bl);
3519 ::encode(xattrlock, bl);
3520 ::encode(snaplock, bl);
3521 ::encode(nestlock, bl);
3522 ::encode(flocklock, bl);
3523 ::encode(policylock, bl);
3524
3525 ::encode(loner_cap, bl);
3526}
3527void CInode::_decode_locks_full(bufferlist::iterator& p)
3528{
3529 ::decode(authlock, p);
3530 ::decode(linklock, p);
3531 ::decode(dirfragtreelock, p);
3532 ::decode(filelock, p);
3533 ::decode(xattrlock, p);
3534 ::decode(snaplock, p);
3535 ::decode(nestlock, p);
3536 ::decode(flocklock, p);
3537 ::decode(policylock, p);
3538
3539 ::decode(loner_cap, p);
3540 set_loner_cap(loner_cap);
3541 want_loner_cap = loner_cap; // for now, we'll eval() shortly.
3542}
3543
b32b8144 3544void CInode::_encode_locks_state_for_replica(bufferlist& bl, bool need_recover)
7c673cae
FG
3545{
3546 authlock.encode_state_for_replica(bl);
3547 linklock.encode_state_for_replica(bl);
3548 dirfragtreelock.encode_state_for_replica(bl);
3549 filelock.encode_state_for_replica(bl);
3550 nestlock.encode_state_for_replica(bl);
3551 xattrlock.encode_state_for_replica(bl);
3552 snaplock.encode_state_for_replica(bl);
3553 flocklock.encode_state_for_replica(bl);
3554 policylock.encode_state_for_replica(bl);
b32b8144 3555 ::encode(need_recover, bl);
7c673cae 3556}
b32b8144 3557
7c673cae
FG
3558void CInode::_encode_locks_state_for_rejoin(bufferlist& bl, int rep)
3559{
3560 authlock.encode_state_for_replica(bl);
3561 linklock.encode_state_for_replica(bl);
3562 dirfragtreelock.encode_state_for_rejoin(bl, rep);
3563 filelock.encode_state_for_rejoin(bl, rep);
3564 nestlock.encode_state_for_rejoin(bl, rep);
3565 xattrlock.encode_state_for_replica(bl);
3566 snaplock.encode_state_for_replica(bl);
3567 flocklock.encode_state_for_replica(bl);
3568 policylock.encode_state_for_replica(bl);
3569}
b32b8144 3570
7c673cae
FG
3571void CInode::_decode_locks_state(bufferlist::iterator& p, bool is_new)
3572{
3573 authlock.decode_state(p, is_new);
3574 linklock.decode_state(p, is_new);
3575 dirfragtreelock.decode_state(p, is_new);
3576 filelock.decode_state(p, is_new);
3577 nestlock.decode_state(p, is_new);
3578 xattrlock.decode_state(p, is_new);
3579 snaplock.decode_state(p, is_new);
3580 flocklock.decode_state(p, is_new);
3581 policylock.decode_state(p, is_new);
b32b8144
FG
3582
3583 bool need_recover;
3584 ::decode(need_recover, p);
3585 if (need_recover && is_new) {
3586 // Auth mds replicated this inode while it's recovering. Auth mds may take xlock on the lock
3587 // and change the object when replaying unsafe requests.
3588 authlock.mark_need_recover();
3589 linklock.mark_need_recover();
3590 dirfragtreelock.mark_need_recover();
3591 filelock.mark_need_recover();
3592 nestlock.mark_need_recover();
3593 xattrlock.mark_need_recover();
3594 snaplock.mark_need_recover();
3595 flocklock.mark_need_recover();
3596 policylock.mark_need_recover();
3597 }
7c673cae
FG
3598}
3599void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<MDSInternalContextBase*>& waiters,
b32b8144
FG
3600 list<SimpleLock*>& eval_locks, bool survivor)
3601{
3602 authlock.decode_state_rejoin(p, waiters, survivor);
3603 linklock.decode_state_rejoin(p, waiters, survivor);
3604 dirfragtreelock.decode_state_rejoin(p, waiters, survivor);
3605 filelock.decode_state_rejoin(p, waiters, survivor);
3606 nestlock.decode_state_rejoin(p, waiters, survivor);
3607 xattrlock.decode_state_rejoin(p, waiters, survivor);
3608 snaplock.decode_state_rejoin(p, waiters, survivor);
3609 flocklock.decode_state_rejoin(p, waiters, survivor);
3610 policylock.decode_state_rejoin(p, waiters, survivor);
7c673cae
FG
3611
3612 if (!dirfragtreelock.is_stable() && !dirfragtreelock.is_wrlocked())
3613 eval_locks.push_back(&dirfragtreelock);
3614 if (!filelock.is_stable() && !filelock.is_wrlocked())
3615 eval_locks.push_back(&filelock);
3616 if (!nestlock.is_stable() && !nestlock.is_wrlocked())
3617 eval_locks.push_back(&nestlock);
3618}
3619
3620
3621// IMPORT/EXPORT
3622
3623void CInode::encode_export(bufferlist& bl)
3624{
3625 ENCODE_START(5, 4, bl);
3626 _encode_base(bl, mdcache->mds->mdsmap->get_up_features());
3627
3628 ::encode(state, bl);
3629
3630 ::encode(pop, bl);
3631
181888fb 3632 ::encode(get_replicas(), bl);
7c673cae
FG
3633
3634 // include scatterlock info for any bounding CDirs
3635 bufferlist bounding;
3636 if (inode.is_dir())
94b18763
FG
3637 for (const auto &p : dirfrags) {
3638 CDir *dir = p.second;
7c673cae 3639 if (dir->state_test(CDir::STATE_EXPORTBOUND)) {
94b18763 3640 ::encode(p.first, bounding);
7c673cae
FG
3641 ::encode(dir->fnode.fragstat, bounding);
3642 ::encode(dir->fnode.accounted_fragstat, bounding);
3643 ::encode(dir->fnode.rstat, bounding);
3644 ::encode(dir->fnode.accounted_rstat, bounding);
3645 dout(10) << " encoded fragstat/rstat info for " << *dir << dendl;
3646 }
3647 }
3648 ::encode(bounding, bl);
3649
3650 _encode_locks_full(bl);
3651
3652 _encode_file_locks(bl);
3653
3654 ENCODE_FINISH(bl);
3655
3656 get(PIN_TEMPEXPORTING);
3657}
3658
3659void CInode::finish_export(utime_t now)
3660{
3661 state &= MASK_STATE_EXPORT_KEPT;
3662
3663 pop.zero(now);
3664
3665 // just in case!
3666 //dirlock.clear_updated();
3667
3668 loner_cap = -1;
3669
3670 put(PIN_TEMPEXPORTING);
3671}
3672
3673void CInode::decode_import(bufferlist::iterator& p,
3674 LogSegment *ls)
3675{
3676 DECODE_START(5, p);
3677
3678 _decode_base(p);
3679
3680 unsigned s;
3681 ::decode(s, p);
3682 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
3683
3684 if (is_dirty()) {
3685 get(PIN_DIRTY);
3686 _mark_dirty(ls);
3687 }
3688 if (is_dirty_parent()) {
3689 get(PIN_DIRTYPARENT);
28e407b8 3690 mark_dirty_parent(ls);
7c673cae
FG
3691 }
3692
3693 ::decode(pop, ceph_clock_now(), p);
3694
181888fb
FG
3695 ::decode(get_replicas(), p);
3696 if (is_replicated())
7c673cae
FG
3697 get(PIN_REPLICATED);
3698 replica_nonce = 0;
3699
3700 // decode fragstat info on bounding cdirs
3701 bufferlist bounding;
3702 ::decode(bounding, p);
3703 bufferlist::iterator q = bounding.begin();
3704 while (!q.end()) {
3705 frag_t fg;
3706 ::decode(fg, q);
3707 CDir *dir = get_dirfrag(fg);
3708 assert(dir); // we should have all bounds open
3709
3710 // Only take the remote's fragstat/rstat if we are non-auth for
3711 // this dirfrag AND the lock is NOT in a scattered (MIX) state.
3712 // We know lock is stable, and MIX is the only state in which
3713 // the inode auth (who sent us this data) may not have the best
3714 // info.
3715
3716 // HMM: Are there cases where dir->is_auth() is an insufficient
3717 // check because the dirfrag is under migration? That implies
3718 // it is frozen (and in a SYNC or LOCK state). FIXME.
3719
3720 if (dir->is_auth() ||
3721 filelock.get_state() == LOCK_MIX) {
3722 dout(10) << " skipped fragstat info for " << *dir << dendl;
3723 frag_info_t f;
3724 ::decode(f, q);
3725 ::decode(f, q);
3726 } else {
3727 ::decode(dir->fnode.fragstat, q);
3728 ::decode(dir->fnode.accounted_fragstat, q);
3729 dout(10) << " took fragstat info for " << *dir << dendl;
3730 }
3731 if (dir->is_auth() ||
3732 nestlock.get_state() == LOCK_MIX) {
3733 dout(10) << " skipped rstat info for " << *dir << dendl;
3734 nest_info_t n;
3735 ::decode(n, q);
3736 ::decode(n, q);
3737 } else {
3738 ::decode(dir->fnode.rstat, q);
3739 ::decode(dir->fnode.accounted_rstat, q);
3740 dout(10) << " took rstat info for " << *dir << dendl;
3741 }
3742 }
3743
3744 _decode_locks_full(p);
3745
3746 _decode_file_locks(p);
3747
3748 DECODE_FINISH(p);
3749}
3750
3751
3752void InodeStoreBase::dump(Formatter *f) const
3753{
3754 inode.dump(f);
3755 f->dump_string("symlink", symlink);
3756 f->open_array_section("old_inodes");
94b18763 3757 for (const auto &p : old_inodes) {
7c673cae 3758 f->open_object_section("old_inode");
94b18763
FG
3759 // The key is the last snapid, the first is in the mempool_old_inode
3760 f->dump_int("last", p.first);
3761 p.second.dump(f);
7c673cae
FG
3762 f->close_section(); // old_inode
3763 }
3764 f->close_section(); // old_inodes
3765
3766 f->open_object_section("dirfragtree");
3767 dirfragtree.dump(f);
3768 f->close_section(); // dirfragtree
3769}
3770
3771
3772void InodeStore::generate_test_instances(list<InodeStore*> &ls)
3773{
3774 InodeStore *populated = new InodeStore;
3775 populated->inode.ino = 0xdeadbeef;
3776 populated->symlink = "rhubarb";
3777 ls.push_back(populated);
3778}
3779
3780void CInode::validate_disk_state(CInode::validated_data *results,
3781 MDSInternalContext *fin)
3782{
3783 class ValidationContinuation : public MDSContinuation {
3784 public:
3785 MDSInternalContext *fin;
3786 CInode *in;
3787 CInode::validated_data *results;
3788 bufferlist bl;
3789 CInode *shadow_in;
3790
3791 enum {
3792 START = 0,
3793 BACKTRACE,
3794 INODE,
3795 DIRFRAGS
3796 };
3797
3798 ValidationContinuation(CInode *i,
3799 CInode::validated_data *data_r,
3800 MDSInternalContext *fin_) :
3801 MDSContinuation(i->mdcache->mds->server),
3802 fin(fin_),
3803 in(i),
3804 results(data_r),
3805 shadow_in(NULL) {
3806 set_callback(START, static_cast<Continuation::stagePtr>(&ValidationContinuation::_start));
3807 set_callback(BACKTRACE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_backtrace));
3808 set_callback(INODE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_inode_disk));
3809 set_callback(DIRFRAGS, static_cast<Continuation::stagePtr>(&ValidationContinuation::_dirfrags));
3810 }
3811
3812 ~ValidationContinuation() override {
b32b8144
FG
3813 if (shadow_in) {
3814 delete shadow_in;
3815 in->mdcache->num_shadow_inodes--;
3816 }
7c673cae
FG
3817 }
3818
3819 /**
3820 * Fetch backtrace and set tag if tag is non-empty
3821 */
94b18763 3822 void fetch_backtrace_and_tag(CInode *in, boost::string_view tag,
7c673cae
FG
3823 Context *fin, int *bt_r, bufferlist *bt)
3824 {
3825 const int64_t pool = in->get_backtrace_pool();
3826 object_t oid = CInode::get_object_name(in->ino(), frag_t(), "");
3827
3828 ObjectOperation fetch;
3829 fetch.getxattr("parent", bt, bt_r);
3830 in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP,
3831 NULL, 0, fin);
3832 if (!tag.empty()) {
3833 ObjectOperation scrub_tag;
3834 bufferlist tag_bl;
3835 ::encode(tag, tag_bl);
3836 scrub_tag.setxattr("scrub_tag", tag_bl);
3837 SnapContext snapc;
3838 in->mdcache->mds->objecter->mutate(oid, object_locator_t(pool), scrub_tag, snapc,
3839 ceph::real_clock::now(),
3840 0, NULL);
3841 }
3842 }
3843
3844 bool _start(int rval) {
3845 if (in->is_dirty()) {
3846 MDCache *mdcache = in->mdcache;
94b18763 3847 mempool_inode& inode = in->inode;
7c673cae
FG
3848 dout(20) << "validating a dirty CInode; results will be inconclusive"
3849 << dendl;
3850 }
3851 if (in->is_symlink()) {
3852 // there's nothing to do for symlinks!
3853 return true;
3854 }
3855
3856 C_OnFinisher *conf = new C_OnFinisher(get_io_callback(BACKTRACE),
3857 in->mdcache->mds->finisher);
3858
3859 // Whether we have a tag to apply depends on ScrubHeader (if one is
3860 // present)
3861 if (in->scrub_infop) {
3862 // I'm a non-orphan, so look up my ScrubHeader via my linkage
94b18763 3863 boost::string_view tag = in->scrub_infop->header->get_tag();
7c673cae
FG
3864 // Rather than using the usual CInode::fetch_backtrace,
3865 // use a special variant that optionally writes a tag in the same
3866 // operation.
3867 fetch_backtrace_and_tag(in, tag, conf,
3868 &results->backtrace.ondisk_read_retval, &bl);
3869 } else {
3870 // When we're invoked outside of ScrubStack we might be called
3871 // on an orphaned inode like /
3872 fetch_backtrace_and_tag(in, {}, conf,
3873 &results->backtrace.ondisk_read_retval, &bl);
3874 }
3875 return false;
3876 }
3877
3878 bool _backtrace(int rval) {
3879 // set up basic result reporting and make sure we got the data
3880 results->performed_validation = true; // at least, some of it!
3881 results->backtrace.checked = true;
3882
3883 const int64_t pool = in->get_backtrace_pool();
3884 inode_backtrace_t& memory_backtrace = results->backtrace.memory_value;
3885 in->build_backtrace(pool, memory_backtrace);
3886 bool equivalent, divergent;
3887 int memory_newer;
3888
3889 MDCache *mdcache = in->mdcache; // For the benefit of dout
94b18763 3890 const mempool_inode& inode = in->inode; // For the benefit of dout
7c673cae
FG
3891
3892 // Ignore rval because it's the result of a FAILOK operation
3893 // from fetch_backtrace_and_tag: the real result is in
3894 // backtrace.ondisk_read_retval
3895 dout(20) << "ondisk_read_retval: " << results->backtrace.ondisk_read_retval << dendl;
3896 if (results->backtrace.ondisk_read_retval != 0) {
3897 results->backtrace.error_str << "failed to read off disk; see retval";
3898 goto next;
3899 }
3900
3901 // extract the backtrace, and compare it to a newly-constructed one
3902 try {
3903 bufferlist::iterator p = bl.begin();
3904 ::decode(results->backtrace.ondisk_value, p);
3905 dout(10) << "decoded " << bl.length() << " bytes of backtrace successfully" << dendl;
3906 } catch (buffer::error&) {
3907 if (results->backtrace.ondisk_read_retval == 0 && rval != 0) {
3908 // Cases where something has clearly gone wrong with the overall
3909 // fetch op, though we didn't get a nonzero rc from the getxattr
3910 // operation. e.g. object missing.
3911 results->backtrace.ondisk_read_retval = rval;
3912 }
3913 results->backtrace.error_str << "failed to decode on-disk backtrace ("
3914 << bl.length() << " bytes)!";
3915 goto next;
3916 }
3917
3918 memory_newer = memory_backtrace.compare(results->backtrace.ondisk_value,
3919 &equivalent, &divergent);
3920
3921 if (divergent || memory_newer < 0) {
3922 // we're divergent, or on-disk version is newer
3923 results->backtrace.error_str << "On-disk backtrace is divergent or newer";
3924 } else {
3925 results->backtrace.passed = true;
3926 }
3927next:
3928
3929 if (!results->backtrace.passed && in->scrub_infop->header->get_repair()) {
3930 std::string path;
3931 in->make_path_string(path);
d2e6a577
FG
3932 in->mdcache->mds->clog->warn() << "bad backtrace on inode " << in->ino()
3933 << "(" << path << "), rewriting it";
28e407b8 3934 in->mark_dirty_parent(in->mdcache->mds->mdlog->get_current_segment(),
7c673cae 3935 false);
b32b8144
FG
3936 // Flag that we repaired this BT so that it won't go into damagetable
3937 results->backtrace.repaired = true;
3938
3939 // Flag that we did some repair work so that our repair operation
3940 // can be flushed at end of scrub
3941 in->scrub_infop->header->set_repaired();
7c673cae
FG
3942 }
3943
3944 // If the inode's number was free in the InoTable, fix that
3945 // (#15619)
3946 {
3947 InoTable *inotable = mdcache->mds->inotable;
3948
d2e6a577 3949 dout(10) << "scrub: inotable ino = " << inode.ino << dendl;
7c673cae
FG
3950 dout(10) << "scrub: inotable free says "
3951 << inotable->is_marked_free(inode.ino) << dendl;
3952
3953 if (inotable->is_marked_free(inode.ino)) {
3954 LogChannelRef clog = in->mdcache->mds->clog;
3955 clog->error() << "scrub: inode wrongly marked free: 0x" << std::hex
3956 << inode.ino;
3957
3958 if (in->scrub_infop->header->get_repair()) {
3959 bool repaired = inotable->repair(inode.ino);
3960 if (repaired) {
3961 clog->error() << "inode table repaired for inode: 0x" << std::hex
3962 << inode.ino;
3963
3964 inotable->save();
3965 } else {
3966 clog->error() << "Cannot repair inotable while other operations"
3967 " are in progress";
3968 }
3969 }
3970 }
3971 }
3972
3973 // quit if we're a file, or kick off directory checks otherwise
3974 // TODO: validate on-disk inode for non-base directories
3975 if (!in->is_dir()) {
3976 return true;
3977 }
3978
3979 return validate_directory_data();
3980 }
3981
3982 bool validate_directory_data() {
3983 assert(in->is_dir());
3984
3985 if (in->is_base()) {
b32b8144
FG
3986 if (!shadow_in) {
3987 shadow_in = new CInode(in->mdcache);
3988 in->mdcache->create_unlinked_system_inode(shadow_in, in->inode.ino, in->inode.mode);
3989 in->mdcache->num_shadow_inodes++;
3990 }
7c673cae
FG
3991 shadow_in->fetch(get_internal_callback(INODE));
3992 return false;
3993 } else {
3994 results->inode.passed = true;
3995 return check_dirfrag_rstats();
3996 }
3997 }
3998
3999 bool _inode_disk(int rval) {
4000 results->inode.checked = true;
4001 results->inode.ondisk_read_retval = rval;
4002 results->inode.ondisk_value = shadow_in->inode;
4003 results->inode.memory_value = in->inode;
4004
94b18763
FG
4005 mempool_inode& si = shadow_in->inode;
4006 mempool_inode& i = in->inode;
7c673cae
FG
4007 if (si.version > i.version) {
4008 // uh, what?
4009 results->inode.error_str << "On-disk inode is newer than in-memory one!";
4010 goto next;
4011 } else {
4012 bool divergent = false;
4013 int r = i.compare(si, &divergent);
4014 results->inode.passed = !divergent && r >= 0;
4015 if (!results->inode.passed) {
4016 results->inode.error_str <<
4017 "On-disk inode is divergent or newer than in-memory one!";
4018 goto next;
4019 }
4020 }
4021next:
4022 return check_dirfrag_rstats();
4023 }
4024
4025 bool check_dirfrag_rstats() {
4026 MDSGatherBuilder gather(g_ceph_context);
4027 std::list<frag_t> frags;
4028 in->dirfragtree.get_leaves(frags);
4029 for (list<frag_t>::iterator p = frags.begin();
4030 p != frags.end();
4031 ++p) {
4032 CDir *dir = in->get_or_open_dirfrag(in->mdcache, *p);
4033 dir->scrub_info();
4034 if (!dir->scrub_infop->header)
4035 dir->scrub_infop->header = in->scrub_infop->header;
4036 if (dir->is_complete()) {
4037 dir->scrub_local();
4038 } else {
4039 dir->scrub_infop->need_scrub_local = true;
4040 dir->fetch(gather.new_sub(), false);
4041 }
4042 }
4043 if (gather.has_subs()) {
4044 gather.set_finisher(get_internal_callback(DIRFRAGS));
4045 gather.activate();
4046 return false;
4047 } else {
4048 return immediate(DIRFRAGS, 0);
4049 }
4050 }
4051
4052 bool _dirfrags(int rval) {
4053 int frags_errors = 0;
4054 // basic reporting setup
4055 results->raw_stats.checked = true;
4056 results->raw_stats.ondisk_read_retval = rval;
4057
4058 results->raw_stats.memory_value.dirstat = in->inode.dirstat;
4059 results->raw_stats.memory_value.rstat = in->inode.rstat;
4060 frag_info_t& dir_info = results->raw_stats.ondisk_value.dirstat;
4061 nest_info_t& nest_info = results->raw_stats.ondisk_value.rstat;
4062
4063 if (rval != 0) {
4064 results->raw_stats.error_str << "Failed to read dirfrags off disk";
4065 goto next;
4066 }
4067
4068 // check each dirfrag...
94b18763
FG
4069 for (const auto &p : in->dirfrags) {
4070 CDir *dir = p.second;
7c673cae
FG
4071 assert(dir->get_version() > 0);
4072 nest_info.add(dir->fnode.accounted_rstat);
4073 dir_info.add(dir->fnode.accounted_fragstat);
4074 if (dir->scrub_infop &&
4075 dir->scrub_infop->pending_scrub_error) {
4076 dir->scrub_infop->pending_scrub_error = false;
4077 if (dir->scrub_infop->header->get_repair()) {
b32b8144 4078 results->raw_stats.repaired = true;
7c673cae 4079 results->raw_stats.error_str
94b18763 4080 << "dirfrag(" << p.first << ") has bad stats (will be fixed); ";
7c673cae
FG
4081 } else {
4082 results->raw_stats.error_str
94b18763 4083 << "dirfrag(" << p.first << ") has bad stats; ";
7c673cae
FG
4084 }
4085 frags_errors++;
4086 }
4087 }
4088 nest_info.rsubdirs++; // it gets one to account for self
4089 // ...and that their sum matches our inode settings
4090 if (!dir_info.same_sums(in->inode.dirstat) ||
4091 !nest_info.same_sums(in->inode.rstat)) {
4092 if (in->scrub_infop &&
4093 in->scrub_infop->header->get_repair()) {
4094 results->raw_stats.error_str
4095 << "freshly-calculated rstats don't match existing ones (will be fixed)";
4096 in->mdcache->repair_inode_stats(in);
b32b8144 4097 results->raw_stats.repaired = true;
7c673cae
FG
4098 } else {
4099 results->raw_stats.error_str
4100 << "freshly-calculated rstats don't match existing ones";
4101 }
4102 goto next;
4103 }
4104 if (frags_errors > 0)
4105 goto next;
4106
4107 results->raw_stats.passed = true;
4108next:
4109 return true;
4110 }
4111
4112 void _done() override {
4113 if ((!results->raw_stats.checked || results->raw_stats.passed) &&
4114 (!results->backtrace.checked || results->backtrace.passed) &&
4115 (!results->inode.checked || results->inode.passed))
4116 results->passed_validation = true;
4117 if (fin) {
4118 fin->complete(get_rval());
4119 }
4120 }
4121 };
4122
4123
4124 dout(10) << "scrub starting validate_disk_state on " << *this << dendl;
4125 ValidationContinuation *vc = new ValidationContinuation(this,
4126 results,
4127 fin);
4128 vc->begin();
4129}
4130
4131void CInode::validated_data::dump(Formatter *f) const
4132{
4133 f->open_object_section("results");
4134 {
4135 f->dump_bool("performed_validation", performed_validation);
4136 f->dump_bool("passed_validation", passed_validation);
4137 f->open_object_section("backtrace");
4138 {
4139 f->dump_bool("checked", backtrace.checked);
4140 f->dump_bool("passed", backtrace.passed);
4141 f->dump_int("read_ret_val", backtrace.ondisk_read_retval);
4142 f->dump_stream("ondisk_value") << backtrace.ondisk_value;
4143 f->dump_stream("memoryvalue") << backtrace.memory_value;
4144 f->dump_string("error_str", backtrace.error_str.str());
4145 }
4146 f->close_section(); // backtrace
4147 f->open_object_section("raw_stats");
4148 {
4149 f->dump_bool("checked", raw_stats.checked);
4150 f->dump_bool("passed", raw_stats.passed);
4151 f->dump_int("read_ret_val", raw_stats.ondisk_read_retval);
4152 f->dump_stream("ondisk_value.dirstat") << raw_stats.ondisk_value.dirstat;
4153 f->dump_stream("ondisk_value.rstat") << raw_stats.ondisk_value.rstat;
4154 f->dump_stream("memory_value.dirrstat") << raw_stats.memory_value.dirstat;
4155 f->dump_stream("memory_value.rstat") << raw_stats.memory_value.rstat;
4156 f->dump_string("error_str", raw_stats.error_str.str());
4157 }
4158 f->close_section(); // raw_stats
4159 // dump failure return code
4160 int rc = 0;
4161 if (backtrace.checked && backtrace.ondisk_read_retval)
4162 rc = backtrace.ondisk_read_retval;
4163 if (inode.checked && inode.ondisk_read_retval)
4164 rc = inode.ondisk_read_retval;
4165 if (raw_stats.checked && raw_stats.ondisk_read_retval)
4166 rc = raw_stats.ondisk_read_retval;
4167 f->dump_int("return_code", rc);
4168 }
4169 f->close_section(); // results
4170}
4171
b32b8144
FG
4172bool CInode::validated_data::all_damage_repaired() const
4173{
4174 bool unrepaired =
4175 (raw_stats.checked && !raw_stats.passed && !raw_stats.repaired)
4176 ||
4177 (backtrace.checked && !backtrace.passed && !backtrace.repaired)
4178 ||
4179 (inode.checked && !inode.passed && !inode.repaired);
4180
4181 return !unrepaired;
4182}
4183
7c673cae
FG
4184void CInode::dump(Formatter *f) const
4185{
4186 InodeStoreBase::dump(f);
4187
4188 MDSCacheObject::dump(f);
4189
4190 f->open_object_section("versionlock");
4191 versionlock.dump(f);
4192 f->close_section();
4193
4194 f->open_object_section("authlock");
4195 authlock.dump(f);
4196 f->close_section();
4197
4198 f->open_object_section("linklock");
4199 linklock.dump(f);
4200 f->close_section();
4201
4202 f->open_object_section("dirfragtreelock");
4203 dirfragtreelock.dump(f);
4204 f->close_section();
4205
4206 f->open_object_section("filelock");
4207 filelock.dump(f);
4208 f->close_section();
4209
4210 f->open_object_section("xattrlock");
4211 xattrlock.dump(f);
4212 f->close_section();
4213
4214 f->open_object_section("snaplock");
4215 snaplock.dump(f);
4216 f->close_section();
4217
4218 f->open_object_section("nestlock");
4219 nestlock.dump(f);
4220 f->close_section();
4221
4222 f->open_object_section("flocklock");
4223 flocklock.dump(f);
4224 f->close_section();
4225
4226 f->open_object_section("policylock");
4227 policylock.dump(f);
4228 f->close_section();
4229
4230 f->open_array_section("states");
4231 MDSCacheObject::dump_states(f);
4232 if (state_test(STATE_EXPORTING))
4233 f->dump_string("state", "exporting");
4234 if (state_test(STATE_OPENINGDIR))
4235 f->dump_string("state", "openingdir");
4236 if (state_test(STATE_FREEZING))
4237 f->dump_string("state", "freezing");
4238 if (state_test(STATE_FROZEN))
4239 f->dump_string("state", "frozen");
4240 if (state_test(STATE_AMBIGUOUSAUTH))
4241 f->dump_string("state", "ambiguousauth");
4242 if (state_test(STATE_EXPORTINGCAPS))
4243 f->dump_string("state", "exportingcaps");
4244 if (state_test(STATE_NEEDSRECOVER))
4245 f->dump_string("state", "needsrecover");
4246 if (state_test(STATE_PURGING))
4247 f->dump_string("state", "purging");
4248 if (state_test(STATE_DIRTYPARENT))
4249 f->dump_string("state", "dirtyparent");
4250 if (state_test(STATE_DIRTYRSTAT))
4251 f->dump_string("state", "dirtyrstat");
4252 if (state_test(STATE_STRAYPINNED))
4253 f->dump_string("state", "straypinned");
4254 if (state_test(STATE_FROZENAUTHPIN))
4255 f->dump_string("state", "frozenauthpin");
4256 if (state_test(STATE_DIRTYPOOL))
4257 f->dump_string("state", "dirtypool");
4258 if (state_test(STATE_ORPHAN))
4259 f->dump_string("state", "orphan");
4260 if (state_test(STATE_MISSINGOBJS))
4261 f->dump_string("state", "missingobjs");
4262 f->close_section();
4263
4264 f->open_array_section("client_caps");
4265 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
4266 it != client_caps.end(); ++it) {
4267 f->open_object_section("client_cap");
4268 f->dump_int("client_id", it->first.v);
4269 f->dump_string("pending", ccap_string(it->second->pending()));
4270 f->dump_string("issued", ccap_string(it->second->issued()));
4271 f->dump_string("wanted", ccap_string(it->second->wanted()));
b32b8144 4272 f->dump_int("last_sent", it->second->get_last_sent());
7c673cae
FG
4273 f->close_section();
4274 }
4275 f->close_section();
4276
4277 f->dump_int("loner", loner_cap.v);
4278 f->dump_int("want_loner", want_loner_cap.v);
4279
4280 f->open_array_section("mds_caps_wanted");
94b18763 4281 for (const auto &p : mds_caps_wanted) {
7c673cae 4282 f->open_object_section("mds_cap_wanted");
94b18763
FG
4283 f->dump_int("rank", p.first);
4284 f->dump_string("cap", ccap_string(p.second));
7c673cae
FG
4285 f->close_section();
4286 }
4287 f->close_section();
4288}
4289
4290/****** Scrub Stuff *****/
4291void CInode::scrub_info_create() const
4292{
4293 dout(25) << __func__ << dendl;
4294 assert(!scrub_infop);
4295
4296 // break out of const-land to set up implicit initial state
4297 CInode *me = const_cast<CInode*>(this);
94b18763 4298 mempool_inode *in = me->get_projected_inode();
7c673cae
FG
4299
4300 scrub_info_t *si = new scrub_info_t();
4301 si->scrub_start_stamp = si->last_scrub_stamp = in->last_scrub_stamp;
4302 si->scrub_start_version = si->last_scrub_version = in->last_scrub_version;
4303
4304 me->scrub_infop = si;
4305}
4306
4307void CInode::scrub_maybe_delete_info()
4308{
4309 if (scrub_infop &&
4310 !scrub_infop->scrub_in_progress &&
4311 !scrub_infop->last_scrub_dirty) {
4312 delete scrub_infop;
4313 scrub_infop = NULL;
4314 }
4315}
4316
4317void CInode::scrub_initialize(CDentry *scrub_parent,
b32b8144 4318 ScrubHeaderRef& header,
7c673cae
FG
4319 MDSInternalContextBase *f)
4320{
4321 dout(20) << __func__ << " with scrub_version " << get_version() << dendl;
94b18763
FG
4322 if (scrub_is_in_progress()) {
4323 dout(20) << __func__ << " inode moved during scrub, reinitializing "
4324 << dendl;
4325 assert(scrub_infop->scrub_parent);
4326 CDentry *dn = scrub_infop->scrub_parent;
4327 CDir *dir = dn->dir;
4328 dn->put(CDentry::PIN_SCRUBPARENT);
4329 assert(dir->scrub_infop && dir->scrub_infop->directory_scrubbing);
4330 dir->scrub_infop->directories_scrubbing.erase(dn->key());
4331 dir->scrub_infop->others_scrubbing.erase(dn->key());
4332 }
7c673cae
FG
4333 scrub_info();
4334 if (!scrub_infop)
4335 scrub_infop = new scrub_info_t();
4336
4337 if (get_projected_inode()->is_dir()) {
4338 // fill in dirfrag_stamps with initial state
4339 std::list<frag_t> frags;
4340 dirfragtree.get_leaves(frags);
4341 for (std::list<frag_t>::iterator i = frags.begin();
4342 i != frags.end();
4343 ++i) {
4344 if (header->get_force())
4345 scrub_infop->dirfrag_stamps[*i].reset();
4346 else
4347 scrub_infop->dirfrag_stamps[*i];
4348 }
4349 }
4350
4351 if (scrub_parent)
4352 scrub_parent->get(CDentry::PIN_SCRUBPARENT);
4353 scrub_infop->scrub_parent = scrub_parent;
4354 scrub_infop->on_finish = f;
4355 scrub_infop->scrub_in_progress = true;
4356 scrub_infop->children_scrubbed = false;
4357 scrub_infop->header = header;
4358
4359 scrub_infop->scrub_start_version = get_version();
4360 scrub_infop->scrub_start_stamp = ceph_clock_now();
4361 // right now we don't handle remote inodes
4362}
4363
4364int CInode::scrub_dirfrag_next(frag_t* out_dirfrag)
4365{
4366 dout(20) << __func__ << dendl;
4367 assert(scrub_is_in_progress());
4368
4369 if (!is_dir()) {
4370 return -ENOTDIR;
4371 }
4372
4373 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4374 scrub_infop->dirfrag_stamps.begin();
4375
4376 while (i != scrub_infop->dirfrag_stamps.end()) {
4377 if (i->second.scrub_start_version < scrub_infop->scrub_start_version) {
4378 i->second.scrub_start_version = get_projected_version();
4379 i->second.scrub_start_stamp = ceph_clock_now();
4380 *out_dirfrag = i->first;
4381 dout(20) << " return frag " << *out_dirfrag << dendl;
4382 return 0;
4383 }
4384 ++i;
4385 }
4386
4387 dout(20) << " no frags left, ENOENT " << dendl;
4388 return ENOENT;
4389}
4390
4391void CInode::scrub_dirfrags_scrubbing(list<frag_t>* out_dirfrags)
4392{
4393 assert(out_dirfrags != NULL);
4394 assert(scrub_infop != NULL);
4395
4396 out_dirfrags->clear();
4397 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4398 scrub_infop->dirfrag_stamps.begin();
4399
4400 while (i != scrub_infop->dirfrag_stamps.end()) {
4401 if (i->second.scrub_start_version >= scrub_infop->scrub_start_version) {
4402 if (i->second.last_scrub_version < scrub_infop->scrub_start_version)
4403 out_dirfrags->push_back(i->first);
4404 } else {
4405 return;
4406 }
4407
4408 ++i;
4409 }
4410}
4411
4412void CInode::scrub_dirfrag_finished(frag_t dirfrag)
4413{
4414 dout(20) << __func__ << " on frag " << dirfrag << dendl;
4415 assert(scrub_is_in_progress());
4416
4417 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4418 scrub_infop->dirfrag_stamps.find(dirfrag);
4419 assert(i != scrub_infop->dirfrag_stamps.end());
4420
4421 scrub_stamp_info_t &si = i->second;
4422 si.last_scrub_stamp = si.scrub_start_stamp;
4423 si.last_scrub_version = si.scrub_start_version;
4424}
4425
4426void CInode::scrub_finished(MDSInternalContextBase **c) {
4427 dout(20) << __func__ << dendl;
4428 assert(scrub_is_in_progress());
4429 for (std::map<frag_t, scrub_stamp_info_t>::iterator i =
4430 scrub_infop->dirfrag_stamps.begin();
4431 i != scrub_infop->dirfrag_stamps.end();
4432 ++i) {
4433 if(i->second.last_scrub_version != i->second.scrub_start_version) {
4434 derr << i->second.last_scrub_version << " != "
4435 << i->second.scrub_start_version << dendl;
4436 }
4437 assert(i->second.last_scrub_version == i->second.scrub_start_version);
4438 }
4439
4440 scrub_infop->last_scrub_version = scrub_infop->scrub_start_version;
4441 scrub_infop->last_scrub_stamp = scrub_infop->scrub_start_stamp;
4442 scrub_infop->last_scrub_dirty = true;
4443 scrub_infop->scrub_in_progress = false;
4444
4445 if (scrub_infop->scrub_parent) {
4446 CDentry *dn = scrub_infop->scrub_parent;
4447 scrub_infop->scrub_parent = NULL;
4448 dn->dir->scrub_dentry_finished(dn);
4449 dn->put(CDentry::PIN_SCRUBPARENT);
4450 }
4451
4452 *c = scrub_infop->on_finish;
4453 scrub_infop->on_finish = NULL;
4454
4455 if (scrub_infop->header->get_origin() == this) {
4456 // We are at the point that a tagging scrub was initiated
4457 LogChannelRef clog = mdcache->mds->clog;
b32b8144
FG
4458 if (scrub_infop->header->get_tag().empty()) {
4459 clog->info() << "scrub complete";
4460 } else {
4461 clog->info() << "scrub complete with tag '"
4462 << scrub_infop->header->get_tag() << "'";
4463 }
7c673cae
FG
4464 }
4465}
4466
4467int64_t CInode::get_backtrace_pool() const
4468{
4469 if (is_dir()) {
4470 return mdcache->mds->mdsmap->get_metadata_pool();
4471 } else {
4472 // Files are required to have an explicit layout that specifies
4473 // a pool
4474 assert(inode.layout.pool_id != -1);
4475 return inode.layout.pool_id;
4476 }
4477}
4478
31f18b77
FG
4479void CInode::maybe_export_pin(bool update)
4480{
4481 if (!g_conf->mds_bal_export_pin)
4482 return;
4483 if (!is_dir() || !is_normal())
4484 return;
7c673cae 4485
31f18b77
FG
4486 mds_rank_t export_pin = get_export_pin(false);
4487 if (export_pin == MDS_RANK_NONE && !update)
4488 return;
7c673cae 4489
31f18b77
FG
4490 if (state_test(CInode::STATE_QUEUEDEXPORTPIN))
4491 return;
4492
4493 bool queue = false;
4494 for (auto p = dirfrags.begin(); p != dirfrags.end(); p++) {
4495 CDir *dir = p->second;
4496 if (!dir->is_auth())
4497 continue;
4498 if (export_pin != MDS_RANK_NONE) {
4499 if (dir->is_subtree_root()) {
4500 // set auxsubtree bit or export it
4501 if (!dir->state_test(CDir::STATE_AUXSUBTREE) ||
4502 export_pin != dir->get_dir_auth().first)
4503 queue = true;
4504 } else {
4505 // create aux subtree or export it
4506 queue = true;
7c673cae 4507 }
31f18b77
FG
4508 } else {
4509 // clear aux subtrees ?
4510 queue = dir->state_test(CDir::STATE_AUXSUBTREE);
4511 }
4512 if (queue) {
4513 state_set(CInode::STATE_QUEUEDEXPORTPIN);
7c673cae 4514 mdcache->export_pin_queue.insert(this);
31f18b77 4515 break;
7c673cae
FG
4516 }
4517 }
4518}
4519
4520void CInode::set_export_pin(mds_rank_t rank)
4521{
4522 assert(is_dir());
4523 assert(is_projected());
4524 get_projected_inode()->export_pin = rank;
31f18b77 4525 maybe_export_pin(true);
7c673cae
FG
4526}
4527
4528mds_rank_t CInode::get_export_pin(bool inherit) const
4529{
4530 /* An inode that is export pinned may not necessarily be a subtree root, we
4531 * need to traverse the parents. A base or system inode cannot be pinned.
4532 * N.B. inodes not yet linked into a dir (i.e. anonymous inodes) will not
4533 * have a parent yet.
4534 */
b32b8144
FG
4535 const CInode *in = this;
4536 while (true) {
4537 if (in->is_system())
4538 break;
4539 const CDentry *pdn = in->get_projected_parent_dn();
4540 if (!pdn)
4541 break;
94b18763 4542 const mempool_inode *pi = in->get_projected_inode();
b32b8144
FG
4543 // ignore export pin for unlinked directory
4544 if (pi->nlink == 0)
4545 break;
4546 if (pi->export_pin >= 0)
4547 return pi->export_pin;
4548
4549 if (!inherit)
4550 break;
4551 in = pdn->get_dir()->inode;
7c673cae
FG
4552 }
4553 return MDS_RANK_NONE;
4554}
4555
4556bool CInode::is_exportable(mds_rank_t dest) const
4557{
4558 mds_rank_t pin = get_export_pin();
4559 if (pin == dest) {
4560 return true;
4561 } else if (pin >= 0) {
4562 return false;
4563 } else {
4564 return true;
4565 }
4566}
181888fb
FG
4567
4568MEMPOOL_DEFINE_OBJECT_FACTORY(CInode, co_inode, mds_co);