]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/CDir.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mds / CDir.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "include/types.h"
17
18 #include "CDir.h"
19 #include "CDentry.h"
20 #include "CInode.h"
21 #include "Mutation.h"
22
23 #include "MDSMap.h"
24 #include "MDSRank.h"
25 #include "MDCache.h"
26 #include "Locker.h"
27 #include "MDLog.h"
28 #include "LogSegment.h"
29
30 #include "common/bloom_filter.hpp"
31 #include "include/Context.h"
32 #include "common/Clock.h"
33
34 #include "osdc/Objecter.h"
35
36 #include "common/config.h"
37 #include "include/assert.h"
38 #include "include/compat.h"
39
40 #define dout_context g_ceph_context
41 #define dout_subsys ceph_subsys_mds
42 #undef dout_prefix
43 #define dout_prefix *_dout << "mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
44
45 int CDir::num_frozen_trees = 0;
46 int CDir::num_freezing_trees = 0;
47
48 class CDirContext : public MDSInternalContextBase
49 {
50 protected:
51 CDir *dir;
52 MDSRank* get_mds() override {return dir->cache->mds;}
53
54 public:
55 explicit CDirContext(CDir *d) : dir(d) {
56 assert(dir != NULL);
57 }
58 };
59
60
61 class CDirIOContext : public MDSIOContextBase
62 {
63 protected:
64 CDir *dir;
65 MDSRank* get_mds() override {return dir->cache->mds;}
66
67 public:
68 explicit CDirIOContext(CDir *d) : dir(d) {
69 assert(dir != NULL);
70 }
71 };
72
73
74 // PINS
75 //int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
76
77
78 ostream& operator<<(ostream& out, const CDir& dir)
79 {
80 out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
81 << " [" << dir.first << ",head]";
82 if (dir.is_auth()) {
83 out << " auth";
84 if (dir.is_replicated())
85 out << dir.get_replicas();
86
87 if (dir.is_projected())
88 out << " pv=" << dir.get_projected_version();
89 out << " v=" << dir.get_version();
90 out << " cv=" << dir.get_committing_version();
91 out << "/" << dir.get_committed_version();
92 } else {
93 mds_authority_t a = dir.authority();
94 out << " rep@" << a.first;
95 if (a.second != CDIR_AUTH_UNKNOWN)
96 out << "," << a.second;
97 out << "." << dir.get_replica_nonce();
98 }
99
100 if (dir.is_rep()) out << " REP";
101
102 if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
103 if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
104 out << " dir_auth=" << dir.get_dir_auth().first;
105 else
106 out << " dir_auth=" << dir.get_dir_auth();
107 }
108
109 if (dir.get_cum_auth_pins())
110 out << " ap=" << dir.get_auth_pins()
111 << "+" << dir.get_dir_auth_pins()
112 << "+" << dir.get_nested_auth_pins();
113
114 out << " state=" << dir.get_state();
115 if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
116 if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
117 if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
118 if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
119 //if (dir.state_test(CDir::STATE_FROZENTREELEAF)) out << "|frozentreeleaf";
120 if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
121 if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
122 if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
123 if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
124 if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
125 if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
126
127 // fragstat
128 out << " " << dir.fnode.fragstat;
129 if (!(dir.fnode.fragstat == dir.fnode.accounted_fragstat))
130 out << "/" << dir.fnode.accounted_fragstat;
131 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
132 const fnode_t *pf = dir.get_projected_fnode();
133 out << "->" << pf->fragstat;
134 if (!(pf->fragstat == pf->accounted_fragstat))
135 out << "/" << pf->accounted_fragstat;
136 }
137
138 // rstat
139 out << " " << dir.fnode.rstat;
140 if (!(dir.fnode.rstat == dir.fnode.accounted_rstat))
141 out << "/" << dir.fnode.accounted_rstat;
142 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
143 const fnode_t *pf = dir.get_projected_fnode();
144 out << "->" << pf->rstat;
145 if (!(pf->rstat == pf->accounted_rstat))
146 out << "/" << pf->accounted_rstat;
147 }
148
149 out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
150 out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
151 if (dir.get_num_dirty())
152 out << " dirty=" << dir.get_num_dirty();
153
154 if (dir.get_num_ref()) {
155 out << " |";
156 dir.print_pin_set(out);
157 }
158
159 out << " " << &dir;
160 return out << "]";
161 }
162
163
164 void CDir::print(ostream& out)
165 {
166 out << *this;
167 }
168
169
170
171
172 ostream& CDir::print_db_line_prefix(ostream& out)
173 {
174 return out << ceph_clock_now() << " mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
175 }
176
177
178
179 // -------------------------------------------------------------------
180 // CDir
181
182 CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
183 cache(mdcache), inode(in), frag(fg),
184 first(2),
185 dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
186 projected_version(0), item_dirty(this), item_new(this),
187 num_head_items(0), num_head_null(0),
188 num_snap_items(0), num_snap_null(0),
189 num_dirty(0), committing_version(0), committed_version(0),
190 dir_auth_pins(0), request_pins(0),
191 dir_rep(REP_NONE),
192 pop_me(ceph_clock_now()),
193 pop_nested(ceph_clock_now()),
194 pop_auth_subtree(ceph_clock_now()),
195 pop_auth_subtree_nested(ceph_clock_now()),
196 num_dentries_nested(0), num_dentries_auth_subtree(0),
197 num_dentries_auth_subtree_nested(0),
198 dir_auth(CDIR_AUTH_DEFAULT)
199 {
200 state = STATE_INITIAL;
201
202 memset(&fnode, 0, sizeof(fnode));
203
204 // auth
205 assert(in->is_dir());
206 if (auth)
207 state |= STATE_AUTH;
208 }
209
210 /**
211 * Check the recursive statistics on size for consistency.
212 * If mds_debug_scatterstat is enabled, assert for correctness,
213 * otherwise just print out the mismatch and continue.
214 */
215 bool CDir::check_rstats(bool scrub)
216 {
217 if (!g_conf->mds_debug_scatterstat && !scrub)
218 return true;
219
220 dout(25) << "check_rstats on " << this << dendl;
221 if (!is_complete() || !is_auth() || is_frozen()) {
222 assert(!scrub);
223 dout(10) << "check_rstats bailing out -- incomplete or non-auth or frozen dir!" << dendl;
224 return true;
225 }
226
227 frag_info_t frag_info;
228 nest_info_t nest_info;
229 for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
230 if (i->second->last != CEPH_NOSNAP)
231 continue;
232 CDentry::linkage_t *dnl = i->second->get_linkage();
233 if (dnl->is_primary()) {
234 CInode *in = dnl->get_inode();
235 nest_info.add(in->inode.accounted_rstat);
236 if (in->is_dir())
237 frag_info.nsubdirs++;
238 else
239 frag_info.nfiles++;
240 } else if (dnl->is_remote())
241 frag_info.nfiles++;
242 }
243
244 bool good = true;
245 // fragstat
246 if(!frag_info.same_sums(fnode.fragstat)) {
247 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
248 dout(1) << "get_num_head_items() = " << get_num_head_items()
249 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
250 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
251 good = false;
252 } else {
253 dout(20) << "get_num_head_items() = " << get_num_head_items()
254 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
255 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
256 }
257
258 // rstat
259 if (!nest_info.same_sums(fnode.rstat)) {
260 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
261 dout(1) << "total of child dentrys: " << nest_info << dendl;
262 dout(1) << "my rstats: " << fnode.rstat << dendl;
263 good = false;
264 } else {
265 dout(20) << "total of child dentrys: " << nest_info << dendl;
266 dout(20) << "my rstats: " << fnode.rstat << dendl;
267 }
268
269 if (!good) {
270 if (!scrub) {
271 for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
272 CDentry *dn = i->second;
273 if (dn->get_linkage()->is_primary()) {
274 CInode *in = dn->get_linkage()->inode;
275 dout(1) << *dn << " rstat " << in->inode.accounted_rstat << dendl;
276 } else {
277 dout(1) << *dn << dendl;
278 }
279 }
280
281 assert(frag_info.nfiles == fnode.fragstat.nfiles);
282 assert(frag_info.nsubdirs == fnode.fragstat.nsubdirs);
283 assert(nest_info.rbytes == fnode.rstat.rbytes);
284 assert(nest_info.rfiles == fnode.rstat.rfiles);
285 assert(nest_info.rsubdirs == fnode.rstat.rsubdirs);
286 }
287 }
288 dout(10) << "check_rstats complete on " << this << dendl;
289 return good;
290 }
291
292 CDentry *CDir::lookup(const string& name, snapid_t snap)
293 {
294 dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
295 map_t::iterator iter = items.lower_bound(dentry_key_t(snap, name.c_str(),
296 inode->hash_dentry_name(name)));
297 if (iter == items.end())
298 return 0;
299 if (iter->second->name == name &&
300 iter->second->first <= snap &&
301 iter->second->last >= snap) {
302 dout(20) << " hit -> " << iter->first << dendl;
303 return iter->second;
304 }
305 dout(20) << " miss -> " << iter->first << dendl;
306 return 0;
307 }
308
309 CDentry *CDir::lookup_exact_snap(const string& name, snapid_t last) {
310 map_t::iterator p = items.find(dentry_key_t(last, name.c_str(),
311 inode->hash_dentry_name(name)));
312 if (p == items.end())
313 return NULL;
314 return p->second;
315 }
316
317 /***
318 * linking fun
319 */
320
321 CDentry* CDir::add_null_dentry(const string& dname,
322 snapid_t first, snapid_t last)
323 {
324 // foreign
325 assert(lookup_exact_snap(dname, last) == 0);
326
327 // create dentry
328 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
329 if (is_auth())
330 dn->state_set(CDentry::STATE_AUTH);
331
332 cache->bottom_lru.lru_insert_mid(dn);
333 dn->state_set(CDentry::STATE_BOTTOMLRU);
334
335 dn->dir = this;
336 dn->version = get_projected_version();
337
338 // add to dir
339 assert(items.count(dn->key()) == 0);
340 //assert(null_items.count(dn->name) == 0);
341
342 items[dn->key()] = dn;
343 if (last == CEPH_NOSNAP)
344 num_head_null++;
345 else
346 num_snap_null++;
347
348 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
349 dn->get(CDentry::PIN_FRAGMENTING);
350 dn->state_set(CDentry::STATE_FRAGMENTING);
351 }
352
353 dout(12) << "add_null_dentry " << *dn << dendl;
354
355 // pin?
356 if (get_num_any() == 1)
357 get(PIN_CHILD);
358
359 assert(get_num_any() == items.size());
360 return dn;
361 }
362
363
364 CDentry* CDir::add_primary_dentry(const string& dname, CInode *in,
365 snapid_t first, snapid_t last)
366 {
367 // primary
368 assert(lookup_exact_snap(dname, last) == 0);
369
370 // create dentry
371 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
372 if (is_auth())
373 dn->state_set(CDentry::STATE_AUTH);
374 if (is_auth() || !inode->is_stray()) {
375 cache->lru.lru_insert_mid(dn);
376 } else {
377 cache->bottom_lru.lru_insert_mid(dn);
378 dn->state_set(CDentry::STATE_BOTTOMLRU);
379 }
380
381 dn->dir = this;
382 dn->version = get_projected_version();
383
384 // add to dir
385 assert(items.count(dn->key()) == 0);
386 //assert(null_items.count(dn->name) == 0);
387
388 items[dn->key()] = dn;
389
390 dn->get_linkage()->inode = in;
391 in->set_primary_parent(dn);
392
393 link_inode_work(dn, in);
394
395 if (dn->last == CEPH_NOSNAP)
396 num_head_items++;
397 else
398 num_snap_items++;
399
400 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
401 dn->get(CDentry::PIN_FRAGMENTING);
402 dn->state_set(CDentry::STATE_FRAGMENTING);
403 }
404
405 dout(12) << "add_primary_dentry " << *dn << dendl;
406
407 // pin?
408 if (get_num_any() == 1)
409 get(PIN_CHILD);
410 assert(get_num_any() == items.size());
411 return dn;
412 }
413
414 CDentry* CDir::add_remote_dentry(const string& dname, inodeno_t ino, unsigned char d_type,
415 snapid_t first, snapid_t last)
416 {
417 // foreign
418 assert(lookup_exact_snap(dname, last) == 0);
419
420 // create dentry
421 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), ino, d_type, first, last);
422 if (is_auth())
423 dn->state_set(CDentry::STATE_AUTH);
424 cache->lru.lru_insert_mid(dn);
425
426 dn->dir = this;
427 dn->version = get_projected_version();
428
429 // add to dir
430 assert(items.count(dn->key()) == 0);
431 //assert(null_items.count(dn->name) == 0);
432
433 items[dn->key()] = dn;
434 if (last == CEPH_NOSNAP)
435 num_head_items++;
436 else
437 num_snap_items++;
438
439 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
440 dn->get(CDentry::PIN_FRAGMENTING);
441 dn->state_set(CDentry::STATE_FRAGMENTING);
442 }
443
444 dout(12) << "add_remote_dentry " << *dn << dendl;
445
446 // pin?
447 if (get_num_any() == 1)
448 get(PIN_CHILD);
449
450 assert(get_num_any() == items.size());
451 return dn;
452 }
453
454
455
456 void CDir::remove_dentry(CDentry *dn)
457 {
458 dout(12) << "remove_dentry " << *dn << dendl;
459
460 // there should be no client leases at this point!
461 assert(dn->client_lease_map.empty());
462
463 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
464 dn->put(CDentry::PIN_FRAGMENTING);
465 dn->state_clear(CDentry::STATE_FRAGMENTING);
466 }
467
468 if (dn->get_linkage()->is_null()) {
469 if (dn->last == CEPH_NOSNAP)
470 num_head_null--;
471 else
472 num_snap_null--;
473 } else {
474 if (dn->last == CEPH_NOSNAP)
475 num_head_items--;
476 else
477 num_snap_items--;
478 }
479
480 if (!dn->get_linkage()->is_null())
481 // detach inode and dentry
482 unlink_inode_work(dn);
483
484 // remove from list
485 assert(items.count(dn->key()) == 1);
486 items.erase(dn->key());
487
488 // clean?
489 if (dn->is_dirty())
490 dn->mark_clean();
491
492 if (dn->state_test(CDentry::STATE_BOTTOMLRU))
493 cache->bottom_lru.lru_remove(dn);
494 else
495 cache->lru.lru_remove(dn);
496 delete dn;
497
498 // unpin?
499 if (get_num_any() == 0)
500 put(PIN_CHILD);
501 assert(get_num_any() == items.size());
502 }
503
504 void CDir::link_remote_inode(CDentry *dn, CInode *in)
505 {
506 link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
507 }
508
509 void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
510 {
511 dout(12) << "link_remote_inode " << *dn << " remote " << ino << dendl;
512 assert(dn->get_linkage()->is_null());
513
514 dn->get_linkage()->set_remote(ino, d_type);
515
516 if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
517 cache->bottom_lru.lru_remove(dn);
518 cache->lru.lru_insert_mid(dn);
519 dn->state_clear(CDentry::STATE_BOTTOMLRU);
520 }
521
522 if (dn->last == CEPH_NOSNAP) {
523 num_head_items++;
524 num_head_null--;
525 } else {
526 num_snap_items++;
527 num_snap_null--;
528 }
529 assert(get_num_any() == items.size());
530 }
531
532 void CDir::link_primary_inode(CDentry *dn, CInode *in)
533 {
534 dout(12) << "link_primary_inode " << *dn << " " << *in << dendl;
535 assert(dn->get_linkage()->is_null());
536
537 dn->get_linkage()->inode = in;
538 in->set_primary_parent(dn);
539
540 link_inode_work(dn, in);
541
542 if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
543 (is_auth() || !inode->is_stray())) {
544 cache->bottom_lru.lru_remove(dn);
545 cache->lru.lru_insert_mid(dn);
546 dn->state_clear(CDentry::STATE_BOTTOMLRU);
547 }
548
549 if (dn->last == CEPH_NOSNAP) {
550 num_head_items++;
551 num_head_null--;
552 } else {
553 num_snap_items++;
554 num_snap_null--;
555 }
556
557 assert(get_num_any() == items.size());
558 }
559
560 void CDir::link_inode_work( CDentry *dn, CInode *in)
561 {
562 assert(dn->get_linkage()->get_inode() == in);
563 assert(in->get_parent_dn() == dn);
564
565 // set inode version
566 //in->inode.version = dn->get_version();
567
568 // pin dentry?
569 if (in->get_num_ref())
570 dn->get(CDentry::PIN_INODEPIN);
571
572 // adjust auth pin count
573 if (in->auth_pins + in->nested_auth_pins)
574 dn->adjust_nested_auth_pins(in->auth_pins + in->nested_auth_pins, in->auth_pins, NULL);
575
576 // verify open snaprealm parent
577 if (in->snaprealm)
578 in->snaprealm->adjust_parent();
579 else if (in->is_any_caps())
580 in->move_to_realm(inode->find_snaprealm());
581 }
582
583 void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
584 {
585 if (dn->get_linkage()->is_primary()) {
586 dout(12) << "unlink_inode " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
587 } else {
588 dout(12) << "unlink_inode " << *dn << dendl;
589 }
590
591 unlink_inode_work(dn);
592
593 if (adjust_lru && !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
594 cache->lru.lru_remove(dn);
595 cache->bottom_lru.lru_insert_mid(dn);
596 dn->state_set(CDentry::STATE_BOTTOMLRU);
597 }
598
599 if (dn->last == CEPH_NOSNAP) {
600 num_head_items--;
601 num_head_null++;
602 } else {
603 num_snap_items--;
604 num_snap_null++;
605 }
606 assert(get_num_any() == items.size());
607 }
608
609
610 void CDir::try_remove_unlinked_dn(CDentry *dn)
611 {
612 assert(dn->dir == this);
613 assert(dn->get_linkage()->is_null());
614
615 // no pins (besides dirty)?
616 if (dn->get_num_ref() != dn->is_dirty())
617 return;
618
619 // was the dn new?
620 if (dn->is_new()) {
621 dout(10) << "try_remove_unlinked_dn " << *dn << " in " << *this << dendl;
622 if (dn->is_dirty())
623 dn->mark_clean();
624 remove_dentry(dn);
625
626 // NOTE: we may not have any more dirty dentries, but the fnode
627 // still changed, so the directory must remain dirty.
628 }
629 }
630
631
632 void CDir::unlink_inode_work( CDentry *dn )
633 {
634 CInode *in = dn->get_linkage()->get_inode();
635
636 if (dn->get_linkage()->is_remote()) {
637 // remote
638 if (in)
639 dn->unlink_remote(dn->get_linkage());
640
641 dn->get_linkage()->set_remote(0, 0);
642 } else if (dn->get_linkage()->is_primary()) {
643 // primary
644 // unpin dentry?
645 if (in->get_num_ref())
646 dn->put(CDentry::PIN_INODEPIN);
647
648 // unlink auth_pin count
649 if (in->auth_pins + in->nested_auth_pins)
650 dn->adjust_nested_auth_pins(0 - (in->auth_pins + in->nested_auth_pins), 0 - in->auth_pins, NULL);
651
652 // detach inode
653 in->remove_primary_parent(dn);
654 dn->get_linkage()->inode = 0;
655 } else {
656 assert(!dn->get_linkage()->is_null());
657 }
658 }
659
660 void CDir::add_to_bloom(CDentry *dn)
661 {
662 assert(dn->last == CEPH_NOSNAP);
663 if (!bloom) {
664 /* not create bloom filter for incomplete dir that was added by log replay */
665 if (!is_complete())
666 return;
667
668 /* don't maintain bloom filters in standby replay (saves cycles, and also
669 * avoids need to implement clearing it in EExport for #16924) */
670 if (cache->mds->is_standby_replay()) {
671 return;
672 }
673
674 unsigned size = get_num_head_items() + get_num_snap_items();
675 if (size < 100) size = 100;
676 bloom.reset(new bloom_filter(size, 1.0 / size, 0));
677 }
678 /* This size and false positive probability is completely random.*/
679 bloom->insert(dn->name.c_str(), dn->name.size());
680 }
681
682 bool CDir::is_in_bloom(const string& name)
683 {
684 if (!bloom)
685 return false;
686 return bloom->contains(name.c_str(), name.size());
687 }
688
689 void CDir::remove_null_dentries() {
690 dout(12) << "remove_null_dentries " << *this << dendl;
691
692 CDir::map_t::iterator p = items.begin();
693 while (p != items.end()) {
694 CDentry *dn = p->second;
695 ++p;
696 if (dn->get_linkage()->is_null() && !dn->is_projected())
697 remove_dentry(dn);
698 }
699
700 assert(num_snap_null == 0);
701 assert(num_head_null == 0);
702 assert(get_num_any() == items.size());
703 }
704
705 /** remove dirty null dentries for deleted directory. the dirfrag will be
706 * deleted soon, so it's safe to not commit dirty dentries.
707 *
708 * This is called when a directory is being deleted, a prerequisite
709 * of which is that its children have been unlinked: we expect to only see
710 * null, unprojected dentries here.
711 */
712 void CDir::try_remove_dentries_for_stray()
713 {
714 dout(10) << __func__ << dendl;
715 assert(get_parent_dir()->inode->is_stray());
716
717 // clear dirty only when the directory was not snapshotted
718 bool clear_dirty = !inode->snaprealm;
719
720 CDir::map_t::iterator p = items.begin();
721 while (p != items.end()) {
722 CDentry *dn = p->second;
723 ++p;
724 if (dn->last == CEPH_NOSNAP) {
725 assert(!dn->is_projected());
726 assert(dn->get_linkage()->is_null());
727 if (clear_dirty && dn->is_dirty())
728 dn->mark_clean();
729 // It's OK to remove lease prematurely because we will never link
730 // the dentry to inode again.
731 if (dn->is_any_leases())
732 dn->remove_client_leases(cache->mds->locker);
733 if (dn->get_num_ref() == 0)
734 remove_dentry(dn);
735 } else {
736 assert(!dn->is_projected());
737 CDentry::linkage_t *dnl= dn->get_linkage();
738 CInode *in = NULL;
739 if (dnl->is_primary()) {
740 in = dnl->get_inode();
741 if (clear_dirty && in->is_dirty())
742 in->mark_clean();
743 }
744 if (clear_dirty && dn->is_dirty())
745 dn->mark_clean();
746 if (dn->get_num_ref() == 0) {
747 remove_dentry(dn);
748 if (in)
749 cache->remove_inode(in);
750 }
751 }
752 }
753
754 if (clear_dirty && is_dirty())
755 mark_clean();
756 }
757
758 bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
759 {
760 assert(dn->last != CEPH_NOSNAP);
761 set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
762 CDentry::linkage_t *dnl= dn->get_linkage();
763 CInode *in = 0;
764 if (dnl->is_primary())
765 in = dnl->get_inode();
766 if ((p == snaps.end() || *p > dn->last) &&
767 (dn->get_num_ref() == dn->is_dirty()) &&
768 (!in || in->get_num_ref() == in->is_dirty())) {
769 dout(10) << " purging snapped " << *dn << dendl;
770 if (in && in->is_dirty())
771 in->mark_clean();
772 remove_dentry(dn);
773 if (in) {
774 dout(10) << " purging snapped " << *in << dendl;
775 cache->remove_inode(in);
776 }
777 return true;
778 }
779 return false;
780 }
781
782
783 void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
784 {
785 dout(10) << "purge_stale_snap_data " << snaps << dendl;
786
787 CDir::map_t::iterator p = items.begin();
788 while (p != items.end()) {
789 CDentry *dn = p->second;
790 ++p;
791
792 if (dn->last == CEPH_NOSNAP)
793 continue;
794
795 try_trim_snap_dentry(dn, snaps);
796 }
797 }
798
799
800 /**
801 * steal_dentry -- semi-violently move a dentry from one CDir to another
802 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
803 * on the old CDir corpse; must call finish_old_fragment() when finished.
804 */
805 void CDir::steal_dentry(CDentry *dn)
806 {
807 dout(15) << "steal_dentry " << *dn << dendl;
808
809 items[dn->key()] = dn;
810
811 dn->dir->items.erase(dn->key());
812 if (dn->dir->items.empty())
813 dn->dir->put(PIN_CHILD);
814
815 if (get_num_any() == 0)
816 get(PIN_CHILD);
817 if (dn->get_linkage()->is_null()) {
818 if (dn->last == CEPH_NOSNAP)
819 num_head_null++;
820 else
821 num_snap_null++;
822 } else if (dn->last == CEPH_NOSNAP) {
823 num_head_items++;
824
825 if (dn->get_linkage()->is_primary()) {
826 CInode *in = dn->get_linkage()->get_inode();
827 inode_t *pi = in->get_projected_inode();
828 if (dn->get_linkage()->get_inode()->is_dir())
829 fnode.fragstat.nsubdirs++;
830 else
831 fnode.fragstat.nfiles++;
832 fnode.rstat.rbytes += pi->accounted_rstat.rbytes;
833 fnode.rstat.rfiles += pi->accounted_rstat.rfiles;
834 fnode.rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
835 fnode.rstat.rsnaprealms += pi->accounted_rstat.rsnaprealms;
836 if (pi->accounted_rstat.rctime > fnode.rstat.rctime)
837 fnode.rstat.rctime = pi->accounted_rstat.rctime;
838
839 // move dirty inode rstat to new dirfrag
840 if (in->is_dirty_rstat())
841 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
842 } else if (dn->get_linkage()->is_remote()) {
843 if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
844 fnode.fragstat.nsubdirs++;
845 else
846 fnode.fragstat.nfiles++;
847 }
848 } else {
849 num_snap_items++;
850 if (dn->get_linkage()->is_primary()) {
851 CInode *in = dn->get_linkage()->get_inode();
852 if (in->is_dirty_rstat())
853 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
854 }
855 }
856
857 if (dn->auth_pins || dn->nested_auth_pins) {
858 // use the helpers here to maintain the auth_pin invariants on the dir inode
859 int ap = dn->get_num_auth_pins() + dn->get_num_nested_auth_pins();
860 int dap = dn->get_num_dir_auth_pins();
861 assert(dap <= ap);
862 adjust_nested_auth_pins(ap, dap, NULL);
863 dn->dir->adjust_nested_auth_pins(-ap, -dap, NULL);
864 }
865
866 if (dn->is_dirty())
867 num_dirty++;
868
869 dn->dir = this;
870 }
871
872 void CDir::prepare_old_fragment(map<string_snap_t, std::list<MDSInternalContextBase*> >& dentry_waiters, bool replay)
873 {
874 // auth_pin old fragment for duration so that any auth_pinning
875 // during the dentry migration doesn't trigger side effects
876 if (!replay && is_auth())
877 auth_pin(this);
878
879 if (!waiting_on_dentry.empty()) {
880 for (auto p = waiting_on_dentry.begin(); p != waiting_on_dentry.end(); ++p)
881 dentry_waiters[p->first].swap(p->second);
882 waiting_on_dentry.clear();
883 put(PIN_DNWAITER);
884 }
885 }
886
887 void CDir::prepare_new_fragment(bool replay)
888 {
889 if (!replay && is_auth()) {
890 _freeze_dir();
891 mark_complete();
892 }
893 inode->add_dirfrag(this);
894 }
895
896 void CDir::finish_old_fragment(list<MDSInternalContextBase*>& waiters, bool replay)
897 {
898 // take waiters _before_ unfreeze...
899 if (!replay) {
900 take_waiting(WAIT_ANY_MASK, waiters);
901 if (is_auth()) {
902 auth_unpin(this); // pinned in prepare_old_fragment
903 assert(is_frozen_dir());
904 unfreeze_dir();
905 }
906 }
907
908 assert(nested_auth_pins == 0);
909 assert(dir_auth_pins == 0);
910 assert(auth_pins == 0);
911
912 num_head_items = num_head_null = 0;
913 num_snap_items = num_snap_null = 0;
914
915 // this mirrors init_fragment_pins()
916 if (is_auth())
917 clear_replica_map();
918 if (is_dirty())
919 mark_clean();
920 if (state_test(STATE_IMPORTBOUND))
921 put(PIN_IMPORTBOUND);
922 if (state_test(STATE_EXPORTBOUND))
923 put(PIN_EXPORTBOUND);
924 if (is_subtree_root())
925 put(PIN_SUBTREE);
926
927 if (auth_pins > 0)
928 put(PIN_AUTHPIN);
929
930 assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
931 }
932
933 void CDir::init_fragment_pins()
934 {
935 if (!replica_map.empty())
936 get(PIN_REPLICATED);
937 if (state_test(STATE_DIRTY))
938 get(PIN_DIRTY);
939 if (state_test(STATE_EXPORTBOUND))
940 get(PIN_EXPORTBOUND);
941 if (state_test(STATE_IMPORTBOUND))
942 get(PIN_IMPORTBOUND);
943 if (is_subtree_root())
944 get(PIN_SUBTREE);
945 }
946
947 void CDir::split(int bits, list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
948 {
949 dout(10) << "split by " << bits << " bits on " << *this << dendl;
950
951 assert(replay || is_complete() || !is_auth());
952
953 list<frag_t> frags;
954 frag.split(bits, frags);
955
956 vector<CDir*> subfrags(1 << bits);
957
958 double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
959
960 version_t rstat_version = inode->get_projected_inode()->rstat.version;
961 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
962
963 nest_info_t rstatdiff;
964 frag_info_t fragstatdiff;
965 if (fnode.accounted_rstat.version == rstat_version)
966 rstatdiff.add_delta(fnode.accounted_rstat, fnode.rstat);
967 if (fnode.accounted_fragstat.version == dirstat_version)
968 fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat);
969 dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
970
971 map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
972 prepare_old_fragment(dentry_waiters, replay);
973
974 // create subfrag dirs
975 int n = 0;
976 for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
977 CDir *f = new CDir(inode, *p, cache, is_auth());
978 f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
979 f->replica_map = replica_map;
980 f->dir_auth = dir_auth;
981 f->init_fragment_pins();
982 f->set_version(get_version());
983
984 f->pop_me = pop_me;
985 f->pop_me.scale(fac);
986
987 // FIXME; this is an approximation
988 f->pop_nested = pop_nested;
989 f->pop_nested.scale(fac);
990 f->pop_auth_subtree = pop_auth_subtree;
991 f->pop_auth_subtree.scale(fac);
992 f->pop_auth_subtree_nested = pop_auth_subtree_nested;
993 f->pop_auth_subtree_nested.scale(fac);
994
995 dout(10) << " subfrag " << *p << " " << *f << dendl;
996 subfrags[n++] = f;
997 subs.push_back(f);
998
999 f->set_dir_auth(get_dir_auth());
1000 f->prepare_new_fragment(replay);
1001 }
1002
1003 // repartition dentries
1004 while (!items.empty()) {
1005 CDir::map_t::iterator p = items.begin();
1006
1007 CDentry *dn = p->second;
1008 frag_t subfrag = inode->pick_dirfrag(dn->name);
1009 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1010 dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
1011 CDir *f = subfrags[n];
1012 f->steal_dentry(dn);
1013 }
1014
1015 for (auto& p : dentry_waiters) {
1016 frag_t subfrag = inode->pick_dirfrag(p.first.name);
1017 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1018 CDir *f = subfrags[n];
1019
1020 if (f->waiting_on_dentry.empty())
1021 f->get(PIN_DNWAITER);
1022 f->waiting_on_dentry[p.first].swap(p.second);
1023 }
1024
1025 // FIXME: handle dirty old rstat
1026
1027 // fix up new frag fragstats
1028 for (int i=0; i<n; i++) {
1029 CDir *f = subfrags[i];
1030 f->fnode.rstat.version = rstat_version;
1031 f->fnode.accounted_rstat = f->fnode.rstat;
1032 f->fnode.fragstat.version = dirstat_version;
1033 f->fnode.accounted_fragstat = f->fnode.fragstat;
1034 dout(10) << " rstat " << f->fnode.rstat << " fragstat " << f->fnode.fragstat
1035 << " on " << *f << dendl;
1036 }
1037
1038 // give any outstanding frag stat differential to first frag
1039 dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1040 << " to " << *subfrags[0] << dendl;
1041 subfrags[0]->fnode.accounted_rstat.add(rstatdiff);
1042 subfrags[0]->fnode.accounted_fragstat.add(fragstatdiff);
1043
1044 finish_old_fragment(waiters, replay);
1045 }
1046
1047 void CDir::merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
1048 {
1049 dout(10) << "merge " << subs << dendl;
1050
1051 mds_authority_t new_auth = CDIR_AUTH_DEFAULT;
1052 for (auto dir : subs) {
1053 if (dir->get_dir_auth() != CDIR_AUTH_DEFAULT &&
1054 dir->get_dir_auth() != new_auth) {
1055 assert(new_auth == CDIR_AUTH_DEFAULT);
1056 new_auth = dir->get_dir_auth();
1057 }
1058 }
1059
1060 set_dir_auth(new_auth);
1061 prepare_new_fragment(replay);
1062
1063 nest_info_t rstatdiff;
1064 frag_info_t fragstatdiff;
1065 bool touched_mtime, touched_chattr;
1066 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1067 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1068
1069 map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
1070
1071 for (auto dir : subs) {
1072 dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
1073 assert(!dir->is_auth() || dir->is_complete() || replay);
1074
1075 if (dir->fnode.accounted_rstat.version == rstat_version)
1076 rstatdiff.add_delta(dir->fnode.accounted_rstat, dir->fnode.rstat);
1077 if (dir->fnode.accounted_fragstat.version == dirstat_version)
1078 fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat,
1079 &touched_mtime, &touched_chattr);
1080
1081 dir->prepare_old_fragment(dentry_waiters, replay);
1082
1083 // steal dentries
1084 while (!dir->items.empty())
1085 steal_dentry(dir->items.begin()->second);
1086
1087 // merge replica map
1088 for (compact_map<mds_rank_t,unsigned>::iterator p = dir->replicas_begin();
1089 p != dir->replicas_end();
1090 ++p) {
1091 unsigned cur = replica_map[p->first];
1092 if (p->second > cur)
1093 replica_map[p->first] = p->second;
1094 }
1095
1096 // merge version
1097 if (dir->get_version() > get_version())
1098 set_version(dir->get_version());
1099
1100 // merge state
1101 state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
1102
1103 dir->finish_old_fragment(waiters, replay);
1104 inode->close_dirfrag(dir->get_frag());
1105 }
1106
1107 if (!dentry_waiters.empty()) {
1108 get(PIN_DNWAITER);
1109 for (auto& p : dentry_waiters) {
1110 waiting_on_dentry[p.first].swap(p.second);
1111 }
1112 }
1113
1114 if (is_auth() && !replay)
1115 mark_complete();
1116
1117 // FIXME: merge dirty old rstat
1118 fnode.rstat.version = rstat_version;
1119 fnode.accounted_rstat = fnode.rstat;
1120 fnode.accounted_rstat.add(rstatdiff);
1121
1122 fnode.fragstat.version = dirstat_version;
1123 fnode.accounted_fragstat = fnode.fragstat;
1124 fnode.accounted_fragstat.add(fragstatdiff);
1125
1126 init_fragment_pins();
1127 }
1128
1129
1130
1131
1132 void CDir::resync_accounted_fragstat()
1133 {
1134 fnode_t *pf = get_projected_fnode();
1135 inode_t *pi = inode->get_projected_inode();
1136
1137 if (pf->accounted_fragstat.version != pi->dirstat.version) {
1138 pf->fragstat.version = pi->dirstat.version;
1139 dout(10) << "resync_accounted_fragstat " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
1140 pf->accounted_fragstat = pf->fragstat;
1141 }
1142 }
1143
1144 /*
1145 * resync rstat and accounted_rstat with inode
1146 */
1147 void CDir::resync_accounted_rstat()
1148 {
1149 fnode_t *pf = get_projected_fnode();
1150 inode_t *pi = inode->get_projected_inode();
1151
1152 if (pf->accounted_rstat.version != pi->rstat.version) {
1153 pf->rstat.version = pi->rstat.version;
1154 dout(10) << "resync_accounted_rstat " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
1155 pf->accounted_rstat = pf->rstat;
1156 dirty_old_rstat.clear();
1157 }
1158 }
1159
1160 void CDir::assimilate_dirty_rstat_inodes()
1161 {
1162 dout(10) << "assimilate_dirty_rstat_inodes" << dendl;
1163 for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1164 !p.end(); ++p) {
1165 CInode *in = *p;
1166 assert(in->is_auth());
1167 if (in->is_frozen())
1168 continue;
1169
1170 inode_t *pi = in->project_inode();
1171 pi->version = in->pre_dirty();
1172
1173 inode->mdcache->project_rstat_inode_to_frag(in, this, 0, 0, NULL);
1174 }
1175 state_set(STATE_ASSIMRSTAT);
1176 dout(10) << "assimilate_dirty_rstat_inodes done" << dendl;
1177 }
1178
1179 void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef& mut, EMetaBlob *blob)
1180 {
1181 if (!state_test(STATE_ASSIMRSTAT))
1182 return;
1183 state_clear(STATE_ASSIMRSTAT);
1184 dout(10) << "assimilate_dirty_rstat_inodes_finish" << dendl;
1185 elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1186 while (!p.end()) {
1187 CInode *in = *p;
1188 ++p;
1189
1190 if (in->is_frozen())
1191 continue;
1192
1193 CDentry *dn = in->get_projected_parent_dn();
1194
1195 mut->auth_pin(in);
1196 mut->add_projected_inode(in);
1197
1198 in->clear_dirty_rstat();
1199 blob->add_primary_dentry(dn, in, true);
1200 }
1201
1202 if (!dirty_rstat_inodes.empty())
1203 inode->mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
1204 }
1205
1206
1207
1208
1209 /****************************************
1210 * WAITING
1211 */
1212
1213 void CDir::add_dentry_waiter(const string& dname, snapid_t snapid, MDSInternalContextBase *c)
1214 {
1215 if (waiting_on_dentry.empty())
1216 get(PIN_DNWAITER);
1217 waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
1218 dout(10) << "add_dentry_waiter dentry " << dname
1219 << " snap " << snapid
1220 << " " << c << " on " << *this << dendl;
1221 }
1222
1223 void CDir::take_dentry_waiting(const string& dname, snapid_t first, snapid_t last,
1224 list<MDSInternalContextBase*>& ls)
1225 {
1226 if (waiting_on_dentry.empty())
1227 return;
1228
1229 string_snap_t lb(dname, first);
1230 string_snap_t ub(dname, last);
1231 compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.lower_bound(lb);
1232 while (p != waiting_on_dentry.end() &&
1233 !(ub < p->first)) {
1234 dout(10) << "take_dentry_waiting dentry " << dname
1235 << " [" << first << "," << last << "] found waiter on snap "
1236 << p->first.snapid
1237 << " on " << *this << dendl;
1238 ls.splice(ls.end(), p->second);
1239 waiting_on_dentry.erase(p++);
1240 }
1241
1242 if (waiting_on_dentry.empty())
1243 put(PIN_DNWAITER);
1244 }
1245
1246 void CDir::take_sub_waiting(list<MDSInternalContextBase*>& ls)
1247 {
1248 dout(10) << "take_sub_waiting" << dendl;
1249 if (!waiting_on_dentry.empty()) {
1250 for (compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
1251 p != waiting_on_dentry.end();
1252 ++p)
1253 ls.splice(ls.end(), p->second);
1254 waiting_on_dentry.clear();
1255 put(PIN_DNWAITER);
1256 }
1257 }
1258
1259
1260
1261 void CDir::add_waiter(uint64_t tag, MDSInternalContextBase *c)
1262 {
1263 // hierarchical?
1264
1265 // at free root?
1266 if (tag & WAIT_ATFREEZEROOT) {
1267 if (!(is_freezing_tree_root() || is_frozen_tree_root() ||
1268 is_freezing_dir() || is_frozen_dir())) {
1269 // try parent
1270 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATFREEZEROOT, " << *this << " is not root, trying parent" << dendl;
1271 inode->parent->dir->add_waiter(tag, c);
1272 return;
1273 }
1274 }
1275
1276 // at subtree root?
1277 if (tag & WAIT_ATSUBTREEROOT) {
1278 if (!is_subtree_root()) {
1279 // try parent
1280 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1281 inode->parent->dir->add_waiter(tag, c);
1282 return;
1283 }
1284 }
1285
1286 assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
1287
1288 MDSCacheObject::add_waiter(tag, c);
1289 }
1290
1291
1292
1293 /* NOTE: this checks dentry waiters too */
1294 void CDir::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
1295 {
1296 if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1297 // take all dentry waiters
1298 while (!waiting_on_dentry.empty()) {
1299 compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
1300 dout(10) << "take_waiting dentry " << p->first.name
1301 << " snap " << p->first.snapid << " on " << *this << dendl;
1302 ls.splice(ls.end(), p->second);
1303 waiting_on_dentry.erase(p);
1304 }
1305 put(PIN_DNWAITER);
1306 }
1307
1308 // waiting
1309 MDSCacheObject::take_waiting(mask, ls);
1310 }
1311
1312
1313 void CDir::finish_waiting(uint64_t mask, int result)
1314 {
1315 dout(11) << "finish_waiting mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
1316
1317 list<MDSInternalContextBase*> finished;
1318 take_waiting(mask, finished);
1319 if (result < 0)
1320 finish_contexts(g_ceph_context, finished, result);
1321 else
1322 cache->mds->queue_waiters(finished);
1323 }
1324
1325
1326
1327 // dirty/clean
1328
1329 fnode_t *CDir::project_fnode()
1330 {
1331 assert(get_version() != 0);
1332 fnode_t *p = new fnode_t;
1333 *p = *get_projected_fnode();
1334 projected_fnode.push_back(p);
1335
1336 if (scrub_infop && scrub_infop->last_scrub_dirty) {
1337 p->localized_scrub_stamp = scrub_infop->last_local.time;
1338 p->localized_scrub_version = scrub_infop->last_local.version;
1339 p->recursive_scrub_stamp = scrub_infop->last_recursive.time;
1340 p->recursive_scrub_version = scrub_infop->last_recursive.version;
1341 scrub_infop->last_scrub_dirty = false;
1342 scrub_maybe_delete_info();
1343 }
1344
1345 dout(10) << "project_fnode " << p << dendl;
1346 return p;
1347 }
1348
1349 void CDir::pop_and_dirty_projected_fnode(LogSegment *ls)
1350 {
1351 assert(!projected_fnode.empty());
1352 dout(15) << "pop_and_dirty_projected_fnode " << projected_fnode.front()
1353 << " v" << projected_fnode.front()->version << dendl;
1354 fnode = *projected_fnode.front();
1355 _mark_dirty(ls);
1356 delete projected_fnode.front();
1357 projected_fnode.pop_front();
1358 }
1359
1360
1361 version_t CDir::pre_dirty(version_t min)
1362 {
1363 if (min > projected_version)
1364 projected_version = min;
1365 ++projected_version;
1366 dout(10) << "pre_dirty " << projected_version << dendl;
1367 return projected_version;
1368 }
1369
1370 void CDir::mark_dirty(version_t pv, LogSegment *ls)
1371 {
1372 assert(get_version() < pv);
1373 assert(pv <= projected_version);
1374 fnode.version = pv;
1375 _mark_dirty(ls);
1376 }
1377
1378 void CDir::_mark_dirty(LogSegment *ls)
1379 {
1380 if (!state_test(STATE_DIRTY)) {
1381 dout(10) << "mark_dirty (was clean) " << *this << " version " << get_version() << dendl;
1382 _set_dirty_flag();
1383 assert(ls);
1384 } else {
1385 dout(10) << "mark_dirty (already dirty) " << *this << " version " << get_version() << dendl;
1386 }
1387 if (ls) {
1388 ls->dirty_dirfrags.push_back(&item_dirty);
1389
1390 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1391 if (committed_version == 0 && !item_new.is_on_list())
1392 ls->new_dirfrags.push_back(&item_new);
1393 }
1394 }
1395
1396 void CDir::mark_new(LogSegment *ls)
1397 {
1398 ls->new_dirfrags.push_back(&item_new);
1399 state_clear(STATE_CREATING);
1400
1401 list<MDSInternalContextBase*> waiters;
1402 take_waiting(CDir::WAIT_CREATED, waiters);
1403 cache->mds->queue_waiters(waiters);
1404 }
1405
1406 void CDir::mark_clean()
1407 {
1408 dout(10) << "mark_clean " << *this << " version " << get_version() << dendl;
1409 if (state_test(STATE_DIRTY)) {
1410 item_dirty.remove_myself();
1411 item_new.remove_myself();
1412
1413 state_clear(STATE_DIRTY);
1414 put(PIN_DIRTY);
1415 }
1416 }
1417
1418 // caller should hold auth pin of this
1419 void CDir::log_mark_dirty()
1420 {
1421 if (is_dirty() || is_projected())
1422 return; // noop if it is already dirty or will be dirty
1423
1424 version_t pv = pre_dirty();
1425 mark_dirty(pv, cache->mds->mdlog->get_current_segment());
1426 }
1427
1428 void CDir::mark_complete() {
1429 state_set(STATE_COMPLETE);
1430 bloom.reset();
1431 }
1432
1433 void CDir::first_get()
1434 {
1435 inode->get(CInode::PIN_DIRFRAG);
1436 }
1437
1438 void CDir::last_put()
1439 {
1440 inode->put(CInode::PIN_DIRFRAG);
1441 }
1442
1443
1444
1445 /******************************************************************************
1446 * FETCH and COMMIT
1447 */
1448
1449 // -----------------------
1450 // FETCH
1451 void CDir::fetch(MDSInternalContextBase *c, bool ignore_authpinnability)
1452 {
1453 string want;
1454 return fetch(c, want, ignore_authpinnability);
1455 }
1456
1457 void CDir::fetch(MDSInternalContextBase *c, const string& want_dn, bool ignore_authpinnability)
1458 {
1459 dout(10) << "fetch on " << *this << dendl;
1460
1461 assert(is_auth());
1462 assert(!is_complete());
1463
1464 if (!can_auth_pin() && !ignore_authpinnability) {
1465 if (c) {
1466 dout(7) << "fetch waiting for authpinnable" << dendl;
1467 add_waiter(WAIT_UNFREEZE, c);
1468 } else
1469 dout(7) << "fetch not authpinnable and no context" << dendl;
1470 return;
1471 }
1472
1473 // unlinked directory inode shouldn't have any entry
1474 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
1475 !inode->snaprealm) {
1476 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1477 if (get_version() == 0) {
1478 assert(inode->is_auth());
1479 set_version(1);
1480
1481 if (state_test(STATE_REJOINUNDEF)) {
1482 assert(cache->mds->is_rejoin());
1483 state_clear(STATE_REJOINUNDEF);
1484 cache->opened_undef_dirfrag(this);
1485 }
1486 }
1487 mark_complete();
1488
1489 if (c)
1490 cache->mds->queue_waiter(c);
1491 return;
1492 }
1493
1494 if (c) add_waiter(WAIT_COMPLETE, c);
1495 if (!want_dn.empty()) wanted_items.insert(want_dn);
1496
1497 // already fetching?
1498 if (state_test(CDir::STATE_FETCHING)) {
1499 dout(7) << "already fetching; waiting" << dendl;
1500 return;
1501 }
1502
1503 auth_pin(this);
1504 state_set(CDir::STATE_FETCHING);
1505
1506 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1507
1508 std::set<dentry_key_t> empty;
1509 _omap_fetch(NULL, empty);
1510 }
1511
1512 void CDir::fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1513 {
1514 dout(10) << "fetch " << keys.size() << " keys on " << *this << dendl;
1515
1516 assert(is_auth());
1517 assert(!is_complete());
1518
1519 if (!can_auth_pin()) {
1520 dout(7) << "fetch keys waiting for authpinnable" << dendl;
1521 add_waiter(WAIT_UNFREEZE, c);
1522 return;
1523 }
1524 if (state_test(CDir::STATE_FETCHING)) {
1525 dout(7) << "fetch keys waiting for full fetch" << dendl;
1526 add_waiter(WAIT_COMPLETE, c);
1527 return;
1528 }
1529
1530 auth_pin(this);
1531 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1532
1533 _omap_fetch(c, keys);
1534 }
1535
1536 class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
1537 MDSInternalContextBase *fin;
1538 public:
1539 bufferlist hdrbl;
1540 bool more = false;
1541 map<string, bufferlist> omap; ///< carry-over from before
1542 map<string, bufferlist> omap_more; ///< new batch
1543 int ret;
1544 C_IO_Dir_OMAP_FetchedMore(CDir *d, MDSInternalContextBase *f) :
1545 CDirIOContext(d), fin(f), ret(0) { }
1546 void finish(int r) {
1547 // merge results
1548 if (omap.empty()) {
1549 omap.swap(omap_more);
1550 } else {
1551 omap.insert(omap_more.begin(), omap_more.end());
1552 }
1553 if (more) {
1554 dir->_omap_fetch_more(hdrbl, omap, fin);
1555 } else {
1556 dir->_omap_fetched(hdrbl, omap, !fin, r);
1557 if (fin)
1558 fin->complete(r);
1559 }
1560 }
1561 };
1562
1563 class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
1564 MDSInternalContextBase *fin;
1565 public:
1566 bufferlist hdrbl;
1567 bool more = false;
1568 map<string, bufferlist> omap;
1569 bufferlist btbl;
1570 int ret1, ret2, ret3;
1571
1572 C_IO_Dir_OMAP_Fetched(CDir *d, MDSInternalContextBase *f) :
1573 CDirIOContext(d), fin(f), ret1(0), ret2(0), ret3(0) { }
1574 void finish(int r) override {
1575 // check the correctness of backtrace
1576 if (r >= 0 && ret3 != -ECANCELED)
1577 dir->inode->verify_diri_backtrace(btbl, ret3);
1578 if (r >= 0) r = ret1;
1579 if (r >= 0) r = ret2;
1580 if (more) {
1581 dir->_omap_fetch_more(hdrbl, omap, fin);
1582 } else {
1583 dir->_omap_fetched(hdrbl, omap, !fin, r);
1584 if (fin)
1585 fin->complete(r);
1586 }
1587 }
1588 };
1589
1590 void CDir::_omap_fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1591 {
1592 C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1593 object_t oid = get_ondisk_object();
1594 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1595 ObjectOperation rd;
1596 rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1597 if (keys.empty()) {
1598 assert(!c);
1599 rd.omap_get_vals("", "", g_conf->mds_dir_keys_per_op,
1600 &fin->omap, &fin->more, &fin->ret2);
1601 } else {
1602 assert(c);
1603 std::set<std::string> str_keys;
1604 for (auto p = keys.begin(); p != keys.end(); ++p) {
1605 string str;
1606 p->encode(str);
1607 str_keys.insert(str);
1608 }
1609 rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
1610 }
1611 // check the correctness of backtrace
1612 if (g_conf->mds_verify_backtrace > 0 && frag == frag_t()) {
1613 rd.getxattr("parent", &fin->btbl, &fin->ret3);
1614 rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1615 } else {
1616 fin->ret3 = -ECANCELED;
1617 }
1618
1619 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1620 new C_OnFinisher(fin, cache->mds->finisher));
1621 }
1622
1623 void CDir::_omap_fetch_more(
1624 bufferlist& hdrbl,
1625 map<string, bufferlist>& omap,
1626 MDSInternalContextBase *c)
1627 {
1628 // we have more omap keys to fetch!
1629 object_t oid = get_ondisk_object();
1630 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1631 C_IO_Dir_OMAP_FetchedMore *fin = new C_IO_Dir_OMAP_FetchedMore(this, c);
1632 fin->hdrbl.claim(hdrbl);
1633 fin->omap.swap(omap);
1634 ObjectOperation rd;
1635 rd.omap_get_vals(fin->omap.rbegin()->first,
1636 "", /* filter prefix */
1637 g_conf->mds_dir_keys_per_op,
1638 &fin->omap_more,
1639 &fin->more,
1640 &fin->ret);
1641 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1642 new C_OnFinisher(fin, cache->mds->finisher));
1643 }
1644
1645 CDentry *CDir::_load_dentry(
1646 const std::string &key,
1647 const std::string &dname,
1648 const snapid_t last,
1649 bufferlist &bl,
1650 const int pos,
1651 const std::set<snapid_t> *snaps,
1652 bool *force_dirty,
1653 list<CInode*> *undef_inodes)
1654 {
1655 bufferlist::iterator q = bl.begin();
1656
1657 snapid_t first;
1658 ::decode(first, q);
1659
1660 // marker
1661 char type;
1662 ::decode(type, q);
1663
1664 dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
1665 << " [" << first << "," << last << "]"
1666 << dendl;
1667
1668 bool stale = false;
1669 if (snaps && last != CEPH_NOSNAP) {
1670 set<snapid_t>::const_iterator p = snaps->lower_bound(first);
1671 if (p == snaps->end() || *p > last) {
1672 dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
1673 stale = true;
1674 }
1675 }
1676
1677 /*
1678 * look for existing dentry for _last_ snap, because unlink +
1679 * create may leave a "hole" (epochs during which the dentry
1680 * doesn't exist) but for which no explicit negative dentry is in
1681 * the cache.
1682 */
1683 CDentry *dn;
1684 if (stale)
1685 dn = lookup_exact_snap(dname, last);
1686 else
1687 dn = lookup(dname, last);
1688
1689 if (type == 'L') {
1690 // hard link
1691 inodeno_t ino;
1692 unsigned char d_type;
1693 ::decode(ino, q);
1694 ::decode(d_type, q);
1695
1696 if (stale) {
1697 if (!dn) {
1698 stale_items.insert(key);
1699 *force_dirty = true;
1700 }
1701 return dn;
1702 }
1703
1704 if (dn) {
1705 if (dn->get_linkage()->get_inode() == 0) {
1706 dout(12) << "_fetched had NEG dentry " << *dn << dendl;
1707 } else {
1708 dout(12) << "_fetched had dentry " << *dn << dendl;
1709 }
1710 } else {
1711 // (remote) link
1712 dn = add_remote_dentry(dname, ino, d_type, first, last);
1713
1714 // link to inode?
1715 CInode *in = cache->get_inode(ino); // we may or may not have it.
1716 if (in) {
1717 dn->link_remote(dn->get_linkage(), in);
1718 dout(12) << "_fetched got remote link " << ino << " which we have " << *in << dendl;
1719 } else {
1720 dout(12) << "_fetched got remote link " << ino << " (dont' have it)" << dendl;
1721 }
1722 }
1723 }
1724 else if (type == 'I') {
1725 // inode
1726
1727 // Load inode data before looking up or constructing CInode
1728 InodeStore inode_data;
1729 inode_data.decode_bare(q);
1730
1731 if (stale) {
1732 if (!dn) {
1733 stale_items.insert(key);
1734 *force_dirty = true;
1735 }
1736 return dn;
1737 }
1738
1739 bool undef_inode = false;
1740 if (dn) {
1741 CInode *in = dn->get_linkage()->get_inode();
1742 if (in) {
1743 dout(12) << "_fetched had dentry " << *dn << dendl;
1744 if (in->state_test(CInode::STATE_REJOINUNDEF)) {
1745 undef_inodes->push_back(in);
1746 undef_inode = true;
1747 }
1748 } else
1749 dout(12) << "_fetched had NEG dentry " << *dn << dendl;
1750 }
1751
1752 if (!dn || undef_inode) {
1753 // add inode
1754 CInode *in = cache->get_inode(inode_data.inode.ino, last);
1755 if (!in || undef_inode) {
1756 if (undef_inode && in)
1757 in->first = first;
1758 else
1759 in = new CInode(cache, true, first, last);
1760
1761 in->inode = inode_data.inode;
1762 // symlink?
1763 if (in->is_symlink())
1764 in->symlink = inode_data.symlink;
1765
1766 in->dirfragtree.swap(inode_data.dirfragtree);
1767 in->xattrs.swap(inode_data.xattrs);
1768 in->old_inodes.swap(inode_data.old_inodes);
1769 if (!in->old_inodes.empty()) {
1770 snapid_t min_first = in->old_inodes.rbegin()->first + 1;
1771 if (min_first > in->first)
1772 in->first = min_first;
1773 }
1774
1775 in->oldest_snap = inode_data.oldest_snap;
1776 in->decode_snap_blob(inode_data.snap_blob);
1777 if (snaps && !in->snaprealm)
1778 in->purge_stale_snap_data(*snaps);
1779
1780 if (!undef_inode) {
1781 cache->add_inode(in); // add
1782 dn = add_primary_dentry(dname, in, first, last); // link
1783 }
1784 dout(12) << "_fetched got " << *dn << " " << *in << dendl;
1785
1786 if (in->inode.is_dirty_rstat())
1787 in->mark_dirty_rstat();
1788
1789 //in->hack_accessed = false;
1790 //in->hack_load_stamp = ceph_clock_now();
1791 //num_new_inodes_loaded++;
1792 } else {
1793 dout(0) << "_fetched badness: got (but i already had) " << *in
1794 << " mode " << in->inode.mode
1795 << " mtime " << in->inode.mtime << dendl;
1796 string dirpath, inopath;
1797 this->inode->make_path_string(dirpath);
1798 in->make_path_string(inopath);
1799 cache->mds->clog->error() << "loaded dup inode " << inode_data.inode.ino
1800 << " [" << first << "," << last << "] v" << inode_data.inode.version
1801 << " at " << dirpath << "/" << dname
1802 << ", but inode " << in->vino() << " v" << in->inode.version
1803 << " already exists at " << inopath;
1804 return dn;
1805 }
1806 }
1807 } else {
1808 std::ostringstream oss;
1809 oss << "Invalid tag char '" << type << "' pos " << pos;
1810 throw buffer::malformed_input(oss.str());
1811 }
1812
1813 return dn;
1814 }
1815
1816 void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
1817 bool complete, int r)
1818 {
1819 LogChannelRef clog = cache->mds->clog;
1820 dout(10) << "_fetched header " << hdrbl.length() << " bytes "
1821 << omap.size() << " keys for " << *this << dendl;
1822
1823 assert(r == 0 || r == -ENOENT || r == -ENODATA);
1824 assert(is_auth());
1825 assert(!is_frozen());
1826
1827 if (hdrbl.length() == 0) {
1828 dout(0) << "_fetched missing object for " << *this << dendl;
1829
1830 clog->error() << "dir " << dirfrag() << " object missing on disk; some "
1831 "files may be lost (" << get_path() << ")";
1832
1833 go_bad(complete);
1834 return;
1835 }
1836
1837 fnode_t got_fnode;
1838 {
1839 bufferlist::iterator p = hdrbl.begin();
1840 try {
1841 ::decode(got_fnode, p);
1842 } catch (const buffer::error &err) {
1843 derr << "Corrupt fnode in dirfrag " << dirfrag()
1844 << ": " << err << dendl;
1845 clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
1846 << err << " (" << get_path() << ")";
1847 go_bad(complete);
1848 return;
1849 }
1850 if (!p.end()) {
1851 clog->warn() << "header buffer of dir " << dirfrag() << " has "
1852 << hdrbl.length() - p.get_off() << " extra bytes ("
1853 << get_path() << ")";
1854 go_bad(complete);
1855 return;
1856 }
1857 }
1858
1859 dout(10) << "_fetched version " << got_fnode.version << dendl;
1860
1861 // take the loaded fnode?
1862 // only if we are a fresh CDir* with no prior state.
1863 if (get_version() == 0) {
1864 assert(!is_projected());
1865 assert(!state_test(STATE_COMMITTING));
1866 fnode = got_fnode;
1867 projected_version = committing_version = committed_version = got_fnode.version;
1868
1869 if (state_test(STATE_REJOINUNDEF)) {
1870 assert(cache->mds->is_rejoin());
1871 state_clear(STATE_REJOINUNDEF);
1872 cache->opened_undef_dirfrag(this);
1873 }
1874 }
1875
1876 list<CInode*> undef_inodes;
1877
1878 // purge stale snaps?
1879 // only if we have past_parents open!
1880 bool force_dirty = false;
1881 const set<snapid_t> *snaps = NULL;
1882 SnapRealm *realm = inode->find_snaprealm();
1883 if (!realm->have_past_parents_open()) {
1884 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
1885 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
1886 snaps = &realm->get_snaps();
1887 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
1888 << " < " << realm->get_last_destroyed()
1889 << ", snap purge based on " << *snaps << dendl;
1890 if (get_num_snap_items() == 0) {
1891 fnode.snap_purged_thru = realm->get_last_destroyed();
1892 force_dirty = true;
1893 }
1894 }
1895
1896 unsigned pos = omap.size() - 1;
1897 for (map<string, bufferlist>::reverse_iterator p = omap.rbegin();
1898 p != omap.rend();
1899 ++p, --pos) {
1900 string dname;
1901 snapid_t last;
1902 dentry_key_t::decode_helper(p->first, dname, last);
1903
1904 CDentry *dn = NULL;
1905 try {
1906 dn = _load_dentry(
1907 p->first, dname, last, p->second, pos, snaps,
1908 &force_dirty, &undef_inodes);
1909 } catch (const buffer::error &err) {
1910 cache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
1911 "dir frag " << dirfrag() << ": "
1912 << err << "(" << get_path() << ")";
1913
1914 // Remember that this dentry is damaged. Subsequent operations
1915 // that try to act directly on it will get their EIOs, but this
1916 // dirfrag as a whole will continue to look okay (minus the
1917 // mysteriously-missing dentry)
1918 go_bad_dentry(last, dname);
1919
1920 // Anyone who was WAIT_DENTRY for this guy will get kicked
1921 // to RetryRequest, and hit the DamageTable-interrogating path.
1922 // Stats will now be bogus because we will think we're complete,
1923 // but have 1 or more missing dentries.
1924 continue;
1925 }
1926
1927 if (dn && (wanted_items.count(dname) > 0 || !complete)) {
1928 dout(10) << " touching wanted dn " << *dn << dendl;
1929 inode->mdcache->touch_dentry(dn);
1930 }
1931
1932 /** clean underwater item?
1933 * Underwater item is something that is dirty in our cache from
1934 * journal replay, but was previously flushed to disk before the
1935 * mds failed.
1936 *
1937 * We only do this is committed_version == 0. that implies either
1938 * - this is a fetch after from a clean/empty CDir is created
1939 * (and has no effect, since the dn won't exist); or
1940 * - this is a fetch after _recovery_, which is what we're worried
1941 * about. Items that are marked dirty from the journal should be
1942 * marked clean if they appear on disk.
1943 */
1944 if (committed_version == 0 &&
1945 dn &&
1946 dn->get_version() <= got_fnode.version &&
1947 dn->is_dirty()) {
1948 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1949 dn->mark_clean();
1950
1951 if (dn->get_linkage()->is_primary()) {
1952 assert(dn->get_linkage()->get_inode()->get_version() <= got_fnode.version);
1953 dout(10) << "_fetched had underwater inode " << *dn->get_linkage()->get_inode() << ", marking clean" << dendl;
1954 dn->get_linkage()->get_inode()->mark_clean();
1955 }
1956 }
1957 }
1958
1959 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
1960
1961 // mark complete, !fetching
1962 if (complete) {
1963 wanted_items.clear();
1964 mark_complete();
1965 state_clear(STATE_FETCHING);
1966
1967 if (scrub_infop && scrub_infop->need_scrub_local) {
1968 scrub_infop->need_scrub_local = false;
1969 scrub_local();
1970 }
1971 }
1972
1973 // open & force frags
1974 while (!undef_inodes.empty()) {
1975 CInode *in = undef_inodes.front();
1976 undef_inodes.pop_front();
1977 in->state_clear(CInode::STATE_REJOINUNDEF);
1978 cache->opened_undef_inode(in);
1979 }
1980
1981 // dirty myself to remove stale snap dentries
1982 if (force_dirty && !inode->mdcache->is_readonly())
1983 log_mark_dirty();
1984
1985 auth_unpin(this);
1986
1987 if (complete) {
1988 // kick waiters
1989 finish_waiting(WAIT_COMPLETE, 0);
1990 }
1991 }
1992
1993 void CDir::_go_bad()
1994 {
1995 if (get_version() == 0)
1996 set_version(1);
1997 state_set(STATE_BADFRAG);
1998 // mark complete, !fetching
1999 mark_complete();
2000 state_clear(STATE_FETCHING);
2001 auth_unpin(this);
2002
2003 // kick waiters
2004 finish_waiting(WAIT_COMPLETE, -EIO);
2005 }
2006
2007 void CDir::go_bad_dentry(snapid_t last, const std::string &dname)
2008 {
2009 dout(10) << "go_bad_dentry " << dname << dendl;
2010 const bool fatal = cache->mds->damage_table.notify_dentry(
2011 inode->ino(), frag, last, dname, get_path() + "/" + dname);
2012 if (fatal) {
2013 cache->mds->damaged();
2014 ceph_abort(); // unreachable, damaged() respawns us
2015 }
2016 }
2017
2018 void CDir::go_bad(bool complete)
2019 {
2020 dout(10) << "go_bad " << frag << dendl;
2021 const bool fatal = cache->mds->damage_table.notify_dirfrag(
2022 inode->ino(), frag, get_path());
2023 if (fatal) {
2024 cache->mds->damaged();
2025 ceph_abort(); // unreachable, damaged() respawns us
2026 }
2027
2028 if (complete)
2029 _go_bad();
2030 else
2031 auth_unpin(this);
2032 }
2033
2034 // -----------------------
2035 // COMMIT
2036
2037 /**
2038 * commit
2039 *
2040 * @param want - min version i want committed
2041 * @param c - callback for completion
2042 */
2043 void CDir::commit(version_t want, MDSInternalContextBase *c, bool ignore_authpinnability, int op_prio)
2044 {
2045 dout(10) << "commit want " << want << " on " << *this << dendl;
2046 if (want == 0) want = get_version();
2047
2048 // preconditions
2049 assert(want <= get_version() || get_version() == 0); // can't commit the future
2050 assert(want > committed_version); // the caller is stupid
2051 assert(is_auth());
2052 assert(ignore_authpinnability || can_auth_pin());
2053
2054 // note: queue up a noop if necessary, so that we always
2055 // get an auth_pin.
2056 if (!c)
2057 c = new C_MDSInternalNoop;
2058
2059 // auth_pin on first waiter
2060 if (waiting_for_commit.empty())
2061 auth_pin(this);
2062 waiting_for_commit[want].push_back(c);
2063
2064 // ok.
2065 _commit(want, op_prio);
2066 }
2067
2068 class C_IO_Dir_Committed : public CDirIOContext {
2069 version_t version;
2070 public:
2071 C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2072 void finish(int r) override {
2073 dir->_committed(r, version);
2074 }
2075 };
2076
2077 /**
2078 * Flush out the modified dentries in this dir. Keep the bufferlist
2079 * below max_write_size;
2080 */
2081 void CDir::_omap_commit(int op_prio)
2082 {
2083 dout(10) << "_omap_commit" << dendl;
2084
2085 unsigned max_write_size = cache->max_dir_commit_size;
2086 unsigned write_size = 0;
2087
2088 if (op_prio < 0)
2089 op_prio = CEPH_MSG_PRIO_DEFAULT;
2090
2091 // snap purge?
2092 const set<snapid_t> *snaps = NULL;
2093 SnapRealm *realm = inode->find_snaprealm();
2094 if (!realm->have_past_parents_open()) {
2095 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
2096 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
2097 snaps = &realm->get_snaps();
2098 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
2099 << " < " << realm->get_last_destroyed()
2100 << ", snap purge based on " << *snaps << dendl;
2101 // fnode.snap_purged_thru = realm->get_last_destroyed();
2102 }
2103
2104 set<string> to_remove;
2105 map<string, bufferlist> to_set;
2106
2107 C_GatherBuilder gather(g_ceph_context,
2108 new C_OnFinisher(new C_IO_Dir_Committed(this,
2109 get_version()),
2110 cache->mds->finisher));
2111
2112 SnapContext snapc;
2113 object_t oid = get_ondisk_object();
2114 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
2115
2116 if (!stale_items.empty()) {
2117 for (compact_set<string>::iterator p = stale_items.begin();
2118 p != stale_items.end();
2119 ++p) {
2120 to_remove.insert(*p);
2121 write_size += (*p).length();
2122 }
2123 stale_items.clear();
2124 }
2125
2126 for (map_t::iterator p = items.begin();
2127 p != items.end(); ) {
2128 CDentry *dn = p->second;
2129 ++p;
2130
2131 string key;
2132 dn->key().encode(key);
2133
2134 if (dn->last != CEPH_NOSNAP &&
2135 snaps && try_trim_snap_dentry(dn, *snaps)) {
2136 dout(10) << " rm " << key << dendl;
2137 write_size += key.length();
2138 to_remove.insert(key);
2139 continue;
2140 }
2141
2142 if (!dn->is_dirty() &&
2143 (!dn->state_test(CDentry::STATE_FRAGMENTING) || dn->get_linkage()->is_null()))
2144 continue; // skip clean dentries
2145
2146 if (dn->get_linkage()->is_null()) {
2147 dout(10) << " rm " << dn->name << " " << *dn << dendl;
2148 write_size += key.length();
2149 to_remove.insert(key);
2150 } else {
2151 dout(10) << " set " << dn->name << " " << *dn << dendl;
2152 bufferlist dnbl;
2153 _encode_dentry(dn, dnbl, snaps);
2154 write_size += key.length() + dnbl.length();
2155 to_set[key].swap(dnbl);
2156 }
2157
2158 if (write_size >= max_write_size) {
2159 ObjectOperation op;
2160 op.priority = op_prio;
2161
2162 // don't create new dirfrag blindly
2163 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2164 op.stat(NULL, (ceph::real_time*) NULL, NULL);
2165
2166 if (!to_set.empty())
2167 op.omap_set(to_set);
2168 if (!to_remove.empty())
2169 op.omap_rm_keys(to_remove);
2170
2171 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2172 ceph::real_clock::now(),
2173 0, gather.new_sub());
2174
2175 write_size = 0;
2176 to_set.clear();
2177 to_remove.clear();
2178 }
2179 }
2180
2181 ObjectOperation op;
2182 op.priority = op_prio;
2183
2184 // don't create new dirfrag blindly
2185 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2186 op.stat(NULL, (ceph::real_time*)NULL, NULL);
2187
2188 /*
2189 * save the header at the last moment.. If we were to send it off before other
2190 * updates, but die before sending them all, we'd think that the on-disk state
2191 * was fully committed even though it wasn't! However, since the messages are
2192 * strictly ordered between the MDS and the OSD, and since messages to a given
2193 * PG are strictly ordered, if we simply send the message containing the header
2194 * off last, we cannot get our header into an incorrect state.
2195 */
2196 bufferlist header;
2197 ::encode(fnode, header);
2198 op.omap_set_header(header);
2199
2200 if (!to_set.empty())
2201 op.omap_set(to_set);
2202 if (!to_remove.empty())
2203 op.omap_rm_keys(to_remove);
2204
2205 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2206 ceph::real_clock::now(),
2207 0, gather.new_sub());
2208
2209 gather.activate();
2210 }
2211
2212 void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
2213 const set<snapid_t> *snaps)
2214 {
2215 // clear dentry NEW flag, if any. we can no longer silently drop it.
2216 dn->clear_new();
2217
2218 ::encode(dn->first, bl);
2219
2220 // primary or remote?
2221 if (dn->linkage.is_remote()) {
2222 inodeno_t ino = dn->linkage.get_remote_ino();
2223 unsigned char d_type = dn->linkage.get_remote_d_type();
2224 dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' remote ino " << ino << dendl;
2225
2226 // marker, name, ino
2227 bl.append('L'); // remote link
2228 ::encode(ino, bl);
2229 ::encode(d_type, bl);
2230 } else if (dn->linkage.is_primary()) {
2231 // primary link
2232 CInode *in = dn->linkage.get_inode();
2233 assert(in);
2234
2235 dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' inode " << *in << dendl;
2236
2237 // marker, name, inode, [symlink string]
2238 bl.append('I'); // inode
2239
2240 if (in->is_multiversion()) {
2241 if (!in->snaprealm) {
2242 if (snaps)
2243 in->purge_stale_snap_data(*snaps);
2244 } else if (in->snaprealm->have_past_parents_open()) {
2245 in->purge_stale_snap_data(in->snaprealm->get_snaps());
2246 }
2247 }
2248
2249 bufferlist snap_blob;
2250 in->encode_snap_blob(snap_blob);
2251 in->encode_bare(bl, cache->mds->mdsmap->get_up_features(), &snap_blob);
2252 } else {
2253 assert(!dn->linkage.is_null());
2254 }
2255 }
2256
2257 void CDir::_commit(version_t want, int op_prio)
2258 {
2259 dout(10) << "_commit want " << want << " on " << *this << dendl;
2260
2261 // we can't commit things in the future.
2262 // (even the projected future.)
2263 assert(want <= get_version() || get_version() == 0);
2264
2265 // check pre+postconditions.
2266 assert(is_auth());
2267
2268 // already committed?
2269 if (committed_version >= want) {
2270 dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2271 return;
2272 }
2273 // already committing >= want?
2274 if (committing_version >= want) {
2275 dout(10) << "already committing " << committing_version << " >= " << want << dendl;
2276 assert(state_test(STATE_COMMITTING));
2277 return;
2278 }
2279
2280 // alrady committed an older version?
2281 if (committing_version > committed_version) {
2282 dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2283 return;
2284 }
2285
2286 // commit.
2287 committing_version = get_version();
2288
2289 // mark committing (if not already)
2290 if (!state_test(STATE_COMMITTING)) {
2291 dout(10) << "marking committing" << dendl;
2292 state_set(STATE_COMMITTING);
2293 }
2294
2295 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_commit);
2296
2297 _omap_commit(op_prio);
2298 }
2299
2300
2301 /**
2302 * _committed
2303 *
2304 * @param v version i just committed
2305 */
2306 void CDir::_committed(int r, version_t v)
2307 {
2308 if (r < 0) {
2309 // the directory could be partly purged during MDS failover
2310 if (r == -ENOENT && committed_version == 0 &&
2311 !inode->is_base() && get_parent_dir()->inode->is_stray()) {
2312 r = 0;
2313 if (inode->snaprealm)
2314 inode->state_set(CInode::STATE_MISSINGOBJS);
2315 }
2316 if (r < 0) {
2317 dout(1) << "commit error " << r << " v " << v << dendl;
2318 cache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
2319 << " errno " << r;
2320 cache->mds->handle_write_error(r);
2321 return;
2322 }
2323 }
2324
2325 dout(10) << "_committed v " << v << " on " << *this << dendl;
2326 assert(is_auth());
2327
2328 bool stray = inode->is_stray();
2329
2330 // take note.
2331 assert(v > committed_version);
2332 assert(v <= committing_version);
2333 committed_version = v;
2334
2335 // _all_ commits done?
2336 if (committing_version == committed_version)
2337 state_clear(CDir::STATE_COMMITTING);
2338
2339 // _any_ commit, even if we've been redirtied, means we're no longer new.
2340 item_new.remove_myself();
2341
2342 // dir clean?
2343 if (committed_version == get_version())
2344 mark_clean();
2345
2346 // dentries clean?
2347 for (map_t::iterator it = items.begin();
2348 it != items.end(); ) {
2349 CDentry *dn = it->second;
2350 ++it;
2351
2352 // inode?
2353 if (dn->linkage.is_primary()) {
2354 CInode *in = dn->linkage.get_inode();
2355 assert(in);
2356 assert(in->is_auth());
2357
2358 if (committed_version >= in->get_version()) {
2359 if (in->is_dirty()) {
2360 dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
2361 in->mark_clean();
2362 }
2363 } else {
2364 dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
2365 assert(in->is_dirty() || in->last < CEPH_NOSNAP); // special case for cow snap items (not predirtied)
2366 }
2367 }
2368
2369 // dentry
2370 if (committed_version >= dn->get_version()) {
2371 if (dn->is_dirty()) {
2372 dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
2373 dn->mark_clean();
2374
2375 // drop clean null stray dentries immediately
2376 if (stray &&
2377 dn->get_num_ref() == 0 &&
2378 !dn->is_projected() &&
2379 dn->get_linkage()->is_null())
2380 remove_dentry(dn);
2381 }
2382 } else {
2383 dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
2384 }
2385 }
2386
2387 // finishers?
2388 bool were_waiters = !waiting_for_commit.empty();
2389
2390 compact_map<version_t, list<MDSInternalContextBase*> >::iterator p = waiting_for_commit.begin();
2391 while (p != waiting_for_commit.end()) {
2392 compact_map<version_t, list<MDSInternalContextBase*> >::iterator n = p;
2393 ++n;
2394 if (p->first > committed_version) {
2395 dout(10) << " there are waiters for " << p->first << ", committing again" << dendl;
2396 _commit(p->first, -1);
2397 break;
2398 }
2399 cache->mds->queue_waiters(p->second);
2400 waiting_for_commit.erase(p);
2401 p = n;
2402 }
2403
2404 // try drop dentries in this dirfrag if it's about to be purged
2405 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
2406 inode->snaprealm)
2407 cache->maybe_eval_stray(inode, true);
2408
2409 // unpin if we kicked the last waiter.
2410 if (were_waiters &&
2411 waiting_for_commit.empty())
2412 auth_unpin(this);
2413 }
2414
2415
2416
2417
2418 // IMPORT/EXPORT
2419
2420 void CDir::encode_export(bufferlist& bl)
2421 {
2422 assert(!is_projected());
2423 ::encode(first, bl);
2424 ::encode(fnode, bl);
2425 ::encode(dirty_old_rstat, bl);
2426 ::encode(committed_version, bl);
2427
2428 ::encode(state, bl);
2429 ::encode(dir_rep, bl);
2430
2431 ::encode(pop_me, bl);
2432 ::encode(pop_auth_subtree, bl);
2433
2434 ::encode(dir_rep_by, bl);
2435 ::encode(replica_map, bl);
2436
2437 get(PIN_TEMPEXPORTING);
2438 }
2439
2440 void CDir::finish_export(utime_t now)
2441 {
2442 state &= MASK_STATE_EXPORT_KEPT;
2443 pop_auth_subtree_nested.sub(now, cache->decayrate, pop_auth_subtree);
2444 pop_me.zero(now);
2445 pop_auth_subtree.zero(now);
2446 put(PIN_TEMPEXPORTING);
2447 dirty_old_rstat.clear();
2448 }
2449
2450 void CDir::decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls)
2451 {
2452 ::decode(first, blp);
2453 ::decode(fnode, blp);
2454 ::decode(dirty_old_rstat, blp);
2455 projected_version = fnode.version;
2456 ::decode(committed_version, blp);
2457 committing_version = committed_version;
2458
2459 unsigned s;
2460 ::decode(s, blp);
2461 state &= MASK_STATE_IMPORT_KEPT;
2462 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
2463
2464 if (is_dirty()) {
2465 get(PIN_DIRTY);
2466 _mark_dirty(ls);
2467 }
2468
2469 ::decode(dir_rep, blp);
2470
2471 ::decode(pop_me, now, blp);
2472 ::decode(pop_auth_subtree, now, blp);
2473 pop_auth_subtree_nested.add(now, cache->decayrate, pop_auth_subtree);
2474
2475 ::decode(dir_rep_by, blp);
2476 ::decode(replica_map, blp);
2477 if (!replica_map.empty()) get(PIN_REPLICATED);
2478
2479 replica_nonce = 0; // no longer defined
2480
2481 // did we import some dirty scatterlock data?
2482 if (dirty_old_rstat.size() ||
2483 !(fnode.rstat == fnode.accounted_rstat)) {
2484 cache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
2485 ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
2486 }
2487 if (!(fnode.fragstat == fnode.accounted_fragstat)) {
2488 cache->mds->locker->mark_updated_scatterlock(&inode->filelock);
2489 ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
2490 }
2491 if (is_dirty_dft()) {
2492 if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
2493 inode->dirfragtreelock.is_stable()) {
2494 // clear stale dirtydft
2495 state_clear(STATE_DIRTYDFT);
2496 } else {
2497 cache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
2498 ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
2499 }
2500 }
2501 }
2502
2503
2504
2505
2506 /********************************
2507 * AUTHORITY
2508 */
2509
2510 /*
2511 * if dir_auth.first == parent, auth is same as inode.
2512 * unless .second != unknown, in which case that sticks.
2513 */
2514 mds_authority_t CDir::authority() const
2515 {
2516 if (is_subtree_root())
2517 return dir_auth;
2518 else
2519 return inode->authority();
2520 }
2521
2522 /** is_subtree_root()
2523 * true if this is an auth delegation point.
2524 * that is, dir_auth != default (parent,unknown)
2525 *
2526 * some key observations:
2527 * if i am auth:
2528 * - any region bound will be an export, or frozen.
2529 *
2530 * note that this DOES heed dir_auth.pending
2531 */
2532 /*
2533 bool CDir::is_subtree_root()
2534 {
2535 if (dir_auth == CDIR_AUTH_DEFAULT) {
2536 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2537 //<< " on " << ino() << dendl;
2538 return false;
2539 } else {
2540 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2541 //<< " on " << ino() << dendl;
2542 return true;
2543 }
2544 }
2545 */
2546
2547 /** contains(x)
2548 * true if we are x, or an ancestor of x
2549 */
2550 bool CDir::contains(CDir *x)
2551 {
2552 while (1) {
2553 if (x == this)
2554 return true;
2555 x = x->get_inode()->get_projected_parent_dir();
2556 if (x == 0)
2557 return false;
2558 }
2559 }
2560
2561
2562
2563 /** set_dir_auth
2564 */
2565 void CDir::set_dir_auth(mds_authority_t a)
2566 {
2567 dout(10) << "setting dir_auth=" << a
2568 << " from " << dir_auth
2569 << " on " << *this << dendl;
2570
2571 bool was_subtree = is_subtree_root();
2572 bool was_ambiguous = dir_auth.second >= 0;
2573
2574 // set it.
2575 dir_auth = a;
2576
2577 // new subtree root?
2578 if (!was_subtree && is_subtree_root()) {
2579 dout(10) << " new subtree root, adjusting auth_pins" << dendl;
2580
2581 // adjust nested auth pins
2582 if (get_cum_auth_pins())
2583 inode->adjust_nested_auth_pins(-1, NULL);
2584
2585 // unpin parent of frozen dir/tree?
2586 if (inode->is_auth()) {
2587 assert(!is_frozen_tree_root());
2588 if (is_frozen_dir())
2589 inode->auth_unpin(this);
2590 }
2591 }
2592 if (was_subtree && !is_subtree_root()) {
2593 dout(10) << " old subtree root, adjusting auth_pins" << dendl;
2594
2595 // adjust nested auth pins
2596 if (get_cum_auth_pins())
2597 inode->adjust_nested_auth_pins(1, NULL);
2598
2599 // pin parent of frozen dir/tree?
2600 if (inode->is_auth()) {
2601 assert(!is_frozen_tree_root());
2602 if (is_frozen_dir())
2603 inode->auth_pin(this);
2604 }
2605 }
2606
2607 // newly single auth?
2608 if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
2609 list<MDSInternalContextBase*> ls;
2610 take_waiting(WAIT_SINGLEAUTH, ls);
2611 cache->mds->queue_waiters(ls);
2612 }
2613 }
2614
2615
2616 /*****************************************
2617 * AUTH PINS and FREEZING
2618 *
2619 * the basic plan is that auth_pins only exist in auth regions, and they
2620 * prevent a freeze (and subsequent auth change).
2621 *
2622 * however, we also need to prevent a parent from freezing if a child is frozen.
2623 * for that reason, the parent inode of a frozen directory is auth_pinned.
2624 *
2625 * the oddity is when the frozen directory is a subtree root. if that's the case,
2626 * the parent inode isn't frozen. which means that when subtree authority is adjusted
2627 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
2628 * time.
2629 *
2630 */
2631
2632 void CDir::auth_pin(void *by)
2633 {
2634 if (auth_pins == 0)
2635 get(PIN_AUTHPIN);
2636 auth_pins++;
2637
2638 #ifdef MDS_AUTHPIN_SET
2639 auth_pin_set.insert(by);
2640 #endif
2641
2642 dout(10) << "auth_pin by " << by
2643 << " on " << *this
2644 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2645
2646 // nest pins?
2647 if (!is_subtree_root() &&
2648 get_cum_auth_pins() == 1)
2649 inode->adjust_nested_auth_pins(1, by);
2650 }
2651
2652 void CDir::auth_unpin(void *by)
2653 {
2654 auth_pins--;
2655
2656 #ifdef MDS_AUTHPIN_SET
2657 assert(auth_pin_set.count(by));
2658 auth_pin_set.erase(auth_pin_set.find(by));
2659 #endif
2660 if (auth_pins == 0)
2661 put(PIN_AUTHPIN);
2662
2663 dout(10) << "auth_unpin by " << by
2664 << " on " << *this
2665 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2666 assert(auth_pins >= 0);
2667
2668 int newcum = get_cum_auth_pins();
2669
2670 maybe_finish_freeze(); // pending freeze?
2671
2672 // nest?
2673 if (!is_subtree_root() &&
2674 newcum == 0)
2675 inode->adjust_nested_auth_pins(-1, by);
2676 }
2677
2678 void CDir::adjust_nested_auth_pins(int inc, int dirinc, void *by)
2679 {
2680 assert(inc);
2681 nested_auth_pins += inc;
2682 dir_auth_pins += dirinc;
2683
2684 dout(15) << "adjust_nested_auth_pins " << inc << "/" << dirinc << " on " << *this
2685 << " by " << by << " count now "
2686 << auth_pins << " + " << nested_auth_pins << dendl;
2687 assert(nested_auth_pins >= 0);
2688 assert(dir_auth_pins >= 0);
2689
2690 int newcum = get_cum_auth_pins();
2691
2692 maybe_finish_freeze(); // pending freeze?
2693
2694 // nest?
2695 if (!is_subtree_root()) {
2696 if (newcum == 0)
2697 inode->adjust_nested_auth_pins(-1, by);
2698 else if (newcum == inc)
2699 inode->adjust_nested_auth_pins(1, by);
2700 }
2701 }
2702
#ifdef MDS_VERIFY_FRAGSTAT
/**
 * verify_fragstat
 *
 * Debug-only check (compiled under MDS_VERIFY_FRAGSTAT): recount files and
 * subdirectories from the dentries of a complete, non-stray dirfrag and
 * abort if the counts disagree with fnode.fragstat.
 */
void CDir::verify_fragstat()
{
  assert(is_complete());
  if (inode->is_stray())
    return;

  frag_info_t c;
  memset(&c, 0, sizeof(c));

  for (map_t::iterator it = items.begin(); it != items.end(); ++it) {
    CDentry *dn = it->second;
    if (dn->is_null())
      continue;

    dout(10) << " " << *dn << dendl;

    if (dn->is_primary()) {
      dout(10) << " " << *dn->inode << dendl;
      if (dn->inode->is_dir())
        c.nsubdirs++;
      else
        c.nfiles++;
    }
    if (dn->is_remote()) {
      if (dn->get_remote_d_type() == DT_DIR)
        c.nsubdirs++;
      else
        c.nfiles++;
    }
  }

  const bool counts_match =
    c.nsubdirs == fnode.fragstat.nsubdirs &&
    c.nfiles == fnode.fragstat.nfiles;

  if (counts_match) {
    dout(0) << "verify_fragstat ok " << fnode.fragstat << " on " << *this << dendl;
  } else {
    dout(0) << "verify_fragstat failed " << fnode.fragstat << " on " << *this << dendl;
    dout(0) << " i count " << c << dendl;
    ceph_abort();
  }
}
#endif
2748
2749 /*****************************************************************************
2750 * FREEZING
2751 */
2752
2753 // FREEZE TREE
2754
2755 bool CDir::freeze_tree()
2756 {
2757 assert(!is_frozen());
2758 assert(!is_freezing());
2759
2760 auth_pin(this);
2761 if (is_freezeable(true)) {
2762 _freeze_tree();
2763 auth_unpin(this);
2764 return true;
2765 } else {
2766 state_set(STATE_FREEZINGTREE);
2767 ++num_freezing_trees;
2768 dout(10) << "freeze_tree waiting " << *this << dendl;
2769 return false;
2770 }
2771 }
2772
2773 void CDir::_freeze_tree()
2774 {
2775 dout(10) << "_freeze_tree " << *this << dendl;
2776 assert(is_freezeable(true));
2777
2778 // twiddle state
2779 if (state_test(STATE_FREEZINGTREE)) {
2780 state_clear(STATE_FREEZINGTREE); // actually, this may get set again by next context?
2781 --num_freezing_trees;
2782 }
2783
2784 if (is_auth()) {
2785 mds_authority_t auth;
2786 bool was_subtree = is_subtree_root();
2787 if (was_subtree) {
2788 auth = get_dir_auth();
2789 } else {
2790 // temporarily prevent parent subtree from becoming frozen.
2791 inode->auth_pin(this);
2792 // create new subtree
2793 auth = authority();
2794 }
2795
2796 assert(auth.first >= 0);
2797 assert(auth.second == CDIR_AUTH_UNKNOWN);
2798 auth.second = auth.first;
2799 inode->mdcache->adjust_subtree_auth(this, auth);
2800 if (!was_subtree)
2801 inode->auth_unpin(this);
2802 }
2803
2804 state_set(STATE_FROZENTREE);
2805 ++num_frozen_trees;
2806 get(PIN_FROZEN);
2807 }
2808
2809 void CDir::unfreeze_tree()
2810 {
2811 dout(10) << "unfreeze_tree " << *this << dendl;
2812
2813 if (state_test(STATE_FROZENTREE)) {
2814 // frozen. unfreeze.
2815 state_clear(STATE_FROZENTREE);
2816 --num_frozen_trees;
2817
2818 put(PIN_FROZEN);
2819
2820 if (is_auth()) {
2821 // must be subtree
2822 assert(is_subtree_root());
2823 // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
2824 mds_authority_t auth = get_dir_auth();
2825 assert(auth.first >= 0);
2826 assert(auth.second == auth.first);
2827 auth.second = CDIR_AUTH_UNKNOWN;
2828 inode->mdcache->adjust_subtree_auth(this, auth);
2829 }
2830
2831 // waiters?
2832 finish_waiting(WAIT_UNFREEZE);
2833 } else {
2834 finish_waiting(WAIT_FROZEN, -1);
2835
2836 // freezing. stop it.
2837 assert(state_test(STATE_FREEZINGTREE));
2838 state_clear(STATE_FREEZINGTREE);
2839 --num_freezing_trees;
2840 auth_unpin(this);
2841
2842 finish_waiting(WAIT_UNFREEZE);
2843 }
2844 }
2845
2846 bool CDir::is_freezing_tree() const
2847 {
2848 if (num_freezing_trees == 0)
2849 return false;
2850 const CDir *dir = this;
2851 while (1) {
2852 if (dir->is_freezing_tree_root()) return true;
2853 if (dir->is_subtree_root()) return false;
2854 if (dir->inode->parent)
2855 dir = dir->inode->parent->dir;
2856 else
2857 return false; // root on replica
2858 }
2859 }
2860
2861 bool CDir::is_frozen_tree() const
2862 {
2863 if (num_frozen_trees == 0)
2864 return false;
2865 const CDir *dir = this;
2866 while (1) {
2867 if (dir->is_frozen_tree_root()) return true;
2868 if (dir->is_subtree_root()) return false;
2869 if (dir->inode->parent)
2870 dir = dir->inode->parent->dir;
2871 else
2872 return false; // root on replica
2873 }
2874 }
2875
2876 CDir *CDir::get_frozen_tree_root()
2877 {
2878 assert(is_frozen());
2879 CDir *dir = this;
2880 while (1) {
2881 if (dir->is_frozen_tree_root())
2882 return dir;
2883 if (dir->inode->parent)
2884 dir = dir->inode->parent->dir;
2885 else
2886 ceph_abort();
2887 }
2888 }
2889
2890 class C_Dir_AuthUnpin : public CDirContext {
2891 public:
2892 explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
2893 void finish(int r) override {
2894 dir->auth_unpin(dir->get_inode());
2895 }
2896 };
2897
2898 void CDir::maybe_finish_freeze()
2899 {
2900 if (auth_pins != 1 || dir_auth_pins != 0)
2901 return;
2902
2903 // we can freeze the _dir_ even with nested pins...
2904 if (state_test(STATE_FREEZINGDIR)) {
2905 _freeze_dir();
2906 auth_unpin(this);
2907 finish_waiting(WAIT_FROZEN);
2908 }
2909
2910 if (nested_auth_pins != 0)
2911 return;
2912
2913 if (state_test(STATE_FREEZINGTREE)) {
2914 if (!is_subtree_root() && inode->is_frozen()) {
2915 dout(10) << "maybe_finish_freeze !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
2916 // retake an auth_pin...
2917 auth_pin(inode);
2918 // and release it when the parent inode unfreezes
2919 inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
2920 return;
2921 }
2922
2923 _freeze_tree();
2924 auth_unpin(this);
2925 finish_waiting(WAIT_FROZEN);
2926 }
2927 }
2928
2929
2930
2931 // FREEZE DIR
2932
2933 bool CDir::freeze_dir()
2934 {
2935 assert(!is_frozen());
2936 assert(!is_freezing());
2937
2938 auth_pin(this);
2939 if (is_freezeable_dir(true)) {
2940 _freeze_dir();
2941 auth_unpin(this);
2942 return true;
2943 } else {
2944 state_set(STATE_FREEZINGDIR);
2945 dout(10) << "freeze_dir + wait " << *this << dendl;
2946 return false;
2947 }
2948 }
2949
2950 void CDir::_freeze_dir()
2951 {
2952 dout(10) << "_freeze_dir " << *this << dendl;
2953 //assert(is_freezeable_dir(true));
2954 // not always true during split because the original fragment may have frozen a while
2955 // ago and we're just now getting around to breaking it up.
2956
2957 state_clear(STATE_FREEZINGDIR);
2958 state_set(STATE_FROZENDIR);
2959 get(PIN_FROZEN);
2960
2961 if (is_auth() && !is_subtree_root())
2962 inode->auth_pin(this); // auth_pin for duration of freeze
2963 }
2964
2965
2966 void CDir::unfreeze_dir()
2967 {
2968 dout(10) << "unfreeze_dir " << *this << dendl;
2969
2970 if (state_test(STATE_FROZENDIR)) {
2971 state_clear(STATE_FROZENDIR);
2972 put(PIN_FROZEN);
2973
2974 // unpin (may => FREEZEABLE) FIXME: is this order good?
2975 if (is_auth() && !is_subtree_root())
2976 inode->auth_unpin(this);
2977
2978 finish_waiting(WAIT_UNFREEZE);
2979 } else {
2980 finish_waiting(WAIT_FROZEN, -1);
2981
2982 // still freezing. stop.
2983 assert(state_test(STATE_FREEZINGDIR));
2984 state_clear(STATE_FREEZINGDIR);
2985 auth_unpin(this);
2986
2987 finish_waiting(WAIT_UNFREEZE);
2988 }
2989 }
2990
2991 /**
2992 * Slightly less complete than operator<<, because this is intended
2993 * for identifying a directory and its state rather than for dumping
2994 * debug output.
2995 */
2996 void CDir::dump(Formatter *f) const
2997 {
2998 assert(f != NULL);
2999
3000 f->dump_stream("path") << get_path();
3001
3002 f->dump_stream("dirfrag") << dirfrag();
3003 f->dump_int("snapid_first", first);
3004
3005 f->dump_stream("projected_version") << get_projected_version();
3006 f->dump_stream("version") << get_version();
3007 f->dump_stream("committing_version") << get_committing_version();
3008 f->dump_stream("committed_version") << get_committed_version();
3009
3010 f->dump_bool("is_rep", is_rep());
3011
3012 if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
3013 if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
3014 f->dump_stream("dir_auth") << get_dir_auth().first;
3015 } else {
3016 f->dump_stream("dir_auth") << get_dir_auth();
3017 }
3018 } else {
3019 f->dump_string("dir_auth", "");
3020 }
3021
3022 f->open_array_section("states");
3023 MDSCacheObject::dump_states(f);
3024 if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
3025 if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
3026 if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
3027 if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
3028 if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
3029 if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
3030 if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
3031 if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
3032 f->close_section();
3033
3034 MDSCacheObject::dump(f);
3035 }
3036
3037 /****** Scrub Stuff *******/
3038
3039 void CDir::scrub_info_create() const
3040 {
3041 assert(!scrub_infop);
3042
3043 // break out of const-land to set up implicit initial state
3044 CDir *me = const_cast<CDir*>(this);
3045 fnode_t *fn = me->get_projected_fnode();
3046
3047 std::unique_ptr<scrub_info_t> si(new scrub_info_t());
3048
3049 si->last_recursive.version = si->recursive_start.version =
3050 fn->recursive_scrub_version;
3051 si->last_recursive.time = si->recursive_start.time =
3052 fn->recursive_scrub_stamp;
3053
3054 si->last_local.version = fn->localized_scrub_version;
3055 si->last_local.time = fn->localized_scrub_stamp;
3056
3057 me->scrub_infop.swap(si);
3058 }
3059
3060 void CDir::scrub_initialize(const ScrubHeaderRefConst& header)
3061 {
3062 dout(20) << __func__ << dendl;
3063 assert(is_complete());
3064 assert(header != nullptr);
3065
3066 // FIXME: weird implicit construction, is someone else meant
3067 // to be calling scrub_info_create first?
3068 scrub_info();
3069 assert(scrub_infop && !scrub_infop->directory_scrubbing);
3070
3071 scrub_infop->recursive_start.version = get_projected_version();
3072 scrub_infop->recursive_start.time = ceph_clock_now();
3073
3074 scrub_infop->directories_to_scrub.clear();
3075 scrub_infop->directories_scrubbing.clear();
3076 scrub_infop->directories_scrubbed.clear();
3077 scrub_infop->others_to_scrub.clear();
3078 scrub_infop->others_scrubbing.clear();
3079 scrub_infop->others_scrubbed.clear();
3080
3081 for (map_t::iterator i = items.begin();
3082 i != items.end();
3083 ++i) {
3084 // TODO: handle snapshot scrubbing
3085 if (i->first.snapid != CEPH_NOSNAP)
3086 continue;
3087
3088 CDentry::linkage_t *dnl = i->second->get_projected_linkage();
3089 if (dnl->is_primary()) {
3090 if (dnl->get_inode()->is_dir())
3091 scrub_infop->directories_to_scrub.insert(i->first);
3092 else
3093 scrub_infop->others_to_scrub.insert(i->first);
3094 } else if (dnl->is_remote()) {
3095 // TODO: check remote linkage
3096 }
3097 }
3098 scrub_infop->directory_scrubbing = true;
3099 scrub_infop->header = header;
3100 }
3101
3102 void CDir::scrub_finished()
3103 {
3104 dout(20) << __func__ << dendl;
3105 assert(scrub_infop && scrub_infop->directory_scrubbing);
3106
3107 assert(scrub_infop->directories_to_scrub.empty());
3108 assert(scrub_infop->directories_scrubbing.empty());
3109 scrub_infop->directories_scrubbed.clear();
3110 assert(scrub_infop->others_to_scrub.empty());
3111 assert(scrub_infop->others_scrubbing.empty());
3112 scrub_infop->others_scrubbed.clear();
3113 scrub_infop->directory_scrubbing = false;
3114
3115 scrub_infop->last_recursive = scrub_infop->recursive_start;
3116 scrub_infop->last_scrub_dirty = true;
3117 }
3118
3119 int CDir::_next_dentry_on_set(set<dentry_key_t>& dns, bool missing_okay,
3120 MDSInternalContext *cb, CDentry **dnout)
3121 {
3122 dentry_key_t dnkey;
3123 CDentry *dn;
3124
3125 while (!dns.empty()) {
3126 set<dentry_key_t>::iterator front = dns.begin();
3127 dnkey = *front;
3128 dn = lookup(dnkey.name);
3129 if (!dn) {
3130 if (!is_complete() &&
3131 (!has_bloom() || is_in_bloom(dnkey.name))) {
3132 // need to re-read this dirfrag
3133 fetch(cb);
3134 return EAGAIN;
3135 }
3136 // okay, we lost it
3137 if (missing_okay) {
3138 dout(15) << " we no longer have directory dentry "
3139 << dnkey.name << ", assuming it got renamed" << dendl;
3140 dns.erase(dnkey);
3141 continue;
3142 } else {
3143 dout(5) << " we lost dentry " << dnkey.name
3144 << ", bailing out because that's impossible!" << dendl;
3145 ceph_abort();
3146 }
3147 }
3148 // okay, we got a dentry
3149 dns.erase(dnkey);
3150
3151 if (dn->get_projected_version() < scrub_infop->last_recursive.version &&
3152 !(scrub_infop->header->get_force())) {
3153 dout(15) << " skip dentry " << dnkey.name
3154 << ", no change since last scrub" << dendl;
3155 continue;
3156 }
3157
3158 *dnout = dn;
3159 return 0;
3160 }
3161 *dnout = NULL;
3162 return ENOENT;
3163 }
3164
3165 int CDir::scrub_dentry_next(MDSInternalContext *cb, CDentry **dnout)
3166 {
3167 dout(20) << __func__ << dendl;
3168 assert(scrub_infop && scrub_infop->directory_scrubbing);
3169
3170 dout(20) << "trying to scrub directories underneath us" << dendl;
3171 int rval = _next_dentry_on_set(scrub_infop->directories_to_scrub, true,
3172 cb, dnout);
3173 if (rval == 0) {
3174 dout(20) << __func__ << " inserted to directories scrubbing: "
3175 << *dnout << dendl;
3176 scrub_infop->directories_scrubbing.insert((*dnout)->key());
3177 } else if (rval == EAGAIN) {
3178 // we don't need to do anything else
3179 } else { // we emptied out the directory scrub set
3180 assert(rval == ENOENT);
3181 dout(20) << "no directories left, moving on to other kinds of dentries"
3182 << dendl;
3183
3184 rval = _next_dentry_on_set(scrub_infop->others_to_scrub, false, cb, dnout);
3185 if (rval == 0) {
3186 dout(20) << __func__ << " inserted to others scrubbing: "
3187 << *dnout << dendl;
3188 scrub_infop->others_scrubbing.insert((*dnout)->key());
3189 }
3190 }
3191 dout(20) << " returning " << rval << " with dn=" << *dnout << dendl;
3192 return rval;
3193 }
3194
3195 void CDir::scrub_dentries_scrubbing(list<CDentry*> *out_dentries)
3196 {
3197 dout(20) << __func__ << dendl;
3198 assert(scrub_infop && scrub_infop->directory_scrubbing);
3199
3200 for (set<dentry_key_t>::iterator i =
3201 scrub_infop->directories_scrubbing.begin();
3202 i != scrub_infop->directories_scrubbing.end();
3203 ++i) {
3204 CDentry *d = lookup(i->name, i->snapid);
3205 assert(d);
3206 out_dentries->push_back(d);
3207 }
3208 for (set<dentry_key_t>::iterator i = scrub_infop->others_scrubbing.begin();
3209 i != scrub_infop->others_scrubbing.end();
3210 ++i) {
3211 CDentry *d = lookup(i->name, i->snapid);
3212 assert(d);
3213 out_dentries->push_back(d);
3214 }
3215 }
3216
3217 void CDir::scrub_dentry_finished(CDentry *dn)
3218 {
3219 dout(20) << __func__ << " on dn " << *dn << dendl;
3220 assert(scrub_infop && scrub_infop->directory_scrubbing);
3221 dentry_key_t dn_key = dn->key();
3222 if (scrub_infop->directories_scrubbing.erase(dn_key)) {
3223 scrub_infop->directories_scrubbed.insert(dn_key);
3224 } else {
3225 assert(scrub_infop->others_scrubbing.count(dn_key));
3226 scrub_infop->others_scrubbing.erase(dn_key);
3227 scrub_infop->others_scrubbed.insert(dn_key);
3228 }
3229 }
3230
3231 void CDir::scrub_maybe_delete_info()
3232 {
3233 if (scrub_infop &&
3234 !scrub_infop->directory_scrubbing &&
3235 !scrub_infop->need_scrub_local &&
3236 !scrub_infop->last_scrub_dirty &&
3237 !scrub_infop->pending_scrub_error &&
3238 scrub_infop->dirty_scrub_stamps.empty()) {
3239 scrub_infop.reset();
3240 }
3241 }
3242
3243 bool CDir::scrub_local()
3244 {
3245 assert(is_complete());
3246 bool rval = check_rstats(true);
3247
3248 scrub_info();
3249 if (rval) {
3250 scrub_infop->last_local.time = ceph_clock_now();
3251 scrub_infop->last_local.version = get_projected_version();
3252 scrub_infop->pending_scrub_error = false;
3253 scrub_infop->last_scrub_dirty = true;
3254 } else {
3255 scrub_infop->pending_scrub_error = true;
3256 if (scrub_infop->header->get_repair())
3257 cache->repair_dirfrag_stats(this);
3258 }
3259 return rval;
3260 }
3261
3262 std::string CDir::get_path() const
3263 {
3264 std::string path;
3265 get_inode()->make_path_string(path, true);
3266 return path;
3267 }
3268
3269 bool CDir::should_split_fast() const
3270 {
3271 // Max size a fragment can be before trigger fast splitting
3272 int fast_limit = g_conf->mds_bal_split_size * g_conf->mds_bal_fragment_fast_factor;
3273
3274 // Fast path: the sum of accounted size and null dentries does not
3275 // exceed threshold: we definitely are not over it.
3276 if (get_frag_size() + get_num_head_null() <= fast_limit) {
3277 return false;
3278 }
3279
3280 // Fast path: the accounted size of the frag exceeds threshold: we
3281 // definitely are over it
3282 if (get_frag_size() > fast_limit) {
3283 return true;
3284 }
3285
3286 int64_t effective_size = 0;
3287
3288 for (const auto &p : items) {
3289 const CDentry *dn = p.second;
3290 if (!dn->get_projected_linkage()->is_null()) {
3291 effective_size++;
3292 }
3293 }
3294
3295 return effective_size > fast_limit;
3296 }
3297