1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "include/int_types.h"
16 #include "common/errno.h"
17
18 #include <string>
19 #include <stdio.h>
20
21 #include "CInode.h"
22 #include "CDir.h"
23 #include "CDentry.h"
24
25 #include "MDSRank.h"
26 #include "MDCache.h"
27 #include "MDLog.h"
28 #include "Locker.h"
29 #include "Mutation.h"
30
31 #include "events/EUpdate.h"
32
33 #include "osdc/Objecter.h"
34
35 #include "snap.h"
36
37 #include "LogSegment.h"
38
39 #include "common/Clock.h"
40
41 #include "messages/MLock.h"
42 #include "messages/MClientCaps.h"
43
44 #include "common/config.h"
45 #include "global/global_context.h"
46 #include "include/assert.h"
47
48 #include "mds/MDSContinuation.h"
49 #include "mds/InoTable.h"
50
51 #define dout_context g_ceph_context
52 #define dout_subsys ceph_subsys_mds
53 #undef dout_prefix
54 #define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") "
55
56
57 class CInodeIOContext : public MDSIOContextBase
58 {
59 protected:
60 CInode *in;
61 MDSRank *get_mds() override {return in->mdcache->mds;}
62 public:
63 explicit CInodeIOContext(CInode *in_) : in(in_) {
64 assert(in != NULL);
65 }
66 };
67
68
69 LockType CInode::versionlock_type(CEPH_LOCK_IVERSION);
70 LockType CInode::authlock_type(CEPH_LOCK_IAUTH);
71 LockType CInode::linklock_type(CEPH_LOCK_ILINK);
72 LockType CInode::dirfragtreelock_type(CEPH_LOCK_IDFT);
73 LockType CInode::filelock_type(CEPH_LOCK_IFILE);
74 LockType CInode::xattrlock_type(CEPH_LOCK_IXATTR);
75 LockType CInode::snaplock_type(CEPH_LOCK_ISNAP);
76 LockType CInode::nestlock_type(CEPH_LOCK_INEST);
77 LockType CInode::flocklock_type(CEPH_LOCK_IFLOCK);
78 LockType CInode::policylock_type(CEPH_LOCK_IPOLICY);
79
80 //int cinode_pins[CINODE_NUM_PINS]; // counts
81 ostream& CInode::print_db_line_prefix(ostream& out)
82 {
83 return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") ";
84 }
85
86 /*
87 * write caps and lock ids
88 */
89 struct cinode_lock_info_t cinode_lock_info[] = {
90 { CEPH_LOCK_IFILE, CEPH_CAP_ANY_FILE_WR },
91 { CEPH_LOCK_IAUTH, CEPH_CAP_AUTH_EXCL },
92 { CEPH_LOCK_ILINK, CEPH_CAP_LINK_EXCL },
93 { CEPH_LOCK_IXATTR, CEPH_CAP_XATTR_EXCL },
94 };
95 int num_cinode_locks = sizeof(cinode_lock_info) / sizeof(cinode_lock_info[0]);
96
97
98
99 ostream& operator<<(ostream& out, const CInode& in)
100 {
101 string path;
102 in.make_path_string(path, true);
103
104 out << "[inode " << in.inode.ino;
105 out << " ["
106 << (in.is_multiversion() ? "...":"")
107 << in.first << "," << in.last << "]";
108 out << " " << path << (in.is_dir() ? "/":"");
109
110 if (in.is_auth()) {
111 out << " auth";
112 if (in.is_replicated())
113 out << in.get_replicas();
114 } else {
115 mds_authority_t a = in.authority();
116 out << " rep@" << a.first;
117 if (a.second != CDIR_AUTH_UNKNOWN)
118 out << "," << a.second;
119 out << "." << in.get_replica_nonce();
120 }
121
122 if (in.is_symlink())
123 out << " symlink='" << in.symlink << "'";
124 if (in.is_dir() && !in.dirfragtree.empty())
125 out << " " << in.dirfragtree;
126
127 out << " v" << in.get_version();
128 if (in.get_projected_version() > in.get_version())
129 out << " pv" << in.get_projected_version();
130
131 if (in.is_auth_pinned()) {
132 out << " ap=" << in.get_num_auth_pins() << "+" << in.get_num_nested_auth_pins();
133 #ifdef MDS_AUTHPIN_SET
134 out << "(" << in.auth_pin_set << ")";
135 #endif
136 }
137
138 if (in.snaprealm)
139 out << " snaprealm=" << in.snaprealm;
140
141 if (in.state_test(CInode::STATE_AMBIGUOUSAUTH)) out << " AMBIGAUTH";
142 if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " needsrecover";
143 if (in.state_test(CInode::STATE_RECOVERING)) out << " recovering";
144 if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent";
145 if (in.state_test(CInode::STATE_MISSINGOBJS)) out << " missingobjs";
146 if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance;
147 if (in.is_frozen_inode()) out << " FROZEN";
148 if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN";
149
150 const inode_t *pi = in.get_projected_inode();
151 if (pi->is_truncating())
152 out << " truncating(" << pi->truncate_from << " to " << pi->truncate_size << ")";
153
154 if (in.inode.is_dir()) {
155 out << " " << in.inode.dirstat;
156 if (g_conf->mds_debug_scatterstat && in.is_projected()) {
157 const inode_t *pi = in.get_projected_inode();
158 out << "->" << pi->dirstat;
159 }
160 } else {
161 out << " s=" << in.inode.size;
162 if (in.inode.nlink != 1)
163 out << " nl=" << in.inode.nlink;
164 }
165
166 // rstat
167 out << " " << in.inode.rstat;
168 if (!(in.inode.rstat == in.inode.accounted_rstat))
169 out << "/" << in.inode.accounted_rstat;
170 if (g_conf->mds_debug_scatterstat && in.is_projected()) {
171 const inode_t *pi = in.get_projected_inode();
172 out << "->" << pi->rstat;
173 if (!(pi->rstat == pi->accounted_rstat))
174 out << "/" << pi->accounted_rstat;
175 }
176
177 if (!in.client_need_snapflush.empty())
178 out << " need_snapflush=" << in.client_need_snapflush;
179
180
181 // locks
182 if (!in.authlock.is_sync_and_unlocked())
183 out << " " << in.authlock;
184 if (!in.linklock.is_sync_and_unlocked())
185 out << " " << in.linklock;
186 if (in.inode.is_dir()) {
187 if (!in.dirfragtreelock.is_sync_and_unlocked())
188 out << " " << in.dirfragtreelock;
189 if (!in.snaplock.is_sync_and_unlocked())
190 out << " " << in.snaplock;
191 if (!in.nestlock.is_sync_and_unlocked())
192 out << " " << in.nestlock;
193 if (!in.policylock.is_sync_and_unlocked())
194 out << " " << in.policylock;
195 } else {
196 if (!in.flocklock.is_sync_and_unlocked())
197 out << " " << in.flocklock;
198 }
199 if (!in.filelock.is_sync_and_unlocked())
200 out << " " << in.filelock;
201 if (!in.xattrlock.is_sync_and_unlocked())
202 out << " " << in.xattrlock;
203 if (!in.versionlock.is_sync_and_unlocked())
204 out << " " << in.versionlock;
205
206 // hack: spit out crap on which clients have caps
207 if (in.inode.client_ranges.size())
208 out << " cr=" << in.inode.client_ranges;
209
210 if (!in.get_client_caps().empty()) {
211 out << " caps={";
212 for (map<client_t,Capability*>::const_iterator it = in.get_client_caps().begin();
213 it != in.get_client_caps().end();
214 ++it) {
215 if (it != in.get_client_caps().begin()) out << ",";
216 out << it->first << "="
217 << ccap_string(it->second->pending());
218 if (it->second->issued() != it->second->pending())
219 out << "/" << ccap_string(it->second->issued());
220 out << "/" << ccap_string(it->second->wanted())
221 << "@" << it->second->get_last_sent();
222 }
223 out << "}";
224 if (in.get_loner() >= 0 || in.get_wanted_loner() >= 0) {
225 out << ",l=" << in.get_loner();
226 if (in.get_loner() != in.get_wanted_loner())
227 out << "(" << in.get_wanted_loner() << ")";
228 }
229 }
230 if (!in.get_mds_caps_wanted().empty()) {
231 out << " mcw={";
232 for (compact_map<int,int>::const_iterator p = in.get_mds_caps_wanted().begin();
233 p != in.get_mds_caps_wanted().end();
234 ++p) {
235 if (p != in.get_mds_caps_wanted().begin())
236 out << ',';
237 out << p->first << '=' << ccap_string(p->second);
238 }
239 out << '}';
240 }
241
242 if (in.get_num_ref()) {
243 out << " |";
244 in.print_pin_set(out);
245 }
246
247 if (in.inode.export_pin != MDS_RANK_NONE) {
248 out << " export_pin=" << in.inode.export_pin;
249 }
250
251 out << " " << &in;
252 out << "]";
253 return out;
254 }
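/*
 * Rough schema of the line streamed above, in emission order (optional pieces
 * appear only when relevant); this is what shows up in MDS debug logs and
 * cache dumps:
 *
 *   [inode <ino> [<first>,<last>] <path>[/] [symlink='<target>'] auth|rep@<who>
 *    v<version> [pv<projected>] [ap=<pins>] [state flags] [truncating(...)]
 *    <dirstat | s=<size> [nl=<nlink>]> <rstat>[/<accounted_rstat>] [locks]
 *    [cr=<client_ranges>] [caps={<client>=<pending>[/<issued>]/<wanted>@<last_sent>,...}[,l=<loner>]]
 *    [mcw={...}] [| <pin set>] [export_pin=<rank>] <address>]
 *
 * e.g. a plain file might come out roughly as (values invented for illustration):
 *
 *   [inode 0x10000000001 [2,head] /dir/file auth v142 s=4194304 n(v0 b4194304 1=1+0)
 *    caps={4121=pAsLsXsFscr/-@3},l=4121 | caps=1 dirty=1 0x5604d9e3a000]
 */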
255
256 ostream& operator<<(ostream& out, const CInode::scrub_stamp_info_t& si)
257 {
258 out << "{scrub_start_version: " << si.scrub_start_version
259 << ", scrub_start_stamp: " << si.scrub_start_stamp
260 << ", last_scrub_version: " << si.last_scrub_version
261 << ", last_scrub_stamp: " << si.last_scrub_stamp;
262 return out;
263 }
264
265
266
267 void CInode::print(ostream& out)
268 {
269 out << *this;
270 }
271
272
273
274 void CInode::add_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
275 {
276 dout(10) << "add_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;
277
278 if (client_need_snapflush.empty()) {
279 get(CInode::PIN_NEEDSNAPFLUSH);
280
281 // FIXME: this is non-optimal, as we'll block freezes/migrations for potentially
282 // long periods waiting for clients to flush their snaps.
283 auth_pin(this); // pin head inode...
284 }
285
286 set<client_t>& clients = client_need_snapflush[snapid];
287 if (clients.empty())
288 snapin->auth_pin(this); // ...and pin snapped/old inode!
289
290 clients.insert(client);
291 }
292
293 void CInode::remove_need_snapflush(CInode *snapin, snapid_t snapid, client_t client)
294 {
295 dout(10) << "remove_need_snapflush client." << client << " snapid " << snapid << " on " << snapin << dendl;
296 compact_map<snapid_t, std::set<client_t> >::iterator p = client_need_snapflush.find(snapid);
297 if (p == client_need_snapflush.end()) {
298 dout(10) << " snapid not found" << dendl;
299 return;
300 }
301 if (!p->second.count(client)) {
302 dout(10) << " client not found" << dendl;
303 return;
304 }
305 p->second.erase(client);
306 if (p->second.empty()) {
307 client_need_snapflush.erase(p);
308 snapin->auth_unpin(this);
309
310 if (client_need_snapflush.empty()) {
311 put(CInode::PIN_NEEDSNAPFLUSH);
312 auth_unpin(this);
313 }
314 }
315 }
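/*
 * Pinning sketch for the pair above (not a real call site): the head inode is
 * auth-pinned once for as long as *any* client still owes a snap flush, and
 * each snapped/old inode is auth-pinned once per snapid that still has
 * waiting clients.  The caller (Locker) drives the pairing roughly like this:
 *
 *   head->add_need_snapflush(oldin, snapid, client);    // pins head on first
 *                                                       // use, oldin on first
 *                                                       // client for snapid
 *   ...client eventually flushes its snap caps...
 *   head->remove_need_snapflush(oldin, snapid, client); // drops the pins as
 *                                                       // the sets drain
 */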
316
317 bool CInode::split_need_snapflush(CInode *cowin, CInode *in)
318 {
319 dout(10) << "split_need_snapflush [" << cowin->first << "," << cowin->last << "] for " << *cowin << dendl;
320 bool need_flush = false;
321 for (compact_map<snapid_t, set<client_t> >::iterator p = client_need_snapflush.lower_bound(cowin->first);
322 p != client_need_snapflush.end() && p->first < in->first; ) {
323 compact_map<snapid_t, set<client_t> >::iterator q = p;
324 ++p;
325 assert(!q->second.empty());
326 if (cowin->last >= q->first) {
327 cowin->auth_pin(this);
328 need_flush = true;
329 } else
330 client_need_snapflush.erase(q);
331 in->auth_unpin(this);
332 }
333 return need_flush;
334 }
335
336 void CInode::mark_dirty_rstat()
337 {
338 if (!state_test(STATE_DIRTYRSTAT)) {
339 dout(10) << "mark_dirty_rstat" << dendl;
340 state_set(STATE_DIRTYRSTAT);
341 get(PIN_DIRTYRSTAT);
342 CDentry *pdn = get_projected_parent_dn();
343 if (pdn->is_auth()) {
344 CDir *pdir = pdn->dir;
345 pdir->dirty_rstat_inodes.push_back(&dirty_rstat_item);
346 mdcache->mds->locker->mark_updated_scatterlock(&pdir->inode->nestlock);
347 } else {
348 // under cross-MDS rename.
349 // DIRTYRSTAT flag will get cleared when rename finishes
350 assert(state_test(STATE_AMBIGUOUSAUTH));
351 }
352 }
353 }
354 void CInode::clear_dirty_rstat()
355 {
356 if (state_test(STATE_DIRTYRSTAT)) {
357 dout(10) << "clear_dirty_rstat" << dendl;
358 state_clear(STATE_DIRTYRSTAT);
359 put(PIN_DIRTYRSTAT);
360 dirty_rstat_item.remove_myself();
361 }
362 }
363
364 inode_t *CInode::project_inode(map<string,bufferptr> *px)
365 {
366 if (projected_nodes.empty()) {
367 projected_nodes.push_back(new projected_inode_t(new inode_t(inode)));
368 if (px)
369 *px = xattrs;
370 } else {
371 projected_nodes.push_back(new projected_inode_t(
372 new inode_t(*projected_nodes.back()->inode)));
373 if (px)
374 *px = *get_projected_xattrs();
375 }
376
377 projected_inode_t &pi = *projected_nodes.back();
378
379 if (px) {
380 pi.xattrs = px;
381 ++num_projected_xattrs;
382 }
383
384 if (scrub_infop && scrub_infop->last_scrub_dirty) {
385 pi.inode->last_scrub_stamp = scrub_infop->last_scrub_stamp;
386 pi.inode->last_scrub_version = scrub_infop->last_scrub_version;
387 scrub_infop->last_scrub_dirty = false;
388 scrub_maybe_delete_info();
389 }
390 dout(15) << "project_inode " << pi.inode << dendl;
391 return pi.inode;
392 }
393
394 void CInode::pop_and_dirty_projected_inode(LogSegment *ls)
395 {
396 assert(!projected_nodes.empty());
397 dout(15) << "pop_and_dirty_projected_inode " << projected_nodes.front()->inode
398 << " v" << projected_nodes.front()->inode->version << dendl;
399 int64_t old_pool = inode.layout.pool_id;
400
401 mark_dirty(projected_nodes.front()->inode->version, ls);
402 inode = *projected_nodes.front()->inode;
403
404 if (inode.is_backtrace_updated())
405 _mark_dirty_parent(ls, old_pool != inode.layout.pool_id);
406
407 map<string,bufferptr> *px = projected_nodes.front()->xattrs;
408 if (px) {
409 --num_projected_xattrs;
410 xattrs = *px;
411 delete px;
412 }
413
414 if (projected_nodes.front()->snapnode) {
415 pop_projected_snaprealm(projected_nodes.front()->snapnode);
416 --num_projected_srnodes;
417 }
418
419 delete projected_nodes.front()->inode;
420 delete projected_nodes.front();
421
422 projected_nodes.pop_front();
423 }
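/*
 * Typical projection/journal/apply cycle, for orientation (a simplified
 * sketch; real call sites live in Server.cc/Locker.cc/MDCache.cc and also
 * project xattrs and snaprealms, and journal the change via an EMetaBlob):
 *
 *   inode_t *pi = in->project_inode();      // copy-on-write copy of the inode
 *   pi->mtime = ceph_clock_now();           // mutate only the projected copy
 *   pi->version = in->pre_dirty();          // reserve the next version
 *   ...submit a journal event carrying the projected values...
 *   // then, from the journal-commit callback:
 *   in->pop_and_dirty_projected_inode(ls);  // apply the copy and mark it
 *                                           // dirty in LogSegment ls
 */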
424
425 sr_t *CInode::project_snaprealm(snapid_t snapid)
426 {
427 sr_t *cur_srnode = get_projected_srnode();
428 sr_t *new_srnode;
429
430 if (cur_srnode) {
431 new_srnode = new sr_t(*cur_srnode);
432 } else {
433 new_srnode = new sr_t();
434 new_srnode->created = snapid;
435 new_srnode->current_parent_since = get_oldest_snap();
436 }
437 dout(10) << "project_snaprealm " << new_srnode << dendl;
438 projected_nodes.back()->snapnode = new_srnode;
439 ++num_projected_srnodes;
440 return new_srnode;
441 }
442
443 /* if newparent != parent, record the old parent in past_parents;
444    if we have no snaprealm of our own (so no direct parent pointer), find the
445    effective parent realm via find_snaprealm() and use that */
445 void CInode::project_past_snaprealm_parent(SnapRealm *newparent)
446 {
447 sr_t *new_snap = project_snaprealm();
448 SnapRealm *oldparent;
449 if (!snaprealm) {
450 oldparent = find_snaprealm();
451 new_snap->seq = oldparent->get_newest_seq();
452 }
453 else
454 oldparent = snaprealm->parent;
455
456 if (newparent != oldparent) {
457 snapid_t oldparentseq = oldparent->get_newest_seq();
458 if (oldparentseq + 1 > new_snap->current_parent_since) {
459 new_snap->past_parents[oldparentseq].ino = oldparent->inode->ino();
460 new_snap->past_parents[oldparentseq].first = new_snap->current_parent_since;
461 }
462 new_snap->current_parent_since = MAX(oldparentseq, newparent->get_last_created()) + 1;
463 }
464 }
465
466 void CInode::pop_projected_snaprealm(sr_t *next_snaprealm)
467 {
468 assert(next_snaprealm);
469 dout(10) << "pop_projected_snaprealm " << next_snaprealm
470 << " seq" << next_snaprealm->seq << dendl;
471 bool invalidate_cached_snaps = false;
472 if (!snaprealm) {
473 open_snaprealm();
474 } else if (next_snaprealm->past_parents.size() !=
475 snaprealm->srnode.past_parents.size()) {
476 invalidate_cached_snaps = true;
477 // re-open past parents
478 snaprealm->_close_parents();
479
480 dout(10) << " realm " << *snaprealm << " past_parents " << snaprealm->srnode.past_parents
481 << " -> " << next_snaprealm->past_parents << dendl;
482 }
483 snaprealm->srnode = *next_snaprealm;
484 delete next_snaprealm;
485
486 // we should be able to open these up (or have them already be open).
487 bool ok = snaprealm->_open_parents(NULL);
488 assert(ok);
489
490 if (invalidate_cached_snaps)
491 snaprealm->invalidate_cached_snaps();
492
493 if (snaprealm->parent)
494 dout(10) << " realm " << *snaprealm << " parent " << *snaprealm->parent << dendl;
495 }
496
497
498 // ====== CInode =======
499
500 // dirfrags
501
502 __u32 InodeStoreBase::hash_dentry_name(const string &dn)
503 {
504 int which = inode.dir_layout.dl_dir_hash;
505 if (!which)
506 which = CEPH_STR_HASH_LINUX;
507 assert(ceph_str_hash_valid(which));
508 return ceph_str_hash(which, dn.data(), dn.length());
509 }
510
511 frag_t InodeStoreBase::pick_dirfrag(const string& dn)
512 {
513 if (dirfragtree.empty())
514 return frag_t(); // avoid the string hash if we can.
515
516 __u32 h = hash_dentry_name(dn);
517 return dirfragtree[h];
518 }
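/*
 * Name-to-dirfrag resolution sketch: a dentry lookup inside a fragmented
 * directory hashes the name, walks the fragtree to a leaf frag, and then
 * finds the matching open CDir (which may not be open, or not auth, on this
 * MDS):
 *
 *   frag_t fg = diri->pick_dirfrag(dname);            // hash + fragtree walk
 *   CDir *dir = diri->get_dirfrag(fg);                // NULL if not open here
 *   if (!dir)
 *     dir = diri->get_or_open_dirfrag(mdcache, fg);   // auth or replay only
 */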
519
520 bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
521 {
522 bool all = true;
523 list<frag_t> fglist;
524 dirfragtree.get_leaves_under(fg, fglist);
525 for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
526 if (dirfrags.count(*p))
527 ls.push_back(dirfrags[*p]);
528 else
529 all = false;
530
531 if (all)
532 return all;
533
534 fragtree_t tmpdft;
535 tmpdft.force_to_leaf(g_ceph_context, fg);
536 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
537 tmpdft.force_to_leaf(g_ceph_context, p->first);
538 if (fg.contains(p->first) && !dirfragtree.is_leaf(p->first))
539 ls.push_back(p->second);
540 }
541
542 all = true;
543 tmpdft.get_leaves_under(fg, fglist);
544 for (list<frag_t>::iterator p = fglist.begin(); p != fglist.end(); ++p)
545 if (!dirfrags.count(*p)) {
546 all = false;
547 break;
548 }
549
550 return all;
551 }
552
553 void CInode::verify_dirfrags()
554 {
555 bool bad = false;
556 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
557 if (!dirfragtree.is_leaf(p->first)) {
558 dout(0) << "have open dirfrag " << p->first << " but not leaf in " << dirfragtree
559 << ": " << *p->second << dendl;
560 bad = true;
561 }
562 }
563 assert(!bad);
564 }
565
566 void CInode::force_dirfrags()
567 {
568 bool bad = false;
569 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
570 if (!dirfragtree.is_leaf(p->first)) {
571 dout(0) << "have open dirfrag " << p->first << " but not leaf in " << dirfragtree
572 << ": " << *p->second << dendl;
573 bad = true;
574 }
575 }
576
577 if (bad) {
578 list<frag_t> leaves;
579 dirfragtree.get_leaves(leaves);
580 for (list<frag_t>::iterator p = leaves.begin(); p != leaves.end(); ++p)
581 mdcache->get_force_dirfrag(dirfrag_t(ino(),*p), true);
582 }
583
584 verify_dirfrags();
585 }
586
587 CDir *CInode::get_approx_dirfrag(frag_t fg)
588 {
589 CDir *dir = get_dirfrag(fg);
590 if (dir) return dir;
591
592 // find a child?
593 list<CDir*> ls;
594 get_dirfrags_under(fg, ls);
595 if (!ls.empty())
596 return ls.front();
597
598 // try parents?
599 while (fg.bits() > 0) {
600 fg = fg.parent();
601 dir = get_dirfrag(fg);
602 if (dir) return dir;
603 }
604 return NULL;
605 }
606
607 void CInode::get_dirfrags(list<CDir*>& ls)
608 {
609 // all dirfrags
610 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
611 p != dirfrags.end();
612 ++p)
613 ls.push_back(p->second);
614 }
615 void CInode::get_nested_dirfrags(list<CDir*>& ls)
616 {
617 // dirfrags in same subtree
618 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
619 p != dirfrags.end();
620 ++p)
621 if (!p->second->is_subtree_root())
622 ls.push_back(p->second);
623 }
624 void CInode::get_subtree_dirfrags(list<CDir*>& ls)
625 {
626 // dirfrags that are roots of new subtrees
627 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
628 p != dirfrags.end();
629 ++p)
630 if (p->second->is_subtree_root())
631 ls.push_back(p->second);
632 }
633
634
635 CDir *CInode::get_or_open_dirfrag(MDCache *mdcache, frag_t fg)
636 {
637 assert(is_dir());
638
639 // have it?
640 CDir *dir = get_dirfrag(fg);
641 if (!dir) {
642 // create it.
643 assert(is_auth() || mdcache->mds->is_any_replay());
644 dir = new CDir(this, fg, mdcache, is_auth());
645 add_dirfrag(dir);
646 }
647 return dir;
648 }
649
650 CDir *CInode::add_dirfrag(CDir *dir)
651 {
652 assert(dirfrags.count(dir->dirfrag().frag) == 0);
653 dirfrags[dir->dirfrag().frag] = dir;
654
655 if (stickydir_ref > 0) {
656 dir->state_set(CDir::STATE_STICKY);
657 dir->get(CDir::PIN_STICKY);
658 }
659
660 maybe_export_pin();
661
662 return dir;
663 }
664
665 void CInode::close_dirfrag(frag_t fg)
666 {
667 dout(14) << "close_dirfrag " << fg << dendl;
668 assert(dirfrags.count(fg));
669
670 CDir *dir = dirfrags[fg];
671 dir->remove_null_dentries();
672
673 // clear dirty flag
674 if (dir->is_dirty())
675 dir->mark_clean();
676
677 if (stickydir_ref > 0) {
678 dir->state_clear(CDir::STATE_STICKY);
679 dir->put(CDir::PIN_STICKY);
680 }
681
682 // dump any remaining dentries, for debugging purposes
683 for (CDir::map_t::iterator p = dir->items.begin();
684 p != dir->items.end();
685 ++p)
686 dout(14) << "close_dirfrag LEFTOVER dn " << *p->second << dendl;
687
688 assert(dir->get_num_ref() == 0);
689 delete dir;
690 dirfrags.erase(fg);
691 }
692
693 void CInode::close_dirfrags()
694 {
695 while (!dirfrags.empty())
696 close_dirfrag(dirfrags.begin()->first);
697 }
698
699 bool CInode::has_subtree_root_dirfrag(int auth)
700 {
701 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
702 p != dirfrags.end();
703 ++p)
704 if (p->second->is_subtree_root() &&
705 (auth == -1 || p->second->dir_auth.first == auth))
706 return true;
707 return false;
708 }
709
710 bool CInode::has_subtree_or_exporting_dirfrag()
711 {
712 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
713 p != dirfrags.end();
714 ++p)
715 if (p->second->is_subtree_root() ||
716 p->second->state_test(CDir::STATE_EXPORTING))
717 return true;
718 return false;
719 }
720
721 void CInode::get_stickydirs()
722 {
723 if (stickydir_ref == 0) {
724 get(PIN_STICKYDIRS);
725 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
726 p != dirfrags.end();
727 ++p) {
728 p->second->state_set(CDir::STATE_STICKY);
729 p->second->get(CDir::PIN_STICKY);
730 }
731 }
732 stickydir_ref++;
733 }
734
735 void CInode::put_stickydirs()
736 {
737 assert(stickydir_ref > 0);
738 stickydir_ref--;
739 if (stickydir_ref == 0) {
740 put(PIN_STICKYDIRS);
741 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
742 p != dirfrags.end();
743 ++p) {
744 p->second->state_clear(CDir::STATE_STICKY);
745 p->second->put(CDir::PIN_STICKY);
746 }
747 }
748 }
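/*
 * get_stickydirs()/put_stickydirs() form a refcounted pair: while the count
 * is non-zero, every open dirfrag of this inode carries CDir::STATE_STICKY
 * plus a PIN_STICKY ref so the frags are not trimmed out from under the
 * caller (add_dirfrag() above applies the same state to frags opened later).
 * Callers balance the two, e.g.:
 *
 *   in->get_stickydirs();
 *   ...open/enumerate all of the inode's dirfrags...
 *   in->put_stickydirs();
 */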
749
750
751
752
753
754 // pins
755
756 void CInode::first_get()
757 {
758 // pin my dentry?
759 if (parent)
760 parent->get(CDentry::PIN_INODEPIN);
761 }
762
763 void CInode::last_put()
764 {
765 // unpin my dentry?
766 if (parent)
767 parent->put(CDentry::PIN_INODEPIN);
768 }
769
770 void CInode::_put()
771 {
772 if (get_num_ref() == (int)is_dirty() + (int)is_dirty_parent())
773 mdcache->maybe_eval_stray(this, true);
774 }
775
776 void CInode::add_remote_parent(CDentry *p)
777 {
778 if (remote_parents.empty())
779 get(PIN_REMOTEPARENT);
780 remote_parents.insert(p);
781 }
782 void CInode::remove_remote_parent(CDentry *p)
783 {
784 remote_parents.erase(p);
785 if (remote_parents.empty())
786 put(PIN_REMOTEPARENT);
787 }
788
789
790
791
792 CDir *CInode::get_parent_dir()
793 {
794 if (parent)
795 return parent->dir;
796 return NULL;
797 }
798 CDir *CInode::get_projected_parent_dir()
799 {
800 CDentry *p = get_projected_parent_dn();
801 if (p)
802 return p->dir;
803 return NULL;
804 }
805 CInode *CInode::get_parent_inode()
806 {
807 if (parent)
808 return parent->dir->inode;
809 return NULL;
810 }
811
812 bool CInode::is_projected_ancestor_of(CInode *other)
813 {
814 while (other) {
815 if (other == this)
816 return true;
817 if (!other->get_projected_parent_dn())
818 break;
819 other = other->get_projected_parent_dn()->get_dir()->get_inode();
820 }
821 return false;
822 }
823
824 /*
825 * Because a non-directory inode may have multiple links, the use_parent
826 * argument allows selecting which parent to use for path construction. This
827 * argument is only meaningful for the final component (i.e. the first of the
828 * nested calls) because directories cannot have multiple hard links. If
829 * use_parent is NULL and projected is true, the primary parent's projected
830 * inode is used all the way up the path chain. Otherwise the primary parent
831 * stable inode is used.
832 */
833 void CInode::make_path_string(string& s, bool projected, const CDentry *use_parent) const
834 {
835 if (!use_parent) {
836 use_parent = projected ? get_projected_parent_dn() : parent;
837 }
838
839 if (use_parent) {
840 use_parent->make_path_string(s, projected);
841 } else if (is_root()) {
842 s = "";
843 } else if (is_mdsdir()) {
844 char t[40];
845 uint64_t eino(ino());
846 eino -= MDS_INO_MDSDIR_OFFSET;
847 snprintf(t, sizeof(t), "~mds%" PRId64, eino);
848 s = t;
849 } else {
850 char n[40];
851 uint64_t eino(ino());
852 snprintf(n, sizeof(n), "#%" PRIx64, eino);
853 s += n;
854 }
855 }
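/*
 * Resulting strings, by case (values illustrative):
 *   - inode reachable via a (projected) parent dentry:  "/some/dir/file"
 *   - the root inode:                                    "" (empty)
 *   - an MDS directory, e.g. rank 0's:                   "~mds0"
 *   - otherwise (no parent dentry in cache):             "#10000000123" (hex ino)
 */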
856
857 void CInode::make_path(filepath& fp, bool projected) const
858 {
859 const CDentry *use_parent = projected ? get_projected_parent_dn() : parent;
860 if (use_parent) {
861 assert(!is_base());
862 use_parent->make_path(fp, projected);
863 } else {
864 fp = filepath(ino());
865 }
866 }
867
868 void CInode::name_stray_dentry(string& dname)
869 {
870 char s[20];
871 snprintf(s, sizeof(s), "%llx", (unsigned long long)inode.ino.val);
872 dname = s;
873 }
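/*
 * Stray dentry names are simply the inode number in bare hex, e.g. ino
 * 0x10000000afc becomes the dentry name "10000000afc" under one of the MDS's
 * ~mdsN/strayM directories.
 */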
874
875 version_t CInode::pre_dirty()
876 {
877 version_t pv;
878 CDentry* _cdentry = get_projected_parent_dn();
879 if (_cdentry) {
880 pv = _cdentry->pre_dirty(get_projected_version());
881 dout(10) << "pre_dirty " << pv << " (current v " << inode.version << ")" << dendl;
882 } else {
883 assert(is_base());
884 pv = get_projected_version() + 1;
885 }
886 // force update backtrace for old format inode (see inode_t::decode)
887 if (inode.backtrace_version == 0 && !projected_nodes.empty()) {
888 inode_t *pi = projected_nodes.back()->inode;
889 if (pi->backtrace_version == 0)
890 pi->update_backtrace(pv);
891 }
892 return pv;
893 }
894
895 void CInode::_mark_dirty(LogSegment *ls)
896 {
897 if (!state_test(STATE_DIRTY)) {
898 state_set(STATE_DIRTY);
899 get(PIN_DIRTY);
900 assert(ls);
901 }
902
903 // move myself to this segment's dirty list
904 if (ls)
905 ls->dirty_inodes.push_back(&item_dirty);
906 }
907
908 void CInode::mark_dirty(version_t pv, LogSegment *ls) {
909
910 dout(10) << "mark_dirty " << *this << dendl;
911
912 /*
913 NOTE: I may already be dirty, but this fn _still_ needs to be called so that
914 the directory is (perhaps newly) dirtied, and so that parent_dir_version is
915 updated below.
916 */
917
918 // only auth can get dirty. "dirty" async data in replicas is relative to
919 // filelock state, not the dirty flag.
920 assert(is_auth());
921
922 // touch my private version
923 assert(inode.version < pv);
924 inode.version = pv;
925 _mark_dirty(ls);
926
927 // mark dentry too
928 if (parent)
929 parent->mark_dirty(pv, ls);
930 }
931
932
933 void CInode::mark_clean()
934 {
935 dout(10) << " mark_clean " << *this << dendl;
936 if (state_test(STATE_DIRTY)) {
937 state_clear(STATE_DIRTY);
938 put(PIN_DIRTY);
939
940 // remove myself from ls dirty list
941 item_dirty.remove_myself();
942 }
943 }
944
945
946 // --------------
947 // per-inode storage
948 // (currently for root inode only)
949
950 struct C_IO_Inode_Stored : public CInodeIOContext {
951 version_t version;
952 Context *fin;
953 C_IO_Inode_Stored(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
954 void finish(int r) override {
955 in->_stored(r, version, fin);
956 }
957 };
958
959 object_t InodeStoreBase::get_object_name(inodeno_t ino, frag_t fg, const char *suffix)
960 {
961 char n[60];
962 snprintf(n, sizeof(n), "%llx.%08llx%s", (long long unsigned)ino, (long long unsigned)fg, suffix ? suffix : "");
963 return object_t(n);
964 }
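/*
 * Object-name format, for reference: "<ino hex>.<frag, 8 hex digits><suffix>".
 * For example (ino invented):
 *
 *   get_object_name(0x10000000000, frag_t(), ".inode") -> "10000000000.00000000.inode"
 *   get_object_name(0x10000000000, frag_t(), "")       -> "10000000000.00000000"
 *
 * i.e. the same scheme used for dirfrag objects, with an optional suffix for
 * the standalone ".inode" objects written by CInode::store() below.
 */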
965
966 void CInode::store(MDSInternalContextBase *fin)
967 {
968 dout(10) << "store " << get_version() << dendl;
969 assert(is_base());
970
971 if (snaprealm)
972 purge_stale_snap_data(snaprealm->get_snaps());
973
974 // encode
975 bufferlist bl;
976 string magic = CEPH_FS_ONDISK_MAGIC;
977 ::encode(magic, bl);
978 encode_store(bl, mdcache->mds->mdsmap->get_up_features());
979
980 // write it.
981 SnapContext snapc;
982 ObjectOperation m;
983 m.write_full(bl);
984
985 object_t oid = CInode::get_object_name(ino(), frag_t(), ".inode");
986 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
987
988 Context *newfin =
989 new C_OnFinisher(new C_IO_Inode_Stored(this, get_version(), fin),
990 mdcache->mds->finisher);
991 mdcache->mds->objecter->mutate(oid, oloc, m, snapc,
992 ceph::real_clock::now(), 0,
993 newfin);
994 }
995
996 void CInode::_stored(int r, version_t v, Context *fin)
997 {
998 if (r < 0) {
999 dout(1) << "store error " << r << " v " << v << " on " << *this << dendl;
1000 mdcache->mds->clog->error() << "failed to store ino " << ino() << " object,"
1001 << " errno " << r;
1002 mdcache->mds->handle_write_error(r);
1003 fin->complete(r);
1004 return;
1005 }
1006
1007 dout(10) << "_stored " << v << " on " << *this << dendl;
1008 if (v == get_projected_version())
1009 mark_clean();
1010
1011 fin->complete(0);
1012 }
1013
1014 void CInode::flush(MDSInternalContextBase *fin)
1015 {
1016 dout(10) << "flush " << *this << dendl;
1017 assert(is_auth() && can_auth_pin());
1018
1019 MDSGatherBuilder gather(g_ceph_context);
1020
1021 if (is_dirty_parent()) {
1022 store_backtrace(gather.new_sub());
1023 }
1024 if (is_dirty()) {
1025 if (is_base()) {
1026 store(gather.new_sub());
1027 } else {
1028 parent->dir->commit(0, gather.new_sub());
1029 }
1030 }
1031
1032 if (gather.has_subs()) {
1033 gather.set_finisher(fin);
1034 gather.activate();
1035 } else {
1036 fin->complete(0);
1037 }
1038 }
1039
1040 struct C_IO_Inode_Fetched : public CInodeIOContext {
1041 bufferlist bl, bl2;
1042 Context *fin;
1043 C_IO_Inode_Fetched(CInode *i, Context *f) : CInodeIOContext(i), fin(f) {}
1044 void finish(int r) override {
1045 // Ignore 'r', because we fetch from two places, so r is usually ENOENT
1046 in->_fetched(bl, bl2, fin);
1047 }
1048 };
1049
1050 void CInode::fetch(MDSInternalContextBase *fin)
1051 {
1052 dout(10) << "fetch" << dendl;
1053
1054 C_IO_Inode_Fetched *c = new C_IO_Inode_Fetched(this, fin);
1055 C_GatherBuilder gather(g_ceph_context, new C_OnFinisher(c, mdcache->mds->finisher));
1056
1057 object_t oid = CInode::get_object_name(ino(), frag_t(), "");
1058 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
1059
1060 // Old on-disk format: inode stored in xattr of a dirfrag
1061 ObjectOperation rd;
1062 rd.getxattr("inode", &c->bl, NULL);
1063 mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, (bufferlist*)NULL, 0, gather.new_sub());
1064
1065 // Current on-disk format: inode stored in a .inode object
1066 object_t oid2 = CInode::get_object_name(ino(), frag_t(), ".inode");
1067 mdcache->mds->objecter->read(oid2, oloc, 0, 0, CEPH_NOSNAP, &c->bl2, 0, gather.new_sub());
1068
1069 gather.activate();
1070 }
1071
1072 void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin)
1073 {
1074 dout(10) << "_fetched got " << bl.length() << " and " << bl2.length() << dendl;
1075 bufferlist::iterator p;
1076 if (bl2.length()) {
1077 p = bl2.begin();
1078 } else if (bl.length()) {
1079 p = bl.begin();
1080 } else {
1081 derr << "No data while reading inode 0x" << std::hex << ino()
1082 << std::dec << dendl;
1083 fin->complete(-ENOENT);
1084 return;
1085 }
1086
1087 // Attempt decode
1088 try {
1089 string magic;
1090 ::decode(magic, p);
1091 dout(10) << " magic is '" << magic << "' (expecting '"
1092 << CEPH_FS_ONDISK_MAGIC << "')" << dendl;
1093 if (magic != CEPH_FS_ONDISK_MAGIC) {
1094 dout(0) << "on disk magic '" << magic << "' != my magic '" << CEPH_FS_ONDISK_MAGIC
1095 << "'" << dendl;
1096 fin->complete(-EINVAL);
1097 } else {
1098 decode_store(p);
1099 dout(10) << "_fetched " << *this << dendl;
1100 fin->complete(0);
1101 }
1102 } catch (buffer::error &err) {
1103 derr << "Corrupt inode 0x" << std::hex << ino() << std::dec
1104 << ": " << err << dendl;
1105 fin->complete(-EINVAL);
1106 return;
1107 }
1108 }
1109
1110 void CInode::build_backtrace(int64_t pool, inode_backtrace_t& bt)
1111 {
1112 bt.ino = inode.ino;
1113 bt.ancestors.clear();
1114 bt.pool = pool;
1115
1116 CInode *in = this;
1117 CDentry *pdn = get_parent_dn();
1118 while (pdn) {
1119 CInode *diri = pdn->get_dir()->get_inode();
1120 bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name, in->inode.version));
1121 in = diri;
1122 pdn = in->get_parent_dn();
1123 }
1124 for (compact_set<int64_t>::iterator i = inode.old_pools.begin();
1125 i != inode.old_pools.end();
1126 ++i) {
1127 // don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
1128 if (*i != pool)
1129 bt.old_pools.insert(*i);
1130 }
1131 }
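/*
 * Shape of the result for a file at /a/b/c currently in pool 2 that once
 * lived in pool 1 (numbers invented):
 *
 *   bt.ino       = <ino of c>
 *   bt.pool      = 2
 *   bt.ancestors = [ {dirino: <ino of b>, dname: "c", version: ...},
 *                    {dirino: <ino of a>, dname: "b", version: ...},
 *                    {dirino: 0x1,        dname: "a", version: ...} ]  // 0x1 = root
 *   bt.old_pools = { 1 }
 *
 * The ancestry is recorded leaf-to-root, which is the order backtrace
 * consumers (e.g. lookup-by-ino and disaster recovery tooling) walk it.
 */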
1132
1133 struct C_IO_Inode_StoredBacktrace : public CInodeIOContext {
1134 version_t version;
1135 Context *fin;
1136 C_IO_Inode_StoredBacktrace(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
1137 void finish(int r) override {
1138 in->_stored_backtrace(r, version, fin);
1139 }
1140 };
1141
1142 void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
1143 {
1144 dout(10) << "store_backtrace on " << *this << dendl;
1145 assert(is_dirty_parent());
1146
1147 if (op_prio < 0)
1148 op_prio = CEPH_MSG_PRIO_DEFAULT;
1149
1150 auth_pin(this);
1151
1152 const int64_t pool = get_backtrace_pool();
1153 inode_backtrace_t bt;
1154 build_backtrace(pool, bt);
1155 bufferlist parent_bl;
1156 ::encode(bt, parent_bl);
1157
1158 ObjectOperation op;
1159 op.priority = op_prio;
1160 op.create(false);
1161 op.setxattr("parent", parent_bl);
1162
1163 bufferlist layout_bl;
1164 ::encode(inode.layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
1165 op.setxattr("layout", layout_bl);
1166
1167 SnapContext snapc;
1168 object_t oid = get_object_name(ino(), frag_t(), "");
1169 object_locator_t oloc(pool);
1170 Context *fin2 = new C_OnFinisher(
1171 new C_IO_Inode_StoredBacktrace(this, inode.backtrace_version, fin),
1172 mdcache->mds->finisher);
1173
1174 if (!state_test(STATE_DIRTYPOOL) || inode.old_pools.empty()) {
1175 dout(20) << __func__ << ": no dirtypool or no old pools" << dendl;
1176 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1177 ceph::real_clock::now(),
1178 0, fin2);
1179 return;
1180 }
1181
1182 C_GatherBuilder gather(g_ceph_context, fin2);
1183 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1184 ceph::real_clock::now(),
1185 0, gather.new_sub());
1186
1187 // In the case where DIRTYPOOL is set, we update all old pools backtraces
1188 // such that anyone reading them will see the new pool ID in
1189 // inode_backtrace_t::pool and go read everything else from there.
1190 for (compact_set<int64_t>::iterator p = inode.old_pools.begin();
1191 p != inode.old_pools.end();
1192 ++p) {
1193 if (*p == pool)
1194 continue;
1195
1196 dout(20) << __func__ << ": updating old pool " << *p << dendl;
1197
1198 ObjectOperation op;
1199 op.priority = op_prio;
1200 op.create(false);
1201 op.setxattr("parent", parent_bl);
1202
1203 object_locator_t oloc(*p);
1204 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
1205 ceph::real_clock::now(),
1206 0, gather.new_sub());
1207 }
1208 gather.activate();
1209 }
1210
1211 void CInode::_stored_backtrace(int r, version_t v, Context *fin)
1212 {
1213 if (r == -ENOENT) {
1214 const int64_t pool = get_backtrace_pool();
1215 bool exists = mdcache->mds->objecter->with_osdmap(
1216 [pool](const OSDMap &osd_map) {
1217 return osd_map.have_pg_pool(pool);
1218 });
1219
1220 // This ENOENT is because the pool doesn't exist (the user deleted it
1221 // out from under us), so the backtrace can never be written, so pretend
1222 // to succeed so that the user can proceed to e.g. delete the file.
1223 if (!exists) {
1224 dout(4) << "store_backtrace got ENOENT: a data pool was deleted "
1225 "beneath us!" << dendl;
1226 r = 0;
1227 }
1228 }
1229
1230 if (r < 0) {
1231 dout(1) << "store backtrace error " << r << " v " << v << dendl;
1232 mdcache->mds->clog->error() << "failed to store backtrace on ino "
1233 << ino() << " object"
1234 << ", pool " << get_backtrace_pool()
1235 << ", errno " << r;
1236 mdcache->mds->handle_write_error(r);
1237 if (fin)
1238 fin->complete(r);
1239 return;
1240 }
1241
1242 dout(10) << "_stored_backtrace v " << v << dendl;
1243
1244 auth_unpin(this);
1245 if (v == inode.backtrace_version)
1246 clear_dirty_parent();
1247 if (fin)
1248 fin->complete(0);
1249 }
1250
1251 void CInode::fetch_backtrace(Context *fin, bufferlist *backtrace)
1252 {
1253 mdcache->fetch_backtrace(inode.ino, get_backtrace_pool(), *backtrace, fin);
1254 }
1255
1256 void CInode::_mark_dirty_parent(LogSegment *ls, bool dirty_pool)
1257 {
1258 if (!state_test(STATE_DIRTYPARENT)) {
1259 dout(10) << "mark_dirty_parent" << dendl;
1260 state_set(STATE_DIRTYPARENT);
1261 get(PIN_DIRTYPARENT);
1262 assert(ls);
1263 }
1264 if (dirty_pool)
1265 state_set(STATE_DIRTYPOOL);
1266 if (ls)
1267 ls->dirty_parent_inodes.push_back(&item_dirty_parent);
1268 }
1269
1270 void CInode::clear_dirty_parent()
1271 {
1272 if (state_test(STATE_DIRTYPARENT)) {
1273 dout(10) << "clear_dirty_parent" << dendl;
1274 state_clear(STATE_DIRTYPARENT);
1275 state_clear(STATE_DIRTYPOOL);
1276 put(PIN_DIRTYPARENT);
1277 item_dirty_parent.remove_myself();
1278 }
1279 }
1280
1281 void CInode::verify_diri_backtrace(bufferlist &bl, int err)
1282 {
1283 if (is_base() || is_dirty_parent() || !is_auth())
1284 return;
1285
1286 dout(10) << "verify_diri_backtrace" << dendl;
1287
1288 if (err == 0) {
1289 inode_backtrace_t backtrace;
1290 ::decode(backtrace, bl);
1291 CDentry *pdn = get_parent_dn();
1292 if (backtrace.ancestors.empty() ||
1293 backtrace.ancestors[0].dname != pdn->name ||
1294 backtrace.ancestors[0].dirino != pdn->get_dir()->ino())
1295 err = -EINVAL;
1296 }
1297
1298 if (err) {
1299 MDSRank *mds = mdcache->mds;
1300 mds->clog->error() << "bad backtrace on dir ino " << ino();
1301 assert(!"bad backtrace" == (g_conf->mds_verify_backtrace > 1));
1302
1303 _mark_dirty_parent(mds->mdlog->get_current_segment(), false);
1304 mds->mdlog->flush();
1305 }
1306 }
1307
1308 // ------------------
1309 // parent dir
1310
1311
1312 void InodeStoreBase::encode_bare(bufferlist &bl, uint64_t features,
1313 const bufferlist *snap_blob) const
1314 {
1315 ::encode(inode, bl, features);
1316 if (is_symlink())
1317 ::encode(symlink, bl);
1318 ::encode(dirfragtree, bl);
1319 ::encode(xattrs, bl);
1320 if (snap_blob)
1321 ::encode(*snap_blob, bl);
1322 else
1323 ::encode(bufferlist(), bl);
1324 ::encode(old_inodes, bl, features);
1325 ::encode(oldest_snap, bl);
1326 ::encode(damage_flags, bl);
1327 }
1328
1329 void InodeStoreBase::encode(bufferlist &bl, uint64_t features,
1330 const bufferlist *snap_blob) const
1331 {
1332 ENCODE_START(6, 4, bl);
1333 encode_bare(bl, features, snap_blob);
1334 ENCODE_FINISH(bl);
1335 }
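/*
 * On-disk layout produced by encode()/encode_bare() above, inside the
 * ENCODE_START(6, 4, ...) envelope and in the same order decode_bare()
 * consumes it:
 *
 *   inode         (inode_t, feature-dependent encoding)
 *   symlink       (string, only when is_symlink())
 *   dirfragtree   (fragtree_t)
 *   xattrs        (map<string,bufferptr>)
 *   snap_blob     (bufferlist; empty when there is no snaprealm)
 *   old_inodes    (past inode versions kept for snapshots)
 *   oldest_snap
 *   damage_flags
 *
 * CInode::store() additionally prefixes the whole blob with the
 * CEPH_FS_ONDISK_MAGIC string before writing it to the ".inode" object.
 */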
1336
1337 void CInode::encode_store(bufferlist& bl, uint64_t features)
1338 {
1339 bufferlist snap_blob;
1340 encode_snap_blob(snap_blob);
1341 InodeStoreBase::encode(bl, mdcache->mds->mdsmap->get_up_features(),
1342 &snap_blob);
1343 }
1344
1345 void InodeStoreBase::decode_bare(bufferlist::iterator &bl,
1346 bufferlist& snap_blob, __u8 struct_v)
1347 {
1348 ::decode(inode, bl);
1349 if (is_symlink())
1350 ::decode(symlink, bl);
1351 ::decode(dirfragtree, bl);
1352 ::decode(xattrs, bl);
1353 ::decode(snap_blob, bl);
1354
1355 ::decode(old_inodes, bl);
1356 if (struct_v == 2 && inode.is_dir()) {
1357 bool default_layout_exists;
1358 ::decode(default_layout_exists, bl);
1359 if (default_layout_exists) {
1360 ::decode(struct_v, bl); // this was a default_file_layout
1361 ::decode(inode.layout, bl); // but we only care about the layout portion
1362 }
1363 }
1364
1365 if (struct_v >= 5) {
1366 // InodeStore is embedded in dentries without proper versioning, so
1367 // we consume up to the end of the buffer
1368 if (!bl.end()) {
1369 ::decode(oldest_snap, bl);
1370 }
1371
1372 if (!bl.end()) {
1373 ::decode(damage_flags, bl);
1374 }
1375 }
1376 }
1377
1378
1379 void InodeStoreBase::decode(bufferlist::iterator &bl, bufferlist& snap_blob)
1380 {
1381 DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
1382 decode_bare(bl, snap_blob, struct_v);
1383 DECODE_FINISH(bl);
1384 }
1385
1386 void CInode::decode_store(bufferlist::iterator& bl)
1387 {
1388 bufferlist snap_blob;
1389 InodeStoreBase::decode(bl, snap_blob);
1390 decode_snap_blob(snap_blob);
1391 }
1392
1393 // ------------------
1394 // locking
1395
1396 void CInode::set_object_info(MDSCacheObjectInfo &info)
1397 {
1398 info.ino = ino();
1399 info.snapid = last;
1400 }
1401
1402 void CInode::encode_lock_state(int type, bufferlist& bl)
1403 {
1404 ::encode(first, bl);
1405
1406 switch (type) {
1407 case CEPH_LOCK_IAUTH:
1408 ::encode(inode.version, bl);
1409 ::encode(inode.ctime, bl);
1410 ::encode(inode.mode, bl);
1411 ::encode(inode.uid, bl);
1412 ::encode(inode.gid, bl);
1413 break;
1414
1415 case CEPH_LOCK_ILINK:
1416 ::encode(inode.version, bl);
1417 ::encode(inode.ctime, bl);
1418 ::encode(inode.nlink, bl);
1419 break;
1420
1421 case CEPH_LOCK_IDFT:
1422 if (is_auth()) {
1423 ::encode(inode.version, bl);
1424 } else {
1425 // treat flushing as dirty when rejoining cache
1426 bool dirty = dirfragtreelock.is_dirty_or_flushing();
1427 ::encode(dirty, bl);
1428 }
1429 {
1430 // encode the raw tree
1431 ::encode(dirfragtree, bl);
1432
1433 // also specify which frags are mine
1434 set<frag_t> myfrags;
1435 list<CDir*> dfls;
1436 get_dirfrags(dfls);
1437 for (list<CDir*>::iterator p = dfls.begin(); p != dfls.end(); ++p)
1438 if ((*p)->is_auth()) {
1439 frag_t fg = (*p)->get_frag();
1440 myfrags.insert(fg);
1441 }
1442 ::encode(myfrags, bl);
1443 }
1444 break;
1445
1446 case CEPH_LOCK_IFILE:
1447 if (is_auth()) {
1448 ::encode(inode.version, bl);
1449 ::encode(inode.ctime, bl);
1450 ::encode(inode.mtime, bl);
1451 ::encode(inode.atime, bl);
1452 ::encode(inode.time_warp_seq, bl);
1453 if (!is_dir()) {
1454 ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1455 ::encode(inode.size, bl);
1456 ::encode(inode.truncate_seq, bl);
1457 ::encode(inode.truncate_size, bl);
1458 ::encode(inode.client_ranges, bl);
1459 ::encode(inode.inline_data, bl);
1460 }
1461 } else {
1462 // treat flushing as dirty when rejoining cache
1463 bool dirty = filelock.is_dirty_or_flushing();
1464 ::encode(dirty, bl);
1465 }
1466
1467 {
1468 dout(15) << "encode_lock_state inode.dirstat is " << inode.dirstat << dendl;
1469 ::encode(inode.dirstat, bl); // only meaningful if i am auth.
1470 bufferlist tmp;
1471 __u32 n = 0;
1472 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1473 p != dirfrags.end();
1474 ++p) {
1475 frag_t fg = p->first;
1476 CDir *dir = p->second;
1477 if (is_auth() || dir->is_auth()) {
1478 fnode_t *pf = dir->get_projected_fnode();
1479 dout(15) << fg << " " << *dir << dendl;
1480 dout(20) << fg << " fragstat " << pf->fragstat << dendl;
1481 dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
1482 ::encode(fg, tmp);
1483 ::encode(dir->first, tmp);
1484 ::encode(pf->fragstat, tmp);
1485 ::encode(pf->accounted_fragstat, tmp);
1486 n++;
1487 }
1488 }
1489 ::encode(n, bl);
1490 bl.claim_append(tmp);
1491 }
1492 break;
1493
1494 case CEPH_LOCK_INEST:
1495 if (is_auth()) {
1496 ::encode(inode.version, bl);
1497 } else {
1498 // treat flushing as dirty when rejoining cache
1499 bool dirty = nestlock.is_dirty_or_flushing();
1500 ::encode(dirty, bl);
1501 }
1502 {
1503 dout(15) << "encode_lock_state inode.rstat is " << inode.rstat << dendl;
1504 ::encode(inode.rstat, bl); // only meaningful if i am auth.
1505 bufferlist tmp;
1506 __u32 n = 0;
1507 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1508 p != dirfrags.end();
1509 ++p) {
1510 frag_t fg = p->first;
1511 CDir *dir = p->second;
1512 if (is_auth() || dir->is_auth()) {
1513 fnode_t *pf = dir->get_projected_fnode();
1514 dout(10) << fg << " " << *dir << dendl;
1515 dout(10) << fg << " " << pf->rstat << dendl;
1516 dout(10) << fg << " " << pf->rstat << dendl;
1517 dout(10) << fg << " " << dir->dirty_old_rstat << dendl;
1518 ::encode(fg, tmp);
1519 ::encode(dir->first, tmp);
1520 ::encode(pf->rstat, tmp);
1521 ::encode(pf->accounted_rstat, tmp);
1522 ::encode(dir->dirty_old_rstat, tmp);
1523 n++;
1524 }
1525 }
1526 ::encode(n, bl);
1527 bl.claim_append(tmp);
1528 }
1529 break;
1530
1531 case CEPH_LOCK_IXATTR:
1532 ::encode(inode.version, bl);
1533 ::encode(inode.ctime, bl);
1534 ::encode(xattrs, bl);
1535 break;
1536
1537 case CEPH_LOCK_ISNAP:
1538 ::encode(inode.version, bl);
1539 ::encode(inode.ctime, bl);
1540 encode_snap(bl);
1541 break;
1542
1543 case CEPH_LOCK_IFLOCK:
1544 ::encode(inode.version, bl);
1545 _encode_file_locks(bl);
1546 break;
1547
1548 case CEPH_LOCK_IPOLICY:
1549 if (inode.is_dir()) {
1550 ::encode(inode.version, bl);
1551 ::encode(inode.ctime, bl);
1552 ::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
1553 ::encode(inode.quota, bl);
1554 ::encode(inode.export_pin, bl);
1555 }
1556 break;
1557
1558 default:
1559 ceph_abort();
1560 }
1561 }
1562
1563
1564 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1565
1566 void CInode::decode_lock_state(int type, bufferlist& bl)
1567 {
1568 bufferlist::iterator p = bl.begin();
1569 utime_t tm;
1570
1571 snapid_t newfirst;
1572 ::decode(newfirst, p);
1573
1574 if (!is_auth() && newfirst != first) {
1575 dout(10) << "decode_lock_state first " << first << " -> " << newfirst << dendl;
1576 assert(newfirst > first);
1577 if (!is_multiversion() && parent) {
1578 assert(parent->first == first);
1579 parent->first = newfirst;
1580 }
1581 first = newfirst;
1582 }
1583
1584 switch (type) {
1585 case CEPH_LOCK_IAUTH:
1586 ::decode(inode.version, p);
1587 ::decode(tm, p);
1588 if (inode.ctime < tm) inode.ctime = tm;
1589 ::decode(inode.mode, p);
1590 ::decode(inode.uid, p);
1591 ::decode(inode.gid, p);
1592 break;
1593
1594 case CEPH_LOCK_ILINK:
1595 ::decode(inode.version, p);
1596 ::decode(tm, p);
1597 if (inode.ctime < tm) inode.ctime = tm;
1598 ::decode(inode.nlink, p);
1599 break;
1600
1601 case CEPH_LOCK_IDFT:
1602 if (is_auth()) {
1603 bool replica_dirty;
1604 ::decode(replica_dirty, p);
1605 if (replica_dirty) {
1606 dout(10) << "decode_lock_state setting dftlock dirty flag" << dendl;
1607 dirfragtreelock.mark_dirty(); // ok bc we're auth and caller will handle
1608 }
1609 } else {
1610 ::decode(inode.version, p);
1611 }
1612 {
1613 fragtree_t temp;
1614 ::decode(temp, p);
1615 set<frag_t> authfrags;
1616 ::decode(authfrags, p);
1617 if (is_auth()) {
1618 // auth. believe replica's auth frags only.
1619 for (set<frag_t>::iterator p = authfrags.begin(); p != authfrags.end(); ++p)
1620 if (!dirfragtree.is_leaf(*p)) {
1621 dout(10) << " forcing frag " << *p << " to leaf (split|merge)" << dendl;
1622 dirfragtree.force_to_leaf(g_ceph_context, *p);
1623 dirfragtreelock.mark_dirty(); // ok bc we're auth and caller will handle
1624 }
1625 } else {
1626 // replica. take the new tree, BUT make sure any open
1627 // dirfrags remain leaves (they may have split _after_ this
1628 // dft was scattered, or we may still be waiting on the
1629 // notify from the auth)
1630 dirfragtree.swap(temp);
1631 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1632 p != dirfrags.end();
1633 ++p) {
1634 if (!dirfragtree.is_leaf(p->first)) {
1635 dout(10) << " forcing open dirfrag " << p->first << " to leaf (racing with split|merge)" << dendl;
1636 dirfragtree.force_to_leaf(g_ceph_context, p->first);
1637 }
1638 if (p->second->is_auth())
1639 p->second->state_clear(CDir::STATE_DIRTYDFT);
1640 }
1641 }
1642 if (g_conf->mds_debug_frag)
1643 verify_dirfrags();
1644 }
1645 break;
1646
1647 case CEPH_LOCK_IFILE:
1648 if (!is_auth()) {
1649 ::decode(inode.version, p);
1650 ::decode(tm, p);
1651 if (inode.ctime < tm) inode.ctime = tm;
1652 ::decode(inode.mtime, p);
1653 ::decode(inode.atime, p);
1654 ::decode(inode.time_warp_seq, p);
1655 if (!is_dir()) {
1656 ::decode(inode.layout, p);
1657 ::decode(inode.size, p);
1658 ::decode(inode.truncate_seq, p);
1659 ::decode(inode.truncate_size, p);
1660 ::decode(inode.client_ranges, p);
1661 ::decode(inode.inline_data, p);
1662 }
1663 } else {
1664 bool replica_dirty;
1665 ::decode(replica_dirty, p);
1666 if (replica_dirty) {
1667 dout(10) << "decode_lock_state setting filelock dirty flag" << dendl;
1668 filelock.mark_dirty(); // ok bc we're auth and caller will handle
1669 }
1670 }
1671 {
1672 frag_info_t dirstat;
1673 ::decode(dirstat, p);
1674 if (!is_auth()) {
1675 dout(10) << " taking inode dirstat " << dirstat << " for " << *this << dendl;
1676 inode.dirstat = dirstat; // take inode summation if replica
1677 }
1678 __u32 n;
1679 ::decode(n, p);
1680 dout(10) << " ...got " << n << " fragstats on " << *this << dendl;
1681 while (n--) {
1682 frag_t fg;
1683 snapid_t fgfirst;
1684 frag_info_t fragstat;
1685 frag_info_t accounted_fragstat;
1686 ::decode(fg, p);
1687 ::decode(fgfirst, p);
1688 ::decode(fragstat, p);
1689 ::decode(accounted_fragstat, p);
1690 dout(10) << fg << " [" << fgfirst << ",head] " << dendl;
1691 dout(10) << fg << " fragstat " << fragstat << dendl;
1692 dout(20) << fg << " accounted_fragstat " << accounted_fragstat << dendl;
1693
1694 CDir *dir = get_dirfrag(fg);
1695 if (is_auth()) {
1696 assert(dir); // i am auth; i had better have this dir open
1697 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1698 << " on " << *dir << dendl;
1699 dir->first = fgfirst;
1700 dir->fnode.fragstat = fragstat;
1701 dir->fnode.accounted_fragstat = accounted_fragstat;
1702 dir->first = fgfirst;
1703 if (!(fragstat == accounted_fragstat)) {
1704 dout(10) << fg << " setting filelock updated flag" << dendl;
1705 filelock.mark_dirty(); // ok bc we're auth and caller will handle
1706 }
1707 } else {
1708 if (dir && dir->is_auth()) {
1709 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1710 << " on " << *dir << dendl;
1711 dir->first = fgfirst;
1712 fnode_t *pf = dir->get_projected_fnode();
1713 finish_scatter_update(&filelock, dir,
1714 inode.dirstat.version, pf->accounted_fragstat.version);
1715 }
1716 }
1717 }
1718 }
1719 break;
1720
1721 case CEPH_LOCK_INEST:
1722 if (is_auth()) {
1723 bool replica_dirty;
1724 ::decode(replica_dirty, p);
1725 if (replica_dirty) {
1726 dout(10) << "decode_lock_state setting nestlock dirty flag" << dendl;
1727 nestlock.mark_dirty(); // ok bc we're auth and caller will handle
1728 }
1729 } else {
1730 ::decode(inode.version, p);
1731 }
1732 {
1733 nest_info_t rstat;
1734 ::decode(rstat, p);
1735 if (!is_auth()) {
1736 dout(10) << " taking inode rstat " << rstat << " for " << *this << dendl;
1737 inode.rstat = rstat; // take inode summation if replica
1738 }
1739 __u32 n;
1740 ::decode(n, p);
1741 while (n--) {
1742 frag_t fg;
1743 snapid_t fgfirst;
1744 nest_info_t rstat;
1745 nest_info_t accounted_rstat;
1746 compact_map<snapid_t,old_rstat_t> dirty_old_rstat;
1747 ::decode(fg, p);
1748 ::decode(fgfirst, p);
1749 ::decode(rstat, p);
1750 ::decode(accounted_rstat, p);
1751 ::decode(dirty_old_rstat, p);
1752 dout(10) << fg << " [" << fgfirst << ",head]" << dendl;
1753 dout(10) << fg << " rstat " << rstat << dendl;
1754 dout(10) << fg << " accounted_rstat " << accounted_rstat << dendl;
1755 dout(10) << fg << " dirty_old_rstat " << dirty_old_rstat << dendl;
1756
1757 CDir *dir = get_dirfrag(fg);
1758 if (is_auth()) {
1759 assert(dir); // i am auth; i had better have this dir open
1760 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1761 << " on " << *dir << dendl;
1762 dir->first = fgfirst;
1763 dir->fnode.rstat = rstat;
1764 dir->fnode.accounted_rstat = accounted_rstat;
1765 dir->dirty_old_rstat.swap(dirty_old_rstat);
1766 if (!(rstat == accounted_rstat) || !dir->dirty_old_rstat.empty()) {
1767 dout(10) << fg << " setting nestlock updated flag" << dendl;
1768 nestlock.mark_dirty(); // ok bc we're auth and caller will handle
1769 }
1770 } else {
1771 if (dir && dir->is_auth()) {
1772 dout(10) << fg << " first " << dir->first << " -> " << fgfirst
1773 << " on " << *dir << dendl;
1774 dir->first = fgfirst;
1775 fnode_t *pf = dir->get_projected_fnode();
1776 finish_scatter_update(&nestlock, dir,
1777 inode.rstat.version, pf->accounted_rstat.version);
1778 }
1779 }
1780 }
1781 }
1782 break;
1783
1784 case CEPH_LOCK_IXATTR:
1785 ::decode(inode.version, p);
1786 ::decode(tm, p);
1787 if (inode.ctime < tm) inode.ctime = tm;
1788 ::decode(xattrs, p);
1789 break;
1790
1791 case CEPH_LOCK_ISNAP:
1792 {
1793 ::decode(inode.version, p);
1794 ::decode(tm, p);
1795 if (inode.ctime < tm) inode.ctime = tm;
1796 snapid_t seq = 0;
1797 if (snaprealm)
1798 seq = snaprealm->srnode.seq;
1799 decode_snap(p);
1800 if (snaprealm && snaprealm->srnode.seq != seq)
1801 mdcache->do_realm_invalidate_and_update_notify(this, seq ? CEPH_SNAP_OP_UPDATE:CEPH_SNAP_OP_SPLIT);
1802 }
1803 break;
1804
1805 case CEPH_LOCK_IFLOCK:
1806 ::decode(inode.version, p);
1807 _decode_file_locks(p);
1808 break;
1809
1810 case CEPH_LOCK_IPOLICY:
1811 if (inode.is_dir()) {
1812 ::decode(inode.version, p);
1813 ::decode(tm, p);
1814 if (inode.ctime < tm) inode.ctime = tm;
1815 ::decode(inode.layout, p);
1816 ::decode(inode.quota, p);
1817 mds_rank_t old_pin = inode.export_pin;
1818 ::decode(inode.export_pin, p);
1819 maybe_export_pin(old_pin != inode.export_pin);
1820 }
1821 break;
1822
1823 default:
1824 ceph_abort();
1825 }
1826 }
1827
1828
1829 bool CInode::is_dirty_scattered()
1830 {
1831 return
1832 filelock.is_dirty_or_flushing() ||
1833 nestlock.is_dirty_or_flushing() ||
1834 dirfragtreelock.is_dirty_or_flushing();
1835 }
1836
1837 void CInode::clear_scatter_dirty()
1838 {
1839 filelock.remove_dirty();
1840 nestlock.remove_dirty();
1841 dirfragtreelock.remove_dirty();
1842 }
1843
1844 void CInode::clear_dirty_scattered(int type)
1845 {
1846 dout(10) << "clear_dirty_scattered " << type << " on " << *this << dendl;
1847 switch (type) {
1848 case CEPH_LOCK_IFILE:
1849 item_dirty_dirfrag_dir.remove_myself();
1850 break;
1851
1852 case CEPH_LOCK_INEST:
1853 item_dirty_dirfrag_nest.remove_myself();
1854 break;
1855
1856 case CEPH_LOCK_IDFT:
1857 item_dirty_dirfrag_dirfragtree.remove_myself();
1858 break;
1859
1860 default:
1861 ceph_abort();
1862 }
1863 }
1864
1865
1866 /*
1867 * when we initially scatter a lock, we need to check if any of the dirfrags
1868 * have out of date accounted_rstat/fragstat. if so, mark the lock stale.
1869 */
1870 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1871 void CInode::start_scatter(ScatterLock *lock)
1872 {
1873 dout(10) << "start_scatter " << *lock << " on " << *this << dendl;
1874 assert(is_auth());
1875 inode_t *pi = get_projected_inode();
1876
1877 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
1878 p != dirfrags.end();
1879 ++p) {
1880 frag_t fg = p->first;
1881 CDir *dir = p->second;
1882 fnode_t *pf = dir->get_projected_fnode();
1883 dout(20) << fg << " " << *dir << dendl;
1884
1885 if (!dir->is_auth())
1886 continue;
1887
1888 switch (lock->get_type()) {
1889 case CEPH_LOCK_IFILE:
1890 finish_scatter_update(lock, dir, pi->dirstat.version, pf->accounted_fragstat.version);
1891 break;
1892
1893 case CEPH_LOCK_INEST:
1894 finish_scatter_update(lock, dir, pi->rstat.version, pf->accounted_rstat.version);
1895 break;
1896
1897 case CEPH_LOCK_IDFT:
1898 dir->state_clear(CDir::STATE_DIRTYDFT);
1899 break;
1900 }
1901 }
1902 }
1903
1904
1905 class C_Inode_FragUpdate : public MDSLogContextBase {
1906 protected:
1907 CInode *in;
1908 CDir *dir;
1909 MutationRef mut;
1910 MDSRank *get_mds() override {return in->mdcache->mds;}
1911 void finish(int r) override {
1912 in->_finish_frag_update(dir, mut);
1913 }
1914
1915 public:
1916 C_Inode_FragUpdate(CInode *i, CDir *d, MutationRef& m) : in(i), dir(d), mut(m) {}
1917 };
1918
1919 void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
1920 version_t inode_version, version_t dir_accounted_version)
1921 {
1922 frag_t fg = dir->get_frag();
1923 assert(dir->is_auth());
1924
1925 if (dir->is_frozen()) {
1926 dout(10) << "finish_scatter_update " << fg << " frozen, marking " << *lock << " stale " << *dir << dendl;
1927 } else if (dir->get_version() == 0) {
1928 dout(10) << "finish_scatter_update " << fg << " not loaded, marking " << *lock << " stale " << *dir << dendl;
1929 } else {
1930 if (dir_accounted_version != inode_version) {
1931 dout(10) << "finish_scatter_update " << fg << " journaling accounted scatterstat update v" << inode_version << dendl;
1932
1933 MDLog *mdlog = mdcache->mds->mdlog;
1934 MutationRef mut(new MutationImpl());
1935 mut->ls = mdlog->get_current_segment();
1936
1937 inode_t *pi = get_projected_inode();
1938 fnode_t *pf = dir->project_fnode();
1939 pf->version = dir->pre_dirty();
1940
1941 const char *ename = 0;
1942 switch (lock->get_type()) {
1943 case CEPH_LOCK_IFILE:
1944 pf->fragstat.version = pi->dirstat.version;
1945 pf->accounted_fragstat = pf->fragstat;
1946 ename = "lock ifile accounted scatter stat update";
1947 break;
1948 case CEPH_LOCK_INEST:
1949 pf->rstat.version = pi->rstat.version;
1950 pf->accounted_rstat = pf->rstat;
1951 ename = "lock inest accounted scatter stat update";
1952 break;
1953 default:
1954 ceph_abort();
1955 }
1956
1957 mut->add_projected_fnode(dir);
1958
1959 EUpdate *le = new EUpdate(mdlog, ename);
1960 mdlog->start_entry(le);
1961 le->metablob.add_dir_context(dir);
1962 le->metablob.add_dir(dir, true);
1963
1964 assert(!dir->is_frozen());
1965 mut->auth_pin(dir);
1966
1967 mdlog->submit_entry(le, new C_Inode_FragUpdate(this, dir, mut));
1968 } else {
1969 dout(10) << "finish_scatter_update " << fg << " accounted " << *lock
1970 << " scatter stat unchanged at v" << dir_accounted_version << dendl;
1971 }
1972 }
1973 }
1974
1975 void CInode::_finish_frag_update(CDir *dir, MutationRef& mut)
1976 {
1977 dout(10) << "_finish_frag_update on " << *dir << dendl;
1978 mut->apply();
1979 mut->cleanup();
1980 }
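/*
 * Flow sketch for the journaled catch-up above: finish_scatter_update()
 * projects the fnode, queues an EUpdate, and submits it with a
 * C_Inode_FragUpdate callback; once the log entry is safe the callback lands
 * in _finish_frag_update(), which applies and cleans up the mutation.
 */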
1981
1982
1983 /*
1984 * when we gather a lock, we need to assimilate dirfrag changes into the inode
1985 * state. it's possible we can't update the dirfrag accounted_rstat/fragstat
1986 * because the frag is auth and frozen, or because the replica couldn't update it for
1987 * the same reason. hopefully it will get updated the next time the lock cycles.
1988 *
1989 * we have two dimensions of behavior:
1990 * - we may be (auth and !frozen), and able to update, or not.
1991 * - the frag may be stale, or not.
1992 *
1993 * if the frag is non-stale, we want to assimilate the diff into the
1994 * inode, regardless of whether it's auth or updateable.
1995 *
1996 * if we update the frag, we want to set accounted_fragstat = frag,
1997 * both if we took the diff or it was stale and we are making it
1998 * un-stale.
1999 */
2000 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
2001 void CInode::finish_scatter_gather_update(int type)
2002 {
2003 LogChannelRef clog = mdcache->mds->clog;
2004
2005 dout(10) << "finish_scatter_gather_update " << type << " on " << *this << dendl;
2006 assert(is_auth());
2007
2008 switch (type) {
2009 case CEPH_LOCK_IFILE:
2010 {
2011 fragtree_t tmpdft = dirfragtree;
2012 struct frag_info_t dirstat;
2013 bool dirstat_valid = true;
2014
2015 // adjust summation
2016 assert(is_auth());
2017 inode_t *pi = get_projected_inode();
2018
2019 bool touched_mtime = false, touched_chattr = false;
2020 dout(20) << " orig dirstat " << pi->dirstat << dendl;
2021 pi->dirstat.version++;
2022 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
2023 p != dirfrags.end();
2024 ++p) {
2025 frag_t fg = p->first;
2026 CDir *dir = p->second;
2027 dout(20) << fg << " " << *dir << dendl;
2028
2029 bool update;
2030 if (dir->get_version() != 0) {
2031 update = dir->is_auth() && !dir->is_frozen();
2032 } else {
2033 update = false;
2034 dirstat_valid = false;
2035 }
2036
2037 fnode_t *pf = dir->get_projected_fnode();
2038 if (update)
2039 pf = dir->project_fnode();
2040
2041 if (pf->accounted_fragstat.version == pi->dirstat.version - 1) {
2042 dout(20) << fg << " fragstat " << pf->fragstat << dendl;
2043 dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
2044 pi->dirstat.add_delta(pf->fragstat, pf->accounted_fragstat, &touched_mtime, &touched_chattr);
2045 } else {
2046 dout(20) << fg << " skipping STALE accounted_fragstat " << pf->accounted_fragstat << dendl;
2047 }
2048
2049 if (pf->fragstat.nfiles < 0 ||
2050 pf->fragstat.nsubdirs < 0) {
2051 clog->error() << "bad/negative dir size on "
2052 << dir->dirfrag() << " " << pf->fragstat;
2053 assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
2054
2055 if (pf->fragstat.nfiles < 0)
2056 pf->fragstat.nfiles = 0;
2057 if (pf->fragstat.nsubdirs < 0)
2058 pf->fragstat.nsubdirs = 0;
2059 }
2060
2061 if (update) {
2062 pf->accounted_fragstat = pf->fragstat;
2063 pf->fragstat.version = pf->accounted_fragstat.version = pi->dirstat.version;
2064 dout(10) << fg << " updated accounted_fragstat " << pf->fragstat << " on " << *dir << dendl;
2065 }
2066
2067 tmpdft.force_to_leaf(g_ceph_context, fg);
2068 dirstat.add(pf->fragstat);
2069 }
2070 if (touched_mtime)
2071 pi->mtime = pi->ctime = pi->dirstat.mtime;
2072 if (touched_chattr)
2073 pi->change_attr = pi->dirstat.change_attr;
2074 dout(20) << " final dirstat " << pi->dirstat << dendl;
2075
2076 if (dirstat_valid && !dirstat.same_sums(pi->dirstat)) {
2077 list<frag_t> ls;
2078 tmpdft.get_leaves_under(frag_t(), ls);
2079 for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
2080 if (!dirfrags.count(*p)) {
2081 dirstat_valid = false;
2082 break;
2083 }
2084 if (dirstat_valid) {
2085 if (state_test(CInode::STATE_REPAIRSTATS)) {
2086 dout(20) << " dirstat mismatch, fixing" << dendl;
2087 } else {
2088 clog->error() << "unmatched fragstat on " << ino() << ", inode has "
2089 << pi->dirstat << ", dirfrags have " << dirstat;
2090 assert(!"unmatched fragstat" == g_conf->mds_verify_scatter);
2091 }
2092 // trust the dirfrags for now
2093 version_t v = pi->dirstat.version;
2094 if (pi->dirstat.mtime > dirstat.mtime)
2095 dirstat.mtime = pi->dirstat.mtime;
2096 if (pi->dirstat.change_attr > dirstat.change_attr)
2097 dirstat.change_attr = pi->dirstat.change_attr;
2098 pi->dirstat = dirstat;
2099 pi->dirstat.version = v;
2100 }
2101 }
2102
2103 if (pi->dirstat.nfiles < 0 ||
2104 pi->dirstat.nsubdirs < 0) {
2105 clog->error() << "bad/negative fragstat on " << ino()
2106 << ", inode has " << pi->dirstat;
2107 assert(!"bad/negative fragstat" == g_conf->mds_verify_scatter);
2108
2109 if (pi->dirstat.nfiles < 0)
2110 pi->dirstat.nfiles = 0;
2111 if (pi->dirstat.nsubdirs < 0)
2112 pi->dirstat.nsubdirs = 0;
2113 }
2114 }
2115 break;
2116
2117 case CEPH_LOCK_INEST:
2118 {
2119 fragtree_t tmpdft = dirfragtree;
2120 nest_info_t rstat;
2121 rstat.rsubdirs = 1;
2122 bool rstat_valid = true;
2123
2124 // adjust summation
2125 assert(is_auth());
2126 inode_t *pi = get_projected_inode();
2127 dout(20) << " orig rstat " << pi->rstat << dendl;
2128 pi->rstat.version++;
2129 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
2130 p != dirfrags.end();
2131 ++p) {
2132 frag_t fg = p->first;
2133 CDir *dir = p->second;
2134 dout(20) << fg << " " << *dir << dendl;
2135
2136 bool update;
2137 if (dir->get_version() != 0) {
2138 update = dir->is_auth() && !dir->is_frozen();
2139 } else {
2140 update = false;
2141 rstat_valid = false;
2142 }
2143
2144 fnode_t *pf = dir->get_projected_fnode();
2145 if (update)
2146 pf = dir->project_fnode();
2147
2148 if (pf->accounted_rstat.version == pi->rstat.version-1) {
2149 // only pull this frag's dirty rstat inodes into the frag if
2150 // the frag is non-stale and updateable. if it's stale,
2151 // that info will just get thrown out!
2152 if (update)
2153 dir->assimilate_dirty_rstat_inodes();
2154
2155 dout(20) << fg << " rstat " << pf->rstat << dendl;
2156 dout(20) << fg << " accounted_rstat " << pf->accounted_rstat << dendl;
2157 dout(20) << fg << " dirty_old_rstat " << dir->dirty_old_rstat << dendl;
2158 mdcache->project_rstat_frag_to_inode(pf->rstat, pf->accounted_rstat,
2159 dir->first, CEPH_NOSNAP, this, true);
2160 for (compact_map<snapid_t,old_rstat_t>::iterator q = dir->dirty_old_rstat.begin();
2161 q != dir->dirty_old_rstat.end();
2162 ++q)
2163 mdcache->project_rstat_frag_to_inode(q->second.rstat, q->second.accounted_rstat,
2164 q->second.first, q->first, this, true);
2165 if (update) // dir contents not valid if frozen or non-auth
2166 dir->check_rstats();
2167 } else {
2168 dout(20) << fg << " skipping STALE accounted_rstat " << pf->accounted_rstat << dendl;
2169 }
2170 if (update) {
2171 pf->accounted_rstat = pf->rstat;
2172 dir->dirty_old_rstat.clear();
2173 pf->rstat.version = pf->accounted_rstat.version = pi->rstat.version;
2174 dir->check_rstats();
2175 dout(10) << fg << " updated accounted_rstat " << pf->rstat << " on " << *dir << dendl;
2176 }
2177
2178 tmpdft.force_to_leaf(g_ceph_context, fg);
2179 rstat.add(pf->rstat);
2180 }
2181 dout(20) << " final rstat " << pi->rstat << dendl;
2182
2183 if (rstat_valid && !rstat.same_sums(pi->rstat)) {
2184 list<frag_t> ls;
2185 tmpdft.get_leaves_under(frag_t(), ls);
2186 for (list<frag_t>::iterator p = ls.begin(); p != ls.end(); ++p)
2187 if (!dirfrags.count(*p)) {
2188 rstat_valid = false;
2189 break;
2190 }
2191 if (rstat_valid) {
2192 if (state_test(CInode::STATE_REPAIRSTATS)) {
2193 dout(20) << " rstat mismatch, fixing" << dendl;
2194 } else {
2195 clog->error() << "unmatched rstat on " << ino() << ", inode has "
2196 << pi->rstat << ", dirfrags have " << rstat;
2197 assert(!"unmatched rstat" == g_conf->mds_verify_scatter);
2198 }
2199 // trust the dirfrag for now
2200 version_t v = pi->rstat.version;
2201 if (pi->rstat.rctime > rstat.rctime)
2202 rstat.rctime = pi->rstat.rctime;
2203 pi->rstat = rstat;
2204 pi->rstat.version = v;
2205 }
2206 }
2207
2208 mdcache->broadcast_quota_to_client(this);
2209 }
2210 break;
2211
2212 case CEPH_LOCK_IDFT:
2213 break;
2214
2215 default:
2216 ceph_abort();
2217 }
2218 }
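/*
 * Version handshake used above (illustrative): the gather bumps
 * pi->dirstat.version (or pi->rstat.version) by one and then folds in only
 * those dirfrags whose accounted version equals the previous value.  E.g. if
 * dirstat.version goes 5 -> 6, a frag with accounted_fragstat.version == 5
 * contributes its delta and is restamped to 6; anything older is skipped as
 * stale until a later cycle.
 */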
2219
2220 void CInode::finish_scatter_gather_update_accounted(int type, MutationRef& mut, EMetaBlob *metablob)
2221 {
2222 dout(10) << "finish_scatter_gather_update_accounted " << type << " on " << *this << dendl;
2223 assert(is_auth());
2224
2225 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
2226 p != dirfrags.end();
2227 ++p) {
2228 CDir *dir = p->second;
2229 if (!dir->is_auth() || dir->get_version() == 0 || dir->is_frozen())
2230 continue;
2231
2232 if (type == CEPH_LOCK_IDFT)
2233 continue; // nothing to do.
2234
2235 dout(10) << " journaling updated frag accounted_ on " << *dir << dendl;
2236 assert(dir->is_projected());
2237 fnode_t *pf = dir->get_projected_fnode();
2238 pf->version = dir->pre_dirty();
2239 mut->add_projected_fnode(dir);
2240 metablob->add_dir(dir, true);
2241 mut->auth_pin(dir);
2242
2243 if (type == CEPH_LOCK_INEST)
2244 dir->assimilate_dirty_rstat_inodes_finish(mut, metablob);
2245 }
2246 }
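/*
 * Note: finish_scatter_gather_update() above only adjusts the in-memory
 * (projected) values; this companion pass ties them to a journal entry by
 * pre-dirtying each auth, loaded, unfrozen dirfrag, attaching its projected
 * fnode to the mutation and adding it to the supplied EMetaBlob (plus, for
 * the nest lock, finishing the dirty-rstat-inode assimilation started in the
 * gather).
 */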
2247
2248 // waiting
2249
2250 bool CInode::is_frozen() const
2251 {
2252 if (is_frozen_inode()) return true;
2253 if (parent && parent->dir->is_frozen()) return true;
2254 return false;
2255 }
2256
2257 bool CInode::is_frozen_dir() const
2258 {
2259 if (parent && parent->dir->is_frozen_dir()) return true;
2260 return false;
2261 }
2262
2263 bool CInode::is_freezing() const
2264 {
2265 if (is_freezing_inode()) return true;
2266 if (parent && parent->dir->is_freezing()) return true;
2267 return false;
2268 }
2269
2270 void CInode::add_dir_waiter(frag_t fg, MDSInternalContextBase *c)
2271 {
2272 if (waiting_on_dir.empty())
2273 get(PIN_DIRWAITER);
2274 waiting_on_dir[fg].push_back(c);
2275 dout(10) << "add_dir_waiter frag " << fg << " " << c << " on " << *this << dendl;
2276 }
2277
2278 void CInode::take_dir_waiting(frag_t fg, list<MDSInternalContextBase*>& ls)
2279 {
2280 if (waiting_on_dir.empty())
2281 return;
2282
2283 compact_map<frag_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dir.find(fg);
2284 if (p != waiting_on_dir.end()) {
2285 dout(10) << "take_dir_waiting frag " << fg << " on " << *this << dendl;
2286 ls.splice(ls.end(), p->second);
2287 waiting_on_dir.erase(p);
2288
2289 if (waiting_on_dir.empty())
2290 put(PIN_DIRWAITER);
2291 }
2292 }
2293
2294 void CInode::add_waiter(uint64_t tag, MDSInternalContextBase *c)
2295 {
2296 dout(10) << "add_waiter tag " << std::hex << tag << std::dec << " " << c
2297 << " !ambig " << !state_test(STATE_AMBIGUOUSAUTH)
2298 << " !frozen " << !is_frozen_inode()
2299 << " !freezing " << !is_freezing_inode()
2300 << dendl;
2301 // wait on the directory?
2302 // make sure it's not the inode that is explicitly ambiguous|freezing|frozen

2303 if (((tag & WAIT_SINGLEAUTH) && !state_test(STATE_AMBIGUOUSAUTH)) ||
2304 ((tag & WAIT_UNFREEZE) &&
2305 !is_frozen_inode() && !is_freezing_inode() && !is_frozen_auth_pin())) {
2306 dout(15) << "passing waiter up tree" << dendl;
2307 parent->dir->add_waiter(tag, c);
2308 return;
2309 }
2310 dout(15) << "taking waiter here" << dendl;
2311 MDSCacheObject::add_waiter(tag, c);
2312 }
2313
2314 void CInode::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
2315 {
2316 if ((mask & WAIT_DIR) && !waiting_on_dir.empty()) {
2317 // take all dirfrag waiters
2318 while (!waiting_on_dir.empty()) {
2319 compact_map<frag_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dir.begin();
2320 dout(10) << "take_waiting dirfrag " << p->first << " on " << *this << dendl;
2321 ls.splice(ls.end(), p->second);
2322 waiting_on_dir.erase(p);
2323 }
2324 put(PIN_DIRWAITER);
2325 }
2326
2327 // waiting
2328 MDSCacheObject::take_waiting(mask, ls);
2329 }
2330
2331 bool CInode::freeze_inode(int auth_pin_allowance)
2332 {
2333 assert(auth_pin_allowance > 0); // otherwise we need to adjust parent's nested_auth_pins
2334 assert(auth_pins >= auth_pin_allowance);
2335 if (auth_pins > auth_pin_allowance) {
2336 dout(10) << "freeze_inode - waiting for auth_pins to drop to " << auth_pin_allowance << dendl;
2337 auth_pin_freeze_allowance = auth_pin_allowance;
2338 get(PIN_FREEZING);
2339 state_set(STATE_FREEZING);
2340 return false;
2341 }
2342
2343 dout(10) << "freeze_inode - frozen" << dendl;
2344 assert(auth_pins == auth_pin_allowance);
2345 if (!state_test(STATE_FROZEN)) {
2346 get(PIN_FROZEN);
2347 state_set(STATE_FROZEN);
2348 }
2349 return true;
2350 }
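/*
 * Usage sketch: the caller passes an allowance of at least 1 -- typically the
 * number of auth_pins it is itself holding.  If more pins than that are
 * outstanding we only enter FREEZING here; the transition to FROZEN then
 * happens in auth_unpin() once auth_pins drops back to
 * auth_pin_freeze_allowance, which also fires the WAIT_FROZEN waiters.
 */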
2351
2352 void CInode::unfreeze_inode(list<MDSInternalContextBase*>& finished)
2353 {
2354 dout(10) << "unfreeze_inode" << dendl;
2355 if (state_test(STATE_FREEZING)) {
2356 state_clear(STATE_FREEZING);
2357 put(PIN_FREEZING);
2358 } else if (state_test(STATE_FROZEN)) {
2359 state_clear(STATE_FROZEN);
2360 put(PIN_FROZEN);
2361 } else
2362 ceph_abort();
2363 take_waiting(WAIT_UNFREEZE, finished);
2364 }
2365
2366 void CInode::unfreeze_inode()
2367 {
2368 list<MDSInternalContextBase*> finished;
2369 unfreeze_inode(finished);
2370 mdcache->mds->queue_waiters(finished);
2371 }
2372
2373 void CInode::freeze_auth_pin()
2374 {
2375 assert(state_test(CInode::STATE_FROZEN));
2376 state_set(CInode::STATE_FROZENAUTHPIN);
2377 }
2378
2379 void CInode::unfreeze_auth_pin()
2380 {
2381 assert(state_test(CInode::STATE_FROZENAUTHPIN));
2382 state_clear(CInode::STATE_FROZENAUTHPIN);
2383 if (!state_test(STATE_FREEZING|STATE_FROZEN)) {
2384 list<MDSInternalContextBase*> finished;
2385 take_waiting(WAIT_UNFREEZE, finished);
2386 mdcache->mds->queue_waiters(finished);
2387 }
2388 }
2389
2390 void CInode::clear_ambiguous_auth(list<MDSInternalContextBase*>& finished)
2391 {
2392 assert(state_test(CInode::STATE_AMBIGUOUSAUTH));
2393 state_clear(CInode::STATE_AMBIGUOUSAUTH);
2394 take_waiting(CInode::WAIT_SINGLEAUTH, finished);
2395 }
2396
2397 void CInode::clear_ambiguous_auth()
2398 {
2399 list<MDSInternalContextBase*> finished;
2400 clear_ambiguous_auth(finished);
2401 mdcache->mds->queue_waiters(finished);
2402 }
2403
2404 // auth_pins
2405 bool CInode::can_auth_pin() const {
2406 if (!is_auth() || is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin())
2407 return false;
2408 if (parent)
2409 return parent->can_auth_pin();
2410 return true;
2411 }
2412
2413 void CInode::auth_pin(void *by)
2414 {
2415 if (auth_pins == 0)
2416 get(PIN_AUTHPIN);
2417 auth_pins++;
2418
2419 #ifdef MDS_AUTHPIN_SET
2420 auth_pin_set.insert(by);
2421 #endif
2422
2423 dout(10) << "auth_pin by " << by << " on " << *this
2424 << " now " << auth_pins << "+" << nested_auth_pins
2425 << dendl;
2426
2427 if (parent)
2428 parent->adjust_nested_auth_pins(1, 1, this);
2429 }
2430
2431 void CInode::auth_unpin(void *by)
2432 {
2433 auth_pins--;
2434
2435 #ifdef MDS_AUTHPIN_SET
2436 assert(auth_pin_set.count(by));
2437 auth_pin_set.erase(auth_pin_set.find(by));
2438 #endif
2439
2440 if (auth_pins == 0)
2441 put(PIN_AUTHPIN);
2442
2443 dout(10) << "auth_unpin by " << by << " on " << *this
2444 << " now " << auth_pins << "+" << nested_auth_pins
2445 << dendl;
2446
2447 assert(auth_pins >= 0);
2448
2449 if (parent)
2450 parent->adjust_nested_auth_pins(-1, -1, by);
2451
2452 if (is_freezing_inode() &&
2453 auth_pins == auth_pin_freeze_allowance) {
2454 dout(10) << "auth_unpin freezing!" << dendl;
2455 get(PIN_FROZEN);
2456 put(PIN_FREEZING);
2457 state_clear(STATE_FREEZING);
2458 state_set(STATE_FROZEN);
2459 finish_waiting(WAIT_FROZEN);
2460 }
2461 }
2462
2463 void CInode::adjust_nested_auth_pins(int a, void *by)
2464 {
2465 assert(a);
2466 nested_auth_pins += a;
2467 dout(35) << "adjust_nested_auth_pins by " << by
2468 << " change " << a << " yields "
2469 << auth_pins << "+" << nested_auth_pins << dendl;
2470 assert(nested_auth_pins >= 0);
2471
2472 if (g_conf->mds_debug_auth_pins) {
2473 // audit
2474 int s = 0;
2475 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
2476 p != dirfrags.end();
2477 ++p) {
2478 CDir *dir = p->second;
2479 if (!dir->is_subtree_root() && dir->get_cum_auth_pins())
2480 s++;
2481 }
2482 assert(s == nested_auth_pins);
2483 }
2484
2485 if (parent)
2486 parent->adjust_nested_auth_pins(a, 0, by);
2487 }
2488
2489
2490 // authority
2491
2492 mds_authority_t CInode::authority() const
2493 {
2494 if (inode_auth.first >= 0)
2495 return inode_auth;
2496
2497 if (parent)
2498 return parent->dir->authority();
2499
2500 // new items that are not yet linked in (in the committed plane) belong
2501 // to their first parent.
2502 if (!projected_parent.empty())
2503 return projected_parent.front()->dir->authority();
2504
2505 return CDIR_AUTH_UNDEF;
2506 }
2507
2508
2509 // SNAP
2510
2511 snapid_t CInode::get_oldest_snap()
2512 {
2513 snapid_t t = first;
2514 if (!old_inodes.empty())
2515 t = old_inodes.begin()->second.first;
2516 return MIN(t, oldest_snap);
2517 }
2518
2519 old_inode_t& CInode::cow_old_inode(snapid_t follows, bool cow_head)
2520 {
2521 assert(follows >= first);
2522
2523 inode_t *pi = cow_head ? get_projected_inode() : get_previous_projected_inode();
2524 map<string,bufferptr> *px = cow_head ? get_projected_xattrs() : get_previous_projected_xattrs();
2525
2526 old_inode_t &old = old_inodes[follows];
2527 old.first = first;
2528 old.inode = *pi;
2529 old.xattrs = *px;
2530
2531 if (first < oldest_snap)
2532 oldest_snap = first;
2533
2534 dout(10) << " " << px->size() << " xattrs cowed, " << *px << dendl;
2535
2536 old.inode.trim_client_ranges(follows);
2537
2538 if (g_conf->mds_snap_rstat &&
2539 !(old.inode.rstat == old.inode.accounted_rstat))
2540 dirty_old_rstats.insert(follows);
2541
2542 first = follows+1;
2543
2544 dout(10) << "cow_old_inode " << (cow_head ? "head" : "previous_head" )
2545 << " to [" << old.first << "," << follows << "] on "
2546 << *this << dendl;
2547
2548 return old;
2549 }
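/*
 * Illustration: old_inodes is keyed by the *last* snapid an entry covers,
 * with the start kept in old_inode_t::first.  So with first == 5 and
 * follows == 8, the call above records old_inodes[8] covering [5,8] and
 * advances first to 9; snapshotted reads for snapids 5..8 later resolve to
 * that entry (see pick_old_inode()).
 */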
2550
2551 void CInode::split_old_inode(snapid_t snap)
2552 {
2553 compact_map<snapid_t, old_inode_t>::iterator p = old_inodes.lower_bound(snap);
2554 assert(p != old_inodes.end() && p->second.first < snap);
2555
2556 old_inode_t &old = old_inodes[snap - 1];
2557 old = p->second;
2558
2559 p->second.first = snap;
2560 dout(10) << "split_old_inode " << "[" << old.first << "," << p->first
2561 << "] to [" << snap << "," << p->first << "] on " << *this << dendl;
2562 }
2563
2564 void CInode::pre_cow_old_inode()
2565 {
2566 snapid_t follows = find_snaprealm()->get_newest_seq();
2567 if (first <= follows)
2568 cow_old_inode(follows, true);
2569 }
2570
2571 void CInode::purge_stale_snap_data(const set<snapid_t>& snaps)
2572 {
2573 dout(10) << "purge_stale_snap_data " << snaps << dendl;
2574
2575 if (old_inodes.empty())
2576 return;
2577
2578 compact_map<snapid_t,old_inode_t>::iterator p = old_inodes.begin();
2579 while (p != old_inodes.end()) {
2580 set<snapid_t>::const_iterator q = snaps.lower_bound(p->second.first);
2581 if (q == snaps.end() || *q > p->first) {
2582 dout(10) << " purging old_inode [" << p->second.first << "," << p->first << "]" << dendl;
2583 old_inodes.erase(p++);
2584 } else
2585 ++p;
2586 }
2587 }
2588
2589 /*
2590 * pick/create an old_inode
2591 */
2592 old_inode_t * CInode::pick_old_inode(snapid_t snap)
2593 {
2594 compact_map<snapid_t, old_inode_t>::iterator p = old_inodes.lower_bound(snap); // p is the first key >= snap
2595 if (p != old_inodes.end() && p->second.first <= snap) {
2596 dout(10) << "pick_old_inode snap " << snap << " -> [" << p->second.first << "," << p->first << "]" << dendl;
2597 return &p->second;
2598 }
2599 dout(10) << "pick_old_inode snap " << snap << " -> nothing" << dendl;
2600 return NULL;
2601 }
2602
2603 void CInode::open_snaprealm(bool nosplit)
2604 {
2605 if (!snaprealm) {
2606 SnapRealm *parent = find_snaprealm();
2607 snaprealm = new SnapRealm(mdcache, this);
2608 if (parent) {
2609 dout(10) << "open_snaprealm " << snaprealm
2610 << " parent is " << parent
2611 << dendl;
2612 dout(30) << " siblings are " << parent->open_children << dendl;
2613 snaprealm->parent = parent;
2614 if (!nosplit)
2615 parent->split_at(snaprealm);
2616 parent->open_children.insert(snaprealm);
2617 }
2618 }
2619 }
2620 void CInode::close_snaprealm(bool nojoin)
2621 {
2622 if (snaprealm) {
2623 dout(15) << "close_snaprealm " << *snaprealm << dendl;
2624 snaprealm->close_parents();
2625 if (snaprealm->parent) {
2626 snaprealm->parent->open_children.erase(snaprealm);
2627 //if (!nojoin)
2628 //snaprealm->parent->join(snaprealm);
2629 }
2630 delete snaprealm;
2631 snaprealm = 0;
2632 }
2633 }
2634
2635 SnapRealm *CInode::find_snaprealm() const
2636 {
2637 const CInode *cur = this;
2638 while (!cur->snaprealm) {
2639 if (cur->get_parent_dn())
2640 cur = cur->get_parent_dn()->get_dir()->get_inode();
2641 else if (cur->get_projected_parent_dn())
2642 cur = cur->get_projected_parent_dn()->get_dir()->get_inode();
2643 else
2644 break;
2645 }
2646 return cur->snaprealm;
2647 }
2648
2649 void CInode::encode_snap_blob(bufferlist &snapbl)
2650 {
2651 if (snaprealm) {
2652 ::encode(snaprealm->srnode, snapbl);
2653 dout(20) << "encode_snap_blob " << *snaprealm << dendl;
2654 }
2655 }
2656 void CInode::decode_snap_blob(bufferlist& snapbl)
2657 {
2658 if (snapbl.length()) {
2659 open_snaprealm();
2660 bufferlist::iterator p = snapbl.begin();
2661 ::decode(snaprealm->srnode, p);
2662 if (is_base()) {
2663 bool ok = snaprealm->_open_parents(NULL);
2664 assert(ok);
2665 }
2666 dout(20) << "decode_snap_blob " << *snaprealm << dendl;
2667 }
2668 }
2669
2670 void CInode::encode_snap(bufferlist& bl)
2671 {
2672 bufferlist snapbl;
2673 encode_snap_blob(snapbl);
2674 ::encode(snapbl, bl);
2675 ::encode(oldest_snap, bl);
2676 }
2677
2678 void CInode::decode_snap(bufferlist::iterator& p)
2679 {
2680 bufferlist snapbl;
2681 ::decode(snapbl, p);
2682 ::decode(oldest_snap, p);
2683 decode_snap_blob(snapbl);
2684 }
2685
2686 // =============================================
2687
2688 client_t CInode::calc_ideal_loner()
2689 {
2690 if (mdcache->is_readonly())
2691 return -1;
2692 if (!mds_caps_wanted.empty())
2693 return -1;
2694
2695 int n = 0;
2696 client_t loner = -1;
2697 for (map<client_t,Capability*>::iterator it = client_caps.begin();
2698 it != client_caps.end();
2699 ++it)
2700 if (!it->second->is_stale() &&
2701 ((it->second->wanted() & (CEPH_CAP_ANY_WR|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_RD)) ||
2702 (inode.is_dir() && !has_subtree_root_dirfrag()))) {
2703 if (n)
2704 return -1;
2705 n++;
2706 loner = it->first;
2707 }
2708 return loner;
2709 }
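/*
 * In other words: a loner is only suggested when no other MDS wants caps and
 * exactly one non-stale client wants write or file-read caps (or, for a
 * directory without subtree-root dirfrags, simply holds a cap at all).  Two
 * such clients -> return -1 and nobody gets loner treatment.
 */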
2710
2711 client_t CInode::choose_ideal_loner()
2712 {
2713 want_loner_cap = calc_ideal_loner();
2714 return want_loner_cap;
2715 }
2716
2717 bool CInode::try_set_loner()
2718 {
2719 assert(want_loner_cap >= 0);
2720 if (loner_cap >= 0 && loner_cap != want_loner_cap)
2721 return false;
2722 set_loner_cap(want_loner_cap);
2723 return true;
2724 }
2725
2726 void CInode::set_loner_cap(client_t l)
2727 {
2728 loner_cap = l;
2729 authlock.set_excl_client(loner_cap);
2730 filelock.set_excl_client(loner_cap);
2731 linklock.set_excl_client(loner_cap);
2732 xattrlock.set_excl_client(loner_cap);
2733 }
2734
2735 bool CInode::try_drop_loner()
2736 {
2737 if (loner_cap < 0)
2738 return true;
2739
2740 int other_allowed = get_caps_allowed_by_type(CAP_ANY);
2741 Capability *cap = get_client_cap(loner_cap);
2742 if (!cap ||
2743 (cap->issued() & ~other_allowed) == 0) {
2744 set_loner_cap(-1);
2745 return true;
2746 }
2747 return false;
2748 }
2749
2750
2751 // choose new lock state during recovery, based on issued caps
2752 void CInode::choose_lock_state(SimpleLock *lock, int allissued)
2753 {
2754 int shift = lock->get_cap_shift();
2755 int issued = (allissued >> shift) & lock->get_cap_mask();
2756 if (is_auth()) {
2757 if (lock->is_xlocked()) {
2758 // do nothing here
2759 } else if (lock->get_state() != LOCK_MIX) {
2760 if (issued & (CEPH_CAP_GEXCL | CEPH_CAP_GBUFFER))
2761 lock->set_state(LOCK_EXCL);
2762 else if (issued & CEPH_CAP_GWR)
2763 lock->set_state(LOCK_MIX);
2764 else if (lock->is_dirty()) {
2765 if (is_replicated())
2766 lock->set_state(LOCK_MIX);
2767 else
2768 lock->set_state(LOCK_LOCK);
2769 } else
2770 lock->set_state(LOCK_SYNC);
2771 }
2772 } else {
2773 // our states have already been chosen during rejoin.
2774 if (lock->is_xlocked())
2775 assert(lock->get_state() == LOCK_LOCK);
2776 }
2777 }
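/*
 * Example of the recovery mapping above, for the file lock: a client still
 * holding Fx or Fb pushes the lock to EXCL, one holding Fw (but not Fx/Fb)
 * forces MIX, and with nothing special issued a dirty lock settles on MIX or
 * LOCK depending on replication while a clean one returns to SYNC.
 */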
2778
2779 void CInode::choose_lock_states(int dirty_caps)
2780 {
2781 int issued = get_caps_issued() | dirty_caps;
2782 if (is_auth() && (issued & (CEPH_CAP_ANY_EXCL|CEPH_CAP_ANY_WR)) &&
2783 choose_ideal_loner() >= 0)
2784 try_set_loner();
2785 choose_lock_state(&filelock, issued);
2786 choose_lock_state(&nestlock, issued);
2787 choose_lock_state(&dirfragtreelock, issued);
2788 choose_lock_state(&authlock, issued);
2789 choose_lock_state(&xattrlock, issued);
2790 choose_lock_state(&linklock, issued);
2791 }
2792
2793 Capability *CInode::add_client_cap(client_t client, Session *session, SnapRealm *conrealm)
2794 {
2795 if (client_caps.empty()) {
2796 get(PIN_CAPS);
2797 if (conrealm)
2798 containing_realm = conrealm;
2799 else
2800 containing_realm = find_snaprealm();
2801 containing_realm->inodes_with_caps.push_back(&item_caps);
2802 dout(10) << "add_client_cap first cap, joining realm " << *containing_realm << dendl;
2803 }
2804
2805 if (client_caps.empty())
2806 mdcache->num_inodes_with_caps++;
2807
2808 Capability *cap = new Capability(this, ++mdcache->last_cap_id, client);
2809 assert(client_caps.count(client) == 0);
2810 client_caps[client] = cap;
2811
2812 session->add_cap(cap);
2813 if (session->is_stale())
2814 cap->mark_stale();
2815
2816 cap->client_follows = first-1;
2817
2818 containing_realm->add_cap(client, cap);
2819
2820 return cap;
2821 }
2822
2823 void CInode::remove_client_cap(client_t client)
2824 {
2825 assert(client_caps.count(client) == 1);
2826 Capability *cap = client_caps[client];
2827
2828 cap->item_session_caps.remove_myself();
2829 cap->item_revoking_caps.remove_myself();
2830 cap->item_client_revoking_caps.remove_myself();
2831 containing_realm->remove_cap(client, cap);
2832
2833 if (client == loner_cap)
2834 loner_cap = -1;
2835
2836 delete cap;
2837 client_caps.erase(client);
2838 if (client_caps.empty()) {
2839 dout(10) << "remove_client_cap last cap, leaving realm " << *containing_realm << dendl;
2840 put(PIN_CAPS);
2841 item_caps.remove_myself();
2842 containing_realm = NULL;
2843 item_open_file.remove_myself(); // unpin logsegment
2844 mdcache->num_inodes_with_caps--;
2845 }
2846
2847 //clean up advisory locks
2848 bool fcntl_removed = fcntl_locks ? fcntl_locks->remove_all_from(client) : false;
2849 bool flock_removed = flock_locks ? flock_locks->remove_all_from(client) : false;
2850 if (fcntl_removed || flock_removed) {
2851 list<MDSInternalContextBase*> waiters;
2852 take_waiting(CInode::WAIT_FLOCK, waiters);
2853 mdcache->mds->queue_waiters(waiters);
2854 }
2855 }
2856
2857 void CInode::move_to_realm(SnapRealm *realm)
2858 {
2859 dout(10) << "move_to_realm joining realm " << *realm
2860 << ", leaving realm " << *containing_realm << dendl;
2861 for (map<client_t,Capability*>::iterator q = client_caps.begin();
2862 q != client_caps.end();
2863 ++q) {
2864 containing_realm->remove_cap(q->first, q->second);
2865 realm->add_cap(q->first, q->second);
2866 }
2867 item_caps.remove_myself();
2868 realm->inodes_with_caps.push_back(&item_caps);
2869 containing_realm = realm;
2870 }
2871
2872 Capability *CInode::reconnect_cap(client_t client, const cap_reconnect_t& icr, Session *session)
2873 {
2874 Capability *cap = get_client_cap(client);
2875 if (cap) {
2876 // FIXME?
2877 cap->merge(icr.capinfo.wanted, icr.capinfo.issued);
2878 } else {
2879 cap = add_client_cap(client, session);
2880 cap->set_cap_id(icr.capinfo.cap_id);
2881 cap->set_wanted(icr.capinfo.wanted);
2882 cap->issue_norevoke(icr.capinfo.issued);
2883 cap->reset_seq();
2884 }
2885 cap->set_last_issue_stamp(ceph_clock_now());
2886 return cap;
2887 }
2888
2889 void CInode::clear_client_caps_after_export()
2890 {
2891 while (!client_caps.empty())
2892 remove_client_cap(client_caps.begin()->first);
2893 loner_cap = -1;
2894 want_loner_cap = -1;
2895 mds_caps_wanted.clear();
2896 }
2897
2898 void CInode::export_client_caps(map<client_t,Capability::Export>& cl)
2899 {
2900 for (map<client_t,Capability*>::iterator it = client_caps.begin();
2901 it != client_caps.end();
2902 ++it) {
2903 cl[it->first] = it->second->make_export();
2904 }
2905 }
2906
2907 // caps allowed
2908 int CInode::get_caps_liked() const
2909 {
2910 if (is_dir())
2911 return CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED; // but not, say, FILE_RD|WR|WRBUFFER
2912 else
2913 return CEPH_CAP_ANY & ~CEPH_CAP_FILE_LAZYIO;
2914 }
2915
2916 int CInode::get_caps_allowed_ever() const
2917 {
2918 int allowed;
2919 if (is_dir())
2920 allowed = CEPH_CAP_PIN | CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_SHARED;
2921 else
2922 allowed = CEPH_CAP_ANY;
2923 return allowed &
2924 (CEPH_CAP_PIN |
2925 (filelock.gcaps_allowed_ever() << filelock.get_cap_shift()) |
2926 (authlock.gcaps_allowed_ever() << authlock.get_cap_shift()) |
2927 (xattrlock.gcaps_allowed_ever() << xattrlock.get_cap_shift()) |
2928 (linklock.gcaps_allowed_ever() << linklock.get_cap_shift()));
2929 }
2930
2931 int CInode::get_caps_allowed_by_type(int type) const
2932 {
2933 return
2934 CEPH_CAP_PIN |
2935 (filelock.gcaps_allowed(type) << filelock.get_cap_shift()) |
2936 (authlock.gcaps_allowed(type) << authlock.get_cap_shift()) |
2937 (xattrlock.gcaps_allowed(type) << xattrlock.get_cap_shift()) |
2938 (linklock.gcaps_allowed(type) << linklock.get_cap_shift());
2939 }
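/*
 * How the composition works (roughly): each SimpleLock reasons only about
 * generic cap bits (GSHARED/GEXCL/GRD/GWR/GBUFFER/...); get_cap_shift()
 * relocates those bits into the lock's own field of the client cap word
 * (AUTH, LINK, XATTR or FILE), so e.g. the filelock's generic "rd" bit ends
 * up as CEPH_CAP_FILE_RD after shifting.  PIN is always permitted.
 */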
2940
2941 int CInode::get_caps_careful() const
2942 {
2943 return
2944 (filelock.gcaps_careful() << filelock.get_cap_shift()) |
2945 (authlock.gcaps_careful() << authlock.get_cap_shift()) |
2946 (xattrlock.gcaps_careful() << xattrlock.get_cap_shift()) |
2947 (linklock.gcaps_careful() << linklock.get_cap_shift());
2948 }
2949
2950 int CInode::get_xlocker_mask(client_t client) const
2951 {
2952 return
2953 (filelock.gcaps_xlocker_mask(client) << filelock.get_cap_shift()) |
2954 (authlock.gcaps_xlocker_mask(client) << authlock.get_cap_shift()) |
2955 (xattrlock.gcaps_xlocker_mask(client) << xattrlock.get_cap_shift()) |
2956 (linklock.gcaps_xlocker_mask(client) << linklock.get_cap_shift());
2957 }
2958
2959 int CInode::get_caps_allowed_for_client(Session *session, inode_t *file_i) const
2960 {
2961 client_t client = session->info.inst.name.num();
2962 int allowed;
2963 if (client == get_loner()) {
2964 // as the loner, we get the loner_caps AND any xlocker_caps for things we have xlocked
2965 allowed =
2966 get_caps_allowed_by_type(CAP_LONER) |
2967 (get_caps_allowed_by_type(CAP_XLOCKER) & get_xlocker_mask(client));
2968 } else {
2969 allowed = get_caps_allowed_by_type(CAP_ANY);
2970 }
2971
2972 if (!is_dir()) {
2973 if ((file_i->inline_data.version != CEPH_INLINE_NONE &&
2974 !session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) ||
2975 (!file_i->layout.pool_ns.empty() &&
2976 !session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)))
2977 allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
2978 }
2979 return allowed;
2980 }
2981
2982 // caps issued, wanted
2983 int CInode::get_caps_issued(int *ploner, int *pother, int *pxlocker,
2984 int shift, int mask)
2985 {
2986 int c = 0;
2987 int loner = 0, other = 0, xlocker = 0;
2988 if (!is_auth()) {
2989 loner_cap = -1;
2990 }
2991
2992 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
2993 it != client_caps.end();
2994 ++it) {
2995 int i = it->second->issued();
2996 c |= i;
2997 if (it->first == loner_cap)
2998 loner |= i;
2999 else
3000 other |= i;
3001 xlocker |= get_xlocker_mask(it->first) & i;
3002 }
3003 if (ploner) *ploner = (loner >> shift) & mask;
3004 if (pother) *pother = (other >> shift) & mask;
3005 if (pxlocker) *pxlocker = (xlocker >> shift) & mask;
3006 return (c >> shift) & mask;
3007 }
3008
3009 bool CInode::is_any_caps_wanted() const
3010 {
3011 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3012 it != client_caps.end();
3013 ++it)
3014 if (it->second->wanted())
3015 return true;
3016 return false;
3017 }
3018
3019 int CInode::get_caps_wanted(int *ploner, int *pother, int shift, int mask) const
3020 {
3021 int w = 0;
3022 int loner = 0, other = 0;
3023 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
3024 it != client_caps.end();
3025 ++it) {
3026 if (!it->second->is_stale()) {
3027 int t = it->second->wanted();
3028 w |= t;
3029 if (it->first == loner_cap)
3030 loner |= t;
3031 else
3032 other |= t;
3033 }
3034 //cout << " get_caps_wanted client " << it->first << " " << cap_string(it->second.wanted()) << endl;
3035 }
3036 if (is_auth())
3037 for (compact_map<int,int>::const_iterator it = mds_caps_wanted.begin();
3038 it != mds_caps_wanted.end();
3039 ++it) {
3040 w |= it->second;
3041 other |= it->second;
3042 //cout << " get_caps_wanted mds " << it->first << " " << cap_string(it->second) << endl;
3043 }
3044 if (ploner) *ploner = (loner >> shift) & mask;
3045 if (pother) *pother = (other >> shift) & mask;
3046 return (w >> shift) & mask;
3047 }
3048
3049 bool CInode::issued_caps_need_gather(SimpleLock *lock)
3050 {
3051 int loner_issued, other_issued, xlocker_issued;
3052 get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
3053 lock->get_cap_shift(), lock->get_cap_mask());
3054 if ((loner_issued & ~lock->gcaps_allowed(CAP_LONER)) ||
3055 (other_issued & ~lock->gcaps_allowed(CAP_ANY)) ||
3056 (xlocker_issued & ~lock->gcaps_allowed(CAP_XLOCKER)))
3057 return true;
3058 return false;
3059 }
3060
3061 void CInode::replicate_relax_locks()
3062 {
3063 //dout(10) << " relaxing locks on " << *this << dendl;
3064 assert(is_auth());
3065 assert(!is_replicated());
3066
3067 authlock.replicate_relax();
3068 linklock.replicate_relax();
3069 dirfragtreelock.replicate_relax();
3070 filelock.replicate_relax();
3071 xattrlock.replicate_relax();
3072 snaplock.replicate_relax();
3073 nestlock.replicate_relax();
3074 flocklock.replicate_relax();
3075 policylock.replicate_relax();
3076 }
3077
3078
3079
3080 // =============================================
3081
3082 int CInode::encode_inodestat(bufferlist& bl, Session *session,
3083 SnapRealm *dir_realm,
3084 snapid_t snapid,
3085 unsigned max_bytes,
3086 int getattr_caps)
3087 {
3088 client_t client = session->info.inst.name.num();
3089 assert(snapid);
3090 assert(session->connection);
3091
3092 bool valid = true;
3093
3094 // pick a version!
3095 inode_t *oi = &inode;
3096 inode_t *pi = get_projected_inode();
3097
3098 map<string, bufferptr> *pxattrs = 0;
3099
3100 if (snapid != CEPH_NOSNAP) {
3101
3102 // for now at least, old_inodes is only defined/valid on the auth
3103 if (!is_auth())
3104 valid = false;
3105
3106 if (is_multiversion()) {
3107 compact_map<snapid_t,old_inode_t>::iterator p = old_inodes.lower_bound(snapid);
3108 if (p != old_inodes.end()) {
3109 if (p->second.first > snapid) {
3110 if (p != old_inodes.begin())
3111 --p;
3112 }
3113 if (p->second.first <= snapid && snapid <= p->first) {
3114 dout(15) << "encode_inodestat snapid " << snapid
3115 << " to old_inode [" << p->second.first << "," << p->first << "]"
3116 << " " << p->second.inode.rstat
3117 << dendl;
3118 pi = oi = &p->second.inode;
3119 pxattrs = &p->second.xattrs;
3120 } else {
3121 // a snapshotted remote dentry can result in this
3122 dout(0) << "encode_inodestat old_inode for snapid " << snapid
3123 << " not found" << dendl;
3124 }
3125 }
3126 } else if (snapid < first || snapid > last) {
3127 // a snapshotted remote dentry can result in this
3128 dout(0) << "encode_inodestat [" << first << "," << last << "]"
3129 << " not match snapid " << snapid << dendl;
3130 }
3131 }
3132
3133 SnapRealm *realm = find_snaprealm();
3134
3135 bool no_caps = !valid ||
3136 session->is_stale() ||
3137 (dir_realm && realm != dir_realm) ||
3138 is_frozen() ||
3139 state_test(CInode::STATE_EXPORTINGCAPS);
3140 if (no_caps)
3141 dout(20) << "encode_inodestat no caps"
3142 << (!valid?", !valid":"")
3143 << (session->is_stale()?", session stale ":"")
3144 << ((dir_realm && realm != dir_realm)?", snaprealm differs ":"")
3145 << (is_frozen()?", frozen inode":"")
3146 << (state_test(CInode::STATE_EXPORTINGCAPS)?", exporting caps":"")
3147 << dendl;
3148
3149
3150 // "fake" a version that is old (stable) version, +1 if projected.
3151 version_t version = (oi->version * 2) + is_projected();
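// (e.g. a stable v10 inode reports 20, and 21 while a projected update is
// still in flight, so the client can tell the two states apart.)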
3152
3153 Capability *cap = get_client_cap(client);
3154 bool pfile = filelock.is_xlocked_by_client(client) || get_loner() == client;
3155 //(cap && (cap->issued() & CEPH_CAP_FILE_EXCL));
3156 bool pauth = authlock.is_xlocked_by_client(client) || get_loner() == client;
3157 bool plink = linklock.is_xlocked_by_client(client) || get_loner() == client;
3158 bool pxattr = xattrlock.is_xlocked_by_client(client) || get_loner() == client;
3159
3160 bool plocal = versionlock.get_last_wrlock_client() == client;
3161 bool ppolicy = policylock.is_xlocked_by_client(client) || get_loner()==client;
3162
3163 inode_t *any_i = (pfile|pauth|plink|pxattr|plocal) ? pi : oi;
3164
3165 dout(20) << " pfile " << pfile << " pauth " << pauth
3166 << " plink " << plink << " pxattr " << pxattr
3167 << " plocal " << plocal
3168 << " ctime " << any_i->ctime
3169 << " valid=" << valid << dendl;
3170
3171 // file
3172 inode_t *file_i = pfile ? pi:oi;
3173 file_layout_t layout;
3174 if (is_dir()) {
3175 layout = (ppolicy ? pi : oi)->layout;
3176 } else {
3177 layout = file_i->layout;
3178 }
3179
3180 // max_size is min of projected, actual
3181 uint64_t max_size =
3182 MIN(oi->client_ranges.count(client) ?
3183 oi->client_ranges[client].range.last : 0,
3184 pi->client_ranges.count(client) ?
3185 pi->client_ranges[client].range.last : 0);
3186
3187 // inline data
3188 version_t inline_version = 0;
3189 bufferlist inline_data;
3190 if (file_i->inline_data.version == CEPH_INLINE_NONE) {
3191 inline_version = CEPH_INLINE_NONE;
3192 } else if ((!cap && !no_caps) ||
3193 (cap && cap->client_inline_version < file_i->inline_data.version) ||
3194 (getattr_caps & CEPH_CAP_FILE_RD)) { // client requests inline data
3195 inline_version = file_i->inline_data.version;
3196 if (file_i->inline_data.length() > 0)
3197 inline_data = file_i->inline_data.get_data();
3198 }
3199
3200 // nest (do same as file... :/)
3201 if (cap) {
3202 cap->last_rbytes = file_i->rstat.rbytes;
3203 cap->last_rsize = file_i->rstat.rsize();
3204 }
3205
3206 // auth
3207 inode_t *auth_i = pauth ? pi:oi;
3208
3209 // link
3210 inode_t *link_i = plink ? pi:oi;
3211
3212 // xattr
3213 inode_t *xattr_i = pxattr ? pi:oi;
3214
3215 // xattr
3216 bufferlist xbl;
3217 version_t xattr_version;
3218 if ((!cap && !no_caps) ||
3219 (cap && cap->client_xattr_version < xattr_i->xattr_version) ||
3220 (getattr_caps & CEPH_CAP_XATTR_SHARED)) { // client requests xattrs
3221 if (!pxattrs)
3222 pxattrs = pxattr ? get_projected_xattrs() : &xattrs;
3223 ::encode(*pxattrs, xbl);
3224 xattr_version = xattr_i->xattr_version;
3225 } else {
3226 xattr_version = 0;
3227 }
3228
3229 // do we have room?
3230 if (max_bytes) {
3231 unsigned bytes = 8 + 8 + 4 + 8 + 8 + sizeof(ceph_mds_reply_cap) +
3232 sizeof(struct ceph_file_layout) + 4 + layout.pool_ns.size() +
3233 sizeof(struct ceph_timespec) * 3 +
3234 4 + 8 + 8 + 8 + 4 + 4 + 4 + 4 + 4 +
3235 8 + 8 + 8 + 8 + 8 + sizeof(struct ceph_timespec) +
3236 4;
3237 bytes += sizeof(__u32);
3238 bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
3239 bytes += sizeof(__u32) + symlink.length();
3240 bytes += sizeof(__u32) + xbl.length();
3241 bytes += sizeof(version_t) + sizeof(__u32) + inline_data.length();
3242 if (bytes > max_bytes)
3243 return -ENOSPC;
3244 }
3245
3246
3247 // encode caps
3248 struct ceph_mds_reply_cap ecap;
3249 if (snapid != CEPH_NOSNAP) {
3250 /*
3251 * snapped inodes (files or dirs) only get read-only caps. always
3252 * issue everything possible, since it is read only.
3253 *
3254 * if a snapped inode has caps, limit issued caps based on the
3255 * lock state.
3256 *
3257 * if it is a live inode, limit issued caps based on the lock
3258 * state.
3259 *
3260 * do NOT adjust cap issued state, because the client always
3261 * tracks caps per-snap and the mds does either per-interval or
3262 * multiversion.
3263 */
3264 ecap.caps = valid ? get_caps_allowed_by_type(CAP_ANY) : CEPH_STAT_CAP_INODE;
3265 if (last == CEPH_NOSNAP || is_any_caps())
3266 ecap.caps = ecap.caps & get_caps_allowed_for_client(session, file_i);
3267 ecap.seq = 0;
3268 ecap.mseq = 0;
3269 ecap.realm = 0;
3270 } else {
3271 if (!no_caps && !cap) {
3272 // add a new cap
3273 cap = add_client_cap(client, session, realm);
3274 if (is_auth()) {
3275 if (choose_ideal_loner() >= 0)
3276 try_set_loner();
3277 else if (get_wanted_loner() < 0)
3278 try_drop_loner();
3279 }
3280 }
3281
3282 int issue = 0;
3283 if (!no_caps && cap) {
3284 int likes = get_caps_liked();
3285 int allowed = get_caps_allowed_for_client(session, file_i);
3286 issue = (cap->wanted() | likes) & allowed;
3287 cap->issue_norevoke(issue);
3288 issue = cap->pending();
3289 dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3290 << " seq " << cap->get_last_seq() << dendl;
3291 } else if (cap && cap->is_new() && !dir_realm) {
3292 // always issue new caps to the client, otherwise the caps get lost
3293 assert(cap->is_stale());
3294 issue = cap->pending() | CEPH_CAP_PIN;
3295 cap->issue_norevoke(issue);
3296 dout(10) << "encode_inodestat issuing " << ccap_string(issue)
3297 << " seq " << cap->get_last_seq()
3298 << "(stale|new caps)" << dendl;
3299 }
3300
3301 if (issue) {
3302 cap->set_last_issue();
3303 cap->set_last_issue_stamp(ceph_clock_now());
3304 cap->clear_new();
3305 ecap.caps = issue;
3306 ecap.wanted = cap->wanted();
3307 ecap.cap_id = cap->get_cap_id();
3308 ecap.seq = cap->get_last_seq();
3309 ecap.mseq = cap->get_mseq();
3310 ecap.realm = realm->inode->ino();
3311 } else {
3312 ecap.cap_id = 0;
3313 ecap.caps = 0;
3314 ecap.seq = 0;
3315 ecap.mseq = 0;
3316 ecap.realm = 0;
3317 ecap.wanted = 0;
3318 }
3319 }
3320 ecap.flags = is_auth() ? CEPH_CAP_FLAG_AUTH : 0;
3321 dout(10) << "encode_inodestat caps " << ccap_string(ecap.caps)
3322 << " seq " << ecap.seq << " mseq " << ecap.mseq
3323 << " xattrv " << xattr_version << " len " << xbl.length()
3324 << dendl;
3325
3326 if (inline_data.length() && cap) {
3327 if ((cap->pending() | getattr_caps) & CEPH_CAP_FILE_SHARED) {
3328 dout(10) << "including inline version " << inline_version << dendl;
3329 cap->client_inline_version = inline_version;
3330 } else {
3331 dout(10) << "dropping inline version " << inline_version << dendl;
3332 inline_version = 0;
3333 inline_data.clear();
3334 }
3335 }
3336
3337 // include those xattrs?
3338 if (xbl.length() && cap) {
3339 if ((cap->pending() | getattr_caps) & CEPH_CAP_XATTR_SHARED) {
3340 dout(10) << "including xattrs version " << xattr_i->xattr_version << dendl;
3341 cap->client_xattr_version = xattr_i->xattr_version;
3342 } else {
3343 dout(10) << "dropping xattrs version " << xattr_i->xattr_version << dendl;
3344 xbl.clear(); // no xattrs .. XXX what's this about?!?
3345 xattr_version = 0;
3346 }
3347 }
3348
3349 /*
3350 * note: encoding matches MClientReply::InodeStat
3351 */
3352 ::encode(oi->ino, bl);
3353 ::encode(snapid, bl);
3354 ::encode(oi->rdev, bl);
3355 ::encode(version, bl);
3356
3357 ::encode(xattr_version, bl);
3358
3359 ::encode(ecap, bl);
3360 {
3361 ceph_file_layout legacy_layout;
3362 layout.to_legacy(&legacy_layout);
3363 ::encode(legacy_layout, bl);
3364 }
3365 ::encode(any_i->ctime, bl);
3366 ::encode(file_i->mtime, bl);
3367 ::encode(file_i->atime, bl);
3368 ::encode(file_i->time_warp_seq, bl);
3369 ::encode(file_i->size, bl);
3370 ::encode(max_size, bl);
3371 ::encode(file_i->truncate_size, bl);
3372 ::encode(file_i->truncate_seq, bl);
3373
3374 ::encode(auth_i->mode, bl);
3375 ::encode((uint32_t)auth_i->uid, bl);
3376 ::encode((uint32_t)auth_i->gid, bl);
3377
3378 ::encode(link_i->nlink, bl);
3379
3380 ::encode(file_i->dirstat.nfiles, bl);
3381 ::encode(file_i->dirstat.nsubdirs, bl);
3382 ::encode(file_i->rstat.rbytes, bl);
3383 ::encode(file_i->rstat.rfiles, bl);
3384 ::encode(file_i->rstat.rsubdirs, bl);
3385 ::encode(file_i->rstat.rctime, bl);
3386
3387 dirfragtree.encode(bl);
3388
3389 ::encode(symlink, bl);
3390 if (session->connection->has_feature(CEPH_FEATURE_DIRLAYOUTHASH)) {
3391 ::encode(file_i->dir_layout, bl);
3392 }
3393 ::encode(xbl, bl);
3394 if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
3395 ::encode(inline_version, bl);
3396 ::encode(inline_data, bl);
3397 }
3398 if (session->connection->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
3399 inode_t *policy_i = ppolicy ? pi : oi;
3400 ::encode(policy_i->quota, bl);
3401 }
3402 if (session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
3403 ::encode(layout.pool_ns, bl);
3404 }
3405 if (session->connection->has_feature(CEPH_FEATURE_FS_BTIME)) {
3406 ::encode(any_i->btime, bl);
3407 ::encode(any_i->change_attr, bl);
3408 }
3409
3410 return valid;
3411 }
3412
3413 void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
3414 {
3415 assert(cap);
3416
3417 client_t client = cap->get_client();
3418
3419 bool pfile = filelock.is_xlocked_by_client(client) || (cap->issued() & CEPH_CAP_FILE_EXCL);
3420 bool pauth = authlock.is_xlocked_by_client(client);
3421 bool plink = linklock.is_xlocked_by_client(client);
3422 bool pxattr = xattrlock.is_xlocked_by_client(client);
3423
3424 inode_t *oi = &inode;
3425 inode_t *pi = get_projected_inode();
3426 inode_t *i = (pfile|pauth|plink|pxattr) ? pi : oi;
3427
3428 dout(20) << "encode_cap_message pfile " << pfile
3429 << " pauth " << pauth << " plink " << plink << " pxattr " << pxattr
3430 << " ctime " << i->ctime << dendl;
3431
3432 i = pfile ? pi:oi;
3433 m->set_layout(i->layout);
3434 m->size = i->size;
3435 m->truncate_seq = i->truncate_seq;
3436 m->truncate_size = i->truncate_size;
3437 m->mtime = i->mtime;
3438 m->atime = i->atime;
3439 m->ctime = i->ctime;
3440 m->change_attr = i->change_attr;
3441 m->time_warp_seq = i->time_warp_seq;
3442
3443 if (cap->client_inline_version < i->inline_data.version) {
3444 m->inline_version = cap->client_inline_version = i->inline_data.version;
3445 if (i->inline_data.length() > 0)
3446 m->inline_data = i->inline_data.get_data();
3447 } else {
3448 m->inline_version = 0;
3449 }
3450
3451 // max_size is min of projected, actual.
3452 uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
3453 uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
3454 m->max_size = MIN(oldms, newms);
3455
3456 i = pauth ? pi:oi;
3457 m->head.mode = i->mode;
3458 m->head.uid = i->uid;
3459 m->head.gid = i->gid;
3460
3461 i = plink ? pi:oi;
3462 m->head.nlink = i->nlink;
3463
3464 i = pxattr ? pi:oi;
3465 map<string,bufferptr> *ix = pxattr ? get_projected_xattrs() : &xattrs;
3466 if ((cap->pending() & CEPH_CAP_XATTR_SHARED) &&
3467 i->xattr_version > cap->client_xattr_version) {
3468 dout(10) << " including xattrs v " << i->xattr_version << dendl;
3469 ::encode(*ix, m->xattrbl);
3470 m->head.xattr_version = i->xattr_version;
3471 cap->client_xattr_version = i->xattr_version;
3472 }
3473 }
3474
3475
3476
3477 void CInode::_encode_base(bufferlist& bl, uint64_t features)
3478 {
3479 ::encode(first, bl);
3480 ::encode(inode, bl, features);
3481 ::encode(symlink, bl);
3482 ::encode(dirfragtree, bl);
3483 ::encode(xattrs, bl);
3484 ::encode(old_inodes, bl, features);
3485 ::encode(damage_flags, bl);
3486 encode_snap(bl);
3487 }
3488 void CInode::_decode_base(bufferlist::iterator& p)
3489 {
3490 ::decode(first, p);
3491 ::decode(inode, p);
3492 ::decode(symlink, p);
3493 ::decode(dirfragtree, p);
3494 ::decode(xattrs, p);
3495 ::decode(old_inodes, p);
3496 ::decode(damage_flags, p);
3497 decode_snap(p);
3498 }
3499
3500 void CInode::_encode_locks_full(bufferlist& bl)
3501 {
3502 ::encode(authlock, bl);
3503 ::encode(linklock, bl);
3504 ::encode(dirfragtreelock, bl);
3505 ::encode(filelock, bl);
3506 ::encode(xattrlock, bl);
3507 ::encode(snaplock, bl);
3508 ::encode(nestlock, bl);
3509 ::encode(flocklock, bl);
3510 ::encode(policylock, bl);
3511
3512 ::encode(loner_cap, bl);
3513 }
3514 void CInode::_decode_locks_full(bufferlist::iterator& p)
3515 {
3516 ::decode(authlock, p);
3517 ::decode(linklock, p);
3518 ::decode(dirfragtreelock, p);
3519 ::decode(filelock, p);
3520 ::decode(xattrlock, p);
3521 ::decode(snaplock, p);
3522 ::decode(nestlock, p);
3523 ::decode(flocklock, p);
3524 ::decode(policylock, p);
3525
3526 ::decode(loner_cap, p);
3527 set_loner_cap(loner_cap);
3528 want_loner_cap = loner_cap; // for now, we'll eval() shortly.
3529 }
3530
3531 void CInode::_encode_locks_state_for_replica(bufferlist& bl)
3532 {
3533 authlock.encode_state_for_replica(bl);
3534 linklock.encode_state_for_replica(bl);
3535 dirfragtreelock.encode_state_for_replica(bl);
3536 filelock.encode_state_for_replica(bl);
3537 nestlock.encode_state_for_replica(bl);
3538 xattrlock.encode_state_for_replica(bl);
3539 snaplock.encode_state_for_replica(bl);
3540 flocklock.encode_state_for_replica(bl);
3541 policylock.encode_state_for_replica(bl);
3542 }
3543 void CInode::_encode_locks_state_for_rejoin(bufferlist& bl, int rep)
3544 {
3545 authlock.encode_state_for_replica(bl);
3546 linklock.encode_state_for_replica(bl);
3547 dirfragtreelock.encode_state_for_rejoin(bl, rep);
3548 filelock.encode_state_for_rejoin(bl, rep);
3549 nestlock.encode_state_for_rejoin(bl, rep);
3550 xattrlock.encode_state_for_replica(bl);
3551 snaplock.encode_state_for_replica(bl);
3552 flocklock.encode_state_for_replica(bl);
3553 policylock.encode_state_for_replica(bl);
3554 }
3555 void CInode::_decode_locks_state(bufferlist::iterator& p, bool is_new)
3556 {
3557 authlock.decode_state(p, is_new);
3558 linklock.decode_state(p, is_new);
3559 dirfragtreelock.decode_state(p, is_new);
3560 filelock.decode_state(p, is_new);
3561 nestlock.decode_state(p, is_new);
3562 xattrlock.decode_state(p, is_new);
3563 snaplock.decode_state(p, is_new);
3564 flocklock.decode_state(p, is_new);
3565 policylock.decode_state(p, is_new);
3566 }
3567 void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<MDSInternalContextBase*>& waiters,
3568 list<SimpleLock*>& eval_locks)
3569 {
3570 authlock.decode_state_rejoin(p, waiters);
3571 linklock.decode_state_rejoin(p, waiters);
3572 dirfragtreelock.decode_state_rejoin(p, waiters);
3573 filelock.decode_state_rejoin(p, waiters);
3574 nestlock.decode_state_rejoin(p, waiters);
3575 xattrlock.decode_state_rejoin(p, waiters);
3576 snaplock.decode_state_rejoin(p, waiters);
3577 flocklock.decode_state_rejoin(p, waiters);
3578 policylock.decode_state_rejoin(p, waiters);
3579
3580 if (!dirfragtreelock.is_stable() && !dirfragtreelock.is_wrlocked())
3581 eval_locks.push_back(&dirfragtreelock);
3582 if (!filelock.is_stable() && !filelock.is_wrlocked())
3583 eval_locks.push_back(&filelock);
3584 if (!nestlock.is_stable() && !nestlock.is_wrlocked())
3585 eval_locks.push_back(&nestlock);
3586 }
3587
3588
3589 // IMPORT/EXPORT
3590
3591 void CInode::encode_export(bufferlist& bl)
3592 {
3593 ENCODE_START(5, 4, bl);
3594 _encode_base(bl, mdcache->mds->mdsmap->get_up_features());
3595
3596 ::encode(state, bl);
3597
3598 ::encode(pop, bl);
3599
3600 ::encode(replica_map, bl);
3601
3602 // include scatterlock info for any bounding CDirs
3603 bufferlist bounding;
3604 if (inode.is_dir())
3605 for (compact_map<frag_t,CDir*>::iterator p = dirfrags.begin();
3606 p != dirfrags.end();
3607 ++p) {
3608 CDir *dir = p->second;
3609 if (dir->state_test(CDir::STATE_EXPORTBOUND)) {
3610 ::encode(p->first, bounding);
3611 ::encode(dir->fnode.fragstat, bounding);
3612 ::encode(dir->fnode.accounted_fragstat, bounding);
3613 ::encode(dir->fnode.rstat, bounding);
3614 ::encode(dir->fnode.accounted_rstat, bounding);
3615 dout(10) << " encoded fragstat/rstat info for " << *dir << dendl;
3616 }
3617 }
3618 ::encode(bounding, bl);
3619
3620 _encode_locks_full(bl);
3621
3622 _encode_file_locks(bl);
3623
3624 ENCODE_FINISH(bl);
3625
3626 get(PIN_TEMPEXPORTING);
3627 }
3628
3629 void CInode::finish_export(utime_t now)
3630 {
3631 state &= MASK_STATE_EXPORT_KEPT;
3632
3633 pop.zero(now);
3634
3635 // just in case!
3636 //dirlock.clear_updated();
3637
3638 loner_cap = -1;
3639
3640 put(PIN_TEMPEXPORTING);
3641 }
3642
3643 void CInode::decode_import(bufferlist::iterator& p,
3644 LogSegment *ls)
3645 {
3646 DECODE_START(5, p);
3647
3648 _decode_base(p);
3649
3650 unsigned s;
3651 ::decode(s, p);
3652 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
3653
3654 if (is_dirty()) {
3655 get(PIN_DIRTY);
3656 _mark_dirty(ls);
3657 }
3658 if (is_dirty_parent()) {
3659 get(PIN_DIRTYPARENT);
3660 _mark_dirty_parent(ls);
3661 }
3662
3663 ::decode(pop, ceph_clock_now(), p);
3664
3665 ::decode(replica_map, p);
3666 if (!replica_map.empty())
3667 get(PIN_REPLICATED);
3668 replica_nonce = 0;
3669
3670 // decode fragstat info on bounding cdirs
3671 bufferlist bounding;
3672 ::decode(bounding, p);
3673 bufferlist::iterator q = bounding.begin();
3674 while (!q.end()) {
3675 frag_t fg;
3676 ::decode(fg, q);
3677 CDir *dir = get_dirfrag(fg);
3678 assert(dir); // we should have all bounds open
3679
3680 // Only take the remote's fragstat/rstat if we are non-auth for
3681 // this dirfrag AND the lock is NOT in a scattered (MIX) state.
3682 // We know lock is stable, and MIX is the only state in which
3683 // the inode auth (who sent us this data) may not have the best
3684 // info.
3685
3686 // HMM: Are there cases where dir->is_auth() is an insufficient
3687 // check because the dirfrag is under migration? That implies
3688 // it is frozen (and in a SYNC or LOCK state). FIXME.
3689
3690 if (dir->is_auth() ||
3691 filelock.get_state() == LOCK_MIX) {
3692 dout(10) << " skipped fragstat info for " << *dir << dendl;
3693 frag_info_t f;
3694 ::decode(f, q);
3695 ::decode(f, q);
3696 } else {
3697 ::decode(dir->fnode.fragstat, q);
3698 ::decode(dir->fnode.accounted_fragstat, q);
3699 dout(10) << " took fragstat info for " << *dir << dendl;
3700 }
3701 if (dir->is_auth() ||
3702 nestlock.get_state() == LOCK_MIX) {
3703 dout(10) << " skipped rstat info for " << *dir << dendl;
3704 nest_info_t n;
3705 ::decode(n, q);
3706 ::decode(n, q);
3707 } else {
3708 ::decode(dir->fnode.rstat, q);
3709 ::decode(dir->fnode.accounted_rstat, q);
3710 dout(10) << " took rstat info for " << *dir << dendl;
3711 }
3712 }
3713
3714 _decode_locks_full(p);
3715
3716 _decode_file_locks(p);
3717
3718 DECODE_FINISH(p);
3719 }
3720
3721
3722 void InodeStoreBase::dump(Formatter *f) const
3723 {
3724 inode.dump(f);
3725 f->dump_string("symlink", symlink);
3726 f->open_array_section("old_inodes");
3727 for (compact_map<snapid_t, old_inode_t>::const_iterator i = old_inodes.begin();
3728 i != old_inodes.end(); ++i) {
3729 f->open_object_section("old_inode");
3730 {
3731 // The key is the last snapid, the first is in the old_inode_t
3732 f->dump_int("last", i->first);
3733 i->second.dump(f);
3734 }
3735 f->close_section(); // old_inode
3736 }
3737 f->close_section(); // old_inodes
3738
3739 f->open_object_section("dirfragtree");
3740 dirfragtree.dump(f);
3741 f->close_section(); // dirfragtree
3742 }
3743
3744
3745 void InodeStore::generate_test_instances(list<InodeStore*> &ls)
3746 {
3747 InodeStore *populated = new InodeStore;
3748 populated->inode.ino = 0xdeadbeef;
3749 populated->symlink = "rhubarb";
3750 ls.push_back(populated);
3751 }
3752
3753 void CInode::validate_disk_state(CInode::validated_data *results,
3754 MDSInternalContext *fin)
3755 {
3756 class ValidationContinuation : public MDSContinuation {
3757 public:
3758 MDSInternalContext *fin;
3759 CInode *in;
3760 CInode::validated_data *results;
3761 bufferlist bl;
3762 CInode *shadow_in;
3763
3764 enum {
3765 START = 0,
3766 BACKTRACE,
3767 INODE,
3768 DIRFRAGS
3769 };
3770
3771 ValidationContinuation(CInode *i,
3772 CInode::validated_data *data_r,
3773 MDSInternalContext *fin_) :
3774 MDSContinuation(i->mdcache->mds->server),
3775 fin(fin_),
3776 in(i),
3777 results(data_r),
3778 shadow_in(NULL) {
3779 set_callback(START, static_cast<Continuation::stagePtr>(&ValidationContinuation::_start));
3780 set_callback(BACKTRACE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_backtrace));
3781 set_callback(INODE, static_cast<Continuation::stagePtr>(&ValidationContinuation::_inode_disk));
3782 set_callback(DIRFRAGS, static_cast<Continuation::stagePtr>(&ValidationContinuation::_dirfrags));
3783 }
3784
3785 ~ValidationContinuation() override {
3786 delete shadow_in;
3787 }
3788
3789 /**
3790 * Fetch backtrace and set tag if tag is non-empty
3791 */
3792 void fetch_backtrace_and_tag(CInode *in, std::string tag,
3793 Context *fin, int *bt_r, bufferlist *bt)
3794 {
3795 const int64_t pool = in->get_backtrace_pool();
3796 object_t oid = CInode::get_object_name(in->ino(), frag_t(), "");
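      // the backtrace is stored in the "parent" xattr of the <ino>.00000000
      // head object in the pool returned by get_backtrace_pool()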
3797
3798 ObjectOperation fetch;
3799 fetch.getxattr("parent", bt, bt_r);
3800 in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP,
3801 NULL, 0, fin);
3802 if (!tag.empty()) {
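        // the tag write is fire-and-forget (no completion context); only the
        // getxattr read above gates the continuation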
3803 ObjectOperation scrub_tag;
3804 bufferlist tag_bl;
3805 ::encode(tag, tag_bl);
3806 scrub_tag.setxattr("scrub_tag", tag_bl);
3807 SnapContext snapc;
3808 in->mdcache->mds->objecter->mutate(oid, object_locator_t(pool), scrub_tag, snapc,
3809 ceph::real_clock::now(),
3810 0, NULL);
3811 }
3812 }
3813
3814 bool _start(int rval) {
3815 if (in->is_dirty()) {
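        // these locals exist only so the dout_prefix macro in the following
        // dout() can reference mdcache and inode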
3816 MDCache *mdcache = in->mdcache;
3817 inode_t& inode = in->inode;
3818 dout(20) << "validating a dirty CInode; results will be inconclusive"
3819 << dendl;
3820 }
3821 if (in->is_symlink()) {
3822 // there's nothing to do for symlinks!
3823 return true;
3824 }
3825
3826 C_OnFinisher *conf = new C_OnFinisher(get_io_callback(BACKTRACE),
3827 in->mdcache->mds->finisher);
3828
3829 // Whether we have a tag to apply depends on ScrubHeader (if one is
3830 // present)
3831 if (in->scrub_infop) {
3832 // I'm a non-orphan, so look up my ScrubHeader via my linkage
3833 const std::string &tag = in->scrub_infop->header->get_tag();
3834 // Rather than using the usual CInode::fetch_backtrace,
3835 // use a special variant that optionally writes a tag in the same
3836 // operation.
3837 fetch_backtrace_and_tag(in, tag, conf,
3838 &results->backtrace.ondisk_read_retval, &bl);
3839 } else {
3840 // When we're invoked outside of ScrubStack we might be called
3841 // on an orphaned inode like /
3842 fetch_backtrace_and_tag(in, {}, conf,
3843 &results->backtrace.ondisk_read_retval, &bl);
3844 }
3845 return false;
3846 }
3847
3848 bool _backtrace(int rval) {
3849 // set up basic result reporting and make sure we got the data
3850 results->performed_validation = true; // at least, some of it!
3851 results->backtrace.checked = true;
3852
3853 const int64_t pool = in->get_backtrace_pool();
3854 inode_backtrace_t& memory_backtrace = results->backtrace.memory_value;
3855 in->build_backtrace(pool, memory_backtrace);
3856 bool equivalent, divergent;
3857 int memory_newer;
3858
3859 MDCache *mdcache = in->mdcache; // For the benefit of dout
3860 const inode_t& inode = in->inode; // For the benefit of dout
3861
3862 // Ignore rval because it's the result of a FAILOK operation
3863 // from fetch_backtrace_and_tag: the real result is in
3864 // backtrace.ondisk_read_retval
3865 dout(20) << "ondisk_read_retval: " << results->backtrace.ondisk_read_retval << dendl;
3866 if (results->backtrace.ondisk_read_retval != 0) {
3867 results->backtrace.error_str << "failed to read off disk; see retval";
3868 goto next;
3869 }
3870
3871 // extract the backtrace, and compare it to a newly-constructed one
3872 try {
3873 bufferlist::iterator p = bl.begin();
3874 ::decode(results->backtrace.ondisk_value, p);
3875 dout(10) << "decoded " << bl.length() << " bytes of backtrace successfully" << dendl;
3876 } catch (buffer::error&) {
3877 if (results->backtrace.ondisk_read_retval == 0 && rval != 0) {
3878 // Cases where something has clearly gone wrong with the overall
3879 // fetch op, though we didn't get a nonzero rc from the getxattr
3880 // operation, e.g. the object is missing.
3881 results->backtrace.ondisk_read_retval = rval;
3882 }
3883 results->backtrace.error_str << "failed to decode on-disk backtrace ("
3884 << bl.length() << " bytes)!";
3885 goto next;
3886 }
3887
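      // compare() returns a negative value when the on-disk backtrace is
      // newer than the in-memory one and reports divergence separately;
      // either condition fails the check below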
3888 memory_newer = memory_backtrace.compare(results->backtrace.ondisk_value,
3889 &equivalent, &divergent);
3890
3891 if (divergent || memory_newer < 0) {
3892 // we're divergent, or on-disk version is newer
3893 results->backtrace.error_str << "On-disk backtrace is divergent or newer";
3894 } else {
3895 results->backtrace.passed = true;
3896 }
3897 next:
3898
3899 if (!results->backtrace.passed && in->scrub_infop && in->scrub_infop->header->get_repair()) {
3900 std::string path;
3901 in->make_path_string(path);
3902 in->mdcache->mds->clog->warn() << "bad backtrace on inode " << *in
3903 << ", rewriting it at " << path;
3904 in->_mark_dirty_parent(in->mdcache->mds->mdlog->get_current_segment(),
3905 false);
3906 }
3907
3908 // If the inode's number was free in the InoTable, fix that
3909 // (#15619)
3910 {
3911 InoTable *inotable = mdcache->mds->inotable;
3912
3913 dout(10) << "scrub: inotable ino = 0x" << std::hex << inode.ino << dendl;
3914 dout(10) << "scrub: inotable free says "
3915 << inotable->is_marked_free(inode.ino) << dendl;
3916
3917 if (inotable->is_marked_free(inode.ino)) {
3918 LogChannelRef clog = in->mdcache->mds->clog;
3919 clog->error() << "scrub: inode wrongly marked free: 0x" << std::hex
3920 << inode.ino;
3921
3922 if (in->scrub_infop && in->scrub_infop->header->get_repair()) {
3923 bool repaired = inotable->repair(inode.ino);
3924 if (repaired) {
3925 clog->error() << "inode table repaired for inode: 0x" << std::hex
3926 << inode.ino;
3927
3928 inotable->save();
3929 } else {
3930 clog->error() << "Cannot repair inotable while other operations"
3931 " are in progress";
3932 }
3933 }
3934 }
3935 }
3936
3937 // quit if we're a file, or kick off directory checks otherwise
3938 // TODO: validate on-disk inode for non-base directories
3939 if (!in->is_dir()) {
3940 return true;
3941 }
3942
3943 return validate_directory_data();
3944 }
3945
3946 bool validate_directory_data() {
3947 assert(in->is_dir());
3948
3949 if (in->is_base()) {
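        // base inodes (root and the mdsdir inodes) live in their own objects;
        // fetch an on-disk copy into shadow_in so _inode_disk can compare it
        // against the in-memory inode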
3950 shadow_in = new CInode(in->mdcache);
3951 in->mdcache->create_unlinked_system_inode(shadow_in,
3952 in->inode.ino,
3953 in->inode.mode);
3954 shadow_in->fetch(get_internal_callback(INODE));
3955 return false;
3956 } else {
3957 results->inode.passed = true;
3958 return check_dirfrag_rstats();
3959 }
3960 }
3961
3962 bool _inode_disk(int rval) {
3963 results->inode.checked = true;
3964 results->inode.ondisk_read_retval = rval;
3965 results->inode.ondisk_value = shadow_in->inode;
3966 results->inode.memory_value = in->inode;
3967
3968 inode_t& si = shadow_in->inode;
3969 inode_t& i = in->inode;
3970 if (si.version > i.version) {
3971 // uh, what?
3972 results->inode.error_str << "On-disk inode is newer than in-memory one!";
3973 goto next;
3974 } else {
3975 bool divergent = false;
3976 int r = i.compare(si, &divergent);
3977 results->inode.passed = !divergent && r >= 0;
3978 if (!results->inode.passed) {
3979 results->inode.error_str <<
3980 "On-disk inode is divergent or newer than in-memory one!";
3981 goto next;
3982 }
3983 }
3984 next:
3985 return check_dirfrag_rstats();
3986 }
3987
3988 bool check_dirfrag_rstats() {
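      // scrub every leaf dirfrag locally, fetching incomplete ones first;
      // DIRFRAGS runs once the gather completes (or immediately if nothing
      // needed fetching)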
3989 MDSGatherBuilder gather(g_ceph_context);
3990 std::list<frag_t> frags;
3991 in->dirfragtree.get_leaves(frags);
3992 for (list<frag_t>::iterator p = frags.begin();
3993 p != frags.end();
3994 ++p) {
3995 CDir *dir = in->get_or_open_dirfrag(in->mdcache, *p);
3996 dir->scrub_info();
3997 if (!dir->scrub_infop->header)
3998 dir->scrub_infop->header = in->scrub_infop->header;
3999 if (dir->is_complete()) {
4000 dir->scrub_local();
4001 } else {
4002 dir->scrub_infop->need_scrub_local = true;
4003 dir->fetch(gather.new_sub(), false);
4004 }
4005 }
4006 if (gather.has_subs()) {
4007 gather.set_finisher(get_internal_callback(DIRFRAGS));
4008 gather.activate();
4009 return false;
4010 } else {
4011 return immediate(DIRFRAGS, 0);
4012 }
4013 }
4014
4015 bool _dirfrags(int rval) {
4016 int frags_errors = 0;
4017 // basic reporting setup
4018 results->raw_stats.checked = true;
4019 results->raw_stats.ondisk_read_retval = rval;
4020
4021 results->raw_stats.memory_value.dirstat = in->inode.dirstat;
4022 results->raw_stats.memory_value.rstat = in->inode.rstat;
4023 frag_info_t& dir_info = results->raw_stats.ondisk_value.dirstat;
4024 nest_info_t& nest_info = results->raw_stats.ondisk_value.rstat;
4025
4026 if (rval != 0) {
4027 results->raw_stats.error_str << "Failed to read dirfrags off disk";
4028 goto next;
4029 }
4030
4031 // check each dirfrag...
4032 for (compact_map<frag_t,CDir*>::iterator p = in->dirfrags.begin();
4033 p != in->dirfrags.end();
4034 ++p) {
4035 CDir *dir = p->second;
4036 assert(dir->get_version() > 0);
4037 nest_info.add(dir->fnode.accounted_rstat);
4038 dir_info.add(dir->fnode.accounted_fragstat);
4039 if (dir->scrub_infop &&
4040 dir->scrub_infop->pending_scrub_error) {
4041 dir->scrub_infop->pending_scrub_error = false;
4042 if (dir->scrub_infop->header->get_repair()) {
4043 results->raw_stats.error_str
4044 << "dirfrag(" << p->first << ") has bad stats (will be fixed); ";
4045 } else {
4046 results->raw_stats.error_str
4047 << "dirfrag(" << p->first << ") has bad stats; ";
4048 }
4049 frags_errors++;
4050 }
4051 }
4052 nest_info.rsubdirs++; // it gets one to account for self
4053 // ...and that their sum matches our inode settings
4054 if (!dir_info.same_sums(in->inode.dirstat) ||
4055 !nest_info.same_sums(in->inode.rstat)) {
4056 if (in->scrub_infop &&
4057 in->scrub_infop->header->get_repair()) {
4058 results->raw_stats.error_str
4059 << "freshly-calculated rstats don't match existing ones (will be fixed)";
4060 in->mdcache->repair_inode_stats(in);
4061 } else {
4062 results->raw_stats.error_str
4063 << "freshly-calculated rstats don't match existing ones";
4064 }
4065 goto next;
4066 }
4067 if (frags_errors > 0)
4068 goto next;
4069
4070 results->raw_stats.passed = true;
4071 next:
4072 return true;
4073 }
4074
4075 void _done() override {
4076 if ((!results->raw_stats.checked || results->raw_stats.passed) &&
4077 (!results->backtrace.checked || results->backtrace.passed) &&
4078 (!results->inode.checked || results->inode.passed))
4079 results->passed_validation = true;
4080 if (fin) {
4081 fin->complete(get_rval());
4082 }
4083 }
4084 };
4085
4086
4087 dout(10) << "scrub starting validate_disk_state on " << *this << dendl;
4088 ValidationContinuation *vc = new ValidationContinuation(this,
4089 results,
4090 fin);
4091 vc->begin();
4092 }
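// A minimal caller sketch, assuming a hypothetical MDSInternalContext
// subclass C_MyScrubFinish (not part of this file):
//
//   CInode::validated_data results;
//   in->validate_disk_state(&results, new C_MyScrubFinish(mds));
//   // when the context completes, inspect results.passed_validation and,
//   // on failure, the per-check error_str fields (e.g. via results.dump(f)).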
4093
4094 void CInode::validated_data::dump(Formatter *f) const
4095 {
4096 f->open_object_section("results");
4097 {
4098 f->dump_bool("performed_validation", performed_validation);
4099 f->dump_bool("passed_validation", passed_validation);
4100 f->open_object_section("backtrace");
4101 {
4102 f->dump_bool("checked", backtrace.checked);
4103 f->dump_bool("passed", backtrace.passed);
4104 f->dump_int("read_ret_val", backtrace.ondisk_read_retval);
4105 f->dump_stream("ondisk_value") << backtrace.ondisk_value;
4106 f->dump_stream("memoryvalue") << backtrace.memory_value;
4107 f->dump_string("error_str", backtrace.error_str.str());
4108 }
4109 f->close_section(); // backtrace
4110 f->open_object_section("raw_stats");
4111 {
4112 f->dump_bool("checked", raw_stats.checked);
4113 f->dump_bool("passed", raw_stats.passed);
4114 f->dump_int("read_ret_val", raw_stats.ondisk_read_retval);
4115 f->dump_stream("ondisk_value.dirstat") << raw_stats.ondisk_value.dirstat;
4116 f->dump_stream("ondisk_value.rstat") << raw_stats.ondisk_value.rstat;
4117 f->dump_stream("memory_value.dirrstat") << raw_stats.memory_value.dirstat;
4118 f->dump_stream("memory_value.rstat") << raw_stats.memory_value.rstat;
4119 f->dump_string("error_str", raw_stats.error_str.str());
4120 }
4121 f->close_section(); // raw_stats
4122 // dump failure return code
4123 int rc = 0;
4124 if (backtrace.checked && backtrace.ondisk_read_retval)
4125 rc = backtrace.ondisk_read_retval;
4126 if (inode.checked && inode.ondisk_read_retval)
4127 rc = inode.ondisk_read_retval;
4128 if (raw_stats.checked && raw_stats.ondisk_read_retval)
4129 rc = raw_stats.ondisk_read_retval;
4130 f->dump_int("return_code", rc);
4131 }
4132 f->close_section(); // results
4133 }
4134
4135 void CInode::dump(Formatter *f) const
4136 {
4137 InodeStoreBase::dump(f);
4138
4139 MDSCacheObject::dump(f);
4140
4141 f->open_object_section("versionlock");
4142 versionlock.dump(f);
4143 f->close_section();
4144
4145 f->open_object_section("authlock");
4146 authlock.dump(f);
4147 f->close_section();
4148
4149 f->open_object_section("linklock");
4150 linklock.dump(f);
4151 f->close_section();
4152
4153 f->open_object_section("dirfragtreelock");
4154 dirfragtreelock.dump(f);
4155 f->close_section();
4156
4157 f->open_object_section("filelock");
4158 filelock.dump(f);
4159 f->close_section();
4160
4161 f->open_object_section("xattrlock");
4162 xattrlock.dump(f);
4163 f->close_section();
4164
4165 f->open_object_section("snaplock");
4166 snaplock.dump(f);
4167 f->close_section();
4168
4169 f->open_object_section("nestlock");
4170 nestlock.dump(f);
4171 f->close_section();
4172
4173 f->open_object_section("flocklock");
4174 flocklock.dump(f);
4175 f->close_section();
4176
4177 f->open_object_section("policylock");
4178 policylock.dump(f);
4179 f->close_section();
4180
4181 f->open_array_section("states");
4182 MDSCacheObject::dump_states(f);
4183 if (state_test(STATE_EXPORTING))
4184 f->dump_string("state", "exporting");
4185 if (state_test(STATE_OPENINGDIR))
4186 f->dump_string("state", "openingdir");
4187 if (state_test(STATE_FREEZING))
4188 f->dump_string("state", "freezing");
4189 if (state_test(STATE_FROZEN))
4190 f->dump_string("state", "frozen");
4191 if (state_test(STATE_AMBIGUOUSAUTH))
4192 f->dump_string("state", "ambiguousauth");
4193 if (state_test(STATE_EXPORTINGCAPS))
4194 f->dump_string("state", "exportingcaps");
4195 if (state_test(STATE_NEEDSRECOVER))
4196 f->dump_string("state", "needsrecover");
4197 if (state_test(STATE_PURGING))
4198 f->dump_string("state", "purging");
4199 if (state_test(STATE_DIRTYPARENT))
4200 f->dump_string("state", "dirtyparent");
4201 if (state_test(STATE_DIRTYRSTAT))
4202 f->dump_string("state", "dirtyrstat");
4203 if (state_test(STATE_STRAYPINNED))
4204 f->dump_string("state", "straypinned");
4205 if (state_test(STATE_FROZENAUTHPIN))
4206 f->dump_string("state", "frozenauthpin");
4207 if (state_test(STATE_DIRTYPOOL))
4208 f->dump_string("state", "dirtypool");
4209 if (state_test(STATE_ORPHAN))
4210 f->dump_string("state", "orphan");
4211 if (state_test(STATE_MISSINGOBJS))
4212 f->dump_string("state", "missingobjs");
4213 f->close_section();
4214
4215 f->open_array_section("client_caps");
4216 for (map<client_t,Capability*>::const_iterator it = client_caps.begin();
4217 it != client_caps.end(); ++it) {
4218 f->open_object_section("client_cap");
4219 f->dump_int("client_id", it->first.v);
4220 f->dump_string("pending", ccap_string(it->second->pending()));
4221 f->dump_string("issued", ccap_string(it->second->issued()));
4222 f->dump_string("wanted", ccap_string(it->second->wanted()));
4223 f->dump_string("last_sent", ccap_string(it->second->get_last_sent()));
4224 f->close_section();
4225 }
4226 f->close_section();
4227
4228 f->dump_int("loner", loner_cap.v);
4229 f->dump_int("want_loner", want_loner_cap.v);
4230
4231 f->open_array_section("mds_caps_wanted");
4232 for (compact_map<int,int>::const_iterator p = mds_caps_wanted.begin();
4233 p != mds_caps_wanted.end(); ++p) {
4234 f->open_object_section("mds_cap_wanted");
4235 f->dump_int("rank", p->first);
4236 f->dump_string("cap", ccap_string(p->second));
4237 f->close_section();
4238 }
4239 f->close_section();
4240 }
4241
4242 /****** Scrub Stuff *****/
4243 void CInode::scrub_info_create() const
4244 {
4245 dout(25) << __func__ << dendl;
4246 assert(!scrub_infop);
4247
4248 // break out of const-land to set up implicit initial state
4249 CInode *me = const_cast<CInode*>(this);
4250 inode_t *in = me->get_projected_inode();
4251
4252 scrub_info_t *si = new scrub_info_t();
4253 si->scrub_start_stamp = si->last_scrub_stamp = in->last_scrub_stamp;
4254 si->scrub_start_version = si->last_scrub_version = in->last_scrub_version;
4255
4256 me->scrub_infop = si;
4257 }
4258
4259 void CInode::scrub_maybe_delete_info()
4260 {
4261 if (scrub_infop &&
4262 !scrub_infop->scrub_in_progress &&
4263 !scrub_infop->last_scrub_dirty) {
4264 delete scrub_infop;
4265 scrub_infop = NULL;
4266 }
4267 }
4268
4269 void CInode::scrub_initialize(CDentry *scrub_parent,
4270 const ScrubHeaderRefConst& header,
4271 MDSInternalContextBase *f)
4272 {
4273 dout(20) << __func__ << " with scrub_version " << get_version() << dendl;
4274 assert(!scrub_is_in_progress());
4275 scrub_info();
4276 if (!scrub_infop)
4277 scrub_infop = new scrub_info_t();
4278
4279 if (get_projected_inode()->is_dir()) {
4280 // fill in dirfrag_stamps with initial state
4281 std::list<frag_t> frags;
4282 dirfragtree.get_leaves(frags);
4283 for (std::list<frag_t>::iterator i = frags.begin();
4284 i != frags.end();
4285 ++i) {
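        // operator[] default-constructs a stamp entry if one is absent; a
        // forced scrub resets existing stamps so the frag is rescrubbed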
4286 if (header->get_force())
4287 scrub_infop->dirfrag_stamps[*i].reset();
4288 else
4289 scrub_infop->dirfrag_stamps[*i];
4290 }
4291 }
4292
4293 if (scrub_parent)
4294 scrub_parent->get(CDentry::PIN_SCRUBPARENT);
4295 scrub_infop->scrub_parent = scrub_parent;
4296 scrub_infop->on_finish = f;
4297 scrub_infop->scrub_in_progress = true;
4298 scrub_infop->children_scrubbed = false;
4299 scrub_infop->header = header;
4300
4301 scrub_infop->scrub_start_version = get_version();
4302 scrub_infop->scrub_start_stamp = ceph_clock_now();
4303 // right now we don't handle remote inodes
4304 }
4305
4306 int CInode::scrub_dirfrag_next(frag_t* out_dirfrag)
4307 {
4308 dout(20) << __func__ << dendl;
4309 assert(scrub_is_in_progress());
4310
4311 if (!is_dir()) {
4312 return -ENOTDIR;
4313 }
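  // hand out the first dirfrag whose start stamp predates this scrub pass
  // and mark it as started; once none remain, ENOENT is returned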
4314
4315 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4316 scrub_infop->dirfrag_stamps.begin();
4317
4318 while (i != scrub_infop->dirfrag_stamps.end()) {
4319 if (i->second.scrub_start_version < scrub_infop->scrub_start_version) {
4320 i->second.scrub_start_version = get_projected_version();
4321 i->second.scrub_start_stamp = ceph_clock_now();
4322 *out_dirfrag = i->first;
4323 dout(20) << " return frag " << *out_dirfrag << dendl;
4324 return 0;
4325 }
4326 ++i;
4327 }
4328
4329 dout(20) << " no frags left, ENOENT " << dendl;
4330 return ENOENT;
4331 }
4332
4333 void CInode::scrub_dirfrags_scrubbing(list<frag_t>* out_dirfrags)
4334 {
4335 assert(out_dirfrags != NULL);
4336 assert(scrub_infop != NULL);
4337
4338 out_dirfrags->clear();
4339 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4340 scrub_infop->dirfrag_stamps.begin();
4341
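  // frags are handed out in map order by scrub_dirfrag_next(), so the first
  // entry that was not started in this pass ends the scan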
4342 while (i != scrub_infop->dirfrag_stamps.end()) {
4343 if (i->second.scrub_start_version >= scrub_infop->scrub_start_version) {
4344 if (i->second.last_scrub_version < scrub_infop->scrub_start_version)
4345 out_dirfrags->push_back(i->first);
4346 } else {
4347 return;
4348 }
4349
4350 ++i;
4351 }
4352 }
4353
4354 void CInode::scrub_dirfrag_finished(frag_t dirfrag)
4355 {
4356 dout(20) << __func__ << " on frag " << dirfrag << dendl;
4357 assert(scrub_is_in_progress());
4358
4359 std::map<frag_t, scrub_stamp_info_t>::iterator i =
4360 scrub_infop->dirfrag_stamps.find(dirfrag);
4361 assert(i != scrub_infop->dirfrag_stamps.end());
4362
4363 scrub_stamp_info_t &si = i->second;
4364 si.last_scrub_stamp = si.scrub_start_stamp;
4365 si.last_scrub_version = si.scrub_start_version;
4366 }
4367
4368 void CInode::scrub_finished(MDSInternalContextBase **c) {
4369 dout(20) << __func__ << dendl;
4370 assert(scrub_is_in_progress());
4371 for (std::map<frag_t, scrub_stamp_info_t>::iterator i =
4372 scrub_infop->dirfrag_stamps.begin();
4373 i != scrub_infop->dirfrag_stamps.end();
4374 ++i) {
4375 if (i->second.last_scrub_version != i->second.scrub_start_version) {
4376 derr << i->second.last_scrub_version << " != "
4377 << i->second.scrub_start_version << dendl;
4378 }
4379 assert(i->second.last_scrub_version == i->second.scrub_start_version);
4380 }
4381
4382 scrub_infop->last_scrub_version = scrub_infop->scrub_start_version;
4383 scrub_infop->last_scrub_stamp = scrub_infop->scrub_start_stamp;
4384 scrub_infop->last_scrub_dirty = true;
4385 scrub_infop->scrub_in_progress = false;
4386
4387 if (scrub_infop->scrub_parent) {
4388 CDentry *dn = scrub_infop->scrub_parent;
4389 scrub_infop->scrub_parent = NULL;
4390 dn->dir->scrub_dentry_finished(dn);
4391 dn->put(CDentry::PIN_SCRUBPARENT);
4392 }
4393
4394 *c = scrub_infop->on_finish;
4395 scrub_infop->on_finish = NULL;
4396
4397 if (scrub_infop->header->get_origin() == this) {
4398 // We are at the point that a tagging scrub was initiated
4399 LogChannelRef clog = mdcache->mds->clog;
4400 clog->info() << "scrub complete with tag '" << scrub_infop->header->get_tag() << "'";
4401 }
4402 }
4403
4404 int64_t CInode::get_backtrace_pool() const
4405 {
4406 if (is_dir()) {
4407 return mdcache->mds->mdsmap->get_metadata_pool();
4408 } else {
4409 // Files are required to have an explicit layout that specifies
4410 // a pool
4411 assert(inode.layout.pool_id != -1);
4412 return inode.layout.pool_id;
4413 }
4414 }
4415
4416 void CInode::maybe_export_pin(bool update)
4417 {
4418 if (!g_conf->mds_bal_export_pin)
4419 return;
4420 if (!is_dir() || !is_normal())
4421 return;
4422
4423 mds_rank_t export_pin = get_export_pin(false);
4424 if (export_pin == MDS_RANK_NONE && !update)
4425 return;
4426
4427 if (state_test(CInode::STATE_QUEUEDEXPORTPIN))
4428 return;
4429
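  // queue this inode on mdcache->export_pin_queue if any auth dirfrag needs
  // an aux subtree created, migrated to the pinned rank, or its aux-subtree
  // flag cleared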
4430 bool queue = false;
4431 for (auto p = dirfrags.begin(); p != dirfrags.end(); p++) {
4432 CDir *dir = p->second;
4433 if (!dir->is_auth())
4434 continue;
4435 if (export_pin != MDS_RANK_NONE) {
4436 if (dir->is_subtree_root()) {
4437 // set auxsubtree bit or export it
4438 if (!dir->state_test(CDir::STATE_AUXSUBTREE) ||
4439 export_pin != dir->get_dir_auth().first)
4440 queue = true;
4441 } else {
4442 // create aux subtree or export it
4443 queue = true;
4444 }
4445 } else {
4446 // clear aux subtrees?
4447 queue = dir->state_test(CDir::STATE_AUXSUBTREE);
4448 }
4449 if (queue) {
4450 state_set(CInode::STATE_QUEUEDEXPORTPIN);
4451 mdcache->export_pin_queue.insert(this);
4452 break;
4453 }
4454 }
4455 }
4456
4457 void CInode::set_export_pin(mds_rank_t rank)
4458 {
4459 assert(is_dir());
4460 assert(is_projected());
4461 get_projected_inode()->export_pin = rank;
4462 maybe_export_pin(true);
4463 }
4464
4465 mds_rank_t CInode::get_export_pin(bool inherit) const
4466 {
4467 /* An inode that is export pinned may not necessarily be a subtree root; we
4468 * need to traverse the parents. A base or system inode cannot be pinned.
4469 * N.B. inodes not yet linked into a dir (i.e. anonymous inodes) will not
4470 * have a parent yet.
4471 */
4472 for (const CInode *in = this; !in->is_base() && !in->is_system() && in->get_projected_parent_dn(); in = in->get_projected_parent_dn()->dir->inode) {
4473 mds_rank_t pin = in->get_projected_inode()->export_pin;
4474 if (pin >= 0) {
4475 return pin;
4476 }
4477 if (!inherit) break;
4478 }
4479 return MDS_RANK_NONE;
4480 }
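// Sketch of how these calls are expected to interact (hypothetical ranks):
// after set_export_pin(2) on a directory, get_export_pin() returns 2 for it
// and, when inherit is true, for unpinned descendants, so is_exportable(2)
// below returns true for that subtree while is_exportable(1) returns false.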
4481
4482 bool CInode::is_exportable(mds_rank_t dest) const
4483 {
4484 mds_rank_t pin = get_export_pin();
4485 if (pin == dest) {
4486 return true;
4487 } else if (pin >= 0) {
4488 return false;
4489 } else {
4490 return true;
4491 }
4492 }