]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/CInode.cc
bump version to 12.0.3-pve3
[ceph.git] / ceph / src / mds / CInode.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "include/int_types.h"
16#include "common/errno.h"
17
18#include <string>
19#include <stdio.h>
20
21#include "CInode.h"
22#include "CDir.h"
23#include "CDentry.h"
24
25#include "MDSRank.h"
26#include "MDCache.h"
27#include "MDLog.h"
28#include "Locker.h"
29#include "Mutation.h"
30
31#include "events/EUpdate.h"
32
33#include "osdc/Objecter.h"
34
35#include "snap.h"
36
37#include "LogSegment.h"
38
39#include "common/Clock.h"
40
41#include "messages/MLock.h"
42#include "messages/MClientCaps.h"
43
44#include "common/config.h"
45#include "global/global_context.h"
46#include "include/assert.h"
47
48#include "mds/MDSContinuation.h"
49#include "mds/InoTable.h"
50
51#define dout_context g_ceph_context
52#define dout_subsys ceph_subsys_mds
53#undef dout_prefix
54#define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") "
55
56
57class CInodeIOContext : public MDSIOContextBase
58{
59protected:
60 CInode *in;
61 MDSRank *get_mds() override {return in->mdcache->mds;}
62public:
63 explicit CInodeIOContext(CInode *in_) : in(in_) {
64 assert(in != NULL);
65 }
66};
67
68
// Definitions of CInode's static LockType members, one per inode lock.
// Each is constructed from the corresponding CEPH_LOCK_I* identifier.
69LockType CInode::versionlock_type(CEPH_LOCK_IVERSION);
70LockType CInode::authlock_type(CEPH_LOCK_IAUTH);
71LockType CInode::linklock_type(CEPH_LOCK_ILINK);
72LockType CInode::dirfragtreelock_type(CEPH_LOCK_IDFT);
73LockType CInode::filelock_type(CEPH_LOCK_IFILE);
74LockType CInode::xattrlock_type(CEPH_LOCK_IXATTR);
75LockType CInode::snaplock_type(CEPH_LOCK_ISNAP);
76LockType CInode::nestlock_type(CEPH_LOCK_INEST);
77LockType CInode::flocklock_type(CEPH_LOCK_IFLOCK);
78LockType CInode::policylock_type(CEPH_LOCK_IPOLICY);
79
80//int cinode_pins[CINODE_NUM_PINS]; // counts
81ostream& CInode::print_db_line_prefix(ostream& out)
82{
83 return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") ";
84}
85
86/*
87 * write caps and lock ids
88 */
// Table pairing an inode lock id with the client capability bits listed
// next to it (the write/exclusive caps for that lock).
89struct cinode_lock_info_t cinode_lock_info[] = {
90 { CEPH_LOCK_IFILE, CEPH_CAP_ANY_FILE_WR },
91 { CEPH_LOCK_IAUTH, CEPH_CAP_AUTH_EXCL },
92 { CEPH_LOCK_ILINK, CEPH_CAP_LINK_EXCL },
93 { CEPH_LOCK_IXATTR, CEPH_CAP_XATTR_EXCL },
94};
// Number of entries in cinode_lock_info, computed from the array size.
95int num_cinode_locks = sizeof(cinode_lock_info) / sizeof(cinode_lock_info[0]);
96
97
98
/*
 * Stream a bracketed, single-line debug description of an inode.
 *
 * Fields are emitted in a fixed order: ino, [first,last] snap range, path,
 * auth/replica information, symlink target, dirfragtree, version(s),
 * auth pins, snaprealm pointer, state flags, truncation info, dir/file
 * stats, rstat, pending snapflushes, locks that are not "sync and
 * unlocked", client ranges, client caps, MDS caps wanted, pin set,
 * export pin, and finally the object's address.  Returns 'out'.
 */
99ostream& operator<<(ostream& out, const CInode& in)
100{
101 string path;
 // build the path using projected parents
102 in.make_path_string(path, true);
103
104 out << "[inode " << in.inode.ino;
105 out << " ["
106 << (in.is_multiversion() ? "...":"")
107 << in.first << "," << in.last << "]";
108 out << " " << path << (in.is_dir() ? "/":"");
109
 // auth copies list their replicas; replicas show the authority pair
 // (second member only when it is not CDIR_AUTH_UNKNOWN) and the nonce
110 if (in.is_auth()) {
111 out << " auth";
112 if (in.is_replicated())
113 out << in.get_replicas();
114 } else {
115 mds_authority_t a = in.authority();
116 out << " rep@" << a.first;
117 if (a.second != CDIR_AUTH_UNKNOWN)
118 out << "," << a.second;
119 out << "." << in.get_replica_nonce();
120 }
121
122 if (in.is_symlink())
123 out << " symlink='" << in.symlink << "'";
124 if (in.is_dir() && !in.dirfragtree.empty())
125 out << " " << in.dirfragtree;
126
 // current version, plus the projected version when it is ahead
127 out << " v" << in.get_version();
128 if (in.get_projected_version() > in.get_version())
129 out << " pv" << in.get_projected_version();
130
131 if (in.is_auth_pinned()) {
132 out << " ap=" << in.get_num_auth_pins() << "+" << in.get_num_nested_auth_pins();
133#ifdef MDS_AUTHPIN_SET
134 out << "(" << in.auth_pin_set << ")";
135#endif
136 }
137
138 if (in.snaprealm)
139 out << " snaprealm=" << in.snaprealm;
140
 // state flags
141 if (in.state_test(CInode::STATE_AMBIGUOUSAUTH)) out << " AMBIGAUTH";
142 if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " needsrecover";
143 if (in.state_test(CInode::STATE_RECOVERING)) out << " recovering";
144 if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent";
145 if (in.state_test(CInode::STATE_MISSINGOBJS)) out << " missingobjs";
146 if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance;
147 if (in.is_frozen_inode()) out << " FROZEN";
148 if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN";
149
 // truncation state is read from the projected inode
150 const inode_t *pi = in.get_projected_inode();
151 if (pi->is_truncating())
152 out << " truncating(" << pi->truncate_from << " to " << pi->truncate_size << ")";
153
 // directories print dirstat (and the projected dirstat when
 // mds_debug_scatterstat is set); other inodes print size and, when it
 // is not 1, the link count
154 if (in.inode.is_dir()) {
155 out << " " << in.inode.dirstat;
156 if (g_conf->mds_debug_scatterstat && in.is_projected()) {
157 const inode_t *pi = in.get_projected_inode();
158 out << "->" << pi->dirstat;
159 }
160 } else {
161 out << " s=" << in.inode.size;
162 if (in.inode.nlink != 1)
163 out << " nl=" << in.inode.nlink;
164 }
165
166 // rstat
167 out << " " << in.inode.rstat;
168 if (!(in.inode.rstat == in.inode.accounted_rstat))
169 out << "/" << in.inode.accounted_rstat;
170 if (g_conf->mds_debug_scatterstat && in.is_projected()) {
171 const inode_t *pi = in.get_projected_inode();
172 out << "->" << pi->rstat;
173 if (!(pi->rstat == pi->accounted_rstat))
174 out << "/" << pi->accounted_rstat;
175 }
176
177 if (!in.client_need_snapflush.empty())
178 out << " need_snapflush=" << in.client_need_snapflush;
179
180
 // only locks that are not "sync and unlocked" are printed
181 // locks
182 if (!in.authlock.is_sync_and_unlocked())
183 out << " " << in.authlock;
184 if (!in.linklock.is_sync_and_unlocked())
185 out << " " << in.linklock;
186 if (in.inode.is_dir()) {
187 if (!in.dirfragtreelock.is_sync_and_unlocked())
188 out << " " << in.dirfragtreelock;
189 if (!in.snaplock.is_sync_and_unlocked())
190 out << " " << in.snaplock;
191 if (!in.nestlock.is_sync_and_unlocked())
192 out << " " << in.nestlock;
193 if (!in.policylock.is_sync_and_unlocked())
194 out << " " << in.policylock;
195 } else {
196 if (!in.flocklock.is_sync_and_unlocked())
197 out << " " << in.flocklock;
198 }
199 if (!in.filelock.is_sync_and_unlocked())
200 out << " " << in.filelock;
201 if (!in.xattrlock.is_sync_and_unlocked())
202 out << " " << in.xattrlock;
203 if (!in.versionlock.is_sync_and_unlocked())
204 out << " " << in.versionlock;
205
206 // hack: spit out crap on which clients have caps
207 if (in.inode.client_ranges.size())
208 out << " cr=" << in.inode.client_ranges;
209
 // per-client caps: client=pending[/issued when different]/wanted@last_sent,
 // followed by the loner (and wanted loner when it differs)
210 if (!in.get_client_caps().empty()) {
211 out << " caps={";
212 for (map<client_t,Capability*>::const_iterator it = in.get_client_caps().begin();
213 it != in.get_client_caps().end();
214 ++it) {
215 if (it != in.get_client_caps().begin()) out << ",";
216 out << it->first << "="
217 << ccap_string(it->second->pending());
218 if (it->second->issued() != it->second->pending())
219 out << "/" << ccap_string(it->second->issued());
220 out << "/" << ccap_string(it->second->wanted())
221 << "@" << it->second->get_last_sent();
222 }
223 out << "}";
224 if (in.get_loner() >= 0 || in.get_wanted_loner() >= 0) {
225 out << ",l=" << in.get_loner();
226 if (in.get_loner() != in.get_wanted_loner())
227 out << "(" << in.get_wanted_loner() << ")";
228 }
229 }
 // caps wanted, keyed by the int key of get_mds_caps_wanted()
230 if (!in.get_mds_caps_wanted().empty()) {
231 out << " mcw={";
232 for (compact_map<int,int>::const_iterator p = in.get_mds_caps_wanted().begin();
233 p != in.get_mds_caps_wanted().end();
234 ++p) {
235 if (p != in.get_mds_caps_wanted().begin())
236 out << ',';
237 out << p->first << '=' << ccap_string(p->second);
238 }
239 out << '}';
240 }
241
 // pin set, only when the inode holds references
242 if (in.get_num_ref()) {
243 out << " |";
244 in.print_pin_set(out);
245 }
246
247 if (in.inode.export_pin != MDS_RANK_NONE) {
248 out << " export_pin=" << in.inode.export_pin;
249 }
250
 // address of the CInode object itself
251 out << " " << &in;
252 out << "]";
253 return out;
254}
255
256ostream& operator<<(ostream& out, const CInode::scrub_stamp_info_t& si)
257{
258 out << "{scrub_start_version: " << si.scrub_start_version
259 << ", scrub_start_stamp: " << si.scrub_start_stamp
260 << ", last_scrub_version: " << si.last_scrub_version
261 << ", last_scrub_stamp: " << si.last_scrub_stamp;
262 return out;
263}
264
265
266
267void CInode::print(ostream& out)
268{
269 out << *this;
270}
271
272
273
274void CInode::add_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
275{
276 dout(10) << "add_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;
277
278 if (client_need_snapflush.empty()) {
279 get(CInode::PIN_NEEDSNAPFLUSH);
280
281 // FIXME: this is non-optimal, as we'll block freezes/migrations for potentially
282 // long periods waiting for clients to flush their snaps.
283 auth_pin(this); // pin head inode...
284 }
285
286 set<client_t>& clients = client_need_snapflush[snapid];
287 if (clients.empty())
288 snapin->auth_pin(this); // ...and pin snapped/old inode!
289
290 clients.insert(client);
291}
292
293void CInode::remove_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
294{
295 dout(10) << "remove_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;
296 compact_map<snapid_t, std::set<client_t> >::iterator p = client_need_snapflush.find(snapid);
297 if (p == client_need_snapflush.end()) {
298 dout(10) << " snapid not found" << dendl;
299 return;
300 }
301 if (!p->second.count(client)) {
302 dout(10) << " client not found" << dendl;
303 return;
304 }
305 p->second.erase(client);
306 if (p->second.empty()) {
307 client_need_snapflush.erase(p);
308 snapin->auth_unpin(this);
309
310 if (client_need_snapflush.empty()) {
311 put(CInode::PIN_NEEDSNAPFLUSH);
312 auth_unpin(this);
313 }
314 }
315}
316
317bool CInode::split_need_snapflush(CInode *cowin, CInode *in)
318{
319 dout(10) << "split_need_snapflush [" << cowin->first << "," << cowin->last << "] for " << *cowin << dendl;
320 bool need_flush = false;
321 for (compact_map<snapid_t, set<client_t> >::iterator p = client_need_snapflush.lower_bound(cowin->first);
322 p != client_need_snapflush.end() && p->first < in->first; ) {
323 compact_map<snapid_t, set<client_t> >::iterator q = p;
324 ++p;
325 assert(!q->second.empty());
326 if (cowin->last >= q->first) {
327 cowin->auth_pin(this);
328 need_flush = true;
329 } else
330 client_need_snapflush.erase(q);
331 in->auth_unpin(this);
332 }
333 return need_flush;
334}
335
336void CInode::mark_dirty_rstat()
337{
338 if (!state_test(STATE_DIRTYRSTAT)) {
339 dout(10) << "mark_dirty_rstat" << dendl;
340 state_set(STATE_DIRTYRSTAT);
341 get(PIN_DIRTYRSTAT);
342 CDentry *dn = get_projected_parent_dn();
343 CDir *pdir = dn->dir;
344 pdir->dirty_rstat_inodes.push_back(&dirty_rstat_item);
345
346 mdcache->mds->locker->mark_updated_scatterlock(&pdir->inode->nestlock);
347 }
348}
349void CInode::clear_dirty_rstat()
350{
351 if (state_test(STATE_DIRTYRSTAT)) {
352 dout(10) << "clear_dirty_rstat" << dendl;
353 state_clear(STATE_DIRTYRSTAT);
354 put(PIN_DIRTYRSTAT);
355 dirty_rstat_item.remove_myself();
356 }
357}
358
359inode_t *CInode::project_inode(map<string,bufferptr> *px)
360{
361 if (projected_nodes.empty()) {
362 projected_nodes.push_back(new projected_inode_t(new inode_t(inode)));
363 if (px)
364 *px = xattrs;
365 } else {
366 projected_nodes.push_back(new projected_inode_t(
367 new inode_t(*projected_nodes.back()->inode)));
368 if (px)
369 *px = *get_projected_xattrs();
370 }
371
372 projected_inode_t &pi = *projected_nodes.back();
373
374 if (px) {
375 pi.xattrs = px;
376 ++num_projected_xattrs;
377 }
378
379 if (scrub_infop && scrub_infop->last_scrub_dirty) {
380 pi.inode->last_scrub_stamp = scrub_infop->last_scrub_stamp;
381 pi.inode->last_scrub_version = scrub_infop->last_scrub_version;
382 scrub_infop->last_scrub_dirty = false;
383 scrub_maybe_delete_info();
384 }
385 dout(15) << "project_inode " << pi.inode << dendl;
386 return pi.inode;
387}
388
389void CInode::pop_and_dirty_projected_inode(LogSegment *ls)
390{
391 assert(!projected_nodes.empty());
392 dout(15) << "pop_and_dirty_projected_inode " << projected_nodes.front()->inode
393 << " v" << projected_nodes.front()->inode->version << dendl;
394 int64_t old_pool = inode.layout.pool_id;
395
396 mark_dirty(projected_nodes.front()->inode->version, ls);
397 inode = *projected_nodes.front()->inode;
398
399 if (inode.is_backtrace_updated())
400 _mark_dirty_parent(ls, old_pool != inode.layout.pool_id);
401
402 map<string,bufferptr> *px = projected_nodes.front()->xattrs;
403 if (px) {
404 --num_projected_xattrs;
405 xattrs = *px;
406 delete px;
407 }
408
409 if (projected_nodes.front()->snapnode) {
410 pop_projected_snaprealm(projected_nodes.front()->snapnode);
411 --num_projected_srnodes;
412 }
413
414 delete projected_nodes.front()->inode;
415 delete projected_nodes.front();
416
417 projected_nodes.pop_front();
418}
419
420sr_t *CInode::project_snaprealm(snapid_t snapid)
421{
422 sr_t *cur_srnode = get_projected_srnode();
423 sr_t *new_srnode;
424
425 if (cur_srnode) {
426 new_srnode = new sr_t(*cur_srnode);
427 } else {
428 new_srnode = new sr_t();
429 new_srnode->created = snapid;
430 new_srnode->current_parent_since = get_oldest_snap();
431 }
432 dout(10) << "project_snaprealm " << new_srnode << dendl;
433 projected_nodes.back()->snapnode = new_srnode;
434 ++num_projected_srnodes;
435 return new_srnode;
436}
437
438/* if newparent != parent, add parent to past_parents
439 if parent DNE, we need to find what the parent actually is and fill that in */
440void CInode::project_past_snaprealm_parent(SnapRealm *newparent)
441{
442 sr_t *new_snap = project_snaprealm();
443 SnapRealm *oldparent;
444 if (!snaprealm) {
445 oldparent = find_snaprealm();
446 new_snap->seq = oldparent->get_newest_seq();
447 }
448 else
449 oldparent = snaprealm->parent;
450
451 if (newparent != oldparent) {
452 snapid_t oldparentseq = oldparent->get_newest_seq();
453 if (oldparentseq + 1 > new_snap->current_parent_since) {
454 new_snap->past_parents[oldparentseq].ino = oldparent->inode->ino();
455 new_snap->past_parents[oldparentseq].first = new_snap->current_parent_since;
456 }
457 new_snap->current_parent_since = MAX(oldparentseq, newparent->get_last_created()) + 1;
458 }
459}
460
461void CInode::pop_projected_snaprealm(sr_t *next_snaprealm)
462{
463 assert(next_snaprealm);
464 dout(10) << "pop_projected_snaprealm " << next_snaprealm
465 << " seq" << next_snaprealm->seq << dendl;
466 bool invalidate_cached_snaps = false;
467 if (!snaprealm) {
468 open_snaprealm();
469 } else if (next_snaprealm->past_parents.size() !=
470 snaprealm->srnode.past_parents.size()) {
471 invalidate_cached_snaps = true;
472 // re-open past parents
473 snaprealm->_close_parents();
474
475 dout(10) << " realm " << *snaprealm << " past_parents " << snaprealm->srnode.past_parents
476 << " -> " << next_snaprealm->past_parents << dendl;
477 }
478 snaprealm->srnode = *next_snaprealm;
479 delete next_snaprealm;
480
481 // we should be able to open these up (or have them already be open).
482 bool ok = snaprealm->_open_parents(NULL);
483 assert(ok);
484
485 if (invalidate_cached_snaps)
486 snaprealm->invalidate_cached_snaps();
487
488 if (snaprealm->parent)
489 dout(10) << " realm " << *snaprealm << " parent " << *snaprealm->parent << dendl;
490}
491
492
493// ====== CInode =======
494
495// dirfrags
496
497__u32 InodeStoreBase::hash_dentry_name(const string &dn)
498{
499 int which = inode.dir_layout.dl_dir_hash;
500 if (!which)
501 which = CEPH_STR_HASH_LINUX;
502 assert(ceph_str_hash_valid(which));
503 return ceph_str_hash(which, dn.data(), dn.length());
504}
505
506frag_t InodeStoreBase::pick_dirfrag(const string& dn)
507{
508 if (dirfragtree.empty())
509 return frag_t(); // avoid the string hash if we can.
510
511 __u32 h = hash_dentry_name(dn);
512 return dirfragtree[h];
513}
514
515bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
516{
517 bool all = true;
518 list<frag_t> fglist;
519 dirfragtree.get_leaves_under(fg, fglist);
520 for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
521 if (dirfrags.count(*p))
522 ls.push_back(dirfrags[*p]);
523 else
524 all = false;
525
526 if (all)
527 return all;
528
529 fragtree_t tmpdft;
530 tmpdft.force_to_leaf(g_ceph_context, fg);
531 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
532 tmpdft.force_to_leaf(g_ceph_context, p->first);
533 if (fg.contains(p->first) && !dirfragtree.is_leaf(p->first))
534 ls.push_back(p->second);
535 }
536
537 all = true;
538 tmpdft.get_leaves_under(fg, fglist);
539 for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
540 if (!dirfrags.count(*p)) {
541 all = false;
542 break;
543 }
544
545 return all;
546}
547
548void CInode::verify_dirfrags()
549{
550 bool bad = false;
551 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
552 if (!dirfragtree.is_leaf(p->first)) {
553 dout(0) << "have open dirfrag " << p->first << " but not leaf in " << dirfragtree
554 << ": " << *p->second << dendl;
555 bad = true;
556 }
557 }
558 assert(!bad);
559}
560
561void CInode::force_dirfrags()
562{
563 bool bad = false;
564 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
565 if (!dirfragtree.is_leaf(p->first)) {
566 dout(0) << "have open dirfrag " << p->first << " but not leaf in " << dirfragtree
567 << ": " << *p->second << dendl;
568 bad = true;
569 }
570 }
571
572 if (bad) {
573 list<frag_t> leaves;
574 dirfragtree.get_leaves(leaves);
575 for (list<frag_t>::iterator p = leaves.begin(); p != leaves.end(); ++p)
576 mdcache->get_force_dirfrag(dirfrag_t(ino(),*p), true);
577 }
578
579 verify_dirfrags();
580}
581
582CDir *CInode::get_approx_dirfrag(frag_t fg)
583{
584 CDir *dir = get_dirfrag(fg);
585 if (dir) return dir;
586
587 // find a child?
588 list<CDir*> ls;
589 get_dirfrags_under(fg, ls);
590 if (!ls.empty())
591 return ls.front();
592
593 // try parents?
594 while (fg.bits() > 0) {
595 fg = fg.parent();
596 dir = get_dirfrag(fg);
597 if (dir) return dir;
598 }
599 return NULL;
600}
601
602void CInode::get_dirfrags(list<CDir*>& ls)
603{
604 // all dirfrags
605 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
606 p != dirfrags.end();
607 ++p)
608 ls.push_back(p->second);
609}
610void CInode::get_nested_dirfrags(list<CDir*>& ls)
611{
612 // dirfrags in same subtree
613 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
614 p != dirfrags.end();
615 ++p)
616 if (!p->second->is_subtree_root())
617 ls.push_back(p->second);
618}
619void CInode::get_subtree_dirfrags(list<CDir*>& ls)
620{
621 // dirfrags that are roots of new subtrees
622 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
623 p != dirfrags.end();
624 ++p)
625 if (p->second->is_subtree_root())
626 ls.push_back(p->second);
627}
628
629
630CDir *CInode::get_or_open_dirfrag(MDCache *mdcache, frag_t fg)
631{
632 assert(is_dir());
633
634 // have it?
635 CDir *dir = get_dirfrag(fg);
636 if (!dir) {
637 // create it.
638 assert(is_auth() || mdcache->mds->is_any_replay());
639 dir = new CDir(this, fg, mdcache, is_auth());
640 add_dirfrag(dir);
641 }
642 return dir;
643}
644
645CDir *CInode::add_dirfrag(CDir *dir)
646{
647 assert(dirfrags.count(dir->dirfrag().frag) == 0);
648 dirfrags[dir->dirfrag().frag] = dir;
649
650 if (stickydir_ref > 0) {
651 dir->state_set(CDir::STATE_STICKY);
652 dir->get(CDir::PIN_STICKY);
653 }
654
655 maybe_export_pin();
656
657 return dir;
658}
659
660void CInode::close_dirfrag(frag_t fg)
661{
662 dout(14) << "close_dirfrag " << fg << dendl;
663 assert(dirfrags.count(fg));
664
665 CDir *dir = dirfrags[fg];
666 dir->remove_null_dentries();
667
668 // clear dirty flag
669 if (dir->is_dirty())
670 dir->mark_clean();
671
672 if (stickydir_ref > 0) {
673 dir->state_clear(CDir::STATE_STICKY);
674 dir->put(CDir::PIN_STICKY);
675 }
676
677 // dump any remaining dentries, for debugging purposes
678 for (CDir::map_t::iterator p = dir->items.begin();
679 p != dir->items.end();
680 ++p)
681 dout(14) << "close_dirfrag LEFTOVER dn " << *p->second << dendl;
682
683 assert(dir->get_num_ref() == 0);
684 delete dir;
685 dirfrags.erase(fg);
686}
687
688void CInode::close_dirfrags()
689{
690 while (!dirfrags.empty())
691 close_dirfrag(dirfrags.begin()->first);
692}
693
694bool CInode::has_subtree_root_dirfrag(int auth)
695{
696 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
697 p != dirfrags.end();
698 ++p)
699 if (p->second->is_subtree_root() &&
700 (auth == -1 || p->second->dir_auth.first == auth))
701 return true;
702 return false;
703}
704
705bool CInode::has_subtree_or_exporting_dirfrag()
706{
707 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
708 p != dirfrags.end();
709 ++p)
710 if (p->second->is_subtree_root() ||
711 p->second->state_test(CDir::STATE_EXPORTING))
712 return true;
713 return false;
714}
715
716void CInode::get_stickydirs()
717{
718 if (stickydir_ref == 0) {
719 get(PIN_STICKYDIRS);
720 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
721 p != dirfrags.end();
722 ++p) {
723 p->second->state_set(CDir::STATE_STICKY);
724 p->second->get(CDir::PIN_STICKY);
725 }
726 }
727 stickydir_ref++;
728}
729
730void CInode::put_stickydirs()
731{
732 assert(stickydir_ref > 0);
733 stickydir_ref--;
734 if (stickydir_ref == 0) {
735 put(PIN_STICKYDIRS);
736 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
737 p != dirfrags.end();
738 ++p) {
739 p->second->state_clear(CDir::STATE_STICKY);
740 p->second->put(CDir::PIN_STICKY);
741 }
742 }
743}
744
745
746
747
748
749// pins
750
751void CInode::first_get()
752{
753 // pin my dentry?
754 if (parent)
755 parent->get(CDentry::PIN_INODEPIN);
756}
757
758void CInode::last_put()
759{
760 // unpin my dentry?
761 if (parent)
762 parent->put(CDentry::PIN_INODEPIN);
763}
764
765void CInode::_put()
766{
767 if (get_num_ref() == (int)is_dirty() + (int)is_dirty_parent())
768 mdcache->maybe_eval_stray(this, true);
769}
770
771void CInode::add_remote_parent(CDentry *p)
772{
773 if (remote_parents.empty())
774 get(PIN_REMOTEPARENT);
775 remote_parents.insert(p);
776}
777void CInode::remove_remote_parent(CDentry *p)
778{
779 remote_parents.erase(p);
780 if (remote_parents.empty())
781 put(PIN_REMOTEPARENT);
782}
783
784
785
786
787CDir *CInode::get_parent_dir()
788{
789 if (parent)
790 return parent->dir;
791 return NULL;
792}
793CDir *CInode::get_projected_parent_dir()
794{
795 CDentry *p = get_projected_parent_dn();
796 if (p)
797 return p->dir;
798 return NULL;
799}
800CInode *CInode::get_parent_inode()
801{
802 if (parent)
803 return parent->dir->inode;
804 return NULL;
805}
806
807bool CInode::is_projected_ancestor_of(CInode *other)
808{
809 while (other) {
810 if (other == this)
811 return true;
812 if (!other->get_projected_parent_dn())
813 break;
814 other = other->get_projected_parent_dn()->get_dir()->get_inode();
815 }
816 return false;
817}
818
819/*
820 * Because a non-directory inode may have multiple links, the use_parent
821 * argument allows selecting which parent to use for path construction. This
822 * argument is only meaningful for the final component (i.e. the first of the
823 * nested calls) because directories cannot have multiple hard links. If
824 * use_parent is NULL and projected is true, the primary parent's projected
825 * inode is used all the way up the path chain. Otherwise the primary parent
826 * stable inode is used.
827 */
828void CInode::make_path_string(string& s, bool projected, const CDentry *use_parent) const
829{
830 if (!use_parent) {
831 use_parent = projected ? get_projected_parent_dn() : parent;
832 }
833
834 if (use_parent) {
835 use_parent->make_path_string(s, projected);
836 } else if (is_root()) {
837 s = "";
838 } else if (is_mdsdir()) {
839 char t[40];
840 uint64_t eino(ino());
841 eino -= MDS_INO_MDSDIR_OFFSET;
842 snprintf(t, sizeof(t), "~mds%" PRId64, eino);
843 s = t;
844 } else {
845 char n[40];
846 uint64_t eino(ino());
847 snprintf(n, sizeof(n), "#%" PRIx64, eino);
848 s += n;
849 }
850}
851
852void CInode::make_path(filepath& fp, bool projected) const
853{
854 const CDentry *use_parent = projected ? get_projected_parent_dn() : parent;
855 if (use_parent) {
856 assert(!is_base());
857 use_parent->make_path(fp, projected);
858 } else {
859 fp = filepath(ino());
860 }
861}
862
863void CInode::name_stray_dentry(string& dname)
864{
865 char s[20];
866 snprintf(s, sizeof(s), "%llx", (unsigned long long)inode.ino.val);
867 dname = s;
868}
869
870version_t CInode::pre_dirty()
871{
872 version_t pv;
873 CDentry* _cdentry = get_projected_parent_dn();
874 if (_cdentry) {
875 pv = _cdentry->pre_dirty(get_projected_version());
876 dout(10) << "pre_dirty " << pv << " (current v " << inode.version << ")" << dendl;
877 } else {
878 assert(is_base());
879 pv = get_projected_version() + 1;
880 }
881 // force update backtrace for old format inode (see inode_t::decode)
882 if (inode.backtrace_version == 0 && !projected_nodes.empty()) {
883 inode_t *pi = projected_nodes.back()->inode;
884 if (pi->backtrace_version == 0)
885 pi->update_backtrace(pv);
886 }
887 return pv;
888}
889
890void CInode::_mark_dirty(LogSegment *ls)
891{
892 if (!state_test(STATE_DIRTY)) {
893 state_set(STATE_DIRTY);
894 get(PIN_DIRTY);
895 assert(ls);
896 }
897
898 // move myself to this segment's dirty list
899 if (ls)
900 ls->dirty_inodes.push_back(&item_dirty);
901}
902
903void CInode::mark_dirty(version_t pv, LogSegment *ls) {
904
905 dout(10) << "mark_dirty " << *this << dendl;
906
907 /*
908 NOTE: I may already be dirty, but this fn _still_ needs to be called so that
909 the directory is (perhaps newly) dirtied, and so that parent_dir_version is
910 updated below.
911 */
912
913 // only auth can get dirty. "dirty" async data in replicas is relative to
914 // filelock state, not the dirty flag.
915 assert(is_auth());
916
917 // touch my private version
918 assert(inode.version < pv);
919 inode.version = pv;
920 _mark_dirty(ls);
921
922 // mark dentry too
923 if (parent)
924 parent->mark_dirty(pv, ls);
925}
926
927
928void CInode::mark_clean()
929{
930 dout(10) << " mark_clean " << *this << dendl;
931 if (state_test(STATE_DIRTY)) {
932 state_clear(STATE_DIRTY);
933 put(PIN_DIRTY);
934
935 // remove myself from ls dirty list
936 item_dirty.remove_myself();
937 }
938}
939
940
941// --------------
942// per-inode storage
943// (currently for root inode only)
944
945struct C_IO_Inode_Stored : public CInodeIOContext {
946 version_t version;
947 Context *fin;
948 C_IO_Inode_Stored(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
949 void finish(int r) override {
950 in->_stored(r, version, fin);
951 }
952};
953
954object_t InodeStoreBase::get_object_name(inodeno_t ino, frag_t fg, const char *suffix)
955{
956 char n[60];
957 snprintf(n, sizeof(n), "%llx.%08llx%s", (long long unsigned)ino, (long long unsigned)fg, suffix ? suffix : "");
958 return object_t(n);
959}
960
961void CInode::store(MDSInternalContextBase *fin)
962{
963 dout(10) << "store " << get_version() << dendl;
964 assert(is_base());
965
966 if (snaprealm)
967 purge_stale_snap_data(snaprealm->get_snaps());
968
969 // encode
970 bufferlist bl;
971 string magic = CEPH_FS_ONDISK_MAGIC;
972 ::encode(magic, bl);
973 encode_store(bl, mdcache->mds->mdsmap->get_up_features());
974
975 // write it.
976 SnapContext snapc;
977 ObjectOperation m;
978 m.write_full(bl);
979
980 object_t oid = CInode::get_object_name(ino(), frag_t(), ".inode");
981 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
982
983 Context *newfin =
984 new C_OnFinisher(new C_IO_Inode_Stored(this, get_version(), fin),
985 mdcache->mds->finisher);
986 mdcache->mds->objecter->mutate(oid, oloc, m, snapc,
987 ceph::real_clock::now(), 0,
988 newfin);
989}
990
991void CInode::_stored(int r, version_t v, Context *fin)
992{
993 if (r < 0) {
994 dout(1) << "store error " << r << " v " << v << " on " << *this << dendl;
995 mdcache->mds->clog->error() << "failed to store ino " << ino() << " object,"
996 << " errno " << r;
997 mdcache->mds->handle_write_error(r);
998 fin->complete(r);
999 return;
1000 }
1001
1002 dout(10) << "_stored " << v << " on " << *this << dendl;
1003 if (v == get_projected_version())
1004 mark_clean();
1005
1006 fin->complete(0);
1007}
1008
1009void CInode::flush(MDSInternalContextBase *fin)
1010{
1011 dout(10) << "flush " << *this << dendl;
1012 assert(is_auth() && can_auth_pin());
1013
1014 MDSGatherBuilder gather(g_ceph_context);
1015
1016 if (is_dirty_parent()) {
1017 store_backtrace(gather.new_sub());
1018 }
1019 if (is_dirty()) {
1020 if (is_base()) {
1021 store(gather.new_sub());
1022 } else {
1023 parent->dir->commit(0, gather.new_sub());
1024 }
1025 }
1026
1027 if (gather.has_subs()) {
1028 gather.set_finisher(fin);
1029 gather.activate();
1030 } else {
1031 fin->complete(0);
1032 }
1033}
1034
1035struct C_IO_Inode_Fetched : public CInodeIOContext {
1036 bufferlist bl, bl2;
1037 Context *fin;
1038 C_IO_Inode_Fetched(CInode *i, Context *f) : CInodeIOContext(i), fin(f) {}
1039 void finish(int r) override {
1040 // Ignore 'r', because we fetch from two places, so r is usually ENOENT
1041 in->_fetched(bl, bl2, fin);
1042 }
1043};
1044
1045void CInode::fetch(MDSInternalContextBase *fin)
1046{
1047 dout(10) << "fetch" << dendl;
1048
1049 C_IO_Inode_Fetched *c = new C_IO_Inode_Fetched(this, fin);
1050 C_GatherBuilder gather(g_ceph_context, new C_OnFinisher(c, mdcache->mds->finisher));
1051
1052 object_t oid = CInode::get_object_name(ino(), frag_t(), "");
1053 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
1054
1055 // Old on-disk format: inode stored in xattr of a dirfrag
1056 ObjectOperation rd;
1057 rd.getxattr("inode", &c->bl, NULL);
1058 mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, (bufferlist*)NULL, 0, gather.new_sub());
1059
1060 // Current on-disk format: inode stored in a .inode object
1061 object_t oid2 = CInode::get_object_name(ino(), frag_t(), ".inode");
1062 mdcache->mds->objecter->read(oid2, oloc, 0, 0, CEPH_NOSNAP, &c->bl2, 0, gather.new_sub());
1063
1064 gather.activate();
1065}
1066
1067void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin)
1068{
1069 dout(10) << "_fetched got " << bl.length() << " and " << bl2.length() << dendl;
1070 bufferlist::iterator p;
1071 if (bl2.length()) {
1072 p = bl2.begin();
1073 } else if (bl.length()) {
1074 p = bl.begin();
1075 } else {
1076 derr << "No data while reading inode 0x" << std::hex << ino()
1077 << std::dec << dendl;
1078 fin->complete(-ENOENT);
1079 return;
1080 }
1081
1082 // Attempt decode
1083 try {
1084 string magic;
1085 ::decode(magic, p);
1086 dout(10) << " magic is '" << magic << "' (expecting '"
1087 << CEPH_FS_ONDISK_MAGIC << "')" << dendl;
1088 if (magic != CEPH_FS_ONDISK_MAGIC) {
1089 dout(0) << "on disk magic '" << magic << "' != my magic '" << CEPH_FS_ONDISK_MAGIC
1090 << "'" << dendl;
1091 fin->complete(-EINVAL);
1092 } else {
1093 decode_store(p);
1094 dout(10) << "_fetched " << *this << dendl;
1095 fin->complete(0);
1096 }
1097 } catch (buffer::error &err) {
1098 derr << "Corrupt inode 0x" << std::hex << ino() << std::dec
1099 << ": " << err << dendl;
1100 fin->complete(-EINVAL);
1101 return;
1102 }
1103}
1104
1105void CInode::build_backtrace(int64_t pool, inode_backtrace_t& bt)
1106{
1107 bt.ino = inode.ino;
1108 bt.ancestors.clear();
1109 bt.pool = pool;
1110
1111 CInode *in = this;
1112 CDentry *pdn = get_parent_dn();
1113 while (pdn) {
1114 CInode *diri = pdn->get_dir()->get_inode();
1115 bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name, in->inode.version));
1116 in = diri;
1117 pdn = in->get_parent_dn();
1118 }
1119 for (compact_set<int64_t>::iterator i = inode.old_pools.begin();
1120 i != inode.old_pools.end();
1121 ++i) {
1122 // don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
1123 if (*i != pool)
1124 bt.old_pools.insert(*i);
1125 }
1126}
1127
1128struct C_IO_Inode_StoredBacktrace : public CInodeIOContext {
1129 version_t version;
1130 Context *fin;
1131 C_IO_Inode_StoredBacktrace(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
1132 void finish(int r) override {
1133 in->_stored_backtrace(r, version, fin);
1134 }
1135};
1136
1137void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
1138{
1139 dout(10) << "store_backtrace on " << *this << dendl;
1140 assert(is_dirty_parent());
1141
1142 if (op_prio < 0)
1143 op_prio = CEPH_MSG_PRIO_DEFAULT;
1144
1145 auth_pin(this);
1146
1147 const int64_t pool = get_backtrace_pool();
1148 inode_backtrace_t bt;
1149 build_backtrace(pool, bt);
1150 bufferlist parent_bl;
1151 ::encode(bt, parent_bl);
1152
1153 ObjectOperation op;
1154 op.priority = op_prio;
1155 op.create(false);
1156 op.setxattr("parent", parent_bl);
1157
1158 bufferlist layout_bl;
1159 ::encode(inode.layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
1160 op.setxattr("layout", layout_bl);
1161
1162 SnapContext snapc;
1163 object_t oid = get_object_name(ino(), frag_t(), "");
1164 object_locator_t oloc(pool);
1165 Context *fin2 = new C_OnFinisher(
1166 new C_IO_Inode_StoredBacktrace(this, inode.backtrace_version, fin),
1167 mdcache->mds->finisher);
1168
1169 if (!state_test(STATE_DIRTYPOOL) || inode.old_pools.empty()) {
1170 dout(20) << __func__ << ": no dirtypool or no old pools" << dendl;
1171 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1172 ceph::real_clock::now(),
1173 0, fin2);
1174 return;
1175 }
1176
1177 C_GatherBuilder gather(g_ceph_context, fin2);
1178 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1179 ceph::real_clock::now(),
1180 0, gather.new_sub());
1181
1182 // In the case where DIRTYPOOL is set, we update all old pools backtraces
1183 // such that anyone reading them will see the new pool ID in
1184 // inode_backtrace_t::pool and go read everything else from there.
1185 for (compact_set<int64_t>::iterator p = inode.old_pools.begin();
1186 p != inode.old_pools.end();
1187 ++p) {
1188 if (*p == pool)
1189 continue;
1190
1191 dout(20) << __func__ << ": updating old pool " << *p << dendl;
1192
1193 ObjectOperation op;
1194 op.priority = op_prio;
1195 op.create(false);
1196 op.setxattr("parent", parent_bl);
1197
1198 object_locator_t oloc(*p);
1199 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1200 ceph::real_clock::now(),
1201 0, gather.new_sub());
1202 }
1203 gather.activate();
1204}
1205
1206void CInode::_stored_backtrace(int r, version_t v, Context *fin)
1207{
1208 if (r == -ENOENT) {
1209 const int64_t pool = get_backtrace_pool();
1210 bool exists = mdcache->mds->objecter->with_osdmap(
1211 [pool](const OSDMap &osd_map) {
1212 return osd_map.have_pg_pool(pool);
1213 });
1214
1215 // This ENOENT is because the pool doesn't exist (the user deleted it
1216 // out from under us), so the backtrace can never be written, so pretend
1217 // to succeed so that the user can proceed to e.g. delete the file.
1218 if (!exists) {
1219 dout(4) << "store_backtrace got ENOENT: a data pool was deleted "
1220 "beneath us!" << dendl;
1221 r = 0;
1222 }
1223 }
1224
1225 if (r < 0) {
1226 dout(1) << "store backtrace error " << r << " v " << v << dendl;
1227 mdcache->mds->clog->error() << "failed to store backtrace on ino "
1228 << ino() << " object"
1229 << ", pool " << get_backtrace_pool()
1230 << ", errno " << r;
1231 mdcache->mds->handle_write_error(r);
1232 if (fin)
1233 fin->complete(r);
1234 return;
1235 }
1236
1237 dout(10) << "_stored_backtrace v " << v << dendl;
1238
1239 auth_unpin(this);
1240 if (v == inode.backtrace_version)
1241 clear_dirty_parent();
1242 if (fin)
1243 fin->complete(0);
1244}
1245
1246void CInode::fetch_backtrace(Context *fin, bufferlist *backtrace)
1247{
1248 mdcache->fetch_backtrace(inode.ino, get_backtrace_pool(), *backtrace, fin);
1249}
1250
1251void CInode::_mark_dirty_parent(LogSegment *ls, bool dirty_pool)
1252{
1253 if (!state_test(STATE_DIRTYPARENT)) {
1254 dout(10) << "mark_dirty_parent" << dendl;
1255 state_set(STATE_DIRTYPARENT);
1256 get(PIN_DIRTYPARENT);
1257 assert(ls);
1258 }
1259 if (dirty_pool)
1260 state_set(STATE_DIRTYPOOL);
1261 if (ls)
1262 ls->dirty_parent_inodes.push_back(&item_dirty_parent);
1263}
1264
1265void CInode::clear_dirty_parent()
1266{
1267 if (state_test(STATE_DIRTYPARENT)) {
1268 dout(10) << "clear_dirty_parent" << dendl;
1269 state_clear(STATE_DIRTYPARENT);
1270 state_clear(STATE_DIRTYPOOL);
1271 put(PIN_DIRTYPARENT);
1272 item_dirty_parent.remove_myself();
1273 }
1274}
1275
1276void CInode::verify_diri_backtrace(bufferlist &bl, int err)
1277{
1278 if (is_base() || is_dirty_parent() || !is_auth())
1279 return;
1280
1281 dout(10) << "verify_diri_backtrace" << dendl;
1282
1283 if (err == 0) {
1284 inode_backtrace_t backtrace;
1285 ::decode(backtrace, bl);
1286 CDentry *pdn = get_parent_dn();
1287 if (backtrace.ancestors.empty() ||
1288 backtrace.ancestors[0].dname != pdn->name ||
1289 backtrace.ancestors[0].dirino != pdn->get_dir()->ino())
1290 err = -EINVAL;
1291 }
1292
1293 if (err) {
1294 MDSRank *mds = mdcache->mds;
1295 mds->clog->error() << "bad backtrace on dir ino " << ino();
1296 assert(!"bad backtrace" == (g_conf->mds_verify_backtrace > 1));
1297
1298 _mark_dirty_parent(mds->mdlog->get_current_segment(), false);
1299 mds->mdlog->flush();
1300 }
1301}
1302
1303// ------------------
1304// parent dir
1305
1306
1307void InodeStoreBase::encode_bare(bufferlist &bl, uint64_t features,
1308 const bufferlist *snap_blob) const
1309{
1310 ::encode(inode, bl, features);
1311 if (is_symlink())
1312 ::encode(symlink, bl);
1313 ::encode(dirfragtree, bl);
1314 ::encode(xattrs, bl);
1315 if (snap_blob)
1316 ::encode(*snap_blob, bl);
1317 else
1318 ::encode(bufferlist(), bl);
1319 ::encode(old_inodes, bl, features);
1320 ::encode(oldest_snap, bl);
1321 ::encode(damage_flags, bl);
1322}
1323
// Versioned encoding of the inode store: wraps encode_bare() in an
// ENCODE_START/ENCODE_FINISH envelope (version 6, compat 4).
void InodeStoreBase::encode(bufferlist &bl, uint64_t features,
                            const bufferlist *snap_blob) const
{
  ENCODE_START(6, 4, bl);
  encode_bare(bl, features, snap_blob);
  ENCODE_FINISH(bl);
}
1331
1332void CInode::encode_store(bufferlist& bl, uint64_t features)
1333{
1334 bufferlist snap_blob;
1335 encode_snap_blob(snap_blob);
1336 InodeStoreBase::encode(bl, mdcache->mds->mdsmap->get_up_features(),
1337 &snap_blob);
1338}
1339
1340void InodeStoreBase::decode_bare(bufferlist::iterator &bl,
1341 bufferlist& snap_blob, __u8 struct_v)
1342{
1343 ::decode(inode, bl);
1344 if (is_symlink())
1345 ::decode(symlink, bl);
1346 ::decode(dirfragtree, bl);
1347 ::decode(xattrs, bl);
1348 ::decode(snap_blob, bl);
1349
1350 ::decode(old_inodes, bl);
1351 if (struct_v == 2 && inode.is_dir()) {
1352 bool default_layout_exists;
1353 ::decode(default_layout_exists, bl);
1354 if (default_layout_exists) {
1355 ::decode(struct_v, bl); // this was a default_file_layout
1356 ::decode(inode.layout, bl); // but we only care about the layout portion
1357 }
1358 }
1359
1360 if (struct_v >= 5) {
1361 // InodeStore is embedded in dentries without proper versioning, so
1362 // we consume up to the end of the buffer
1363 if (!bl.end()) {
1364 ::decode(oldest_snap, bl);
1365 }
1366
1367 if (!bl.end()) {
1368 ::decode(damage_flags, bl);
1369 }
1370 }
1371}
1372
1373
// Versioned decoding of the inode store: opens the envelope with
// DECODE_START_LEGACY_COMPAT_LEN and hands the body to decode_bare().
// struct_v is not declared in this function; presumably the DECODE_START
// macro introduces it.
void InodeStoreBase::decode(bufferlist::iterator &bl, bufferlist& snap_blob)
{
  DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
  decode_bare(bl, snap_blob, struct_v);
  DECODE_FINISH(bl);
}
1380
1381void CInode::decode_store(bufferlist::iterator& bl)
1382{
1383 bufferlist snap_blob;
1384 InodeStoreBase::decode(bl, snap_blob);
1385 decode_snap_blob(snap_blob);
1386}
1387
1388// ------------------
1389// locking
1390
1391void CInode::set_object_info(MDSCacheObjectInfo &info)
1392{
1393 info.ino = ino();
1394 info.snapid = last;
1395}
1396
1397void CInode::encode_lock_state(int type, bufferlist& bl)
1398{
1399 ::encode(first, bl);
1400
1401 switch (type) {
1402 case CEPH_LOCK_IAUTH:
1403 ::encode(inode.version, bl);
1404 ::encode(inode.ctime, bl);
1405 ::encode(inode.mode, bl);
1406 ::encode(inode.uid, bl);
1407 ::encode(inode.gid, bl);
1408 break;
1409
1410 case CEPH_LOCK_ILINK:
1411 ::encode(inode.version, bl);
1412 ::encode(inode.ctime, bl);
1413 ::encode(inode.nlink, bl);
1414 break;
1415
1416 case CEPH_LOCK_IDFT:
1417 if (is_auth()) {
1418 ::encode(inode.version, bl);
1419 } else {
1420 // treat flushing as dirty when rejoining cache
1421 bool dirty = dirfragtreelock.is_dirty_or_flushing();
1422 ::encode(dirty, bl);
1423 }
1424 {
1425 // encode the raw tree
1426 ::encode(dirfragtree, bl);
1427
1428 // also specify which frags are mine
1429 set<frag_t> myfrags;
1430 list<CDir*> dfls;
1431 get_dirfrags(dfls);
1432 for (list<CDir*>::iterator p = dfls.begin(); p != dfls.end(); ++p)
1433 if ((*p)->is_auth()) {
1434 frag_t fg = (*p)->get_frag();
1435 myfrags.insert(fg);
1436 }
1437 ::encode(myfrags, bl);
1438 }
1439 break;
1440
1441 case CEPH_LOCK_IFILE:
1442 if (is_auth()) {
1443 ::encode(inode.version, bl);
1444 ::encode(inode.ctime, bl);
1445 ::encode(inode.mtime, bl);
1446 ::encode(inode.atime, bl);
1447 ::encode(inode.time_warp_seq, bl);
1448 if (!is_dir()) {
1449 ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1450 ::encode(inode.size, bl);
1451 ::encode(inode.truncate_seq, bl);
1452 ::encode(inode.truncate_size, bl);
1453 ::encode(inode.client_ranges, bl);
1454 ::encode(inode.inline_data, bl);
1455 }
1456 } else {
1457 // treat flushing as dirty when rejoining cache
1458 bool dirty = filelock.is_dirty_or_flushing();
1459 ::encode(dirty, bl);
1460 }
1461
1462 {
1463 dout(15) << "encode_lock_state inode.dirstat is " << inode.dirstat << dendl;
1464 ::encode(inode.dirstat, bl); // only meaningful if i am auth.
1465 bufferlist tmp;
1466 __u32 n = 0;
1467 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1468 p != dirfrags.end();
1469 ++p) {
1470 frag_t fg = p->first;
1471 CDir *dir = p->second;
1472 if (is_auth() || dir->is_auth()) {
1473 fnode_t *pf = dir->get_projected_fnode();
1474 dout(15) << fg << " " << *dir << dendl;
1475 dout(20) << fg << " fragstat " << pf->fragstat << dendl;
1476 dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
1477 ::encode(fg, tmp);
1478 ::encode(dir->first, tmp);
1479 ::encode(pf->fragstat, tmp);
1480 ::encode(pf->accounted_fragstat, tmp);
1481 n++;
1482 }
1483 }
1484 ::encode(n, bl);
1485 bl.claim_append(tmp);
1486 }
1487 break;
1488
1489 case CEPH_LOCK_INEST:
1490 if (is_auth()) {
1491 ::encode(inode.version, bl);
1492 } else {
1493 // treat flushing as dirty when rejoining cache
1494 bool dirty = nestlock.is_dirty_or_flushing();
1495 ::encode(dirty, bl);
1496 }
1497 {
1498 dout(15) << "encode_lock_state inode.rstat is " << inode.rstat << dendl;
1499 ::encode(inode.rstat, bl); // only meaningful if i am auth.
1500 bufferlist tmp;
1501 __u32 n = 0;
1502 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1503 p != dirfrags.end();
1504 ++p) {
1505 frag_t fg = p->first;
1506 CDir *dir = p->second;
1507 if (is_auth() || dir->is_auth()) {
1508 fnode_t *pf = dir->get_projected_fnode();
1509 dout(10) << fg << " " << *dir << dendl;
1510 dout(10) << fg << " " << pf->rstat << dendl;
1511 dout(10) << fg << " " << pf->rstat << dendl;
1512 dout(10) << fg << " " << dir->dirty_old_rstat << dendl;
1513 ::encode(fg, tmp);
1514 ::encode(dir->first, tmp);
1515 ::encode(pf->rstat, tmp);
1516 ::encode(pf->accounted_rstat, tmp);
1517 ::encode(dir->dirty_old_rstat, tmp);
1518 n++;
1519 }
1520 }
1521 ::encode(n, bl);
1522 bl.claim_append(tmp);
1523 }
1524 break;
1525
1526 case CEPH_LOCK_IXATTR:
1527 ::encode(inode.version, bl);
1528 ::encode(inode.ctime, bl);
1529 ::encode(xattrs, bl);
1530 break;
1531
1532 case CEPH_LOCK_ISNAP:
1533 ::encode(inode.version, bl);
1534 ::encode(inode.ctime, bl);
1535 encode_snap(bl);
1536 break;
1537
1538 case CEPH_LOCK_IFLOCK:
1539 ::encode(inode.version, bl);
1540 _encode_file_locks(bl);
1541 break;
1542
1543 case CEPH_LOCK_IPOLICY:
1544 if (inode.is_dir()) {
1545 ::encode(inode.version, bl);
1546 ::encode(inode.ctime, bl);
1547 ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1548 ::encode(inode.quota, bl);
1549 ::encode(inode.export_pin, bl);
1550 }
1551 break;
1552
1553 default:
1554 ceph_abort();
1555 }
1556}
1557
1558
1559/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1560
1561void CInode::decode_lock_state(int type, bufferlist& bl)
1562{
1563 bufferlist::iterator p = bl.begin();
1564 utime_t tm;
1565
1566 snapid_t newfirst;
1567 ::decode(newfirst, p);
1568
1569 if (!is_auth() && newfirst != first) {
1570 dout(10) << "decode_lock_state first " << first << " -> " << newfirst << dendl;
1571 assert(newfirst > first);
1572 if (!is_multiversion() && parent) {
1573 assert(parent->first == first);
1574 parent->first = newfirst;
1575 }
1576 first = newfirst;
1577 }
1578
1579 switch (type) {
1580 case CEPH_LOCK_IAUTH:
1581 ::decode(inode.version, p);
1582 ::decode(tm, p);
1583 if (inode.ctime < tm) inode.ctime = tm;
1584 ::decode(inode.mode, p);
1585 ::decode(inode.uid, p);
1586 ::decode(inode.gid, p);
1587 break;
1588
1589 case CEPH_LOCK_ILINK:
1590 ::decode(inode.version, p);
1591 ::decode(tm, p);
1592 if (inode.ctime < tm) inode.ctime = tm;
1593 ::decode(inode.nlink, p);
1594 break;
1595
1596 case CEPH_LOCK_IDFT:
1597 if (is_auth()) {
1598 bool replica_dirty;
1599 ::decode(replica_dirty, p);
1600 if (replica_dirty) {
1601 dout(10) << "decode_lock_state setting dftlock dirty flag" << dendl;
1602 dirfragtreelock.mark_dirty(); // ok bc we're auth and caller will handle
1603 }
1604 } else {
1605 ::decode(inode.version, p);
1606 }
1607 {
1608 fragtree_t temp;
1609 ::decode(temp, p);
1610 set<frag_t> authfrags;
1611 ::decode(authfrags, p);
1612 if (is_auth()) {
1613 // auth. believe replica's auth frags only.
1614 for (set<frag_t>::iterator p = authfrags.begin(); p != authfrags.end(); ++p)
1615 if (!dirfragtree.is_leaf(*p)) {
1616 dout(10) << " forcing frag " << *p << " to leaf (split|merge)" << dendl;
1617 dirfragtree.force_to_leaf(g_ceph_context, *p);
1618 dirfragtreelock.mark_dirty(); // ok bc we're auth and caller will handle
1619 }
1620 } else {
1621 // replica. take the new tree, BUT make sure any open
1622 // dirfrags remain leaves (they may have split _after_ this
1623 // dft was scattered, or we may still be be waiting on the
1624 // notify from the auth)
1625 dirfragtree.swap(temp);
1626 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1627 p != dirfrags.end();
1628 ++p) {
1629 if (!dirfragtree.is_leaf(p->first)) {
1630 dout(10) << " forcing open dirfrag " << p->first << " to leaf (racing with split|merge)" << dendl;
1631 dirfragtree.force_to_leaf(g_ceph_context, p->first);
1632 }
1633 if (p->second->is_auth())
1634 p->second->state_clear(CDir::STATE_DIRTYDFT);
1635 }
1636 }
1637 if (g_conf->mds_debug_frag)
1638 verify_dirfrags();
1639 }
1640 break;
1641
1642 case CEPH_LOCK_IFILE:
1643 if (!is_auth()) {
1644 ::decode(inode.version, p);
1645 ::decode(tm, p);
1646 if (inode.ctime < tm) inode.ctime = tm;
1647 ::decode(inode.mtime, p);
1648 ::decode(inode.atime, p);
1649 ::decode(inode.time_warp_seq, p);
1650 if (!is_dir()) {
1651 ::decode(inode.layout, p);
1652 ::decode(inode.size, p);
1653 ::decode(inode.truncate_seq, p);
1654 ::decode(inode.truncate_size, p);
1655 ::decode(inode.client_ranges, p);
1656 ::decode(inode.inline_data, p);
1657 }
1658 } else {
1659 bool replica_dirty;
1660 ::decode(replica_dirty, p);
1661 if (replica_dirty) {
1662 dout(10) << "decode_lock_state setting filelock dirty flag" << dendl;
1663 filelock.mark_dirty(); // ok bc we're auth and caller will handle
1664 }
1665 }
1666 {
1667 frag_info_t dirstat;
1668 ::decode(dirstat, p);
1669 if (!is_auth()) {
1670 dout(10) << " taking inode dirstat " << dirstat << " for " << *this << dendl;
1671 inode.dirstat = dirstat; // take inode summation if replica
1672 }
1673 __u32 n;
1674 ::decode(n, p);
1675 dout(10) << " ...got " << n << " fragstats on " << *this << dendl;
1676 while (n--) {
1677 frag_t fg;
1678 snapid_t fgfirst;
1679 frag_info_t fragstat;
1680 frag_info_t accounted_fragstat;
1681 ::decode(fg, p);
1682 ::decode(fgfirst, p);
1683 ::decode(fragstat, p);
1684 ::decode(accounted_fragstat, p);
1685 dout(10) << fg << " [" << fgfirst << ",head] " << dendl;
1686 dout(10) << fg << " fragstat " << fragstat << dendl;
1687 dout(20) << fg << " accounted_fragstat " << accounted_fragstat << dendl;
1688
1689 CDir *dir = get_dirfrag(fg);
1690 if (is_auth()) {
1691 assert(dir); // i am auth; i had better have this dir open
1692 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1693 << " on " << *dir << dendl;
1694 dir->first = fgfirst;
1695 dir->fnode.fragstat = fragstat;
1696 dir->fnode.accounted_fragstat = accounted_fragstat;
1697 dir->first = fgfirst;
1698 if (!(fragstat == accounted_fragstat)) {
1699 dout(10) << fg << " setting filelock updated flag" << dendl;
1700 filelock.mark_dirty(); // ok bc we're auth and caller will handle
1701 }
1702 } else {
1703 if (dir && dir->is_auth()) {
1704 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1705 << " on " << *dir << dendl;
1706 dir->first = fgfirst;
1707 fnode_t *pf = dir->get_projected_fnode();
1708 finish_scatter_update(&filelock, dir,
1709 inode.dirstat.version, pf->accounted_fragstat.version);
1710 }
1711 }
1712 }
1713 }
1714 break;
1715
1716 case CEPH_LOCK_INEST:
1717 if (is_auth()) {
1718 bool replica_dirty;
1719 ::decode(replica_dirty, p);
1720 if (replica_dirty) {
1721 dout(10) << "decode_lock_state setting nestlock dirty flag" << dendl;
1722 nestlock.mark_dirty(); // ok bc we're auth and caller will handle
1723 }
1724 } else {
1725 ::decode(inode.version, p);
1726 }
1727 {
1728 nest_info_t rstat;
1729 ::decode(rstat, p);
1730 if (!is_auth()) {
1731 dout(10) << " taking inode rstat " << rstat << " for " << *this << dendl;
1732 inode.rstat = rstat; // take inode summation if replica
1733 }
1734 __u32 n;
1735 ::decode(n, p);
1736 while (n--) {
1737 frag_t fg;
1738 snapid_t fgfirst;
1739 nest_info_t rstat;
1740 nest_info_t accounted_rstat;
1741 compact_map<snapid_t,old_rstat_t> dirty_old_rstat;
1742 ::decode(fg, p);
1743 ::decode(fgfirst, p);
1744 ::decode(rstat, p);
1745 ::decode(accounted_rstat, p);
1746 ::decode(dirty_old_rstat, p);
1747 dout(10) << fg << " [" << fgfirst << ",head]" << dendl;
1748 dout(10) << fg << " rstat " << rstat << dendl;
1749 dout(10) << fg << " accounted_rstat " << accounted_rstat << dendl;
1750 dout(10) << fg << " dirty_old_rstat " << dirty_old_rstat << dendl;
1751
1752 CDir *dir = get_dirfrag(fg);
1753 if (is_auth()) {
1754 assert(dir); // i am auth; i had better have this dir open
1755 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1756 << " on " << *dir << dendl;
1757 dir->first = fgfirst;
1758 dir->fnode.rstat = rstat;
1759 dir->fnode.accounted_rstat = accounted_rstat;
1760 dir->dirty_old_rstat.swap(dirty_old_rstat);
1761 if (!(rstat == accounted_rstat) || !dir->dirty_old_rstat.empty()) {
1762 dout(10) << fg << " setting nestlock updated flag" << dendl;
1763 nestlock.mark_dirty(); // ok bc we're auth and caller will handle
1764 }
1765 } else {
1766 if (dir && dir->is_auth()) {
1767 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1768 << " on " << *dir << dendl;
1769 dir->first = fgfirst;
1770 fnode_t *pf = dir->get_projected_fnode();
1771 finish_scatter_update(&nestlock, dir,
1772 inode.rstat.version, pf->accounted_rstat.version);
1773 }
1774 }
1775 }
1776 }
1777 break;
1778
1779 case CEPH_LOCK_IXATTR:
1780 ::decode(inode.version, p);
1781 ::decode(tm, p);
1782 if (inode.ctime < tm) inode.ctime = tm;
1783 ::decode(xattrs, p);
1784 break;
1785
1786 case CEPH_LOCK_ISNAP:
1787 {
1788 ::decode(inode.version, p);
1789 ::decode(tm, p);
1790 if (inode.ctime < tm) inode.ctime = tm;
1791 snapid_t seq = 0;
1792 if (snaprealm)
1793 seq = snaprealm->srnode.seq;
1794 decode_snap(p);
1795 if (snaprealm && snaprealm->srnode.seq != seq)
1796 mdcache->do_realm_invalidate_and_update_notify(this, seq ? CEPH_SNAP_OP_UPDATE:CEPH_SNAP_OP_SPLIT);
1797 }
1798 break;
1799
1800 case CEPH_LOCK_IFLOCK:
1801 ::decode(inode.version, p);
1802 _decode_file_locks(p);
1803 break;
1804
1805 case CEPH_LOCK_IPOLICY:
1806 if (inode.is_dir()) {
1807 ::decode(inode.version, p);
1808 ::decode(tm, p);
1809 if (inode.ctime < tm) inode.ctime = tm;
1810 ::decode(inode.layout, p);
1811 ::decode(inode.quota, p);
1812 ::decode(inode.export_pin, p);
1813 maybe_export_pin();
1814 }
1815 break;
1816
1817 default:
1818 ceph_abort();
1819 }
1820}
1821
1822
1823bool CInode::is_dirty_scattered()
1824{
1825 return
1826 filelock.is_dirty_or_flushing() ||
1827 nestlock.is_dirty_or_flushing() ||
1828 dirfragtreelock.is_dirty_or_flushing();
1829}
1830
// Drop the dirty marking on all three scatter-style locks
// (file, nest, dirfragtree).
void CInode::clear_scatter_dirty()
{
  filelock.remove_dirty();
  nestlock.remove_dirty();
  dirfragtreelock.remove_dirty();
}
1837
1838void CInode::clear_dirty_scattered(int type)
1839{
1840 dout(10) << "clear_dirty_scattered " << type << " on " << *this << dendl;
1841 switch (type) {
1842 case CEPH_LOCK_IFILE:
1843 item_dirty_dirfrag_dir.remove_myself();
1844 break;
1845
1846 case CEPH_LOCK_INEST:
1847 item_dirty_dirfrag_nest.remove_myself();
1848 break;
1849
1850 case CEPH_LOCK_IDFT:
1851 item_dirty_dirfrag_dirfragtree.remove_myself();
1852 break;
1853
1854 default:
1855 ceph_abort();
1856 }
1857}
1858
1859
1860/*
1861 * when we initially scatter a lock, we need to check if any of the dirfrags
1862 * have out of date accounted_rstat/fragstat. if so, mark the lock stale.
1863 */
1864/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1865void CInode::start_scatter(ScatterLock *lock)
1866{
1867 dout(10) << "start_scatter " << *lock << " on " << *this << dendl;
1868 assert(is_auth());
1869 inode_t *pi = get_projected_inode();
1870
1871 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1872 p != dirfrags.end();
1873 ++p) {
1874 frag_t fg = p->first;
1875 CDir *dir = p->second;
1876 fnode_t *pf = dir->get_projected_fnode();
1877 dout(20) << fg << " " << *dir << dendl;
1878
1879 if (!dir->is_auth())
1880 continue;
1881
1882 switch (lock->get_type()) {
1883 case CEPH_LOCK_IFILE:
1884 finish_scatter_update(lock, dir, pi->dirstat.version, pf->accounted_fragstat.version);
1885 break;
1886
1887 case CEPH_LOCK_INEST:
1888 finish_scatter_update(lock, dir, pi->rstat.version, pf->accounted_rstat.version);
1889 break;
1890
1891 case CEPH_LOCK_IDFT:
1892 dir->state_clear(CDir::STATE_DIRTYDFT);
1893 break;
1894 }
1895 }
1896}
1897
1898
1899class C_Inode_FragUpdate : public MDSLogContextBase {
1900protected:
1901 CInode *in;
1902 CDir *dir;
1903 MutationRef mut;
1904 MDSRank *get_mds() override {return in->mdcache->mds;}
1905 void finish(int r) override {
1906 in->_finish_frag_update(dir, mut);
1907 }
1908
1909public:
1910 C_Inode_FragUpdate(CInode *i, CDir *d, MutationRef& m) : in(i), dir(d), mut(m) {}
1911};
1912
1913void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
1914 version_t inode_version, version_t dir_accounted_version)
1915{
1916 frag_t fg = dir->get_frag();
1917 assert(dir->is_auth());
1918
1919 if (dir->is_frozen()) {
1920 dout(10) << "finish_scatter_update " << fg << " frozen, marking " << *lock << " stale " << *dir << dendl;
1921 } else if (dir->get_version() == 0) {
1922 dout(10) << "finish_scatter_update " << fg << " not loaded, marking " << *lock << " stale " << *dir << dendl;
1923 } else {
1924 if (dir_accounted_version != inode_version) {
1925 dout(10) << "finish_scatter_update " << fg << " journaling accounted scatterstat update v" << inode_version << dendl;
1926
1927 MDLog *mdlog = mdcache->mds->mdlog;
1928 MutationRef mut(new MutationImpl());
1929 mut->ls = mdlog->get_current_segment();
1930
1931 inode_t *pi = get_projected_inode();
1932 fnode_t *pf = dir->project_fnode();
1933 pf->version = dir->pre_dirty();
1934
1935 const char *ename = 0;
1936 switch (lock->get_type()) {
1937 case CEPH_LOCK_IFILE:
1938 pf->fragstat.version = pi->dirstat.version;
1939 pf->accounted_fragstat = pf->fragstat;
1940 ename = "lock ifile accounted scatter stat update";
1941 break;
1942 case CEPH_LOCK_INEST:
1943 pf->rstat.version = pi->rstat.version;
1944 pf->accounted_rstat = pf->rstat;
1945 ename = "lock inest accounted scatter stat update";
1946 break;
1947 default:
1948 ceph_abort();
1949 }
1950
1951 mut->add_projected_fnode(dir);
1952
1953 EUpdate *le = new EUpdate(mdlog, ename);
1954 mdlog->start_entry(le);
1955 le->metablob.add_dir_context(dir);
1956 le->metablob.add_dir(dir, true);
1957
1958 assert(!dir->is_frozen());
1959 mut->auth_pin(dir);
1960
1961 mdlog->submit_entry(le, new C_Inode_FragUpdate(this, dir, mut));
1962 } else {
1963 dout(10) << "finish_scatter_update " << fg << " accounted " << *lock
1964 << " scatter stat unchanged at v" << dir_accounted_version << dendl;
1965 }
1966 }
1967}
1968
// Journal completion for finish_scatter_update(): applies the mutation and
// then cleans it up.  dir is only used for the log message.
void CInode::_finish_frag_update(CDir *dir, MutationRef& mut)
{
  dout(10) << "_finish_frag_update on " << *dir << dendl;
  mut->apply();
  mut->cleanup();
}
1975
1976
1977/*
1978 * when we gather a lock, we need to assimilate dirfrag changes into the inode
1979 * state. it's possible we can't update the dirfrag accounted_rstat/fragstat
1980 * because the frag is auth and frozen, or that the replica couldn't for the same
1981 * reason. hopefully it will get updated the next time the lock cycles.
1982 *
1983 * we have two dimensions of behavior:
1984 * - we may be (auth and !frozen), and able to update, or not.
1985 * - the frag may be stale, or not.
1986 *
1987 * if the frag is non-stale, we want to assimilate the diff into the
1988 * inode, regardless of whether it's auth or updateable.
1989 *
1990 * if we update the frag, we want to set accounted_fragstat = frag,
1991 * both if we took the diff or it was stale and we are making it
1992 * un-stale.
1993 */
1994/* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
// Fold the dirfrags' scatter stats into the projected inode after a gather.
//
// type selects the lock: IFILE (dirstat/fragstat), INEST (rstat) or IDFT
// (nothing to do); any other value aborts.  Must be called on the auth
// inode.  For IFILE and INEST the projected inode's stat version is bumped,
// each non-stale dirfrag's delta is added to the inode, updateable dirfrags
// (auth, loaded, not frozen) get a projected fnode with accounted == current
// stat, and finally the inode's sums are cross-checked against the sum over
// the dirfrags when every leaf of the fragtree is open and loaded.
void CInode::finish_scatter_gather_update(int type)
{
  LogChannelRef clog = mdcache->mds->clog;

  dout(10) << "finish_scatter_gather_update " << type << " on " << *this << dendl;
  assert(is_auth());

  switch (type) {
  case CEPH_LOCK_IFILE:
    {
      // tmpdft: copy of the fragtree with every open frag forced to a leaf;
      // dirstat: running sum of the open frags' fragstats.
      fragtree_t tmpdft = dirfragtree;
      struct frag_info_t dirstat;
      bool dirstat_valid = true;

      // adjust summation
      assert(is_auth());
      inode_t *pi = get_projected_inode();

      bool touched_mtime = false, touched_chattr = false;
      dout(20) << "  orig dirstat " << pi->dirstat << dendl;
      pi->dirstat.version++;
      for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
           p != dirfrags.end();
           ++p) {
        frag_t fg = p->first;
        CDir *dir = p->second;
        dout(20) << fg << " " << *dir << dendl;

        // a dirfrag with version 0 cannot be updated and makes the
        // cross-check below meaningless
        bool update;
        if (dir->get_version() != 0) {
          update = dir->is_auth() && !dir->is_frozen();
        } else {
          update = false;
          dirstat_valid = false;
        }

        fnode_t *pf = dir->get_projected_fnode();
        if (update)
          pf = dir->project_fnode();

        // the frag is current only if it was accounted at the version the
        // inode had before the bump above
        if (pf->accounted_fragstat.version == pi->dirstat.version - 1) {
          dout(20) << fg << "           fragstat " << pf->fragstat << dendl;
          dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
          pi->dirstat.add_delta(pf->fragstat, pf->accounted_fragstat, &touched_mtime, &touched_chattr);
        } else {
          dout(20) << fg << " skipping STALE accounted_fragstat " << pf->accounted_fragstat << dendl;
        }

        // report negative counts, then clamp them to zero
        if (pf->fragstat.nfiles < 0 ||
            pf->fragstat.nsubdirs < 0) {
          clog->error() << "bad/negative dir size on "
                        << dir->dirfrag() << " " << pf->fragstat;
          assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);

          if (pf->fragstat.nfiles < 0)
            pf->fragstat.nfiles = 0;
          if (pf->fragstat.nsubdirs < 0)
            pf->fragstat.nsubdirs = 0;
        }

        if (update) {
          pf->accounted_fragstat = pf->fragstat;
          pf->fragstat.version = pf->accounted_fragstat.version = pi->dirstat.version;
          dout(10) << fg << " updated accounted_fragstat " << pf->fragstat << " on " << *dir << dendl;
        }

        tmpdft.force_to_leaf(g_ceph_context, fg);
        dirstat.add(pf->fragstat);
      }
      if (touched_mtime)
        pi->mtime = pi->ctime = pi->dirstat.mtime;
      if (touched_chattr)
        pi->change_attr = pi->dirstat.change_attr;
      dout(20) << " final dirstat " << pi->dirstat << dendl;

      if (dirstat_valid && !dirstat.same_sums(pi->dirstat)) {
        // the comparison only counts if every leaf frag is open
        list<frag_t> ls;
        tmpdft.get_leaves_under(frag_t(), ls);
        for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
          if (!dirfrags.count(*p)) {
            dirstat_valid = false;
            break;
          }
        if (dirstat_valid) {
          if (state_test(CInode::STATE_REPAIRSTATS)) {
            dout(20) << " dirstat mismatch, fixing" << dendl;
          } else {
            clog->error() << "unmatched fragstat on " << ino() << ", inode has "
                          << pi->dirstat << ", dirfrags have " << dirstat;
            assert(!"unmatched fragstat" == g_conf->mds_verify_scatter);
          }
          // trust the dirfrags for now
          // (keeping the inode's version and the newer mtime/change_attr)
          version_t v = pi->dirstat.version;
          if (pi->dirstat.mtime > dirstat.mtime)
            dirstat.mtime = pi->dirstat.mtime;
          if (pi->dirstat.change_attr > dirstat.change_attr)
            dirstat.change_attr = pi->dirstat.change_attr;
          pi->dirstat = dirstat;
          pi->dirstat.version = v;
        }
      }

      // same negative-count handling for the inode's own dirstat
      if (pi->dirstat.nfiles < 0 ||
          pi->dirstat.nsubdirs < 0) {
        clog->error() << "bad/negative fragstat on " << ino()
                      << ", inode has " << pi->dirstat;
        assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);

        if (pi->dirstat.nfiles < 0)
          pi->dirstat.nfiles = 0;
        if (pi->dirstat.nsubdirs < 0)
          pi->dirstat.nsubdirs = 0;
      }
    }
    break;

  case CEPH_LOCK_INEST:
    {
      // rstat: running sum over the open frags, started with rsubdirs = 1
      fragtree_t tmpdft = dirfragtree;
      nest_info_t rstat;
      rstat.rsubdirs = 1;
      bool rstat_valid = true;

      // adjust summation
      assert(is_auth());
      inode_t *pi = get_projected_inode();
      dout(20) << "  orig rstat " << pi->rstat << dendl;
      pi->rstat.version++;
      for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
           p != dirfrags.end();
           ++p) {
        frag_t fg = p->first;
        CDir *dir = p->second;
        dout(20) << fg << " " << *dir << dendl;

        // same updateable/valid rules as the IFILE case
        bool update;
        if (dir->get_version() != 0) {
          update = dir->is_auth() && !dir->is_frozen();
        } else {
          update = false;
          rstat_valid = false;
        }

        fnode_t *pf = dir->get_projected_fnode();
        if (update)
          pf = dir->project_fnode();

        if (pf->accounted_rstat.version == pi->rstat.version-1) {
          // only pull this frag's dirty rstat inodes into the frag if
          // the frag is non-stale and updateable.  if it's stale,
          // that info will just get thrown out!
          if (update)
            dir->assimilate_dirty_rstat_inodes();

          dout(20) << fg << "           rstat " << pf->rstat << dendl;
          dout(20) << fg << " accounted_rstat " << pf->accounted_rstat << dendl;
          dout(20) << fg << " dirty_old_rstat " << dir->dirty_old_rstat << dendl;
          // project the head rstat, then each dirty old (snapshotted) rstat
          mdcache->project_rstat_frag_to_inode(pf->rstat, pf->accounted_rstat,
                                               dir->first, CEPH_NOSNAP, this, true);
          for (compact_map<snapid_t,old_rstat_t>::iterator q = dir->dirty_old_rstat.begin();
               q != dir->dirty_old_rstat.end();
               ++q)
            mdcache->project_rstat_frag_to_inode(q->second.rstat, q->second.accounted_rstat,
                                                 q->second.first, q->first, this, true);
          if (update)  // dir contents not valid if frozen or non-auth
            dir->check_rstats();
        } else {
          dout(20) << fg << " skipping STALE accounted_rstat " << pf->accounted_rstat << dendl;
        }
        if (update) {
          pf->accounted_rstat = pf->rstat;
          dir->dirty_old_rstat.clear();
          pf->rstat.version = pf->accounted_rstat.version = pi->rstat.version;
          dir->check_rstats();
          dout(10) << fg << " updated accounted_rstat " << pf->rstat << " on " << *dir << dendl;
        }

        tmpdft.force_to_leaf(g_ceph_context, fg);
        rstat.add(pf->rstat);
      }
      dout(20) << " final rstat " << pi->rstat << dendl;

      if (rstat_valid && !rstat.same_sums(pi->rstat)) {
        // the comparison only counts if every leaf frag is open
        list<frag_t> ls;
        tmpdft.get_leaves_under(frag_t(), ls);
        for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
          if (!dirfrags.count(*p)) {
            rstat_valid = false;
            break;
          }
        if (rstat_valid) {
          if (state_test(CInode::STATE_REPAIRSTATS)) {
            dout(20) << " rstat mismatch, fixing" << dendl;
          } else {
            clog->error() << "unmatched rstat on " << ino() << ", inode has "
                          << pi->rstat << ", dirfrags have " << rstat;
            assert(!"unmatched rstat" == g_conf->mds_verify_scatter);
          }
          // trust the dirfrag for now
          // (keeping the inode's version and the newer rctime)
          version_t v = pi->rstat.version;
          if (pi->rstat.rctime > rstat.rctime)
            rstat.rctime = pi->rstat.rctime;
          pi->rstat = rstat;
          pi->rstat.version = v;
        }
      }

      mdcache->broadcast_quota_to_client(this);
    }
    break;

  case CEPH_LOCK_IDFT:
    break;

  default:
    ceph_abort();
  }
}
2213
2214void CInode::finish_scatter_gather_update_accounted(int type, MutationRef& mut, EMetaBlob *metablob)
2215{
2216 dout(10) << "finish_scatter_gather_update_accounted " << type << " on " << *this << dendl;
2217 assert(is_auth());
2218
2219 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
2220 p != dirfrags.end();
2221 ++p) {
2222 CDir *dir = p->second;
2223 if (!dir->is_auth() || dir->get_version() == 0 || dir->is_frozen())
2224 continue;
2225
2226 if (type == CEPH_LOCK_IDFT)
2227 continue; // nothing to do.
2228
2229 dout(10) << " journaling updated frag accounted_ on " << *dir << dendl;
2230 assert(dir->is_projected());
2231 fnode_t *pf = dir->get_projected_fnode();
2232 pf->version = dir->pre_dirty();
2233 mut->add_projected_fnode(dir);
2234 metablob->add_dir(dir, true);
2235 mut->auth_pin(dir);
2236
2237 if (type == CEPH_LOCK_INEST)
2238 dir->assimilate_dirty_rstat_inodes_finish(mut, metablob);
2239 }
2240}
2241
2242// waiting
2243
2244bool CInode::is_frozen() const
2245{
2246 if (is_frozen_inode()) return true;
2247 if (parent && parent->dir->is_frozen()) return true;
2248 return false;
2249}
2250
2251bool CInode::is_frozen_dir() const
2252{
2253 if (parent && parent->dir->is_frozen_dir()) return true;
2254 return false;
2255}
2256
2257bool CInode::is_freezing() const
2258{
2259 if (is_freezing_inode()) return true;
2260 if (parent && parent->dir->is_freezing()) return true;
2261 return false;
2262}
2263
2264void CInode::add_dir_waiter(frag_t fg, MDSInternalContextBase *c)
2265{
2266 if (waiting_on_dir.empty())
2267 get(PIN_DIRWAITER);
2268 waiting_on_dir[fg].push_back(c);
2269 dout(10) << "add_dir_waiter frag " << fg << " " << c << " on " << *this << dendl;
2270}
2271
2272void CInode::take_dir_waiting(frag_t fg, list<MDSInternalContextBase*>& ls)
2273{
2274 if (waiting_on_dir.empty())
2275 return;
2276
2277 compact_map<frag_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dir.find(fg);
2278 if (p != waiting_on_dir.end()) {
2279 dout(10) << "take_dir_waiting frag " << fg << " on " << *this << dendl;
2280 ls.splice(ls.end(), p->second);
2281 waiting_on_dir.erase(p);
2282
2283 if (waiting_on_dir.empty())
2284 put(PIN_DIRWAITER);
2285 }
2286}
2287
2288void CInode::add_waiter(uint64_t tag, MDSInternalContextBase *c)
2289{
2290 dout(10) << "add_waiter tag " << std::hex << tag << std::dec << " " << c
2291 << " !ambig " << !state_test(STATE_AMBIGUOUSAUTH)
2292 << " !frozen " << !is_frozen_inode()
2293 << " !freezing " << !is_freezing_inode()
2294 << dendl;
2295 // wait on the directory?
2296 // make sure its not the inode that is explicitly ambiguous|freezing|frozen
2297 if (((tag & WAIT_SINGLEAUTH) && !state_test(STATE_AMBIGUOUSAUTH)) ||
2298 ((tag & WAIT_UNFREEZE) &&
2299 !is_frozen_inode() && !is_freezing_inode() && !is_frozen_auth_pin())) {
2300 dout(15) << "passing waiter up tree" << dendl;
2301 parent->dir->add_waiter(tag, c);
2302 return;
2303 }
2304 dout(15) << "taking waiter here" << dendl;
2305 MDSCacheObject::add_waiter(tag, c);
2306}
2307
2308void CInode::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
2309{
2310 if ((mask & WAIT_DIR) && !waiting_on_dir.empty()) {
2311 // take all dentry waiters
2312 while (!waiting_on_dir.empty()) {
2313 compact_map<frag_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dir.begin();
2314 dout(10) << "take_waiting dirfrag " << p->first << " on " << *this << dendl;
2315 ls.splice(ls.end(), p->second);
2316 waiting_on_dir.erase(p);
2317 }
2318 put(PIN_DIRWAITER);
2319 }
2320
2321 // waiting
2322 MDSCacheObject::take_waiting(mask, ls);
2323}
2324
2325bool CInode::freeze_inode(int auth_pin_allowance)
2326{
2327 assert(auth_pin_allowance > 0); // otherwise we need to adjust parent's nested_auth_pins
2328 assert(auth_pins >= auth_pin_allowance);
2329 if (auth_pins > auth_pin_allowance) {
2330 dout(10) << "freeze_inode - waiting for auth_pins to drop to " << auth_pin_allowance << dendl;
2331 auth_pin_freeze_allowance = auth_pin_allowance;
2332 get(PIN_FREEZING);
2333 state_set(STATE_FREEZING);
2334 return false;
2335 }
2336
2337 dout(10) << "freeze_inode - frozen" << dendl;
2338 assert(auth_pins == auth_pin_allowance);
2339 if (!state_test(STATE_FROZEN)) {
2340 get(PIN_FROZEN);
2341 state_set(STATE_FROZEN);
2342 }
2343 return true;
2344}
2345
2346void CInode::unfreeze_inode(list<MDSInternalContextBase*>& finished)
2347{
2348 dout(10) << "unfreeze_inode" << dendl;
2349 if (state_test(STATE_FREEZING)) {
2350 state_clear(STATE_FREEZING);
2351 put(PIN_FREEZING);
2352 } else if (state_test(STATE_FROZEN)) {
2353 state_clear(STATE_FROZEN);
2354 put(PIN_FROZEN);
2355 } else
2356 ceph_abort();
2357 take_waiting(WAIT_UNFREEZE, finished);
2358}
2359
2360void CInode::unfreeze_inode()
2361{
2362 list<MDSInternalContextBase*> finished;
2363 unfreeze_inode(finished);
2364 mdcache->mds->queue_waiters(finished);
2365}
2366
2367void CInode::freeze_auth_pin()
2368{
2369 assert(state_test(CInode::STATE_FROZEN));
2370 state_set(CInode::STATE_FROZENAUTHPIN);
2371}
2372
2373void CInode::unfreeze_auth_pin()
2374{
2375 assert(state_test(CInode::STATE_FROZENAUTHPIN));
2376 state_clear(CInode::STATE_FROZENAUTHPIN);
2377 if (!state_test(STATE_FREEZING|STATE_FROZEN)) {
2378 list<MDSInternalContextBase*> finished;
2379 take_waiting(WAIT_UNFREEZE, finished);
2380 mdcache->mds->queue_waiters(finished);
2381 }
2382}
2383
2384void CInode::clear_ambiguous_auth(list<MDSInternalContextBase*>& finished)
2385{
2386 assert(state_test(CInode::STATE_AMBIGUOUSAUTH));
2387 state_clear(CInode::STATE_AMBIGUOUSAUTH);
2388 take_waiting(CInode::WAIT_SINGLEAUTH, finished);
2389}
2390
2391void CInode::clear_ambiguous_auth()
2392{
2393 list<MDSInternalContextBase*> finished;
2394 clear_ambiguous_auth(finished);
2395 mdcache->mds->queue_waiters(finished);
2396}
2397
2398// auth_pins
2399bool CInode::can_auth_pin() const {
2400 if (!is_auth() || is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin())
2401 return false;
2402 if (parent)
2403 return parent->can_auth_pin();
2404 return true;
2405}
2406
2407void CInode::auth_pin(void *by)
2408{
2409 if (auth_pins == 0)
2410 get(PIN_AUTHPIN);
2411 auth_pins++;
2412
2413#ifdef MDS_AUTHPIN_SET
2414 auth_pin_set.insert(by);
2415#endif
2416
2417 dout(10) << "auth_pin by " << by << " on " << *this
2418 << " now " << auth_pins << "+" << nested_auth_pins
2419 << dendl;
2420
2421 if (parent)
2422 parent->adjust_nested_auth_pins(1, 1, this);
2423}
2424
2425void CInode::auth_unpin(void *by)
2426{
2427 auth_pins--;
2428
2429#ifdef MDS_AUTHPIN_SET
2430 assert(auth_pin_set.count(by));
2431 auth_pin_set.erase(auth_pin_set.find(by));
2432#endif
2433
2434 if (auth_pins == 0)
2435 put(PIN_AUTHPIN);
2436
2437 dout(10) << "auth_unpin by " << by << " on " << *this
2438 << " now " << auth_pins << "+" << nested_auth_pins
2439 << dendl;
2440
2441 assert(auth_pins >= 0);
2442
2443 if (parent)
2444 parent->adjust_nested_auth_pins(-1, -1, by);
2445
2446 if (is_freezing_inode() &&
2447 auth_pins == auth_pin_freeze_allowance) {
2448 dout(10) << "auth_unpin freezing!" << dendl;
2449 get(PIN_FROZEN);
2450 put(PIN_FREEZING);
2451 state_clear(STATE_FREEZING);
2452 state_set(STATE_FROZEN);
2453 finish_waiting(WAIT_FROZEN);
2454 }
2455}
2456
2457void CInode::adjust_nested_auth_pins(int a, void *by)
2458{
2459 assert(a);
2460 nested_auth_pins += a;
2461 dout(35) << "adjust_nested_auth_pins by " << by
2462 << " change " << a << " yields "
2463 << auth_pins << "+" << nested_auth_pins << dendl;
2464 assert(nested_auth_pins >= 0);
2465
2466 if (g_conf->mds_debug_auth_pins) {
2467 // audit
2468 int s = 0;
2469 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
2470 p != dirfrags.end();
2471 ++p) {
2472 CDir *dir = p->second;
2473 if (!dir->is_subtree_root() && dir->get_cum_auth_pins())
2474 s++;
2475 }
2476 assert(s == nested_auth_pins);
2477 }
2478
2479 if (parent)
2480 parent->adjust_nested_auth_pins(a, 0, by);
2481}
2482
2483
2484// authority
2485
2486mds_authority_t CInode::authority() const
2487{
2488 if (inode_auth.first >= 0)
2489 return inode_auth;
2490
2491 if (parent)
2492 return parent->dir->authority();
2493
2494 // new items that are not yet linked in (in the committed plane) belong
2495 // to their first parent.
2496 if (!projected_parent.empty())
2497 return projected_parent.front()->dir->authority();
2498
2499 return CDIR_AUTH_UNDEF;
2500}
2501
2502
2503// SNAP
2504
2505snapid_t CInode::get_oldest_snap()
2506{
2507 snapid_t t = first;
2508 if (!old_inodes.empty())
2509 t = old_inodes.begin()->second.first;
2510 return MIN(t, oldest_snap);
2511}
2512
2513old_inode_t& CInode::cow_old_inode(snapid_t follows, bool cow_head)
2514{
2515 assert(follows >= first);
2516
2517 inode_t *pi = cow_head ? get_projected_inode() : get_previous_projected_inode();
2518 map<string,bufferptr> *px = cow_head ? get_projected_xattrs() : get_previous_projected_xattrs();
2519
2520 old_inode_t &old = old_inodes[follows];
2521 old.first = first;
2522 old.inode = *pi;
2523 old.xattrs = *px;
2524
2525 if (first < oldest_snap)
2526 oldest_snap = first;
2527
2528 dout(10) << " " << px->size() << " xattrs cowed, " << *px << dendl;
2529
2530 old.inode.trim_client_ranges(follows);
2531
2532 if (g_conf->mds_snap_rstat &&
2533 !(old.inode.rstat == old.inode.accounted_rstat))
2534 dirty_old_rstats.insert(follows);
2535
2536 first = follows+1;
2537
2538 dout(10) << "cow_old_inode " << (cow_head ? "head" : "previous_head" )
2539 << " to [" << old.first << "," << follows << "] on "
2540 << *this << dendl;
2541
2542 return old;
2543}
2544
2545void CInode::split_old_inode(snapid_t snap)
2546{
2547 compact_map<snapid_t, old_inode_t>::iterator p = old_inodes.lower_bound(snap);
2548 assert(p != old_inodes.end() && p->second.first < snap);
2549
2550 old_inode_t &old = old_inodes[snap - 1];
2551 old = p->second;
2552
2553 p->second.first = snap;
2554 dout(10) << "split_old_inode " << "[" << old.first << "," << p->first
2555 << "] to [" << snap << "," << p->first << "] on " << *this << dendl;
2556}
2557
2558void CInode::pre_cow_old_inode()
2559{
2560 snapid_t follows = find_snaprealm()->get_newest_seq();
2561 if (first <= follows)
2562 cow_old_inode(follows, true);
2563}
2564
2565void CInode::purge_stale_snap_data(const set<snapid_t>& snaps)
2566{
2567 dout(10) << "purge_stale_snap_data " << snaps << dendl;
2568
2569 if (old_inodes.empty())
2570 return;
2571
2572 compact_map<snapid_t,old_inode_t>::iterator p = old_inodes.begin();
2573 while (p != old_inodes.end()) {
2574 set<snapid_t>::const_iterator q = snaps.lower_bound(p->second.first);
2575 if (q == snaps.end() || *q > p->first) {
2576 dout(10) << " purging old_inode [" << p->second.first << "," << p->first << "]" << dendl;
2577 old_inodes.erase(p++);
2578 } else
2579 ++p;
2580 }
2581}
2582
2583/*
2584 * pick/create an old_inode
2585 */
2586old_inode_t * CInode::pick_old_inode(snapid_t snap)
2587{
2588 compact_map<snapid_t, old_inode_t>::iterator p = old_inodes.lower_bound(snap); // p is first key >= to snap
2589 if (p != old_inodes.end() && p->second.first <= snap) {
2590 dout(10) << "pick_old_inode snap " << snap << " -> [" << p->second.first << "," << p->first << "]" << dendl;
2591 return &p->second;
2592 }
2593 dout(10) << "pick_old_inode snap " << snap << " -> nothing" << dendl;
2594 return NULL;
2595}
2596
2597void CInode::open_snaprealm(bool nosplit)
2598{
2599 if (!snaprealm) {
2600 SnapRealm *parent = find_snaprealm();
2601 snaprealm = new SnapRealm(mdcache, this);
2602 if (parent) {
2603 dout(10) << "open_snaprealm " << snaprealm
2604 << " parent is " << parent
2605 << dendl;
2606 dout(30) << " siblings are " << parent->open_children << dendl;
2607 snaprealm->parent = parent;
2608 if (!nosplit)
2609 parent->split_at(snaprealm);
2610 parent->open_children.insert(snaprealm);
2611 }
2612 }
2613}
2614void CInode::close_snaprealm(bool nojoin)
2615{
2616 if (snaprealm) {
2617 dout(15) << "close_snaprealm " << *snaprealm << dendl;
2618 snaprealm->close_parents();
2619 if (snaprealm->parent) {
2620 snaprealm->parent->open_children.erase(snaprealm);
2621 //if (!nojoin)
2622 //snaprealm->parent->join(snaprealm);
2623 }
2624 delete snaprealm;
2625 snaprealm = 0;
2626 }
2627}
2628
2629SnapRealm *CInode::find_snaprealm() const
2630{
2631 const CInode *cur = this;
2632 while (!cur->snaprealm) {
2633 if (cur->get_parent_dn())
2634 cur = cur->get_parent_dn()->get_dir()->get_inode();
2635 else if (get_projected_parent_dn())
2636 cur = cur->get_projected_parent_dn()->get_dir()->get_inode();
2637 else
2638 break;
2639 }
2640 return cur->snaprealm;
2641}
2642
2643void CInode::encode_snap_blob(bufferlist &snapbl)
2644{
2645 if (snaprealm) {
2646 ::encode(snaprealm->srnode, snapbl);
2647 dout(20) << "encode_snap_blob " << *snaprealm << dendl;
2648 }
2649}
2650void CInode::decode_snap_blob(bufferlist& snapbl)
2651{
2652 if (snapbl.length()) {
2653 open_snaprealm();
2654 bufferlist::iterator p = snapbl.begin();
2655 ::decode(snaprealm->srnode, p);
2656 if (is_base()) {
2657 bool ok = snaprealm->_open_parents(NULL);
2658 assert(ok);
2659 }
2660 dout(20) << "decode_snap_blob " << *snaprealm << dendl;
2661 }
2662}
2663
2664void CInode::encode_snap(bufferlist& bl)
2665{
2666 bufferlist snapbl;
2667 encode_snap_blob(snapbl);
2668 ::encode(snapbl, bl);
2669 ::encode(oldest_snap, bl);
2670}
2671
2672void CInode::decode_snap(bufferlist::iterator& p)
2673{
2674 bufferlist snapbl;
2675 ::decode(snapbl, p);
2676 ::decode(oldest_snap, p);
2677 decode_snap_blob(snapbl);
2678}
2679
2680// =============================================
2681
2682client_t CInode::calc_ideal_loner()
2683{
2684 if (mdcache->is_readonly())
2685 return -1;
2686 if (!mds_caps_wanted.empty())
2687 return -1;
2688
2689 int n = 0;
2690 client_t loner = -1;
2691 for (map<client_t,Capability*>::iterator it = client_caps.begin();
2692 it != client_caps.end();
2693 ++it)
2694 if (!it->second->is_stale() &&
2695 ((it->second->wanted() & (CEPH_CAP_ANY_WR|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_RD)) ||
2696 (inode.is_dir() && !has_subtree_root_dirfrag()))) {
2697 if (n)
2698 return -1;
2699 n++;
2700 loner = it->first;
2701 }
2702 return loner;
2703}
2704
2705client_t CInode::choose_ideal_loner()
2706{
2707 want_loner_cap = calc_ideal_loner();
2708 return want_loner_cap;
2709}
2710
2711bool CInode::try_set_loner()
2712{
2713 assert(want_loner_cap >= 0);
2714 if (loner_cap >= 0 && loner_cap != want_loner_cap)
2715 return false;
2716 set_loner_cap(want_loner_cap);
2717 return true;
2718}
2719
2720void CInode::set_loner_cap(client_t l)
2721{
2722 loner_cap = l;
2723 authlock.set_excl_client(loner_cap);
2724 filelock.set_excl_client(loner_cap);
2725 linklock.set_excl_client(loner_cap);
2726 xattrlock.set_excl_client(loner_cap);
2727}
2728
2729bool CInode::try_drop_loner()
2730{
2731 if (loner_cap < 0)
2732 return true;
2733
2734 int other_allowed = get_caps_allowed_by_type(CAP_ANY);
2735 Capability *cap = get_client_cap(loner_cap);
2736 if (!cap ||
2737 (cap->issued() & ~other_allowed) == 0) {
2738 set_loner_cap(-1);
2739 return true;
2740 }
2741 return false;
2742}
2743
2744
2745// choose new lock state during recovery, based on issued caps
2746void CInode::choose_lock_state(SimpleLock *lock, int allissued)
2747{
2748 int shift = lock->get_cap_shift();
2749 int issued = (allissued >> shift) & lock->get_cap_mask();
2750 if (is_auth()) {
2751 if (lock->is_xlocked()) {
2752 // do nothing here
2753 } else if (lock->get_state() != LOCK_MIX) {
2754 if (issued & (CEPH_CAP_GEXCL | CEPH_CAP_GBUFFER))
2755 lock->set_state(LOCK_EXCL);
2756 else if (issued & CEPH_CAP_GWR)
2757 lock->set_state(LOCK_MIX);
2758 else if (lock->is_dirty()) {
2759 if (is_replicated())
2760 lock->set_state(LOCK_MIX);
2761 else
2762 lock->set_state(LOCK_LOCK);
2763 } else
2764 lock->set_state(LOCK_SYNC);
2765 }
2766 } else {
2767 // our states have already been chosen during rejoin.
2768 if (lock->is_xlocked())
2769 assert(lock->get_state() == LOCK_LOCK);
2770 }
2771}
2772
2773void CInode::choose_lock_states(int dirty_caps)
2774{
2775 int issued = get_caps_issued() | dirty_caps;
2776 if (is_auth() && (issued & (CEPH_CAP_ANY_EXCL|CEPH_CAP_ANY_WR)) &&
2777 choose_ideal_loner() >= 0)
2778 try_set_loner();
2779 choose_lock_state(&filelock, issued);
2780 choose_lock_state(&nestlock, issued);
2781 choose_lock_state(&dirfragtreelock, issued);
2782 choose_lock_state(&authlock, issued);
2783 choose_lock_state(&xattrlock, issued);
2784 choose_lock_state(&linklock, issued);
2785}
2786
2787Capability *CInode::add_client_cap(client_t client, Session *session, SnapRealm *conrealm)
2788{
2789 if (client_caps.empty()) {
2790 get(PIN_CAPS);
2791 if (conrealm)
2792 containing_realm = conrealm;
2793 else
2794 containing_realm = find_snaprealm();
2795 containing_realm->inodes_with_caps.push_back(&item_caps);
2796 dout(10) << "add_client_cap first cap, joining realm " << *containing_realm << dendl;
2797 }
2798
2799 if (client_caps.empty())
2800 mdcache->num_inodes_with_caps++;
2801
2802 Capability *cap = new Capability(this, ++mdcache->last_cap_id, client);
2803 assert(client_caps.count(client) == 0);
2804 client_caps[client] = cap;
2805
2806 session->add_cap(cap);
2807 if (session->is_stale())
2808 cap->mark_stale();
2809
2810 cap->client_follows = first-1;
2811
2812 containing_realm->add_cap(client, cap);
2813
2814 return cap;
2815}
2816
2817void CInode::remove_client_cap(client_t client)
2818{
2819 assert(client_caps.count(client) == 1);
2820 Capability *cap = client_caps[client];
2821
2822 cap->item_session_caps.remove_myself();
2823 cap->item_revoking_caps.remove_myself();
2824 cap->item_client_revoking_caps.remove_myself();
2825 containing_realm->remove_cap(client, cap);
2826
2827 if (client == loner_cap)
2828 loner_cap = -1;
2829
2830 delete cap;
2831 client_caps.erase(client);
2832 if (client_caps.empty()) {
2833 dout(10) << "remove_client_cap last cap, leaving realm " << *containing_realm << dendl;
2834 put(PIN_CAPS);
2835 item_caps.remove_myself();
2836 containing_realm = NULL;
2837 item_open_file.remove_myself(); // unpin logsegment
2838 mdcache->num_inodes_with_caps--;
2839 }
2840
2841 //clean up advisory locks
2842 bool fcntl_removed = fcntl_locks ? fcntl_locks->remove_all_from(client) : false;
2843 bool flock_removed = flock_locks ? flock_locks->remove_all_from(client) : false;
2844 if (fcntl_removed || flock_removed) {
2845 list<MDSInternalContextBase*> waiters;
2846 take_waiting(CInode::WAIT_FLOCK, waiters);
2847 mdcache->mds->queue_waiters(waiters);
2848 }
2849}
2850
2851void CInode::move_to_realm(SnapRealm *realm)
2852{
2853 dout(10) << "move_to_realm joining realm " << *realm
2854 << ", leaving realm " << *containing_realm << dendl;
2855 for (map<client_t,Capability*>::iterator q = client_caps.begin();
2856 q != client_caps.end();
2857 ++q) {
2858 containing_realm->remove_cap(q->first, q->second);
2859 realm->add_cap(q->first, q->second);
2860 }
2861 item_caps.remove_myself();
2862 realm->inodes_with_caps.push_back(&item_caps);
2863 containing_realm = realm;
2864}
2865
2866Capability *CInode::reconnect_cap(client_t client, const cap_reconnect_t& icr, Session *session)
2867{
2868 Capability *cap = get_client_cap(client);
2869 if (cap) {
2870 // FIXME?
2871 cap->merge(icr.capinfo.wanted, icr.capinfo.issued);
2872 } else {
2873 cap = add_client_cap(client, session);
2874 cap->set_cap_id(icr.capinfo.cap_id);
2875 cap->set_wanted(icr.capinfo.wanted);
2876 cap->issue_norevoke(icr.capinfo.issued);
2877 cap->reset_seq();
2878 }
2879 cap->set_last_issue_stamp(ceph_clock_now());
2880 return cap;
2881}
2882
2883void CInode::clear_client_caps_after_export()
2884{
2885 while (!client_caps.empty())
2886 remove_client_cap(client_caps.begin()->first);
2887 loner_cap = -1;
2888 want_loner_cap = -1;
2889 mds_caps_wanted.clear();
2890}
2891
2892void CInode::export_client_caps(map<client_t,Capability::Export>& cl)
2893{
2894 for (map<client_t,Capability*>::iterator it = client_caps.begin();
2895 it != client_caps.end();
2896 ++it) {
2897 cl[it->first] = it->second->make_export();
2898 }
2899}
2900
2901 // caps allowed
2902int CInode::get_caps_liked() const
2903{
2904 if (is_dir())
2905 return CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED; // but not, say, FILE_RD|WR|WRBUFFER
2906 else
2907 return CEPH_CAP_ANY & ~CEPH_CAP_FILE_LAZYIO;
2908}
2909
2910int CInode::get_caps_allowed_ever() const
2911{
2912 int allowed;
2913 if (is_dir())
2914 allowed = CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED;
2915 else
2916 allowed = CEPH_CAP_ANY;
2917 return allowed &
2918 (CEPH_CAP_PIN |
2919 (filelock.gcaps_allowed_ever() << filelock.get_cap_shift()) |
2920 (authlock.gcaps_allowed_ever() << authlock.get_cap_shift()) |
2921 (xattrlock.gcaps_allowed_ever() << xattrlock.get_cap_shift()) |
2922 (linklock.gcaps_allowed_ever() << linklock.get_cap_shift()));
2923}
2924
2925int CInode::get_caps_allowed_by_type(int type) const
2926{
2927 return
2928 CEPH_CAP_PIN |
2929 (filelock.gcaps_allowed(type) << filelock.get_cap_shift()) |
2930 (authlock.gcaps_allowed(type) << authlock.get_cap_shift()) |
2931 (xattrlock.gcaps_allowed(type) << xattrlock.get_cap_shift()) |
2932 (linklock.gcaps_allowed(type) << linklock.get_cap_shift());
2933}
2934
2935int CInode::get_caps_careful() const
2936{
2937 return
2938 (filelock.gcaps_careful() << filelock.get_cap_shift()) |
2939 (authlock.gcaps_careful() << authlock.get_cap_shift()) |
2940 (xattrlock.gcaps_careful() << xattrlock.get_cap_shift()) |
2941 (linklock.gcaps_careful() << linklock.get_cap_shift());
2942}
2943
2944int CInode::get_xlocker_mask(client_t client) const
2945{
2946 return
2947 (filelock.gcaps_xlocker_mask(client) << filelock.get_cap_shift()) |
2948 (authlock.gcaps_xlocker_mask(client) << authlock.get_cap_shift()) |
2949 (xattrlock.gcaps_xlocker_mask(client) << xattrlock.get_cap_shift()) |
2950 (linklock.gcaps_xlocker_mask(client) << linklock.get_cap_shift());
2951}
2952
2953int CInode::get_caps_allowed_for_client(Session *session, inode_t *file_i) const
2954{
2955 client_t client = session->info.inst.name.num();
2956 int allowed;
2957 if (client == get_loner()) {
2958 // as the loner, we get the loner_caps AND any xlocker_caps for things we have xlocked
2959 allowed =
2960 get_caps_allowed_by_type(CAP_LONER) |
2961 (get_caps_allowed_by_type(CAP_XLOCKER) & get_xlocker_mask(client));
2962 } else {
2963 allowed = get_caps_allowed_by_type(CAP_ANY);
2964 }
2965
2966 if (!is_dir()) {
2967 if ((file_i->inline_data.version != CEPH_INLINE_NONE &&
2968 !session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) ||
2969 (!file_i->layout.pool_ns.empty() &&
2970 !session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)))
2971 allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
2972 }
2973 return allowed;
2974}
2975
2976// caps issued, wanted
2977int CInode::get_caps_issued(int *ploner, int *pother, int *pxlocker,
2978 int shift, int mask)
2979{
2980 int c = 0;
2981 int loner = 0, other = 0, xlocker = 0;
2982 if (!is_auth()) {
2983 loner_cap = -1;
2984 }
2985
2986 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
2987 it != client_caps.end();
2988 ++it) {
2989 int i = it->second->issued();
2990 c |= i;
2991 if (it->first == loner_cap)
2992 loner |= i;
2993 else
2994 other |= i;
2995 xlocker |= get_xlocker_mask(it->first) & i;
2996 }
2997 if (ploner) *ploner = (loner >> shift) & mask;
2998 if (pother) *pother = (other >> shift) & mask;
2999 if (pxlocker) *pxlocker = (xlocker >> shift) & mask;
3000 return (c >> shift) & mask;
3001}
3002
3003bool CInode::is_any_caps_wanted() const
3004{
3005 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3006 it != client_caps.end();
3007 ++it)
3008 if (it->second->wanted())
3009 return true;
3010 return false;
3011}
3012
3013int CInode::get_caps_wanted(int *ploner, int *pother, int shift, int mask) const
3014{
3015 int w = 0;
3016 int loner = 0, other = 0;
3017 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3018 it != client_caps.end();
3019 ++it) {
3020 if (!it->second->is_stale()) {
3021 int t = it->second->wanted();
3022 w |= t;
3023 if (it->first == loner_cap)
3024 loner |= t;
3025 else
3026 other |= t;
3027 }
3028 //cout << " get_caps_wanted client " << it->first << " " << cap_string(it->second.wanted()) << endl;
3029 }
3030 if (is_auth())
3031 for (compact_map<int,int>::const_iterator it = mds_caps_wanted.begin();
3032 it != mds_caps_wanted.end();
3033 ++it) {
3034 w |= it->second;
3035 other |= it->second;
3036 //cout << " get_caps_wanted mds " << it->first << " " << cap_string(it->second) << endl;
3037 }
3038 if (ploner) *ploner = (loner >> shift) & mask;
3039 if (pother) *pother = (other >> shift) & mask;
3040 return (w >> shift) & mask;
3041}
3042
3043bool CInode::issued_caps_need_gather(SimpleLock *lock)
3044{
3045 int loner_issued, other_issued, xlocker_issued;
3046 get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
3047 lock->get_cap_shift(), lock->get_cap_mask());
3048 if ((loner_issued & ~lock->gcaps_allowed(CAP_LONER)) ||
3049 (other_issued & ~lock->gcaps_allowed(CAP_ANY)) ||
3050 (xlocker_issued & ~lock->gcaps_allowed(CAP_XLOCKER)))
3051 return true;
3052 return false;
3053}
3054
3055void CInode::replicate_relax_locks()
3056{
3057 //dout(10) << " relaxing locks on " << *this << dendl;
3058 assert(is_auth());
3059 assert(!is_replicated());
3060
3061 authlock.replicate_relax();
3062 linklock.replicate_relax();
3063 dirfragtreelock.replicate_relax();
3064 filelock.replicate_relax();
3065 xattrlock.replicate_relax();
3066 snaplock.replicate_relax();
3067 nestlock.replicate_relax();
3068 flocklock.replicate_relax();
3069 policylock.replicate_relax();
3070}
3071
3072
3073
3074// =============================================
3075
3076int CInode::encode_inodestat(bufferlist& bl, Session *session,
3077 SnapRealm *dir_realm,
3078 snapid_t snapid,
3079 unsigned max_bytes,
3080 int getattr_caps)
3081{
3082 int client = session->info.inst.name.num();
3083 assert(snapid);
3084 assert(session->connection);
3085
3086 bool valid = true;
3087
3088 // pick a version!
3089 inode_t *oi = &inode;
3090 inode_t *pi = get_projected_inode();
3091
3092 map<string, bufferptr> *pxattrs = 0;
3093
3094 if (snapid != CEPH_NOSNAP) {
3095
3096 // for now at least, old_inodes is only defined/valid on the auth
3097 if (!is_auth())
3098 valid = false;
3099
3100 if (is_multiversion()) {
3101 compact_map<snapid_t,old_inode_t>::iterator p = old_inodes.lower_bound(snapid);
3102 if (p != old_inodes.end()) {
3103 if (p->second.first > snapid) {
3104 if (p != old_inodes.begin())
3105 --p;
3106 }
3107 if (p->second.first <= snapid && snapid <= p->first) {
3108 dout(15) << "encode_inodestat snapid " << snapid
3109 << " to old_inode [" << p->second.first << "," << p->first << "]"
3110 << " " << p->second.inode.rstat
3111 << dendl;
3112 pi = oi = &p->second.inode;
3113 pxattrs = &p->second.xattrs;
3114 } else {
3115 // snapshoted remote dentry can result this
3116 dout(0) << "encode_inodestat old_inode for snapid " << snapid
3117 << " not found" << dendl;
3118 }
3119 }
3120 } else if (snapid < first || snapid > last) {
3121 // snapshoted remote dentry can result this
3122 dout(0) << "encode_inodestat [" << first << "," << last << "]"
3123 << " not match snapid " << snapid << dendl;
3124 }
3125 }
3126
3127 SnapRealm *realm = find_snaprealm();
3128
3129 bool no_caps = !valid ||
3130 session->is_stale() ||
3131 (dir_realm && realm != dir_realm) ||
3132 is_frozen() ||
3133 state_test(CInode::STATE_EXPORTINGCAPS);
3134 if (no_caps)
3135 dout(20) << "encode_inodestat no caps"
3136 << (!valid?", !valid":"")
3137 << (session->is_stale()?", session stale ":"")
3138 << ((dir_realm && realm != dir_realm)?", snaprealm differs ":"")
3139 << (is_frozen()?", frozen inode":"")
3140 << (state_test(CInode::STATE_EXPORTINGCAPS)?", exporting caps":"")
3141 << dendl;
3142
3143
3144 // "fake" a version that is old (stable) version, +1 if projected.
3145 version_t version = (oi->version * 2) + is_projected();
3146
3147 Capability *cap = get_client_cap(client);
3148 bool pfile = filelock.is_xlocked_by_client(client) || get_loner() == client;
3149 //(cap && (cap->issued() & CEPH_CAP_FILE_EXCL));
3150 bool pauth = authlock.is_xlocked_by_client(client) || get_loner() == client;
3151 bool plink = linklock.is_xlocked_by_client(client) || get_loner() == client;
3152 bool pxattr = xattrlock.is_xlocked_by_client(client) || get_loner() == client;
3153
3154 bool plocal = versionlock.get_last_wrlock_client() == client;
3155 bool ppolicy = policylock.is_xlocked_by_client(client) || get_loner()==client;
3156
3157 inode_t *any_i = (pfile|pauth|plink|pxattr|plocal) ? pi : oi;
3158
3159 dout(20) << " pfile " << pfile << " pauth " << pauth
3160 << " plink " << plink << " pxattr " << pxattr
3161 << " plocal " << plocal
3162 << " ctime " << any_i->ctime
3163 << " valid=" << valid << dendl;
3164
3165 // file
3166 inode_t *file_i = pfile ? pi:oi;
3167 file_layout_t layout;
3168 if (is_dir()) {
3169 layout = (ppolicy ? pi : oi)->layout;
3170 } else {
3171 layout = file_i->layout;
3172 }
3173
3174 // max_size is min of projected, actual
3175 uint64_t max_size =
3176 MIN(oi->client_ranges.count(client) ?
3177 oi->client_ranges[client].range.last : 0,
3178 pi->client_ranges.count(client) ?
3179 pi->client_ranges[client].range.last : 0);
3180
3181 // inline data
3182 version_t inline_version = 0;
3183 bufferlist inline_data;
3184 if (file_i->inline_data.version == CEPH_INLINE_NONE) {
3185 inline_version = CEPH_INLINE_NONE;
3186 } else if ((!cap && !no_caps) ||
3187 (cap && cap->client_inline_version < file_i->inline_data.version) ||
3188 (getattr_caps & CEPH_CAP_FILE_RD)) { // client requests inline data
3189 inline_version = file_i->inline_data.version;
3190 if (file_i->inline_data.length() > 0)
3191 inline_data = file_i->inline_data.get_data();
3192 }
3193
3194 // nest (do same as file... :/)
3195 if (cap) {
3196 cap->last_rbytes = file_i->rstat.rbytes;
3197 cap->last_rsize = file_i->rstat.rsize();
3198 }
3199
3200 // auth
3201 inode_t *auth_i = pauth ? pi:oi;
3202
3203 // link
3204 inode_t *link_i = plink ? pi:oi;
3205
3206 // xattr
3207 inode_t *xattr_i = pxattr ? pi:oi;
3208
3209 // xattr
3210 bufferlist xbl;
3211 version_t xattr_version;
3212 if ((!cap && !no_caps) ||
3213 (cap && cap->client_xattr_version < xattr_i->xattr_version) ||
3214 (getattr_caps & CEPH_CAP_XATTR_SHARED)) { // client requests xattrs
3215 if (!pxattrs)
3216 pxattrs = pxattr ? get_projected_xattrs() : &xattrs;
3217 ::encode(*pxattrs, xbl);
3218 xattr_version = xattr_i->xattr_version;
3219 } else {
3220 xattr_version = 0;
3221 }
3222
3223 // do we have room?
3224 if (max_bytes) {
3225 unsigned bytes = 8 + 8 + 4 + 8 + 8 + sizeof(ceph_mds_reply_cap) +
3226 sizeof(struct ceph_file_layout) + 4 + layout.pool_ns.size() +
3227 sizeof(struct ceph_timespec) * 3 +
3228 4 + 8 + 8 + 8 + 4 + 4 + 4 + 4 + 4 +
3229 8 + 8 + 8 + 8 + 8 + sizeof(struct ceph_timespec) +
3230 4;
3231 bytes += sizeof(__u32);
3232 bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
3233 bytes += sizeof(__u32) + symlink.length();
3234 bytes += sizeof(__u32) + xbl.length();
3235 bytes += sizeof(version_t) + sizeof(__u32) + inline_data.length();
3236 if (bytes > max_bytes)
3237 return -ENOSPC;
3238 }
3239
3240
3241 // encode caps
3242 struct ceph_mds_reply_cap ecap;
3243 if (snapid != CEPH_NOSNAP) {
3244 /*
3245 * snapped inodes (files or dirs) only get read-only caps. always
3246 * issue everything possible, since it is read only.
3247 *
3248 * if a snapped inode has caps, limit issued caps based on the
3249 * lock state.
3250 *
3251 * if it is a live inode, limit issued caps based on the lock
3252 * state.
3253 *
3254 * do NOT adjust cap issued state, because the client always
3255 * tracks caps per-snap and the mds does either per-interval or
3256 * multiversion.
3257 */
3258 ecap.caps = valid ? get_caps_allowed_by_type(CAP_ANY) : CEPH_STAT_CAP_INODE;
3259 if (last == CEPH_NOSNAP || is_any_caps())
3260 ecap.caps = ecap.caps & get_caps_allowed_for_client(session, file_i);
3261 ecap.seq = 0;
3262 ecap.mseq = 0;
3263 ecap.realm = 0;
3264 } else {
3265 if (!no_caps && !cap) {
3266 // add a new cap
3267 cap = add_client_cap(client, session, realm);
3268 if (is_auth()) {
3269 if (choose_ideal_loner() >= 0)
3270 try_set_loner();
3271 else if (get_wanted_loner() < 0)
3272 try_drop_loner();
3273 }
3274 }
3275
3276 int issue = 0;
3277 if (!no_caps && cap) {
3278 int likes = get_caps_liked();
3279 int allowed = get_caps_allowed_for_client(session, file_i);
3280 issue = (cap->wanted() | likes) & allowed;
3281 cap->issue_norevoke(issue);
3282 issue = cap->pending();
3283 dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3284 << " seq " << cap->get_last_seq() << dendl;
3285 } else if (cap && cap->is_new() && !dir_realm) {
3286 // alway issue new caps to client, otherwise the caps get lost
3287 assert(cap->is_stale());
3288 issue = cap->pending() | CEPH_CAP_PIN;
3289 cap->issue_norevoke(issue);
3290 dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3291 << " seq " << cap->get_last_seq()
3292 << "(stale|new caps)" << dendl;
3293 }
3294
3295 if (issue) {
3296 cap->set_last_issue();
3297 cap->set_last_issue_stamp(ceph_clock_now());
3298 cap->clear_new();
3299 ecap.caps = issue;
3300 ecap.wanted = cap->wanted();
3301 ecap.cap_id = cap->get_cap_id();
3302 ecap.seq = cap->get_last_seq();
3303 ecap.mseq = cap->get_mseq();
3304 ecap.realm = realm->inode->ino();
3305 } else {
3306 ecap.cap_id = 0;
3307 ecap.caps = 0;
3308 ecap.seq = 0;
3309 ecap.mseq = 0;
3310 ecap.realm = 0;
3311 ecap.wanted = 0;
3312 }
3313 }
3314 ecap.flags = is_auth() ? CEPH_CAP_FLAG_AUTH : 0;
3315 dout(10) << "encode_inodestat caps " << ccap_string(ecap.caps)
3316 << " seq " << ecap.seq << " mseq " << ecap.mseq
3317 << " xattrv " << xattr_version << " len " << xbl.length()
3318 << dendl;
3319
3320 if (inline_data.length() && cap) {
3321 if ((cap->pending() | getattr_caps) & CEPH_CAP_FILE_SHARED) {
3322 dout(10) << "including inline version " << inline_version << dendl;
3323 cap->client_inline_version = inline_version;
3324 } else {
3325 dout(10) << "dropping inline version " << inline_version << dendl;
3326 inline_version = 0;
3327 inline_data.clear();
3328 }
3329 }
3330
3331 // include those xattrs?
3332 if (xbl.length() && cap) {
3333 if ((cap->pending() | getattr_caps) & CEPH_CAP_XATTR_SHARED) {
3334 dout(10) << "including xattrs version " << xattr_i->xattr_version << dendl;
3335 cap->client_xattr_version = xattr_i->xattr_version;
3336 } else {
3337 dout(10) << "dropping xattrs version " << xattr_i->xattr_version << dendl;
3338 xbl.clear(); // no xattrs .. XXX what's this about?!?
3339 xattr_version = 0;
3340 }
3341 }
3342
3343 /*
3344 * note: encoding matches MClientReply::InodeStat
3345 */
3346 ::encode(oi->ino, bl);
3347 ::encode(snapid, bl);
3348 ::encode(oi->rdev, bl);
3349 ::encode(version, bl);
3350
3351 ::encode(xattr_version, bl);
3352
3353 ::encode(ecap, bl);
3354 {
3355 ceph_file_layout legacy_layout;
3356 layout.to_legacy(&legacy_layout);
3357 ::encode(legacy_layout, bl);
3358 }
3359 ::encode(any_i->ctime, bl);
3360 ::encode(file_i->mtime, bl);
3361 ::encode(file_i->atime, bl);
3362 ::encode(file_i->time_warp_seq, bl);
3363 ::encode(file_i->size, bl);
3364 ::encode(max_size, bl);
3365 ::encode(file_i->truncate_size, bl);
3366 ::encode(file_i->truncate_seq, bl);
3367
3368 ::encode(auth_i->mode, bl);
3369 ::encode((uint32_t)auth_i->uid, bl);
3370 ::encode((uint32_t)auth_i->gid, bl);
3371
3372 ::encode(link_i->nlink, bl);
3373
3374 ::encode(file_i->dirstat.nfiles, bl);
3375 ::encode(file_i->dirstat.nsubdirs, bl);
3376 ::encode(file_i->rstat.rbytes, bl);
3377 ::encode(file_i->rstat.rfiles, bl);
3378 ::encode(file_i->rstat.rsubdirs, bl);
3379 ::encode(file_i->rstat.rctime, bl);
3380
3381 dirfragtree.encode(bl);
3382
3383 ::encode(symlink, bl);
3384 if (session->connection->has_feature(CEPH_FEATURE_DIRLAYOUTHASH)) {
3385 ::encode(file_i->dir_layout, bl);
3386 }
3387 ::encode(xbl, bl);
3388 if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
3389 ::encode(inline_version, bl);
3390 ::encode(inline_data, bl);
3391 }
3392 if (session->connection->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
3393 inode_t *policy_i = ppolicy ? pi : oi;
3394 ::encode(policy_i->quota, bl);
3395 }
3396 if (session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
3397 ::encode(layout.pool_ns, bl);
3398 }
3399 if (session->connection->has_feature(CEPH_FEATURE_FS_BTIME)) {
3400 ::encode(any_i->btime, bl);
3401 ::encode(any_i->change_attr, bl);
3402 }
3403
3404 return valid;
3405}
3406
3407void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
3408{
3409 assert(cap);
3410
3411 client_t client = cap->get_client();
3412
3413 bool pfile = filelock.is_xlocked_by_client(client) || (cap->issued() & CEPH_CAP_FILE_EXCL);
3414 bool pauth = authlock.is_xlocked_by_client(client);
3415 bool plink = linklock.is_xlocked_by_client(client);
3416 bool pxattr = xattrlock.is_xlocked_by_client(client);
3417
3418 inode_t *oi = &inode;
3419 inode_t *pi = get_projected_inode();
3420 inode_t *i = (pfile|pauth|plink|pxattr) ? pi : oi;
3421
3422 dout(20) << "encode_cap_message pfile " << pfile
3423 << " pauth " << pauth << " plink " << plink << " pxattr " << pxattr
3424 << " ctime " << i->ctime << dendl;
3425
3426 i = pfile ? pi:oi;
3427 m->set_layout(i->layout);
3428 m->size = i->size;
3429 m->truncate_seq = i->truncate_seq;
3430 m->truncate_size = i->truncate_size;
3431 m->mtime = i->mtime;
3432 m->atime = i->atime;
3433 m->ctime = i->ctime;
3434 m->change_attr = i->change_attr;
3435 m->time_warp_seq = i->time_warp_seq;
3436
3437 if (cap->client_inline_version < i->inline_data.version) {
3438 m->inline_version = cap->client_inline_version = i->inline_data.version;
3439 if (i->inline_data.length() > 0)
3440 m->inline_data = i->inline_data.get_data();
3441 } else {
3442 m->inline_version = 0;
3443 }
3444
3445 // max_size is min of projected, actual.
3446 uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
3447 uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
3448 m->max_size = MIN(oldms, newms);
3449
3450 i = pauth ? pi:oi;
3451 m->head.mode = i->mode;
3452 m->head.uid = i->uid;
3453 m->head.gid = i->gid;
3454
3455 i = plink ? pi:oi;
3456 m->head.nlink = i->nlink;
3457
3458 i = pxattr ? pi:oi;
3459 map<string,bufferptr> *ix = pxattr ? get_projected_xattrs() : &xattrs;
3460 if ((cap->pending() & CEPH_CAP_XATTR_SHARED) &&
3461 i->xattr_version > cap->client_xattr_version) {
3462 dout(10) << " including xattrs v " << i->xattr_version << dendl;
3463 ::encode(*ix, m->xattrbl);
3464 m->head.xattr_version = i->xattr_version;
3465 cap->client_xattr_version = i->xattr_version;
3466 }
3467}
3468
3469
3470
// Serialize the inode's base state into bl.
//
// Fields are written in a fixed order: first, inode, symlink,
// dirfragtree, xattrs, old_inodes, damage_flags, followed by whatever
// encode_snap() appends.  `features` is forwarded only to the encoders
// for `inode` and `old_inodes`.  _decode_base() reads the fields back in
// exactly this order, so the two functions must be changed together.
3471void CInode::_encode_base(bufferlist& bl, uint64_t features)
3472{
3473 ::encode(first, bl);
3474 ::encode(inode, bl, features);
3475 ::encode(symlink, bl);
3476 ::encode(dirfragtree, bl);
3477 ::encode(xattrs, bl);
3478 ::encode(old_inodes, bl, features);
3479 ::encode(damage_flags, bl);
3480 encode_snap(bl);
3481}
// Counterpart of _encode_base(): read the inode's base state from p.
//
// The fields are consumed in the same order _encode_base() writes them
// (first, inode, symlink, dirfragtree, xattrs, old_inodes,
// damage_flags, then decode_snap()); p is advanced past all of them.
3482void CInode::_decode_base(bufferlist::iterator& p)
3483{
3484 ::decode(first, p);
3485 ::decode(inode, p);
3486 ::decode(symlink, p);
3487 ::decode(dirfragtree, p);
3488 ::decode(xattrs, p);
3489 ::decode(old_inodes, p);
3490 ::decode(damage_flags, p);
3491 decode_snap(p);
3492}
3493
// Encode the complete state of the inode's locks, followed by loner_cap.
//
// Lock order on the wire: auth, link, dirfragtree, file, xattr, snap,
// nest, flock, policy.  versionlock is not encoded here.  Note that this
// order differs from the replica/rejoin state encoders in this file
// (which place nestlock right after filelock); _decode_locks_full() is
// the matching reader.
3494void CInode::_encode_locks_full(bufferlist& bl)
3495{
3496 ::encode(authlock, bl);
3497 ::encode(linklock, bl);
3498 ::encode(dirfragtreelock, bl);
3499 ::encode(filelock, bl);
3500 ::encode(xattrlock, bl);
3501 ::encode(snaplock, bl);
3502 ::encode(nestlock, bl);
3503 ::encode(flocklock, bl);
3504 ::encode(policylock, bl);
3505
3506 ::encode(loner_cap, bl);
3507}
// Counterpart of _encode_locks_full(): decode the full state of the
// nine locks in the same order they were encoded, then the loner cap.
//
// After decoding, the loner is installed through set_loner_cap() and
// want_loner_cap is initialised to the same client.
3508void CInode::_decode_locks_full(bufferlist::iterator& p)
3509{
3510 ::decode(authlock, p);
3511 ::decode(linklock, p);
3512 ::decode(dirfragtreelock, p);
3513 ::decode(filelock, p);
3514 ::decode(xattrlock, p);
3515 ::decode(snaplock, p);
3516 ::decode(nestlock, p);
3517 ::decode(flocklock, p);
3518 ::decode(policylock, p);
3519
3520 ::decode(loner_cap, p);
3521 set_loner_cap(loner_cap);
3522 want_loner_cap = loner_cap; // for now, we'll eval() shortly.
3523}
3524
// Append the replica-visible state of each lock to bl.
//
// Order on the wire: auth, link, dirfragtree, file, nest, xattr, snap,
// flock, policy.  _decode_locks_state() reads the locks back in this
// same order.
3525void CInode::_encode_locks_state_for_replica(bufferlist& bl)
3526{
3527 authlock.encode_state_for_replica(bl);
3528 linklock.encode_state_for_replica(bl);
3529 dirfragtreelock.encode_state_for_replica(bl);
3530 filelock.encode_state_for_replica(bl);
3531 nestlock.encode_state_for_replica(bl);
3532 xattrlock.encode_state_for_replica(bl);
3533 snaplock.encode_state_for_replica(bl);
3534 flocklock.encode_state_for_replica(bl);
3535 policylock.encode_state_for_replica(bl);
3536}
// Append lock state to bl for a rejoin exchange.
//
// Same lock order as _encode_locks_state_for_replica().  The three
// locks dirfragtreelock, filelock and nestlock are encoded with
// encode_state_for_rejoin(bl, rep); every other lock uses the plain
// replica encoding.  `rep` is passed through untouched to those three
// calls.
3537void CInode::_encode_locks_state_for_rejoin(bufferlist& bl, int rep)
3538{
3539 authlock.encode_state_for_replica(bl);
3540 linklock.encode_state_for_replica(bl);
3541 dirfragtreelock.encode_state_for_rejoin(bl, rep);
3542 filelock.encode_state_for_rejoin(bl, rep);
3543 nestlock.encode_state_for_rejoin(bl, rep);
3544 xattrlock.encode_state_for_replica(bl);
3545 snaplock.encode_state_for_replica(bl);
3546 flocklock.encode_state_for_replica(bl);
3547 policylock.encode_state_for_replica(bl);
3548}
// Decode per-lock state from p, in the order auth, link, dirfragtree,
// file, nest, xattr, snap, flock, policy (the order used by the
// replica/rejoin state encoders above).  `is_new` is forwarded
// unchanged to every lock's decode_state().
3549void CInode::_decode_locks_state(bufferlist::iterator& p, bool is_new)
3550{
3551 authlock.decode_state(p, is_new);
3552 linklock.decode_state(p, is_new);
3553 dirfragtreelock.decode_state(p, is_new);
3554 filelock.decode_state(p, is_new);
3555 nestlock.decode_state(p, is_new);
3556 xattrlock.decode_state(p, is_new);
3557 snaplock.decode_state(p, is_new);
3558 flocklock.decode_state(p, is_new);
3559 policylock.decode_state(p, is_new);
3560}
// Decode per-lock state received during rejoin.
//
// Every lock's decode_state_rejoin() is called in wire order (auth,
// link, dirfragtree, file, nest, xattr, snap, flock, policy) and is
// handed the caller's `waiters` list.
//
// Afterwards, each of dirfragtreelock, filelock and nestlock that is
// neither stable nor wrlocked is appended to `eval_locks`; this function
// does not evaluate those locks itself.
3561void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<MDSInternalContextBase*>& waiters,
3562 list<SimpleLock*>& eval_locks)
3563{
3564 authlock.decode_state_rejoin(p, waiters);
3565 linklock.decode_state_rejoin(p, waiters);
3566 dirfragtreelock.decode_state_rejoin(p, waiters);
3567 filelock.decode_state_rejoin(p, waiters);
3568 nestlock.decode_state_rejoin(p, waiters);
3569 xattrlock.decode_state_rejoin(p, waiters);
3570 snaplock.decode_state_rejoin(p, waiters);
3571 flocklock.decode_state_rejoin(p, waiters);
3572 policylock.decode_state_rejoin(p, waiters);
3573
3574 if (!dirfragtreelock.is_stable() && !dirfragtreelock.is_wrlocked())
3575 eval_locks.push_back(&dirfragtreelock);
3576 if (!filelock.is_stable() && !filelock.is_wrlocked())
3577 eval_locks.push_back(&filelock);
3578 if (!nestlock.is_stable() && !nestlock.is_wrlocked())
3579 eval_locks.push_back(&nestlock);
3580}
3581
3582
3583// IMPORT/EXPORT
3584
3585void CInode::encode_export(bufferlist& bl)
3586{
3587 ENCODE_START(5, 4, bl);
3588 _encode_base(bl, mdcache->mds->mdsmap->get_up_features());
3589
3590 ::encode(state, bl);
3591
3592 ::encode(pop, bl);
3593
3594 ::encode(replica_map, bl);
3595
3596 // include scatterlock info for any bounding CDirs
3597 bufferlist bounding;
3598 if (inode.is_dir())
3599 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
3600 p != dirfrags.end();
3601 ++p) {
3602 CDir *dir = p->second;
3603 if (dir->state_test(CDir::STATE_EXPORTBOUND)) {
3604 ::encode(p->first, bounding);
3605 ::encode(dir->fnode.fragstat, bounding);
3606 ::encode(dir->fnode.accounted_fragstat, bounding);
3607 ::encode(dir->fnode.rstat, bounding);
3608 ::encode(dir->fnode.accounted_rstat, bounding);
3609 dout(10) << " encoded fragstat/rstat info for " << *dir << dendl;
3610 }
3611 }
3612 ::encode(bounding, bl);
3613
3614 _encode_locks_full(bl);
3615
3616 _encode_file_locks(bl);
3617
3618 ENCODE_FINISH(bl);
3619
3620 get(PIN_TEMPEXPORTING);
3621}
3622
3623void CInode::finish_export(utime_t now)
3624{
3625 state &= MASK_STATE_EXPORT_KEPT;
3626
3627 pop.zero(now);
3628
3629 // just in case!
3630 //dirlock.clear_updated();
3631
3632 loner_cap = -1;
3633
3634 put(PIN_TEMPEXPORTING);
3635}
3636
3637void CInode::decode_import(bufferlist::iterator& p,
3638 LogSegment *ls)
3639{
3640 DECODE_START(5, p);
3641
3642 _decode_base(p);
3643
3644 unsigned s;
3645 ::decode(s, p);
3646 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
3647
3648 if (is_dirty()) {
3649 get(PIN_DIRTY);
3650 _mark_dirty(ls);
3651 }
3652 if (is_dirty_parent()) {
3653 get(PIN_DIRTYPARENT);
3654 _mark_dirty_parent(ls);
3655 }
3656
3657 ::decode(pop, ceph_clock_now(), p);
3658
3659 ::decode(replica_map, p);
3660 if (!replica_map.empty())
3661 get(PIN_REPLICATED);
3662 replica_nonce = 0;
3663
3664 // decode fragstat info on bounding cdirs
3665 bufferlist bounding;
3666 ::decode(bounding, p);
3667 bufferlist::iterator q = bounding.begin();
3668 while (!q.end()) {
3669 frag_t fg;
3670 ::decode(fg, q);
3671 CDir *dir = get_dirfrag(fg);
3672 assert(dir); // we should have all bounds open
3673
3674 // Only take the remote's fragstat/rstat if we are non-auth for
3675 // this dirfrag AND the lock is NOT in a scattered (MIX) state.
3676 // We know lock is stable, and MIX is the only state in which
3677 // the inode auth (who sent us this data) may not have the best
3678 // info.
3679
3680 // HMM: Are there cases where dir->is_auth() is an insufficient
3681 // check because the dirfrag is under migration? That implies
3682 // it is frozen (and in a SYNC or LOCK state). FIXME.
3683
3684 if (dir->is_auth() ||
3685 filelock.get_state() == LOCK_MIX) {
3686 dout(10) << " skipped fragstat info for " << *dir << dendl;
3687 frag_info_t f;
3688 ::decode(f, q);
3689 ::decode(f, q);
3690 } else {
3691 ::decode(dir->fnode.fragstat, q);
3692 ::decode(dir->fnode.accounted_fragstat, q);
3693 dout(10) << " took fragstat info for " << *dir << dendl;
3694 }
3695 if (dir->is_auth() ||
3696 nestlock.get_state() == LOCK_MIX) {
3697 dout(10) << " skipped rstat info for " << *dir << dendl;
3698 nest_info_t n;
3699 ::decode(n, q);
3700 ::decode(n, q);
3701 } else {
3702 ::decode(dir->fnode.rstat, q);
3703 ::decode(dir->fnode.accounted_rstat, q);
3704 dout(10) << " took rstat info for " << *dir << dendl;
3705 }
3706 }
3707
3708 _decode_locks_full(p);
3709
3710 _decode_file_locks(p);
3711
3712 DECODE_FINISH(p);
3713}
3714
3715
3716void InodeStoreBase::dump(Formatter *f) const
3717{
3718 inode.dump(f);
3719 f->dump_string("symlink", symlink);
3720 f->open_array_section("old_inodes");
3721 for (compact_map<snapid_t, old_inode_t>::const_iterator i = old_inodes.begin();
3722 i != old_inodes.end(); ++i) {
3723 f->open_object_section("old_inode");
3724 {
3725 // The key is the last snapid, the first is in the old_inode_t
3726 f->dump_int("last", i->first);
3727 i->second.dump(f);
3728 }
3729 f->close_section(); // old_inode
3730 }
3731 f->close_section(); // old_inodes
3732
3733 f->open_object_section("dirfragtree");
3734 dirfragtree.dump(f);
3735 f->close_section(); // dirfragtree
3736}
3737
3738
3739void InodeStore::generate_test_instances(list<InodeStore*> &ls)
3740{
3741 InodeStore *populated = new InodeStore;
3742 populated->inode.ino = 0xdeadbeef;
3743 populated->symlink = "rhubarb";
3744 ls.push_back(populated);
3745}
3746
3747void CInode::validate_disk_state(CInode::validated_data *results,
3748 MDSInternalContext *fin)
3749{
3750 class ValidationContinuation : public MDSContinuation {
3751 public:
3752 MDSInternalContext *fin;
3753 CInode *in;
3754 CInode::validated_data *results;
3755 bufferlist bl;
3756 CInode *shadow_in;
3757
3758 enum {
3759 START = 0,
3760 BACKTRACE,
3761 INODE,
3762 DIRFRAGS
3763 };
3764
3765 ValidationContinuation(CInode *i,
3766 CInode::validated_data *data_r,
3767 MDSInternalContext *fin_) :
3768 MDSContinuation(i->mdcache->mds->server),
3769 fin(fin_),
3770 in(i),
3771 results(data_r),
3772 shadow_in(NULL) {
3773 set_callback(START, static_cast<Continuation::stagePtr>(&ValidationContinuation::_start));
3774 set_callback(BACKTRACE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_backtrace));
3775 set_callback(INODE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_inode_disk));
3776 set_callback(DIRFRAGS, static_cast<Continuation::stagePtr>(&ValidationContinuation::_dirfrags));
3777 }
3778
3779 ~ValidationContinuation() override {
3780 delete shadow_in;
3781 }
3782
3783 /**
3784 * Fetch backtrace and set tag if tag is non-empty
3785 */
3786 void fetch_backtrace_and_tag(CInode *in, std::string tag,
3787 Context *fin, int *bt_r, bufferlist *bt)
3788 {
3789 const int64_t pool = in->get_backtrace_pool();
3790 object_t oid = CInode::get_object_name(in->ino(), frag_t(), "");
3791
3792 ObjectOperation fetch;
3793 fetch.getxattr("parent", bt, bt_r);
3794 in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP,
3795 NULL, 0, fin);
3796 if (!tag.empty()) {
3797 ObjectOperation scrub_tag;
3798 bufferlist tag_bl;
3799 ::encode(tag, tag_bl);
3800 scrub_tag.setxattr("scrub_tag", tag_bl);
3801 SnapContext snapc;
3802 in->mdcache->mds->objecter->mutate(oid, object_locator_t(pool), scrub_tag, snapc,
3803 ceph::real_clock::now(),
3804 0, NULL);
3805 }
3806 }
3807
3808 bool _start(int rval) {
3809 if (in->is_dirty()) {
3810 MDCache *mdcache = in->mdcache;
3811 inode_t& inode = in->inode;
3812 dout(20) << "validating a dirty CInode; results will be inconclusive"
3813 << dendl;
3814 }
3815 if (in->is_symlink()) {
3816 // there's nothing to do for symlinks!
3817 return true;
3818 }
3819
3820 C_OnFinisher *conf = new C_OnFinisher(get_io_callback(BACKTRACE),
3821 in->mdcache->mds->finisher);
3822
3823 // Whether we have a tag to apply depends on ScrubHeader (if one is
3824 // present)
3825 if (in->scrub_infop) {
3826 // I'm a non-orphan, so look up my ScrubHeader via my linkage
3827 const std::string &tag = in->scrub_infop->header->get_tag();
3828 // Rather than using the usual CInode::fetch_backtrace,
3829 // use a special variant that optionally writes a tag in the same
3830 // operation.
3831 fetch_backtrace_and_tag(in, tag, conf,
3832 &results->backtrace.ondisk_read_retval, &bl);
3833 } else {
3834 // When we're invoked outside of ScrubStack we might be called
3835 // on an orphaned inode like /
3836 fetch_backtrace_and_tag(in, {}, conf,
3837 &results->backtrace.ondisk_read_retval, &bl);
3838 }
3839 return false;
3840 }
3841
3842 bool _backtrace(int rval) {
3843 // set up basic result reporting and make sure we got the data
3844 results->performed_validation = true; // at least, some of it!
3845 results->backtrace.checked = true;
3846
3847 const int64_t pool = in->get_backtrace_pool();
3848 inode_backtrace_t& memory_backtrace = results->backtrace.memory_value;
3849 in->build_backtrace(pool, memory_backtrace);
3850 bool equivalent, divergent;
3851 int memory_newer;
3852
3853 MDCache *mdcache = in->mdcache; // For the benefit of dout
3854 const inode_t& inode = in->inode; // For the benefit of dout
3855
3856 // Ignore rval because it's the result of a FAILOK operation
3857 // from fetch_backtrace_and_tag: the real result is in
3858 // backtrace.ondisk_read_retval
3859 dout(20) << "ondisk_read_retval: " << results->backtrace.ondisk_read_retval << dendl;
3860 if (results->backtrace.ondisk_read_retval != 0) {
3861 results->backtrace.error_str << "failed to read off disk; see retval";
3862 goto next;
3863 }
3864
3865 // extract the backtrace, and compare it to a newly-constructed one
3866 try {
3867 bufferlist::iterator p = bl.begin();
3868 ::decode(results->backtrace.ondisk_value, p);
3869 dout(10) << "decoded " << bl.length() << " bytes of backtrace successfully" << dendl;
3870 } catch (buffer::error&) {
3871 if (results->backtrace.ondisk_read_retval == 0 && rval != 0) {
3872 // Cases where something has clearly gone wrong with the overall
3873 // fetch op, though we didn't get a nonzero rc from the getxattr
3874 // operation. e.g. object missing.
3875 results->backtrace.ondisk_read_retval = rval;
3876 }
3877 results->backtrace.error_str << "failed to decode on-disk backtrace ("
3878 << bl.length() << " bytes)!";
3879 goto next;
3880 }
3881
3882 memory_newer = memory_backtrace.compare(results->backtrace.ondisk_value,
3883 &equivalent, &divergent);
3884
3885 if (divergent || memory_newer < 0) {
3886 // we're divergent, or on-disk version is newer
3887 results->backtrace.error_str << "On-disk backtrace is divergent or newer";
3888 } else {
3889 results->backtrace.passed = true;
3890 }
3891next:
3892
3893 if (!results->backtrace.passed && in->scrub_infop->header->get_repair()) {
3894 std::string path;
3895 in->make_path_string(path);
3896 in->mdcache->mds->clog->warn() << "bad backtrace on inode " << *in
3897 << ", rewriting it at " << path;
3898 in->_mark_dirty_parent(in->mdcache->mds->mdlog->get_current_segment(),
3899 false);
3900 }
3901
3902 // If the inode's number was free in the InoTable, fix that
3903 // (#15619)
3904 {
3905 InoTable *inotable = mdcache->mds->inotable;
3906
3907 dout(10) << "scrub: inotable ino = 0x" << std::hex << inode.ino << dendl;
3908 dout(10) << "scrub: inotable free says "
3909 << inotable->is_marked_free(inode.ino) << dendl;
3910
3911 if (inotable->is_marked_free(inode.ino)) {
3912 LogChannelRef clog = in->mdcache->mds->clog;
3913 clog->error() << "scrub: inode wrongly marked free: 0x" << std::hex
3914 << inode.ino;
3915
3916 if (in->scrub_infop->header->get_repair()) {
3917 bool repaired = inotable->repair(inode.ino);
3918 if (repaired) {
3919 clog->error() << "inode table repaired for inode: 0x" << std::hex
3920 << inode.ino;
3921
3922 inotable->save();
3923 } else {
3924 clog->error() << "Cannot repair inotable while other operations"
3925 " are in progress";
3926 }
3927 }
3928 }
3929 }
3930
3931 // quit if we're a file, or kick off directory checks otherwise
3932 // TODO: validate on-disk inode for non-base directories
3933 if (!in->is_dir()) {
3934 return true;
3935 }
3936
3937 return validate_directory_data();
3938 }
3939
3940 bool validate_directory_data() {
3941 assert(in->is_dir());
3942
3943 if (in->is_base()) {
3944 shadow_in = new CInode(in->mdcache);
3945 in->mdcache->create_unlinked_system_inode(shadow_in,
3946 in->inode.ino,
3947 in->inode.mode);
3948 shadow_in->fetch(get_internal_callback(INODE));
3949 return false;
3950 } else {
3951 results->inode.passed = true;
3952 return check_dirfrag_rstats();
3953 }
3954 }
3955
3956 bool _inode_disk(int rval) {
3957 results->inode.checked = true;
3958 results->inode.ondisk_read_retval = rval;
3959 results->inode.ondisk_value = shadow_in->inode;
3960 results->inode.memory_value = in->inode;
3961
3962 inode_t& si = shadow_in->inode;
3963 inode_t& i = in->inode;
3964 if (si.version > i.version) {
3965 // uh, what?
3966 results->inode.error_str << "On-disk inode is newer than in-memory one!";
3967 goto next;
3968 } else {
3969 bool divergent = false;
3970 int r = i.compare(si, &divergent);
3971 results->inode.passed = !divergent && r >= 0;
3972 if (!results->inode.passed) {
3973 results->inode.error_str <<
3974 "On-disk inode is divergent or newer than in-memory one!";
3975 goto next;
3976 }
3977 }
3978next:
3979 return check_dirfrag_rstats();
3980 }
3981
3982 bool check_dirfrag_rstats() {
3983 MDSGatherBuilder gather(g_ceph_context);
3984 std::list<frag_t> frags;
3985 in->dirfragtree.get_leaves(frags);
3986 for (list<frag_t>::iterator p = frags.begin();
3987 p != frags.end();
3988 ++p) {
3989 CDir *dir = in->get_or_open_dirfrag(in->mdcache, *p);
3990 dir->scrub_info();
3991 if (!dir->scrub_infop->header)
3992 dir->scrub_infop->header = in->scrub_infop->header;
3993 if (dir->is_complete()) {
3994 dir->scrub_local();
3995 } else {
3996 dir->scrub_infop->need_scrub_local = true;
3997 dir->fetch(gather.new_sub(), false);
3998 }
3999 }
4000 if (gather.has_subs()) {
4001 gather.set_finisher(get_internal_callback(DIRFRAGS));
4002 gather.activate();
4003 return false;
4004 } else {
4005 return immediate(DIRFRAGS, 0);
4006 }
4007 }
4008
4009 bool _dirfrags(int rval) {
4010 int frags_errors = 0;
4011 // basic reporting setup
4012 results->raw_stats.checked = true;
4013 results->raw_stats.ondisk_read_retval = rval;
4014
4015 results->raw_stats.memory_value.dirstat = in->inode.dirstat;
4016 results->raw_stats.memory_value.rstat = in->inode.rstat;
4017 frag_info_t& dir_info = results->raw_stats.ondisk_value.dirstat;
4018 nest_info_t& nest_info = results->raw_stats.ondisk_value.rstat;
4019
4020 if (rval != 0) {
4021 results->raw_stats.error_str << "Failed to read dirfrags off disk";
4022 goto next;
4023 }
4024
4025 // check each dirfrag...
4026 for (compact_map<frag_t,CDir*>::iterator p = in->dirfrags.begin();
4027 p != in->dirfrags.end();
4028 ++p) {
4029 CDir *dir = p->second;
4030 assert(dir->get_version() > 0);
4031 nest_info.add(dir->fnode.accounted_rstat);
4032 dir_info.add(dir->fnode.accounted_fragstat);
4033 if (dir->scrub_infop &&
4034 dir->scrub_infop->pending_scrub_error) {
4035 dir->scrub_infop->pending_scrub_error = false;
4036 if (dir->scrub_infop->header->get_repair()) {
4037 results->raw_stats.error_str
4038 << "dirfrag(" << p->first << ") has bad stats (will be fixed); ";
4039 } else {
4040 results->raw_stats.error_str
4041 << "dirfrag(" << p->first << ") has bad stats; ";
4042 }
4043 frags_errors++;
4044 }
4045 }
4046 nest_info.rsubdirs++; // it gets one to account for self
4047 // ...and that their sum matches our inode settings
4048 if (!dir_info.same_sums(in->inode.dirstat) ||
4049 !nest_info.same_sums(in->inode.rstat)) {
4050 if (in->scrub_infop &&
4051 in->scrub_infop->header->get_repair()) {
4052 results->raw_stats.error_str
4053 << "freshly-calculated rstats don't match existing ones (will be fixed)";
4054 in->mdcache->repair_inode_stats(in);
4055 } else {
4056 results->raw_stats.error_str
4057 << "freshly-calculated rstats don't match existing ones";
4058 }
4059 goto next;
4060 }
4061 if (frags_errors > 0)
4062 goto next;
4063
4064 results->raw_stats.passed = true;
4065next:
4066 return true;
4067 }
4068
4069 void _done() override {
4070 if ((!results->raw_stats.checked || results->raw_stats.passed) &&
4071 (!results->backtrace.checked || results->backtrace.passed) &&
4072 (!results->inode.checked || results->inode.passed))
4073 results->passed_validation = true;
4074 if (fin) {
4075 fin->complete(get_rval());
4076 }
4077 }
4078 };
4079
4080
4081 dout(10) << "scrub starting validate_disk_state on " << *this << dendl;
4082 ValidationContinuation *vc = new ValidationContinuation(this,
4083 results,
4084 fin);
4085 vc->begin();
4086}
4087
4088void CInode::validated_data::dump(Formatter *f) const
4089{
4090 f->open_object_section("results");
4091 {
4092 f->dump_bool("performed_validation", performed_validation);
4093 f->dump_bool("passed_validation", passed_validation);
4094 f->open_object_section("backtrace");
4095 {
4096 f->dump_bool("checked", backtrace.checked);
4097 f->dump_bool("passed", backtrace.passed);
4098 f->dump_int("read_ret_val", backtrace.ondisk_read_retval);
4099 f->dump_stream("ondisk_value") << backtrace.ondisk_value;
4100 f->dump_stream("memoryvalue") << backtrace.memory_value;
4101 f->dump_string("error_str", backtrace.error_str.str());
4102 }
4103 f->close_section(); // backtrace
4104 f->open_object_section("raw_stats");
4105 {
4106 f->dump_bool("checked", raw_stats.checked);
4107 f->dump_bool("passed", raw_stats.passed);
4108 f->dump_int("read_ret_val", raw_stats.ondisk_read_retval);
4109 f->dump_stream("ondisk_value.dirstat") << raw_stats.ondisk_value.dirstat;
4110 f->dump_stream("ondisk_value.rstat") << raw_stats.ondisk_value.rstat;
4111 f->dump_stream("memory_value.dirrstat") << raw_stats.memory_value.dirstat;
4112 f->dump_stream("memory_value.rstat") << raw_stats.memory_value.rstat;
4113 f->dump_string("error_str", raw_stats.error_str.str());
4114 }
4115 f->close_section(); // raw_stats
4116 // dump failure return code
4117 int rc = 0;
4118 if (backtrace.checked && backtrace.ondisk_read_retval)
4119 rc = backtrace.ondisk_read_retval;
4120 if (inode.checked && inode.ondisk_read_retval)
4121 rc = inode.ondisk_read_retval;
4122 if (raw_stats.checked && raw_stats.ondisk_read_retval)
4123 rc = raw_stats.ondisk_read_retval;
4124 f->dump_int("return_code", rc);
4125 }
4126 f->close_section(); // results
4127}
4128
4129void CInode::dump(Formatter *f) const
4130{
4131 InodeStoreBase::dump(f);
4132
4133 MDSCacheObject::dump(f);
4134
4135 f->open_object_section("versionlock");
4136 versionlock.dump(f);
4137 f->close_section();
4138
4139 f->open_object_section("authlock");
4140 authlock.dump(f);
4141 f->close_section();
4142
4143 f->open_object_section("linklock");
4144 linklock.dump(f);
4145 f->close_section();
4146
4147 f->open_object_section("dirfragtreelock");
4148 dirfragtreelock.dump(f);
4149 f->close_section();
4150
4151 f->open_object_section("filelock");
4152 filelock.dump(f);
4153 f->close_section();
4154
4155 f->open_object_section("xattrlock");
4156 xattrlock.dump(f);
4157 f->close_section();
4158
4159 f->open_object_section("snaplock");
4160 snaplock.dump(f);
4161 f->close_section();
4162
4163 f->open_object_section("nestlock");
4164 nestlock.dump(f);
4165 f->close_section();
4166
4167 f->open_object_section("flocklock");
4168 flocklock.dump(f);
4169 f->close_section();
4170
4171 f->open_object_section("policylock");
4172 policylock.dump(f);
4173 f->close_section();
4174
4175 f->open_array_section("states");
4176 MDSCacheObject::dump_states(f);
4177 if (state_test(STATE_EXPORTING))
4178 f->dump_string("state", "exporting");
4179 if (state_test(STATE_OPENINGDIR))
4180 f->dump_string("state", "openingdir");
4181 if (state_test(STATE_FREEZING))
4182 f->dump_string("state", "freezing");
4183 if (state_test(STATE_FROZEN))
4184 f->dump_string("state", "frozen");
4185 if (state_test(STATE_AMBIGUOUSAUTH))
4186 f->dump_string("state", "ambiguousauth");
4187 if (state_test(STATE_EXPORTINGCAPS))
4188 f->dump_string("state", "exportingcaps");
4189 if (state_test(STATE_NEEDSRECOVER))
4190 f->dump_string("state", "needsrecover");
4191 if (state_test(STATE_PURGING))
4192 f->dump_string("state", "purging");
4193 if (state_test(STATE_DIRTYPARENT))
4194 f->dump_string("state", "dirtyparent");
4195 if (state_test(STATE_DIRTYRSTAT))
4196 f->dump_string("state", "dirtyrstat");
4197 if (state_test(STATE_STRAYPINNED))
4198 f->dump_string("state", "straypinned");
4199 if (state_test(STATE_FROZENAUTHPIN))
4200 f->dump_string("state", "frozenauthpin");
4201 if (state_test(STATE_DIRTYPOOL))
4202 f->dump_string("state", "dirtypool");
4203 if (state_test(STATE_ORPHAN))
4204 f->dump_string("state", "orphan");
4205 if (state_test(STATE_MISSINGOBJS))
4206 f->dump_string("state", "missingobjs");
4207 f->close_section();
4208
4209 f->open_array_section("client_caps");
4210 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
4211 it != client_caps.end(); ++it) {
4212 f->open_object_section("client_cap");
4213 f->dump_int("client_id", it->first.v);
4214 f->dump_string("pending", ccap_string(it->second->pending()));
4215 f->dump_string("issued", ccap_string(it->second->issued()));
4216 f->dump_string("wanted", ccap_string(it->second->wanted()));
4217 f->dump_string("last_sent", ccap_string(it->second->get_last_sent()));
4218 f->close_section();
4219 }
4220 f->close_section();
4221
4222 f->dump_int("loner", loner_cap.v);
4223 f->dump_int("want_loner", want_loner_cap.v);
4224
4225 f->open_array_section("mds_caps_wanted");
4226 for (compact_map<int,int>::const_iterator p = mds_caps_wanted.begin();
4227 p != mds_caps_wanted.end(); ++p) {
4228 f->open_object_section("mds_cap_wanted");
4229 f->dump_int("rank", p->first);
4230 f->dump_string("cap", ccap_string(p->second));
4231 f->close_section();
4232 }
4233 f->close_section();
4234}
4235
4236/****** Scrub Stuff *****/
4237void CInode::scrub_info_create() const
4238{
4239 dout(25) << __func__ << dendl;
4240 assert(!scrub_infop);
4241
4242 // break out of const-land to set up implicit initial state
4243 CInode *me = const_cast<CInode*>(this);
4244 inode_t *in = me->get_projected_inode();
4245
4246 scrub_info_t *si = new scrub_info_t();
4247 si->scrub_start_stamp = si->last_scrub_stamp = in->last_scrub_stamp;
4248 si->scrub_start_version = si->last_scrub_version = in->last_scrub_version;
4249
4250 me->scrub_infop = si;
4251}
4252
4253void CInode::scrub_maybe_delete_info()
4254{
4255 if (scrub_infop &&
4256 !scrub_infop->scrub_in_progress &&
4257 !scrub_infop->last_scrub_dirty) {
4258 delete scrub_infop;
4259 scrub_infop = NULL;
4260 }
4261}
4262
4263void CInode::scrub_initialize(CDentry *scrub_parent,
4264 const ScrubHeaderRefConst& header,
4265 MDSInternalContextBase *f)
4266{
4267 dout(20) << __func__ << " with scrub_version " << get_version() << dendl;
4268 assert(!scrub_is_in_progress());
4269 scrub_info();
4270 if (!scrub_infop)
4271 scrub_infop = new scrub_info_t();
4272
4273 if (get_projected_inode()->is_dir()) {
4274 // fill in dirfrag_stamps with initial state
4275 std::list<frag_t> frags;
4276 dirfragtree.get_leaves(frags);
4277 for (std::list<frag_t>::iterator i = frags.begin();
4278 i != frags.end();
4279 ++i) {
4280 if (header->get_force())
4281 scrub_infop->dirfrag_stamps[*i].reset();
4282 else
4283 scrub_infop->dirfrag_stamps[*i];
4284 }
4285 }
4286
4287 if (scrub_parent)
4288 scrub_parent->get(CDentry::PIN_SCRUBPARENT);
4289 scrub_infop->scrub_parent = scrub_parent;
4290 scrub_infop->on_finish = f;
4291 scrub_infop->scrub_in_progress = true;
4292 scrub_infop->children_scrubbed = false;
4293 scrub_infop->header = header;
4294
4295 scrub_infop->scrub_start_version = get_version();
4296 scrub_infop->scrub_start_stamp = ceph_clock_now();
4297 // right now we don't handle remote inodes
4298}
4299
4300int CInode::scrub_dirfrag_next(frag_t* out_dirfrag)
4301{
4302 dout(20) << __func__ << dendl;
4303 assert(scrub_is_in_progress());
4304
4305 if (!is_dir()) {
4306 return -ENOTDIR;
4307 }
4308
4309 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4310 scrub_infop->dirfrag_stamps.begin();
4311
4312 while (i != scrub_infop->dirfrag_stamps.end()) {
4313 if (i->second.scrub_start_version < scrub_infop->scrub_start_version) {
4314 i->second.scrub_start_version = get_projected_version();
4315 i->second.scrub_start_stamp = ceph_clock_now();
4316 *out_dirfrag = i->first;
4317 dout(20) << " return frag " << *out_dirfrag << dendl;
4318 return 0;
4319 }
4320 ++i;
4321 }
4322
4323 dout(20) << " no frags left, ENOENT " << dendl;
4324 return ENOENT;
4325}
4326
4327void CInode::scrub_dirfrags_scrubbing(list<frag_t>* out_dirfrags)
4328{
4329 assert(out_dirfrags != NULL);
4330 assert(scrub_infop != NULL);
4331
4332 out_dirfrags->clear();
4333 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4334 scrub_infop->dirfrag_stamps.begin();
4335
4336 while (i != scrub_infop->dirfrag_stamps.end()) {
4337 if (i->second.scrub_start_version >= scrub_infop->scrub_start_version) {
4338 if (i->second.last_scrub_version < scrub_infop->scrub_start_version)
4339 out_dirfrags->push_back(i->first);
4340 } else {
4341 return;
4342 }
4343
4344 ++i;
4345 }
4346}
4347
4348void CInode::scrub_dirfrag_finished(frag_t dirfrag)
4349{
4350 dout(20) << __func__ << " on frag " << dirfrag << dendl;
4351 assert(scrub_is_in_progress());
4352
4353 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4354 scrub_infop->dirfrag_stamps.find(dirfrag);
4355 assert(i != scrub_infop->dirfrag_stamps.end());
4356
4357 scrub_stamp_info_t &si = i->second;
4358 si.last_scrub_stamp = si.scrub_start_stamp;
4359 si.last_scrub_version = si.scrub_start_version;
4360}
4361
4362void CInode::scrub_finished(MDSInternalContextBase **c) {
4363 dout(20) << __func__ << dendl;
4364 assert(scrub_is_in_progress());
4365 for (std::map<frag_t, scrub_stamp_info_t>::iterator i =
4366 scrub_infop->dirfrag_stamps.begin();
4367 i != scrub_infop->dirfrag_stamps.end();
4368 ++i) {
4369 if(i->second.last_scrub_version != i->second.scrub_start_version) {
4370 derr << i->second.last_scrub_version << " != "
4371 << i->second.scrub_start_version << dendl;
4372 }
4373 assert(i->second.last_scrub_version == i->second.scrub_start_version);
4374 }
4375
4376 scrub_infop->last_scrub_version = scrub_infop->scrub_start_version;
4377 scrub_infop->last_scrub_stamp = scrub_infop->scrub_start_stamp;
4378 scrub_infop->last_scrub_dirty = true;
4379 scrub_infop->scrub_in_progress = false;
4380
4381 if (scrub_infop->scrub_parent) {
4382 CDentry *dn = scrub_infop->scrub_parent;
4383 scrub_infop->scrub_parent = NULL;
4384 dn->dir->scrub_dentry_finished(dn);
4385 dn->put(CDentry::PIN_SCRUBPARENT);
4386 }
4387
4388 *c = scrub_infop->on_finish;
4389 scrub_infop->on_finish = NULL;
4390
4391 if (scrub_infop->header->get_origin() == this) {
4392 // We are at the point that a tagging scrub was initiated
4393 LogChannelRef clog = mdcache->mds->clog;
4394 clog->info() << "scrub complete with tag '" << scrub_infop->header->get_tag() << "'";
4395 }
4396}
4397
4398int64_t CInode::get_backtrace_pool() const
4399{
4400 if (is_dir()) {
4401 return mdcache->mds->mdsmap->get_metadata_pool();
4402 } else {
4403 // Files are required to have an explicit layout that specifies
4404 // a pool
4405 assert(inode.layout.pool_id != -1);
4406 return inode.layout.pool_id;
4407 }
4408}
4409
4410class C_CInode_ExportPin : public MDSInternalContext {
4411public:
4412 explicit C_CInode_ExportPin(CInode *in) : MDSInternalContext(in->mdcache->mds), in(in) {
4413 in->get(MDSCacheObject::PIN_PTRWAITER);
4414 }
4415 ~C_CInode_ExportPin() {
4416 in->put(MDSCacheObject::PIN_PTRWAITER);
4417 }
4418
4419 void finish(int r) override {
4420 in->maybe_export_pin();
4421 }
4422private:
4423 CInode *in;
4424};
4425
4426void CInode::maybe_export_pin()
4427{
4428 if (g_conf->mds_bal_export_pin && is_dir() && is_normal()) {
4429 mds_rank_t pin = get_export_pin(false);
4430 dout(20) << "maybe_export_pin export_pin=" << pin << " on " << *this << dendl;
4431 if (pin == mdcache->mds->get_nodeid()) {
4432 for (auto it = dirfrags.begin(); it != dirfrags.end(); it++) {
4433 CDir *cd = it->second;
4434 dout(20) << "dirfrag: " << *cd << dendl;
4435 if (cd->state_test(CDir::STATE_CREATING)) {
4436 /* inode is not journaled yet */
4437 cd->add_waiter(CDir::WAIT_CREATED, new C_CInode_ExportPin(this));
4438 dout(15) << "aux subtree pin of " << *cd << " delayed for finished creation" << dendl;
4439 continue;
4440 }
4441 if (cd->state_test(CDir::STATE_AUXSUBTREE)) continue;
4442 CDir *subtree = mdcache->get_subtree_root(cd);
4443 assert(subtree);
4444 if (subtree->is_ambiguous_auth()) {
4445 subtree->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_CInode_ExportPin(this));
4446 dout(15) << "aux subtree pin of " << *cd << " delayed for single auth on subtree " << *subtree << dendl;
4447 } else if (subtree->is_auth()) {
4448 assert(cd->is_auth());
4449 if (subtree->is_frozen() || subtree->is_freezing()) {
4450 subtree->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_CInode_ExportPin(this));
4451 dout(15) << "aux subtree pin of " << *cd << " delayed for unfreeze on subtree " << *subtree << dendl;
4452 } else {
4453 cd->state_set(CDir::STATE_AUXSUBTREE);
4454 mdcache->adjust_subtree_auth(cd, mdcache->mds->get_nodeid());
4455 dout(15) << "aux subtree pinned " << *cd << dendl;
4456 }
4457 } else {
4458 assert(!cd->is_auth());
4459 dout(15) << "not setting aux subtree pin for " << *cd << " because not auth" << dendl;
4460 }
4461 }
4462 } else if (pin != MDS_RANK_NONE) {
4463 for (auto it = dirfrags.begin(); it != dirfrags.end(); it++) {
4464 CDir *cd = it->second;
4465 if (cd->is_auth() && cd->state_test(CDir::STATE_AUXSUBTREE)) {
4466 assert(!(cd->is_frozen() || cd->is_freezing()));
4467 assert(!cd->state_test(CDir::STATE_EXPORTBOUND));
4468 cd->state_clear(CDir::STATE_AUXSUBTREE); /* merge will happen eventually */
4469 dout(15) << "cleared aux subtree pin " << *cd << dendl;
4470 }
4471 }
4472 dout(20) << "adding to export_pin_queue " << *this << dendl;
4473 mdcache->export_pin_queue.insert(this);
4474 }
4475 }
4476}
4477
4478void CInode::set_export_pin(mds_rank_t rank)
4479{
4480 assert(is_dir());
4481 assert(is_projected());
4482 get_projected_inode()->export_pin = rank;
4483 maybe_export_pin();
4484}
4485
4486mds_rank_t CInode::get_export_pin(bool inherit) const
4487{
4488 /* An inode that is export pinned may not necessarily be a subtree root, we
4489 * need to traverse the parents. A base or system inode cannot be pinned.
4490 * N.B. inodes not yet linked into a dir (i.e. anonymous inodes) will not
4491 * have a parent yet.
4492 */
4493 for (const CInode *in = this; !in->is_base() && !in->is_system() && in->get_projected_parent_dn(); in = in->get_projected_parent_dn()->dir->inode) {
4494 mds_rank_t pin = in->get_projected_inode()->export_pin;
4495 if (pin >= 0) {
4496 return pin;
4497 }
4498 if (!inherit) break;
4499 }
4500 return MDS_RANK_NONE;
4501}
4502
4503bool CInode::is_exportable(mds_rank_t dest) const
4504{
4505 mds_rank_t pin = get_export_pin();
4506 if (pin == dest) {
4507 return true;
4508 } else if (pin >= 0) {
4509 return false;
4510 } else {
4511 return true;
4512 }
4513}