]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/CDir.cc
update sources to v12.2.5
[ceph.git] / ceph / src / mds / CDir.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <boost/utility/string_view.hpp>
16
17 #include "include/types.h"
18
19 #include "CDir.h"
20 #include "CDentry.h"
21 #include "CInode.h"
22 #include "Mutation.h"
23
24 #include "MDSMap.h"
25 #include "MDSRank.h"
26 #include "MDCache.h"
27 #include "Locker.h"
28 #include "MDLog.h"
29 #include "LogSegment.h"
30
31 #include "common/bloom_filter.hpp"
32 #include "include/Context.h"
33 #include "common/Clock.h"
34
35 #include "osdc/Objecter.h"
36
37 #include "common/config.h"
38 #include "include/assert.h"
39 #include "include/compat.h"
40
41 #define dout_context g_ceph_context
42 #define dout_subsys ceph_subsys_mds
43 #undef dout_prefix
44 #define dout_prefix *_dout << "mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
45
46 int CDir::num_frozen_trees = 0;
47 int CDir::num_freezing_trees = 0;
48
49 class CDirContext : public MDSInternalContextBase
50 {
51 protected:
52 CDir *dir;
53 MDSRank* get_mds() override {return dir->cache->mds;}
54
55 public:
56 explicit CDirContext(CDir *d) : dir(d) {
57 assert(dir != NULL);
58 }
59 };
60
61
62 class CDirIOContext : public MDSIOContextBase
63 {
64 protected:
65 CDir *dir;
66 MDSRank* get_mds() override {return dir->cache->mds;}
67
68 public:
69 explicit CDirIOContext(CDir *d) : dir(d) {
70 assert(dir != NULL);
71 }
72 };
73
74
75 // PINS
76 //int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
77
78
79 ostream& operator<<(ostream& out, const CDir& dir)
80 {
81 out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
82 << " [" << dir.first << ",head]";
83 if (dir.is_auth()) {
84 out << " auth";
85 if (dir.is_replicated())
86 out << dir.get_replicas();
87
88 if (dir.is_projected())
89 out << " pv=" << dir.get_projected_version();
90 out << " v=" << dir.get_version();
91 out << " cv=" << dir.get_committing_version();
92 out << "/" << dir.get_committed_version();
93 } else {
94 mds_authority_t a = dir.authority();
95 out << " rep@" << a.first;
96 if (a.second != CDIR_AUTH_UNKNOWN)
97 out << "," << a.second;
98 out << "." << dir.get_replica_nonce();
99 }
100
101 if (dir.is_rep()) out << " REP";
102
103 if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
104 if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
105 out << " dir_auth=" << dir.get_dir_auth().first;
106 else
107 out << " dir_auth=" << dir.get_dir_auth();
108 }
109
110 if (dir.get_cum_auth_pins())
111 out << " ap=" << dir.get_auth_pins()
112 << "+" << dir.get_dir_auth_pins()
113 << "+" << dir.get_nested_auth_pins();
114
115 out << " state=" << dir.get_state();
116 if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
117 if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
118 if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
119 if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
120 //if (dir.state_test(CDir::STATE_FROZENTREELEAF)) out << "|frozentreeleaf";
121 if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
122 if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
123 if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
124 if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
125 if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
126 if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
127
128 // fragstat
129 out << " " << dir.fnode.fragstat;
130 if (!(dir.fnode.fragstat == dir.fnode.accounted_fragstat))
131 out << "/" << dir.fnode.accounted_fragstat;
132 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
133 const fnode_t *pf = dir.get_projected_fnode();
134 out << "->" << pf->fragstat;
135 if (!(pf->fragstat == pf->accounted_fragstat))
136 out << "/" << pf->accounted_fragstat;
137 }
138
139 // rstat
140 out << " " << dir.fnode.rstat;
141 if (!(dir.fnode.rstat == dir.fnode.accounted_rstat))
142 out << "/" << dir.fnode.accounted_rstat;
143 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
144 const fnode_t *pf = dir.get_projected_fnode();
145 out << "->" << pf->rstat;
146 if (!(pf->rstat == pf->accounted_rstat))
147 out << "/" << pf->accounted_rstat;
148 }
149
150 out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
151 out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
152 if (dir.get_num_dirty())
153 out << " dirty=" << dir.get_num_dirty();
154
155 if (dir.get_num_ref()) {
156 out << " |";
157 dir.print_pin_set(out);
158 }
159
160 out << " " << &dir;
161 return out << "]";
162 }
163
164
165 void CDir::print(ostream& out)
166 {
167 out << *this;
168 }
169
170
171
172
173 ostream& CDir::print_db_line_prefix(ostream& out)
174 {
175 return out << ceph_clock_now() << " mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
176 }
177
178
179
180 // -------------------------------------------------------------------
181 // CDir
182
183 CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
184 cache(mdcache), inode(in), frag(fg),
185 first(2),
186 dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
187 projected_version(0),
188 dirty_dentries(member_offset(CDentry, item_dir_dirty)),
189 item_dirty(this), item_new(this),
190 num_head_items(0), num_head_null(0),
191 num_snap_items(0), num_snap_null(0),
192 num_dirty(0), committing_version(0), committed_version(0),
193 dir_auth_pins(0), request_pins(0),
194 dir_rep(REP_NONE),
195 pop_me(ceph_clock_now()),
196 pop_nested(ceph_clock_now()),
197 pop_auth_subtree(ceph_clock_now()),
198 pop_auth_subtree_nested(ceph_clock_now()),
199 num_dentries_nested(0), num_dentries_auth_subtree(0),
200 num_dentries_auth_subtree_nested(0),
201 dir_auth(CDIR_AUTH_DEFAULT)
202 {
203 memset(&fnode, 0, sizeof(fnode));
204
205 // auth
206 assert(in->is_dir());
207 if (auth) state_set(STATE_AUTH);
208 }
209
210 /**
211 * Check the recursive statistics on size for consistency.
212 * If mds_debug_scatterstat is enabled, assert for correctness,
213 * otherwise just print out the mismatch and continue.
214 */
215 bool CDir::check_rstats(bool scrub)
216 {
217 if (!g_conf->mds_debug_scatterstat && !scrub)
218 return true;
219
220 dout(25) << "check_rstats on " << this << dendl;
221 if (!is_complete() || !is_auth() || is_frozen()) {
222 assert(!scrub);
223 dout(10) << "check_rstats bailing out -- incomplete or non-auth or frozen dir!" << dendl;
224 return true;
225 }
226
227 frag_info_t frag_info;
228 nest_info_t nest_info;
229 for (auto i = items.begin(); i != items.end(); ++i) {
230 if (i->second->last != CEPH_NOSNAP)
231 continue;
232 CDentry::linkage_t *dnl = i->second->get_linkage();
233 if (dnl->is_primary()) {
234 CInode *in = dnl->get_inode();
235 nest_info.add(in->inode.accounted_rstat);
236 if (in->is_dir())
237 frag_info.nsubdirs++;
238 else
239 frag_info.nfiles++;
240 } else if (dnl->is_remote())
241 frag_info.nfiles++;
242 }
243
244 bool good = true;
245 // fragstat
246 if(!frag_info.same_sums(fnode.fragstat)) {
247 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
248 dout(1) << "get_num_head_items() = " << get_num_head_items()
249 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
250 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
251 good = false;
252 } else {
253 dout(20) << "get_num_head_items() = " << get_num_head_items()
254 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
255 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
256 }
257
258 // rstat
259 if (!nest_info.same_sums(fnode.rstat)) {
260 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
261 dout(1) << "total of child dentrys: " << nest_info << dendl;
262 dout(1) << "my rstats: " << fnode.rstat << dendl;
263 good = false;
264 } else {
265 dout(20) << "total of child dentrys: " << nest_info << dendl;
266 dout(20) << "my rstats: " << fnode.rstat << dendl;
267 }
268
269 if (!good) {
270 if (!scrub) {
271 for (auto i = items.begin(); i != items.end(); ++i) {
272 CDentry *dn = i->second;
273 if (dn->get_linkage()->is_primary()) {
274 CInode *in = dn->get_linkage()->inode;
275 dout(1) << *dn << " rstat " << in->inode.accounted_rstat << dendl;
276 } else {
277 dout(1) << *dn << dendl;
278 }
279 }
280
281 assert(frag_info.nfiles == fnode.fragstat.nfiles);
282 assert(frag_info.nsubdirs == fnode.fragstat.nsubdirs);
283 assert(nest_info.rbytes == fnode.rstat.rbytes);
284 assert(nest_info.rfiles == fnode.rstat.rfiles);
285 assert(nest_info.rsubdirs == fnode.rstat.rsubdirs);
286 }
287 }
288 dout(10) << "check_rstats complete on " << this << dendl;
289 return good;
290 }
291
292 CDentry *CDir::lookup(boost::string_view name, snapid_t snap)
293 {
294 dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
295 auto iter = items.lower_bound(dentry_key_t(snap, name, inode->hash_dentry_name(name)));
296 if (iter == items.end())
297 return 0;
298 if (iter->second->get_name() == name &&
299 iter->second->first <= snap &&
300 iter->second->last >= snap) {
301 dout(20) << " hit -> " << iter->first << dendl;
302 return iter->second;
303 }
304 dout(20) << " miss -> " << iter->first << dendl;
305 return 0;
306 }
307
308 CDentry *CDir::lookup_exact_snap(boost::string_view name, snapid_t last) {
309 auto p = items.find(dentry_key_t(last, name, inode->hash_dentry_name(name)));
310 if (p == items.end())
311 return NULL;
312 return p->second;
313 }
314
315 /***
316 * linking fun
317 */
318
319 CDentry* CDir::add_null_dentry(boost::string_view dname,
320 snapid_t first, snapid_t last)
321 {
322 // foreign
323 assert(lookup_exact_snap(dname, last) == 0);
324
325 // create dentry
326 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
327 if (is_auth())
328 dn->state_set(CDentry::STATE_AUTH);
329
330 cache->bottom_lru.lru_insert_mid(dn);
331 dn->state_set(CDentry::STATE_BOTTOMLRU);
332
333 dn->dir = this;
334 dn->version = get_projected_version();
335
336 // add to dir
337 assert(items.count(dn->key()) == 0);
338 //assert(null_items.count(dn->get_name()) == 0);
339
340 items[dn->key()] = dn;
341 if (last == CEPH_NOSNAP)
342 num_head_null++;
343 else
344 num_snap_null++;
345
346 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
347 dn->get(CDentry::PIN_FRAGMENTING);
348 dn->state_set(CDentry::STATE_FRAGMENTING);
349 }
350
351 dout(12) << "add_null_dentry " << *dn << dendl;
352
353 // pin?
354 if (get_num_any() == 1)
355 get(PIN_CHILD);
356
357 assert(get_num_any() == items.size());
358 return dn;
359 }
360
361
362 CDentry* CDir::add_primary_dentry(boost::string_view dname, CInode *in,
363 snapid_t first, snapid_t last)
364 {
365 // primary
366 assert(lookup_exact_snap(dname, last) == 0);
367
368 // create dentry
369 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
370 if (is_auth())
371 dn->state_set(CDentry::STATE_AUTH);
372 if (is_auth() || !inode->is_stray()) {
373 cache->lru.lru_insert_mid(dn);
374 } else {
375 cache->bottom_lru.lru_insert_mid(dn);
376 dn->state_set(CDentry::STATE_BOTTOMLRU);
377 }
378
379 dn->dir = this;
380 dn->version = get_projected_version();
381
382 // add to dir
383 assert(items.count(dn->key()) == 0);
384 //assert(null_items.count(dn->get_name()) == 0);
385
386 items[dn->key()] = dn;
387
388 dn->get_linkage()->inode = in;
389 in->set_primary_parent(dn);
390
391 link_inode_work(dn, in);
392
393 if (dn->last == CEPH_NOSNAP)
394 num_head_items++;
395 else
396 num_snap_items++;
397
398 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
399 dn->get(CDentry::PIN_FRAGMENTING);
400 dn->state_set(CDentry::STATE_FRAGMENTING);
401 }
402
403 dout(12) << "add_primary_dentry " << *dn << dendl;
404
405 // pin?
406 if (get_num_any() == 1)
407 get(PIN_CHILD);
408 assert(get_num_any() == items.size());
409 return dn;
410 }
411
412 CDentry* CDir::add_remote_dentry(boost::string_view dname, inodeno_t ino, unsigned char d_type,
413 snapid_t first, snapid_t last)
414 {
415 // foreign
416 assert(lookup_exact_snap(dname, last) == 0);
417
418 // create dentry
419 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), ino, d_type, first, last);
420 if (is_auth())
421 dn->state_set(CDentry::STATE_AUTH);
422 cache->lru.lru_insert_mid(dn);
423
424 dn->dir = this;
425 dn->version = get_projected_version();
426
427 // add to dir
428 assert(items.count(dn->key()) == 0);
429 //assert(null_items.count(dn->get_name()) == 0);
430
431 items[dn->key()] = dn;
432 if (last == CEPH_NOSNAP)
433 num_head_items++;
434 else
435 num_snap_items++;
436
437 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
438 dn->get(CDentry::PIN_FRAGMENTING);
439 dn->state_set(CDentry::STATE_FRAGMENTING);
440 }
441
442 dout(12) << "add_remote_dentry " << *dn << dendl;
443
444 // pin?
445 if (get_num_any() == 1)
446 get(PIN_CHILD);
447
448 assert(get_num_any() == items.size());
449 return dn;
450 }
451
452
453
454 void CDir::remove_dentry(CDentry *dn)
455 {
456 dout(12) << "remove_dentry " << *dn << dendl;
457
458 // there should be no client leases at this point!
459 assert(dn->client_lease_map.empty());
460
461 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
462 dn->put(CDentry::PIN_FRAGMENTING);
463 dn->state_clear(CDentry::STATE_FRAGMENTING);
464 }
465
466 if (dn->get_linkage()->is_null()) {
467 if (dn->last == CEPH_NOSNAP)
468 num_head_null--;
469 else
470 num_snap_null--;
471 } else {
472 if (dn->last == CEPH_NOSNAP)
473 num_head_items--;
474 else
475 num_snap_items--;
476 }
477
478 if (!dn->get_linkage()->is_null())
479 // detach inode and dentry
480 unlink_inode_work(dn);
481
482 // remove from list
483 assert(items.count(dn->key()) == 1);
484 items.erase(dn->key());
485
486 // clean?
487 if (dn->is_dirty())
488 dn->mark_clean();
489
490 if (dn->state_test(CDentry::STATE_BOTTOMLRU))
491 cache->bottom_lru.lru_remove(dn);
492 else
493 cache->lru.lru_remove(dn);
494 delete dn;
495
496 // unpin?
497 if (get_num_any() == 0)
498 put(PIN_CHILD);
499 assert(get_num_any() == items.size());
500 }
501
502 void CDir::link_remote_inode(CDentry *dn, CInode *in)
503 {
504 link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
505 }
506
507 void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
508 {
509 dout(12) << "link_remote_inode " << *dn << " remote " << ino << dendl;
510 assert(dn->get_linkage()->is_null());
511
512 dn->get_linkage()->set_remote(ino, d_type);
513
514 if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
515 cache->bottom_lru.lru_remove(dn);
516 cache->lru.lru_insert_mid(dn);
517 dn->state_clear(CDentry::STATE_BOTTOMLRU);
518 }
519
520 if (dn->last == CEPH_NOSNAP) {
521 num_head_items++;
522 num_head_null--;
523 } else {
524 num_snap_items++;
525 num_snap_null--;
526 }
527 assert(get_num_any() == items.size());
528 }
529
530 void CDir::link_primary_inode(CDentry *dn, CInode *in)
531 {
532 dout(12) << "link_primary_inode " << *dn << " " << *in << dendl;
533 assert(dn->get_linkage()->is_null());
534
535 dn->get_linkage()->inode = in;
536 in->set_primary_parent(dn);
537
538 link_inode_work(dn, in);
539
540 if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
541 (is_auth() || !inode->is_stray())) {
542 cache->bottom_lru.lru_remove(dn);
543 cache->lru.lru_insert_mid(dn);
544 dn->state_clear(CDentry::STATE_BOTTOMLRU);
545 }
546
547 if (dn->last == CEPH_NOSNAP) {
548 num_head_items++;
549 num_head_null--;
550 } else {
551 num_snap_items++;
552 num_snap_null--;
553 }
554
555 assert(get_num_any() == items.size());
556 }
557
558 void CDir::link_inode_work( CDentry *dn, CInode *in)
559 {
560 assert(dn->get_linkage()->get_inode() == in);
561 assert(in->get_parent_dn() == dn);
562
563 // set inode version
564 //in->inode.version = dn->get_version();
565
566 // pin dentry?
567 if (in->get_num_ref())
568 dn->get(CDentry::PIN_INODEPIN);
569
570 // adjust auth pin count
571 if (in->auth_pins + in->nested_auth_pins)
572 dn->adjust_nested_auth_pins(in->auth_pins + in->nested_auth_pins, in->auth_pins, NULL);
573
574 // verify open snaprealm parent
575 if (in->snaprealm)
576 in->snaprealm->adjust_parent();
577 else if (in->is_any_caps())
578 in->move_to_realm(inode->find_snaprealm());
579 }
580
581 void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
582 {
583 if (dn->get_linkage()->is_primary()) {
584 dout(12) << "unlink_inode " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
585 } else {
586 dout(12) << "unlink_inode " << *dn << dendl;
587 }
588
589 unlink_inode_work(dn);
590
591 if (adjust_lru && !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
592 cache->lru.lru_remove(dn);
593 cache->bottom_lru.lru_insert_mid(dn);
594 dn->state_set(CDentry::STATE_BOTTOMLRU);
595 }
596
597 if (dn->last == CEPH_NOSNAP) {
598 num_head_items--;
599 num_head_null++;
600 } else {
601 num_snap_items--;
602 num_snap_null++;
603 }
604 assert(get_num_any() == items.size());
605 }
606
607
608 void CDir::try_remove_unlinked_dn(CDentry *dn)
609 {
610 assert(dn->dir == this);
611 assert(dn->get_linkage()->is_null());
612
613 // no pins (besides dirty)?
614 if (dn->get_num_ref() != dn->is_dirty())
615 return;
616
617 // was the dn new?
618 if (dn->is_new()) {
619 dout(10) << "try_remove_unlinked_dn " << *dn << " in " << *this << dendl;
620 if (dn->is_dirty())
621 dn->mark_clean();
622 remove_dentry(dn);
623
624 // NOTE: we may not have any more dirty dentries, but the fnode
625 // still changed, so the directory must remain dirty.
626 }
627 }
628
629
630 void CDir::unlink_inode_work( CDentry *dn )
631 {
632 CInode *in = dn->get_linkage()->get_inode();
633
634 if (dn->get_linkage()->is_remote()) {
635 // remote
636 if (in)
637 dn->unlink_remote(dn->get_linkage());
638
639 dn->get_linkage()->set_remote(0, 0);
640 } else if (dn->get_linkage()->is_primary()) {
641 // primary
642 // unpin dentry?
643 if (in->get_num_ref())
644 dn->put(CDentry::PIN_INODEPIN);
645
646 // unlink auth_pin count
647 if (in->auth_pins + in->nested_auth_pins)
648 dn->adjust_nested_auth_pins(0 - (in->auth_pins + in->nested_auth_pins), 0 - in->auth_pins, NULL);
649
650 // detach inode
651 in->remove_primary_parent(dn);
652 dn->get_linkage()->inode = 0;
653 } else {
654 assert(!dn->get_linkage()->is_null());
655 }
656 }
657
658 void CDir::add_to_bloom(CDentry *dn)
659 {
660 assert(dn->last == CEPH_NOSNAP);
661 if (!bloom) {
662 /* not create bloom filter for incomplete dir that was added by log replay */
663 if (!is_complete())
664 return;
665
666 /* don't maintain bloom filters in standby replay (saves cycles, and also
667 * avoids need to implement clearing it in EExport for #16924) */
668 if (cache->mds->is_standby_replay()) {
669 return;
670 }
671
672 unsigned size = get_num_head_items() + get_num_snap_items();
673 if (size < 100) size = 100;
674 bloom.reset(new bloom_filter(size, 1.0 / size, 0));
675 }
676 /* This size and false positive probability is completely random.*/
677 bloom->insert(dn->get_name().data(), dn->get_name().size());
678 }
679
680 bool CDir::is_in_bloom(boost::string_view name)
681 {
682 if (!bloom)
683 return false;
684 return bloom->contains(name.data(), name.size());
685 }
686
687 void CDir::remove_null_dentries() {
688 dout(12) << "remove_null_dentries " << *this << dendl;
689
690 auto p = items.begin();
691 while (p != items.end()) {
692 CDentry *dn = p->second;
693 ++p;
694 if (dn->get_linkage()->is_null() && !dn->is_projected())
695 remove_dentry(dn);
696 }
697
698 assert(num_snap_null == 0);
699 assert(num_head_null == 0);
700 assert(get_num_any() == items.size());
701 }
702
703 /** remove dirty null dentries for deleted directory. the dirfrag will be
704 * deleted soon, so it's safe to not commit dirty dentries.
705 *
706 * This is called when a directory is being deleted, a prerequisite
707 * of which is that its children have been unlinked: we expect to only see
708 * null, unprojected dentries here.
709 */
710 void CDir::try_remove_dentries_for_stray()
711 {
712 dout(10) << __func__ << dendl;
713 assert(get_parent_dir()->inode->is_stray());
714
715 // clear dirty only when the directory was not snapshotted
716 bool clear_dirty = !inode->snaprealm;
717
718 auto p = items.begin();
719 while (p != items.end()) {
720 CDentry *dn = p->second;
721 ++p;
722 if (dn->last == CEPH_NOSNAP) {
723 assert(!dn->is_projected());
724 assert(dn->get_linkage()->is_null());
725 if (clear_dirty && dn->is_dirty())
726 dn->mark_clean();
727 // It's OK to remove lease prematurely because we will never link
728 // the dentry to inode again.
729 if (dn->is_any_leases())
730 dn->remove_client_leases(cache->mds->locker);
731 if (dn->get_num_ref() == 0)
732 remove_dentry(dn);
733 } else {
734 assert(!dn->is_projected());
735 CDentry::linkage_t *dnl= dn->get_linkage();
736 CInode *in = NULL;
737 if (dnl->is_primary()) {
738 in = dnl->get_inode();
739 if (clear_dirty && in->is_dirty())
740 in->mark_clean();
741 }
742 if (clear_dirty && dn->is_dirty())
743 dn->mark_clean();
744 if (dn->get_num_ref() == 0) {
745 remove_dentry(dn);
746 if (in)
747 cache->remove_inode(in);
748 }
749 }
750 }
751
752 if (clear_dirty && is_dirty())
753 mark_clean();
754 }
755
756 bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
757 {
758 assert(dn->last != CEPH_NOSNAP);
759 set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
760 CDentry::linkage_t *dnl= dn->get_linkage();
761 CInode *in = 0;
762 if (dnl->is_primary())
763 in = dnl->get_inode();
764 if ((p == snaps.end() || *p > dn->last) &&
765 (dn->get_num_ref() == dn->is_dirty()) &&
766 (!in || in->get_num_ref() == in->is_dirty())) {
767 dout(10) << " purging snapped " << *dn << dendl;
768 if (in && in->is_dirty())
769 in->mark_clean();
770 remove_dentry(dn);
771 if (in) {
772 dout(10) << " purging snapped " << *in << dendl;
773 cache->remove_inode(in);
774 }
775 return true;
776 }
777 return false;
778 }
779
780
781 void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
782 {
783 dout(10) << "purge_stale_snap_data " << snaps << dendl;
784
785 auto p = items.begin();
786 while (p != items.end()) {
787 CDentry *dn = p->second;
788 ++p;
789
790 if (dn->last == CEPH_NOSNAP)
791 continue;
792
793 try_trim_snap_dentry(dn, snaps);
794 }
795 }
796
797
798 /**
799 * steal_dentry -- semi-violently move a dentry from one CDir to another
800 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
801 * on the old CDir corpse; must call finish_old_fragment() when finished.
802 */
803 void CDir::steal_dentry(CDentry *dn)
804 {
805 dout(15) << "steal_dentry " << *dn << dendl;
806
807 items[dn->key()] = dn;
808
809 dn->dir->items.erase(dn->key());
810 if (dn->dir->items.empty())
811 dn->dir->put(PIN_CHILD);
812
813 if (get_num_any() == 0)
814 get(PIN_CHILD);
815 if (dn->get_linkage()->is_null()) {
816 if (dn->last == CEPH_NOSNAP)
817 num_head_null++;
818 else
819 num_snap_null++;
820 } else if (dn->last == CEPH_NOSNAP) {
821 num_head_items++;
822
823 if (dn->get_linkage()->is_primary()) {
824 CInode *in = dn->get_linkage()->get_inode();
825 auto pi = in->get_projected_inode();
826 if (dn->get_linkage()->get_inode()->is_dir())
827 fnode.fragstat.nsubdirs++;
828 else
829 fnode.fragstat.nfiles++;
830 fnode.rstat.rbytes += pi->accounted_rstat.rbytes;
831 fnode.rstat.rfiles += pi->accounted_rstat.rfiles;
832 fnode.rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
833 fnode.rstat.rsnaprealms += pi->accounted_rstat.rsnaprealms;
834 if (pi->accounted_rstat.rctime > fnode.rstat.rctime)
835 fnode.rstat.rctime = pi->accounted_rstat.rctime;
836
837 // move dirty inode rstat to new dirfrag
838 if (in->is_dirty_rstat())
839 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
840 } else if (dn->get_linkage()->is_remote()) {
841 if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
842 fnode.fragstat.nsubdirs++;
843 else
844 fnode.fragstat.nfiles++;
845 }
846 } else {
847 num_snap_items++;
848 if (dn->get_linkage()->is_primary()) {
849 CInode *in = dn->get_linkage()->get_inode();
850 if (in->is_dirty_rstat())
851 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
852 }
853 }
854
855 if (dn->auth_pins || dn->nested_auth_pins) {
856 // use the helpers here to maintain the auth_pin invariants on the dir inode
857 int ap = dn->get_num_auth_pins() + dn->get_num_nested_auth_pins();
858 int dap = dn->get_num_dir_auth_pins();
859 assert(dap <= ap);
860 adjust_nested_auth_pins(ap, dap, NULL);
861 dn->dir->adjust_nested_auth_pins(-ap, -dap, NULL);
862 }
863
864 if (dn->is_dirty()) {
865 dirty_dentries.push_back(&dn->item_dir_dirty);
866 num_dirty++;
867 }
868
869 dn->dir = this;
870 }
871
872 void CDir::prepare_old_fragment(map<string_snap_t, std::list<MDSInternalContextBase*> >& dentry_waiters, bool replay)
873 {
874 // auth_pin old fragment for duration so that any auth_pinning
875 // during the dentry migration doesn't trigger side effects
876 if (!replay && is_auth())
877 auth_pin(this);
878
879 if (!waiting_on_dentry.empty()) {
880 for (const auto &p : waiting_on_dentry) {
881 auto &e = dentry_waiters[p.first];
882 for (const auto &waiter : p.second) {
883 e.push_back(waiter);
884 }
885 }
886 waiting_on_dentry.clear();
887 put(PIN_DNWAITER);
888 }
889 }
890
891 void CDir::prepare_new_fragment(bool replay)
892 {
893 if (!replay && is_auth()) {
894 _freeze_dir();
895 mark_complete();
896 }
897 inode->add_dirfrag(this);
898 }
899
900 void CDir::finish_old_fragment(list<MDSInternalContextBase*>& waiters, bool replay)
901 {
902 // take waiters _before_ unfreeze...
903 if (!replay) {
904 take_waiting(WAIT_ANY_MASK, waiters);
905 if (is_auth()) {
906 auth_unpin(this); // pinned in prepare_old_fragment
907 assert(is_frozen_dir());
908 unfreeze_dir();
909 }
910 }
911
912 assert(nested_auth_pins == 0);
913 assert(dir_auth_pins == 0);
914 assert(auth_pins == 0);
915
916 num_head_items = num_head_null = 0;
917 num_snap_items = num_snap_null = 0;
918
919 // this mirrors init_fragment_pins()
920 if (is_auth())
921 clear_replica_map();
922 if (is_dirty())
923 mark_clean();
924 if (state_test(STATE_IMPORTBOUND))
925 put(PIN_IMPORTBOUND);
926 if (state_test(STATE_EXPORTBOUND))
927 put(PIN_EXPORTBOUND);
928 if (is_subtree_root())
929 put(PIN_SUBTREE);
930
931 if (auth_pins > 0)
932 put(PIN_AUTHPIN);
933
934 assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
935 }
936
937 void CDir::init_fragment_pins()
938 {
939 if (is_replicated())
940 get(PIN_REPLICATED);
941 if (state_test(STATE_DIRTY))
942 get(PIN_DIRTY);
943 if (state_test(STATE_EXPORTBOUND))
944 get(PIN_EXPORTBOUND);
945 if (state_test(STATE_IMPORTBOUND))
946 get(PIN_IMPORTBOUND);
947 if (is_subtree_root())
948 get(PIN_SUBTREE);
949 }
950
951 void CDir::split(int bits, list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
952 {
953 dout(10) << "split by " << bits << " bits on " << *this << dendl;
954
955 assert(replay || is_complete() || !is_auth());
956
957 list<frag_t> frags;
958 frag.split(bits, frags);
959
960 vector<CDir*> subfrags(1 << bits);
961
962 double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
963
964 version_t rstat_version = inode->get_projected_inode()->rstat.version;
965 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
966
967 nest_info_t rstatdiff;
968 frag_info_t fragstatdiff;
969 if (fnode.accounted_rstat.version == rstat_version)
970 rstatdiff.add_delta(fnode.accounted_rstat, fnode.rstat);
971 if (fnode.accounted_fragstat.version == dirstat_version)
972 fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat);
973 dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
974
975 map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
976 prepare_old_fragment(dentry_waiters, replay);
977
978 // create subfrag dirs
979 int n = 0;
980 for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
981 CDir *f = new CDir(inode, *p, cache, is_auth());
982 f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
983 f->get_replicas() = get_replicas();
984 f->dir_auth = dir_auth;
985 f->init_fragment_pins();
986 f->set_version(get_version());
987
988 f->pop_me = pop_me;
989 f->pop_me.scale(fac);
990
991 // FIXME; this is an approximation
992 f->pop_nested = pop_nested;
993 f->pop_nested.scale(fac);
994 f->pop_auth_subtree = pop_auth_subtree;
995 f->pop_auth_subtree.scale(fac);
996 f->pop_auth_subtree_nested = pop_auth_subtree_nested;
997 f->pop_auth_subtree_nested.scale(fac);
998
999 dout(10) << " subfrag " << *p << " " << *f << dendl;
1000 subfrags[n++] = f;
1001 subs.push_back(f);
1002
1003 f->set_dir_auth(get_dir_auth());
1004 f->prepare_new_fragment(replay);
1005 }
1006
1007 // repartition dentries
1008 while (!items.empty()) {
1009 auto p = items.begin();
1010
1011 CDentry *dn = p->second;
1012 frag_t subfrag = inode->pick_dirfrag(dn->get_name());
1013 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1014 dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
1015 CDir *f = subfrags[n];
1016 f->steal_dentry(dn);
1017 }
1018
1019 for (const auto &p : dentry_waiters) {
1020 frag_t subfrag = inode->pick_dirfrag(p.first.name);
1021 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1022 CDir *f = subfrags[n];
1023
1024 if (f->waiting_on_dentry.empty())
1025 f->get(PIN_DNWAITER);
1026 auto &e = f->waiting_on_dentry[p.first];
1027 for (const auto &waiter : p.second) {
1028 e.push_back(waiter);
1029 }
1030 }
1031
1032 // FIXME: handle dirty old rstat
1033
1034 // fix up new frag fragstats
1035 for (int i=0; i<n; i++) {
1036 CDir *f = subfrags[i];
1037 f->fnode.rstat.version = rstat_version;
1038 f->fnode.accounted_rstat = f->fnode.rstat;
1039 f->fnode.fragstat.version = dirstat_version;
1040 f->fnode.accounted_fragstat = f->fnode.fragstat;
1041 dout(10) << " rstat " << f->fnode.rstat << " fragstat " << f->fnode.fragstat
1042 << " on " << *f << dendl;
1043 }
1044
1045 // give any outstanding frag stat differential to first frag
1046 dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1047 << " to " << *subfrags[0] << dendl;
1048 subfrags[0]->fnode.accounted_rstat.add(rstatdiff);
1049 subfrags[0]->fnode.accounted_fragstat.add(fragstatdiff);
1050
1051 finish_old_fragment(waiters, replay);
1052 }
1053
1054 void CDir::merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
1055 {
1056 dout(10) << "merge " << subs << dendl;
1057
1058 mds_authority_t new_auth = CDIR_AUTH_DEFAULT;
1059 for (auto dir : subs) {
1060 if (dir->get_dir_auth() != CDIR_AUTH_DEFAULT &&
1061 dir->get_dir_auth() != new_auth) {
1062 assert(new_auth == CDIR_AUTH_DEFAULT);
1063 new_auth = dir->get_dir_auth();
1064 }
1065 }
1066
1067 set_dir_auth(new_auth);
1068 prepare_new_fragment(replay);
1069
1070 nest_info_t rstatdiff;
1071 frag_info_t fragstatdiff;
1072 bool touched_mtime, touched_chattr;
1073 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1074 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1075
1076 map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
1077
1078 for (auto dir : subs) {
1079 dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
1080 assert(!dir->is_auth() || dir->is_complete() || replay);
1081
1082 if (dir->fnode.accounted_rstat.version == rstat_version)
1083 rstatdiff.add_delta(dir->fnode.accounted_rstat, dir->fnode.rstat);
1084 if (dir->fnode.accounted_fragstat.version == dirstat_version)
1085 fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat,
1086 &touched_mtime, &touched_chattr);
1087
1088 dir->prepare_old_fragment(dentry_waiters, replay);
1089
1090 // steal dentries
1091 while (!dir->items.empty())
1092 steal_dentry(dir->items.begin()->second);
1093
1094 // merge replica map
1095 for (const auto &p : dir->get_replicas()) {
1096 unsigned cur = get_replicas()[p.first];
1097 if (p.second > cur)
1098 get_replicas()[p.first] = p.second;
1099 }
1100
1101 // merge version
1102 if (dir->get_version() > get_version())
1103 set_version(dir->get_version());
1104
1105 // merge state
1106 state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
1107
1108 dir->finish_old_fragment(waiters, replay);
1109 inode->close_dirfrag(dir->get_frag());
1110 }
1111
1112 if (!dentry_waiters.empty()) {
1113 get(PIN_DNWAITER);
1114 for (const auto &p : dentry_waiters) {
1115 auto &e = waiting_on_dentry[p.first];
1116 for (const auto &waiter : p.second) {
1117 e.push_back(waiter);
1118 }
1119 }
1120 }
1121
1122 if (is_auth() && !replay)
1123 mark_complete();
1124
1125 // FIXME: merge dirty old rstat
1126 fnode.rstat.version = rstat_version;
1127 fnode.accounted_rstat = fnode.rstat;
1128 fnode.accounted_rstat.add(rstatdiff);
1129
1130 fnode.fragstat.version = dirstat_version;
1131 fnode.accounted_fragstat = fnode.fragstat;
1132 fnode.accounted_fragstat.add(fragstatdiff);
1133
1134 init_fragment_pins();
1135 }
1136
1137
1138
1139
1140 void CDir::resync_accounted_fragstat()
1141 {
1142 fnode_t *pf = get_projected_fnode();
1143 auto pi = inode->get_projected_inode();
1144
1145 if (pf->accounted_fragstat.version != pi->dirstat.version) {
1146 pf->fragstat.version = pi->dirstat.version;
1147 dout(10) << "resync_accounted_fragstat " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
1148 pf->accounted_fragstat = pf->fragstat;
1149 }
1150 }
1151
1152 /*
1153 * resync rstat and accounted_rstat with inode
1154 */
1155 void CDir::resync_accounted_rstat()
1156 {
1157 fnode_t *pf = get_projected_fnode();
1158 auto pi = inode->get_projected_inode();
1159
1160 if (pf->accounted_rstat.version != pi->rstat.version) {
1161 pf->rstat.version = pi->rstat.version;
1162 dout(10) << "resync_accounted_rstat " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
1163 pf->accounted_rstat = pf->rstat;
1164 dirty_old_rstat.clear();
1165 }
1166 }
1167
1168 void CDir::assimilate_dirty_rstat_inodes()
1169 {
1170 dout(10) << "assimilate_dirty_rstat_inodes" << dendl;
1171 for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1172 !p.end(); ++p) {
1173 CInode *in = *p;
1174 assert(in->is_auth());
1175 if (in->is_frozen())
1176 continue;
1177
1178 auto &pi = in->project_inode();
1179 pi.inode.version = in->pre_dirty();
1180
1181 inode->mdcache->project_rstat_inode_to_frag(in, this, 0, 0, NULL);
1182 }
1183 state_set(STATE_ASSIMRSTAT);
1184 dout(10) << "assimilate_dirty_rstat_inodes done" << dendl;
1185 }
1186
1187 void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef& mut, EMetaBlob *blob)
1188 {
1189 if (!state_test(STATE_ASSIMRSTAT))
1190 return;
1191 state_clear(STATE_ASSIMRSTAT);
1192 dout(10) << "assimilate_dirty_rstat_inodes_finish" << dendl;
1193 elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1194 while (!p.end()) {
1195 CInode *in = *p;
1196 ++p;
1197
1198 if (in->is_frozen())
1199 continue;
1200
1201 CDentry *dn = in->get_projected_parent_dn();
1202
1203 mut->auth_pin(in);
1204 mut->add_projected_inode(in);
1205
1206 in->clear_dirty_rstat();
1207 blob->add_primary_dentry(dn, in, true);
1208 }
1209
1210 if (!dirty_rstat_inodes.empty())
1211 inode->mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
1212 }
1213
1214
1215
1216
1217 /****************************************
1218 * WAITING
1219 */
1220
1221 void CDir::add_dentry_waiter(boost::string_view dname, snapid_t snapid, MDSInternalContextBase *c)
1222 {
1223 if (waiting_on_dentry.empty())
1224 get(PIN_DNWAITER);
1225 waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
1226 dout(10) << "add_dentry_waiter dentry " << dname
1227 << " snap " << snapid
1228 << " " << c << " on " << *this << dendl;
1229 }
1230
1231 void CDir::take_dentry_waiting(boost::string_view dname, snapid_t first, snapid_t last,
1232 list<MDSInternalContextBase*>& ls)
1233 {
1234 if (waiting_on_dentry.empty())
1235 return;
1236
1237 string_snap_t lb(dname, first);
1238 string_snap_t ub(dname, last);
1239 auto it = waiting_on_dentry.lower_bound(lb);
1240 while (it != waiting_on_dentry.end() &&
1241 !(ub < it->first)) {
1242 dout(10) << "take_dentry_waiting dentry " << dname
1243 << " [" << first << "," << last << "] found waiter on snap "
1244 << it->first.snapid
1245 << " on " << *this << dendl;
1246 for (const auto &waiter : it->second) {
1247 ls.push_back(waiter);
1248 }
1249 waiting_on_dentry.erase(it++);
1250 }
1251
1252 if (waiting_on_dentry.empty())
1253 put(PIN_DNWAITER);
1254 }
1255
1256 void CDir::take_sub_waiting(list<MDSInternalContextBase*>& ls)
1257 {
1258 dout(10) << "take_sub_waiting" << dendl;
1259 if (!waiting_on_dentry.empty()) {
1260 for (const auto &p : waiting_on_dentry) {
1261 for (const auto &waiter : p.second) {
1262 ls.push_back(waiter);
1263 }
1264 }
1265 waiting_on_dentry.clear();
1266 put(PIN_DNWAITER);
1267 }
1268 }
1269
1270
1271
1272 void CDir::add_waiter(uint64_t tag, MDSInternalContextBase *c)
1273 {
1274 // hierarchical?
1275
1276 // at free root?
1277 if (tag & WAIT_ATFREEZEROOT) {
1278 if (!(is_freezing_tree_root() || is_frozen_tree_root() ||
1279 is_freezing_dir() || is_frozen_dir())) {
1280 // try parent
1281 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATFREEZEROOT, " << *this << " is not root, trying parent" << dendl;
1282 inode->parent->dir->add_waiter(tag, c);
1283 return;
1284 }
1285 }
1286
1287 // at subtree root?
1288 if (tag & WAIT_ATSUBTREEROOT) {
1289 if (!is_subtree_root()) {
1290 // try parent
1291 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1292 inode->parent->dir->add_waiter(tag, c);
1293 return;
1294 }
1295 }
1296
1297 assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
1298
1299 MDSCacheObject::add_waiter(tag, c);
1300 }
1301
1302
1303
1304 /* NOTE: this checks dentry waiters too */
1305 void CDir::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
1306 {
1307 if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1308 // take all dentry waiters
1309 for (const auto &p : waiting_on_dentry) {
1310 dout(10) << "take_waiting dentry " << p.first.name
1311 << " snap " << p.first.snapid << " on " << *this << dendl;
1312 for (const auto &waiter : p.second) {
1313 ls.push_back(waiter);
1314 }
1315 }
1316 waiting_on_dentry.clear();
1317 put(PIN_DNWAITER);
1318 }
1319
1320 // waiting
1321 MDSCacheObject::take_waiting(mask, ls);
1322 }
1323
1324
1325 void CDir::finish_waiting(uint64_t mask, int result)
1326 {
1327 dout(11) << "finish_waiting mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
1328
1329 list<MDSInternalContextBase*> finished;
1330 take_waiting(mask, finished);
1331 if (result < 0)
1332 finish_contexts(g_ceph_context, finished, result);
1333 else
1334 cache->mds->queue_waiters(finished);
1335 }
1336
1337
1338
1339 // dirty/clean
1340
1341 fnode_t *CDir::project_fnode()
1342 {
1343 assert(get_version() != 0);
1344 projected_fnode.emplace_back(*get_projected_fnode());
1345 auto &p = projected_fnode.back();
1346
1347 if (scrub_infop && scrub_infop->last_scrub_dirty) {
1348 p.localized_scrub_stamp = scrub_infop->last_local.time;
1349 p.localized_scrub_version = scrub_infop->last_local.version;
1350 p.recursive_scrub_stamp = scrub_infop->last_recursive.time;
1351 p.recursive_scrub_version = scrub_infop->last_recursive.version;
1352 scrub_infop->last_scrub_dirty = false;
1353 scrub_maybe_delete_info();
1354 }
1355
1356 dout(10) << __func__ << " " << &p << dendl;
1357 return &p;
1358 }
1359
1360 void CDir::pop_and_dirty_projected_fnode(LogSegment *ls)
1361 {
1362 assert(!projected_fnode.empty());
1363 auto &front = projected_fnode.front();
1364 dout(15) << __func__ << " " << &front << " v" << front.version << dendl;
1365 fnode = front;
1366 _mark_dirty(ls);
1367 projected_fnode.pop_front();
1368 }
1369
1370
1371 version_t CDir::pre_dirty(version_t min)
1372 {
1373 if (min > projected_version)
1374 projected_version = min;
1375 ++projected_version;
1376 dout(10) << "pre_dirty " << projected_version << dendl;
1377 return projected_version;
1378 }
1379
1380 void CDir::mark_dirty(version_t pv, LogSegment *ls)
1381 {
1382 assert(get_version() < pv);
1383 assert(pv <= projected_version);
1384 fnode.version = pv;
1385 _mark_dirty(ls);
1386 }
1387
1388 void CDir::_mark_dirty(LogSegment *ls)
1389 {
1390 if (!state_test(STATE_DIRTY)) {
1391 dout(10) << "mark_dirty (was clean) " << *this << " version " << get_version() << dendl;
1392 _set_dirty_flag();
1393 assert(ls);
1394 } else {
1395 dout(10) << "mark_dirty (already dirty) " << *this << " version " << get_version() << dendl;
1396 }
1397 if (ls) {
1398 ls->dirty_dirfrags.push_back(&item_dirty);
1399
1400 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1401 if (committed_version == 0 && !item_new.is_on_list())
1402 ls->new_dirfrags.push_back(&item_new);
1403 }
1404 }
1405
1406 void CDir::mark_new(LogSegment *ls)
1407 {
1408 ls->new_dirfrags.push_back(&item_new);
1409 state_clear(STATE_CREATING);
1410
1411 list<MDSInternalContextBase*> waiters;
1412 take_waiting(CDir::WAIT_CREATED, waiters);
1413 cache->mds->queue_waiters(waiters);
1414 }
1415
1416 void CDir::mark_clean()
1417 {
1418 dout(10) << "mark_clean " << *this << " version " << get_version() << dendl;
1419 if (state_test(STATE_DIRTY)) {
1420 item_dirty.remove_myself();
1421 item_new.remove_myself();
1422
1423 state_clear(STATE_DIRTY);
1424 put(PIN_DIRTY);
1425 }
1426 }
1427
1428 // caller should hold auth pin of this
1429 void CDir::log_mark_dirty()
1430 {
1431 if (is_dirty() || projected_version > get_version())
1432 return; // noop if it is already dirty or will be dirty
1433
1434 version_t pv = pre_dirty();
1435 mark_dirty(pv, cache->mds->mdlog->get_current_segment());
1436 }
1437
1438 void CDir::mark_complete() {
1439 state_set(STATE_COMPLETE);
1440 bloom.reset();
1441 }
1442
1443 void CDir::first_get()
1444 {
1445 inode->get(CInode::PIN_DIRFRAG);
1446 }
1447
1448 void CDir::last_put()
1449 {
1450 inode->put(CInode::PIN_DIRFRAG);
1451 }
1452
1453
1454
1455 /******************************************************************************
1456 * FETCH and COMMIT
1457 */
1458
1459 // -----------------------
1460 // FETCH
1461 void CDir::fetch(MDSInternalContextBase *c, bool ignore_authpinnability)
1462 {
1463 string want;
1464 return fetch(c, want, ignore_authpinnability);
1465 }
1466
1467 void CDir::fetch(MDSInternalContextBase *c, boost::string_view want_dn, bool ignore_authpinnability)
1468 {
1469 dout(10) << "fetch on " << *this << dendl;
1470
1471 assert(is_auth());
1472 assert(!is_complete());
1473
1474 if (!can_auth_pin() && !ignore_authpinnability) {
1475 if (c) {
1476 dout(7) << "fetch waiting for authpinnable" << dendl;
1477 add_waiter(WAIT_UNFREEZE, c);
1478 } else
1479 dout(7) << "fetch not authpinnable and no context" << dendl;
1480 return;
1481 }
1482
1483 // unlinked directory inode shouldn't have any entry
1484 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
1485 !inode->snaprealm) {
1486 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1487 if (get_version() == 0) {
1488 assert(inode->is_auth());
1489 set_version(1);
1490
1491 if (state_test(STATE_REJOINUNDEF)) {
1492 assert(cache->mds->is_rejoin());
1493 state_clear(STATE_REJOINUNDEF);
1494 cache->opened_undef_dirfrag(this);
1495 }
1496 }
1497 mark_complete();
1498
1499 if (c)
1500 cache->mds->queue_waiter(c);
1501 return;
1502 }
1503
1504 if (c) add_waiter(WAIT_COMPLETE, c);
1505 if (!want_dn.empty()) wanted_items.insert(mempool::mds_co::string(want_dn));
1506
1507 // already fetching?
1508 if (state_test(CDir::STATE_FETCHING)) {
1509 dout(7) << "already fetching; waiting" << dendl;
1510 return;
1511 }
1512
1513 auth_pin(this);
1514 state_set(CDir::STATE_FETCHING);
1515
1516 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1517
1518 std::set<dentry_key_t> empty;
1519 _omap_fetch(NULL, empty);
1520 }
1521
1522 void CDir::fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1523 {
1524 dout(10) << "fetch " << keys.size() << " keys on " << *this << dendl;
1525
1526 assert(is_auth());
1527 assert(!is_complete());
1528
1529 if (!can_auth_pin()) {
1530 dout(7) << "fetch keys waiting for authpinnable" << dendl;
1531 add_waiter(WAIT_UNFREEZE, c);
1532 return;
1533 }
1534 if (state_test(CDir::STATE_FETCHING)) {
1535 dout(7) << "fetch keys waiting for full fetch" << dendl;
1536 add_waiter(WAIT_COMPLETE, c);
1537 return;
1538 }
1539
1540 auth_pin(this);
1541 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1542
1543 _omap_fetch(c, keys);
1544 }
1545
1546 class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
1547 MDSInternalContextBase *fin;
1548 public:
1549 bufferlist hdrbl;
1550 bool more = false;
1551 map<string, bufferlist> omap; ///< carry-over from before
1552 map<string, bufferlist> omap_more; ///< new batch
1553 int ret;
1554 C_IO_Dir_OMAP_FetchedMore(CDir *d, MDSInternalContextBase *f) :
1555 CDirIOContext(d), fin(f), ret(0) { }
1556 void finish(int r) {
1557 // merge results
1558 if (omap.empty()) {
1559 omap.swap(omap_more);
1560 } else {
1561 omap.insert(omap_more.begin(), omap_more.end());
1562 }
1563 if (more) {
1564 dir->_omap_fetch_more(hdrbl, omap, fin);
1565 } else {
1566 dir->_omap_fetched(hdrbl, omap, !fin, r);
1567 if (fin)
1568 fin->complete(r);
1569 }
1570 }
1571 };
1572
1573 class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
1574 MDSInternalContextBase *fin;
1575 public:
1576 bufferlist hdrbl;
1577 bool more = false;
1578 map<string, bufferlist> omap;
1579 bufferlist btbl;
1580 int ret1, ret2, ret3;
1581
1582 C_IO_Dir_OMAP_Fetched(CDir *d, MDSInternalContextBase *f) :
1583 CDirIOContext(d), fin(f), ret1(0), ret2(0), ret3(0) { }
1584 void finish(int r) override {
1585 // check the correctness of backtrace
1586 if (r >= 0 && ret3 != -ECANCELED)
1587 dir->inode->verify_diri_backtrace(btbl, ret3);
1588 if (r >= 0) r = ret1;
1589 if (r >= 0) r = ret2;
1590 if (more) {
1591 dir->_omap_fetch_more(hdrbl, omap, fin);
1592 } else {
1593 dir->_omap_fetched(hdrbl, omap, !fin, r);
1594 if (fin)
1595 fin->complete(r);
1596 }
1597 }
1598 };
1599
1600 void CDir::_omap_fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1601 {
1602 C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1603 object_t oid = get_ondisk_object();
1604 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1605 ObjectOperation rd;
1606 rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1607 if (keys.empty()) {
1608 assert(!c);
1609 rd.omap_get_vals("", "", g_conf->mds_dir_keys_per_op,
1610 &fin->omap, &fin->more, &fin->ret2);
1611 } else {
1612 assert(c);
1613 std::set<std::string> str_keys;
1614 for (auto p : keys) {
1615 string str;
1616 p.encode(str);
1617 str_keys.insert(str);
1618 }
1619 rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
1620 }
1621 // check the correctness of backtrace
1622 if (g_conf->mds_verify_backtrace > 0 && frag == frag_t()) {
1623 rd.getxattr("parent", &fin->btbl, &fin->ret3);
1624 rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1625 } else {
1626 fin->ret3 = -ECANCELED;
1627 }
1628
1629 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1630 new C_OnFinisher(fin, cache->mds->finisher));
1631 }
1632
1633 void CDir::_omap_fetch_more(
1634 bufferlist& hdrbl,
1635 map<string, bufferlist>& omap,
1636 MDSInternalContextBase *c)
1637 {
1638 // we have more omap keys to fetch!
1639 object_t oid = get_ondisk_object();
1640 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1641 C_IO_Dir_OMAP_FetchedMore *fin = new C_IO_Dir_OMAP_FetchedMore(this, c);
1642 fin->hdrbl.claim(hdrbl);
1643 fin->omap.swap(omap);
1644 ObjectOperation rd;
1645 rd.omap_get_vals(fin->omap.rbegin()->first,
1646 "", /* filter prefix */
1647 g_conf->mds_dir_keys_per_op,
1648 &fin->omap_more,
1649 &fin->more,
1650 &fin->ret);
1651 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1652 new C_OnFinisher(fin, cache->mds->finisher));
1653 }
1654
1655 CDentry *CDir::_load_dentry(
1656 boost::string_view key,
1657 boost::string_view dname,
1658 const snapid_t last,
1659 bufferlist &bl,
1660 const int pos,
1661 const std::set<snapid_t> *snaps,
1662 bool *force_dirty,
1663 list<CInode*> *undef_inodes)
1664 {
1665 bufferlist::iterator q = bl.begin();
1666
1667 snapid_t first;
1668 ::decode(first, q);
1669
1670 // marker
1671 char type;
1672 ::decode(type, q);
1673
1674 dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
1675 << " [" << first << "," << last << "]"
1676 << dendl;
1677
1678 bool stale = false;
1679 if (snaps && last != CEPH_NOSNAP) {
1680 set<snapid_t>::const_iterator p = snaps->lower_bound(first);
1681 if (p == snaps->end() || *p > last) {
1682 dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
1683 stale = true;
1684 }
1685 }
1686
1687 /*
1688 * look for existing dentry for _last_ snap, because unlink +
1689 * create may leave a "hole" (epochs during which the dentry
1690 * doesn't exist) but for which no explicit negative dentry is in
1691 * the cache.
1692 */
1693 CDentry *dn;
1694 if (stale)
1695 dn = lookup_exact_snap(dname, last);
1696 else
1697 dn = lookup(dname, last);
1698
1699 if (type == 'L') {
1700 // hard link
1701 inodeno_t ino;
1702 unsigned char d_type;
1703 ::decode(ino, q);
1704 ::decode(d_type, q);
1705
1706 if (stale) {
1707 if (!dn) {
1708 stale_items.insert(mempool::mds_co::string(key));
1709 *force_dirty = true;
1710 }
1711 return dn;
1712 }
1713
1714 if (dn) {
1715 if (dn->get_linkage()->get_inode() == 0) {
1716 dout(12) << "_fetched had NEG dentry " << *dn << dendl;
1717 } else {
1718 dout(12) << "_fetched had dentry " << *dn << dendl;
1719 }
1720 } else {
1721 // (remote) link
1722 dn = add_remote_dentry(dname, ino, d_type, first, last);
1723
1724 // link to inode?
1725 CInode *in = cache->get_inode(ino); // we may or may not have it.
1726 if (in) {
1727 dn->link_remote(dn->get_linkage(), in);
1728 dout(12) << "_fetched got remote link " << ino << " which we have " << *in << dendl;
1729 } else {
1730 dout(12) << "_fetched got remote link " << ino << " (dont' have it)" << dendl;
1731 }
1732 }
1733 }
1734 else if (type == 'I') {
1735 // inode
1736
1737 // Load inode data before looking up or constructing CInode
1738 InodeStore inode_data;
1739 inode_data.decode_bare(q);
1740
1741 if (stale) {
1742 if (!dn) {
1743 stale_items.insert(mempool::mds_co::string(key));
1744 *force_dirty = true;
1745 }
1746 return dn;
1747 }
1748
1749 bool undef_inode = false;
1750 if (dn) {
1751 CInode *in = dn->get_linkage()->get_inode();
1752 if (in) {
1753 dout(12) << "_fetched had dentry " << *dn << dendl;
1754 if (in->state_test(CInode::STATE_REJOINUNDEF)) {
1755 undef_inodes->push_back(in);
1756 undef_inode = true;
1757 }
1758 } else
1759 dout(12) << "_fetched had NEG dentry " << *dn << dendl;
1760 }
1761
1762 if (!dn || undef_inode) {
1763 // add inode
1764 CInode *in = cache->get_inode(inode_data.inode.ino, last);
1765 if (!in || undef_inode) {
1766 if (undef_inode && in)
1767 in->first = first;
1768 else
1769 in = new CInode(cache, true, first, last);
1770
1771 in->inode = inode_data.inode;
1772 // symlink?
1773 if (in->is_symlink())
1774 in->symlink = inode_data.symlink;
1775
1776 in->dirfragtree.swap(inode_data.dirfragtree);
1777 in->xattrs.swap(inode_data.xattrs);
1778 in->old_inodes.swap(inode_data.old_inodes);
1779 if (!in->old_inodes.empty()) {
1780 snapid_t min_first = in->old_inodes.rbegin()->first + 1;
1781 if (min_first > in->first)
1782 in->first = min_first;
1783 }
1784
1785 in->oldest_snap = inode_data.oldest_snap;
1786 in->decode_snap_blob(inode_data.snap_blob);
1787 if (snaps && !in->snaprealm)
1788 in->purge_stale_snap_data(*snaps);
1789
1790 if (!undef_inode) {
1791 cache->add_inode(in); // add
1792 dn = add_primary_dentry(dname, in, first, last); // link
1793 }
1794 dout(12) << "_fetched got " << *dn << " " << *in << dendl;
1795
1796 if (in->inode.is_dirty_rstat())
1797 in->mark_dirty_rstat();
1798
1799 //in->hack_accessed = false;
1800 //in->hack_load_stamp = ceph_clock_now();
1801 //num_new_inodes_loaded++;
1802 } else if (g_conf->get_val<bool>("mds_hack_allow_loading_invalid_metadata")) {
1803 dout(20) << "hack: adding duplicate dentry for " << *in << dendl;
1804 dn = add_primary_dentry(dname, in, first, last);
1805 } else {
1806 dout(0) << "_fetched badness: got (but i already had) " << *in
1807 << " mode " << in->inode.mode
1808 << " mtime " << in->inode.mtime << dendl;
1809 string dirpath, inopath;
1810 this->inode->make_path_string(dirpath);
1811 in->make_path_string(inopath);
1812 cache->mds->clog->error() << "loaded dup inode " << inode_data.inode.ino
1813 << " [" << first << "," << last << "] v" << inode_data.inode.version
1814 << " at " << dirpath << "/" << dname
1815 << ", but inode " << in->vino() << " v" << in->inode.version
1816 << " already exists at " << inopath;
1817 return dn;
1818 }
1819 }
1820 } else {
1821 std::ostringstream oss;
1822 oss << "Invalid tag char '" << type << "' pos " << pos;
1823 throw buffer::malformed_input(oss.str());
1824 }
1825
1826 return dn;
1827 }
1828
1829 void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
1830 bool complete, int r)
1831 {
1832 LogChannelRef clog = cache->mds->clog;
1833 dout(10) << "_fetched header " << hdrbl.length() << " bytes "
1834 << omap.size() << " keys for " << *this << dendl;
1835
1836 assert(r == 0 || r == -ENOENT || r == -ENODATA);
1837 assert(is_auth());
1838 assert(!is_frozen());
1839
1840 if (hdrbl.length() == 0) {
1841 dout(0) << "_fetched missing object for " << *this << dendl;
1842
1843 clog->error() << "dir " << dirfrag() << " object missing on disk; some "
1844 "files may be lost (" << get_path() << ")";
1845
1846 go_bad(complete);
1847 return;
1848 }
1849
1850 fnode_t got_fnode;
1851 {
1852 bufferlist::iterator p = hdrbl.begin();
1853 try {
1854 ::decode(got_fnode, p);
1855 } catch (const buffer::error &err) {
1856 derr << "Corrupt fnode in dirfrag " << dirfrag()
1857 << ": " << err << dendl;
1858 clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
1859 << err << " (" << get_path() << ")";
1860 go_bad(complete);
1861 return;
1862 }
1863 if (!p.end()) {
1864 clog->warn() << "header buffer of dir " << dirfrag() << " has "
1865 << hdrbl.length() - p.get_off() << " extra bytes ("
1866 << get_path() << ")";
1867 go_bad(complete);
1868 return;
1869 }
1870 }
1871
1872 dout(10) << "_fetched version " << got_fnode.version << dendl;
1873
1874 // take the loaded fnode?
1875 // only if we are a fresh CDir* with no prior state.
1876 if (get_version() == 0) {
1877 assert(!is_projected());
1878 assert(!state_test(STATE_COMMITTING));
1879 fnode = got_fnode;
1880 projected_version = committing_version = committed_version = got_fnode.version;
1881
1882 if (state_test(STATE_REJOINUNDEF)) {
1883 assert(cache->mds->is_rejoin());
1884 state_clear(STATE_REJOINUNDEF);
1885 cache->opened_undef_dirfrag(this);
1886 }
1887 }
1888
1889 list<CInode*> undef_inodes;
1890
1891 // purge stale snaps?
1892 // only if we have past_parents open!
1893 bool force_dirty = false;
1894 const set<snapid_t> *snaps = NULL;
1895 SnapRealm *realm = inode->find_snaprealm();
1896 if (!realm->have_past_parents_open()) {
1897 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
1898 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
1899 snaps = &realm->get_snaps();
1900 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
1901 << " < " << realm->get_last_destroyed()
1902 << ", snap purge based on " << *snaps << dendl;
1903 if (get_num_snap_items() == 0) {
1904 fnode.snap_purged_thru = realm->get_last_destroyed();
1905 force_dirty = true;
1906 }
1907 }
1908
1909 unsigned pos = omap.size() - 1;
1910 for (map<string, bufferlist>::reverse_iterator p = omap.rbegin();
1911 p != omap.rend();
1912 ++p, --pos) {
1913 string dname;
1914 snapid_t last;
1915 dentry_key_t::decode_helper(p->first, dname, last);
1916
1917 CDentry *dn = NULL;
1918 try {
1919 dn = _load_dentry(
1920 p->first, dname, last, p->second, pos, snaps,
1921 &force_dirty, &undef_inodes);
1922 } catch (const buffer::error &err) {
1923 cache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
1924 "dir frag " << dirfrag() << ": "
1925 << err << "(" << get_path() << ")";
1926
1927 // Remember that this dentry is damaged. Subsequent operations
1928 // that try to act directly on it will get their EIOs, but this
1929 // dirfrag as a whole will continue to look okay (minus the
1930 // mysteriously-missing dentry)
1931 go_bad_dentry(last, dname);
1932
1933 // Anyone who was WAIT_DENTRY for this guy will get kicked
1934 // to RetryRequest, and hit the DamageTable-interrogating path.
1935 // Stats will now be bogus because we will think we're complete,
1936 // but have 1 or more missing dentries.
1937 continue;
1938 }
1939
1940 if (dn && (wanted_items.count(mempool::mds_co::string(boost::string_view(dname))) > 0 || !complete)) {
1941 dout(10) << " touching wanted dn " << *dn << dendl;
1942 inode->mdcache->touch_dentry(dn);
1943 }
1944
1945 /** clean underwater item?
1946 * Underwater item is something that is dirty in our cache from
1947 * journal replay, but was previously flushed to disk before the
1948 * mds failed.
1949 *
1950 * We only do this is committed_version == 0. that implies either
1951 * - this is a fetch after from a clean/empty CDir is created
1952 * (and has no effect, since the dn won't exist); or
1953 * - this is a fetch after _recovery_, which is what we're worried
1954 * about. Items that are marked dirty from the journal should be
1955 * marked clean if they appear on disk.
1956 */
1957 if (committed_version == 0 &&
1958 dn &&
1959 dn->get_version() <= got_fnode.version &&
1960 dn->is_dirty()) {
1961 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1962 dn->mark_clean();
1963
1964 if (dn->get_linkage()->is_primary()) {
1965 assert(dn->get_linkage()->get_inode()->get_version() <= got_fnode.version);
1966 dout(10) << "_fetched had underwater inode " << *dn->get_linkage()->get_inode() << ", marking clean" << dendl;
1967 dn->get_linkage()->get_inode()->mark_clean();
1968 }
1969 }
1970 }
1971
1972 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
1973
1974 // mark complete, !fetching
1975 if (complete) {
1976 wanted_items.clear();
1977 mark_complete();
1978 state_clear(STATE_FETCHING);
1979
1980 if (scrub_infop && scrub_infop->need_scrub_local) {
1981 scrub_infop->need_scrub_local = false;
1982 scrub_local();
1983 }
1984 }
1985
1986 // open & force frags
1987 while (!undef_inodes.empty()) {
1988 CInode *in = undef_inodes.front();
1989 undef_inodes.pop_front();
1990 in->state_clear(CInode::STATE_REJOINUNDEF);
1991 cache->opened_undef_inode(in);
1992 }
1993
1994 // dirty myself to remove stale snap dentries
1995 if (force_dirty && !inode->mdcache->is_readonly())
1996 log_mark_dirty();
1997
1998 auth_unpin(this);
1999
2000 if (complete) {
2001 // kick waiters
2002 finish_waiting(WAIT_COMPLETE, 0);
2003 }
2004 }
2005
2006 void CDir::_go_bad()
2007 {
2008 if (get_version() == 0)
2009 set_version(1);
2010 state_set(STATE_BADFRAG);
2011 // mark complete, !fetching
2012 mark_complete();
2013 state_clear(STATE_FETCHING);
2014 auth_unpin(this);
2015
2016 // kick waiters
2017 finish_waiting(WAIT_COMPLETE, -EIO);
2018 }
2019
2020 void CDir::go_bad_dentry(snapid_t last, boost::string_view dname)
2021 {
2022 dout(10) << __func__ << " " << dname << dendl;
2023 std::string path(get_path());
2024 path += "/";
2025 path += std::string(dname);
2026 const bool fatal = cache->mds->damage_table.notify_dentry(
2027 inode->ino(), frag, last, dname, path);
2028 if (fatal) {
2029 cache->mds->damaged();
2030 ceph_abort(); // unreachable, damaged() respawns us
2031 }
2032 }
2033
2034 void CDir::go_bad(bool complete)
2035 {
2036 dout(10) << "go_bad " << frag << dendl;
2037 const bool fatal = cache->mds->damage_table.notify_dirfrag(
2038 inode->ino(), frag, get_path());
2039 if (fatal) {
2040 cache->mds->damaged();
2041 ceph_abort(); // unreachable, damaged() respawns us
2042 }
2043
2044 if (complete)
2045 _go_bad();
2046 else
2047 auth_unpin(this);
2048 }
2049
2050 // -----------------------
2051 // COMMIT
2052
2053 /**
2054 * commit
2055 *
2056 * @param want - min version i want committed
2057 * @param c - callback for completion
2058 */
2059 void CDir::commit(version_t want, MDSInternalContextBase *c, bool ignore_authpinnability, int op_prio)
2060 {
2061 dout(10) << "commit want " << want << " on " << *this << dendl;
2062 if (want == 0) want = get_version();
2063
2064 // preconditions
2065 assert(want <= get_version() || get_version() == 0); // can't commit the future
2066 assert(want > committed_version); // the caller is stupid
2067 assert(is_auth());
2068 assert(ignore_authpinnability || can_auth_pin());
2069
2070 // note: queue up a noop if necessary, so that we always
2071 // get an auth_pin.
2072 if (!c)
2073 c = new C_MDSInternalNoop;
2074
2075 // auth_pin on first waiter
2076 if (waiting_for_commit.empty())
2077 auth_pin(this);
2078 waiting_for_commit[want].push_back(c);
2079
2080 // ok.
2081 _commit(want, op_prio);
2082 }
2083
2084 class C_IO_Dir_Committed : public CDirIOContext {
2085 version_t version;
2086 public:
2087 C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2088 void finish(int r) override {
2089 dir->_committed(r, version);
2090 }
2091 };
2092
2093 /**
2094 * Flush out the modified dentries in this dir. Keep the bufferlist
2095 * below max_write_size;
2096 */
2097 void CDir::_omap_commit(int op_prio)
2098 {
2099 dout(10) << "_omap_commit" << dendl;
2100
2101 unsigned max_write_size = cache->max_dir_commit_size;
2102 unsigned write_size = 0;
2103
2104 if (op_prio < 0)
2105 op_prio = CEPH_MSG_PRIO_DEFAULT;
2106
2107 // snap purge?
2108 const set<snapid_t> *snaps = NULL;
2109 SnapRealm *realm = inode->find_snaprealm();
2110 if (!realm->have_past_parents_open()) {
2111 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
2112 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
2113 snaps = &realm->get_snaps();
2114 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
2115 << " < " << realm->get_last_destroyed()
2116 << ", snap purge based on " << *snaps << dendl;
2117 // fnode.snap_purged_thru = realm->get_last_destroyed();
2118 }
2119
2120 set<string> to_remove;
2121 map<string, bufferlist> to_set;
2122
2123 C_GatherBuilder gather(g_ceph_context,
2124 new C_OnFinisher(new C_IO_Dir_Committed(this,
2125 get_version()),
2126 cache->mds->finisher));
2127
2128 SnapContext snapc;
2129 object_t oid = get_ondisk_object();
2130 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
2131
2132 if (!stale_items.empty()) {
2133 for (const auto &p : stale_items) {
2134 to_remove.insert(std::string(boost::string_view(p)));
2135 write_size += p.length();
2136 }
2137 stale_items.clear();
2138 }
2139
2140 auto write_one = [&](CDentry *dn) {
2141 string key;
2142 dn->key().encode(key);
2143
2144 if (dn->last != CEPH_NOSNAP &&
2145 snaps && try_trim_snap_dentry(dn, *snaps)) {
2146 dout(10) << " rm " << key << dendl;
2147 write_size += key.length();
2148 to_remove.insert(key);
2149 return;
2150 }
2151
2152 if (dn->get_linkage()->is_null()) {
2153 dout(10) << " rm " << dn->get_name() << " " << *dn << dendl;
2154 write_size += key.length();
2155 to_remove.insert(key);
2156 } else {
2157 dout(10) << " set " << dn->get_name() << " " << *dn << dendl;
2158 bufferlist dnbl;
2159 _encode_dentry(dn, dnbl, snaps);
2160 write_size += key.length() + dnbl.length();
2161 to_set[key].swap(dnbl);
2162 }
2163
2164 if (write_size >= max_write_size) {
2165 ObjectOperation op;
2166 op.priority = op_prio;
2167
2168 // don't create new dirfrag blindly
2169 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2170 op.stat(NULL, (ceph::real_time*) NULL, NULL);
2171
2172 if (!to_set.empty())
2173 op.omap_set(to_set);
2174 if (!to_remove.empty())
2175 op.omap_rm_keys(to_remove);
2176
2177 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2178 ceph::real_clock::now(),
2179 0, gather.new_sub());
2180
2181 write_size = 0;
2182 to_set.clear();
2183 to_remove.clear();
2184 }
2185 };
2186
2187 if (state_test(CDir::STATE_FRAGMENTING)) {
2188 for (auto p = items.begin(); p != items.end(); ) {
2189 CDentry *dn = p->second;
2190 ++p;
2191 if (!dn->is_dirty() && dn->get_linkage()->is_null())
2192 continue;
2193 write_one(dn);
2194 }
2195 } else {
2196 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2197 CDentry *dn = *p;
2198 ++p;
2199 write_one(dn);
2200 }
2201 }
2202
2203 ObjectOperation op;
2204 op.priority = op_prio;
2205
2206 // don't create new dirfrag blindly
2207 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2208 op.stat(NULL, (ceph::real_time*)NULL, NULL);
2209
2210 /*
2211 * save the header at the last moment.. If we were to send it off before other
2212 * updates, but die before sending them all, we'd think that the on-disk state
2213 * was fully committed even though it wasn't! However, since the messages are
2214 * strictly ordered between the MDS and the OSD, and since messages to a given
2215 * PG are strictly ordered, if we simply send the message containing the header
2216 * off last, we cannot get our header into an incorrect state.
2217 */
2218 bufferlist header;
2219 ::encode(fnode, header);
2220 op.omap_set_header(header);
2221
2222 if (!to_set.empty())
2223 op.omap_set(to_set);
2224 if (!to_remove.empty())
2225 op.omap_rm_keys(to_remove);
2226
2227 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2228 ceph::real_clock::now(),
2229 0, gather.new_sub());
2230
2231 gather.activate();
2232 }
2233
2234 void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
2235 const set<snapid_t> *snaps)
2236 {
2237 // clear dentry NEW flag, if any. we can no longer silently drop it.
2238 dn->clear_new();
2239
2240 ::encode(dn->first, bl);
2241
2242 // primary or remote?
2243 if (dn->linkage.is_remote()) {
2244 inodeno_t ino = dn->linkage.get_remote_ino();
2245 unsigned char d_type = dn->linkage.get_remote_d_type();
2246 dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' remote ino " << ino << dendl;
2247
2248 // marker, name, ino
2249 bl.append('L'); // remote link
2250 ::encode(ino, bl);
2251 ::encode(d_type, bl);
2252 } else if (dn->linkage.is_primary()) {
2253 // primary link
2254 CInode *in = dn->linkage.get_inode();
2255 assert(in);
2256
2257 dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' inode " << *in << dendl;
2258
2259 // marker, name, inode, [symlink string]
2260 bl.append('I'); // inode
2261
2262 if (in->is_multiversion()) {
2263 if (!in->snaprealm) {
2264 if (snaps)
2265 in->purge_stale_snap_data(*snaps);
2266 } else if (in->snaprealm->have_past_parents_open()) {
2267 in->purge_stale_snap_data(in->snaprealm->get_snaps());
2268 }
2269 }
2270
2271 bufferlist snap_blob;
2272 in->encode_snap_blob(snap_blob);
2273 in->encode_bare(bl, cache->mds->mdsmap->get_up_features(), &snap_blob);
2274 } else {
2275 assert(!dn->linkage.is_null());
2276 }
2277 }
2278
2279 void CDir::_commit(version_t want, int op_prio)
2280 {
2281 dout(10) << "_commit want " << want << " on " << *this << dendl;
2282
2283 // we can't commit things in the future.
2284 // (even the projected future.)
2285 assert(want <= get_version() || get_version() == 0);
2286
2287 // check pre+postconditions.
2288 assert(is_auth());
2289
2290 // already committed?
2291 if (committed_version >= want) {
2292 dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2293 return;
2294 }
2295 // already committing >= want?
2296 if (committing_version >= want) {
2297 dout(10) << "already committing " << committing_version << " >= " << want << dendl;
2298 assert(state_test(STATE_COMMITTING));
2299 return;
2300 }
2301
2302 // alrady committed an older version?
2303 if (committing_version > committed_version) {
2304 dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2305 return;
2306 }
2307
2308 // commit.
2309 committing_version = get_version();
2310
2311 // mark committing (if not already)
2312 if (!state_test(STATE_COMMITTING)) {
2313 dout(10) << "marking committing" << dendl;
2314 state_set(STATE_COMMITTING);
2315 }
2316
2317 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_commit);
2318
2319 _omap_commit(op_prio);
2320 }
2321
2322
2323 /**
2324 * _committed
2325 *
2326 * @param v version i just committed
2327 */
2328 void CDir::_committed(int r, version_t v)
2329 {
2330 if (r < 0) {
2331 // the directory could be partly purged during MDS failover
2332 if (r == -ENOENT && committed_version == 0 &&
2333 !inode->is_base() && get_parent_dir()->inode->is_stray()) {
2334 r = 0;
2335 if (inode->snaprealm)
2336 inode->state_set(CInode::STATE_MISSINGOBJS);
2337 }
2338 if (r < 0) {
2339 dout(1) << "commit error " << r << " v " << v << dendl;
2340 cache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
2341 << " errno " << r;
2342 cache->mds->handle_write_error(r);
2343 return;
2344 }
2345 }
2346
2347 dout(10) << "_committed v " << v << " on " << *this << dendl;
2348 assert(is_auth());
2349
2350 bool stray = inode->is_stray();
2351
2352 // take note.
2353 assert(v > committed_version);
2354 assert(v <= committing_version);
2355 committed_version = v;
2356
2357 // _all_ commits done?
2358 if (committing_version == committed_version)
2359 state_clear(CDir::STATE_COMMITTING);
2360
2361 // _any_ commit, even if we've been redirtied, means we're no longer new.
2362 item_new.remove_myself();
2363
2364 // dir clean?
2365 if (committed_version == get_version())
2366 mark_clean();
2367
2368 // dentries clean?
2369 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2370 CDentry *dn = *p;
2371 ++p;
2372
2373 // inode?
2374 if (dn->linkage.is_primary()) {
2375 CInode *in = dn->linkage.get_inode();
2376 assert(in);
2377 assert(in->is_auth());
2378
2379 if (committed_version >= in->get_version()) {
2380 if (in->is_dirty()) {
2381 dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
2382 in->mark_clean();
2383 }
2384 } else {
2385 dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
2386 assert(in->is_dirty() || in->last < CEPH_NOSNAP); // special case for cow snap items (not predirtied)
2387 }
2388 }
2389
2390 // dentry
2391 if (committed_version >= dn->get_version()) {
2392 dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
2393 dn->mark_clean();
2394
2395 // drop clean null stray dentries immediately
2396 if (stray &&
2397 dn->get_num_ref() == 0 &&
2398 !dn->is_projected() &&
2399 dn->get_linkage()->is_null())
2400 remove_dentry(dn);
2401 } else {
2402 dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
2403 assert(dn->is_dirty());
2404 }
2405 }
2406
2407 // finishers?
2408 bool were_waiters = !waiting_for_commit.empty();
2409
2410 auto it = waiting_for_commit.begin();
2411 while (it != waiting_for_commit.end()) {
2412 auto _it = it;
2413 ++_it;
2414 if (it->first > committed_version) {
2415 dout(10) << " there are waiters for " << it->first << ", committing again" << dendl;
2416 _commit(it->first, -1);
2417 break;
2418 }
2419 std::list<MDSInternalContextBase*> t;
2420 for (const auto &waiter : it->second)
2421 t.push_back(waiter);
2422 cache->mds->queue_waiters(t);
2423 waiting_for_commit.erase(it);
2424 it = _it;
2425 }
2426
2427 // try drop dentries in this dirfrag if it's about to be purged
2428 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
2429 inode->snaprealm)
2430 cache->maybe_eval_stray(inode, true);
2431
2432 // unpin if we kicked the last waiter.
2433 if (were_waiters &&
2434 waiting_for_commit.empty())
2435 auth_unpin(this);
2436 }
2437
2438
2439
2440
2441 // IMPORT/EXPORT
2442
2443 void CDir::encode_export(bufferlist& bl)
2444 {
2445 assert(!is_projected());
2446 ::encode(first, bl);
2447 ::encode(fnode, bl);
2448 ::encode(dirty_old_rstat, bl);
2449 ::encode(committed_version, bl);
2450
2451 ::encode(state, bl);
2452 ::encode(dir_rep, bl);
2453
2454 ::encode(pop_me, bl);
2455 ::encode(pop_auth_subtree, bl);
2456
2457 ::encode(dir_rep_by, bl);
2458 ::encode(get_replicas(), bl);
2459
2460 get(PIN_TEMPEXPORTING);
2461 }
2462
2463 void CDir::finish_export(utime_t now)
2464 {
2465 state &= MASK_STATE_EXPORT_KEPT;
2466 pop_auth_subtree_nested.sub(now, cache->decayrate, pop_auth_subtree);
2467 pop_me.zero(now);
2468 pop_auth_subtree.zero(now);
2469 put(PIN_TEMPEXPORTING);
2470 dirty_old_rstat.clear();
2471 }
2472
2473 void CDir::decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls)
2474 {
2475 ::decode(first, blp);
2476 ::decode(fnode, blp);
2477 ::decode(dirty_old_rstat, blp);
2478 projected_version = fnode.version;
2479 ::decode(committed_version, blp);
2480 committing_version = committed_version;
2481
2482 unsigned s;
2483 ::decode(s, blp);
2484 state &= MASK_STATE_IMPORT_KEPT;
2485 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
2486
2487 if (is_dirty()) {
2488 get(PIN_DIRTY);
2489 _mark_dirty(ls);
2490 }
2491
2492 ::decode(dir_rep, blp);
2493
2494 ::decode(pop_me, now, blp);
2495 ::decode(pop_auth_subtree, now, blp);
2496 pop_auth_subtree_nested.add(now, cache->decayrate, pop_auth_subtree);
2497
2498 ::decode(dir_rep_by, blp);
2499 ::decode(get_replicas(), blp);
2500 if (is_replicated()) get(PIN_REPLICATED);
2501
2502 replica_nonce = 0; // no longer defined
2503
2504 // did we import some dirty scatterlock data?
2505 if (dirty_old_rstat.size() ||
2506 !(fnode.rstat == fnode.accounted_rstat)) {
2507 cache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
2508 ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
2509 }
2510 if (!(fnode.fragstat == fnode.accounted_fragstat)) {
2511 cache->mds->locker->mark_updated_scatterlock(&inode->filelock);
2512 ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
2513 }
2514 if (is_dirty_dft()) {
2515 if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
2516 inode->dirfragtreelock.is_stable()) {
2517 // clear stale dirtydft
2518 state_clear(STATE_DIRTYDFT);
2519 } else {
2520 cache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
2521 ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
2522 }
2523 }
2524 }
2525
2526
2527
2528
2529 /********************************
2530 * AUTHORITY
2531 */
2532
2533 /*
2534 * if dir_auth.first == parent, auth is same as inode.
2535 * unless .second != unknown, in which case that sticks.
2536 */
2537 mds_authority_t CDir::authority() const
2538 {
2539 if (is_subtree_root())
2540 return dir_auth;
2541 else
2542 return inode->authority();
2543 }
2544
2545 /** is_subtree_root()
2546 * true if this is an auth delegation point.
2547 * that is, dir_auth != default (parent,unknown)
2548 *
2549 * some key observations:
2550 * if i am auth:
2551 * - any region bound will be an export, or frozen.
2552 *
2553 * note that this DOES heed dir_auth.pending
2554 */
2555 /*
2556 bool CDir::is_subtree_root()
2557 {
2558 if (dir_auth == CDIR_AUTH_DEFAULT) {
2559 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2560 //<< " on " << ino() << dendl;
2561 return false;
2562 } else {
2563 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2564 //<< " on " << ino() << dendl;
2565 return true;
2566 }
2567 }
2568 */
2569
2570 /** contains(x)
2571 * true if we are x, or an ancestor of x
2572 */
2573 bool CDir::contains(CDir *x)
2574 {
2575 while (1) {
2576 if (x == this)
2577 return true;
2578 x = x->get_inode()->get_projected_parent_dir();
2579 if (x == 0)
2580 return false;
2581 }
2582 }
2583
2584
2585
2586 /** set_dir_auth
2587 */
2588 void CDir::set_dir_auth(mds_authority_t a)
2589 {
2590 dout(10) << "setting dir_auth=" << a
2591 << " from " << dir_auth
2592 << " on " << *this << dendl;
2593
2594 bool was_subtree = is_subtree_root();
2595 bool was_ambiguous = dir_auth.second >= 0;
2596
2597 // set it.
2598 dir_auth = a;
2599
2600 // new subtree root?
2601 if (!was_subtree && is_subtree_root()) {
2602 dout(10) << " new subtree root, adjusting auth_pins" << dendl;
2603
2604 // adjust nested auth pins
2605 if (get_cum_auth_pins())
2606 inode->adjust_nested_auth_pins(-1, NULL);
2607
2608 // unpin parent of frozen dir/tree?
2609 if (inode->is_auth()) {
2610 assert(!is_frozen_tree_root());
2611 if (is_frozen_dir())
2612 inode->auth_unpin(this);
2613 }
2614 }
2615 if (was_subtree && !is_subtree_root()) {
2616 dout(10) << " old subtree root, adjusting auth_pins" << dendl;
2617
2618 // adjust nested auth pins
2619 if (get_cum_auth_pins())
2620 inode->adjust_nested_auth_pins(1, NULL);
2621
2622 // pin parent of frozen dir/tree?
2623 if (inode->is_auth()) {
2624 assert(!is_frozen_tree_root());
2625 if (is_frozen_dir())
2626 inode->auth_pin(this);
2627 }
2628 }
2629
2630 // newly single auth?
2631 if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
2632 list<MDSInternalContextBase*> ls;
2633 take_waiting(WAIT_SINGLEAUTH, ls);
2634 cache->mds->queue_waiters(ls);
2635 }
2636 }
2637
2638
2639 /*****************************************
2640 * AUTH PINS and FREEZING
2641 *
2642 * the basic plan is that auth_pins only exist in auth regions, and they
2643 * prevent a freeze (and subsequent auth change).
2644 *
2645 * however, we also need to prevent a parent from freezing if a child is frozen.
2646 * for that reason, the parent inode of a frozen directory is auth_pinned.
2647 *
2648 * the oddity is when the frozen directory is a subtree root. if that's the case,
2649 * the parent inode isn't frozen. which means that when subtree authority is adjusted
2650 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
2651 * time.
2652 *
2653 */
2654
2655 void CDir::auth_pin(void *by)
2656 {
2657 if (auth_pins == 0)
2658 get(PIN_AUTHPIN);
2659 auth_pins++;
2660
2661 #ifdef MDS_AUTHPIN_SET
2662 auth_pin_set.insert(by);
2663 #endif
2664
2665 dout(10) << "auth_pin by " << by
2666 << " on " << *this
2667 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2668
2669 // nest pins?
2670 if (!is_subtree_root() &&
2671 get_cum_auth_pins() == 1)
2672 inode->adjust_nested_auth_pins(1, by);
2673 }
2674
2675 void CDir::auth_unpin(void *by)
2676 {
2677 auth_pins--;
2678
2679 #ifdef MDS_AUTHPIN_SET
2680 assert(auth_pin_set.count(by));
2681 auth_pin_set.erase(auth_pin_set.find(by));
2682 #endif
2683 if (auth_pins == 0)
2684 put(PIN_AUTHPIN);
2685
2686 dout(10) << "auth_unpin by " << by
2687 << " on " << *this
2688 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2689 assert(auth_pins >= 0);
2690
2691 int newcum = get_cum_auth_pins();
2692
2693 maybe_finish_freeze(); // pending freeze?
2694
2695 // nest?
2696 if (!is_subtree_root() &&
2697 newcum == 0)
2698 inode->adjust_nested_auth_pins(-1, by);
2699 }
2700
2701 void CDir::adjust_nested_auth_pins(int inc, int dirinc, void *by)
2702 {
2703 assert(inc);
2704 nested_auth_pins += inc;
2705 dir_auth_pins += dirinc;
2706
2707 dout(15) << "adjust_nested_auth_pins " << inc << "/" << dirinc << " on " << *this
2708 << " by " << by << " count now "
2709 << auth_pins << " + " << nested_auth_pins << dendl;
2710 assert(nested_auth_pins >= 0);
2711 assert(dir_auth_pins >= 0);
2712
2713 int newcum = get_cum_auth_pins();
2714
2715 maybe_finish_freeze(); // pending freeze?
2716
2717 // nest?
2718 if (!is_subtree_root()) {
2719 if (newcum == 0)
2720 inode->adjust_nested_auth_pins(-1, by);
2721 else if (newcum == inc)
2722 inode->adjust_nested_auth_pins(1, by);
2723 }
2724 }
2725
#ifdef MDS_VERIFY_FRAGSTAT
/**
 * Debug check (only built with MDS_VERIFY_FRAGSTAT): recount the files
 * and subdirectories linked in this complete dirfrag and abort if the
 * totals disagree with fnode.fragstat.  Stray directories are skipped.
 */
void CDir::verify_fragstat()
{
  assert(is_complete());
  if (inode->is_stray())
    return;

  frag_info_t c;
  memset(&c, 0, sizeof(c));

  for (const auto &entry : items) {
    CDentry *dn = entry.second;
    if (dn->is_null())
      continue;

    dout(10) << " " << *dn << dendl;

    if (dn->is_primary()) {
      dout(10) << " " << *dn->inode << dendl;

      if (dn->inode->is_dir())
        c.nsubdirs++;
      else
        c.nfiles++;
    }

    if (dn->is_remote()) {
      if (dn->get_remote_d_type() == DT_DIR)
        c.nsubdirs++;
      else
        c.nfiles++;
    }
  }

  const bool consistent =
    c.nsubdirs == fnode.fragstat.nsubdirs &&
    c.nfiles == fnode.fragstat.nfiles;

  if (consistent) {
    dout(0) << "verify_fragstat ok " << fnode.fragstat << " on " << *this << dendl;
    return;
  }

  dout(0) << "verify_fragstat failed " << fnode.fragstat << " on " << *this << dendl;
  dout(0) << " i count " << c << dendl;
  ceph_abort();
}
#endif
2771
2772 /*****************************************************************************
2773 * FREEZING
2774 */
2775
2776 // FREEZE TREE
2777
2778 bool CDir::freeze_tree()
2779 {
2780 assert(!is_frozen());
2781 assert(!is_freezing());
2782
2783 auth_pin(this);
2784 if (is_freezeable(true)) {
2785 _freeze_tree();
2786 auth_unpin(this);
2787 return true;
2788 } else {
2789 state_set(STATE_FREEZINGTREE);
2790 ++num_freezing_trees;
2791 dout(10) << "freeze_tree waiting " << *this << dendl;
2792 return false;
2793 }
2794 }
2795
2796 void CDir::_freeze_tree()
2797 {
2798 dout(10) << "_freeze_tree " << *this << dendl;
2799 assert(is_freezeable(true));
2800
2801 // twiddle state
2802 if (state_test(STATE_FREEZINGTREE)) {
2803 state_clear(STATE_FREEZINGTREE); // actually, this may get set again by next context?
2804 --num_freezing_trees;
2805 }
2806
2807 if (is_auth()) {
2808 mds_authority_t auth;
2809 bool was_subtree = is_subtree_root();
2810 if (was_subtree) {
2811 auth = get_dir_auth();
2812 } else {
2813 // temporarily prevent parent subtree from becoming frozen.
2814 inode->auth_pin(this);
2815 // create new subtree
2816 auth = authority();
2817 }
2818
2819 assert(auth.first >= 0);
2820 assert(auth.second == CDIR_AUTH_UNKNOWN);
2821 auth.second = auth.first;
2822 inode->mdcache->adjust_subtree_auth(this, auth);
2823 if (!was_subtree)
2824 inode->auth_unpin(this);
2825 }
2826
2827 state_set(STATE_FROZENTREE);
2828 ++num_frozen_trees;
2829 get(PIN_FROZEN);
2830 }
2831
2832 void CDir::unfreeze_tree()
2833 {
2834 dout(10) << "unfreeze_tree " << *this << dendl;
2835
2836 if (state_test(STATE_FROZENTREE)) {
2837 // frozen. unfreeze.
2838 state_clear(STATE_FROZENTREE);
2839 --num_frozen_trees;
2840
2841 put(PIN_FROZEN);
2842
2843 if (is_auth()) {
2844 // must be subtree
2845 assert(is_subtree_root());
2846 // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
2847 mds_authority_t auth = get_dir_auth();
2848 assert(auth.first >= 0);
2849 assert(auth.second == auth.first);
2850 auth.second = CDIR_AUTH_UNKNOWN;
2851 inode->mdcache->adjust_subtree_auth(this, auth);
2852 }
2853
2854 // waiters?
2855 finish_waiting(WAIT_UNFREEZE);
2856 } else {
2857 finish_waiting(WAIT_FROZEN, -1);
2858
2859 // freezing. stop it.
2860 assert(state_test(STATE_FREEZINGTREE));
2861 state_clear(STATE_FREEZINGTREE);
2862 --num_freezing_trees;
2863 auth_unpin(this);
2864
2865 finish_waiting(WAIT_UNFREEZE);
2866 }
2867 }
2868
2869 bool CDir::is_freezing_tree() const
2870 {
2871 if (num_freezing_trees == 0)
2872 return false;
2873 const CDir *dir = this;
2874 while (1) {
2875 if (dir->is_freezing_tree_root()) return true;
2876 if (dir->is_subtree_root()) return false;
2877 if (dir->inode->parent)
2878 dir = dir->inode->parent->dir;
2879 else
2880 return false; // root on replica
2881 }
2882 }
2883
2884 bool CDir::is_frozen_tree() const
2885 {
2886 if (num_frozen_trees == 0)
2887 return false;
2888 const CDir *dir = this;
2889 while (1) {
2890 if (dir->is_frozen_tree_root()) return true;
2891 if (dir->is_subtree_root()) return false;
2892 if (dir->inode->parent)
2893 dir = dir->inode->parent->dir;
2894 else
2895 return false; // root on replica
2896 }
2897 }
2898
2899 CDir *CDir::get_frozen_tree_root()
2900 {
2901 assert(is_frozen());
2902 CDir *dir = this;
2903 while (1) {
2904 if (dir->is_frozen_tree_root())
2905 return dir;
2906 if (dir->inode->parent)
2907 dir = dir->inode->parent->dir;
2908 else
2909 ceph_abort();
2910 }
2911 }
2912
2913 class C_Dir_AuthUnpin : public CDirContext {
2914 public:
2915 explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
2916 void finish(int r) override {
2917 dir->auth_unpin(dir->get_inode());
2918 }
2919 };
2920
2921 void CDir::maybe_finish_freeze()
2922 {
2923 if (auth_pins != 1 || dir_auth_pins != 0)
2924 return;
2925
2926 // we can freeze the _dir_ even with nested pins...
2927 if (state_test(STATE_FREEZINGDIR)) {
2928 _freeze_dir();
2929 auth_unpin(this);
2930 finish_waiting(WAIT_FROZEN);
2931 }
2932
2933 if (nested_auth_pins != 0)
2934 return;
2935
2936 if (state_test(STATE_FREEZINGTREE)) {
2937 if (!is_subtree_root() && inode->is_frozen()) {
2938 dout(10) << "maybe_finish_freeze !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
2939 // retake an auth_pin...
2940 auth_pin(inode);
2941 // and release it when the parent inode unfreezes
2942 inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
2943 return;
2944 }
2945
2946 _freeze_tree();
2947 auth_unpin(this);
2948 finish_waiting(WAIT_FROZEN);
2949 }
2950 }
2951
2952
2953
2954 // FREEZE DIR
2955
2956 bool CDir::freeze_dir()
2957 {
2958 assert(!is_frozen());
2959 assert(!is_freezing());
2960
2961 auth_pin(this);
2962 if (is_freezeable_dir(true)) {
2963 _freeze_dir();
2964 auth_unpin(this);
2965 return true;
2966 } else {
2967 state_set(STATE_FREEZINGDIR);
2968 dout(10) << "freeze_dir + wait " << *this << dendl;
2969 return false;
2970 }
2971 }
2972
2973 void CDir::_freeze_dir()
2974 {
2975 dout(10) << "_freeze_dir " << *this << dendl;
2976 //assert(is_freezeable_dir(true));
2977 // not always true during split because the original fragment may have frozen a while
2978 // ago and we're just now getting around to breaking it up.
2979
2980 state_clear(STATE_FREEZINGDIR);
2981 state_set(STATE_FROZENDIR);
2982 get(PIN_FROZEN);
2983
2984 if (is_auth() && !is_subtree_root())
2985 inode->auth_pin(this); // auth_pin for duration of freeze
2986 }
2987
2988
2989 void CDir::unfreeze_dir()
2990 {
2991 dout(10) << "unfreeze_dir " << *this << dendl;
2992
2993 if (state_test(STATE_FROZENDIR)) {
2994 state_clear(STATE_FROZENDIR);
2995 put(PIN_FROZEN);
2996
2997 // unpin (may => FREEZEABLE) FIXME: is this order good?
2998 if (is_auth() && !is_subtree_root())
2999 inode->auth_unpin(this);
3000
3001 finish_waiting(WAIT_UNFREEZE);
3002 } else {
3003 finish_waiting(WAIT_FROZEN, -1);
3004
3005 // still freezing. stop.
3006 assert(state_test(STATE_FREEZINGDIR));
3007 state_clear(STATE_FREEZINGDIR);
3008 auth_unpin(this);
3009
3010 finish_waiting(WAIT_UNFREEZE);
3011 }
3012 }
3013
3014 /**
3015 * Slightly less complete than operator<<, because this is intended
3016 * for identifying a directory and its state rather than for dumping
3017 * debug output.
3018 */
3019 void CDir::dump(Formatter *f) const
3020 {
3021 assert(f != NULL);
3022
3023 f->dump_stream("path") << get_path();
3024
3025 f->dump_stream("dirfrag") << dirfrag();
3026 f->dump_int("snapid_first", first);
3027
3028 f->dump_stream("projected_version") << get_projected_version();
3029 f->dump_stream("version") << get_version();
3030 f->dump_stream("committing_version") << get_committing_version();
3031 f->dump_stream("committed_version") << get_committed_version();
3032
3033 f->dump_bool("is_rep", is_rep());
3034
3035 if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
3036 if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
3037 f->dump_stream("dir_auth") << get_dir_auth().first;
3038 } else {
3039 f->dump_stream("dir_auth") << get_dir_auth();
3040 }
3041 } else {
3042 f->dump_string("dir_auth", "");
3043 }
3044
3045 f->open_array_section("states");
3046 MDSCacheObject::dump_states(f);
3047 if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
3048 if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
3049 if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
3050 if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
3051 if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
3052 if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
3053 if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
3054 if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
3055 f->close_section();
3056
3057 MDSCacheObject::dump(f);
3058 }
3059
3060 /****** Scrub Stuff *******/
3061
3062 void CDir::scrub_info_create() const
3063 {
3064 assert(!scrub_infop);
3065
3066 // break out of const-land to set up implicit initial state
3067 CDir *me = const_cast<CDir*>(this);
3068 fnode_t *fn = me->get_projected_fnode();
3069
3070 std::unique_ptr<scrub_info_t> si(new scrub_info_t());
3071
3072 si->last_recursive.version = si->recursive_start.version =
3073 fn->recursive_scrub_version;
3074 si->last_recursive.time = si->recursive_start.time =
3075 fn->recursive_scrub_stamp;
3076
3077 si->last_local.version = fn->localized_scrub_version;
3078 si->last_local.time = fn->localized_scrub_stamp;
3079
3080 me->scrub_infop.swap(si);
3081 }
3082
3083 void CDir::scrub_initialize(const ScrubHeaderRefConst& header)
3084 {
3085 dout(20) << __func__ << dendl;
3086 assert(is_complete());
3087 assert(header != nullptr);
3088
3089 // FIXME: weird implicit construction, is someone else meant
3090 // to be calling scrub_info_create first?
3091 scrub_info();
3092 assert(scrub_infop && !scrub_infop->directory_scrubbing);
3093
3094 scrub_infop->recursive_start.version = get_projected_version();
3095 scrub_infop->recursive_start.time = ceph_clock_now();
3096
3097 scrub_infop->directories_to_scrub.clear();
3098 scrub_infop->directories_scrubbing.clear();
3099 scrub_infop->directories_scrubbed.clear();
3100 scrub_infop->others_to_scrub.clear();
3101 scrub_infop->others_scrubbing.clear();
3102 scrub_infop->others_scrubbed.clear();
3103
3104 for (auto i = items.begin();
3105 i != items.end();
3106 ++i) {
3107 // TODO: handle snapshot scrubbing
3108 if (i->first.snapid != CEPH_NOSNAP)
3109 continue;
3110
3111 CDentry::linkage_t *dnl = i->second->get_projected_linkage();
3112 if (dnl->is_primary()) {
3113 if (dnl->get_inode()->is_dir())
3114 scrub_infop->directories_to_scrub.insert(i->first);
3115 else
3116 scrub_infop->others_to_scrub.insert(i->first);
3117 } else if (dnl->is_remote()) {
3118 // TODO: check remote linkage
3119 }
3120 }
3121 scrub_infop->directory_scrubbing = true;
3122 scrub_infop->header = header;
3123 }
3124
3125 void CDir::scrub_finished()
3126 {
3127 dout(20) << __func__ << dendl;
3128 assert(scrub_infop && scrub_infop->directory_scrubbing);
3129
3130 assert(scrub_infop->directories_to_scrub.empty());
3131 assert(scrub_infop->directories_scrubbing.empty());
3132 scrub_infop->directories_scrubbed.clear();
3133 assert(scrub_infop->others_to_scrub.empty());
3134 assert(scrub_infop->others_scrubbing.empty());
3135 scrub_infop->others_scrubbed.clear();
3136 scrub_infop->directory_scrubbing = false;
3137
3138 scrub_infop->last_recursive = scrub_infop->recursive_start;
3139 scrub_infop->last_scrub_dirty = true;
3140 }
3141
3142 int CDir::_next_dentry_on_set(dentry_key_set &dns, bool missing_okay,
3143 MDSInternalContext *cb, CDentry **dnout)
3144 {
3145 dentry_key_t dnkey;
3146 CDentry *dn;
3147
3148 while (!dns.empty()) {
3149 set<dentry_key_t>::iterator front = dns.begin();
3150 dnkey = *front;
3151 dn = lookup(dnkey.name);
3152 if (!dn) {
3153 if (!is_complete() &&
3154 (!has_bloom() || is_in_bloom(dnkey.name))) {
3155 // need to re-read this dirfrag
3156 fetch(cb);
3157 return EAGAIN;
3158 }
3159 // okay, we lost it
3160 if (missing_okay) {
3161 dout(15) << " we no longer have directory dentry "
3162 << dnkey.name << ", assuming it got renamed" << dendl;
3163 dns.erase(dnkey);
3164 continue;
3165 } else {
3166 dout(5) << " we lost dentry " << dnkey.name
3167 << ", bailing out because that's impossible!" << dendl;
3168 ceph_abort();
3169 }
3170 }
3171 // okay, we got a dentry
3172 dns.erase(dnkey);
3173
3174 if (dn->get_projected_version() < scrub_infop->last_recursive.version &&
3175 !(scrub_infop->header->get_force())) {
3176 dout(15) << " skip dentry " << dnkey.name
3177 << ", no change since last scrub" << dendl;
3178 continue;
3179 }
3180
3181 if (!dn->get_linkage()->is_primary()) {
3182 dout(15) << " skip dentry " << dnkey.name
3183 << ", no longer primary" << dendl;
3184 continue;
3185 }
3186
3187 *dnout = dn;
3188 return 0;
3189 }
3190 *dnout = NULL;
3191 return ENOENT;
3192 }
3193
3194 int CDir::scrub_dentry_next(MDSInternalContext *cb, CDentry **dnout)
3195 {
3196 dout(20) << __func__ << dendl;
3197 assert(scrub_infop && scrub_infop->directory_scrubbing);
3198
3199 dout(20) << "trying to scrub directories underneath us" << dendl;
3200 int rval = _next_dentry_on_set(scrub_infop->directories_to_scrub, true,
3201 cb, dnout);
3202 if (rval == 0) {
3203 dout(20) << __func__ << " inserted to directories scrubbing: "
3204 << *dnout << dendl;
3205 scrub_infop->directories_scrubbing.insert((*dnout)->key());
3206 } else if (rval == EAGAIN) {
3207 // we don't need to do anything else
3208 } else { // we emptied out the directory scrub set
3209 assert(rval == ENOENT);
3210 dout(20) << "no directories left, moving on to other kinds of dentries"
3211 << dendl;
3212
3213 rval = _next_dentry_on_set(scrub_infop->others_to_scrub, false, cb, dnout);
3214 if (rval == 0) {
3215 dout(20) << __func__ << " inserted to others scrubbing: "
3216 << *dnout << dendl;
3217 scrub_infop->others_scrubbing.insert((*dnout)->key());
3218 }
3219 }
3220 dout(20) << " returning " << rval << " with dn=" << *dnout << dendl;
3221 return rval;
3222 }
3223
3224 void CDir::scrub_dentries_scrubbing(list<CDentry*> *out_dentries)
3225 {
3226 dout(20) << __func__ << dendl;
3227 assert(scrub_infop && scrub_infop->directory_scrubbing);
3228
3229 for (set<dentry_key_t>::iterator i =
3230 scrub_infop->directories_scrubbing.begin();
3231 i != scrub_infop->directories_scrubbing.end();
3232 ++i) {
3233 CDentry *d = lookup(i->name, i->snapid);
3234 assert(d);
3235 out_dentries->push_back(d);
3236 }
3237 for (set<dentry_key_t>::iterator i = scrub_infop->others_scrubbing.begin();
3238 i != scrub_infop->others_scrubbing.end();
3239 ++i) {
3240 CDentry *d = lookup(i->name, i->snapid);
3241 assert(d);
3242 out_dentries->push_back(d);
3243 }
3244 }
3245
3246 void CDir::scrub_dentry_finished(CDentry *dn)
3247 {
3248 dout(20) << __func__ << " on dn " << *dn << dendl;
3249 assert(scrub_infop && scrub_infop->directory_scrubbing);
3250 dentry_key_t dn_key = dn->key();
3251 if (scrub_infop->directories_scrubbing.erase(dn_key)) {
3252 scrub_infop->directories_scrubbed.insert(dn_key);
3253 } else {
3254 assert(scrub_infop->others_scrubbing.count(dn_key));
3255 scrub_infop->others_scrubbing.erase(dn_key);
3256 scrub_infop->others_scrubbed.insert(dn_key);
3257 }
3258 }
3259
3260 void CDir::scrub_maybe_delete_info()
3261 {
3262 if (scrub_infop &&
3263 !scrub_infop->directory_scrubbing &&
3264 !scrub_infop->need_scrub_local &&
3265 !scrub_infop->last_scrub_dirty &&
3266 !scrub_infop->pending_scrub_error &&
3267 scrub_infop->dirty_scrub_stamps.empty()) {
3268 scrub_infop.reset();
3269 }
3270 }
3271
3272 bool CDir::scrub_local()
3273 {
3274 assert(is_complete());
3275 bool rval = check_rstats(true);
3276
3277 scrub_info();
3278 if (rval) {
3279 scrub_infop->last_local.time = ceph_clock_now();
3280 scrub_infop->last_local.version = get_projected_version();
3281 scrub_infop->pending_scrub_error = false;
3282 scrub_infop->last_scrub_dirty = true;
3283 } else {
3284 scrub_infop->pending_scrub_error = true;
3285 if (scrub_infop->header->get_repair())
3286 cache->repair_dirfrag_stats(this);
3287 }
3288 return rval;
3289 }
3290
3291 std::string CDir::get_path() const
3292 {
3293 std::string path;
3294 get_inode()->make_path_string(path, true);
3295 return path;
3296 }
3297
3298 bool CDir::should_split_fast() const
3299 {
3300 // Max size a fragment can be before trigger fast splitting
3301 int fast_limit = g_conf->mds_bal_split_size * g_conf->mds_bal_fragment_fast_factor;
3302
3303 // Fast path: the sum of accounted size and null dentries does not
3304 // exceed threshold: we definitely are not over it.
3305 if (get_frag_size() + get_num_head_null() <= fast_limit) {
3306 return false;
3307 }
3308
3309 // Fast path: the accounted size of the frag exceeds threshold: we
3310 // definitely are over it
3311 if (get_frag_size() > fast_limit) {
3312 return true;
3313 }
3314
3315 int64_t effective_size = 0;
3316
3317 for (const auto &p : items) {
3318 const CDentry *dn = p.second;
3319 if (!dn->get_projected_linkage()->is_null()) {
3320 effective_size++;
3321 }
3322 }
3323
3324 return effective_size > fast_limit;
3325 }
3326
// NOTE(review): presumably defines the mempool allocation factory for CDir
// objects (factory name co_dir, pool mds_co) -- confirm against the macro's
// definition, which is not visible here.
MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);