]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/CInode.cc
update sources to 12.2.10
[ceph.git] / ceph / src / mds / CInode.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "include/int_types.h"
16#include "common/errno.h"
17
18#include <string>
19#include <stdio.h>
20
21#include "CInode.h"
22#include "CDir.h"
23#include "CDentry.h"
24
25#include "MDSRank.h"
26#include "MDCache.h"
27#include "MDLog.h"
28#include "Locker.h"
29#include "Mutation.h"
30
31#include "events/EUpdate.h"
32
33#include "osdc/Objecter.h"
34
35#include "snap.h"
36
37#include "LogSegment.h"
38
39#include "common/Clock.h"
40
41#include "messages/MLock.h"
42#include "messages/MClientCaps.h"
43
44#include "common/config.h"
45#include "global/global_context.h"
46#include "include/assert.h"
47
48#include "mds/MDSContinuation.h"
49#include "mds/InoTable.h"
50
51#define dout_context g_ceph_context
52#define dout_subsys ceph_subsys_mds
53#undef dout_prefix
54#define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") "
55
56
57class CInodeIOContext : public MDSIOContextBase
58{
59protected:
60 CInode *in;
61 MDSRank *get_mds() override {return in->mdcache->mds;}
62public:
63 explicit CInodeIOContext(CInode *in_) : in(in_) {
64 assert(in != NULL);
65 }
66};
67
68
69LockType CInode::versionlock_type(CEPH_LOCK_IVERSION);
70LockType CInode::authlock_type(CEPH_LOCK_IAUTH);
71LockType CInode::linklock_type(CEPH_LOCK_ILINK);
72LockType CInode::dirfragtreelock_type(CEPH_LOCK_IDFT);
73LockType CInode::filelock_type(CEPH_LOCK_IFILE);
74LockType CInode::xattrlock_type(CEPH_LOCK_IXATTR);
75LockType CInode::snaplock_type(CEPH_LOCK_ISNAP);
76LockType CInode::nestlock_type(CEPH_LOCK_INEST);
77LockType CInode::flocklock_type(CEPH_LOCK_IFLOCK);
78LockType CInode::policylock_type(CEPH_LOCK_IPOLICY);
79
80//int cinode_pins[CINODE_NUM_PINS]; // counts
81ostream& CInode::print_db_line_prefix(ostream& out)
82{
83 return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") ";
84}
85
86/*
87 * write caps and lock ids
88 */
89struct cinode_lock_info_t cinode_lock_info[] = {
90 { CEPH_LOCK_IFILE, CEPH_CAP_ANY_FILE_WR },
91 { CEPH_LOCK_IAUTH, CEPH_CAP_AUTH_EXCL },
92 { CEPH_LOCK_ILINK, CEPH_CAP_LINK_EXCL },
93 { CEPH_LOCK_IXATTR, CEPH_CAP_XATTR_EXCL },
94};
95int num_cinode_locks = sizeof(cinode_lock_info) / sizeof(cinode_lock_info[0]);
96
97
98
99ostream& operator<<(ostream& out, const CInode& in)
100{
101 string path;
102 in.make_path_string(path, true);
103
104 out << "[inode " << in.inode.ino;
105 out << " ["
106 << (in.is_multiversion() ? "...":"")
107 << in.first << "," << in.last << "]";
108 out << " " << path << (in.is_dir() ? "/":"");
109
110 if (in.is_auth()) {
111 out << " auth";
112 if (in.is_replicated())
113 out << in.get_replicas();
114 } else {
115 mds_authority_t a = in.authority();
116 out << " rep@" << a.first;
117 if (a.second != CDIR_AUTH_UNKNOWN)
118 out << "," << a.second;
119 out << "." << in.get_replica_nonce();
120 }
121
122 if (in.is_symlink())
123 out << " symlink='" << in.symlink << "'";
124 if (in.is_dir() && !in.dirfragtree.empty())
125 out << " " << in.dirfragtree;
126
127 out << " v" << in.get_version();
128 if (in.get_projected_version() > in.get_version())
129 out << " pv" << in.get_projected_version();
130
131 if (in.is_auth_pinned()) {
132 out << " ap=" << in.get_num_auth_pins() << "+" << in.get_num_nested_auth_pins();
133#ifdef MDS_AUTHPIN_SET
134 out << "(" << in.auth_pin_set << ")";
135#endif
136 }
137
138 if (in.snaprealm)
139 out << " snaprealm=" << in.snaprealm;
140
141 if (in.state_test(CInode::STATE_AMBIGUOUSAUTH)) out << " AMBIGAUTH";
142 if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " needsrecover";
143 if (in.state_test(CInode::STATE_RECOVERING)) out << " recovering";
144 if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent";
145 if (in.state_test(CInode::STATE_MISSINGOBJS)) out << " missingobjs";
146 if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance;
147 if (in.is_frozen_inode()) out << " FROZEN";
148 if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN";
149
94b18763 150 const CInode::mempool_inode *pi = in.get_projected_inode();
7c673cae
FG
151 if (pi->is_truncating())
152 out << " truncating(" << pi->truncate_from << " to " << pi->truncate_size << ")";
153
154 if (in.inode.is_dir()) {
155 out << " " << in.inode.dirstat;
156 if (g_conf->mds_debug_scatterstat && in.is_projected()) {
94b18763 157 const CInode::mempool_inode *pi = in.get_projected_inode();
7c673cae
FG
158 out << "->" << pi->dirstat;
159 }
160 } else {
161 out << " s=" << in.inode.size;
162 if (in.inode.nlink != 1)
163 out << " nl=" << in.inode.nlink;
164 }
165
166 // rstat
167 out << " " << in.inode.rstat;
168 if (!(in.inode.rstat == in.inode.accounted_rstat))
169 out << "/" << in.inode.accounted_rstat;
170 if (g_conf->mds_debug_scatterstat && in.is_projected()) {
94b18763 171 const CInode::mempool_inode *pi = in.get_projected_inode();
7c673cae
FG
172 out << "->" << pi->rstat;
173 if (!(pi->rstat == pi->accounted_rstat))
174 out << "/" << pi->accounted_rstat;
175 }
176
177 if (!in.client_need_snapflush.empty())
178 out << " need_snapflush=" << in.client_need_snapflush;
179
180
181 // locks
182 if (!in.authlock.is_sync_and_unlocked())
183 out << " " << in.authlock;
184 if (!in.linklock.is_sync_and_unlocked())
185 out << " " << in.linklock;
186 if (in.inode.is_dir()) {
187 if (!in.dirfragtreelock.is_sync_and_unlocked())
188 out << " " << in.dirfragtreelock;
189 if (!in.snaplock.is_sync_and_unlocked())
190 out << " " << in.snaplock;
191 if (!in.nestlock.is_sync_and_unlocked())
192 out << " " << in.nestlock;
193 if (!in.policylock.is_sync_and_unlocked())
194 out << " " << in.policylock;
195 } else {
196 if (!in.flocklock.is_sync_and_unlocked())
197 out << " " << in.flocklock;
198 }
199 if (!in.filelock.is_sync_and_unlocked())
200 out << " " << in.filelock;
201 if (!in.xattrlock.is_sync_and_unlocked())
202 out << " " << in.xattrlock;
203 if (!in.versionlock.is_sync_and_unlocked())
204 out << " " << in.versionlock;
205
206 // hack: spit out crap on which clients have caps
207 if (in.inode.client_ranges.size())
208 out << " cr=" << in.inode.client_ranges;
209
210 if (!in.get_client_caps().empty()) {
211 out << " caps={";
212 for (map<client_t,Capability*>::const_iterator it = in.get_client_caps().begin();
213 it != in.get_client_caps().end();
214 ++it) {
215 if (it != in.get_client_caps().begin()) out << ",";
216 out << it->first << "="
217 << ccap_string(it->second->pending());
218 if (it->second->issued() != it->second->pending())
219 out << "/" << ccap_string(it->second->issued());
220 out << "/" << ccap_string(it->second->wanted())
221 << "@" << it->second->get_last_sent();
222 }
223 out << "}";
224 if (in.get_loner() >= 0 || in.get_wanted_loner() >= 0) {
225 out << ",l=" << in.get_loner();
226 if (in.get_loner() != in.get_wanted_loner())
227 out << "(" << in.get_wanted_loner() << ")";
228 }
229 }
230 if (!in.get_mds_caps_wanted().empty()) {
231 out << " mcw={";
94b18763
FG
232 bool first = true;
233 for (const auto &p : in.get_mds_caps_wanted()) {
234 if (!first)
7c673cae 235 out << ',';
94b18763
FG
236 out << p.first << '=' << ccap_string(p.second);
237 first = false;
7c673cae
FG
238 }
239 out << '}';
240 }
241
242 if (in.get_num_ref()) {
243 out << " |";
244 in.print_pin_set(out);
245 }
246
247 if (in.inode.export_pin != MDS_RANK_NONE) {
248 out << " export_pin=" << in.inode.export_pin;
249 }
250
251 out << " " << &in;
252 out << "]";
253 return out;
254}
255
256ostream& operator<<(ostream& out, const CInode::scrub_stamp_info_t& si)
257{
258 out << "{scrub_start_version: " << si.scrub_start_version
259 << ", scrub_start_stamp: " << si.scrub_start_stamp
260 << ", last_scrub_version: " << si.last_scrub_version
261 << ", last_scrub_stamp: " << si.last_scrub_stamp;
262 return out;
263}
264
265
266
267void CInode::print(ostream& out)
268{
269 out << *this;
270}
271
272
273
274void CInode::add_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
275{
276 dout(10) << "add_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;
277
278 if (client_need_snapflush.empty()) {
279 get(CInode::PIN_NEEDSNAPFLUSH);
280
281 // FIXME: this is non-optimal, as we'll block freezes/migrations for potentially
282 // long periods waiting for clients to flush their snaps.
283 auth_pin(this); // pin head inode...
284 }
285
94b18763 286 auto &clients = client_need_snapflush[snapid];
7c673cae
FG
287 if (clients.empty())
288 snapin->auth_pin(this); // ...and pin snapped/old inode!
289
290 clients.insert(client);
291}
292
293void CInode::remove_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
294{
94b18763
FG
295 dout(10) << __func__ << " client." << client << " snapid " << snapid << " on " << snapin << dendl;
296 auto it = client_need_snapflush.find(snapid);
297 if (it == client_need_snapflush.end()) {
7c673cae
FG
298 dout(10) << " snapid not found" << dendl;
299 return;
300 }
94b18763
FG
301 size_t n = it->second.erase(client);
302 if (n == 0) {
7c673cae
FG
303 dout(10) << " client not found" << dendl;
304 return;
305 }
94b18763
FG
306 if (it->second.empty()) {
307 client_need_snapflush.erase(it);
7c673cae
FG
308 snapin->auth_unpin(this);
309
310 if (client_need_snapflush.empty()) {
311 put(CInode::PIN_NEEDSNAPFLUSH);
312 auth_unpin(this);
313 }
314 }
315}
316
317bool CInode::split_need_snapflush(CInode *cowin, CInode *in)
318{
319 dout(10) << "split_need_snapflush [" << cowin->first << "," << cowin->last << "] for " << *cowin << dendl;
320 bool need_flush = false;
94b18763
FG
321 for (auto it = client_need_snapflush.lower_bound(cowin->first);
322 it != client_need_snapflush.end() && it->first < in->first; ) {
323 assert(!it->second.empty());
324 if (cowin->last >= it->first) {
7c673cae
FG
325 cowin->auth_pin(this);
326 need_flush = true;
94b18763
FG
327 ++it;
328 } else {
329 it = client_need_snapflush.erase(it);
330 }
7c673cae
FG
331 in->auth_unpin(this);
332 }
333 return need_flush;
334}
335
336void CInode::mark_dirty_rstat()
337{
338 if (!state_test(STATE_DIRTYRSTAT)) {
339 dout(10) << "mark_dirty_rstat" << dendl;
340 state_set(STATE_DIRTYRSTAT);
341 get(PIN_DIRTYRSTAT);
224ce89b
WB
342 CDentry *pdn = get_projected_parent_dn();
343 if (pdn->is_auth()) {
344 CDir *pdir = pdn->dir;
345 pdir->dirty_rstat_inodes.push_back(&dirty_rstat_item);
346 mdcache->mds->locker->mark_updated_scatterlock(&pdir->inode->nestlock);
347 } else {
348 // under cross-MDS rename.
349 // DIRTYRSTAT flag will get cleared when rename finishes
350 assert(state_test(STATE_AMBIGUOUSAUTH));
351 }
7c673cae
FG
352 }
353}
354void CInode::clear_dirty_rstat()
355{
356 if (state_test(STATE_DIRTYRSTAT)) {
357 dout(10) << "clear_dirty_rstat" << dendl;
358 state_clear(STATE_DIRTYRSTAT);
359 put(PIN_DIRTYRSTAT);
360 dirty_rstat_item.remove_myself();
361 }
362}
363
94b18763
FG
364/* Ideally this function would be subsumed by project_inode but it is also
365 * needed by CInode::project_past_snaprealm_parent so we keep it.
366 */
367sr_t &CInode::project_snaprealm(projected_inode &pi)
7c673cae 368{
94b18763
FG
369 const sr_t *cur_srnode = get_projected_srnode();
370
371 assert(!pi.snapnode);
372 if (cur_srnode) {
373 pi.snapnode.reset(new sr_t(*cur_srnode));
7c673cae 374 } else {
94b18763
FG
375 pi.snapnode.reset(new sr_t());
376 pi.snapnode->created = 0;
377 pi.snapnode->current_parent_since = get_oldest_snap();
7c673cae 378 }
94b18763 379 ++num_projected_srnodes;
7c673cae 380
94b18763
FG
381 dout(10) << __func__ << " " << pi.snapnode.get() << dendl;
382 return *pi.snapnode.get();
383}
7c673cae 384
94b18763
FG
385CInode::projected_inode &CInode::project_inode(bool xattr, bool snap)
386{
387 if (projected_nodes.empty()) {
388 projected_nodes.emplace_back(inode);
389 } else {
390 projected_nodes.emplace_back(projected_nodes.back().inode);
7c673cae 391 }
94b18763 392 auto &pi = projected_nodes.back();
7c673cae
FG
393
394 if (scrub_infop && scrub_infop->last_scrub_dirty) {
94b18763
FG
395 pi.inode.last_scrub_stamp = scrub_infop->last_scrub_stamp;
396 pi.inode.last_scrub_version = scrub_infop->last_scrub_version;
7c673cae
FG
397 scrub_infop->last_scrub_dirty = false;
398 scrub_maybe_delete_info();
399 }
94b18763
FG
400
401 if (xattr) {
402 pi.xattrs.reset(new mempool_xattr_map(*get_projected_xattrs()));
403 ++num_projected_xattrs;
404 }
405
406 if (snap) {
407 project_snaprealm(pi);
408 }
409
410 dout(15) << __func__ << " " << pi.inode.ino << dendl;
411 return pi;
7c673cae
FG
412}
413
414void CInode::pop_and_dirty_projected_inode(LogSegment *ls)
415{
416 assert(!projected_nodes.empty());
94b18763
FG
417 auto &front = projected_nodes.front();
418 dout(15) << __func__ << " " << front.inode.ino
419 << " v" << front.inode.version << dendl;
7c673cae
FG
420 int64_t old_pool = inode.layout.pool_id;
421
94b18763
FG
422 mark_dirty(front.inode.version, ls);
423 inode = front.inode;
7c673cae
FG
424
425 if (inode.is_backtrace_updated())
28e407b8 426 mark_dirty_parent(ls, old_pool != inode.layout.pool_id);
7c673cae 427
94b18763 428 if (front.xattrs) {
7c673cae 429 --num_projected_xattrs;
94b18763 430 xattrs = *front.xattrs;
7c673cae
FG
431 }
432
94b18763
FG
433 auto &snapnode = front.snapnode;
434 if (snapnode) {
435 pop_projected_snaprealm(snapnode.get());
7c673cae
FG
436 --num_projected_srnodes;
437 }
438
7c673cae
FG
439 projected_nodes.pop_front();
440}
441
7c673cae
FG
442/* if newparent != parent, add parent to past_parents
443 if parent DNE, we need to find what the parent actually is and fill that in */
444void CInode::project_past_snaprealm_parent(SnapRealm *newparent)
445{
94b18763
FG
446 assert(!projected_nodes.empty());
447 sr_t &new_snap = project_snaprealm(projected_nodes.back());
7c673cae
FG
448 SnapRealm *oldparent;
449 if (!snaprealm) {
450 oldparent = find_snaprealm();
94b18763 451 new_snap.seq = oldparent->get_newest_seq();
7c673cae
FG
452 }
453 else
454 oldparent = snaprealm->parent;
455
456 if (newparent != oldparent) {
457 snapid_t oldparentseq = oldparent->get_newest_seq();
94b18763
FG
458 if (oldparentseq + 1 > new_snap.current_parent_since) {
459 new_snap.past_parents[oldparentseq].ino = oldparent->inode->ino();
460 new_snap.past_parents[oldparentseq].first = new_snap.current_parent_since;
7c673cae 461 }
94b18763 462 new_snap.current_parent_since = std::max(oldparentseq, newparent->get_last_created()) + 1;
7c673cae
FG
463 }
464}
465
466void CInode::pop_projected_snaprealm(sr_t *next_snaprealm)
467{
468 assert(next_snaprealm);
469 dout(10) << "pop_projected_snaprealm " << next_snaprealm
470 << " seq" << next_snaprealm->seq << dendl;
471 bool invalidate_cached_snaps = false;
472 if (!snaprealm) {
473 open_snaprealm();
474 } else if (next_snaprealm->past_parents.size() !=
475 snaprealm->srnode.past_parents.size()) {
476 invalidate_cached_snaps = true;
477 // re-open past parents
478 snaprealm->_close_parents();
479
480 dout(10) << " realm " << *snaprealm << " past_parents " << snaprealm->srnode.past_parents
481 << " -> " << next_snaprealm->past_parents << dendl;
482 }
483 snaprealm->srnode = *next_snaprealm;
7c673cae
FG
484
485 // we should be able to open these up (or have them already be open).
486 bool ok = snaprealm->_open_parents(NULL);
487 assert(ok);
488
489 if (invalidate_cached_snaps)
490 snaprealm->invalidate_cached_snaps();
491
492 if (snaprealm->parent)
493 dout(10) << " realm " << *snaprealm << " parent " << *snaprealm->parent << dendl;
494}
495
496
497// ====== CInode =======
498
499// dirfrags
500
94b18763 501__u32 InodeStoreBase::hash_dentry_name(boost::string_view dn)
7c673cae
FG
502{
503 int which = inode.dir_layout.dl_dir_hash;
504 if (!which)
505 which = CEPH_STR_HASH_LINUX;
506 assert(ceph_str_hash_valid(which));
507 return ceph_str_hash(which, dn.data(), dn.length());
508}
509
94b18763 510frag_t InodeStoreBase::pick_dirfrag(boost::string_view dn)
7c673cae
FG
511{
512 if (dirfragtree.empty())
513 return frag_t(); // avoid the string hash if we can.
514
515 __u32 h = hash_dentry_name(dn);
516 return dirfragtree[h];
517}
518
519bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
520{
521 bool all = true;
94b18763 522 std::list<frag_t> fglist;
7c673cae
FG
523 dirfragtree.get_leaves_under(fg, fglist);
524 for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
525 if (dirfrags.count(*p))
526 ls.push_back(dirfrags[*p]);
527 else
528 all = false;
529
530 if (all)
531 return all;
532
533 fragtree_t tmpdft;
534 tmpdft.force_to_leaf(g_ceph_context, fg);
94b18763
FG
535 for (auto &p : dirfrags) {
536 tmpdft.force_to_leaf(g_ceph_context, p.first);
537 if (fg.contains(p.first) && !dirfragtree.is_leaf(p.first))
538 ls.push_back(p.second);
7c673cae
FG
539 }
540
541 all = true;
542 tmpdft.get_leaves_under(fg, fglist);
94b18763
FG
543 for (const auto &p : fglist) {
544 if (!dirfrags.count(p)) {
7c673cae
FG
545 all = false;
546 break;
547 }
94b18763 548 }
7c673cae
FG
549
550 return all;
551}
552
553void CInode::verify_dirfrags()
554{
555 bool bad = false;
94b18763
FG
556 for (const auto &p : dirfrags) {
557 if (!dirfragtree.is_leaf(p.first)) {
558 dout(0) << "have open dirfrag " << p.first << " but not leaf in " << dirfragtree
559 << ": " << *p.second << dendl;
7c673cae
FG
560 bad = true;
561 }
562 }
563 assert(!bad);
564}
565
566void CInode::force_dirfrags()
567{
568 bool bad = false;
94b18763
FG
569 for (auto &p : dirfrags) {
570 if (!dirfragtree.is_leaf(p.first)) {
571 dout(0) << "have open dirfrag " << p.first << " but not leaf in " << dirfragtree
572 << ": " << *p.second << dendl;
7c673cae
FG
573 bad = true;
574 }
575 }
576
577 if (bad) {
578 list<frag_t> leaves;
579 dirfragtree.get_leaves(leaves);
580 for (list<frag_t>::iterator p = leaves.begin(); p != leaves.end(); ++p)
581 mdcache->get_force_dirfrag(dirfrag_t(ino(),*p), true);
582 }
583
584 verify_dirfrags();
585}
586
587CDir *CInode::get_approx_dirfrag(frag_t fg)
588{
589 CDir *dir = get_dirfrag(fg);
590 if (dir) return dir;
591
592 // find a child?
593 list<CDir*> ls;
594 get_dirfrags_under(fg, ls);
595 if (!ls.empty())
596 return ls.front();
597
598 // try parents?
599 while (fg.bits() > 0) {
600 fg = fg.parent();
601 dir = get_dirfrag(fg);
602 if (dir) return dir;
603 }
604 return NULL;
605}
606
7c673cae
FG
607CDir *CInode::get_or_open_dirfrag(MDCache *mdcache, frag_t fg)
608{
609 assert(is_dir());
610
611 // have it?
612 CDir *dir = get_dirfrag(fg);
613 if (!dir) {
614 // create it.
615 assert(is_auth() || mdcache->mds->is_any_replay());
616 dir = new CDir(this, fg, mdcache, is_auth());
617 add_dirfrag(dir);
618 }
619 return dir;
620}
621
622CDir *CInode::add_dirfrag(CDir *dir)
623{
624 assert(dirfrags.count(dir->dirfrag().frag) == 0);
625 dirfrags[dir->dirfrag().frag] = dir;
626
627 if (stickydir_ref > 0) {
628 dir->state_set(CDir::STATE_STICKY);
629 dir->get(CDir::PIN_STICKY);
630 }
631
632 maybe_export_pin();
633
634 return dir;
635}
636
637void CInode::close_dirfrag(frag_t fg)
638{
639 dout(14) << "close_dirfrag " << fg << dendl;
640 assert(dirfrags.count(fg));
641
642 CDir *dir = dirfrags[fg];
643 dir->remove_null_dentries();
644
645 // clear dirty flag
646 if (dir->is_dirty())
647 dir->mark_clean();
648
649 if (stickydir_ref > 0) {
650 dir->state_clear(CDir::STATE_STICKY);
651 dir->put(CDir::PIN_STICKY);
652 }
1adf2230
AA
653
654 if (dir->is_subtree_root())
655 num_subtree_roots--;
7c673cae
FG
656
657 // dump any remaining dentries, for debugging purposes
94b18763
FG
658 for (const auto &p : dir->items)
659 dout(14) << __func__ << " LEFTOVER dn " << *p.second << dendl;
7c673cae
FG
660
661 assert(dir->get_num_ref() == 0);
662 delete dir;
663 dirfrags.erase(fg);
664}
665
666void CInode::close_dirfrags()
667{
668 while (!dirfrags.empty())
669 close_dirfrag(dirfrags.begin()->first);
670}
671
672bool CInode::has_subtree_root_dirfrag(int auth)
673{
1adf2230
AA
674 if (num_subtree_roots > 0) {
675 if (auth == -1)
7c673cae 676 return true;
1adf2230
AA
677 for (const auto &p : dirfrags) {
678 if (p.second->is_subtree_root() &&
679 p.second->dir_auth.first == auth)
680 return true;
681 }
94b18763 682 }
7c673cae
FG
683 return false;
684}
685
686bool CInode::has_subtree_or_exporting_dirfrag()
687{
1adf2230
AA
688 if (num_subtree_roots > 0 || num_exporting_dirs > 0)
689 return true;
7c673cae
FG
690 return false;
691}
692
693void CInode::get_stickydirs()
694{
695 if (stickydir_ref == 0) {
696 get(PIN_STICKYDIRS);
94b18763
FG
697 for (const auto &p : dirfrags) {
698 p.second->state_set(CDir::STATE_STICKY);
699 p.second->get(CDir::PIN_STICKY);
7c673cae
FG
700 }
701 }
702 stickydir_ref++;
703}
704
705void CInode::put_stickydirs()
706{
707 assert(stickydir_ref > 0);
708 stickydir_ref--;
709 if (stickydir_ref == 0) {
710 put(PIN_STICKYDIRS);
94b18763
FG
711 for (const auto &p : dirfrags) {
712 p.second->state_clear(CDir::STATE_STICKY);
713 p.second->put(CDir::PIN_STICKY);
7c673cae
FG
714 }
715 }
716}
717
718
719
720
721
722// pins
723
724void CInode::first_get()
725{
726 // pin my dentry?
727 if (parent)
728 parent->get(CDentry::PIN_INODEPIN);
729}
730
731void CInode::last_put()
732{
733 // unpin my dentry?
734 if (parent)
735 parent->put(CDentry::PIN_INODEPIN);
736}
737
738void CInode::_put()
739{
740 if (get_num_ref() == (int)is_dirty() + (int)is_dirty_parent())
741 mdcache->maybe_eval_stray(this, true);
742}
743
744void CInode::add_remote_parent(CDentry *p)
745{
746 if (remote_parents.empty())
747 get(PIN_REMOTEPARENT);
748 remote_parents.insert(p);
749}
750void CInode::remove_remote_parent(CDentry *p)
751{
752 remote_parents.erase(p);
753 if (remote_parents.empty())
754 put(PIN_REMOTEPARENT);
755}
756
757
758
759
760CDir *CInode::get_parent_dir()
761{
762 if (parent)
763 return parent->dir;
764 return NULL;
765}
766CDir *CInode::get_projected_parent_dir()
767{
768 CDentry *p = get_projected_parent_dn();
769 if (p)
770 return p->dir;
771 return NULL;
772}
773CInode *CInode::get_parent_inode()
774{
775 if (parent)
776 return parent->dir->inode;
777 return NULL;
778}
779
780bool CInode::is_projected_ancestor_of(CInode *other)
781{
782 while (other) {
783 if (other == this)
784 return true;
785 if (!other->get_projected_parent_dn())
786 break;
787 other = other->get_projected_parent_dn()->get_dir()->get_inode();
788 }
789 return false;
790}
791
792/*
793 * Because a non-directory inode may have multiple links, the use_parent
794 * argument allows selecting which parent to use for path construction. This
795 * argument is only meaningful for the final component (i.e. the first of the
796 * nested calls) because directories cannot have multiple hard links. If
797 * use_parent is NULL and projected is true, the primary parent's projected
798 * inode is used all the way up the path chain. Otherwise the primary parent
799 * stable inode is used.
800 */
801void CInode::make_path_string(string& s, bool projected, const CDentry *use_parent) const
802{
803 if (!use_parent) {
804 use_parent = projected ? get_projected_parent_dn() : parent;
805 }
806
807 if (use_parent) {
808 use_parent->make_path_string(s, projected);
809 } else if (is_root()) {
810 s = "";
811 } else if (is_mdsdir()) {
812 char t[40];
813 uint64_t eino(ino());
814 eino -= MDS_INO_MDSDIR_OFFSET;
815 snprintf(t, sizeof(t), "~mds%" PRId64, eino);
816 s = t;
817 } else {
818 char n[40];
819 uint64_t eino(ino());
820 snprintf(n, sizeof(n), "#%" PRIx64, eino);
821 s += n;
822 }
823}
824
825void CInode::make_path(filepath& fp, bool projected) const
826{
827 const CDentry *use_parent = projected ? get_projected_parent_dn() : parent;
828 if (use_parent) {
829 assert(!is_base());
830 use_parent->make_path(fp, projected);
831 } else {
832 fp = filepath(ino());
833 }
834}
835
836void CInode::name_stray_dentry(string& dname)
837{
838 char s[20];
839 snprintf(s, sizeof(s), "%llx", (unsigned long long)inode.ino.val);
840 dname = s;
841}
842
843version_t CInode::pre_dirty()
844{
845 version_t pv;
846 CDentry* _cdentry = get_projected_parent_dn();
847 if (_cdentry) {
848 pv = _cdentry->pre_dirty(get_projected_version());
849 dout(10) << "pre_dirty " << pv << " (current v " << inode.version << ")" << dendl;
850 } else {
851 assert(is_base());
852 pv = get_projected_version() + 1;
853 }
94b18763 854 // force update backtrace for old format inode (see mempool_inode::decode)
7c673cae 855 if (inode.backtrace_version == 0 && !projected_nodes.empty()) {
94b18763
FG
856 mempool_inode &pi = projected_nodes.back().inode;
857 if (pi.backtrace_version == 0)
858 pi.update_backtrace(pv);
7c673cae
FG
859 }
860 return pv;
861}
862
863void CInode::_mark_dirty(LogSegment *ls)
864{
865 if (!state_test(STATE_DIRTY)) {
866 state_set(STATE_DIRTY);
867 get(PIN_DIRTY);
868 assert(ls);
869 }
870
871 // move myself to this segment's dirty list
872 if (ls)
873 ls->dirty_inodes.push_back(&item_dirty);
874}
875
876void CInode::mark_dirty(version_t pv, LogSegment *ls) {
877
878 dout(10) << "mark_dirty " << *this << dendl;
879
880 /*
881 NOTE: I may already be dirty, but this fn _still_ needs to be called so that
882 the directory is (perhaps newly) dirtied, and so that parent_dir_version is
883 updated below.
884 */
885
886 // only auth can get dirty. "dirty" async data in replicas is relative to
887 // filelock state, not the dirty flag.
888 assert(is_auth());
889
890 // touch my private version
891 assert(inode.version < pv);
892 inode.version = pv;
893 _mark_dirty(ls);
894
895 // mark dentry too
896 if (parent)
897 parent->mark_dirty(pv, ls);
898}
899
900
901void CInode::mark_clean()
902{
903 dout(10) << " mark_clean " << *this << dendl;
904 if (state_test(STATE_DIRTY)) {
905 state_clear(STATE_DIRTY);
906 put(PIN_DIRTY);
907
908 // remove myself from ls dirty list
909 item_dirty.remove_myself();
910 }
911}
912
913
914// --------------
915// per-inode storage
916// (currently for root inode only)
917
918struct C_IO_Inode_Stored : public CInodeIOContext {
919 version_t version;
920 Context *fin;
921 C_IO_Inode_Stored(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
922 void finish(int r) override {
923 in->_stored(r, version, fin);
924 }
91327a77
AA
925 void print(ostream& out) const override {
926 out << "inode_store(" << in->ino() << ")";
927 }
7c673cae
FG
928};
929
930object_t InodeStoreBase::get_object_name(inodeno_t ino, frag_t fg, const char *suffix)
931{
932 char n[60];
933 snprintf(n, sizeof(n), "%llx.%08llx%s", (long long unsigned)ino, (long long unsigned)fg, suffix ? suffix : "");
934 return object_t(n);
935}
936
937void CInode::store(MDSInternalContextBase *fin)
938{
939 dout(10) << "store " << get_version() << dendl;
940 assert(is_base());
941
942 if (snaprealm)
943 purge_stale_snap_data(snaprealm->get_snaps());
944
945 // encode
946 bufferlist bl;
947 string magic = CEPH_FS_ONDISK_MAGIC;
948 ::encode(magic, bl);
949 encode_store(bl, mdcache->mds->mdsmap->get_up_features());
950
951 // write it.
952 SnapContext snapc;
953 ObjectOperation m;
954 m.write_full(bl);
955
956 object_t oid = CInode::get_object_name(ino(), frag_t(), ".inode");
957 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
958
959 Context *newfin =
960 new C_OnFinisher(new C_IO_Inode_Stored(this, get_version(), fin),
961 mdcache->mds->finisher);
962 mdcache->mds->objecter->mutate(oid, oloc, m, snapc,
963 ceph::real_clock::now(), 0,
964 newfin);
965}
966
967void CInode::_stored(int r, version_t v, Context *fin)
968{
969 if (r < 0) {
970 dout(1) << "store error " << r << " v " << v << " on " << *this << dendl;
d2e6a577
FG
971 mdcache->mds->clog->error() << "failed to store inode " << ino()
972 << " object: " << cpp_strerror(r);
7c673cae
FG
973 mdcache->mds->handle_write_error(r);
974 fin->complete(r);
975 return;
976 }
977
978 dout(10) << "_stored " << v << " on " << *this << dendl;
979 if (v == get_projected_version())
980 mark_clean();
981
982 fin->complete(0);
983}
984
985void CInode::flush(MDSInternalContextBase *fin)
986{
987 dout(10) << "flush " << *this << dendl;
988 assert(is_auth() && can_auth_pin());
989
990 MDSGatherBuilder gather(g_ceph_context);
991
992 if (is_dirty_parent()) {
993 store_backtrace(gather.new_sub());
994 }
995 if (is_dirty()) {
996 if (is_base()) {
997 store(gather.new_sub());
998 } else {
999 parent->dir->commit(0, gather.new_sub());
1000 }
1001 }
1002
1003 if (gather.has_subs()) {
1004 gather.set_finisher(fin);
1005 gather.activate();
1006 } else {
1007 fin->complete(0);
1008 }
1009}
1010
1011struct C_IO_Inode_Fetched : public CInodeIOContext {
1012 bufferlist bl, bl2;
1013 Context *fin;
1014 C_IO_Inode_Fetched(CInode *i, Context *f) : CInodeIOContext(i), fin(f) {}
1015 void finish(int r) override {
1016 // Ignore 'r', because we fetch from two places, so r is usually ENOENT
1017 in->_fetched(bl, bl2, fin);
1018 }
91327a77
AA
1019 void print(ostream& out) const override {
1020 out << "inode_fetch(" << in->ino() << ")";
1021 }
7c673cae
FG
1022};
1023
1024void CInode::fetch(MDSInternalContextBase *fin)
1025{
1026 dout(10) << "fetch" << dendl;
1027
1028 C_IO_Inode_Fetched *c = new C_IO_Inode_Fetched(this, fin);
1029 C_GatherBuilder gather(g_ceph_context, new C_OnFinisher(c, mdcache->mds->finisher));
1030
1031 object_t oid = CInode::get_object_name(ino(), frag_t(), "");
1032 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
1033
1034 // Old on-disk format: inode stored in xattr of a dirfrag
1035 ObjectOperation rd;
1036 rd.getxattr("inode", &c->bl, NULL);
1037 mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, (bufferlist*)NULL, 0, gather.new_sub());
1038
1039 // Current on-disk format: inode stored in a .inode object
1040 object_t oid2 = CInode::get_object_name(ino(), frag_t(), ".inode");
1041 mdcache->mds->objecter->read(oid2, oloc, 0, 0, CEPH_NOSNAP, &c->bl2, 0, gather.new_sub());
1042
1043 gather.activate();
1044}
1045
1046void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin)
1047{
1048 dout(10) << "_fetched got " << bl.length() << " and " << bl2.length() << dendl;
1049 bufferlist::iterator p;
1050 if (bl2.length()) {
1051 p = bl2.begin();
1052 } else if (bl.length()) {
1053 p = bl.begin();
1054 } else {
d2e6a577 1055 derr << "No data while reading inode " << ino() << dendl;
7c673cae
FG
1056 fin->complete(-ENOENT);
1057 return;
1058 }
1059
1060 // Attempt decode
1061 try {
1062 string magic;
1063 ::decode(magic, p);
1064 dout(10) << " magic is '" << magic << "' (expecting '"
1065 << CEPH_FS_ONDISK_MAGIC << "')" << dendl;
1066 if (magic != CEPH_FS_ONDISK_MAGIC) {
1067 dout(0) << "on disk magic '" << magic << "' != my magic '" << CEPH_FS_ONDISK_MAGIC
1068 << "'" << dendl;
1069 fin->complete(-EINVAL);
1070 } else {
1071 decode_store(p);
1072 dout(10) << "_fetched " << *this << dendl;
1073 fin->complete(0);
1074 }
1075 } catch (buffer::error &err) {
d2e6a577 1076 derr << "Corrupt inode " << ino() << ": " << err << dendl;
7c673cae
FG
1077 fin->complete(-EINVAL);
1078 return;
1079 }
1080}
1081
1082void CInode::build_backtrace(int64_t pool, inode_backtrace_t& bt)
1083{
1084 bt.ino = inode.ino;
1085 bt.ancestors.clear();
1086 bt.pool = pool;
1087
1088 CInode *in = this;
1089 CDentry *pdn = get_parent_dn();
1090 while (pdn) {
1091 CInode *diri = pdn->get_dir()->get_inode();
94b18763 1092 bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->get_name(), in->inode.version));
7c673cae
FG
1093 in = diri;
1094 pdn = in->get_parent_dn();
1095 }
94b18763 1096 for (auto &p : inode.old_pools) {
7c673cae 1097 // don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
94b18763
FG
1098 if (p != pool)
1099 bt.old_pools.insert(p);
7c673cae
FG
1100 }
1101}
1102
1103struct C_IO_Inode_StoredBacktrace : public CInodeIOContext {
1104 version_t version;
1105 Context *fin;
1106 C_IO_Inode_StoredBacktrace(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
1107 void finish(int r) override {
1108 in->_stored_backtrace(r, version, fin);
1109 }
91327a77
AA
1110 void print(ostream& out) const override {
1111 out << "backtrace_store(" << in->ino() << ")";
1112 }
7c673cae
FG
1113};
1114
1115void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
1116{
1117 dout(10) << "store_backtrace on " << *this << dendl;
1118 assert(is_dirty_parent());
1119
1120 if (op_prio < 0)
1121 op_prio = CEPH_MSG_PRIO_DEFAULT;
1122
1123 auth_pin(this);
1124
1125 const int64_t pool = get_backtrace_pool();
1126 inode_backtrace_t bt;
1127 build_backtrace(pool, bt);
1128 bufferlist parent_bl;
1129 ::encode(bt, parent_bl);
1130
1131 ObjectOperation op;
1132 op.priority = op_prio;
1133 op.create(false);
1134 op.setxattr("parent", parent_bl);
1135
1136 bufferlist layout_bl;
1137 ::encode(inode.layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
1138 op.setxattr("layout", layout_bl);
1139
1140 SnapContext snapc;
1141 object_t oid = get_object_name(ino(), frag_t(), "");
1142 object_locator_t oloc(pool);
1143 Context *fin2 = new C_OnFinisher(
1144 new C_IO_Inode_StoredBacktrace(this, inode.backtrace_version, fin),
1145 mdcache->mds->finisher);
1146
1147 if (!state_test(STATE_DIRTYPOOL) || inode.old_pools.empty()) {
1148 dout(20) << __func__ << ": no dirtypool or no old pools" << dendl;
1149 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1150 ceph::real_clock::now(),
1151 0, fin2);
1152 return;
1153 }
1154
1155 C_GatherBuilder gather(g_ceph_context, fin2);
1156 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1157 ceph::real_clock::now(),
1158 0, gather.new_sub());
1159
1160 // In the case where DIRTYPOOL is set, we update all old pools backtraces
1161 // such that anyone reading them will see the new pool ID in
1162 // inode_backtrace_t::pool and go read everything else from there.
94b18763
FG
1163 for (const auto &p : inode.old_pools) {
1164 if (p == pool)
7c673cae
FG
1165 continue;
1166
94b18763 1167 dout(20) << __func__ << ": updating old pool " << p << dendl;
7c673cae
FG
1168
1169 ObjectOperation op;
1170 op.priority = op_prio;
1171 op.create(false);
1172 op.setxattr("parent", parent_bl);
1173
94b18763 1174 object_locator_t oloc(p);
7c673cae
FG
1175 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1176 ceph::real_clock::now(),
1177 0, gather.new_sub());
1178 }
1179 gather.activate();
1180}
1181
1182void CInode::_stored_backtrace(int r, version_t v, Context *fin)
1183{
1184 if (r == -ENOENT) {
1185 const int64_t pool = get_backtrace_pool();
1186 bool exists = mdcache->mds->objecter->with_osdmap(
1187 [pool](const OSDMap &osd_map) {
1188 return osd_map.have_pg_pool(pool);
1189 });
1190
1191 // This ENOENT is because the pool doesn't exist (the user deleted it
1192 // out from under us), so the backtrace can never be written, so pretend
1193 // to succeed so that the user can proceed to e.g. delete the file.
1194 if (!exists) {
1195 dout(4) << "store_backtrace got ENOENT: a data pool was deleted "
1196 "beneath us!" << dendl;
1197 r = 0;
1198 }
1199 }
1200
1201 if (r < 0) {
1202 dout(1) << "store backtrace error " << r << " v " << v << dendl;
1203 mdcache->mds->clog->error() << "failed to store backtrace on ino "
1204 << ino() << " object"
1205 << ", pool " << get_backtrace_pool()
1206 << ", errno " << r;
1207 mdcache->mds->handle_write_error(r);
1208 if (fin)
1209 fin->complete(r);
1210 return;
1211 }
1212
1213 dout(10) << "_stored_backtrace v " << v << dendl;
1214
1215 auth_unpin(this);
1216 if (v == inode.backtrace_version)
1217 clear_dirty_parent();
1218 if (fin)
1219 fin->complete(0);
1220}
1221
1222void CInode::fetch_backtrace(Context *fin, bufferlist *backtrace)
1223{
1224 mdcache->fetch_backtrace(inode.ino, get_backtrace_pool(), *backtrace, fin);
1225}
1226
28e407b8 1227void CInode::mark_dirty_parent(LogSegment *ls, bool dirty_pool)
7c673cae
FG
1228{
1229 if (!state_test(STATE_DIRTYPARENT)) {
1230 dout(10) << "mark_dirty_parent" << dendl;
1231 state_set(STATE_DIRTYPARENT);
1232 get(PIN_DIRTYPARENT);
1233 assert(ls);
1234 }
1235 if (dirty_pool)
1236 state_set(STATE_DIRTYPOOL);
1237 if (ls)
1238 ls->dirty_parent_inodes.push_back(&item_dirty_parent);
1239}
1240
1241void CInode::clear_dirty_parent()
1242{
1243 if (state_test(STATE_DIRTYPARENT)) {
1244 dout(10) << "clear_dirty_parent" << dendl;
1245 state_clear(STATE_DIRTYPARENT);
1246 state_clear(STATE_DIRTYPOOL);
1247 put(PIN_DIRTYPARENT);
1248 item_dirty_parent.remove_myself();
1249 }
1250}
1251
1252void CInode::verify_diri_backtrace(bufferlist &bl, int err)
1253{
1254 if (is_base() || is_dirty_parent() || !is_auth())
1255 return;
1256
1257 dout(10) << "verify_diri_backtrace" << dendl;
1258
1259 if (err == 0) {
1260 inode_backtrace_t backtrace;
1261 ::decode(backtrace, bl);
1262 CDentry *pdn = get_parent_dn();
1263 if (backtrace.ancestors.empty() ||
94b18763 1264 backtrace.ancestors[0].dname != pdn->get_name() ||
7c673cae
FG
1265 backtrace.ancestors[0].dirino != pdn->get_dir()->ino())
1266 err = -EINVAL;
1267 }
1268
1269 if (err) {
1270 MDSRank *mds = mdcache->mds;
d2e6a577 1271 mds->clog->error() << "bad backtrace on directory inode " << ino();
7c673cae
FG
1272 assert(!"bad backtrace" == (g_conf->mds_verify_backtrace > 1));
1273
28e407b8 1274 mark_dirty_parent(mds->mdlog->get_current_segment(), false);
7c673cae
FG
1275 mds->mdlog->flush();
1276 }
1277}
1278
1279// ------------------
1280// parent dir
1281
1282
1283void InodeStoreBase::encode_bare(bufferlist &bl, uint64_t features,
1284 const bufferlist *snap_blob) const
1285{
1286 ::encode(inode, bl, features);
1287 if (is_symlink())
1288 ::encode(symlink, bl);
1289 ::encode(dirfragtree, bl);
1290 ::encode(xattrs, bl);
1291 if (snap_blob)
1292 ::encode(*snap_blob, bl);
1293 else
1294 ::encode(bufferlist(), bl);
1295 ::encode(old_inodes, bl, features);
1296 ::encode(oldest_snap, bl);
1297 ::encode(damage_flags, bl);
1298}
1299
1300void InodeStoreBase::encode(bufferlist &bl, uint64_t features,
1301 const bufferlist *snap_blob) const
1302{
1303 ENCODE_START(6, 4, bl);
1304 encode_bare(bl, features, snap_blob);
1305 ENCODE_FINISH(bl);
1306}
1307
1308void CInode::encode_store(bufferlist& bl, uint64_t features)
1309{
1310 bufferlist snap_blob;
1311 encode_snap_blob(snap_blob);
1312 InodeStoreBase::encode(bl, mdcache->mds->mdsmap->get_up_features(),
1313 &snap_blob);
1314}
1315
1316void InodeStoreBase::decode_bare(bufferlist::iterator &bl,
1317 bufferlist& snap_blob, __u8 struct_v)
1318{
1319 ::decode(inode, bl);
94b18763
FG
1320 if (is_symlink()) {
1321 std::string tmp;
1322 ::decode(tmp, bl);
1323 symlink = mempool::mds_co::string(boost::string_view(tmp));
1324 }
7c673cae
FG
1325 ::decode(dirfragtree, bl);
1326 ::decode(xattrs, bl);
1327 ::decode(snap_blob, bl);
1328
1329 ::decode(old_inodes, bl);
1330 if (struct_v == 2 && inode.is_dir()) {
1331 bool default_layout_exists;
1332 ::decode(default_layout_exists, bl);
1333 if (default_layout_exists) {
1334 ::decode(struct_v, bl); // this was a default_file_layout
1335 ::decode(inode.layout, bl); // but we only care about the layout portion
1336 }
1337 }
1338
1339 if (struct_v >= 5) {
1340 // InodeStore is embedded in dentries without proper versioning, so
1341 // we consume up to the end of the buffer
1342 if (!bl.end()) {
1343 ::decode(oldest_snap, bl);
1344 }
1345
1346 if (!bl.end()) {
1347 ::decode(damage_flags, bl);
1348 }
1349 }
1350}
1351
1352
1353void InodeStoreBase::decode(bufferlist::iterator &bl, bufferlist& snap_blob)
1354{
1355 DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
1356 decode_bare(bl, snap_blob, struct_v);
1357 DECODE_FINISH(bl);
1358}
1359
1360void CInode::decode_store(bufferlist::iterator& bl)
1361{
1362 bufferlist snap_blob;
1363 InodeStoreBase::decode(bl, snap_blob);
1364 decode_snap_blob(snap_blob);
1365}
1366
1367// ------------------
1368// locking
1369
1370void CInode::set_object_info(MDSCacheObjectInfo &info)
1371{
1372 info.ino = ino();
1373 info.snapid = last;
1374}
1375
1376void CInode::encode_lock_state(int type, bufferlist& bl)
1377{
1378 ::encode(first, bl);
1379
1380 switch (type) {
1381 case CEPH_LOCK_IAUTH:
1382 ::encode(inode.version, bl);
1383 ::encode(inode.ctime, bl);
1384 ::encode(inode.mode, bl);
1385 ::encode(inode.uid, bl);
1386 ::encode(inode.gid, bl);
1387 break;
1388
1389 case CEPH_LOCK_ILINK:
1390 ::encode(inode.version, bl);
1391 ::encode(inode.ctime, bl);
1392 ::encode(inode.nlink, bl);
1393 break;
1394
1395 case CEPH_LOCK_IDFT:
1396 if (is_auth()) {
1397 ::encode(inode.version, bl);
1398 } else {
1399 // treat flushing as dirty when rejoining cache
1400 bool dirty = dirfragtreelock.is_dirty_or_flushing();
1401 ::encode(dirty, bl);
1402 }
1403 {
1404 // encode the raw tree
1405 ::encode(dirfragtree, bl);
1406
1407 // also specify which frags are mine
1408 set<frag_t> myfrags;
1409 list<CDir*> dfls;
1410 get_dirfrags(dfls);
1411 for (list<CDir*>::iterator p = dfls.begin(); p != dfls.end(); ++p)
1412 if ((*p)->is_auth()) {
1413 frag_t fg = (*p)->get_frag();
1414 myfrags.insert(fg);
1415 }
1416 ::encode(myfrags, bl);
1417 }
1418 break;
1419
1420 case CEPH_LOCK_IFILE:
1421 if (is_auth()) {
1422 ::encode(inode.version, bl);
1423 ::encode(inode.ctime, bl);
1424 ::encode(inode.mtime, bl);
1425 ::encode(inode.atime, bl);
1426 ::encode(inode.time_warp_seq, bl);
1427 if (!is_dir()) {
1428 ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1429 ::encode(inode.size, bl);
1430 ::encode(inode.truncate_seq, bl);
1431 ::encode(inode.truncate_size, bl);
1432 ::encode(inode.client_ranges, bl);
1433 ::encode(inode.inline_data, bl);
1434 }
1435 } else {
1436 // treat flushing as dirty when rejoining cache
1437 bool dirty = filelock.is_dirty_or_flushing();
1438 ::encode(dirty, bl);
1439 }
1440
1441 {
1442 dout(15) << "encode_lock_state inode.dirstat is " << inode.dirstat << dendl;
1443 ::encode(inode.dirstat, bl); // only meaningful if i am auth.
1444 bufferlist tmp;
1445 __u32 n = 0;
94b18763
FG
1446 for (const auto &p : dirfrags) {
1447 frag_t fg = p.first;
1448 CDir *dir = p.second;
7c673cae
FG
1449 if (is_auth() || dir->is_auth()) {
1450 fnode_t *pf = dir->get_projected_fnode();
1451 dout(15) << fg << " " << *dir << dendl;
1452 dout(20) << fg << " fragstat " << pf->fragstat << dendl;
1453 dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
1454 ::encode(fg, tmp);
1455 ::encode(dir->first, tmp);
1456 ::encode(pf->fragstat, tmp);
1457 ::encode(pf->accounted_fragstat, tmp);
1458 n++;
1459 }
1460 }
1461 ::encode(n, bl);
1462 bl.claim_append(tmp);
1463 }
1464 break;
1465
1466 case CEPH_LOCK_INEST:
1467 if (is_auth()) {
1468 ::encode(inode.version, bl);
1469 } else {
1470 // treat flushing as dirty when rejoining cache
1471 bool dirty = nestlock.is_dirty_or_flushing();
1472 ::encode(dirty, bl);
1473 }
1474 {
1475 dout(15) << "encode_lock_state inode.rstat is " << inode.rstat << dendl;
1476 ::encode(inode.rstat, bl); // only meaningful if i am auth.
1477 bufferlist tmp;
1478 __u32 n = 0;
94b18763
FG
1479 for (const auto &p : dirfrags) {
1480 frag_t fg = p.first;
1481 CDir *dir = p.second;
7c673cae
FG
1482 if (is_auth() || dir->is_auth()) {
1483 fnode_t *pf = dir->get_projected_fnode();
1484 dout(10) << fg << " " << *dir << dendl;
1485 dout(10) << fg << " " << pf->rstat << dendl;
1486 dout(10) << fg << " " << pf->rstat << dendl;
1487 dout(10) << fg << " " << dir->dirty_old_rstat << dendl;
1488 ::encode(fg, tmp);
1489 ::encode(dir->first, tmp);
1490 ::encode(pf->rstat, tmp);
1491 ::encode(pf->accounted_rstat, tmp);
1492 ::encode(dir->dirty_old_rstat, tmp);
1493 n++;
1494 }
1495 }
1496 ::encode(n, bl);
1497 bl.claim_append(tmp);
1498 }
1499 break;
1500
1501 case CEPH_LOCK_IXATTR:
1502 ::encode(inode.version, bl);
1503 ::encode(inode.ctime, bl);
1504 ::encode(xattrs, bl);
1505 break;
1506
1507 case CEPH_LOCK_ISNAP:
1508 ::encode(inode.version, bl);
1509 ::encode(inode.ctime, bl);
1510 encode_snap(bl);
1511 break;
1512
1513 case CEPH_LOCK_IFLOCK:
1514 ::encode(inode.version, bl);
1515 _encode_file_locks(bl);
1516 break;
1517
1518 case CEPH_LOCK_IPOLICY:
1519 if (inode.is_dir()) {
1520 ::encode(inode.version, bl);
1521 ::encode(inode.ctime, bl);
1522 ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1523 ::encode(inode.quota, bl);
1524 ::encode(inode.export_pin, bl);
1525 }
1526 break;
1527
1528 default:
1529 ceph_abort();
1530 }
1531}
1532
1533
1534/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1535
1536void CInode::decode_lock_state(int type, bufferlist& bl)
1537{
1538 bufferlist::iterator p = bl.begin();
1539 utime_t tm;
1540
1541 snapid_t newfirst;
1542 ::decode(newfirst, p);
1543
1544 if (!is_auth() && newfirst != first) {
1545 dout(10) << "decode_lock_state first " << first << " -> " << newfirst << dendl;
1546 assert(newfirst > first);
1547 if (!is_multiversion() && parent) {
1548 assert(parent->first == first);
1549 parent->first = newfirst;
1550 }
1551 first = newfirst;
1552 }
1553
1554 switch (type) {
1555 case CEPH_LOCK_IAUTH:
1556 ::decode(inode.version, p);
1557 ::decode(tm, p);
1558 if (inode.ctime < tm) inode.ctime = tm;
1559 ::decode(inode.mode, p);
1560 ::decode(inode.uid, p);
1561 ::decode(inode.gid, p);
1562 break;
1563
1564 case CEPH_LOCK_ILINK:
1565 ::decode(inode.version, p);
1566 ::decode(tm, p);
1567 if (inode.ctime < tm) inode.ctime = tm;
1568 ::decode(inode.nlink, p);
1569 break;
1570
1571 case CEPH_LOCK_IDFT:
1572 if (is_auth()) {
1573 bool replica_dirty;
1574 ::decode(replica_dirty, p);
1575 if (replica_dirty) {
1576 dout(10) << "decode_lock_state setting dftlock dirty flag" << dendl;
1577 dirfragtreelock.mark_dirty(); // ok bc we're auth and caller will handle
1578 }
1579 } else {
1580 ::decode(inode.version, p);
1581 }
1582 {
1583 fragtree_t temp;
1584 ::decode(temp, p);
1585 set<frag_t> authfrags;
1586 ::decode(authfrags, p);
1587 if (is_auth()) {
1588 // auth. believe replica's auth frags only.
1589 for (set<frag_t>::iterator p = authfrags.begin(); p != authfrags.end(); ++p)
1590 if (!dirfragtree.is_leaf(*p)) {
1591 dout(10) << " forcing frag " << *p << " to leaf (split|merge)" << dendl;
1592 dirfragtree.force_to_leaf(g_ceph_context, *p);
1593 dirfragtreelock.mark_dirty(); // ok bc we're auth and caller will handle
1594 }
1595 } else {
1596 // replica. take the new tree, BUT make sure any open
1597 // dirfrags remain leaves (they may have split _after_ this
1598 // dft was scattered, or we may still be be waiting on the
1599 // notify from the auth)
1600 dirfragtree.swap(temp);
94b18763
FG
1601 for (const auto &p : dirfrags) {
1602 if (!dirfragtree.is_leaf(p.first)) {
1603 dout(10) << " forcing open dirfrag " << p.first << " to leaf (racing with split|merge)" << dendl;
1604 dirfragtree.force_to_leaf(g_ceph_context, p.first);
7c673cae 1605 }
94b18763
FG
1606 if (p.second->is_auth())
1607 p.second->state_clear(CDir::STATE_DIRTYDFT);
7c673cae
FG
1608 }
1609 }
1610 if (g_conf->mds_debug_frag)
1611 verify_dirfrags();
1612 }
1613 break;
1614
1615 case CEPH_LOCK_IFILE:
1616 if (!is_auth()) {
1617 ::decode(inode.version, p);
1618 ::decode(tm, p);
1619 if (inode.ctime < tm) inode.ctime = tm;
1620 ::decode(inode.mtime, p);
1621 ::decode(inode.atime, p);
1622 ::decode(inode.time_warp_seq, p);
1623 if (!is_dir()) {
1624 ::decode(inode.layout, p);
1625 ::decode(inode.size, p);
1626 ::decode(inode.truncate_seq, p);
1627 ::decode(inode.truncate_size, p);
1628 ::decode(inode.client_ranges, p);
1629 ::decode(inode.inline_data, p);
1630 }
1631 } else {
1632 bool replica_dirty;
1633 ::decode(replica_dirty, p);
1634 if (replica_dirty) {
1635 dout(10) << "decode_lock_state setting filelock dirty flag" << dendl;
1636 filelock.mark_dirty(); // ok bc we're auth and caller will handle
1637 }
1638 }
1639 {
1640 frag_info_t dirstat;
1641 ::decode(dirstat, p);
1642 if (!is_auth()) {
1643 dout(10) << " taking inode dirstat " << dirstat << " for " << *this << dendl;
1644 inode.dirstat = dirstat; // take inode summation if replica
1645 }
1646 __u32 n;
1647 ::decode(n, p);
1648 dout(10) << " ...got " << n << " fragstats on " << *this << dendl;
1649 while (n--) {
1650 frag_t fg;
1651 snapid_t fgfirst;
1652 frag_info_t fragstat;
1653 frag_info_t accounted_fragstat;
1654 ::decode(fg, p);
1655 ::decode(fgfirst, p);
1656 ::decode(fragstat, p);
1657 ::decode(accounted_fragstat, p);
1658 dout(10) << fg << " [" << fgfirst << ",head] " << dendl;
1659 dout(10) << fg << " fragstat " << fragstat << dendl;
1660 dout(20) << fg << " accounted_fragstat " << accounted_fragstat << dendl;
1661
1662 CDir *dir = get_dirfrag(fg);
1663 if (is_auth()) {
1664 assert(dir); // i am auth; i had better have this dir open
1665 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1666 << " on " << *dir << dendl;
1667 dir->first = fgfirst;
1668 dir->fnode.fragstat = fragstat;
1669 dir->fnode.accounted_fragstat = accounted_fragstat;
1670 dir->first = fgfirst;
1671 if (!(fragstat == accounted_fragstat)) {
1672 dout(10) << fg << " setting filelock updated flag" << dendl;
1673 filelock.mark_dirty(); // ok bc we're auth and caller will handle
1674 }
1675 } else {
1676 if (dir && dir->is_auth()) {
1677 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1678 << " on " << *dir << dendl;
1679 dir->first = fgfirst;
1680 fnode_t *pf = dir->get_projected_fnode();
1681 finish_scatter_update(&filelock, dir,
1682 inode.dirstat.version, pf->accounted_fragstat.version);
1683 }
1684 }
1685 }
1686 }
1687 break;
1688
1689 case CEPH_LOCK_INEST:
1690 if (is_auth()) {
1691 bool replica_dirty;
1692 ::decode(replica_dirty, p);
1693 if (replica_dirty) {
1694 dout(10) << "decode_lock_state setting nestlock dirty flag" << dendl;
1695 nestlock.mark_dirty(); // ok bc we're auth and caller will handle
1696 }
1697 } else {
1698 ::decode(inode.version, p);
1699 }
1700 {
1701 nest_info_t rstat;
1702 ::decode(rstat, p);
1703 if (!is_auth()) {
1704 dout(10) << " taking inode rstat " << rstat << " for " << *this << dendl;
1705 inode.rstat = rstat; // take inode summation if replica
1706 }
1707 __u32 n;
1708 ::decode(n, p);
1709 while (n--) {
1710 frag_t fg;
1711 snapid_t fgfirst;
1712 nest_info_t rstat;
1713 nest_info_t accounted_rstat;
94b18763 1714 decltype(CDir::dirty_old_rstat) dirty_old_rstat;
7c673cae
FG
1715 ::decode(fg, p);
1716 ::decode(fgfirst, p);
1717 ::decode(rstat, p);
1718 ::decode(accounted_rstat, p);
1719 ::decode(dirty_old_rstat, p);
1720 dout(10) << fg << " [" << fgfirst << ",head]" << dendl;
1721 dout(10) << fg << " rstat " << rstat << dendl;
1722 dout(10) << fg << " accounted_rstat " << accounted_rstat << dendl;
1723 dout(10) << fg << " dirty_old_rstat " << dirty_old_rstat << dendl;
1724
1725 CDir *dir = get_dirfrag(fg);
1726 if (is_auth()) {
1727 assert(dir); // i am auth; i had better have this dir open
1728 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1729 << " on " << *dir << dendl;
1730 dir->first = fgfirst;
1731 dir->fnode.rstat = rstat;
1732 dir->fnode.accounted_rstat = accounted_rstat;
1733 dir->dirty_old_rstat.swap(dirty_old_rstat);
1734 if (!(rstat == accounted_rstat) || !dir->dirty_old_rstat.empty()) {
1735 dout(10) << fg << " setting nestlock updated flag" << dendl;
1736 nestlock.mark_dirty(); // ok bc we're auth and caller will handle
1737 }
1738 } else {
1739 if (dir && dir->is_auth()) {
1740 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1741 << " on " << *dir << dendl;
1742 dir->first = fgfirst;
1743 fnode_t *pf = dir->get_projected_fnode();
1744 finish_scatter_update(&nestlock, dir,
1745 inode.rstat.version, pf->accounted_rstat.version);
1746 }
1747 }
1748 }
1749 }
1750 break;
1751
1752 case CEPH_LOCK_IXATTR:
1753 ::decode(inode.version, p);
1754 ::decode(tm, p);
1755 if (inode.ctime < tm) inode.ctime = tm;
1756 ::decode(xattrs, p);
1757 break;
1758
1759 case CEPH_LOCK_ISNAP:
1760 {
1761 ::decode(inode.version, p);
1762 ::decode(tm, p);
1763 if (inode.ctime < tm) inode.ctime = tm;
1764 snapid_t seq = 0;
1765 if (snaprealm)
1766 seq = snaprealm->srnode.seq;
1767 decode_snap(p);
1768 if (snaprealm && snaprealm->srnode.seq != seq)
1769 mdcache->do_realm_invalidate_and_update_notify(this, seq ? CEPH_SNAP_OP_UPDATE:CEPH_SNAP_OP_SPLIT);
1770 }
1771 break;
1772
1773 case CEPH_LOCK_IFLOCK:
1774 ::decode(inode.version, p);
1775 _decode_file_locks(p);
1776 break;
1777
1778 case CEPH_LOCK_IPOLICY:
1779 if (inode.is_dir()) {
1780 ::decode(inode.version, p);
1781 ::decode(tm, p);
1782 if (inode.ctime < tm) inode.ctime = tm;
1783 ::decode(inode.layout, p);
1784 ::decode(inode.quota, p);
31f18b77 1785 mds_rank_t old_pin = inode.export_pin;
7c673cae 1786 ::decode(inode.export_pin, p);
31f18b77 1787 maybe_export_pin(old_pin != inode.export_pin);
7c673cae
FG
1788 }
1789 break;
1790
1791 default:
1792 ceph_abort();
1793 }
1794}
1795
1796
1797bool CInode::is_dirty_scattered()
1798{
1799 return
1800 filelock.is_dirty_or_flushing() ||
1801 nestlock.is_dirty_or_flushing() ||
1802 dirfragtreelock.is_dirty_or_flushing();
1803}
1804
1805void CInode::clear_scatter_dirty()
1806{
1807 filelock.remove_dirty();
1808 nestlock.remove_dirty();
1809 dirfragtreelock.remove_dirty();
1810}
1811
1812void CInode::clear_dirty_scattered(int type)
1813{
1814 dout(10) << "clear_dirty_scattered " << type << " on " << *this << dendl;
b32b8144 1815 assert(is_dir());
7c673cae
FG
1816 switch (type) {
1817 case CEPH_LOCK_IFILE:
1818 item_dirty_dirfrag_dir.remove_myself();
1819 break;
1820
1821 case CEPH_LOCK_INEST:
1822 item_dirty_dirfrag_nest.remove_myself();
1823 break;
1824
1825 case CEPH_LOCK_IDFT:
1826 item_dirty_dirfrag_dirfragtree.remove_myself();
1827 break;
1828
1829 default:
1830 ceph_abort();
1831 }
1832}
1833
1834
1835/*
1836 * when we initially scatter a lock, we need to check if any of the dirfrags
1837 * have out of date accounted_rstat/fragstat. if so, mark the lock stale.
1838 */
1839/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1840void CInode::start_scatter(ScatterLock *lock)
1841{
1842 dout(10) << "start_scatter " << *lock << " on " << *this << dendl;
1843 assert(is_auth());
94b18763 1844 mempool_inode *pi = get_projected_inode();
7c673cae 1845
94b18763
FG
1846 for (const auto &p : dirfrags) {
1847 frag_t fg = p.first;
1848 CDir *dir = p.second;
7c673cae
FG
1849 fnode_t *pf = dir->get_projected_fnode();
1850 dout(20) << fg << " " << *dir << dendl;
1851
1852 if (!dir->is_auth())
1853 continue;
1854
1855 switch (lock->get_type()) {
1856 case CEPH_LOCK_IFILE:
1857 finish_scatter_update(lock, dir, pi->dirstat.version, pf->accounted_fragstat.version);
1858 break;
1859
1860 case CEPH_LOCK_INEST:
1861 finish_scatter_update(lock, dir, pi->rstat.version, pf->accounted_rstat.version);
1862 break;
1863
1864 case CEPH_LOCK_IDFT:
1865 dir->state_clear(CDir::STATE_DIRTYDFT);
1866 break;
1867 }
1868 }
1869}
1870
1871
1872class C_Inode_FragUpdate : public MDSLogContextBase {
1873protected:
1874 CInode *in;
1875 CDir *dir;
1876 MutationRef mut;
1877 MDSRank *get_mds() override {return in->mdcache->mds;}
1878 void finish(int r) override {
1879 in->_finish_frag_update(dir, mut);
1880 }
1881
1882public:
1883 C_Inode_FragUpdate(CInode *i, CDir *d, MutationRef& m) : in(i), dir(d), mut(m) {}
1884};
1885
1886void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
1887 version_t inode_version, version_t dir_accounted_version)
1888{
1889 frag_t fg = dir->get_frag();
1890 assert(dir->is_auth());
1891
1892 if (dir->is_frozen()) {
1893 dout(10) << "finish_scatter_update " << fg << " frozen, marking " << *lock << " stale " << *dir << dendl;
1894 } else if (dir->get_version() == 0) {
1895 dout(10) << "finish_scatter_update " << fg << " not loaded, marking " << *lock << " stale " << *dir << dendl;
1896 } else {
1897 if (dir_accounted_version != inode_version) {
1898 dout(10) << "finish_scatter_update " << fg << " journaling accounted scatterstat update v" << inode_version << dendl;
1899
1900 MDLog *mdlog = mdcache->mds->mdlog;
1901 MutationRef mut(new MutationImpl());
1902 mut->ls = mdlog->get_current_segment();
1903
94b18763 1904 mempool_inode *pi = get_projected_inode();
7c673cae 1905 fnode_t *pf = dir->project_fnode();
7c673cae
FG
1906
1907 const char *ename = 0;
1908 switch (lock->get_type()) {
1909 case CEPH_LOCK_IFILE:
1910 pf->fragstat.version = pi->dirstat.version;
1911 pf->accounted_fragstat = pf->fragstat;
1912 ename = "lock ifile accounted scatter stat update";
1913 break;
1914 case CEPH_LOCK_INEST:
1915 pf->rstat.version = pi->rstat.version;
1916 pf->accounted_rstat = pf->rstat;
1917 ename = "lock inest accounted scatter stat update";
c07f9fc5
FG
1918
1919 if (!is_auth() && lock->get_state() == LOCK_MIX) {
1920 dout(10) << "finish_scatter_update try to assimilate dirty rstat on "
1921 << *dir << dendl;
1922 dir->assimilate_dirty_rstat_inodes();
1923 }
1924
7c673cae
FG
1925 break;
1926 default:
1927 ceph_abort();
1928 }
1929
c07f9fc5 1930 pf->version = dir->pre_dirty();
7c673cae
FG
1931 mut->add_projected_fnode(dir);
1932
1933 EUpdate *le = new EUpdate(mdlog, ename);
1934 mdlog->start_entry(le);
1935 le->metablob.add_dir_context(dir);
1936 le->metablob.add_dir(dir, true);
1937
1938 assert(!dir->is_frozen());
1939 mut->auth_pin(dir);
c07f9fc5
FG
1940
1941 if (lock->get_type() == CEPH_LOCK_INEST &&
1942 !is_auth() && lock->get_state() == LOCK_MIX) {
1943 dout(10) << "finish_scatter_update finish assimilating dirty rstat on "
1944 << *dir << dendl;
1945 dir->assimilate_dirty_rstat_inodes_finish(mut, &le->metablob);
1946
1947 if (!(pf->rstat == pf->accounted_rstat)) {
1948 if (mut->wrlocks.count(&nestlock) == 0) {
1949 mdcache->mds->locker->wrlock_force(&nestlock, mut);
1950 }
1951
1952 mdcache->mds->locker->mark_updated_scatterlock(&nestlock);
1953 mut->ls->dirty_dirfrag_nest.push_back(&item_dirty_dirfrag_nest);
1954 }
1955 }
7c673cae
FG
1956
1957 mdlog->submit_entry(le, new C_Inode_FragUpdate(this, dir, mut));
1958 } else {
1959 dout(10) << "finish_scatter_update " << fg << " accounted " << *lock
1960 << " scatter stat unchanged at v" << dir_accounted_version << dendl;
1961 }
1962 }
1963}
1964
1965void CInode::_finish_frag_update(CDir *dir, MutationRef& mut)
1966{
1967 dout(10) << "_finish_frag_update on " << *dir << dendl;
1968 mut->apply();
c07f9fc5 1969 mdcache->mds->locker->drop_locks(mut.get());
7c673cae
FG
1970 mut->cleanup();
1971}
1972
1973
1974/*
1975 * when we gather a lock, we need to assimilate dirfrag changes into the inode
1976 * state. it's possible we can't update the dirfrag accounted_rstat/fragstat
1977 * because the frag is auth and frozen, or that the replica couldn't for the same
1978 * reason. hopefully it will get updated the next time the lock cycles.
1979 *
1980 * we have two dimensions of behavior:
1981 * - we may be (auth and !frozen), and able to update, or not.
1982 * - the frag may be stale, or not.
1983 *
1984 * if the frag is non-stale, we want to assimilate the diff into the
1985 * inode, regardless of whether it's auth or updateable.
1986 *
1987 * if we update the frag, we want to set accounted_fragstat = frag,
1988 * both if we took the diff or it was stale and we are making it
1989 * un-stale.
1990 */
1991/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1992void CInode::finish_scatter_gather_update(int type)
1993{
1994 LogChannelRef clog = mdcache->mds->clog;
1995
1996 dout(10) << "finish_scatter_gather_update " << type << " on " << *this << dendl;
1997 assert(is_auth());
1998
1999 switch (type) {
2000 case CEPH_LOCK_IFILE:
2001 {
2002 fragtree_t tmpdft = dirfragtree;
2003 struct frag_info_t dirstat;
2004 bool dirstat_valid = true;
2005
2006 // adjust summation
2007 assert(is_auth());
94b18763 2008 mempool_inode *pi = get_projected_inode();
7c673cae
FG
2009
2010 bool touched_mtime = false, touched_chattr = false;
2011 dout(20) << " orig dirstat " << pi->dirstat << dendl;
2012 pi->dirstat.version++;
94b18763
FG
2013 for (const auto &p : dirfrags) {
2014 frag_t fg = p.first;
2015 CDir *dir = p.second;
7c673cae
FG
2016 dout(20) << fg << " " << *dir << dendl;
2017
2018 bool update;
2019 if (dir->get_version() != 0) {
2020 update = dir->is_auth() && !dir->is_frozen();
2021 } else {
2022 update = false;
2023 dirstat_valid = false;
2024 }
2025
2026 fnode_t *pf = dir->get_projected_fnode();
2027 if (update)
2028 pf = dir->project_fnode();
2029
2030 if (pf->accounted_fragstat.version == pi->dirstat.version - 1) {
2031 dout(20) << fg << " fragstat " << pf->fragstat << dendl;
2032 dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
2033 pi->dirstat.add_delta(pf->fragstat, pf->accounted_fragstat, &touched_mtime, &touched_chattr);
2034 } else {
2035 dout(20) << fg << " skipping STALE accounted_fragstat " << pf->accounted_fragstat << dendl;
2036 }
2037
2038 if (pf->fragstat.nfiles < 0 ||
2039 pf->fragstat.nsubdirs < 0) {
2040 clog->error() << "bad/negative dir size on "
2041 << dir->dirfrag() << " " << pf->fragstat;
2042 assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
2043
2044 if (pf->fragstat.nfiles < 0)
2045 pf->fragstat.nfiles = 0;
2046 if (pf->fragstat.nsubdirs < 0)
2047 pf->fragstat.nsubdirs = 0;
2048 }
2049
2050 if (update) {
2051 pf->accounted_fragstat = pf->fragstat;
2052 pf->fragstat.version = pf->accounted_fragstat.version = pi->dirstat.version;
2053 dout(10) << fg << " updated accounted_fragstat " << pf->fragstat << " on " << *dir << dendl;
2054 }
2055
2056 tmpdft.force_to_leaf(g_ceph_context, fg);
2057 dirstat.add(pf->fragstat);
2058 }
2059 if (touched_mtime)
2060 pi->mtime = pi->ctime = pi->dirstat.mtime;
2061 if (touched_chattr)
2062 pi->change_attr = pi->dirstat.change_attr;
2063 dout(20) << " final dirstat " << pi->dirstat << dendl;
2064
2065 if (dirstat_valid && !dirstat.same_sums(pi->dirstat)) {
2066 list<frag_t> ls;
2067 tmpdft.get_leaves_under(frag_t(), ls);
2068 for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
2069 if (!dirfrags.count(*p)) {
2070 dirstat_valid = false;
2071 break;
2072 }
2073 if (dirstat_valid) {
2074 if (state_test(CInode::STATE_REPAIRSTATS)) {
2075 dout(20) << " dirstat mismatch, fixing" << dendl;
2076 } else {
2077 clog->error() << "unmatched fragstat on " << ino() << ", inode has "
2078 << pi->dirstat << ", dirfrags have " << dirstat;
2079 assert(!"unmatched fragstat" == g_conf->mds_verify_scatter);
2080 }
2081 // trust the dirfrags for now
2082 version_t v = pi->dirstat.version;
2083 if (pi->dirstat.mtime > dirstat.mtime)
2084 dirstat.mtime = pi->dirstat.mtime;
2085 if (pi->dirstat.change_attr > dirstat.change_attr)
2086 dirstat.change_attr = pi->dirstat.change_attr;
2087 pi->dirstat = dirstat;
2088 pi->dirstat.version = v;
2089 }
2090 }
2091
d2e6a577
FG
2092 if (pi->dirstat.nfiles < 0 || pi->dirstat.nsubdirs < 0)
2093 {
2094 std::string path;
2095 make_path_string(path);
2096 clog->error() << "Inconsistent statistics detected: fragstat on inode "
2097 << ino() << " (" << path << "), inode has " << pi->dirstat;
7c673cae
FG
2098 assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
2099
2100 if (pi->dirstat.nfiles < 0)
2101 pi->dirstat.nfiles = 0;
2102 if (pi->dirstat.nsubdirs < 0)
2103 pi->dirstat.nsubdirs = 0;
2104 }
2105 }
2106 break;
2107
2108 case CEPH_LOCK_INEST:
2109 {
2110 fragtree_t tmpdft = dirfragtree;
2111 nest_info_t rstat;
2112 rstat.rsubdirs = 1;
2113 bool rstat_valid = true;
2114
2115 // adjust summation
2116 assert(is_auth());
94b18763 2117 mempool_inode *pi = get_projected_inode();
7c673cae
FG
2118 dout(20) << " orig rstat " << pi->rstat << dendl;
2119 pi->rstat.version++;
94b18763
FG
2120 for (const auto &p : dirfrags) {
2121 frag_t fg = p.first;
2122 CDir *dir = p.second;
7c673cae
FG
2123 dout(20) << fg << " " << *dir << dendl;
2124
2125 bool update;
2126 if (dir->get_version() != 0) {
2127 update = dir->is_auth() && !dir->is_frozen();
2128 } else {
2129 update = false;
2130 rstat_valid = false;
2131 }
2132
2133 fnode_t *pf = dir->get_projected_fnode();
2134 if (update)
2135 pf = dir->project_fnode();
2136
2137 if (pf->accounted_rstat.version == pi->rstat.version-1) {
2138 // only pull this frag's dirty rstat inodes into the frag if
2139 // the frag is non-stale and updateable. if it's stale,
2140 // that info will just get thrown out!
2141 if (update)
2142 dir->assimilate_dirty_rstat_inodes();
2143
2144 dout(20) << fg << " rstat " << pf->rstat << dendl;
2145 dout(20) << fg << " accounted_rstat " << pf->accounted_rstat << dendl;
2146 dout(20) << fg << " dirty_old_rstat " << dir->dirty_old_rstat << dendl;
2147 mdcache->project_rstat_frag_to_inode(pf->rstat, pf->accounted_rstat,
2148 dir->first, CEPH_NOSNAP, this, true);
94b18763
FG
2149 for (auto &p : dir->dirty_old_rstat) {
2150 mdcache->project_rstat_frag_to_inode(p.second.rstat, p.second.accounted_rstat,
2151 p.second.first, p.first, this, true);
2152 }
7c673cae
FG
2153 if (update) // dir contents not valid if frozen or non-auth
2154 dir->check_rstats();
2155 } else {
2156 dout(20) << fg << " skipping STALE accounted_rstat " << pf->accounted_rstat << dendl;
2157 }
2158 if (update) {
2159 pf->accounted_rstat = pf->rstat;
2160 dir->dirty_old_rstat.clear();
2161 pf->rstat.version = pf->accounted_rstat.version = pi->rstat.version;
2162 dir->check_rstats();
2163 dout(10) << fg << " updated accounted_rstat " << pf->rstat << " on " << *dir << dendl;
2164 }
2165
2166 tmpdft.force_to_leaf(g_ceph_context, fg);
2167 rstat.add(pf->rstat);
2168 }
2169 dout(20) << " final rstat " << pi->rstat << dendl;
2170
2171 if (rstat_valid && !rstat.same_sums(pi->rstat)) {
2172 list<frag_t> ls;
2173 tmpdft.get_leaves_under(frag_t(), ls);
2174 for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
2175 if (!dirfrags.count(*p)) {
2176 rstat_valid = false;
2177 break;
2178 }
2179 if (rstat_valid) {
2180 if (state_test(CInode::STATE_REPAIRSTATS)) {
2181 dout(20) << " rstat mismatch, fixing" << dendl;
2182 } else {
d2e6a577
FG
2183 clog->error() << "inconsistent rstat on inode " << ino()
2184 << ", inode has " << pi->rstat
2185 << ", directory fragments have " << rstat;
7c673cae
FG
2186 assert(!"unmatched rstat" == g_conf->mds_verify_scatter);
2187 }
2188 // trust the dirfrag for now
2189 version_t v = pi->rstat.version;
2190 if (pi->rstat.rctime > rstat.rctime)
2191 rstat.rctime = pi->rstat.rctime;
2192 pi->rstat = rstat;
2193 pi->rstat.version = v;
2194 }
2195 }
2196
2197 mdcache->broadcast_quota_to_client(this);
2198 }
2199 break;
2200
2201 case CEPH_LOCK_IDFT:
2202 break;
2203
2204 default:
2205 ceph_abort();
2206 }
2207}
2208
2209void CInode::finish_scatter_gather_update_accounted(int type, MutationRef& mut, EMetaBlob *metablob)
2210{
2211 dout(10) << "finish_scatter_gather_update_accounted " << type << " on " << *this << dendl;
2212 assert(is_auth());
2213
94b18763
FG
2214 for (const auto &p : dirfrags) {
2215 CDir *dir = p.second;
7c673cae
FG
2216 if (!dir->is_auth() || dir->get_version() == 0 || dir->is_frozen())
2217 continue;
2218
2219 if (type == CEPH_LOCK_IDFT)
2220 continue; // nothing to do.
2221
2222 dout(10) << " journaling updated frag accounted_ on " << *dir << dendl;
2223 assert(dir->is_projected());
2224 fnode_t *pf = dir->get_projected_fnode();
2225 pf->version = dir->pre_dirty();
2226 mut->add_projected_fnode(dir);
2227 metablob->add_dir(dir, true);
2228 mut->auth_pin(dir);
2229
2230 if (type == CEPH_LOCK_INEST)
2231 dir->assimilate_dirty_rstat_inodes_finish(mut, metablob);
2232 }
2233}
2234
2235// waiting
2236
2237bool CInode::is_frozen() const
2238{
2239 if (is_frozen_inode()) return true;
2240 if (parent && parent->dir->is_frozen()) return true;
2241 return false;
2242}
2243
2244bool CInode::is_frozen_dir() const
2245{
2246 if (parent && parent->dir->is_frozen_dir()) return true;
2247 return false;
2248}
2249
2250bool CInode::is_freezing() const
2251{
2252 if (is_freezing_inode()) return true;
2253 if (parent && parent->dir->is_freezing()) return true;
2254 return false;
2255}
2256
2257void CInode::add_dir_waiter(frag_t fg, MDSInternalContextBase *c)
2258{
2259 if (waiting_on_dir.empty())
2260 get(PIN_DIRWAITER);
2261 waiting_on_dir[fg].push_back(c);
2262 dout(10) << "add_dir_waiter frag " << fg << " " << c << " on " << *this << dendl;
2263}
2264
2265void CInode::take_dir_waiting(frag_t fg, list<MDSInternalContextBase*>& ls)
2266{
2267 if (waiting_on_dir.empty())
2268 return;
2269
94b18763
FG
2270 auto it = waiting_on_dir.find(fg);
2271 if (it != waiting_on_dir.end()) {
2272 dout(10) << __func__ << " frag " << fg << " on " << *this << dendl;
2273 ls.splice(ls.end(), it->second);
2274 waiting_on_dir.erase(it);
7c673cae
FG
2275
2276 if (waiting_on_dir.empty())
2277 put(PIN_DIRWAITER);
2278 }
2279}
2280
2281void CInode::add_waiter(uint64_t tag, MDSInternalContextBase *c)
2282{
2283 dout(10) << "add_waiter tag " << std::hex << tag << std::dec << " " << c
2284 << " !ambig " << !state_test(STATE_AMBIGUOUSAUTH)
2285 << " !frozen " << !is_frozen_inode()
2286 << " !freezing " << !is_freezing_inode()
2287 << dendl;
2288 // wait on the directory?
2289 // make sure its not the inode that is explicitly ambiguous|freezing|frozen
2290 if (((tag & WAIT_SINGLEAUTH) && !state_test(STATE_AMBIGUOUSAUTH)) ||
2291 ((tag & WAIT_UNFREEZE) &&
2292 !is_frozen_inode() && !is_freezing_inode() && !is_frozen_auth_pin())) {
2293 dout(15) << "passing waiter up tree" << dendl;
2294 parent->dir->add_waiter(tag, c);
2295 return;
2296 }
2297 dout(15) << "taking waiter here" << dendl;
2298 MDSCacheObject::add_waiter(tag, c);
2299}
2300
2301void CInode::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
2302{
2303 if ((mask & WAIT_DIR) && !waiting_on_dir.empty()) {
2304 // take all dentry waiters
2305 while (!waiting_on_dir.empty()) {
94b18763
FG
2306 auto it = waiting_on_dir.begin();
2307 dout(10) << __func__ << " dirfrag " << it->first << " on " << *this << dendl;
2308 ls.splice(ls.end(), it->second);
2309 waiting_on_dir.erase(it);
7c673cae
FG
2310 }
2311 put(PIN_DIRWAITER);
2312 }
2313
2314 // waiting
2315 MDSCacheObject::take_waiting(mask, ls);
2316}
2317
2318bool CInode::freeze_inode(int auth_pin_allowance)
2319{
2320 assert(auth_pin_allowance > 0); // otherwise we need to adjust parent's nested_auth_pins
2321 assert(auth_pins >= auth_pin_allowance);
2322 if (auth_pins > auth_pin_allowance) {
2323 dout(10) << "freeze_inode - waiting for auth_pins to drop to " << auth_pin_allowance << dendl;
2324 auth_pin_freeze_allowance = auth_pin_allowance;
2325 get(PIN_FREEZING);
2326 state_set(STATE_FREEZING);
2327 return false;
2328 }
2329
2330 dout(10) << "freeze_inode - frozen" << dendl;
2331 assert(auth_pins == auth_pin_allowance);
2332 if (!state_test(STATE_FROZEN)) {
2333 get(PIN_FROZEN);
2334 state_set(STATE_FROZEN);
2335 }
2336 return true;
2337}
2338
2339void CInode::unfreeze_inode(list<MDSInternalContextBase*>& finished)
2340{
2341 dout(10) << "unfreeze_inode" << dendl;
2342 if (state_test(STATE_FREEZING)) {
2343 state_clear(STATE_FREEZING);
2344 put(PIN_FREEZING);
2345 } else if (state_test(STATE_FROZEN)) {
2346 state_clear(STATE_FROZEN);
2347 put(PIN_FROZEN);
2348 } else
2349 ceph_abort();
2350 take_waiting(WAIT_UNFREEZE, finished);
2351}
2352
2353void CInode::unfreeze_inode()
2354{
2355 list<MDSInternalContextBase*> finished;
2356 unfreeze_inode(finished);
2357 mdcache->mds->queue_waiters(finished);
2358}
2359
2360void CInode::freeze_auth_pin()
2361{
2362 assert(state_test(CInode::STATE_FROZEN));
2363 state_set(CInode::STATE_FROZENAUTHPIN);
2364}
2365
2366void CInode::unfreeze_auth_pin()
2367{
2368 assert(state_test(CInode::STATE_FROZENAUTHPIN));
2369 state_clear(CInode::STATE_FROZENAUTHPIN);
2370 if (!state_test(STATE_FREEZING|STATE_FROZEN)) {
2371 list<MDSInternalContextBase*> finished;
2372 take_waiting(WAIT_UNFREEZE, finished);
2373 mdcache->mds->queue_waiters(finished);
2374 }
2375}
2376
2377void CInode::clear_ambiguous_auth(list<MDSInternalContextBase*>& finished)
2378{
2379 assert(state_test(CInode::STATE_AMBIGUOUSAUTH));
2380 state_clear(CInode::STATE_AMBIGUOUSAUTH);
2381 take_waiting(CInode::WAIT_SINGLEAUTH, finished);
2382}
2383
2384void CInode::clear_ambiguous_auth()
2385{
2386 list<MDSInternalContextBase*> finished;
2387 clear_ambiguous_auth(finished);
2388 mdcache->mds->queue_waiters(finished);
2389}
2390
2391// auth_pins
91327a77
AA
2392bool CInode::can_auth_pin(int *err_ret) const {
2393 int err;
2394 if (!is_auth()) {
2395 err = ERR_NOT_AUTH;
2396 } else if (is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin()) {
2397 err = ERR_EXPORTING_INODE;
2398 } else {
2399 if (parent)
2400 return parent->can_auth_pin(err_ret);
2401 err = 0;
2402 }
2403 if (err && err_ret)
2404 *err_ret = err;
2405 return !err;
7c673cae
FG
2406}
2407
2408void CInode::auth_pin(void *by)
2409{
2410 if (auth_pins == 0)
2411 get(PIN_AUTHPIN);
2412 auth_pins++;
2413
2414#ifdef MDS_AUTHPIN_SET
2415 auth_pin_set.insert(by);
2416#endif
2417
2418 dout(10) << "auth_pin by " << by << " on " << *this
2419 << " now " << auth_pins << "+" << nested_auth_pins
2420 << dendl;
2421
2422 if (parent)
2423 parent->adjust_nested_auth_pins(1, 1, this);
2424}
2425
2426void CInode::auth_unpin(void *by)
2427{
2428 auth_pins--;
2429
2430#ifdef MDS_AUTHPIN_SET
2431 assert(auth_pin_set.count(by));
2432 auth_pin_set.erase(auth_pin_set.find(by));
2433#endif
2434
2435 if (auth_pins == 0)
2436 put(PIN_AUTHPIN);
2437
2438 dout(10) << "auth_unpin by " << by << " on " << *this
2439 << " now " << auth_pins << "+" << nested_auth_pins
2440 << dendl;
2441
2442 assert(auth_pins >= 0);
2443
2444 if (parent)
2445 parent->adjust_nested_auth_pins(-1, -1, by);
2446
2447 if (is_freezing_inode() &&
2448 auth_pins == auth_pin_freeze_allowance) {
2449 dout(10) << "auth_unpin freezing!" << dendl;
2450 get(PIN_FROZEN);
2451 put(PIN_FREEZING);
2452 state_clear(STATE_FREEZING);
2453 state_set(STATE_FROZEN);
2454 finish_waiting(WAIT_FROZEN);
2455 }
2456}
2457
2458void CInode::adjust_nested_auth_pins(int a, void *by)
2459{
2460 assert(a);
2461 nested_auth_pins += a;
2462 dout(35) << "adjust_nested_auth_pins by " << by
2463 << " change " << a << " yields "
2464 << auth_pins << "+" << nested_auth_pins << dendl;
2465 assert(nested_auth_pins >= 0);
2466
2467 if (g_conf->mds_debug_auth_pins) {
2468 // audit
2469 int s = 0;
94b18763
FG
2470 for (const auto &p : dirfrags) {
2471 CDir *dir = p.second;
7c673cae
FG
2472 if (!dir->is_subtree_root() && dir->get_cum_auth_pins())
2473 s++;
2474 }
2475 assert(s == nested_auth_pins);
2476 }
2477
2478 if (parent)
2479 parent->adjust_nested_auth_pins(a, 0, by);
2480}
2481
2482
2483// authority
2484
2485mds_authority_t CInode::authority() const
2486{
2487 if (inode_auth.first >= 0)
2488 return inode_auth;
2489
2490 if (parent)
2491 return parent->dir->authority();
2492
2493 // new items that are not yet linked in (in the committed plane) belong
2494 // to their first parent.
2495 if (!projected_parent.empty())
2496 return projected_parent.front()->dir->authority();
2497
2498 return CDIR_AUTH_UNDEF;
2499}
2500
2501
2502// SNAP
2503
2504snapid_t CInode::get_oldest_snap()
2505{
2506 snapid_t t = first;
2507 if (!old_inodes.empty())
2508 t = old_inodes.begin()->second.first;
2509 return MIN(t, oldest_snap);
2510}
2511
94b18763 2512CInode::mempool_old_inode& CInode::cow_old_inode(snapid_t follows, bool cow_head)
7c673cae
FG
2513{
2514 assert(follows >= first);
2515
94b18763
FG
2516 mempool_inode *pi = cow_head ? get_projected_inode() : get_previous_projected_inode();
2517 mempool_xattr_map *px = cow_head ? get_projected_xattrs() : get_previous_projected_xattrs();
7c673cae 2518
94b18763 2519 mempool_old_inode &old = old_inodes[follows];
7c673cae
FG
2520 old.first = first;
2521 old.inode = *pi;
2522 old.xattrs = *px;
2523
2524 if (first < oldest_snap)
2525 oldest_snap = first;
2526
2527 dout(10) << " " << px->size() << " xattrs cowed, " << *px << dendl;
2528
2529 old.inode.trim_client_ranges(follows);
2530
2531 if (g_conf->mds_snap_rstat &&
2532 !(old.inode.rstat == old.inode.accounted_rstat))
2533 dirty_old_rstats.insert(follows);
2534
2535 first = follows+1;
2536
2537 dout(10) << "cow_old_inode " << (cow_head ? "head" : "previous_head" )
2538 << " to [" << old.first << "," << follows << "] on "
2539 << *this << dendl;
2540
2541 return old;
2542}
2543
2544void CInode::split_old_inode(snapid_t snap)
2545{
94b18763
FG
2546 auto it = old_inodes.lower_bound(snap);
2547 assert(it != old_inodes.end() && it->second.first < snap);
7c673cae 2548
94b18763
FG
2549 mempool_old_inode &old = old_inodes[snap - 1];
2550 old = it->second;
7c673cae 2551
94b18763
FG
2552 it->second.first = snap;
2553 dout(10) << __func__ << " " << "[" << old.first << "," << it->first
2554 << "] to [" << snap << "," << it->first << "] on " << *this << dendl;
7c673cae
FG
2555}
2556
2557void CInode::pre_cow_old_inode()
2558{
2559 snapid_t follows = find_snaprealm()->get_newest_seq();
2560 if (first <= follows)
2561 cow_old_inode(follows, true);
2562}
2563
2564void CInode::purge_stale_snap_data(const set<snapid_t>& snaps)
2565{
2566 dout(10) << "purge_stale_snap_data " << snaps << dendl;
2567
94b18763
FG
2568 for (auto it = old_inodes.begin(); it != old_inodes.end(); ) {
2569 const snapid_t &id = it->first;
2570 const auto &s = snaps.lower_bound(it->second.first);
2571 if (s == snaps.end() || *s > id) {
2572 dout(10) << " purging old_inode [" << it->second.first << "," << id << "]" << dendl;
2573 it = old_inodes.erase(it);
2574 } else {
2575 ++it;
2576 }
7c673cae
FG
2577 }
2578}
2579
2580/*
2581 * pick/create an old_inode
2582 */
94b18763 2583CInode::mempool_old_inode * CInode::pick_old_inode(snapid_t snap)
7c673cae 2584{
94b18763
FG
2585 auto it = old_inodes.lower_bound(snap); // p is first key >= to snap
2586 if (it != old_inodes.end() && it->second.first <= snap) {
2587 dout(10) << __func__ << " snap " << snap << " -> [" << it->second.first << "," << it->first << "]" << dendl;
2588 return &it->second;
7c673cae
FG
2589 }
2590 dout(10) << "pick_old_inode snap " << snap << " -> nothing" << dendl;
2591 return NULL;
2592}
2593
2594void CInode::open_snaprealm(bool nosplit)
2595{
2596 if (!snaprealm) {
2597 SnapRealm *parent = find_snaprealm();
2598 snaprealm = new SnapRealm(mdcache, this);
2599 if (parent) {
2600 dout(10) << "open_snaprealm " << snaprealm
2601 << " parent is " << parent
2602 << dendl;
2603 dout(30) << " siblings are " << parent->open_children << dendl;
2604 snaprealm->parent = parent;
2605 if (!nosplit)
2606 parent->split_at(snaprealm);
2607 parent->open_children.insert(snaprealm);
2608 }
2609 }
2610}
2611void CInode::close_snaprealm(bool nojoin)
2612{
2613 if (snaprealm) {
2614 dout(15) << "close_snaprealm " << *snaprealm << dendl;
2615 snaprealm->close_parents();
2616 if (snaprealm->parent) {
2617 snaprealm->parent->open_children.erase(snaprealm);
2618 //if (!nojoin)
2619 //snaprealm->parent->join(snaprealm);
2620 }
2621 delete snaprealm;
2622 snaprealm = 0;
2623 }
2624}
2625
2626SnapRealm *CInode::find_snaprealm() const
2627{
2628 const CInode *cur = this;
2629 while (!cur->snaprealm) {
2630 if (cur->get_parent_dn())
2631 cur = cur->get_parent_dn()->get_dir()->get_inode();
2632 else if (get_projected_parent_dn())
2633 cur = cur->get_projected_parent_dn()->get_dir()->get_inode();
2634 else
2635 break;
2636 }
2637 return cur->snaprealm;
2638}
2639
2640void CInode::encode_snap_blob(bufferlist &snapbl)
2641{
2642 if (snaprealm) {
2643 ::encode(snaprealm->srnode, snapbl);
2644 dout(20) << "encode_snap_blob " << *snaprealm << dendl;
2645 }
2646}
2647void CInode::decode_snap_blob(bufferlist& snapbl)
2648{
2649 if (snapbl.length()) {
2650 open_snaprealm();
2651 bufferlist::iterator p = snapbl.begin();
2652 ::decode(snaprealm->srnode, p);
2653 if (is_base()) {
2654 bool ok = snaprealm->_open_parents(NULL);
2655 assert(ok);
2656 }
2657 dout(20) << "decode_snap_blob " << *snaprealm << dendl;
2658 }
2659}
2660
2661void CInode::encode_snap(bufferlist& bl)
2662{
2663 bufferlist snapbl;
2664 encode_snap_blob(snapbl);
2665 ::encode(snapbl, bl);
2666 ::encode(oldest_snap, bl);
2667}
2668
2669void CInode::decode_snap(bufferlist::iterator& p)
2670{
2671 bufferlist snapbl;
2672 ::decode(snapbl, p);
2673 ::decode(oldest_snap, p);
2674 decode_snap_blob(snapbl);
2675}
2676
2677// =============================================
2678
2679client_t CInode::calc_ideal_loner()
2680{
2681 if (mdcache->is_readonly())
2682 return -1;
2683 if (!mds_caps_wanted.empty())
2684 return -1;
2685
2686 int n = 0;
2687 client_t loner = -1;
2688 for (map<client_t,Capability*>::iterator it = client_caps.begin();
2689 it != client_caps.end();
2690 ++it)
2691 if (!it->second->is_stale() &&
2692 ((it->second->wanted() & (CEPH_CAP_ANY_WR|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_RD)) ||
2693 (inode.is_dir() && !has_subtree_root_dirfrag()))) {
2694 if (n)
2695 return -1;
2696 n++;
2697 loner = it->first;
2698 }
2699 return loner;
2700}
2701
b32b8144 2702bool CInode::choose_ideal_loner()
7c673cae
FG
2703{
2704 want_loner_cap = calc_ideal_loner();
b32b8144
FG
2705 int changed = false;
2706 if (loner_cap >= 0 && loner_cap != want_loner_cap) {
2707 if (!try_drop_loner())
2708 return false;
2709 changed = true;
2710 }
2711
2712 if (want_loner_cap >= 0) {
2713 if (loner_cap < 0) {
2714 set_loner_cap(want_loner_cap);
2715 changed = true;
2716 } else
2717 assert(loner_cap == want_loner_cap);
2718 }
2719 return changed;
7c673cae
FG
2720}
2721
2722bool CInode::try_set_loner()
2723{
2724 assert(want_loner_cap >= 0);
2725 if (loner_cap >= 0 && loner_cap != want_loner_cap)
2726 return false;
2727 set_loner_cap(want_loner_cap);
2728 return true;
2729}
2730
2731void CInode::set_loner_cap(client_t l)
2732{
2733 loner_cap = l;
2734 authlock.set_excl_client(loner_cap);
2735 filelock.set_excl_client(loner_cap);
2736 linklock.set_excl_client(loner_cap);
2737 xattrlock.set_excl_client(loner_cap);
2738}
2739
2740bool CInode::try_drop_loner()
2741{
2742 if (loner_cap < 0)
2743 return true;
2744
2745 int other_allowed = get_caps_allowed_by_type(CAP_ANY);
2746 Capability *cap = get_client_cap(loner_cap);
2747 if (!cap ||
2748 (cap->issued() & ~other_allowed) == 0) {
2749 set_loner_cap(-1);
2750 return true;
2751 }
2752 return false;
2753}
2754
2755
2756// choose new lock state during recovery, based on issued caps
2757void CInode::choose_lock_state(SimpleLock *lock, int allissued)
2758{
2759 int shift = lock->get_cap_shift();
2760 int issued = (allissued >> shift) & lock->get_cap_mask();
2761 if (is_auth()) {
2762 if (lock->is_xlocked()) {
2763 // do nothing here
2764 } else if (lock->get_state() != LOCK_MIX) {
2765 if (issued & (CEPH_CAP_GEXCL | CEPH_CAP_GBUFFER))
2766 lock->set_state(LOCK_EXCL);
2767 else if (issued & CEPH_CAP_GWR)
2768 lock->set_state(LOCK_MIX);
2769 else if (lock->is_dirty()) {
2770 if (is_replicated())
2771 lock->set_state(LOCK_MIX);
2772 else
2773 lock->set_state(LOCK_LOCK);
2774 } else
2775 lock->set_state(LOCK_SYNC);
2776 }
2777 } else {
2778 // our states have already been chosen during rejoin.
2779 if (lock->is_xlocked())
2780 assert(lock->get_state() == LOCK_LOCK);
2781 }
2782}
2783
2784void CInode::choose_lock_states(int dirty_caps)
2785{
2786 int issued = get_caps_issued() | dirty_caps;
b32b8144
FG
2787 if (is_auth() && (issued & (CEPH_CAP_ANY_EXCL|CEPH_CAP_ANY_WR)))
2788 choose_ideal_loner();
7c673cae
FG
2789 choose_lock_state(&filelock, issued);
2790 choose_lock_state(&nestlock, issued);
2791 choose_lock_state(&dirfragtreelock, issued);
2792 choose_lock_state(&authlock, issued);
2793 choose_lock_state(&xattrlock, issued);
2794 choose_lock_state(&linklock, issued);
2795}
2796
2797Capability *CInode::add_client_cap(client_t client, Session *session, SnapRealm *conrealm)
2798{
2799 if (client_caps.empty()) {
2800 get(PIN_CAPS);
2801 if (conrealm)
2802 containing_realm = conrealm;
2803 else
2804 containing_realm = find_snaprealm();
2805 containing_realm->inodes_with_caps.push_back(&item_caps);
2806 dout(10) << "add_client_cap first cap, joining realm " << *containing_realm << dendl;
2807 }
2808
2809 if (client_caps.empty())
2810 mdcache->num_inodes_with_caps++;
2811
2812 Capability *cap = new Capability(this, ++mdcache->last_cap_id, client);
2813 assert(client_caps.count(client) == 0);
2814 client_caps[client] = cap;
2815
2816 session->add_cap(cap);
2817 if (session->is_stale())
2818 cap->mark_stale();
2819
2820 cap->client_follows = first-1;
2821
2822 containing_realm->add_cap(client, cap);
2823
2824 return cap;
2825}
2826
2827void CInode::remove_client_cap(client_t client)
2828{
2829 assert(client_caps.count(client) == 1);
2830 Capability *cap = client_caps[client];
2831
2832 cap->item_session_caps.remove_myself();
2833 cap->item_revoking_caps.remove_myself();
2834 cap->item_client_revoking_caps.remove_myself();
2835 containing_realm->remove_cap(client, cap);
2836
2837 if (client == loner_cap)
2838 loner_cap = -1;
2839
2840 delete cap;
2841 client_caps.erase(client);
2842 if (client_caps.empty()) {
2843 dout(10) << "remove_client_cap last cap, leaving realm " << *containing_realm << dendl;
2844 put(PIN_CAPS);
2845 item_caps.remove_myself();
2846 containing_realm = NULL;
2847 item_open_file.remove_myself(); // unpin logsegment
2848 mdcache->num_inodes_with_caps--;
2849 }
2850
2851 //clean up advisory locks
2852 bool fcntl_removed = fcntl_locks ? fcntl_locks->remove_all_from(client) : false;
2853 bool flock_removed = flock_locks ? flock_locks->remove_all_from(client) : false;
2854 if (fcntl_removed || flock_removed) {
2855 list<MDSInternalContextBase*> waiters;
2856 take_waiting(CInode::WAIT_FLOCK, waiters);
2857 mdcache->mds->queue_waiters(waiters);
2858 }
2859}
2860
2861void CInode::move_to_realm(SnapRealm *realm)
2862{
2863 dout(10) << "move_to_realm joining realm " << *realm
2864 << ", leaving realm " << *containing_realm << dendl;
2865 for (map<client_t,Capability*>::iterator q = client_caps.begin();
2866 q != client_caps.end();
2867 ++q) {
2868 containing_realm->remove_cap(q->first, q->second);
2869 realm->add_cap(q->first, q->second);
2870 }
2871 item_caps.remove_myself();
2872 realm->inodes_with_caps.push_back(&item_caps);
2873 containing_realm = realm;
2874}
2875
2876Capability *CInode::reconnect_cap(client_t client, const cap_reconnect_t& icr, Session *session)
2877{
2878 Capability *cap = get_client_cap(client);
2879 if (cap) {
2880 // FIXME?
2881 cap->merge(icr.capinfo.wanted, icr.capinfo.issued);
2882 } else {
2883 cap = add_client_cap(client, session);
2884 cap->set_cap_id(icr.capinfo.cap_id);
2885 cap->set_wanted(icr.capinfo.wanted);
2886 cap->issue_norevoke(icr.capinfo.issued);
2887 cap->reset_seq();
2888 }
2889 cap->set_last_issue_stamp(ceph_clock_now());
2890 return cap;
2891}
2892
2893void CInode::clear_client_caps_after_export()
2894{
2895 while (!client_caps.empty())
2896 remove_client_cap(client_caps.begin()->first);
2897 loner_cap = -1;
2898 want_loner_cap = -1;
2899 mds_caps_wanted.clear();
2900}
2901
2902void CInode::export_client_caps(map<client_t,Capability::Export>& cl)
2903{
2904 for (map<client_t,Capability*>::iterator it = client_caps.begin();
2905 it != client_caps.end();
2906 ++it) {
2907 cl[it->first] = it->second->make_export();
2908 }
2909}
2910
2911 // caps allowed
2912int CInode::get_caps_liked() const
2913{
2914 if (is_dir())
2915 return CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED; // but not, say, FILE_RD|WR|WRBUFFER
2916 else
2917 return CEPH_CAP_ANY & ~CEPH_CAP_FILE_LAZYIO;
2918}
2919
2920int CInode::get_caps_allowed_ever() const
2921{
2922 int allowed;
2923 if (is_dir())
2924 allowed = CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED;
2925 else
2926 allowed = CEPH_CAP_ANY;
2927 return allowed &
2928 (CEPH_CAP_PIN |
2929 (filelock.gcaps_allowed_ever() << filelock.get_cap_shift()) |
2930 (authlock.gcaps_allowed_ever() << authlock.get_cap_shift()) |
2931 (xattrlock.gcaps_allowed_ever() << xattrlock.get_cap_shift()) |
2932 (linklock.gcaps_allowed_ever() << linklock.get_cap_shift()));
2933}
2934
2935int CInode::get_caps_allowed_by_type(int type) const
2936{
2937 return
2938 CEPH_CAP_PIN |
2939 (filelock.gcaps_allowed(type) << filelock.get_cap_shift()) |
2940 (authlock.gcaps_allowed(type) << authlock.get_cap_shift()) |
2941 (xattrlock.gcaps_allowed(type) << xattrlock.get_cap_shift()) |
2942 (linklock.gcaps_allowed(type) << linklock.get_cap_shift());
2943}
2944
2945int CInode::get_caps_careful() const
2946{
2947 return
2948 (filelock.gcaps_careful() << filelock.get_cap_shift()) |
2949 (authlock.gcaps_careful() << authlock.get_cap_shift()) |
2950 (xattrlock.gcaps_careful() << xattrlock.get_cap_shift()) |
2951 (linklock.gcaps_careful() << linklock.get_cap_shift());
2952}
2953
2954int CInode::get_xlocker_mask(client_t client) const
2955{
2956 return
2957 (filelock.gcaps_xlocker_mask(client) << filelock.get_cap_shift()) |
2958 (authlock.gcaps_xlocker_mask(client) << authlock.get_cap_shift()) |
2959 (xattrlock.gcaps_xlocker_mask(client) << xattrlock.get_cap_shift()) |
2960 (linklock.gcaps_xlocker_mask(client) << linklock.get_cap_shift());
2961}
2962
94b18763 2963int CInode::get_caps_allowed_for_client(Session *session, mempool_inode *file_i) const
7c673cae
FG
2964{
2965 client_t client = session->info.inst.name.num();
2966 int allowed;
2967 if (client == get_loner()) {
2968 // as the loner, we get the loner_caps AND any xlocker_caps for things we have xlocked
2969 allowed =
2970 get_caps_allowed_by_type(CAP_LONER) |
2971 (get_caps_allowed_by_type(CAP_XLOCKER) & get_xlocker_mask(client));
2972 } else {
2973 allowed = get_caps_allowed_by_type(CAP_ANY);
2974 }
2975
2976 if (!is_dir()) {
2977 if ((file_i->inline_data.version != CEPH_INLINE_NONE &&
2978 !session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) ||
2979 (!file_i->layout.pool_ns.empty() &&
2980 !session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)))
2981 allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
2982 }
2983 return allowed;
2984}
2985
2986// caps issued, wanted
2987int CInode::get_caps_issued(int *ploner, int *pother, int *pxlocker,
2988 int shift, int mask)
2989{
2990 int c = 0;
2991 int loner = 0, other = 0, xlocker = 0;
2992 if (!is_auth()) {
2993 loner_cap = -1;
2994 }
2995
2996 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
2997 it != client_caps.end();
2998 ++it) {
2999 int i = it->second->issued();
3000 c |= i;
3001 if (it->first == loner_cap)
3002 loner |= i;
3003 else
3004 other |= i;
3005 xlocker |= get_xlocker_mask(it->first) & i;
3006 }
3007 if (ploner) *ploner = (loner >> shift) & mask;
3008 if (pother) *pother = (other >> shift) & mask;
3009 if (pxlocker) *pxlocker = (xlocker >> shift) & mask;
3010 return (c >> shift) & mask;
3011}
3012
3013bool CInode::is_any_caps_wanted() const
3014{
3015 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3016 it != client_caps.end();
3017 ++it)
3018 if (it->second->wanted())
3019 return true;
3020 return false;
3021}
3022
3023int CInode::get_caps_wanted(int *ploner, int *pother, int shift, int mask) const
3024{
3025 int w = 0;
3026 int loner = 0, other = 0;
3027 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3028 it != client_caps.end();
3029 ++it) {
3030 if (!it->second->is_stale()) {
3031 int t = it->second->wanted();
3032 w |= t;
3033 if (it->first == loner_cap)
3034 loner |= t;
3035 else
3036 other |= t;
3037 }
3038 //cout << " get_caps_wanted client " << it->first << " " << cap_string(it->second.wanted()) << endl;
3039 }
3040 if (is_auth())
94b18763
FG
3041 for (const auto &p : mds_caps_wanted) {
3042 w |= p.second;
3043 other |= p.second;
7c673cae
FG
3044 //cout << " get_caps_wanted mds " << it->first << " " << cap_string(it->second) << endl;
3045 }
3046 if (ploner) *ploner = (loner >> shift) & mask;
3047 if (pother) *pother = (other >> shift) & mask;
3048 return (w >> shift) & mask;
3049}
3050
3051bool CInode::issued_caps_need_gather(SimpleLock *lock)
3052{
3053 int loner_issued, other_issued, xlocker_issued;
3054 get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
3055 lock->get_cap_shift(), lock->get_cap_mask());
3056 if ((loner_issued & ~lock->gcaps_allowed(CAP_LONER)) ||
3057 (other_issued & ~lock->gcaps_allowed(CAP_ANY)) ||
3058 (xlocker_issued & ~lock->gcaps_allowed(CAP_XLOCKER)))
3059 return true;
3060 return false;
3061}
3062
3063void CInode::replicate_relax_locks()
3064{
3065 //dout(10) << " relaxing locks on " << *this << dendl;
3066 assert(is_auth());
3067 assert(!is_replicated());
3068
3069 authlock.replicate_relax();
3070 linklock.replicate_relax();
3071 dirfragtreelock.replicate_relax();
3072 filelock.replicate_relax();
3073 xattrlock.replicate_relax();
3074 snaplock.replicate_relax();
3075 nestlock.replicate_relax();
3076 flocklock.replicate_relax();
3077 policylock.replicate_relax();
3078}
3079
3080
3081
3082// =============================================
3083
3084int CInode::encode_inodestat(bufferlist& bl, Session *session,
3085 SnapRealm *dir_realm,
3086 snapid_t snapid,
3087 unsigned max_bytes,
3088 int getattr_caps)
3089{
31f18b77 3090 client_t client = session->info.inst.name.num();
7c673cae
FG
3091 assert(snapid);
3092 assert(session->connection);
3093
3094 bool valid = true;
3095
3096 // pick a version!
94b18763
FG
3097 mempool_inode *oi = &inode;
3098 mempool_inode *pi = get_projected_inode();
7c673cae 3099
94b18763 3100 CInode::mempool_xattr_map *pxattrs = nullptr;
7c673cae
FG
3101
3102 if (snapid != CEPH_NOSNAP) {
3103
3104 // for now at least, old_inodes is only defined/valid on the auth
3105 if (!is_auth())
3106 valid = false;
3107
3108 if (is_multiversion()) {
94b18763
FG
3109 auto it = old_inodes.lower_bound(snapid);
3110 if (it != old_inodes.end()) {
3111 if (it->second.first > snapid) {
3112 if (it != old_inodes.begin())
3113 --it;
7c673cae 3114 }
94b18763
FG
3115 if (it->second.first <= snapid && snapid <= it->first) {
3116 dout(15) << __func__ << " snapid " << snapid
3117 << " to old_inode [" << it->second.first << "," << it->first << "]"
3118 << " " << it->second.inode.rstat
7c673cae 3119 << dendl;
94b18763
FG
3120 auto &p = it->second;
3121 pi = oi = &p.inode;
3122 pxattrs = &p.xattrs;
7c673cae
FG
3123 } else {
3124 // snapshoted remote dentry can result this
3125 dout(0) << "encode_inodestat old_inode for snapid " << snapid
3126 << " not found" << dendl;
3127 }
3128 }
3129 } else if (snapid < first || snapid > last) {
3130 // snapshoted remote dentry can result this
3131 dout(0) << "encode_inodestat [" << first << "," << last << "]"
3132 << " not match snapid " << snapid << dendl;
3133 }
3134 }
3135
3136 SnapRealm *realm = find_snaprealm();
3137
3138 bool no_caps = !valid ||
3139 session->is_stale() ||
3140 (dir_realm && realm != dir_realm) ||
3141 is_frozen() ||
3142 state_test(CInode::STATE_EXPORTINGCAPS);
3143 if (no_caps)
3144 dout(20) << "encode_inodestat no caps"
3145 << (!valid?", !valid":"")
3146 << (session->is_stale()?", session stale ":"")
3147 << ((dir_realm && realm != dir_realm)?", snaprealm differs ":"")
3148 << (is_frozen()?", frozen inode":"")
3149 << (state_test(CInode::STATE_EXPORTINGCAPS)?", exporting caps":"")
3150 << dendl;
3151
3152
3153 // "fake" a version that is old (stable) version, +1 if projected.
3154 version_t version = (oi->version * 2) + is_projected();
3155
3156 Capability *cap = get_client_cap(client);
3157 bool pfile = filelock.is_xlocked_by_client(client) || get_loner() == client;
3158 //(cap && (cap->issued() & CEPH_CAP_FILE_EXCL));
3159 bool pauth = authlock.is_xlocked_by_client(client) || get_loner() == client;
3160 bool plink = linklock.is_xlocked_by_client(client) || get_loner() == client;
3161 bool pxattr = xattrlock.is_xlocked_by_client(client) || get_loner() == client;
3162
3163 bool plocal = versionlock.get_last_wrlock_client() == client;
3164 bool ppolicy = policylock.is_xlocked_by_client(client) || get_loner()==client;
3165
94b18763 3166 mempool_inode *any_i = (pfile|pauth|plink|pxattr|plocal) ? pi : oi;
7c673cae
FG
3167
3168 dout(20) << " pfile " << pfile << " pauth " << pauth
3169 << " plink " << plink << " pxattr " << pxattr
3170 << " plocal " << plocal
3171 << " ctime " << any_i->ctime
3172 << " valid=" << valid << dendl;
3173
3174 // file
94b18763 3175 mempool_inode *file_i = pfile ? pi:oi;
7c673cae
FG
3176 file_layout_t layout;
3177 if (is_dir()) {
3178 layout = (ppolicy ? pi : oi)->layout;
3179 } else {
3180 layout = file_i->layout;
3181 }
3182
3183 // max_size is min of projected, actual
3184 uint64_t max_size =
3185 MIN(oi->client_ranges.count(client) ?
3186 oi->client_ranges[client].range.last : 0,
3187 pi->client_ranges.count(client) ?
3188 pi->client_ranges[client].range.last : 0);
3189
3190 // inline data
3191 version_t inline_version = 0;
3192 bufferlist inline_data;
3193 if (file_i->inline_data.version == CEPH_INLINE_NONE) {
3194 inline_version = CEPH_INLINE_NONE;
3195 } else if ((!cap && !no_caps) ||
3196 (cap && cap->client_inline_version < file_i->inline_data.version) ||
3197 (getattr_caps & CEPH_CAP_FILE_RD)) { // client requests inline data
3198 inline_version = file_i->inline_data.version;
3199 if (file_i->inline_data.length() > 0)
3200 inline_data = file_i->inline_data.get_data();
3201 }
3202
3203 // nest (do same as file... :/)
3204 if (cap) {
3205 cap->last_rbytes = file_i->rstat.rbytes;
3206 cap->last_rsize = file_i->rstat.rsize();
3207 }
3208
3209 // auth
94b18763 3210 mempool_inode *auth_i = pauth ? pi:oi;
7c673cae
FG
3211
3212 // link
94b18763 3213 mempool_inode *link_i = plink ? pi:oi;
7c673cae
FG
3214
3215 // xattr
94b18763 3216 mempool_inode *xattr_i = pxattr ? pi:oi;
7c673cae
FG
3217
3218 // xattr
3219 bufferlist xbl;
3220 version_t xattr_version;
3221 if ((!cap && !no_caps) ||
3222 (cap && cap->client_xattr_version < xattr_i->xattr_version) ||
3223 (getattr_caps & CEPH_CAP_XATTR_SHARED)) { // client requests xattrs
3224 if (!pxattrs)
3225 pxattrs = pxattr ? get_projected_xattrs() : &xattrs;
3226 ::encode(*pxattrs, xbl);
3227 xattr_version = xattr_i->xattr_version;
3228 } else {
3229 xattr_version = 0;
3230 }
3231
3232 // do we have room?
3233 if (max_bytes) {
3234 unsigned bytes = 8 + 8 + 4 + 8 + 8 + sizeof(ceph_mds_reply_cap) +
3235 sizeof(struct ceph_file_layout) + 4 + layout.pool_ns.size() +
3236 sizeof(struct ceph_timespec) * 3 +
3237 4 + 8 + 8 + 8 + 4 + 4 + 4 + 4 + 4 +
3238 8 + 8 + 8 + 8 + 8 + sizeof(struct ceph_timespec) +
3239 4;
3240 bytes += sizeof(__u32);
3241 bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
3242 bytes += sizeof(__u32) + symlink.length();
3243 bytes += sizeof(__u32) + xbl.length();
3244 bytes += sizeof(version_t) + sizeof(__u32) + inline_data.length();
3245 if (bytes > max_bytes)
3246 return -ENOSPC;
3247 }
3248
3249
3250 // encode caps
3251 struct ceph_mds_reply_cap ecap;
3252 if (snapid != CEPH_NOSNAP) {
3253 /*
3254 * snapped inodes (files or dirs) only get read-only caps. always
3255 * issue everything possible, since it is read only.
3256 *
3257 * if a snapped inode has caps, limit issued caps based on the
3258 * lock state.
3259 *
3260 * if it is a live inode, limit issued caps based on the lock
3261 * state.
3262 *
3263 * do NOT adjust cap issued state, because the client always
3264 * tracks caps per-snap and the mds does either per-interval or
3265 * multiversion.
3266 */
3267 ecap.caps = valid ? get_caps_allowed_by_type(CAP_ANY) : CEPH_STAT_CAP_INODE;
3268 if (last == CEPH_NOSNAP || is_any_caps())
3269 ecap.caps = ecap.caps & get_caps_allowed_for_client(session, file_i);
3270 ecap.seq = 0;
3271 ecap.mseq = 0;
3272 ecap.realm = 0;
3273 } else {
3274 if (!no_caps && !cap) {
3275 // add a new cap
3276 cap = add_client_cap(client, session, realm);
b32b8144
FG
3277 if (is_auth())
3278 choose_ideal_loner();
7c673cae
FG
3279 }
3280
3281 int issue = 0;
3282 if (!no_caps && cap) {
3283 int likes = get_caps_liked();
3284 int allowed = get_caps_allowed_for_client(session, file_i);
3285 issue = (cap->wanted() | likes) & allowed;
3286 cap->issue_norevoke(issue);
3287 issue = cap->pending();
3288 dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3289 << " seq " << cap->get_last_seq() << dendl;
3290 } else if (cap && cap->is_new() && !dir_realm) {
3291 // alway issue new caps to client, otherwise the caps get lost
3292 assert(cap->is_stale());
3293 issue = cap->pending() | CEPH_CAP_PIN;
3294 cap->issue_norevoke(issue);
3295 dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3296 << " seq " << cap->get_last_seq()
3297 << "(stale|new caps)" << dendl;
3298 }
3299
3300 if (issue) {
3301 cap->set_last_issue();
3302 cap->set_last_issue_stamp(ceph_clock_now());
3303 cap->clear_new();
3304 ecap.caps = issue;
3305 ecap.wanted = cap->wanted();
3306 ecap.cap_id = cap->get_cap_id();
3307 ecap.seq = cap->get_last_seq();
3308 ecap.mseq = cap->get_mseq();
3309 ecap.realm = realm->inode->ino();
3310 } else {
3311 ecap.cap_id = 0;
3312 ecap.caps = 0;
3313 ecap.seq = 0;
3314 ecap.mseq = 0;
3315 ecap.realm = 0;
3316 ecap.wanted = 0;
3317 }
3318 }
3319 ecap.flags = is_auth() ? CEPH_CAP_FLAG_AUTH : 0;
3320 dout(10) << "encode_inodestat caps " << ccap_string(ecap.caps)
3321 << " seq " << ecap.seq << " mseq " << ecap.mseq
3322 << " xattrv " << xattr_version << " len " << xbl.length()
3323 << dendl;
3324
3325 if (inline_data.length() && cap) {
3326 if ((cap->pending() | getattr_caps) & CEPH_CAP_FILE_SHARED) {
3327 dout(10) << "including inline version " << inline_version << dendl;
3328 cap->client_inline_version = inline_version;
3329 } else {
3330 dout(10) << "dropping inline version " << inline_version << dendl;
3331 inline_version = 0;
3332 inline_data.clear();
3333 }
3334 }
3335
3336 // include those xattrs?
3337 if (xbl.length() && cap) {
3338 if ((cap->pending() | getattr_caps) & CEPH_CAP_XATTR_SHARED) {
3339 dout(10) << "including xattrs version " << xattr_i->xattr_version << dendl;
3340 cap->client_xattr_version = xattr_i->xattr_version;
3341 } else {
3342 dout(10) << "dropping xattrs version " << xattr_i->xattr_version << dendl;
3343 xbl.clear(); // no xattrs .. XXX what's this about?!?
3344 xattr_version = 0;
3345 }
3346 }
3347
3348 /*
3349 * note: encoding matches MClientReply::InodeStat
3350 */
3351 ::encode(oi->ino, bl);
3352 ::encode(snapid, bl);
3353 ::encode(oi->rdev, bl);
3354 ::encode(version, bl);
3355
3356 ::encode(xattr_version, bl);
3357
3358 ::encode(ecap, bl);
3359 {
3360 ceph_file_layout legacy_layout;
3361 layout.to_legacy(&legacy_layout);
3362 ::encode(legacy_layout, bl);
3363 }
3364 ::encode(any_i->ctime, bl);
3365 ::encode(file_i->mtime, bl);
3366 ::encode(file_i->atime, bl);
3367 ::encode(file_i->time_warp_seq, bl);
3368 ::encode(file_i->size, bl);
3369 ::encode(max_size, bl);
3370 ::encode(file_i->truncate_size, bl);
3371 ::encode(file_i->truncate_seq, bl);
3372
3373 ::encode(auth_i->mode, bl);
3374 ::encode((uint32_t)auth_i->uid, bl);
3375 ::encode((uint32_t)auth_i->gid, bl);
3376
3377 ::encode(link_i->nlink, bl);
3378
3379 ::encode(file_i->dirstat.nfiles, bl);
3380 ::encode(file_i->dirstat.nsubdirs, bl);
3381 ::encode(file_i->rstat.rbytes, bl);
3382 ::encode(file_i->rstat.rfiles, bl);
3383 ::encode(file_i->rstat.rsubdirs, bl);
3384 ::encode(file_i->rstat.rctime, bl);
3385
3386 dirfragtree.encode(bl);
3387
3388 ::encode(symlink, bl);
3389 if (session->connection->has_feature(CEPH_FEATURE_DIRLAYOUTHASH)) {
3390 ::encode(file_i->dir_layout, bl);
3391 }
3392 ::encode(xbl, bl);
3393 if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
3394 ::encode(inline_version, bl);
3395 ::encode(inline_data, bl);
3396 }
3397 if (session->connection->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
94b18763 3398 mempool_inode *policy_i = ppolicy ? pi : oi;
7c673cae
FG
3399 ::encode(policy_i->quota, bl);
3400 }
3401 if (session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
3402 ::encode(layout.pool_ns, bl);
3403 }
3404 if (session->connection->has_feature(CEPH_FEATURE_FS_BTIME)) {
3405 ::encode(any_i->btime, bl);
3406 ::encode(any_i->change_attr, bl);
3407 }
3408
3409 return valid;
3410}
3411
3412void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
3413{
3414 assert(cap);
3415
3416 client_t client = cap->get_client();
3417
3418 bool pfile = filelock.is_xlocked_by_client(client) || (cap->issued() & CEPH_CAP_FILE_EXCL);
3419 bool pauth = authlock.is_xlocked_by_client(client);
3420 bool plink = linklock.is_xlocked_by_client(client);
3421 bool pxattr = xattrlock.is_xlocked_by_client(client);
3422
94b18763
FG
3423 mempool_inode *oi = &inode;
3424 mempool_inode *pi = get_projected_inode();
3425 mempool_inode *i = (pfile|pauth|plink|pxattr) ? pi : oi;
7c673cae
FG
3426
3427 dout(20) << "encode_cap_message pfile " << pfile
3428 << " pauth " << pauth << " plink " << plink << " pxattr " << pxattr
3429 << " ctime " << i->ctime << dendl;
3430
3431 i = pfile ? pi:oi;
3432 m->set_layout(i->layout);
3433 m->size = i->size;
3434 m->truncate_seq = i->truncate_seq;
3435 m->truncate_size = i->truncate_size;
3436 m->mtime = i->mtime;
3437 m->atime = i->atime;
3438 m->ctime = i->ctime;
3439 m->change_attr = i->change_attr;
3440 m->time_warp_seq = i->time_warp_seq;
28e407b8
AA
3441 m->nfiles = i->dirstat.nfiles;
3442 m->nsubdirs = i->dirstat.nsubdirs;
7c673cae
FG
3443
3444 if (cap->client_inline_version < i->inline_data.version) {
3445 m->inline_version = cap->client_inline_version = i->inline_data.version;
3446 if (i->inline_data.length() > 0)
3447 m->inline_data = i->inline_data.get_data();
3448 } else {
3449 m->inline_version = 0;
3450 }
3451
3452 // max_size is min of projected, actual.
3453 uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
3454 uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
3455 m->max_size = MIN(oldms, newms);
3456
3457 i = pauth ? pi:oi;
3458 m->head.mode = i->mode;
3459 m->head.uid = i->uid;
3460 m->head.gid = i->gid;
3461
3462 i = plink ? pi:oi;
3463 m->head.nlink = i->nlink;
3464
3465 i = pxattr ? pi:oi;
94b18763 3466 auto ix = pxattr ? get_projected_xattrs() : &xattrs;
7c673cae
FG
3467 if ((cap->pending() & CEPH_CAP_XATTR_SHARED) &&
3468 i->xattr_version > cap->client_xattr_version) {
3469 dout(10) << " including xattrs v " << i->xattr_version << dendl;
3470 ::encode(*ix, m->xattrbl);
3471 m->head.xattr_version = i->xattr_version;
3472 cap->client_xattr_version = i->xattr_version;
3473 }
3474}
3475
3476
3477
// Append the base inode state to bl: first, inode, symlink, dirfragtree,
// xattrs, old_inodes, damage_flags, then the snap blob via encode_snap().
// `features` is forwarded to the inode and old_inodes encoders.
// The field order is the wire format; _decode_base() reads the same
// sequence, so the two must be changed together.
3478void CInode::_encode_base(bufferlist& bl, uint64_t features)
3479{
3480 ::encode(first, bl);
3481 ::encode(inode, bl, features);
3482 ::encode(symlink, bl);
3483 ::encode(dirfragtree, bl);
3484 ::encode(xattrs, bl);
3485 ::encode(old_inodes, bl, features);
3486 ::encode(damage_flags, bl);
3487 encode_snap(bl);
3488}
// Read back the base inode state in the order written by _encode_base():
// first, inode, symlink, dirfragtree, xattrs, old_inodes, damage_flags,
// then the snap blob via decode_snap().
3489void CInode::_decode_base(bufferlist::iterator& p)
3490{
3491 ::decode(first, p);
3492 ::decode(inode, p);
94b18763
FG
3493 {
// symlink is decoded into a temporary std::string and then converted
// to the mempool string type used by the member.
3494 std::string tmp;
3495 ::decode(tmp, p);
3496 symlink = mempool::mds_co::string(boost::string_view(tmp));
3497 }
7c673cae
FG
3498 ::decode(dirfragtree, p);
3499 ::decode(xattrs, p);
3500 ::decode(old_inodes, p);
3501 ::decode(damage_flags, p);
3502 decode_snap(p);
3503}
3504
// Append the full encoding of every inode lock to bl, followed by
// loner_cap.  Lock order: auth, link, dirfragtree, file, xattr, snap,
// nest, flock, policy.  _decode_locks_full() reads the same sequence.
3505void CInode::_encode_locks_full(bufferlist& bl)
3506{
3507 ::encode(authlock, bl);
3508 ::encode(linklock, bl);
3509 ::encode(dirfragtreelock, bl);
3510 ::encode(filelock, bl);
3511 ::encode(xattrlock, bl);
3512 ::encode(snaplock, bl);
3513 ::encode(nestlock, bl);
3514 ::encode(flocklock, bl);
3515 ::encode(policylock, bl);
3516
3517 ::encode(loner_cap, bl);
3518}
// Read the full lock encodings and loner_cap in the order written by
// _encode_locks_full(), then apply the decoded loner via set_loner_cap()
// and mirror it into want_loner_cap.
3519void CInode::_decode_locks_full(bufferlist::iterator& p)
3520{
3521 ::decode(authlock, p);
3522 ::decode(linklock, p);
3523 ::decode(dirfragtreelock, p);
3524 ::decode(filelock, p);
3525 ::decode(xattrlock, p);
3526 ::decode(snaplock, p);
3527 ::decode(nestlock, p);
3528 ::decode(flocklock, p);
3529 ::decode(policylock, p);
3530
3531 ::decode(loner_cap, p);
3532 set_loner_cap(loner_cap);
3533 want_loner_cap = loner_cap; // for now, we'll eval() shortly.
3534}
3535
// Append the replica-visible state of every inode lock to bl, followed by
// the need_recover flag.  Lock order: auth, link, dirfragtree, file, nest,
// xattr, snap, flock, policy.  _decode_locks_state() reads the same
// sequence, including the trailing flag.
b32b8144 3536void CInode::_encode_locks_state_for_replica(bufferlist& bl, bool need_recover)
7c673cae
FG
3537{
3538 authlock.encode_state_for_replica(bl);
3539 linklock.encode_state_for_replica(bl);
3540 dirfragtreelock.encode_state_for_replica(bl);
3541 filelock.encode_state_for_replica(bl);
3542 nestlock.encode_state_for_replica(bl);
3543 xattrlock.encode_state_for_replica(bl);
3544 snaplock.encode_state_for_replica(bl);
3545 flocklock.encode_state_for_replica(bl);
3546 policylock.encode_state_for_replica(bl);
b32b8144 3547 ::encode(need_recover, bl);
7c673cae 3548}
b32b8144 3549
7c673cae
FG
// Append lock state for a rejoin to bl.  The lock order is the same as
// in _encode_locks_state_for_replica(), but dirfragtreelock, filelock and
// nestlock use encode_state_for_rejoin(bl, rep) while the others use the
// plain replica encoding.  No need_recover flag is appended here.
3550void CInode::_encode_locks_state_for_rejoin(bufferlist& bl, int rep)
3551{
3552 authlock.encode_state_for_replica(bl);
3553 linklock.encode_state_for_replica(bl);
3554 dirfragtreelock.encode_state_for_rejoin(bl, rep);
3555 filelock.encode_state_for_rejoin(bl, rep);
3556 nestlock.encode_state_for_rejoin(bl, rep);
3557 xattrlock.encode_state_for_replica(bl);
3558 snaplock.encode_state_for_replica(bl);
3559 flocklock.encode_state_for_replica(bl);
3560 policylock.encode_state_for_replica(bl);
3561}
b32b8144 3562
7c673cae
FG
// Read lock state in the order written by
// _encode_locks_state_for_replica(): nine lock states followed by the
// need_recover flag.  is_new is forwarded to each lock's decode_state().
// When the flag is set and is_new is true, every lock is marked as
// needing recovery.
3563void CInode::_decode_locks_state(bufferlist::iterator& p, bool is_new)
3564{
3565 authlock.decode_state(p, is_new);
3566 linklock.decode_state(p, is_new);
3567 dirfragtreelock.decode_state(p, is_new);
3568 filelock.decode_state(p, is_new);
3569 nestlock.decode_state(p, is_new);
3570 xattrlock.decode_state(p, is_new);
3571 snaplock.decode_state(p, is_new);
3572 flocklock.decode_state(p, is_new);
3573 policylock.decode_state(p, is_new);
b32b8144
FG
3574
3575 bool need_recover;
3576 ::decode(need_recover, p);
3577 if (need_recover && is_new) {
3578 // Auth mds replicated this inode while it's recovering. Auth mds may take xlock on the lock
3579 // and change the object when replaying unsafe requests.
3580 authlock.mark_need_recover();
3581 linklock.mark_need_recover();
3582 dirfragtreelock.mark_need_recover();
3583 filelock.mark_need_recover();
3584 nestlock.mark_need_recover();
3585 xattrlock.mark_need_recover();
3586 snaplock.mark_need_recover();
3587 flocklock.mark_need_recover();
3588 policylock.mark_need_recover();
3589 }
7c673cae
FG
3590}
// Decode rejoin lock state for all nine locks (same lock order as
// _encode_locks_state_for_rejoin()), forwarding `waiters` and `survivor`
// to each lock's decode_state_rejoin().  Afterwards dirfragtreelock,
// filelock and nestlock are each appended to eval_locks if they are
// neither stable nor wrlocked.
3591void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<MDSInternalContextBase*>& waiters,
b32b8144
FG
3592 list<SimpleLock*>& eval_locks, bool survivor)
3593{
3594 authlock.decode_state_rejoin(p, waiters, survivor);
3595 linklock.decode_state_rejoin(p, waiters, survivor);
3596 dirfragtreelock.decode_state_rejoin(p, waiters, survivor);
3597 filelock.decode_state_rejoin(p, waiters, survivor);
3598 nestlock.decode_state_rejoin(p, waiters, survivor);
3599 xattrlock.decode_state_rejoin(p, waiters, survivor);
3600 snaplock.decode_state_rejoin(p, waiters, survivor);
3601 flocklock.decode_state_rejoin(p, waiters, survivor);
3602 policylock.decode_state_rejoin(p, waiters, survivor);
7c673cae
FG
3603
3604 if (!dirfragtreelock.is_stable() && !dirfragtreelock.is_wrlocked())
3605 eval_locks.push_back(&dirfragtreelock);
3606 if (!filelock.is_stable() && !filelock.is_wrlocked())
3607 eval_locks.push_back(&filelock);
3608 if (!nestlock.is_stable() && !nestlock.is_wrlocked())
3609 eval_locks.push_back(&nestlock);
3610}
3611
3612
3613// IMPORT/EXPORT
3614
// Encode this inode for export into bl as a versioned section
// (ENCODE_START(5, 4, ...)).  Contents, in order: base state
// (_encode_base with the mdsmap's up features), state, pop, the replica
// map, a nested "bounding" buffer, full lock state, file locks.
// decode_import() reads the same sequence.
// Takes PIN_TEMPEXPORTING after encoding; finish_export() drops it.
3615void CInode::encode_export(bufferlist& bl)
3616{
3617 ENCODE_START(5, 4, bl);
3618 _encode_base(bl, mdcache->mds->mdsmap->get_up_features());
3619
3620 ::encode(state, bl);
3621
3622 ::encode(pop, bl);
3623
181888fb 3624 ::encode(get_replicas(), bl);
7c673cae
FG
3625
3626 // include scatterlock info for any bounding CDirs
// For each dirfrag flagged STATE_EXPORTBOUND: frag id, fragstat,
// accounted_fragstat, rstat, accounted_rstat.  The buffer stays empty
// for non-directories.
3627 bufferlist bounding;
3628 if (inode.is_dir())
94b18763
FG
3629 for (const auto &p : dirfrags) {
3630 CDir *dir = p.second;
7c673cae 3631 if (dir->state_test(CDir::STATE_EXPORTBOUND)) {
94b18763 3632 ::encode(p.first, bounding);
7c673cae
FG
3633 ::encode(dir->fnode.fragstat, bounding);
3634 ::encode(dir->fnode.accounted_fragstat, bounding);
3635 ::encode(dir->fnode.rstat, bounding);
3636 ::encode(dir->fnode.accounted_rstat, bounding);
3637 dout(10) << " encoded fragstat/rstat info for " << *dir << dendl;
3638 }
3639 }
3640 ::encode(bounding, bl);
3641
3642 _encode_locks_full(bl);
3643
3644 _encode_file_locks(bl);
3645
3646 ENCODE_FINISH(bl);
3647
3648 get(PIN_TEMPEXPORTING);
3649}
3650
3651void CInode::finish_export(utime_t now)
3652{
3653 state &= MASK_STATE_EXPORT_KEPT;
3654
3655 pop.zero(now);
3656
3657 // just in case!
3658 //dirlock.clear_updated();
3659
3660 loner_cap = -1;
3661
3662 put(PIN_TEMPEXPORTING);
3663}
3664
// Decode an inode exported by encode_export() (versioned section,
// DECODE_START(5, ...)).  Reads, in order: base state, state, pop, the
// replica map, the nested "bounding" buffer, full lock state, file locks.
// `ls` is the log segment passed to _mark_dirty() / mark_dirty_parent()
// when the imported state says the inode is dirty / dirty-parent.
3665void CInode::decode_import(bufferlist::iterator& p,
3666 LogSegment *ls)
3667{
3668 DECODE_START(5, p);
3669
3670 _decode_base(p);
3671
// Only the bits in MASK_STATE_EXPORTED are taken from the sender;
// STATE_AUTH is always set on the importer.
3672 unsigned s;
3673 ::decode(s, p);
3674 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
3675
3676 if (is_dirty()) {
3677 get(PIN_DIRTY);
3678 _mark_dirty(ls);
3679 }
3680 if (is_dirty_parent()) {
3681 get(PIN_DIRTYPARENT);
28e407b8 3682 mark_dirty_parent(ls);
7c673cae
FG
3683 }
3684
3685 ::decode(pop, ceph_clock_now(), p);
3686
181888fb
FG
3687 ::decode(get_replicas(), p);
3688 if (is_replicated())
7c673cae
FG
3689 get(PIN_REPLICATED);
3690 replica_nonce = 0;
3691
3692 // decode fragstat info on bounding cdirs
3693 bufferlist bounding;
3694 ::decode(bounding, p);
3695 bufferlist::iterator q = bounding.begin();
3696 while (!q.end()) {
3697 frag_t fg;
3698 ::decode(fg, q);
3699 CDir *dir = get_dirfrag(fg);
3700 assert(dir); // we should have all bounds open
3701
3702 // Only take the remote's fragstat/rstat if we are non-auth for
3703 // this dirfrag AND the lock is NOT in a scattered (MIX) state.
3704 // We know lock is stable, and MIX is the only state in which
3705 // the inode auth (who sent us this data) may not have the best
3706 // info.
3707
3708 // HMM: Are there cases where dir->is_auth() is an insufficient
3709 // check because the dirfrag is under migration? That implies
3710 // it is frozen (and in a SYNC or LOCK state). FIXME.
3711
// In the "skipped" branches the two encoded values are still decoded
// into a scratch object so that the iterator advances past them.
3712 if (dir->is_auth() ||
3713 filelock.get_state() == LOCK_MIX) {
3714 dout(10) << " skipped fragstat info for " << *dir << dendl;
3715 frag_info_t f;
3716 ::decode(f, q);
3717 ::decode(f, q);
3718 } else {
3719 ::decode(dir->fnode.fragstat, q);
3720 ::decode(dir->fnode.accounted_fragstat, q);
3721 dout(10) << " took fragstat info for " << *dir << dendl;
3722 }
3723 if (dir->is_auth() ||
3724 nestlock.get_state() == LOCK_MIX) {
3725 dout(10) << " skipped rstat info for " << *dir << dendl;
3726 nest_info_t n;
3727 ::decode(n, q);
3728 ::decode(n, q);
3729 } else {
3730 ::decode(dir->fnode.rstat, q);
3731 ::decode(dir->fnode.accounted_rstat, q);
3732 dout(10) << " took rstat info for " << *dir << dendl;
3733 }
3734 }
3735
3736 _decode_locks_full(p);
3737
3738 _decode_file_locks(p);
3739
3740 DECODE_FINISH(p);
3741}
3742
3743
3744void InodeStoreBase::dump(Formatter *f) const
3745{
3746 inode.dump(f);
3747 f->dump_string("symlink", symlink);
3748 f->open_array_section("old_inodes");
94b18763 3749 for (const auto &p : old_inodes) {
7c673cae 3750 f->open_object_section("old_inode");
94b18763
FG
3751 // The key is the last snapid, the first is in the mempool_old_inode
3752 f->dump_int("last", p.first);
3753 p.second.dump(f);
7c673cae
FG
3754 f->close_section(); // old_inode
3755 }
3756 f->close_section(); // old_inodes
3757
3758 f->open_object_section("dirfragtree");
3759 dirfragtree.dump(f);
3760 f->close_section(); // dirfragtree
3761}
3762
3763
3764void InodeStore::generate_test_instances(list<InodeStore*> &ls)
3765{
3766 InodeStore *populated = new InodeStore;
3767 populated->inode.ino = 0xdeadbeef;
3768 populated->symlink = "rhubarb";
3769 ls.push_back(populated);
3770}
3771
3772void CInode::validate_disk_state(CInode::validated_data *results,
3773 MDSInternalContext *fin)
3774{
3775 class ValidationContinuation : public MDSContinuation {
3776 public:
3777 MDSInternalContext *fin;
3778 CInode *in;
3779 CInode::validated_data *results;
3780 bufferlist bl;
3781 CInode *shadow_in;
3782
3783 enum {
3784 START = 0,
3785 BACKTRACE,
3786 INODE,
3787 DIRFRAGS
3788 };
3789
3790 ValidationContinuation(CInode *i,
3791 CInode::validated_data *data_r,
3792 MDSInternalContext *fin_) :
3793 MDSContinuation(i->mdcache->mds->server),
3794 fin(fin_),
3795 in(i),
3796 results(data_r),
3797 shadow_in(NULL) {
3798 set_callback(START, static_cast<Continuation::stagePtr>(&ValidationContinuation::_start));
3799 set_callback(BACKTRACE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_backtrace));
3800 set_callback(INODE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_inode_disk));
3801 set_callback(DIRFRAGS, static_cast<Continuation::stagePtr>(&ValidationContinuation::_dirfrags));
3802 }
3803
3804 ~ValidationContinuation() override {
b32b8144
FG
3805 if (shadow_in) {
3806 delete shadow_in;
3807 in->mdcache->num_shadow_inodes--;
3808 }
7c673cae
FG
3809 }
3810
3811 /**
3812 * Fetch backtrace and set tag if tag is non-empty
3813 */
94b18763 3814 void fetch_backtrace_and_tag(CInode *in, boost::string_view tag,
7c673cae
FG
3815 Context *fin, int *bt_r, bufferlist *bt)
3816 {
3817 const int64_t pool = in->get_backtrace_pool();
3818 object_t oid = CInode::get_object_name(in->ino(), frag_t(), "");
3819
3820 ObjectOperation fetch;
3821 fetch.getxattr("parent", bt, bt_r);
3822 in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP,
3823 NULL, 0, fin);
3824 if (!tag.empty()) {
3825 ObjectOperation scrub_tag;
3826 bufferlist tag_bl;
3827 ::encode(tag, tag_bl);
3828 scrub_tag.setxattr("scrub_tag", tag_bl);
3829 SnapContext snapc;
3830 in->mdcache->mds->objecter->mutate(oid, object_locator_t(pool), scrub_tag, snapc,
3831 ceph::real_clock::now(),
3832 0, NULL);
3833 }
3834 }
3835
3836 bool _start(int rval) {
3837 if (in->is_dirty()) {
3838 MDCache *mdcache = in->mdcache;
94b18763 3839 mempool_inode& inode = in->inode;
7c673cae
FG
3840 dout(20) << "validating a dirty CInode; results will be inconclusive"
3841 << dendl;
3842 }
3843 if (in->is_symlink()) {
3844 // there's nothing to do for symlinks!
3845 return true;
3846 }
3847
3848 C_OnFinisher *conf = new C_OnFinisher(get_io_callback(BACKTRACE),
3849 in->mdcache->mds->finisher);
3850
3851 // Whether we have a tag to apply depends on ScrubHeader (if one is
3852 // present)
3853 if (in->scrub_infop) {
3854 // I'm a non-orphan, so look up my ScrubHeader via my linkage
94b18763 3855 boost::string_view tag = in->scrub_infop->header->get_tag();
7c673cae
FG
3856 // Rather than using the usual CInode::fetch_backtrace,
3857 // use a special variant that optionally writes a tag in the same
3858 // operation.
3859 fetch_backtrace_and_tag(in, tag, conf,
3860 &results->backtrace.ondisk_read_retval, &bl);
3861 } else {
3862 // When we're invoked outside of ScrubStack we might be called
3863 // on an orphaned inode like /
3864 fetch_backtrace_and_tag(in, {}, conf,
3865 &results->backtrace.ondisk_read_retval, &bl);
3866 }
3867 return false;
3868 }
3869
3870 bool _backtrace(int rval) {
3871 // set up basic result reporting and make sure we got the data
3872 results->performed_validation = true; // at least, some of it!
3873 results->backtrace.checked = true;
3874
3875 const int64_t pool = in->get_backtrace_pool();
3876 inode_backtrace_t& memory_backtrace = results->backtrace.memory_value;
3877 in->build_backtrace(pool, memory_backtrace);
3878 bool equivalent, divergent;
3879 int memory_newer;
3880
3881 MDCache *mdcache = in->mdcache; // For the benefit of dout
94b18763 3882 const mempool_inode& inode = in->inode; // For the benefit of dout
7c673cae
FG
3883
3884 // Ignore rval because it's the result of a FAILOK operation
3885 // from fetch_backtrace_and_tag: the real result is in
3886 // backtrace.ondisk_read_retval
3887 dout(20) << "ondisk_read_retval: " << results->backtrace.ondisk_read_retval << dendl;
3888 if (results->backtrace.ondisk_read_retval != 0) {
3889 results->backtrace.error_str << "failed to read off disk; see retval";
3890 goto next;
3891 }
3892
3893 // extract the backtrace, and compare it to a newly-constructed one
3894 try {
3895 bufferlist::iterator p = bl.begin();
3896 ::decode(results->backtrace.ondisk_value, p);
3897 dout(10) << "decoded " << bl.length() << " bytes of backtrace successfully" << dendl;
3898 } catch (buffer::error&) {
3899 if (results->backtrace.ondisk_read_retval == 0 && rval != 0) {
3900 // Cases where something has clearly gone wrong with the overall
3901 // fetch op, though we didn't get a nonzero rc from the getxattr
3902 // operation. e.g. object missing.
3903 results->backtrace.ondisk_read_retval = rval;
3904 }
3905 results->backtrace.error_str << "failed to decode on-disk backtrace ("
3906 << bl.length() << " bytes)!";
3907 goto next;
3908 }
3909
3910 memory_newer = memory_backtrace.compare(results->backtrace.ondisk_value,
3911 &equivalent, &divergent);
3912
3913 if (divergent || memory_newer < 0) {
3914 // we're divergent, or on-disk version is newer
3915 results->backtrace.error_str << "On-disk backtrace is divergent or newer";
3916 } else {
3917 results->backtrace.passed = true;
3918 }
3919next:
3920
3921 if (!results->backtrace.passed && in->scrub_infop->header->get_repair()) {
3922 std::string path;
3923 in->make_path_string(path);
d2e6a577
FG
3924 in->mdcache->mds->clog->warn() << "bad backtrace on inode " << in->ino()
3925 << "(" << path << "), rewriting it";
28e407b8 3926 in->mark_dirty_parent(in->mdcache->mds->mdlog->get_current_segment(),
7c673cae 3927 false);
b32b8144
FG
3928 // Flag that we repaired this BT so that it won't go into damagetable
3929 results->backtrace.repaired = true;
3930
3931 // Flag that we did some repair work so that our repair operation
3932 // can be flushed at end of scrub
3933 in->scrub_infop->header->set_repaired();
7c673cae
FG
3934 }
3935
3936 // If the inode's number was free in the InoTable, fix that
3937 // (#15619)
3938 {
3939 InoTable *inotable = mdcache->mds->inotable;
3940
d2e6a577 3941 dout(10) << "scrub: inotable ino = " << inode.ino << dendl;
7c673cae
FG
3942 dout(10) << "scrub: inotable free says "
3943 << inotable->is_marked_free(inode.ino) << dendl;
3944
3945 if (inotable->is_marked_free(inode.ino)) {
3946 LogChannelRef clog = in->mdcache->mds->clog;
3947 clog->error() << "scrub: inode wrongly marked free: 0x" << std::hex
3948 << inode.ino;
3949
3950 if (in->scrub_infop->header->get_repair()) {
3951 bool repaired = inotable->repair(inode.ino);
3952 if (repaired) {
3953 clog->error() << "inode table repaired for inode: 0x" << std::hex
3954 << inode.ino;
3955
3956 inotable->save();
3957 } else {
3958 clog->error() << "Cannot repair inotable while other operations"
3959 " are in progress";
3960 }
3961 }
3962 }
3963 }
3964
3965 // quit if we're a file, or kick off directory checks otherwise
3966 // TODO: validate on-disk inode for non-base directories
3967 if (!in->is_dir()) {
3968 return true;
3969 }
3970
3971 return validate_directory_data();
3972 }
3973
3974 bool validate_directory_data() {
3975 assert(in->is_dir());
3976
3977 if (in->is_base()) {
b32b8144
FG
3978 if (!shadow_in) {
3979 shadow_in = new CInode(in->mdcache);
3980 in->mdcache->create_unlinked_system_inode(shadow_in, in->inode.ino, in->inode.mode);
3981 in->mdcache->num_shadow_inodes++;
3982 }
7c673cae
FG
3983 shadow_in->fetch(get_internal_callback(INODE));
3984 return false;
3985 } else {
3986 results->inode.passed = true;
3987 return check_dirfrag_rstats();
3988 }
3989 }
3990
3991 bool _inode_disk(int rval) {
3992 results->inode.checked = true;
3993 results->inode.ondisk_read_retval = rval;
3994 results->inode.ondisk_value = shadow_in->inode;
3995 results->inode.memory_value = in->inode;
3996
94b18763
FG
3997 mempool_inode& si = shadow_in->inode;
3998 mempool_inode& i = in->inode;
7c673cae
FG
3999 if (si.version > i.version) {
4000 // uh, what?
4001 results->inode.error_str << "On-disk inode is newer than in-memory one!";
4002 goto next;
4003 } else {
4004 bool divergent = false;
4005 int r = i.compare(si, &divergent);
4006 results->inode.passed = !divergent && r >= 0;
4007 if (!results->inode.passed) {
4008 results->inode.error_str <<
4009 "On-disk inode is divergent or newer than in-memory one!";
4010 goto next;
4011 }
4012 }
4013next:
4014 return check_dirfrag_rstats();
4015 }
4016
4017 bool check_dirfrag_rstats() {
4018 MDSGatherBuilder gather(g_ceph_context);
4019 std::list<frag_t> frags;
4020 in->dirfragtree.get_leaves(frags);
4021 for (list<frag_t>::iterator p = frags.begin();
4022 p != frags.end();
4023 ++p) {
4024 CDir *dir = in->get_or_open_dirfrag(in->mdcache, *p);
4025 dir->scrub_info();
4026 if (!dir->scrub_infop->header)
4027 dir->scrub_infop->header = in->scrub_infop->header;
4028 if (dir->is_complete()) {
4029 dir->scrub_local();
4030 } else {
4031 dir->scrub_infop->need_scrub_local = true;
4032 dir->fetch(gather.new_sub(), false);
4033 }
4034 }
4035 if (gather.has_subs()) {
4036 gather.set_finisher(get_internal_callback(DIRFRAGS));
4037 gather.activate();
4038 return false;
4039 } else {
4040 return immediate(DIRFRAGS, 0);
4041 }
4042 }
4043
4044 bool _dirfrags(int rval) {
4045 int frags_errors = 0;
4046 // basic reporting setup
4047 results->raw_stats.checked = true;
4048 results->raw_stats.ondisk_read_retval = rval;
4049
4050 results->raw_stats.memory_value.dirstat = in->inode.dirstat;
4051 results->raw_stats.memory_value.rstat = in->inode.rstat;
4052 frag_info_t& dir_info = results->raw_stats.ondisk_value.dirstat;
4053 nest_info_t& nest_info = results->raw_stats.ondisk_value.rstat;
4054
4055 if (rval != 0) {
4056 results->raw_stats.error_str << "Failed to read dirfrags off disk";
4057 goto next;
4058 }
4059
4060 // check each dirfrag...
94b18763
FG
4061 for (const auto &p : in->dirfrags) {
4062 CDir *dir = p.second;
7c673cae
FG
4063 assert(dir->get_version() > 0);
4064 nest_info.add(dir->fnode.accounted_rstat);
4065 dir_info.add(dir->fnode.accounted_fragstat);
4066 if (dir->scrub_infop &&
4067 dir->scrub_infop->pending_scrub_error) {
4068 dir->scrub_infop->pending_scrub_error = false;
4069 if (dir->scrub_infop->header->get_repair()) {
b32b8144 4070 results->raw_stats.repaired = true;
7c673cae 4071 results->raw_stats.error_str
94b18763 4072 << "dirfrag(" << p.first << ") has bad stats (will be fixed); ";
7c673cae
FG
4073 } else {
4074 results->raw_stats.error_str
94b18763 4075 << "dirfrag(" << p.first << ") has bad stats; ";
7c673cae
FG
4076 }
4077 frags_errors++;
4078 }
4079 }
4080 nest_info.rsubdirs++; // it gets one to account for self
4081 // ...and that their sum matches our inode settings
4082 if (!dir_info.same_sums(in->inode.dirstat) ||
4083 !nest_info.same_sums(in->inode.rstat)) {
4084 if (in->scrub_infop &&
4085 in->scrub_infop->header->get_repair()) {
4086 results->raw_stats.error_str
4087 << "freshly-calculated rstats don't match existing ones (will be fixed)";
4088 in->mdcache->repair_inode_stats(in);
b32b8144 4089 results->raw_stats.repaired = true;
7c673cae
FG
4090 } else {
4091 results->raw_stats.error_str
4092 << "freshly-calculated rstats don't match existing ones";
4093 }
4094 goto next;
4095 }
4096 if (frags_errors > 0)
4097 goto next;
4098
4099 results->raw_stats.passed = true;
4100next:
4101 return true;
4102 }
4103
4104 void _done() override {
4105 if ((!results->raw_stats.checked || results->raw_stats.passed) &&
4106 (!results->backtrace.checked || results->backtrace.passed) &&
4107 (!results->inode.checked || results->inode.passed))
4108 results->passed_validation = true;
4109 if (fin) {
4110 fin->complete(get_rval());
4111 }
4112 }
4113 };
4114
4115
4116 dout(10) << "scrub starting validate_disk_state on " << *this << dendl;
4117 ValidationContinuation *vc = new ValidationContinuation(this,
4118 results,
4119 fin);
4120 vc->begin();
4121}
4122
4123void CInode::validated_data::dump(Formatter *f) const
4124{
4125 f->open_object_section("results");
4126 {
4127 f->dump_bool("performed_validation", performed_validation);
4128 f->dump_bool("passed_validation", passed_validation);
4129 f->open_object_section("backtrace");
4130 {
4131 f->dump_bool("checked", backtrace.checked);
4132 f->dump_bool("passed", backtrace.passed);
4133 f->dump_int("read_ret_val", backtrace.ondisk_read_retval);
4134 f->dump_stream("ondisk_value") << backtrace.ondisk_value;
4135 f->dump_stream("memoryvalue") << backtrace.memory_value;
4136 f->dump_string("error_str", backtrace.error_str.str());
4137 }
4138 f->close_section(); // backtrace
4139 f->open_object_section("raw_stats");
4140 {
4141 f->dump_bool("checked", raw_stats.checked);
4142 f->dump_bool("passed", raw_stats.passed);
4143 f->dump_int("read_ret_val", raw_stats.ondisk_read_retval);
4144 f->dump_stream("ondisk_value.dirstat") << raw_stats.ondisk_value.dirstat;
4145 f->dump_stream("ondisk_value.rstat") << raw_stats.ondisk_value.rstat;
4146 f->dump_stream("memory_value.dirrstat") << raw_stats.memory_value.dirstat;
4147 f->dump_stream("memory_value.rstat") << raw_stats.memory_value.rstat;
4148 f->dump_string("error_str", raw_stats.error_str.str());
4149 }
4150 f->close_section(); // raw_stats
4151 // dump failure return code
4152 int rc = 0;
4153 if (backtrace.checked && backtrace.ondisk_read_retval)
4154 rc = backtrace.ondisk_read_retval;
4155 if (inode.checked && inode.ondisk_read_retval)
4156 rc = inode.ondisk_read_retval;
4157 if (raw_stats.checked && raw_stats.ondisk_read_retval)
4158 rc = raw_stats.ondisk_read_retval;
4159 f->dump_int("return_code", rc);
4160 }
4161 f->close_section(); // results
4162}
4163
b32b8144
FG
4164bool CInode::validated_data::all_damage_repaired() const
4165{
4166 bool unrepaired =
4167 (raw_stats.checked && !raw_stats.passed && !raw_stats.repaired)
4168 ||
4169 (backtrace.checked && !backtrace.passed && !backtrace.repaired)
4170 ||
4171 (inode.checked && !inode.passed && !inode.repaired);
4172
4173 return !unrepaired;
4174}
4175
7c673cae
FG
4176void CInode::dump(Formatter *f) const
4177{
4178 InodeStoreBase::dump(f);
4179
4180 MDSCacheObject::dump(f);
4181
4182 f->open_object_section("versionlock");
4183 versionlock.dump(f);
4184 f->close_section();
4185
4186 f->open_object_section("authlock");
4187 authlock.dump(f);
4188 f->close_section();
4189
4190 f->open_object_section("linklock");
4191 linklock.dump(f);
4192 f->close_section();
4193
4194 f->open_object_section("dirfragtreelock");
4195 dirfragtreelock.dump(f);
4196 f->close_section();
4197
4198 f->open_object_section("filelock");
4199 filelock.dump(f);
4200 f->close_section();
4201
4202 f->open_object_section("xattrlock");
4203 xattrlock.dump(f);
4204 f->close_section();
4205
4206 f->open_object_section("snaplock");
4207 snaplock.dump(f);
4208 f->close_section();
4209
4210 f->open_object_section("nestlock");
4211 nestlock.dump(f);
4212 f->close_section();
4213
4214 f->open_object_section("flocklock");
4215 flocklock.dump(f);
4216 f->close_section();
4217
4218 f->open_object_section("policylock");
4219 policylock.dump(f);
4220 f->close_section();
4221
4222 f->open_array_section("states");
4223 MDSCacheObject::dump_states(f);
4224 if (state_test(STATE_EXPORTING))
4225 f->dump_string("state", "exporting");
4226 if (state_test(STATE_OPENINGDIR))
4227 f->dump_string("state", "openingdir");
4228 if (state_test(STATE_FREEZING))
4229 f->dump_string("state", "freezing");
4230 if (state_test(STATE_FROZEN))
4231 f->dump_string("state", "frozen");
4232 if (state_test(STATE_AMBIGUOUSAUTH))
4233 f->dump_string("state", "ambiguousauth");
4234 if (state_test(STATE_EXPORTINGCAPS))
4235 f->dump_string("state", "exportingcaps");
4236 if (state_test(STATE_NEEDSRECOVER))
4237 f->dump_string("state", "needsrecover");
4238 if (state_test(STATE_PURGING))
4239 f->dump_string("state", "purging");
4240 if (state_test(STATE_DIRTYPARENT))
4241 f->dump_string("state", "dirtyparent");
4242 if (state_test(STATE_DIRTYRSTAT))
4243 f->dump_string("state", "dirtyrstat");
4244 if (state_test(STATE_STRAYPINNED))
4245 f->dump_string("state", "straypinned");
4246 if (state_test(STATE_FROZENAUTHPIN))
4247 f->dump_string("state", "frozenauthpin");
4248 if (state_test(STATE_DIRTYPOOL))
4249 f->dump_string("state", "dirtypool");
4250 if (state_test(STATE_ORPHAN))
4251 f->dump_string("state", "orphan");
4252 if (state_test(STATE_MISSINGOBJS))
4253 f->dump_string("state", "missingobjs");
4254 f->close_section();
4255
4256 f->open_array_section("client_caps");
4257 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
4258 it != client_caps.end(); ++it) {
4259 f->open_object_section("client_cap");
4260 f->dump_int("client_id", it->first.v);
4261 f->dump_string("pending", ccap_string(it->second->pending()));
4262 f->dump_string("issued", ccap_string(it->second->issued()));
4263 f->dump_string("wanted", ccap_string(it->second->wanted()));
b32b8144 4264 f->dump_int("last_sent", it->second->get_last_sent());
7c673cae
FG
4265 f->close_section();
4266 }
4267 f->close_section();
4268
4269 f->dump_int("loner", loner_cap.v);
4270 f->dump_int("want_loner", want_loner_cap.v);
4271
4272 f->open_array_section("mds_caps_wanted");
94b18763 4273 for (const auto &p : mds_caps_wanted) {
7c673cae 4274 f->open_object_section("mds_cap_wanted");
94b18763
FG
4275 f->dump_int("rank", p.first);
4276 f->dump_string("cap", ccap_string(p.second));
7c673cae
FG
4277 f->close_section();
4278 }
4279 f->close_section();
4280}
4281
4282/****** Scrub Stuff *****/
4283void CInode::scrub_info_create() const
4284{
4285 dout(25) << __func__ << dendl;
4286 assert(!scrub_infop);
4287
4288 // break out of const-land to set up implicit initial state
4289 CInode *me = const_cast<CInode*>(this);
94b18763 4290 mempool_inode *in = me->get_projected_inode();
7c673cae
FG
4291
4292 scrub_info_t *si = new scrub_info_t();
4293 si->scrub_start_stamp = si->last_scrub_stamp = in->last_scrub_stamp;
4294 si->scrub_start_version = si->last_scrub_version = in->last_scrub_version;
4295
4296 me->scrub_infop = si;
4297}
4298
4299void CInode::scrub_maybe_delete_info()
4300{
4301 if (scrub_infop &&
4302 !scrub_infop->scrub_in_progress &&
4303 !scrub_infop->last_scrub_dirty) {
4304 delete scrub_infop;
4305 scrub_infop = NULL;
4306 }
4307}
4308
4309void CInode::scrub_initialize(CDentry *scrub_parent,
b32b8144 4310 ScrubHeaderRef& header,
7c673cae
FG
4311 MDSInternalContextBase *f)
4312{
4313 dout(20) << __func__ << " with scrub_version " << get_version() << dendl;
94b18763
FG
4314 if (scrub_is_in_progress()) {
4315 dout(20) << __func__ << " inode moved during scrub, reinitializing "
4316 << dendl;
4317 assert(scrub_infop->scrub_parent);
4318 CDentry *dn = scrub_infop->scrub_parent;
4319 CDir *dir = dn->dir;
4320 dn->put(CDentry::PIN_SCRUBPARENT);
4321 assert(dir->scrub_infop && dir->scrub_infop->directory_scrubbing);
4322 dir->scrub_infop->directories_scrubbing.erase(dn->key());
4323 dir->scrub_infop->others_scrubbing.erase(dn->key());
4324 }
7c673cae
FG
4325 scrub_info();
4326 if (!scrub_infop)
4327 scrub_infop = new scrub_info_t();
4328
4329 if (get_projected_inode()->is_dir()) {
4330 // fill in dirfrag_stamps with initial state
4331 std::list<frag_t> frags;
4332 dirfragtree.get_leaves(frags);
4333 for (std::list<frag_t>::iterator i = frags.begin();
4334 i != frags.end();
4335 ++i) {
4336 if (header->get_force())
4337 scrub_infop->dirfrag_stamps[*i].reset();
4338 else
4339 scrub_infop->dirfrag_stamps[*i];
4340 }
4341 }
4342
4343 if (scrub_parent)
4344 scrub_parent->get(CDentry::PIN_SCRUBPARENT);
4345 scrub_infop->scrub_parent = scrub_parent;
4346 scrub_infop->on_finish = f;
4347 scrub_infop->scrub_in_progress = true;
4348 scrub_infop->children_scrubbed = false;
4349 scrub_infop->header = header;
4350
4351 scrub_infop->scrub_start_version = get_version();
4352 scrub_infop->scrub_start_stamp = ceph_clock_now();
4353 // right now we don't handle remote inodes
4354}
4355
4356int CInode::scrub_dirfrag_next(frag_t* out_dirfrag)
4357{
4358 dout(20) << __func__ << dendl;
4359 assert(scrub_is_in_progress());
4360
4361 if (!is_dir()) {
4362 return -ENOTDIR;
4363 }
4364
4365 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4366 scrub_infop->dirfrag_stamps.begin();
4367
4368 while (i != scrub_infop->dirfrag_stamps.end()) {
4369 if (i->second.scrub_start_version < scrub_infop->scrub_start_version) {
4370 i->second.scrub_start_version = get_projected_version();
4371 i->second.scrub_start_stamp = ceph_clock_now();
4372 *out_dirfrag = i->first;
4373 dout(20) << " return frag " << *out_dirfrag << dendl;
4374 return 0;
4375 }
4376 ++i;
4377 }
4378
4379 dout(20) << " no frags left, ENOENT " << dendl;
4380 return ENOENT;
4381}
4382
4383void CInode::scrub_dirfrags_scrubbing(list<frag_t>* out_dirfrags)
4384{
4385 assert(out_dirfrags != NULL);
4386 assert(scrub_infop != NULL);
4387
4388 out_dirfrags->clear();
4389 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4390 scrub_infop->dirfrag_stamps.begin();
4391
4392 while (i != scrub_infop->dirfrag_stamps.end()) {
4393 if (i->second.scrub_start_version >= scrub_infop->scrub_start_version) {
4394 if (i->second.last_scrub_version < scrub_infop->scrub_start_version)
4395 out_dirfrags->push_back(i->first);
4396 } else {
4397 return;
4398 }
4399
4400 ++i;
4401 }
4402}
4403
4404void CInode::scrub_dirfrag_finished(frag_t dirfrag)
4405{
4406 dout(20) << __func__ << " on frag " << dirfrag << dendl;
4407 assert(scrub_is_in_progress());
4408
4409 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4410 scrub_infop->dirfrag_stamps.find(dirfrag);
4411 assert(i != scrub_infop->dirfrag_stamps.end());
4412
4413 scrub_stamp_info_t &si = i->second;
4414 si.last_scrub_stamp = si.scrub_start_stamp;
4415 si.last_scrub_version = si.scrub_start_version;
4416}
4417
4418void CInode::scrub_finished(MDSInternalContextBase **c) {
4419 dout(20) << __func__ << dendl;
4420 assert(scrub_is_in_progress());
4421 for (std::map<frag_t, scrub_stamp_info_t>::iterator i =
4422 scrub_infop->dirfrag_stamps.begin();
4423 i != scrub_infop->dirfrag_stamps.end();
4424 ++i) {
4425 if(i->second.last_scrub_version != i->second.scrub_start_version) {
4426 derr << i->second.last_scrub_version << " != "
4427 << i->second.scrub_start_version << dendl;
4428 }
4429 assert(i->second.last_scrub_version == i->second.scrub_start_version);
4430 }
4431
4432 scrub_infop->last_scrub_version = scrub_infop->scrub_start_version;
4433 scrub_infop->last_scrub_stamp = scrub_infop->scrub_start_stamp;
4434 scrub_infop->last_scrub_dirty = true;
4435 scrub_infop->scrub_in_progress = false;
4436
4437 if (scrub_infop->scrub_parent) {
4438 CDentry *dn = scrub_infop->scrub_parent;
4439 scrub_infop->scrub_parent = NULL;
4440 dn->dir->scrub_dentry_finished(dn);
4441 dn->put(CDentry::PIN_SCRUBPARENT);
4442 }
4443
4444 *c = scrub_infop->on_finish;
4445 scrub_infop->on_finish = NULL;
4446
4447 if (scrub_infop->header->get_origin() == this) {
4448 // We are at the point that a tagging scrub was initiated
4449 LogChannelRef clog = mdcache->mds->clog;
b32b8144
FG
4450 if (scrub_infop->header->get_tag().empty()) {
4451 clog->info() << "scrub complete";
4452 } else {
4453 clog->info() << "scrub complete with tag '"
4454 << scrub_infop->header->get_tag() << "'";
4455 }
7c673cae
FG
4456 }
4457}
4458
4459int64_t CInode::get_backtrace_pool() const
4460{
4461 if (is_dir()) {
4462 return mdcache->mds->mdsmap->get_metadata_pool();
4463 } else {
4464 // Files are required to have an explicit layout that specifies
4465 // a pool
4466 assert(inode.layout.pool_id != -1);
4467 return inode.layout.pool_id;
4468 }
4469}
4470
31f18b77
FG
4471void CInode::maybe_export_pin(bool update)
4472{
4473 if (!g_conf->mds_bal_export_pin)
4474 return;
4475 if (!is_dir() || !is_normal())
4476 return;
7c673cae 4477
31f18b77
FG
4478 mds_rank_t export_pin = get_export_pin(false);
4479 if (export_pin == MDS_RANK_NONE && !update)
4480 return;
7c673cae 4481
31f18b77
FG
4482 if (state_test(CInode::STATE_QUEUEDEXPORTPIN))
4483 return;
4484
4485 bool queue = false;
4486 for (auto p = dirfrags.begin(); p != dirfrags.end(); p++) {
4487 CDir *dir = p->second;
4488 if (!dir->is_auth())
4489 continue;
4490 if (export_pin != MDS_RANK_NONE) {
4491 if (dir->is_subtree_root()) {
4492 // set auxsubtree bit or export it
4493 if (!dir->state_test(CDir::STATE_AUXSUBTREE) ||
4494 export_pin != dir->get_dir_auth().first)
4495 queue = true;
4496 } else {
4497 // create aux subtree or export it
4498 queue = true;
7c673cae 4499 }
31f18b77
FG
4500 } else {
4501 // clear aux subtrees ?
4502 queue = dir->state_test(CDir::STATE_AUXSUBTREE);
4503 }
4504 if (queue) {
4505 state_set(CInode::STATE_QUEUEDEXPORTPIN);
7c673cae 4506 mdcache->export_pin_queue.insert(this);
31f18b77 4507 break;
7c673cae
FG
4508 }
4509 }
4510}
4511
4512void CInode::set_export_pin(mds_rank_t rank)
4513{
4514 assert(is_dir());
4515 assert(is_projected());
4516 get_projected_inode()->export_pin = rank;
31f18b77 4517 maybe_export_pin(true);
7c673cae
FG
4518}
4519
4520mds_rank_t CInode::get_export_pin(bool inherit) const
4521{
4522 /* An inode that is export pinned may not necessarily be a subtree root, we
4523 * need to traverse the parents. A base or system inode cannot be pinned.
4524 * N.B. inodes not yet linked into a dir (i.e. anonymous inodes) will not
4525 * have a parent yet.
4526 */
b32b8144
FG
4527 const CInode *in = this;
4528 while (true) {
4529 if (in->is_system())
4530 break;
4531 const CDentry *pdn = in->get_projected_parent_dn();
4532 if (!pdn)
4533 break;
94b18763 4534 const mempool_inode *pi = in->get_projected_inode();
b32b8144
FG
4535 // ignore export pin for unlinked directory
4536 if (pi->nlink == 0)
4537 break;
4538 if (pi->export_pin >= 0)
4539 return pi->export_pin;
4540
4541 if (!inherit)
4542 break;
4543 in = pdn->get_dir()->inode;
7c673cae
FG
4544 }
4545 return MDS_RANK_NONE;
4546}
4547
4548bool CInode::is_exportable(mds_rank_t dest) const
4549{
4550 mds_rank_t pin = get_export_pin();
4551 if (pin == dest) {
4552 return true;
4553 } else if (pin >= 0) {
4554 return false;
4555 } else {
4556 return true;
4557 }
4558}
181888fb
FG
4559
4560MEMPOOL_DEFINE_OBJECT_FACTORY(CInode, co_inode, mds_co);