]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/CDir.cc
update sources to 12.2.7
[ceph.git] / ceph / src / mds / CDir.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
94b18763 15#include <boost/utility/string_view.hpp>
7c673cae
FG
16
17#include "include/types.h"
18
19#include "CDir.h"
20#include "CDentry.h"
21#include "CInode.h"
22#include "Mutation.h"
23
24#include "MDSMap.h"
25#include "MDSRank.h"
26#include "MDCache.h"
27#include "Locker.h"
28#include "MDLog.h"
29#include "LogSegment.h"
30
31#include "common/bloom_filter.hpp"
32#include "include/Context.h"
33#include "common/Clock.h"
34
35#include "osdc/Objecter.h"
36
37#include "common/config.h"
38#include "include/assert.h"
39#include "include/compat.h"
40
41#define dout_context g_ceph_context
42#define dout_subsys ceph_subsys_mds
43#undef dout_prefix
44#define dout_prefix *_dout << "mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
45
46int CDir::num_frozen_trees = 0;
47int CDir::num_freezing_trees = 0;
48
49class CDirContext : public MDSInternalContextBase
50{
51protected:
52 CDir *dir;
53 MDSRank* get_mds() override {return dir->cache->mds;}
54
55public:
56 explicit CDirContext(CDir *d) : dir(d) {
57 assert(dir != NULL);
58 }
59};
60
61
62class CDirIOContext : public MDSIOContextBase
63{
64protected:
65 CDir *dir;
66 MDSRank* get_mds() override {return dir->cache->mds;}
67
68public:
69 explicit CDirIOContext(CDir *d) : dir(d) {
70 assert(dir != NULL);
71 }
72};
73
74
75// PINS
76//int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
77
78
79ostream& operator<<(ostream& out, const CDir& dir)
80{
81 out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
82 << " [" << dir.first << ",head]";
83 if (dir.is_auth()) {
84 out << " auth";
85 if (dir.is_replicated())
86 out << dir.get_replicas();
87
88 if (dir.is_projected())
89 out << " pv=" << dir.get_projected_version();
90 out << " v=" << dir.get_version();
91 out << " cv=" << dir.get_committing_version();
92 out << "/" << dir.get_committed_version();
93 } else {
94 mds_authority_t a = dir.authority();
95 out << " rep@" << a.first;
96 if (a.second != CDIR_AUTH_UNKNOWN)
97 out << "," << a.second;
98 out << "." << dir.get_replica_nonce();
99 }
100
101 if (dir.is_rep()) out << " REP";
102
103 if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
104 if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
105 out << " dir_auth=" << dir.get_dir_auth().first;
106 else
107 out << " dir_auth=" << dir.get_dir_auth();
108 }
109
110 if (dir.get_cum_auth_pins())
111 out << " ap=" << dir.get_auth_pins()
112 << "+" << dir.get_dir_auth_pins()
113 << "+" << dir.get_nested_auth_pins();
114
115 out << " state=" << dir.get_state();
116 if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
117 if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
118 if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
119 if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
120 //if (dir.state_test(CDir::STATE_FROZENTREELEAF)) out << "|frozentreeleaf";
121 if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
122 if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
123 if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
124 if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
125 if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
126 if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
127
128 // fragstat
129 out << " " << dir.fnode.fragstat;
130 if (!(dir.fnode.fragstat == dir.fnode.accounted_fragstat))
131 out << "/" << dir.fnode.accounted_fragstat;
132 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
133 const fnode_t *pf = dir.get_projected_fnode();
134 out << "->" << pf->fragstat;
135 if (!(pf->fragstat == pf->accounted_fragstat))
136 out << "/" << pf->accounted_fragstat;
137 }
138
139 // rstat
140 out << " " << dir.fnode.rstat;
141 if (!(dir.fnode.rstat == dir.fnode.accounted_rstat))
142 out << "/" << dir.fnode.accounted_rstat;
143 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
144 const fnode_t *pf = dir.get_projected_fnode();
145 out << "->" << pf->rstat;
146 if (!(pf->rstat == pf->accounted_rstat))
147 out << "/" << pf->accounted_rstat;
148 }
149
150 out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
151 out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
152 if (dir.get_num_dirty())
153 out << " dirty=" << dir.get_num_dirty();
154
155 if (dir.get_num_ref()) {
156 out << " |";
157 dir.print_pin_set(out);
158 }
159
160 out << " " << &dir;
161 return out << "]";
162}
163
164
165void CDir::print(ostream& out)
166{
167 out << *this;
168}
169
170
171
172
173ostream& CDir::print_db_line_prefix(ostream& out)
174{
175 return out << ceph_clock_now() << " mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
176}
177
178
179
180// -------------------------------------------------------------------
181// CDir
182
183CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
184 cache(mdcache), inode(in), frag(fg),
185 first(2),
186 dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
b32b8144
FG
187 projected_version(0),
188 dirty_dentries(member_offset(CDentry, item_dir_dirty)),
189 item_dirty(this), item_new(this),
7c673cae
FG
190 num_head_items(0), num_head_null(0),
191 num_snap_items(0), num_snap_null(0),
192 num_dirty(0), committing_version(0), committed_version(0),
193 dir_auth_pins(0), request_pins(0),
194 dir_rep(REP_NONE),
195 pop_me(ceph_clock_now()),
196 pop_nested(ceph_clock_now()),
197 pop_auth_subtree(ceph_clock_now()),
198 pop_auth_subtree_nested(ceph_clock_now()),
28e407b8 199 pop_lru_subdirs(member_offset(CInode, item_pop_lru)),
7c673cae
FG
200 num_dentries_nested(0), num_dentries_auth_subtree(0),
201 num_dentries_auth_subtree_nested(0),
202 dir_auth(CDIR_AUTH_DEFAULT)
203{
7c673cae
FG
204 memset(&fnode, 0, sizeof(fnode));
205
206 // auth
207 assert(in->is_dir());
94b18763 208 if (auth) state_set(STATE_AUTH);
7c673cae
FG
209}
210
211/**
212 * Check the recursive statistics on size for consistency.
213 * If mds_debug_scatterstat is enabled, assert for correctness,
214 * otherwise just print out the mismatch and continue.
215 */
216bool CDir::check_rstats(bool scrub)
217{
218 if (!g_conf->mds_debug_scatterstat && !scrub)
219 return true;
220
221 dout(25) << "check_rstats on " << this << dendl;
222 if (!is_complete() || !is_auth() || is_frozen()) {
223 assert(!scrub);
224 dout(10) << "check_rstats bailing out -- incomplete or non-auth or frozen dir!" << dendl;
225 return true;
226 }
227
228 frag_info_t frag_info;
229 nest_info_t nest_info;
94b18763 230 for (auto i = items.begin(); i != items.end(); ++i) {
7c673cae
FG
231 if (i->second->last != CEPH_NOSNAP)
232 continue;
233 CDentry::linkage_t *dnl = i->second->get_linkage();
234 if (dnl->is_primary()) {
235 CInode *in = dnl->get_inode();
236 nest_info.add(in->inode.accounted_rstat);
237 if (in->is_dir())
238 frag_info.nsubdirs++;
239 else
240 frag_info.nfiles++;
241 } else if (dnl->is_remote())
242 frag_info.nfiles++;
243 }
244
245 bool good = true;
246 // fragstat
247 if(!frag_info.same_sums(fnode.fragstat)) {
248 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
249 dout(1) << "get_num_head_items() = " << get_num_head_items()
250 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
251 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
252 good = false;
253 } else {
254 dout(20) << "get_num_head_items() = " << get_num_head_items()
255 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
256 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
257 }
258
259 // rstat
260 if (!nest_info.same_sums(fnode.rstat)) {
261 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
262 dout(1) << "total of child dentrys: " << nest_info << dendl;
263 dout(1) << "my rstats: " << fnode.rstat << dendl;
264 good = false;
265 } else {
266 dout(20) << "total of child dentrys: " << nest_info << dendl;
267 dout(20) << "my rstats: " << fnode.rstat << dendl;
268 }
269
270 if (!good) {
271 if (!scrub) {
94b18763 272 for (auto i = items.begin(); i != items.end(); ++i) {
7c673cae
FG
273 CDentry *dn = i->second;
274 if (dn->get_linkage()->is_primary()) {
275 CInode *in = dn->get_linkage()->inode;
276 dout(1) << *dn << " rstat " << in->inode.accounted_rstat << dendl;
277 } else {
278 dout(1) << *dn << dendl;
279 }
280 }
281
282 assert(frag_info.nfiles == fnode.fragstat.nfiles);
283 assert(frag_info.nsubdirs == fnode.fragstat.nsubdirs);
284 assert(nest_info.rbytes == fnode.rstat.rbytes);
285 assert(nest_info.rfiles == fnode.rstat.rfiles);
286 assert(nest_info.rsubdirs == fnode.rstat.rsubdirs);
287 }
288 }
289 dout(10) << "check_rstats complete on " << this << dendl;
290 return good;
291}
292
94b18763 293CDentry *CDir::lookup(boost::string_view name, snapid_t snap)
7c673cae
FG
294{
295 dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
94b18763 296 auto iter = items.lower_bound(dentry_key_t(snap, name, inode->hash_dentry_name(name)));
7c673cae
FG
297 if (iter == items.end())
298 return 0;
94b18763 299 if (iter->second->get_name() == name &&
7c673cae
FG
300 iter->second->first <= snap &&
301 iter->second->last >= snap) {
302 dout(20) << " hit -> " << iter->first << dendl;
303 return iter->second;
304 }
305 dout(20) << " miss -> " << iter->first << dendl;
306 return 0;
307}
308
94b18763
FG
309CDentry *CDir::lookup_exact_snap(boost::string_view name, snapid_t last) {
310 auto p = items.find(dentry_key_t(last, name, inode->hash_dentry_name(name)));
7c673cae
FG
311 if (p == items.end())
312 return NULL;
313 return p->second;
314}
315
316/***
317 * linking fun
318 */
319
94b18763 320CDentry* CDir::add_null_dentry(boost::string_view dname,
7c673cae
FG
321 snapid_t first, snapid_t last)
322{
323 // foreign
324 assert(lookup_exact_snap(dname, last) == 0);
325
326 // create dentry
327 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
328 if (is_auth())
329 dn->state_set(CDentry::STATE_AUTH);
31f18b77
FG
330
331 cache->bottom_lru.lru_insert_mid(dn);
332 dn->state_set(CDentry::STATE_BOTTOMLRU);
7c673cae
FG
333
334 dn->dir = this;
335 dn->version = get_projected_version();
336
337 // add to dir
338 assert(items.count(dn->key()) == 0);
94b18763 339 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
340
341 items[dn->key()] = dn;
342 if (last == CEPH_NOSNAP)
343 num_head_null++;
344 else
345 num_snap_null++;
346
347 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
348 dn->get(CDentry::PIN_FRAGMENTING);
349 dn->state_set(CDentry::STATE_FRAGMENTING);
350 }
351
352 dout(12) << "add_null_dentry " << *dn << dendl;
353
354 // pin?
355 if (get_num_any() == 1)
356 get(PIN_CHILD);
357
358 assert(get_num_any() == items.size());
359 return dn;
360}
361
362
94b18763 363CDentry* CDir::add_primary_dentry(boost::string_view dname, CInode *in,
7c673cae
FG
364 snapid_t first, snapid_t last)
365{
366 // primary
367 assert(lookup_exact_snap(dname, last) == 0);
368
369 // create dentry
370 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
371 if (is_auth())
372 dn->state_set(CDentry::STATE_AUTH);
31f18b77
FG
373 if (is_auth() || !inode->is_stray()) {
374 cache->lru.lru_insert_mid(dn);
375 } else {
376 cache->bottom_lru.lru_insert_mid(dn);
377 dn->state_set(CDentry::STATE_BOTTOMLRU);
378 }
7c673cae
FG
379
380 dn->dir = this;
381 dn->version = get_projected_version();
382
383 // add to dir
384 assert(items.count(dn->key()) == 0);
94b18763 385 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
386
387 items[dn->key()] = dn;
388
389 dn->get_linkage()->inode = in;
7c673cae
FG
390
391 link_inode_work(dn, in);
392
393 if (dn->last == CEPH_NOSNAP)
394 num_head_items++;
395 else
396 num_snap_items++;
397
398 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
399 dn->get(CDentry::PIN_FRAGMENTING);
400 dn->state_set(CDentry::STATE_FRAGMENTING);
401 }
402
403 dout(12) << "add_primary_dentry " << *dn << dendl;
404
405 // pin?
406 if (get_num_any() == 1)
407 get(PIN_CHILD);
408 assert(get_num_any() == items.size());
409 return dn;
410}
411
94b18763 412CDentry* CDir::add_remote_dentry(boost::string_view dname, inodeno_t ino, unsigned char d_type,
7c673cae
FG
413 snapid_t first, snapid_t last)
414{
415 // foreign
416 assert(lookup_exact_snap(dname, last) == 0);
417
418 // create dentry
419 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), ino, d_type, first, last);
420 if (is_auth())
421 dn->state_set(CDentry::STATE_AUTH);
422 cache->lru.lru_insert_mid(dn);
423
424 dn->dir = this;
425 dn->version = get_projected_version();
426
427 // add to dir
428 assert(items.count(dn->key()) == 0);
94b18763 429 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
430
431 items[dn->key()] = dn;
432 if (last == CEPH_NOSNAP)
433 num_head_items++;
434 else
435 num_snap_items++;
436
437 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
438 dn->get(CDentry::PIN_FRAGMENTING);
439 dn->state_set(CDentry::STATE_FRAGMENTING);
440 }
441
442 dout(12) << "add_remote_dentry " << *dn << dendl;
443
444 // pin?
445 if (get_num_any() == 1)
446 get(PIN_CHILD);
447
448 assert(get_num_any() == items.size());
449 return dn;
450}
451
452
453
454void CDir::remove_dentry(CDentry *dn)
455{
456 dout(12) << "remove_dentry " << *dn << dendl;
457
458 // there should be no client leases at this point!
459 assert(dn->client_lease_map.empty());
460
461 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
462 dn->put(CDentry::PIN_FRAGMENTING);
463 dn->state_clear(CDentry::STATE_FRAGMENTING);
464 }
465
466 if (dn->get_linkage()->is_null()) {
467 if (dn->last == CEPH_NOSNAP)
468 num_head_null--;
469 else
470 num_snap_null--;
471 } else {
472 if (dn->last == CEPH_NOSNAP)
473 num_head_items--;
474 else
475 num_snap_items--;
476 }
477
478 if (!dn->get_linkage()->is_null())
479 // detach inode and dentry
480 unlink_inode_work(dn);
481
482 // remove from list
483 assert(items.count(dn->key()) == 1);
484 items.erase(dn->key());
485
486 // clean?
487 if (dn->is_dirty())
488 dn->mark_clean();
489
31f18b77
FG
490 if (dn->state_test(CDentry::STATE_BOTTOMLRU))
491 cache->bottom_lru.lru_remove(dn);
492 else
493 cache->lru.lru_remove(dn);
7c673cae
FG
494 delete dn;
495
496 // unpin?
497 if (get_num_any() == 0)
498 put(PIN_CHILD);
499 assert(get_num_any() == items.size());
500}
501
502void CDir::link_remote_inode(CDentry *dn, CInode *in)
503{
504 link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
505}
506
507void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
508{
509 dout(12) << "link_remote_inode " << *dn << " remote " << ino << dendl;
510 assert(dn->get_linkage()->is_null());
511
512 dn->get_linkage()->set_remote(ino, d_type);
513
31f18b77
FG
514 if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
515 cache->bottom_lru.lru_remove(dn);
516 cache->lru.lru_insert_mid(dn);
517 dn->state_clear(CDentry::STATE_BOTTOMLRU);
518 }
519
7c673cae
FG
520 if (dn->last == CEPH_NOSNAP) {
521 num_head_items++;
522 num_head_null--;
523 } else {
524 num_snap_items++;
525 num_snap_null--;
526 }
527 assert(get_num_any() == items.size());
528}
529
530void CDir::link_primary_inode(CDentry *dn, CInode *in)
531{
532 dout(12) << "link_primary_inode " << *dn << " " << *in << dendl;
533 assert(dn->get_linkage()->is_null());
534
535 dn->get_linkage()->inode = in;
7c673cae
FG
536
537 link_inode_work(dn, in);
31f18b77
FG
538
539 if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
540 (is_auth() || !inode->is_stray())) {
541 cache->bottom_lru.lru_remove(dn);
542 cache->lru.lru_insert_mid(dn);
543 dn->state_clear(CDentry::STATE_BOTTOMLRU);
544 }
7c673cae
FG
545
546 if (dn->last == CEPH_NOSNAP) {
547 num_head_items++;
548 num_head_null--;
549 } else {
550 num_snap_items++;
551 num_snap_null--;
552 }
553
554 assert(get_num_any() == items.size());
555}
556
557void CDir::link_inode_work( CDentry *dn, CInode *in)
558{
559 assert(dn->get_linkage()->get_inode() == in);
28e407b8 560 in->set_primary_parent(dn);
7c673cae
FG
561
562 // set inode version
563 //in->inode.version = dn->get_version();
564
565 // pin dentry?
566 if (in->get_num_ref())
567 dn->get(CDentry::PIN_INODEPIN);
568
569 // adjust auth pin count
570 if (in->auth_pins + in->nested_auth_pins)
571 dn->adjust_nested_auth_pins(in->auth_pins + in->nested_auth_pins, in->auth_pins, NULL);
572
573 // verify open snaprealm parent
574 if (in->snaprealm)
575 in->snaprealm->adjust_parent();
576 else if (in->is_any_caps())
577 in->move_to_realm(inode->find_snaprealm());
578}
579
31f18b77 580void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
7c673cae
FG
581{
582 if (dn->get_linkage()->is_primary()) {
583 dout(12) << "unlink_inode " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
584 } else {
585 dout(12) << "unlink_inode " << *dn << dendl;
586 }
587
588 unlink_inode_work(dn);
589
31f18b77
FG
590 if (adjust_lru && !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
591 cache->lru.lru_remove(dn);
592 cache->bottom_lru.lru_insert_mid(dn);
593 dn->state_set(CDentry::STATE_BOTTOMLRU);
594 }
595
7c673cae
FG
596 if (dn->last == CEPH_NOSNAP) {
597 num_head_items--;
598 num_head_null++;
599 } else {
600 num_snap_items--;
601 num_snap_null++;
602 }
603 assert(get_num_any() == items.size());
604}
605
606
607void CDir::try_remove_unlinked_dn(CDentry *dn)
608{
609 assert(dn->dir == this);
610 assert(dn->get_linkage()->is_null());
611
612 // no pins (besides dirty)?
613 if (dn->get_num_ref() != dn->is_dirty())
614 return;
615
616 // was the dn new?
617 if (dn->is_new()) {
618 dout(10) << "try_remove_unlinked_dn " << *dn << " in " << *this << dendl;
619 if (dn->is_dirty())
620 dn->mark_clean();
621 remove_dentry(dn);
622
623 // NOTE: we may not have any more dirty dentries, but the fnode
624 // still changed, so the directory must remain dirty.
625 }
626}
627
628
629void CDir::unlink_inode_work( CDentry *dn )
630{
631 CInode *in = dn->get_linkage()->get_inode();
632
633 if (dn->get_linkage()->is_remote()) {
634 // remote
635 if (in)
636 dn->unlink_remote(dn->get_linkage());
637
638 dn->get_linkage()->set_remote(0, 0);
639 } else if (dn->get_linkage()->is_primary()) {
640 // primary
641 // unpin dentry?
642 if (in->get_num_ref())
643 dn->put(CDentry::PIN_INODEPIN);
644
645 // unlink auth_pin count
646 if (in->auth_pins + in->nested_auth_pins)
647 dn->adjust_nested_auth_pins(0 - (in->auth_pins + in->nested_auth_pins), 0 - in->auth_pins, NULL);
28e407b8 648
7c673cae
FG
649 // detach inode
650 in->remove_primary_parent(dn);
28e407b8
AA
651 if (in->is_dir())
652 in->item_pop_lru.remove_myself();
7c673cae
FG
653 dn->get_linkage()->inode = 0;
654 } else {
655 assert(!dn->get_linkage()->is_null());
656 }
657}
658
659void CDir::add_to_bloom(CDentry *dn)
660{
661 assert(dn->last == CEPH_NOSNAP);
662 if (!bloom) {
663 /* not create bloom filter for incomplete dir that was added by log replay */
664 if (!is_complete())
665 return;
666
667 /* don't maintain bloom filters in standby replay (saves cycles, and also
668 * avoids need to implement clearing it in EExport for #16924) */
669 if (cache->mds->is_standby_replay()) {
670 return;
671 }
672
673 unsigned size = get_num_head_items() + get_num_snap_items();
674 if (size < 100) size = 100;
675 bloom.reset(new bloom_filter(size, 1.0 / size, 0));
676 }
677 /* This size and false positive probability is completely random.*/
94b18763 678 bloom->insert(dn->get_name().data(), dn->get_name().size());
7c673cae
FG
679}
680
94b18763 681bool CDir::is_in_bloom(boost::string_view name)
7c673cae
FG
682{
683 if (!bloom)
684 return false;
94b18763 685 return bloom->contains(name.data(), name.size());
7c673cae
FG
686}
687
688void CDir::remove_null_dentries() {
689 dout(12) << "remove_null_dentries " << *this << dendl;
690
94b18763 691 auto p = items.begin();
7c673cae
FG
692 while (p != items.end()) {
693 CDentry *dn = p->second;
694 ++p;
695 if (dn->get_linkage()->is_null() && !dn->is_projected())
696 remove_dentry(dn);
697 }
698
699 assert(num_snap_null == 0);
700 assert(num_head_null == 0);
701 assert(get_num_any() == items.size());
702}
703
704/** remove dirty null dentries for deleted directory. the dirfrag will be
705 * deleted soon, so it's safe to not commit dirty dentries.
706 *
707 * This is called when a directory is being deleted, a prerequisite
708 * of which is that its children have been unlinked: we expect to only see
709 * null, unprojected dentries here.
710 */
711void CDir::try_remove_dentries_for_stray()
712{
713 dout(10) << __func__ << dendl;
31f18b77 714 assert(get_parent_dir()->inode->is_stray());
7c673cae
FG
715
716 // clear dirty only when the directory was not snapshotted
717 bool clear_dirty = !inode->snaprealm;
718
94b18763 719 auto p = items.begin();
7c673cae
FG
720 while (p != items.end()) {
721 CDentry *dn = p->second;
722 ++p;
723 if (dn->last == CEPH_NOSNAP) {
724 assert(!dn->is_projected());
725 assert(dn->get_linkage()->is_null());
726 if (clear_dirty && dn->is_dirty())
727 dn->mark_clean();
728 // It's OK to remove lease prematurely because we will never link
729 // the dentry to inode again.
730 if (dn->is_any_leases())
731 dn->remove_client_leases(cache->mds->locker);
732 if (dn->get_num_ref() == 0)
733 remove_dentry(dn);
734 } else {
735 assert(!dn->is_projected());
736 CDentry::linkage_t *dnl= dn->get_linkage();
737 CInode *in = NULL;
738 if (dnl->is_primary()) {
739 in = dnl->get_inode();
740 if (clear_dirty && in->is_dirty())
741 in->mark_clean();
742 }
743 if (clear_dirty && dn->is_dirty())
744 dn->mark_clean();
745 if (dn->get_num_ref() == 0) {
746 remove_dentry(dn);
747 if (in)
748 cache->remove_inode(in);
749 }
750 }
751 }
752
753 if (clear_dirty && is_dirty())
754 mark_clean();
755}
756
7c673cae
FG
757bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
758{
759 assert(dn->last != CEPH_NOSNAP);
760 set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
761 CDentry::linkage_t *dnl= dn->get_linkage();
762 CInode *in = 0;
763 if (dnl->is_primary())
764 in = dnl->get_inode();
765 if ((p == snaps.end() || *p > dn->last) &&
766 (dn->get_num_ref() == dn->is_dirty()) &&
767 (!in || in->get_num_ref() == in->is_dirty())) {
768 dout(10) << " purging snapped " << *dn << dendl;
769 if (in && in->is_dirty())
770 in->mark_clean();
771 remove_dentry(dn);
772 if (in) {
773 dout(10) << " purging snapped " << *in << dendl;
774 cache->remove_inode(in);
775 }
776 return true;
777 }
778 return false;
779}
780
781
782void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
783{
784 dout(10) << "purge_stale_snap_data " << snaps << dendl;
785
94b18763 786 auto p = items.begin();
7c673cae
FG
787 while (p != items.end()) {
788 CDentry *dn = p->second;
789 ++p;
790
791 if (dn->last == CEPH_NOSNAP)
792 continue;
793
794 try_trim_snap_dentry(dn, snaps);
795 }
796}
797
798
799/**
800 * steal_dentry -- semi-violently move a dentry from one CDir to another
801 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
802 * on the old CDir corpse; must call finish_old_fragment() when finished.
803 */
804void CDir::steal_dentry(CDentry *dn)
805{
806 dout(15) << "steal_dentry " << *dn << dendl;
807
808 items[dn->key()] = dn;
809
810 dn->dir->items.erase(dn->key());
811 if (dn->dir->items.empty())
812 dn->dir->put(PIN_CHILD);
813
814 if (get_num_any() == 0)
815 get(PIN_CHILD);
816 if (dn->get_linkage()->is_null()) {
817 if (dn->last == CEPH_NOSNAP)
818 num_head_null++;
819 else
820 num_snap_null++;
821 } else if (dn->last == CEPH_NOSNAP) {
822 num_head_items++;
823
824 if (dn->get_linkage()->is_primary()) {
825 CInode *in = dn->get_linkage()->get_inode();
94b18763 826 auto pi = in->get_projected_inode();
28e407b8 827 if (in->is_dir()) {
7c673cae 828 fnode.fragstat.nsubdirs++;
28e407b8
AA
829 if (in->item_pop_lru.is_on_list())
830 pop_lru_subdirs.push_back(&in->item_pop_lru);
831 } else {
7c673cae 832 fnode.fragstat.nfiles++;
28e407b8 833 }
7c673cae
FG
834 fnode.rstat.rbytes += pi->accounted_rstat.rbytes;
835 fnode.rstat.rfiles += pi->accounted_rstat.rfiles;
836 fnode.rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
837 fnode.rstat.rsnaprealms += pi->accounted_rstat.rsnaprealms;
838 if (pi->accounted_rstat.rctime > fnode.rstat.rctime)
839 fnode.rstat.rctime = pi->accounted_rstat.rctime;
840
841 // move dirty inode rstat to new dirfrag
842 if (in->is_dirty_rstat())
843 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
844 } else if (dn->get_linkage()->is_remote()) {
845 if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
846 fnode.fragstat.nsubdirs++;
847 else
848 fnode.fragstat.nfiles++;
849 }
850 } else {
851 num_snap_items++;
852 if (dn->get_linkage()->is_primary()) {
853 CInode *in = dn->get_linkage()->get_inode();
854 if (in->is_dirty_rstat())
855 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
856 }
857 }
858
859 if (dn->auth_pins || dn->nested_auth_pins) {
860 // use the helpers here to maintain the auth_pin invariants on the dir inode
861 int ap = dn->get_num_auth_pins() + dn->get_num_nested_auth_pins();
862 int dap = dn->get_num_dir_auth_pins();
863 assert(dap <= ap);
864 adjust_nested_auth_pins(ap, dap, NULL);
865 dn->dir->adjust_nested_auth_pins(-ap, -dap, NULL);
866 }
867
b32b8144
FG
868 if (dn->is_dirty()) {
869 dirty_dentries.push_back(&dn->item_dir_dirty);
7c673cae 870 num_dirty++;
b32b8144 871 }
7c673cae
FG
872
873 dn->dir = this;
874}
875
31f18b77 876void CDir::prepare_old_fragment(map<string_snap_t, std::list<MDSInternalContextBase*> >& dentry_waiters, bool replay)
7c673cae
FG
877{
878 // auth_pin old fragment for duration so that any auth_pinning
879 // during the dentry migration doesn't trigger side effects
880 if (!replay && is_auth())
881 auth_pin(this);
31f18b77
FG
882
883 if (!waiting_on_dentry.empty()) {
94b18763
FG
884 for (const auto &p : waiting_on_dentry) {
885 auto &e = dentry_waiters[p.first];
886 for (const auto &waiter : p.second) {
887 e.push_back(waiter);
888 }
889 }
31f18b77
FG
890 waiting_on_dentry.clear();
891 put(PIN_DNWAITER);
892 }
7c673cae
FG
893}
894
895void CDir::prepare_new_fragment(bool replay)
896{
897 if (!replay && is_auth()) {
898 _freeze_dir();
899 mark_complete();
900 }
31f18b77 901 inode->add_dirfrag(this);
7c673cae
FG
902}
903
904void CDir::finish_old_fragment(list<MDSInternalContextBase*>& waiters, bool replay)
905{
906 // take waiters _before_ unfreeze...
907 if (!replay) {
908 take_waiting(WAIT_ANY_MASK, waiters);
909 if (is_auth()) {
910 auth_unpin(this); // pinned in prepare_old_fragment
911 assert(is_frozen_dir());
912 unfreeze_dir();
913 }
914 }
915
916 assert(nested_auth_pins == 0);
917 assert(dir_auth_pins == 0);
918 assert(auth_pins == 0);
919
920 num_head_items = num_head_null = 0;
921 num_snap_items = num_snap_null = 0;
922
923 // this mirrors init_fragment_pins()
924 if (is_auth())
925 clear_replica_map();
926 if (is_dirty())
927 mark_clean();
928 if (state_test(STATE_IMPORTBOUND))
929 put(PIN_IMPORTBOUND);
930 if (state_test(STATE_EXPORTBOUND))
931 put(PIN_EXPORTBOUND);
932 if (is_subtree_root())
933 put(PIN_SUBTREE);
934
935 if (auth_pins > 0)
936 put(PIN_AUTHPIN);
937
938 assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
939}
940
941void CDir::init_fragment_pins()
942{
181888fb 943 if (is_replicated())
7c673cae
FG
944 get(PIN_REPLICATED);
945 if (state_test(STATE_DIRTY))
946 get(PIN_DIRTY);
947 if (state_test(STATE_EXPORTBOUND))
948 get(PIN_EXPORTBOUND);
949 if (state_test(STATE_IMPORTBOUND))
950 get(PIN_IMPORTBOUND);
951 if (is_subtree_root())
952 get(PIN_SUBTREE);
953}
954
955void CDir::split(int bits, list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
956{
957 dout(10) << "split by " << bits << " bits on " << *this << dendl;
958
959 assert(replay || is_complete() || !is_auth());
960
961 list<frag_t> frags;
962 frag.split(bits, frags);
963
964 vector<CDir*> subfrags(1 << bits);
965
966 double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
967
968 version_t rstat_version = inode->get_projected_inode()->rstat.version;
969 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
970
971 nest_info_t rstatdiff;
972 frag_info_t fragstatdiff;
973 if (fnode.accounted_rstat.version == rstat_version)
974 rstatdiff.add_delta(fnode.accounted_rstat, fnode.rstat);
975 if (fnode.accounted_fragstat.version == dirstat_version)
976 fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat);
977 dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
978
31f18b77
FG
979 map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
980 prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
981
982 // create subfrag dirs
983 int n = 0;
984 for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
985 CDir *f = new CDir(inode, *p, cache, is_auth());
986 f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
181888fb 987 f->get_replicas() = get_replicas();
7c673cae
FG
988 f->dir_auth = dir_auth;
989 f->init_fragment_pins();
990 f->set_version(get_version());
991
992 f->pop_me = pop_me;
993 f->pop_me.scale(fac);
994
995 // FIXME; this is an approximation
996 f->pop_nested = pop_nested;
997 f->pop_nested.scale(fac);
998 f->pop_auth_subtree = pop_auth_subtree;
999 f->pop_auth_subtree.scale(fac);
1000 f->pop_auth_subtree_nested = pop_auth_subtree_nested;
1001 f->pop_auth_subtree_nested.scale(fac);
1002
1003 dout(10) << " subfrag " << *p << " " << *f << dendl;
1004 subfrags[n++] = f;
1005 subs.push_back(f);
7c673cae
FG
1006
1007 f->set_dir_auth(get_dir_auth());
1008 f->prepare_new_fragment(replay);
1009 }
1010
1011 // repartition dentries
1012 while (!items.empty()) {
94b18763 1013 auto p = items.begin();
7c673cae
FG
1014
1015 CDentry *dn = p->second;
94b18763 1016 frag_t subfrag = inode->pick_dirfrag(dn->get_name());
7c673cae
FG
1017 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1018 dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
1019 CDir *f = subfrags[n];
1020 f->steal_dentry(dn);
1021 }
1022
94b18763 1023 for (const auto &p : dentry_waiters) {
31f18b77
FG
1024 frag_t subfrag = inode->pick_dirfrag(p.first.name);
1025 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1026 CDir *f = subfrags[n];
1027
1028 if (f->waiting_on_dentry.empty())
1029 f->get(PIN_DNWAITER);
94b18763
FG
1030 auto &e = f->waiting_on_dentry[p.first];
1031 for (const auto &waiter : p.second) {
1032 e.push_back(waiter);
1033 }
31f18b77
FG
1034 }
1035
7c673cae
FG
1036 // FIXME: handle dirty old rstat
1037
1038 // fix up new frag fragstats
1039 for (int i=0; i<n; i++) {
1040 CDir *f = subfrags[i];
1041 f->fnode.rstat.version = rstat_version;
1042 f->fnode.accounted_rstat = f->fnode.rstat;
1043 f->fnode.fragstat.version = dirstat_version;
1044 f->fnode.accounted_fragstat = f->fnode.fragstat;
1045 dout(10) << " rstat " << f->fnode.rstat << " fragstat " << f->fnode.fragstat
1046 << " on " << *f << dendl;
1047 }
1048
1049 // give any outstanding frag stat differential to first frag
1050 dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1051 << " to " << *subfrags[0] << dendl;
1052 subfrags[0]->fnode.accounted_rstat.add(rstatdiff);
1053 subfrags[0]->fnode.accounted_fragstat.add(fragstatdiff);
1054
1055 finish_old_fragment(waiters, replay);
1056}
1057
1058void CDir::merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
1059{
1060 dout(10) << "merge " << subs << dendl;
1061
1062 mds_authority_t new_auth = CDIR_AUTH_DEFAULT;
1063 for (auto dir : subs) {
1064 if (dir->get_dir_auth() != CDIR_AUTH_DEFAULT &&
1065 dir->get_dir_auth() != new_auth) {
1066 assert(new_auth == CDIR_AUTH_DEFAULT);
1067 new_auth = dir->get_dir_auth();
1068 }
1069 }
1070
1071 set_dir_auth(new_auth);
1072 prepare_new_fragment(replay);
1073
1074 nest_info_t rstatdiff;
1075 frag_info_t fragstatdiff;
1076 bool touched_mtime, touched_chattr;
1077 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1078 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1079
31f18b77
FG
1080 map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
1081
7c673cae
FG
1082 for (auto dir : subs) {
1083 dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
1084 assert(!dir->is_auth() || dir->is_complete() || replay);
1085
1086 if (dir->fnode.accounted_rstat.version == rstat_version)
1087 rstatdiff.add_delta(dir->fnode.accounted_rstat, dir->fnode.rstat);
1088 if (dir->fnode.accounted_fragstat.version == dirstat_version)
1089 fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat,
1090 &touched_mtime, &touched_chattr);
1091
31f18b77 1092 dir->prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
1093
1094 // steal dentries
1095 while (!dir->items.empty())
1096 steal_dentry(dir->items.begin()->second);
1097
1098 // merge replica map
181888fb
FG
1099 for (const auto &p : dir->get_replicas()) {
1100 unsigned cur = get_replicas()[p.first];
1101 if (p.second > cur)
1102 get_replicas()[p.first] = p.second;
7c673cae
FG
1103 }
1104
1105 // merge version
1106 if (dir->get_version() > get_version())
1107 set_version(dir->get_version());
1108
1109 // merge state
1110 state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
7c673cae
FG
1111
1112 dir->finish_old_fragment(waiters, replay);
1113 inode->close_dirfrag(dir->get_frag());
1114 }
1115
31f18b77
FG
1116 if (!dentry_waiters.empty()) {
1117 get(PIN_DNWAITER);
94b18763
FG
1118 for (const auto &p : dentry_waiters) {
1119 auto &e = waiting_on_dentry[p.first];
1120 for (const auto &waiter : p.second) {
1121 e.push_back(waiter);
1122 }
31f18b77
FG
1123 }
1124 }
1125
7c673cae
FG
1126 if (is_auth() && !replay)
1127 mark_complete();
1128
1129 // FIXME: merge dirty old rstat
1130 fnode.rstat.version = rstat_version;
1131 fnode.accounted_rstat = fnode.rstat;
1132 fnode.accounted_rstat.add(rstatdiff);
1133
1134 fnode.fragstat.version = dirstat_version;
1135 fnode.accounted_fragstat = fnode.fragstat;
1136 fnode.accounted_fragstat.add(fragstatdiff);
1137
1138 init_fragment_pins();
1139}
1140
1141
1142
1143
1144void CDir::resync_accounted_fragstat()
1145{
1146 fnode_t *pf = get_projected_fnode();
94b18763 1147 auto pi = inode->get_projected_inode();
7c673cae
FG
1148
1149 if (pf->accounted_fragstat.version != pi->dirstat.version) {
1150 pf->fragstat.version = pi->dirstat.version;
1151 dout(10) << "resync_accounted_fragstat " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
1152 pf->accounted_fragstat = pf->fragstat;
1153 }
1154}
1155
1156/*
1157 * resync rstat and accounted_rstat with inode
1158 */
1159void CDir::resync_accounted_rstat()
1160{
1161 fnode_t *pf = get_projected_fnode();
94b18763 1162 auto pi = inode->get_projected_inode();
7c673cae
FG
1163
1164 if (pf->accounted_rstat.version != pi->rstat.version) {
1165 pf->rstat.version = pi->rstat.version;
1166 dout(10) << "resync_accounted_rstat " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
1167 pf->accounted_rstat = pf->rstat;
1168 dirty_old_rstat.clear();
1169 }
1170}
1171
1172void CDir::assimilate_dirty_rstat_inodes()
1173{
1174 dout(10) << "assimilate_dirty_rstat_inodes" << dendl;
1175 for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1176 !p.end(); ++p) {
1177 CInode *in = *p;
1178 assert(in->is_auth());
1179 if (in->is_frozen())
1180 continue;
1181
94b18763
FG
1182 auto &pi = in->project_inode();
1183 pi.inode.version = in->pre_dirty();
7c673cae
FG
1184
1185 inode->mdcache->project_rstat_inode_to_frag(in, this, 0, 0, NULL);
1186 }
1187 state_set(STATE_ASSIMRSTAT);
1188 dout(10) << "assimilate_dirty_rstat_inodes done" << dendl;
1189}
1190
1191void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef& mut, EMetaBlob *blob)
1192{
1193 if (!state_test(STATE_ASSIMRSTAT))
1194 return;
1195 state_clear(STATE_ASSIMRSTAT);
1196 dout(10) << "assimilate_dirty_rstat_inodes_finish" << dendl;
1197 elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1198 while (!p.end()) {
1199 CInode *in = *p;
1200 ++p;
1201
1202 if (in->is_frozen())
1203 continue;
1204
1205 CDentry *dn = in->get_projected_parent_dn();
1206
1207 mut->auth_pin(in);
1208 mut->add_projected_inode(in);
1209
1210 in->clear_dirty_rstat();
1211 blob->add_primary_dentry(dn, in, true);
1212 }
1213
1214 if (!dirty_rstat_inodes.empty())
1215 inode->mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
1216}
1217
1218
1219
1220
1221/****************************************
1222 * WAITING
1223 */
1224
94b18763 1225void CDir::add_dentry_waiter(boost::string_view dname, snapid_t snapid, MDSInternalContextBase *c)
7c673cae
FG
1226{
1227 if (waiting_on_dentry.empty())
1228 get(PIN_DNWAITER);
1229 waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
1230 dout(10) << "add_dentry_waiter dentry " << dname
1231 << " snap " << snapid
1232 << " " << c << " on " << *this << dendl;
1233}
1234
94b18763 1235void CDir::take_dentry_waiting(boost::string_view dname, snapid_t first, snapid_t last,
7c673cae
FG
1236 list<MDSInternalContextBase*>& ls)
1237{
1238 if (waiting_on_dentry.empty())
1239 return;
1240
1241 string_snap_t lb(dname, first);
1242 string_snap_t ub(dname, last);
94b18763
FG
1243 auto it = waiting_on_dentry.lower_bound(lb);
1244 while (it != waiting_on_dentry.end() &&
1245 !(ub < it->first)) {
7c673cae
FG
1246 dout(10) << "take_dentry_waiting dentry " << dname
1247 << " [" << first << "," << last << "] found waiter on snap "
94b18763 1248 << it->first.snapid
7c673cae 1249 << " on " << *this << dendl;
94b18763
FG
1250 for (const auto &waiter : it->second) {
1251 ls.push_back(waiter);
1252 }
1253 waiting_on_dentry.erase(it++);
7c673cae
FG
1254 }
1255
1256 if (waiting_on_dentry.empty())
1257 put(PIN_DNWAITER);
1258}
1259
1260void CDir::take_sub_waiting(list<MDSInternalContextBase*>& ls)
1261{
1262 dout(10) << "take_sub_waiting" << dendl;
1263 if (!waiting_on_dentry.empty()) {
94b18763
FG
1264 for (const auto &p : waiting_on_dentry) {
1265 for (const auto &waiter : p.second) {
1266 ls.push_back(waiter);
1267 }
1268 }
7c673cae
FG
1269 waiting_on_dentry.clear();
1270 put(PIN_DNWAITER);
1271 }
1272}
1273
1274
1275
1276void CDir::add_waiter(uint64_t tag, MDSInternalContextBase *c)
1277{
1278 // hierarchical?
1279
1280 // at free root?
1281 if (tag & WAIT_ATFREEZEROOT) {
1282 if (!(is_freezing_tree_root() || is_frozen_tree_root() ||
1283 is_freezing_dir() || is_frozen_dir())) {
1284 // try parent
1285 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATFREEZEROOT, " << *this << " is not root, trying parent" << dendl;
1286 inode->parent->dir->add_waiter(tag, c);
1287 return;
1288 }
1289 }
1290
1291 // at subtree root?
1292 if (tag & WAIT_ATSUBTREEROOT) {
1293 if (!is_subtree_root()) {
1294 // try parent
1295 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1296 inode->parent->dir->add_waiter(tag, c);
1297 return;
1298 }
1299 }
1300
1301 assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
1302
1303 MDSCacheObject::add_waiter(tag, c);
1304}
1305
1306
1307
1308/* NOTE: this checks dentry waiters too */
1309void CDir::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
1310{
1311 if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1312 // take all dentry waiters
94b18763
FG
1313 for (const auto &p : waiting_on_dentry) {
1314 dout(10) << "take_waiting dentry " << p.first.name
1315 << " snap " << p.first.snapid << " on " << *this << dendl;
1316 for (const auto &waiter : p.second) {
1317 ls.push_back(waiter);
1318 }
7c673cae 1319 }
94b18763 1320 waiting_on_dentry.clear();
7c673cae
FG
1321 put(PIN_DNWAITER);
1322 }
1323
1324 // waiting
1325 MDSCacheObject::take_waiting(mask, ls);
1326}
1327
1328
1329void CDir::finish_waiting(uint64_t mask, int result)
1330{
1331 dout(11) << "finish_waiting mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
1332
1333 list<MDSInternalContextBase*> finished;
1334 take_waiting(mask, finished);
1335 if (result < 0)
1336 finish_contexts(g_ceph_context, finished, result);
1337 else
1338 cache->mds->queue_waiters(finished);
1339}
1340
1341
1342
1343// dirty/clean
1344
1345fnode_t *CDir::project_fnode()
1346{
1347 assert(get_version() != 0);
94b18763
FG
1348 projected_fnode.emplace_back(*get_projected_fnode());
1349 auto &p = projected_fnode.back();
7c673cae
FG
1350
1351 if (scrub_infop && scrub_infop->last_scrub_dirty) {
94b18763
FG
1352 p.localized_scrub_stamp = scrub_infop->last_local.time;
1353 p.localized_scrub_version = scrub_infop->last_local.version;
1354 p.recursive_scrub_stamp = scrub_infop->last_recursive.time;
1355 p.recursive_scrub_version = scrub_infop->last_recursive.version;
7c673cae
FG
1356 scrub_infop->last_scrub_dirty = false;
1357 scrub_maybe_delete_info();
1358 }
1359
94b18763
FG
1360 dout(10) << __func__ << " " << &p << dendl;
1361 return &p;
7c673cae
FG
1362}
1363
1364void CDir::pop_and_dirty_projected_fnode(LogSegment *ls)
1365{
1366 assert(!projected_fnode.empty());
94b18763
FG
1367 auto &front = projected_fnode.front();
1368 dout(15) << __func__ << " " << &front << " v" << front.version << dendl;
1369 fnode = front;
7c673cae 1370 _mark_dirty(ls);
7c673cae
FG
1371 projected_fnode.pop_front();
1372}
1373
1374
1375version_t CDir::pre_dirty(version_t min)
1376{
1377 if (min > projected_version)
1378 projected_version = min;
1379 ++projected_version;
1380 dout(10) << "pre_dirty " << projected_version << dendl;
1381 return projected_version;
1382}
1383
1384void CDir::mark_dirty(version_t pv, LogSegment *ls)
1385{
1386 assert(get_version() < pv);
1387 assert(pv <= projected_version);
1388 fnode.version = pv;
1389 _mark_dirty(ls);
1390}
1391
1392void CDir::_mark_dirty(LogSegment *ls)
1393{
1394 if (!state_test(STATE_DIRTY)) {
1395 dout(10) << "mark_dirty (was clean) " << *this << " version " << get_version() << dendl;
1396 _set_dirty_flag();
1397 assert(ls);
1398 } else {
1399 dout(10) << "mark_dirty (already dirty) " << *this << " version " << get_version() << dendl;
1400 }
1401 if (ls) {
1402 ls->dirty_dirfrags.push_back(&item_dirty);
1403
1404 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1405 if (committed_version == 0 && !item_new.is_on_list())
1406 ls->new_dirfrags.push_back(&item_new);
1407 }
1408}
1409
1410void CDir::mark_new(LogSegment *ls)
1411{
1412 ls->new_dirfrags.push_back(&item_new);
1413 state_clear(STATE_CREATING);
1414
1415 list<MDSInternalContextBase*> waiters;
1416 take_waiting(CDir::WAIT_CREATED, waiters);
1417 cache->mds->queue_waiters(waiters);
1418}
1419
1420void CDir::mark_clean()
1421{
1422 dout(10) << "mark_clean " << *this << " version " << get_version() << dendl;
1423 if (state_test(STATE_DIRTY)) {
1424 item_dirty.remove_myself();
1425 item_new.remove_myself();
1426
1427 state_clear(STATE_DIRTY);
1428 put(PIN_DIRTY);
1429 }
1430}
1431
1432// caller should hold auth pin of this
1433void CDir::log_mark_dirty()
1434{
b32b8144 1435 if (is_dirty() || projected_version > get_version())
7c673cae
FG
1436 return; // noop if it is already dirty or will be dirty
1437
1438 version_t pv = pre_dirty();
1439 mark_dirty(pv, cache->mds->mdlog->get_current_segment());
1440}
1441
1442void CDir::mark_complete() {
1443 state_set(STATE_COMPLETE);
1444 bloom.reset();
1445}
1446
1447void CDir::first_get()
1448{
1449 inode->get(CInode::PIN_DIRFRAG);
1450}
1451
1452void CDir::last_put()
1453{
1454 inode->put(CInode::PIN_DIRFRAG);
1455}
1456
1457
1458
1459/******************************************************************************
1460 * FETCH and COMMIT
1461 */
1462
1463// -----------------------
1464// FETCH
1465void CDir::fetch(MDSInternalContextBase *c, bool ignore_authpinnability)
1466{
1467 string want;
1468 return fetch(c, want, ignore_authpinnability);
1469}
1470
94b18763 1471void CDir::fetch(MDSInternalContextBase *c, boost::string_view want_dn, bool ignore_authpinnability)
7c673cae
FG
1472{
1473 dout(10) << "fetch on " << *this << dendl;
1474
1475 assert(is_auth());
1476 assert(!is_complete());
1477
1478 if (!can_auth_pin() && !ignore_authpinnability) {
1479 if (c) {
1480 dout(7) << "fetch waiting for authpinnable" << dendl;
1481 add_waiter(WAIT_UNFREEZE, c);
1482 } else
1483 dout(7) << "fetch not authpinnable and no context" << dendl;
1484 return;
1485 }
1486
1487 // unlinked directory inode shouldn't have any entry
31f18b77
FG
1488 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
1489 !inode->snaprealm) {
7c673cae
FG
1490 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1491 if (get_version() == 0) {
31f18b77 1492 assert(inode->is_auth());
7c673cae
FG
1493 set_version(1);
1494
1495 if (state_test(STATE_REJOINUNDEF)) {
1496 assert(cache->mds->is_rejoin());
1497 state_clear(STATE_REJOINUNDEF);
1498 cache->opened_undef_dirfrag(this);
1499 }
1500 }
1501 mark_complete();
1502
1503 if (c)
1504 cache->mds->queue_waiter(c);
1505 return;
1506 }
1507
1508 if (c) add_waiter(WAIT_COMPLETE, c);
94b18763 1509 if (!want_dn.empty()) wanted_items.insert(mempool::mds_co::string(want_dn));
7c673cae
FG
1510
1511 // already fetching?
1512 if (state_test(CDir::STATE_FETCHING)) {
1513 dout(7) << "already fetching; waiting" << dendl;
1514 return;
1515 }
1516
1517 auth_pin(this);
1518 state_set(CDir::STATE_FETCHING);
1519
1520 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1521
1522 std::set<dentry_key_t> empty;
1523 _omap_fetch(NULL, empty);
1524}
1525
1526void CDir::fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1527{
1528 dout(10) << "fetch " << keys.size() << " keys on " << *this << dendl;
1529
1530 assert(is_auth());
1531 assert(!is_complete());
1532
1533 if (!can_auth_pin()) {
1534 dout(7) << "fetch keys waiting for authpinnable" << dendl;
1535 add_waiter(WAIT_UNFREEZE, c);
1536 return;
1537 }
1538 if (state_test(CDir::STATE_FETCHING)) {
1539 dout(7) << "fetch keys waiting for full fetch" << dendl;
1540 add_waiter(WAIT_COMPLETE, c);
1541 return;
1542 }
1543
1544 auth_pin(this);
1545 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1546
1547 _omap_fetch(c, keys);
1548}
1549
1550class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
1551 MDSInternalContextBase *fin;
1552public:
1553 bufferlist hdrbl;
1554 bool more = false;
1555 map<string, bufferlist> omap; ///< carry-over from before
1556 map<string, bufferlist> omap_more; ///< new batch
1557 int ret;
1558 C_IO_Dir_OMAP_FetchedMore(CDir *d, MDSInternalContextBase *f) :
1559 CDirIOContext(d), fin(f), ret(0) { }
1560 void finish(int r) {
1561 // merge results
1562 if (omap.empty()) {
1563 omap.swap(omap_more);
1564 } else {
1565 omap.insert(omap_more.begin(), omap_more.end());
1566 }
1567 if (more) {
1568 dir->_omap_fetch_more(hdrbl, omap, fin);
1569 } else {
1570 dir->_omap_fetched(hdrbl, omap, !fin, r);
1571 if (fin)
1572 fin->complete(r);
1573 }
1574 }
1575};
1576
1577class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
1578 MDSInternalContextBase *fin;
1579public:
1580 bufferlist hdrbl;
1581 bool more = false;
1582 map<string, bufferlist> omap;
1583 bufferlist btbl;
1584 int ret1, ret2, ret3;
1585
1586 C_IO_Dir_OMAP_Fetched(CDir *d, MDSInternalContextBase *f) :
1587 CDirIOContext(d), fin(f), ret1(0), ret2(0), ret3(0) { }
1588 void finish(int r) override {
1589 // check the correctness of backtrace
1590 if (r >= 0 && ret3 != -ECANCELED)
1591 dir->inode->verify_diri_backtrace(btbl, ret3);
1592 if (r >= 0) r = ret1;
1593 if (r >= 0) r = ret2;
1594 if (more) {
1595 dir->_omap_fetch_more(hdrbl, omap, fin);
1596 } else {
1597 dir->_omap_fetched(hdrbl, omap, !fin, r);
1598 if (fin)
1599 fin->complete(r);
1600 }
1601 }
1602};
1603
1604void CDir::_omap_fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1605{
1606 C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1607 object_t oid = get_ondisk_object();
1608 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1609 ObjectOperation rd;
1610 rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1611 if (keys.empty()) {
1612 assert(!c);
1613 rd.omap_get_vals("", "", g_conf->mds_dir_keys_per_op,
1614 &fin->omap, &fin->more, &fin->ret2);
1615 } else {
1616 assert(c);
1617 std::set<std::string> str_keys;
94b18763 1618 for (auto p : keys) {
7c673cae 1619 string str;
94b18763 1620 p.encode(str);
7c673cae
FG
1621 str_keys.insert(str);
1622 }
1623 rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
1624 }
1625 // check the correctness of backtrace
1626 if (g_conf->mds_verify_backtrace > 0 && frag == frag_t()) {
1627 rd.getxattr("parent", &fin->btbl, &fin->ret3);
1628 rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1629 } else {
1630 fin->ret3 = -ECANCELED;
1631 }
1632
1633 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1634 new C_OnFinisher(fin, cache->mds->finisher));
1635}
1636
1637void CDir::_omap_fetch_more(
1638 bufferlist& hdrbl,
1639 map<string, bufferlist>& omap,
1640 MDSInternalContextBase *c)
1641{
1642 // we have more omap keys to fetch!
1643 object_t oid = get_ondisk_object();
1644 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1645 C_IO_Dir_OMAP_FetchedMore *fin = new C_IO_Dir_OMAP_FetchedMore(this, c);
1646 fin->hdrbl.claim(hdrbl);
1647 fin->omap.swap(omap);
1648 ObjectOperation rd;
1649 rd.omap_get_vals(fin->omap.rbegin()->first,
1650 "", /* filter prefix */
1651 g_conf->mds_dir_keys_per_op,
1652 &fin->omap_more,
1653 &fin->more,
1654 &fin->ret);
1655 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1656 new C_OnFinisher(fin, cache->mds->finisher));
1657}
1658
1659CDentry *CDir::_load_dentry(
94b18763
FG
1660 boost::string_view key,
1661 boost::string_view dname,
7c673cae
FG
1662 const snapid_t last,
1663 bufferlist &bl,
1664 const int pos,
1665 const std::set<snapid_t> *snaps,
28e407b8 1666 bool *force_dirty)
7c673cae
FG
1667{
1668 bufferlist::iterator q = bl.begin();
1669
1670 snapid_t first;
1671 ::decode(first, q);
1672
1673 // marker
1674 char type;
1675 ::decode(type, q);
1676
1677 dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
1678 << " [" << first << "," << last << "]"
1679 << dendl;
1680
1681 bool stale = false;
1682 if (snaps && last != CEPH_NOSNAP) {
1683 set<snapid_t>::const_iterator p = snaps->lower_bound(first);
1684 if (p == snaps->end() || *p > last) {
1685 dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
1686 stale = true;
1687 }
1688 }
1689
1690 /*
1691 * look for existing dentry for _last_ snap, because unlink +
1692 * create may leave a "hole" (epochs during which the dentry
1693 * doesn't exist) but for which no explicit negative dentry is in
1694 * the cache.
1695 */
1696 CDentry *dn;
1697 if (stale)
1698 dn = lookup_exact_snap(dname, last);
1699 else
1700 dn = lookup(dname, last);
1701
1702 if (type == 'L') {
1703 // hard link
1704 inodeno_t ino;
1705 unsigned char d_type;
1706 ::decode(ino, q);
1707 ::decode(d_type, q);
1708
1709 if (stale) {
1710 if (!dn) {
94b18763 1711 stale_items.insert(mempool::mds_co::string(key));
7c673cae
FG
1712 *force_dirty = true;
1713 }
1714 return dn;
1715 }
1716
1717 if (dn) {
28e407b8
AA
1718 CDentry::linkage_t *dnl = dn->get_linkage();
1719 dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
1720 if (committed_version == 0 &&
1721 dnl->is_remote() &&
1722 dn->is_dirty() &&
1723 ino == dnl->get_remote_ino() &&
1724 d_type == dnl->get_remote_d_type()) {
1725 // see comment below
1726 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1727 dn->mark_clean();
7c673cae
FG
1728 }
1729 } else {
1730 // (remote) link
1731 dn = add_remote_dentry(dname, ino, d_type, first, last);
1732
1733 // link to inode?
1734 CInode *in = cache->get_inode(ino); // we may or may not have it.
1735 if (in) {
1736 dn->link_remote(dn->get_linkage(), in);
1737 dout(12) << "_fetched got remote link " << ino << " which we have " << *in << dendl;
1738 } else {
1739 dout(12) << "_fetched got remote link " << ino << " (dont' have it)" << dendl;
1740 }
1741 }
1742 }
1743 else if (type == 'I') {
1744 // inode
1745
1746 // Load inode data before looking up or constructing CInode
1747 InodeStore inode_data;
1748 inode_data.decode_bare(q);
1749
1750 if (stale) {
1751 if (!dn) {
94b18763 1752 stale_items.insert(mempool::mds_co::string(key));
7c673cae
FG
1753 *force_dirty = true;
1754 }
1755 return dn;
1756 }
1757
1758 bool undef_inode = false;
1759 if (dn) {
28e407b8
AA
1760 CDentry::linkage_t *dnl = dn->get_linkage();
1761 dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
1762
1763 if (dnl->is_primary()) {
1764 CInode *in = dnl->get_inode();
1765 if (in->state_test(CInode::STATE_REJOINUNDEF)) {
1766 undef_inode = true;
1767 } else if (committed_version == 0 &&
1768 dn->is_dirty() &&
1769 inode_data.inode.ino == in->ino() &&
1770 inode_data.inode.version == in->get_version()) {
1771 /* clean underwater item?
1772 * Underwater item is something that is dirty in our cache from
1773 * journal replay, but was previously flushed to disk before the
1774 * mds failed.
1775 *
1776 * We only do this is committed_version == 0. that implies either
1777 * - this is a fetch after from a clean/empty CDir is created
1778 * (and has no effect, since the dn won't exist); or
1779 * - this is a fetch after _recovery_, which is what we're worried
1780 * about. Items that are marked dirty from the journal should be
1781 * marked clean if they appear on disk.
1782 */
1783 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1784 dn->mark_clean();
1785 dout(10) << "_fetched had underwater inode " << *dnl->get_inode() << ", marking clean" << dendl;
1786 in->mark_clean();
1787 }
1788 }
7c673cae
FG
1789 }
1790
1791 if (!dn || undef_inode) {
1792 // add inode
1793 CInode *in = cache->get_inode(inode_data.inode.ino, last);
1794 if (!in || undef_inode) {
1795 if (undef_inode && in)
1796 in->first = first;
1797 else
1798 in = new CInode(cache, true, first, last);
1799
1800 in->inode = inode_data.inode;
1801 // symlink?
1802 if (in->is_symlink())
1803 in->symlink = inode_data.symlink;
1804
1805 in->dirfragtree.swap(inode_data.dirfragtree);
1806 in->xattrs.swap(inode_data.xattrs);
1807 in->old_inodes.swap(inode_data.old_inodes);
1808 if (!in->old_inodes.empty()) {
1809 snapid_t min_first = in->old_inodes.rbegin()->first + 1;
1810 if (min_first > in->first)
1811 in->first = min_first;
1812 }
1813
1814 in->oldest_snap = inode_data.oldest_snap;
1815 in->decode_snap_blob(inode_data.snap_blob);
1816 if (snaps && !in->snaprealm)
1817 in->purge_stale_snap_data(*snaps);
1818
1819 if (!undef_inode) {
1820 cache->add_inode(in); // add
1821 dn = add_primary_dentry(dname, in, first, last); // link
1822 }
1823 dout(12) << "_fetched got " << *dn << " " << *in << dendl;
1824
1825 if (in->inode.is_dirty_rstat())
1826 in->mark_dirty_rstat();
1827
1828 //in->hack_accessed = false;
1829 //in->hack_load_stamp = ceph_clock_now();
1830 //num_new_inodes_loaded++;
94b18763
FG
1831 } else if (g_conf->get_val<bool>("mds_hack_allow_loading_invalid_metadata")) {
1832 dout(20) << "hack: adding duplicate dentry for " << *in << dendl;
1833 dn = add_primary_dentry(dname, in, first, last);
7c673cae
FG
1834 } else {
1835 dout(0) << "_fetched badness: got (but i already had) " << *in
1836 << " mode " << in->inode.mode
1837 << " mtime " << in->inode.mtime << dendl;
1838 string dirpath, inopath;
1839 this->inode->make_path_string(dirpath);
1840 in->make_path_string(inopath);
1841 cache->mds->clog->error() << "loaded dup inode " << inode_data.inode.ino
1842 << " [" << first << "," << last << "] v" << inode_data.inode.version
1843 << " at " << dirpath << "/" << dname
1844 << ", but inode " << in->vino() << " v" << in->inode.version
1845 << " already exists at " << inopath;
1846 return dn;
1847 }
1848 }
1849 } else {
1850 std::ostringstream oss;
1851 oss << "Invalid tag char '" << type << "' pos " << pos;
1852 throw buffer::malformed_input(oss.str());
1853 }
1854
1855 return dn;
1856}
1857
1858void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
1859 bool complete, int r)
1860{
1861 LogChannelRef clog = cache->mds->clog;
1862 dout(10) << "_fetched header " << hdrbl.length() << " bytes "
1863 << omap.size() << " keys for " << *this << dendl;
1864
1865 assert(r == 0 || r == -ENOENT || r == -ENODATA);
1866 assert(is_auth());
1867 assert(!is_frozen());
1868
1869 if (hdrbl.length() == 0) {
1870 dout(0) << "_fetched missing object for " << *this << dendl;
1871
1872 clog->error() << "dir " << dirfrag() << " object missing on disk; some "
1873 "files may be lost (" << get_path() << ")";
1874
1875 go_bad(complete);
1876 return;
1877 }
1878
1879 fnode_t got_fnode;
1880 {
1881 bufferlist::iterator p = hdrbl.begin();
1882 try {
1883 ::decode(got_fnode, p);
1884 } catch (const buffer::error &err) {
1885 derr << "Corrupt fnode in dirfrag " << dirfrag()
1886 << ": " << err << dendl;
1887 clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
1888 << err << " (" << get_path() << ")";
1889 go_bad(complete);
1890 return;
1891 }
1892 if (!p.end()) {
1893 clog->warn() << "header buffer of dir " << dirfrag() << " has "
1894 << hdrbl.length() - p.get_off() << " extra bytes ("
1895 << get_path() << ")";
1896 go_bad(complete);
1897 return;
1898 }
1899 }
1900
1901 dout(10) << "_fetched version " << got_fnode.version << dendl;
1902
1903 // take the loaded fnode?
1904 // only if we are a fresh CDir* with no prior state.
1905 if (get_version() == 0) {
1906 assert(!is_projected());
1907 assert(!state_test(STATE_COMMITTING));
1908 fnode = got_fnode;
1909 projected_version = committing_version = committed_version = got_fnode.version;
1910
1911 if (state_test(STATE_REJOINUNDEF)) {
1912 assert(cache->mds->is_rejoin());
1913 state_clear(STATE_REJOINUNDEF);
1914 cache->opened_undef_dirfrag(this);
1915 }
1916 }
1917
1918 list<CInode*> undef_inodes;
1919
1920 // purge stale snaps?
1921 // only if we have past_parents open!
1922 bool force_dirty = false;
1923 const set<snapid_t> *snaps = NULL;
1924 SnapRealm *realm = inode->find_snaprealm();
1925 if (!realm->have_past_parents_open()) {
1926 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
1927 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
1928 snaps = &realm->get_snaps();
1929 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
1930 << " < " << realm->get_last_destroyed()
1931 << ", snap purge based on " << *snaps << dendl;
1932 if (get_num_snap_items() == 0) {
1933 fnode.snap_purged_thru = realm->get_last_destroyed();
1934 force_dirty = true;
1935 }
1936 }
1937
1938 unsigned pos = omap.size() - 1;
1939 for (map<string, bufferlist>::reverse_iterator p = omap.rbegin();
1940 p != omap.rend();
1941 ++p, --pos) {
1942 string dname;
1943 snapid_t last;
1944 dentry_key_t::decode_helper(p->first, dname, last);
1945
1946 CDentry *dn = NULL;
1947 try {
1948 dn = _load_dentry(
1949 p->first, dname, last, p->second, pos, snaps,
28e407b8 1950 &force_dirty);
7c673cae
FG
1951 } catch (const buffer::error &err) {
1952 cache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
1953 "dir frag " << dirfrag() << ": "
1954 << err << "(" << get_path() << ")";
1955
1956 // Remember that this dentry is damaged. Subsequent operations
1957 // that try to act directly on it will get their EIOs, but this
1958 // dirfrag as a whole will continue to look okay (minus the
1959 // mysteriously-missing dentry)
1960 go_bad_dentry(last, dname);
1961
1962 // Anyone who was WAIT_DENTRY for this guy will get kicked
1963 // to RetryRequest, and hit the DamageTable-interrogating path.
1964 // Stats will now be bogus because we will think we're complete,
1965 // but have 1 or more missing dentries.
1966 continue;
1967 }
1968
28e407b8
AA
1969 if (!dn)
1970 continue;
7c673cae 1971
28e407b8
AA
1972 CDentry::linkage_t *dnl = dn->get_linkage();
1973 if (dnl->is_primary() && dnl->get_inode()->state_test(CInode::STATE_REJOINUNDEF))
1974 undef_inodes.push_back(dnl->get_inode());
7c673cae 1975
28e407b8
AA
1976 if (!complete || wanted_items.count(mempool::mds_co::string(boost::string_view(dname))) > 0) {
1977 dout(10) << " touching wanted dn " << *dn << dendl;
1978 inode->mdcache->touch_dentry(dn);
7c673cae
FG
1979 }
1980 }
1981
1982 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
1983
1984 // mark complete, !fetching
1985 if (complete) {
1986 wanted_items.clear();
1987 mark_complete();
1988 state_clear(STATE_FETCHING);
1989
1990 if (scrub_infop && scrub_infop->need_scrub_local) {
1991 scrub_infop->need_scrub_local = false;
1992 scrub_local();
1993 }
1994 }
1995
1996 // open & force frags
1997 while (!undef_inodes.empty()) {
1998 CInode *in = undef_inodes.front();
1999 undef_inodes.pop_front();
2000 in->state_clear(CInode::STATE_REJOINUNDEF);
2001 cache->opened_undef_inode(in);
2002 }
2003
2004 // dirty myself to remove stale snap dentries
2005 if (force_dirty && !inode->mdcache->is_readonly())
2006 log_mark_dirty();
2007
2008 auth_unpin(this);
2009
2010 if (complete) {
2011 // kick waiters
2012 finish_waiting(WAIT_COMPLETE, 0);
2013 }
2014}
2015
2016void CDir::_go_bad()
2017{
2018 if (get_version() == 0)
2019 set_version(1);
2020 state_set(STATE_BADFRAG);
2021 // mark complete, !fetching
2022 mark_complete();
2023 state_clear(STATE_FETCHING);
2024 auth_unpin(this);
2025
2026 // kick waiters
2027 finish_waiting(WAIT_COMPLETE, -EIO);
2028}
2029
94b18763 2030void CDir::go_bad_dentry(snapid_t last, boost::string_view dname)
7c673cae 2031{
94b18763
FG
2032 dout(10) << __func__ << " " << dname << dendl;
2033 std::string path(get_path());
2034 path += "/";
2035 path += std::string(dname);
7c673cae 2036 const bool fatal = cache->mds->damage_table.notify_dentry(
94b18763 2037 inode->ino(), frag, last, dname, path);
7c673cae
FG
2038 if (fatal) {
2039 cache->mds->damaged();
2040 ceph_abort(); // unreachable, damaged() respawns us
2041 }
2042}
2043
2044void CDir::go_bad(bool complete)
2045{
2046 dout(10) << "go_bad " << frag << dendl;
2047 const bool fatal = cache->mds->damage_table.notify_dirfrag(
2048 inode->ino(), frag, get_path());
2049 if (fatal) {
2050 cache->mds->damaged();
2051 ceph_abort(); // unreachable, damaged() respawns us
2052 }
2053
2054 if (complete)
2055 _go_bad();
2056 else
2057 auth_unpin(this);
2058}
2059
2060// -----------------------
2061// COMMIT
2062
2063/**
2064 * commit
2065 *
2066 * @param want - min version i want committed
2067 * @param c - callback for completion
2068 */
2069void CDir::commit(version_t want, MDSInternalContextBase *c, bool ignore_authpinnability, int op_prio)
2070{
2071 dout(10) << "commit want " << want << " on " << *this << dendl;
2072 if (want == 0) want = get_version();
2073
2074 // preconditions
2075 assert(want <= get_version() || get_version() == 0); // can't commit the future
2076 assert(want > committed_version); // the caller is stupid
2077 assert(is_auth());
2078 assert(ignore_authpinnability || can_auth_pin());
2079
7c673cae
FG
2080 // note: queue up a noop if necessary, so that we always
2081 // get an auth_pin.
2082 if (!c)
2083 c = new C_MDSInternalNoop;
2084
2085 // auth_pin on first waiter
2086 if (waiting_for_commit.empty())
2087 auth_pin(this);
2088 waiting_for_commit[want].push_back(c);
2089
2090 // ok.
2091 _commit(want, op_prio);
2092}
2093
2094class C_IO_Dir_Committed : public CDirIOContext {
2095 version_t version;
2096public:
2097 C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2098 void finish(int r) override {
2099 dir->_committed(r, version);
2100 }
2101};
2102
2103/**
2104 * Flush out the modified dentries in this dir. Keep the bufferlist
2105 * below max_write_size;
2106 */
2107void CDir::_omap_commit(int op_prio)
2108{
2109 dout(10) << "_omap_commit" << dendl;
2110
2111 unsigned max_write_size = cache->max_dir_commit_size;
2112 unsigned write_size = 0;
2113
2114 if (op_prio < 0)
2115 op_prio = CEPH_MSG_PRIO_DEFAULT;
2116
2117 // snap purge?
2118 const set<snapid_t> *snaps = NULL;
2119 SnapRealm *realm = inode->find_snaprealm();
2120 if (!realm->have_past_parents_open()) {
2121 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
2122 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
2123 snaps = &realm->get_snaps();
2124 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
2125 << " < " << realm->get_last_destroyed()
2126 << ", snap purge based on " << *snaps << dendl;
2127 // fnode.snap_purged_thru = realm->get_last_destroyed();
2128 }
2129
2130 set<string> to_remove;
2131 map<string, bufferlist> to_set;
2132
2133 C_GatherBuilder gather(g_ceph_context,
2134 new C_OnFinisher(new C_IO_Dir_Committed(this,
2135 get_version()),
2136 cache->mds->finisher));
2137
2138 SnapContext snapc;
2139 object_t oid = get_ondisk_object();
2140 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
2141
2142 if (!stale_items.empty()) {
94b18763
FG
2143 for (const auto &p : stale_items) {
2144 to_remove.insert(std::string(boost::string_view(p)));
2145 write_size += p.length();
7c673cae
FG
2146 }
2147 stale_items.clear();
2148 }
2149
b32b8144 2150 auto write_one = [&](CDentry *dn) {
7c673cae
FG
2151 string key;
2152 dn->key().encode(key);
2153
2154 if (dn->last != CEPH_NOSNAP &&
2155 snaps && try_trim_snap_dentry(dn, *snaps)) {
2156 dout(10) << " rm " << key << dendl;
2157 write_size += key.length();
2158 to_remove.insert(key);
b32b8144 2159 return;
7c673cae
FG
2160 }
2161
7c673cae 2162 if (dn->get_linkage()->is_null()) {
94b18763 2163 dout(10) << " rm " << dn->get_name() << " " << *dn << dendl;
7c673cae
FG
2164 write_size += key.length();
2165 to_remove.insert(key);
2166 } else {
94b18763 2167 dout(10) << " set " << dn->get_name() << " " << *dn << dendl;
7c673cae
FG
2168 bufferlist dnbl;
2169 _encode_dentry(dn, dnbl, snaps);
2170 write_size += key.length() + dnbl.length();
2171 to_set[key].swap(dnbl);
2172 }
2173
2174 if (write_size >= max_write_size) {
2175 ObjectOperation op;
2176 op.priority = op_prio;
2177
2178 // don't create new dirfrag blindly
2179 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2180 op.stat(NULL, (ceph::real_time*) NULL, NULL);
2181
2182 if (!to_set.empty())
2183 op.omap_set(to_set);
2184 if (!to_remove.empty())
2185 op.omap_rm_keys(to_remove);
2186
2187 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2188 ceph::real_clock::now(),
2189 0, gather.new_sub());
2190
2191 write_size = 0;
2192 to_set.clear();
2193 to_remove.clear();
2194 }
b32b8144
FG
2195 };
2196
2197 if (state_test(CDir::STATE_FRAGMENTING)) {
2198 for (auto p = items.begin(); p != items.end(); ) {
2199 CDentry *dn = p->second;
2200 ++p;
2201 if (!dn->is_dirty() && dn->get_linkage()->is_null())
2202 continue;
2203 write_one(dn);
2204 }
2205 } else {
2206 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2207 CDentry *dn = *p;
2208 ++p;
2209 write_one(dn);
2210 }
7c673cae
FG
2211 }
2212
2213 ObjectOperation op;
2214 op.priority = op_prio;
2215
2216 // don't create new dirfrag blindly
2217 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2218 op.stat(NULL, (ceph::real_time*)NULL, NULL);
2219
2220 /*
2221 * save the header at the last moment.. If we were to send it off before other
2222 * updates, but die before sending them all, we'd think that the on-disk state
2223 * was fully committed even though it wasn't! However, since the messages are
2224 * strictly ordered between the MDS and the OSD, and since messages to a given
2225 * PG are strictly ordered, if we simply send the message containing the header
2226 * off last, we cannot get our header into an incorrect state.
2227 */
2228 bufferlist header;
2229 ::encode(fnode, header);
2230 op.omap_set_header(header);
2231
2232 if (!to_set.empty())
2233 op.omap_set(to_set);
2234 if (!to_remove.empty())
2235 op.omap_rm_keys(to_remove);
2236
2237 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2238 ceph::real_clock::now(),
2239 0, gather.new_sub());
2240
2241 gather.activate();
2242}
2243
2244void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
2245 const set<snapid_t> *snaps)
2246{
2247 // clear dentry NEW flag, if any. we can no longer silently drop it.
2248 dn->clear_new();
2249
2250 ::encode(dn->first, bl);
2251
2252 // primary or remote?
2253 if (dn->linkage.is_remote()) {
2254 inodeno_t ino = dn->linkage.get_remote_ino();
2255 unsigned char d_type = dn->linkage.get_remote_d_type();
94b18763 2256 dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' remote ino " << ino << dendl;
7c673cae
FG
2257
2258 // marker, name, ino
2259 bl.append('L'); // remote link
2260 ::encode(ino, bl);
2261 ::encode(d_type, bl);
2262 } else if (dn->linkage.is_primary()) {
2263 // primary link
2264 CInode *in = dn->linkage.get_inode();
2265 assert(in);
2266
94b18763 2267 dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' inode " << *in << dendl;
7c673cae
FG
2268
2269 // marker, name, inode, [symlink string]
2270 bl.append('I'); // inode
2271
2272 if (in->is_multiversion()) {
2273 if (!in->snaprealm) {
2274 if (snaps)
2275 in->purge_stale_snap_data(*snaps);
2276 } else if (in->snaprealm->have_past_parents_open()) {
2277 in->purge_stale_snap_data(in->snaprealm->get_snaps());
2278 }
2279 }
2280
2281 bufferlist snap_blob;
2282 in->encode_snap_blob(snap_blob);
2283 in->encode_bare(bl, cache->mds->mdsmap->get_up_features(), &snap_blob);
2284 } else {
2285 assert(!dn->linkage.is_null());
2286 }
2287}
2288
2289void CDir::_commit(version_t want, int op_prio)
2290{
2291 dout(10) << "_commit want " << want << " on " << *this << dendl;
2292
2293 // we can't commit things in the future.
2294 // (even the projected future.)
2295 assert(want <= get_version() || get_version() == 0);
2296
2297 // check pre+postconditions.
2298 assert(is_auth());
2299
2300 // already committed?
2301 if (committed_version >= want) {
2302 dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2303 return;
2304 }
2305 // already committing >= want?
2306 if (committing_version >= want) {
2307 dout(10) << "already committing " << committing_version << " >= " << want << dendl;
2308 assert(state_test(STATE_COMMITTING));
2309 return;
2310 }
2311
2312 // alrady committed an older version?
2313 if (committing_version > committed_version) {
2314 dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2315 return;
2316 }
2317
2318 // commit.
2319 committing_version = get_version();
2320
2321 // mark committing (if not already)
2322 if (!state_test(STATE_COMMITTING)) {
2323 dout(10) << "marking committing" << dendl;
2324 state_set(STATE_COMMITTING);
2325 }
2326
2327 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_commit);
2328
2329 _omap_commit(op_prio);
2330}
2331
2332
2333/**
2334 * _committed
2335 *
2336 * @param v version i just committed
2337 */
2338void CDir::_committed(int r, version_t v)
2339{
2340 if (r < 0) {
2341 // the directory could be partly purged during MDS failover
2342 if (r == -ENOENT && committed_version == 0 &&
31f18b77 2343 !inode->is_base() && get_parent_dir()->inode->is_stray()) {
7c673cae 2344 r = 0;
31f18b77
FG
2345 if (inode->snaprealm)
2346 inode->state_set(CInode::STATE_MISSINGOBJS);
7c673cae
FG
2347 }
2348 if (r < 0) {
2349 dout(1) << "commit error " << r << " v " << v << dendl;
2350 cache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
2351 << " errno " << r;
2352 cache->mds->handle_write_error(r);
2353 return;
2354 }
2355 }
2356
2357 dout(10) << "_committed v " << v << " on " << *this << dendl;
2358 assert(is_auth());
2359
2360 bool stray = inode->is_stray();
2361
2362 // take note.
2363 assert(v > committed_version);
2364 assert(v <= committing_version);
2365 committed_version = v;
2366
2367 // _all_ commits done?
2368 if (committing_version == committed_version)
2369 state_clear(CDir::STATE_COMMITTING);
2370
2371 // _any_ commit, even if we've been redirtied, means we're no longer new.
2372 item_new.remove_myself();
2373
2374 // dir clean?
2375 if (committed_version == get_version())
2376 mark_clean();
2377
2378 // dentries clean?
b32b8144
FG
2379 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2380 CDentry *dn = *p;
2381 ++p;
7c673cae
FG
2382
2383 // inode?
2384 if (dn->linkage.is_primary()) {
2385 CInode *in = dn->linkage.get_inode();
2386 assert(in);
2387 assert(in->is_auth());
2388
2389 if (committed_version >= in->get_version()) {
2390 if (in->is_dirty()) {
2391 dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
2392 in->mark_clean();
2393 }
2394 } else {
2395 dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
2396 assert(in->is_dirty() || in->last < CEPH_NOSNAP); // special case for cow snap items (not predirtied)
2397 }
2398 }
2399
2400 // dentry
2401 if (committed_version >= dn->get_version()) {
b32b8144
FG
2402 dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
2403 dn->mark_clean();
7c673cae 2404
b32b8144
FG
2405 // drop clean null stray dentries immediately
2406 if (stray &&
2407 dn->get_num_ref() == 0 &&
2408 !dn->is_projected() &&
2409 dn->get_linkage()->is_null())
2410 remove_dentry(dn);
7c673cae
FG
2411 } else {
2412 dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
b32b8144 2413 assert(dn->is_dirty());
7c673cae
FG
2414 }
2415 }
2416
2417 // finishers?
2418 bool were_waiters = !waiting_for_commit.empty();
2419
94b18763
FG
2420 auto it = waiting_for_commit.begin();
2421 while (it != waiting_for_commit.end()) {
2422 auto _it = it;
2423 ++_it;
2424 if (it->first > committed_version) {
2425 dout(10) << " there are waiters for " << it->first << ", committing again" << dendl;
2426 _commit(it->first, -1);
7c673cae
FG
2427 break;
2428 }
94b18763
FG
2429 std::list<MDSInternalContextBase*> t;
2430 for (const auto &waiter : it->second)
2431 t.push_back(waiter);
2432 cache->mds->queue_waiters(t);
2433 waiting_for_commit.erase(it);
2434 it = _it;
7c673cae
FG
2435 }
2436
2437 // try drop dentries in this dirfrag if it's about to be purged
31f18b77
FG
2438 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
2439 inode->snaprealm)
7c673cae
FG
2440 cache->maybe_eval_stray(inode, true);
2441
2442 // unpin if we kicked the last waiter.
2443 if (were_waiters &&
2444 waiting_for_commit.empty())
2445 auth_unpin(this);
2446}
2447
2448
2449
2450
2451// IMPORT/EXPORT
2452
2453void CDir::encode_export(bufferlist& bl)
2454{
2455 assert(!is_projected());
2456 ::encode(first, bl);
2457 ::encode(fnode, bl);
2458 ::encode(dirty_old_rstat, bl);
2459 ::encode(committed_version, bl);
2460
2461 ::encode(state, bl);
2462 ::encode(dir_rep, bl);
2463
2464 ::encode(pop_me, bl);
2465 ::encode(pop_auth_subtree, bl);
2466
2467 ::encode(dir_rep_by, bl);
181888fb 2468 ::encode(get_replicas(), bl);
7c673cae
FG
2469
2470 get(PIN_TEMPEXPORTING);
2471}
2472
2473void CDir::finish_export(utime_t now)
2474{
2475 state &= MASK_STATE_EXPORT_KEPT;
2476 pop_auth_subtree_nested.sub(now, cache->decayrate, pop_auth_subtree);
2477 pop_me.zero(now);
2478 pop_auth_subtree.zero(now);
2479 put(PIN_TEMPEXPORTING);
2480 dirty_old_rstat.clear();
2481}
2482
2483void CDir::decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls)
2484{
2485 ::decode(first, blp);
2486 ::decode(fnode, blp);
2487 ::decode(dirty_old_rstat, blp);
2488 projected_version = fnode.version;
2489 ::decode(committed_version, blp);
2490 committing_version = committed_version;
2491
2492 unsigned s;
2493 ::decode(s, blp);
2494 state &= MASK_STATE_IMPORT_KEPT;
2495 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
2496
2497 if (is_dirty()) {
2498 get(PIN_DIRTY);
2499 _mark_dirty(ls);
2500 }
2501
2502 ::decode(dir_rep, blp);
2503
2504 ::decode(pop_me, now, blp);
2505 ::decode(pop_auth_subtree, now, blp);
2506 pop_auth_subtree_nested.add(now, cache->decayrate, pop_auth_subtree);
2507
2508 ::decode(dir_rep_by, blp);
181888fb
FG
2509 ::decode(get_replicas(), blp);
2510 if (is_replicated()) get(PIN_REPLICATED);
7c673cae
FG
2511
2512 replica_nonce = 0; // no longer defined
2513
2514 // did we import some dirty scatterlock data?
2515 if (dirty_old_rstat.size() ||
2516 !(fnode.rstat == fnode.accounted_rstat)) {
2517 cache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
2518 ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
2519 }
2520 if (!(fnode.fragstat == fnode.accounted_fragstat)) {
2521 cache->mds->locker->mark_updated_scatterlock(&inode->filelock);
2522 ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
2523 }
2524 if (is_dirty_dft()) {
2525 if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
2526 inode->dirfragtreelock.is_stable()) {
2527 // clear stale dirtydft
2528 state_clear(STATE_DIRTYDFT);
2529 } else {
2530 cache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
2531 ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
2532 }
2533 }
2534}
2535
2536
2537
2538
2539/********************************
2540 * AUTHORITY
2541 */
2542
2543/*
2544 * if dir_auth.first == parent, auth is same as inode.
2545 * unless .second != unknown, in which case that sticks.
2546 */
2547mds_authority_t CDir::authority() const
2548{
2549 if (is_subtree_root())
2550 return dir_auth;
2551 else
2552 return inode->authority();
2553}
2554
2555/** is_subtree_root()
2556 * true if this is an auth delegation point.
2557 * that is, dir_auth != default (parent,unknown)
2558 *
2559 * some key observations:
2560 * if i am auth:
2561 * - any region bound will be an export, or frozen.
2562 *
2563 * note that this DOES heed dir_auth.pending
2564 */
2565/*
2566bool CDir::is_subtree_root()
2567{
2568 if (dir_auth == CDIR_AUTH_DEFAULT) {
2569 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2570 //<< " on " << ino() << dendl;
2571 return false;
2572 } else {
2573 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2574 //<< " on " << ino() << dendl;
2575 return true;
2576 }
2577}
2578*/
2579
2580/** contains(x)
2581 * true if we are x, or an ancestor of x
2582 */
2583bool CDir::contains(CDir *x)
2584{
2585 while (1) {
2586 if (x == this)
2587 return true;
2588 x = x->get_inode()->get_projected_parent_dir();
2589 if (x == 0)
2590 return false;
2591 }
2592}
2593
2594
2595
2596/** set_dir_auth
2597 */
2598void CDir::set_dir_auth(mds_authority_t a)
2599{
2600 dout(10) << "setting dir_auth=" << a
2601 << " from " << dir_auth
2602 << " on " << *this << dendl;
2603
2604 bool was_subtree = is_subtree_root();
2605 bool was_ambiguous = dir_auth.second >= 0;
2606
2607 // set it.
2608 dir_auth = a;
2609
2610 // new subtree root?
2611 if (!was_subtree && is_subtree_root()) {
2612 dout(10) << " new subtree root, adjusting auth_pins" << dendl;
2613
2614 // adjust nested auth pins
2615 if (get_cum_auth_pins())
2616 inode->adjust_nested_auth_pins(-1, NULL);
2617
2618 // unpin parent of frozen dir/tree?
224ce89b
WB
2619 if (inode->is_auth()) {
2620 assert(!is_frozen_tree_root());
2621 if (is_frozen_dir())
2622 inode->auth_unpin(this);
2623 }
7c673cae
FG
2624 }
2625 if (was_subtree && !is_subtree_root()) {
2626 dout(10) << " old subtree root, adjusting auth_pins" << dendl;
2627
2628 // adjust nested auth pins
2629 if (get_cum_auth_pins())
2630 inode->adjust_nested_auth_pins(1, NULL);
2631
2632 // pin parent of frozen dir/tree?
224ce89b
WB
2633 if (inode->is_auth()) {
2634 assert(!is_frozen_tree_root());
2635 if (is_frozen_dir())
2636 inode->auth_pin(this);
2637 }
7c673cae
FG
2638 }
2639
2640 // newly single auth?
2641 if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
2642 list<MDSInternalContextBase*> ls;
2643 take_waiting(WAIT_SINGLEAUTH, ls);
2644 cache->mds->queue_waiters(ls);
2645 }
2646}
2647
2648
2649/*****************************************
2650 * AUTH PINS and FREEZING
2651 *
2652 * the basic plan is that auth_pins only exist in auth regions, and they
2653 * prevent a freeze (and subsequent auth change).
2654 *
2655 * however, we also need to prevent a parent from freezing if a child is frozen.
2656 * for that reason, the parent inode of a frozen directory is auth_pinned.
2657 *
2658 * the oddity is when the frozen directory is a subtree root. if that's the case,
2659 * the parent inode isn't frozen. which means that when subtree authority is adjusted
2660 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
2661 * time.
2662 *
2663 */
2664
2665void CDir::auth_pin(void *by)
2666{
2667 if (auth_pins == 0)
2668 get(PIN_AUTHPIN);
2669 auth_pins++;
2670
2671#ifdef MDS_AUTHPIN_SET
2672 auth_pin_set.insert(by);
2673#endif
2674
2675 dout(10) << "auth_pin by " << by
2676 << " on " << *this
2677 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2678
2679 // nest pins?
2680 if (!is_subtree_root() &&
2681 get_cum_auth_pins() == 1)
2682 inode->adjust_nested_auth_pins(1, by);
2683}
2684
2685void CDir::auth_unpin(void *by)
2686{
2687 auth_pins--;
2688
2689#ifdef MDS_AUTHPIN_SET
2690 assert(auth_pin_set.count(by));
2691 auth_pin_set.erase(auth_pin_set.find(by));
2692#endif
2693 if (auth_pins == 0)
2694 put(PIN_AUTHPIN);
2695
2696 dout(10) << "auth_unpin by " << by
2697 << " on " << *this
2698 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2699 assert(auth_pins >= 0);
2700
2701 int newcum = get_cum_auth_pins();
2702
2703 maybe_finish_freeze(); // pending freeze?
2704
2705 // nest?
2706 if (!is_subtree_root() &&
2707 newcum == 0)
2708 inode->adjust_nested_auth_pins(-1, by);
2709}
2710
2711void CDir::adjust_nested_auth_pins(int inc, int dirinc, void *by)
2712{
2713 assert(inc);
2714 nested_auth_pins += inc;
2715 dir_auth_pins += dirinc;
2716
2717 dout(15) << "adjust_nested_auth_pins " << inc << "/" << dirinc << " on " << *this
2718 << " by " << by << " count now "
2719 << auth_pins << " + " << nested_auth_pins << dendl;
2720 assert(nested_auth_pins >= 0);
2721 assert(dir_auth_pins >= 0);
2722
2723 int newcum = get_cum_auth_pins();
2724
2725 maybe_finish_freeze(); // pending freeze?
2726
2727 // nest?
2728 if (!is_subtree_root()) {
2729 if (newcum == 0)
2730 inode->adjust_nested_auth_pins(-1, by);
2731 else if (newcum == inc)
2732 inode->adjust_nested_auth_pins(1, by);
2733 }
2734}
2735
2736#ifdef MDS_VERIFY_FRAGSTAT
2737void CDir::verify_fragstat()
2738{
2739 assert(is_complete());
2740 if (inode->is_stray())
2741 return;
2742
2743 frag_info_t c;
2744 memset(&c, 0, sizeof(c));
2745
94b18763 2746 for (auto it = items.begin();
7c673cae
FG
2747 it != items.end();
2748 ++it) {
2749 CDentry *dn = it->second;
2750 if (dn->is_null())
2751 continue;
2752
2753 dout(10) << " " << *dn << dendl;
2754 if (dn->is_primary())
2755 dout(10) << " " << *dn->inode << dendl;
2756
2757 if (dn->is_primary()) {
2758 if (dn->inode->is_dir())
2759 c.nsubdirs++;
2760 else
2761 c.nfiles++;
2762 }
2763 if (dn->is_remote()) {
2764 if (dn->get_remote_d_type() == DT_DIR)
2765 c.nsubdirs++;
2766 else
2767 c.nfiles++;
2768 }
2769 }
2770
2771 if (c.nsubdirs != fnode.fragstat.nsubdirs ||
2772 c.nfiles != fnode.fragstat.nfiles) {
2773 dout(0) << "verify_fragstat failed " << fnode.fragstat << " on " << *this << dendl;
2774 dout(0) << " i count " << c << dendl;
2775 ceph_abort();
2776 } else {
2777 dout(0) << "verify_fragstat ok " << fnode.fragstat << " on " << *this << dendl;
2778 }
2779}
2780#endif
2781
2782/*****************************************************************************
2783 * FREEZING
2784 */
2785
2786// FREEZE TREE
2787
2788bool CDir::freeze_tree()
2789{
2790 assert(!is_frozen());
2791 assert(!is_freezing());
2792
2793 auth_pin(this);
2794 if (is_freezeable(true)) {
2795 _freeze_tree();
2796 auth_unpin(this);
2797 return true;
2798 } else {
2799 state_set(STATE_FREEZINGTREE);
2800 ++num_freezing_trees;
2801 dout(10) << "freeze_tree waiting " << *this << dendl;
2802 return false;
2803 }
2804}
2805
2806void CDir::_freeze_tree()
2807{
2808 dout(10) << "_freeze_tree " << *this << dendl;
2809 assert(is_freezeable(true));
2810
2811 // twiddle state
2812 if (state_test(STATE_FREEZINGTREE)) {
2813 state_clear(STATE_FREEZINGTREE); // actually, this may get set again by next context?
2814 --num_freezing_trees;
2815 }
224ce89b
WB
2816
2817 if (is_auth()) {
2818 mds_authority_t auth;
2819 bool was_subtree = is_subtree_root();
2820 if (was_subtree) {
2821 auth = get_dir_auth();
2822 } else {
2823 // temporarily prevent parent subtree from becoming frozen.
2824 inode->auth_pin(this);
2825 // create new subtree
2826 auth = authority();
2827 }
2828
2829 assert(auth.first >= 0);
2830 assert(auth.second == CDIR_AUTH_UNKNOWN);
2831 auth.second = auth.first;
2832 inode->mdcache->adjust_subtree_auth(this, auth);
2833 if (!was_subtree)
2834 inode->auth_unpin(this);
2835 }
2836
7c673cae
FG
2837 state_set(STATE_FROZENTREE);
2838 ++num_frozen_trees;
2839 get(PIN_FROZEN);
7c673cae
FG
2840}
2841
2842void CDir::unfreeze_tree()
2843{
2844 dout(10) << "unfreeze_tree " << *this << dendl;
2845
2846 if (state_test(STATE_FROZENTREE)) {
2847 // frozen. unfreeze.
2848 state_clear(STATE_FROZENTREE);
2849 --num_frozen_trees;
2850
2851 put(PIN_FROZEN);
2852
224ce89b
WB
2853 if (is_auth()) {
2854 // must be subtree
2855 assert(is_subtree_root());
2856 // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
2857 mds_authority_t auth = get_dir_auth();
2858 assert(auth.first >= 0);
2859 assert(auth.second == auth.first);
2860 auth.second = CDIR_AUTH_UNKNOWN;
2861 inode->mdcache->adjust_subtree_auth(this, auth);
2862 }
7c673cae
FG
2863
2864 // waiters?
2865 finish_waiting(WAIT_UNFREEZE);
2866 } else {
2867 finish_waiting(WAIT_FROZEN, -1);
2868
2869 // freezing. stop it.
2870 assert(state_test(STATE_FREEZINGTREE));
2871 state_clear(STATE_FREEZINGTREE);
2872 --num_freezing_trees;
2873 auth_unpin(this);
2874
2875 finish_waiting(WAIT_UNFREEZE);
2876 }
2877}
2878
2879bool CDir::is_freezing_tree() const
2880{
2881 if (num_freezing_trees == 0)
2882 return false;
2883 const CDir *dir = this;
2884 while (1) {
2885 if (dir->is_freezing_tree_root()) return true;
2886 if (dir->is_subtree_root()) return false;
2887 if (dir->inode->parent)
2888 dir = dir->inode->parent->dir;
2889 else
2890 return false; // root on replica
2891 }
2892}
2893
2894bool CDir::is_frozen_tree() const
2895{
2896 if (num_frozen_trees == 0)
2897 return false;
2898 const CDir *dir = this;
2899 while (1) {
2900 if (dir->is_frozen_tree_root()) return true;
2901 if (dir->is_subtree_root()) return false;
2902 if (dir->inode->parent)
2903 dir = dir->inode->parent->dir;
2904 else
2905 return false; // root on replica
2906 }
2907}
2908
2909CDir *CDir::get_frozen_tree_root()
2910{
2911 assert(is_frozen());
2912 CDir *dir = this;
2913 while (1) {
2914 if (dir->is_frozen_tree_root())
2915 return dir;
2916 if (dir->inode->parent)
2917 dir = dir->inode->parent->dir;
2918 else
2919 ceph_abort();
2920 }
2921}
2922
2923class C_Dir_AuthUnpin : public CDirContext {
2924 public:
2925 explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
2926 void finish(int r) override {
2927 dir->auth_unpin(dir->get_inode());
2928 }
2929};
2930
2931void CDir::maybe_finish_freeze()
2932{
2933 if (auth_pins != 1 || dir_auth_pins != 0)
2934 return;
2935
2936 // we can freeze the _dir_ even with nested pins...
2937 if (state_test(STATE_FREEZINGDIR)) {
2938 _freeze_dir();
2939 auth_unpin(this);
2940 finish_waiting(WAIT_FROZEN);
2941 }
2942
2943 if (nested_auth_pins != 0)
2944 return;
2945
2946 if (state_test(STATE_FREEZINGTREE)) {
2947 if (!is_subtree_root() && inode->is_frozen()) {
2948 dout(10) << "maybe_finish_freeze !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
2949 // retake an auth_pin...
2950 auth_pin(inode);
2951 // and release it when the parent inode unfreezes
2952 inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
2953 return;
2954 }
2955
2956 _freeze_tree();
2957 auth_unpin(this);
2958 finish_waiting(WAIT_FROZEN);
2959 }
2960}
2961
2962
2963
2964// FREEZE DIR
2965
2966bool CDir::freeze_dir()
2967{
2968 assert(!is_frozen());
2969 assert(!is_freezing());
2970
2971 auth_pin(this);
2972 if (is_freezeable_dir(true)) {
2973 _freeze_dir();
2974 auth_unpin(this);
2975 return true;
2976 } else {
2977 state_set(STATE_FREEZINGDIR);
2978 dout(10) << "freeze_dir + wait " << *this << dendl;
2979 return false;
2980 }
2981}
2982
2983void CDir::_freeze_dir()
2984{
2985 dout(10) << "_freeze_dir " << *this << dendl;
2986 //assert(is_freezeable_dir(true));
2987 // not always true during split because the original fragment may have frozen a while
2988 // ago and we're just now getting around to breaking it up.
2989
2990 state_clear(STATE_FREEZINGDIR);
2991 state_set(STATE_FROZENDIR);
2992 get(PIN_FROZEN);
2993
2994 if (is_auth() && !is_subtree_root())
2995 inode->auth_pin(this); // auth_pin for duration of freeze
2996}
2997
2998
2999void CDir::unfreeze_dir()
3000{
3001 dout(10) << "unfreeze_dir " << *this << dendl;
3002
3003 if (state_test(STATE_FROZENDIR)) {
3004 state_clear(STATE_FROZENDIR);
3005 put(PIN_FROZEN);
3006
3007 // unpin (may => FREEZEABLE) FIXME: is this order good?
3008 if (is_auth() && !is_subtree_root())
3009 inode->auth_unpin(this);
3010
3011 finish_waiting(WAIT_UNFREEZE);
3012 } else {
3013 finish_waiting(WAIT_FROZEN, -1);
3014
3015 // still freezing. stop.
3016 assert(state_test(STATE_FREEZINGDIR));
3017 state_clear(STATE_FREEZINGDIR);
3018 auth_unpin(this);
3019
3020 finish_waiting(WAIT_UNFREEZE);
3021 }
3022}
3023
3024/**
3025 * Slightly less complete than operator<<, because this is intended
3026 * for identifying a directory and its state rather than for dumping
3027 * debug output.
3028 */
3029void CDir::dump(Formatter *f) const
3030{
3031 assert(f != NULL);
3032
3033 f->dump_stream("path") << get_path();
3034
3035 f->dump_stream("dirfrag") << dirfrag();
3036 f->dump_int("snapid_first", first);
3037
3038 f->dump_stream("projected_version") << get_projected_version();
3039 f->dump_stream("version") << get_version();
3040 f->dump_stream("committing_version") << get_committing_version();
3041 f->dump_stream("committed_version") << get_committed_version();
3042
3043 f->dump_bool("is_rep", is_rep());
3044
3045 if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
3046 if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
3047 f->dump_stream("dir_auth") << get_dir_auth().first;
3048 } else {
3049 f->dump_stream("dir_auth") << get_dir_auth();
3050 }
3051 } else {
3052 f->dump_string("dir_auth", "");
3053 }
3054
3055 f->open_array_section("states");
3056 MDSCacheObject::dump_states(f);
3057 if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
3058 if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
3059 if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
3060 if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
3061 if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
3062 if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
3063 if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
3064 if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
3065 f->close_section();
3066
3067 MDSCacheObject::dump(f);
3068}
3069
28e407b8
AA
3070void CDir::dump_load(Formatter *f, utime_t now, const DecayRate& rate)
3071{
3072 f->dump_stream("path") << get_path();
3073 f->dump_stream("dirfrag") << dirfrag();
3074
3075 f->open_object_section("pop_me");
3076 pop_me.dump(f, now, rate);
3077 f->close_section();
3078
3079 f->open_object_section("pop_nested");
3080 pop_nested.dump(f, now, rate);
3081 f->close_section();
3082
3083 f->open_object_section("pop_auth_subtree");
3084 pop_auth_subtree.dump(f, now, rate);
3085 f->close_section();
3086
3087 f->open_object_section("pop_auth_subtree_nested");
3088 pop_auth_subtree_nested.dump(f, now, rate);
3089 f->close_section();
3090}
3091
7c673cae
FG
3092/****** Scrub Stuff *******/
3093
3094void CDir::scrub_info_create() const
3095{
3096 assert(!scrub_infop);
3097
3098 // break out of const-land to set up implicit initial state
3099 CDir *me = const_cast<CDir*>(this);
3100 fnode_t *fn = me->get_projected_fnode();
3101
3102 std::unique_ptr<scrub_info_t> si(new scrub_info_t());
3103
3104 si->last_recursive.version = si->recursive_start.version =
3105 fn->recursive_scrub_version;
3106 si->last_recursive.time = si->recursive_start.time =
3107 fn->recursive_scrub_stamp;
3108
3109 si->last_local.version = fn->localized_scrub_version;
3110 si->last_local.time = fn->localized_scrub_stamp;
3111
3112 me->scrub_infop.swap(si);
3113}
3114
3115void CDir::scrub_initialize(const ScrubHeaderRefConst& header)
3116{
3117 dout(20) << __func__ << dendl;
3118 assert(is_complete());
3119 assert(header != nullptr);
3120
3121 // FIXME: weird implicit construction, is someone else meant
3122 // to be calling scrub_info_create first?
3123 scrub_info();
3124 assert(scrub_infop && !scrub_infop->directory_scrubbing);
3125
3126 scrub_infop->recursive_start.version = get_projected_version();
3127 scrub_infop->recursive_start.time = ceph_clock_now();
3128
3129 scrub_infop->directories_to_scrub.clear();
3130 scrub_infop->directories_scrubbing.clear();
3131 scrub_infop->directories_scrubbed.clear();
3132 scrub_infop->others_to_scrub.clear();
3133 scrub_infop->others_scrubbing.clear();
3134 scrub_infop->others_scrubbed.clear();
3135
94b18763 3136 for (auto i = items.begin();
7c673cae
FG
3137 i != items.end();
3138 ++i) {
3139 // TODO: handle snapshot scrubbing
3140 if (i->first.snapid != CEPH_NOSNAP)
3141 continue;
3142
3143 CDentry::linkage_t *dnl = i->second->get_projected_linkage();
3144 if (dnl->is_primary()) {
3145 if (dnl->get_inode()->is_dir())
3146 scrub_infop->directories_to_scrub.insert(i->first);
3147 else
3148 scrub_infop->others_to_scrub.insert(i->first);
3149 } else if (dnl->is_remote()) {
3150 // TODO: check remote linkage
3151 }
3152 }
3153 scrub_infop->directory_scrubbing = true;
3154 scrub_infop->header = header;
3155}
3156
3157void CDir::scrub_finished()
3158{
3159 dout(20) << __func__ << dendl;
3160 assert(scrub_infop && scrub_infop->directory_scrubbing);
3161
3162 assert(scrub_infop->directories_to_scrub.empty());
3163 assert(scrub_infop->directories_scrubbing.empty());
3164 scrub_infop->directories_scrubbed.clear();
3165 assert(scrub_infop->others_to_scrub.empty());
3166 assert(scrub_infop->others_scrubbing.empty());
3167 scrub_infop->others_scrubbed.clear();
3168 scrub_infop->directory_scrubbing = false;
3169
3170 scrub_infop->last_recursive = scrub_infop->recursive_start;
3171 scrub_infop->last_scrub_dirty = true;
3172}
3173
94b18763 3174int CDir::_next_dentry_on_set(dentry_key_set &dns, bool missing_okay,
7c673cae
FG
3175 MDSInternalContext *cb, CDentry **dnout)
3176{
3177 dentry_key_t dnkey;
3178 CDentry *dn;
3179
3180 while (!dns.empty()) {
3181 set<dentry_key_t>::iterator front = dns.begin();
3182 dnkey = *front;
3183 dn = lookup(dnkey.name);
3184 if (!dn) {
3185 if (!is_complete() &&
3186 (!has_bloom() || is_in_bloom(dnkey.name))) {
3187 // need to re-read this dirfrag
3188 fetch(cb);
3189 return EAGAIN;
3190 }
3191 // okay, we lost it
3192 if (missing_okay) {
3193 dout(15) << " we no longer have directory dentry "
3194 << dnkey.name << ", assuming it got renamed" << dendl;
3195 dns.erase(dnkey);
3196 continue;
3197 } else {
3198 dout(5) << " we lost dentry " << dnkey.name
3199 << ", bailing out because that's impossible!" << dendl;
3200 ceph_abort();
3201 }
3202 }
3203 // okay, we got a dentry
3204 dns.erase(dnkey);
3205
3206 if (dn->get_projected_version() < scrub_infop->last_recursive.version &&
3207 !(scrub_infop->header->get_force())) {
3208 dout(15) << " skip dentry " << dnkey.name
3209 << ", no change since last scrub" << dendl;
3210 continue;
94b18763
FG
3211 }
3212
3213 if (!dn->get_linkage()->is_primary()) {
3214 dout(15) << " skip dentry " << dnkey.name
3215 << ", no longer primary" << dendl;
3216 continue;
7c673cae
FG
3217 }
3218
3219 *dnout = dn;
3220 return 0;
3221 }
3222 *dnout = NULL;
3223 return ENOENT;
3224}
3225
3226int CDir::scrub_dentry_next(MDSInternalContext *cb, CDentry **dnout)
3227{
3228 dout(20) << __func__ << dendl;
3229 assert(scrub_infop && scrub_infop->directory_scrubbing);
3230
3231 dout(20) << "trying to scrub directories underneath us" << dendl;
3232 int rval = _next_dentry_on_set(scrub_infop->directories_to_scrub, true,
3233 cb, dnout);
3234 if (rval == 0) {
3235 dout(20) << __func__ << " inserted to directories scrubbing: "
3236 << *dnout << dendl;
3237 scrub_infop->directories_scrubbing.insert((*dnout)->key());
3238 } else if (rval == EAGAIN) {
3239 // we don't need to do anything else
3240 } else { // we emptied out the directory scrub set
3241 assert(rval == ENOENT);
3242 dout(20) << "no directories left, moving on to other kinds of dentries"
3243 << dendl;
3244
3245 rval = _next_dentry_on_set(scrub_infop->others_to_scrub, false, cb, dnout);
3246 if (rval == 0) {
3247 dout(20) << __func__ << " inserted to others scrubbing: "
3248 << *dnout << dendl;
3249 scrub_infop->others_scrubbing.insert((*dnout)->key());
3250 }
3251 }
3252 dout(20) << " returning " << rval << " with dn=" << *dnout << dendl;
3253 return rval;
3254}
3255
3256void CDir::scrub_dentries_scrubbing(list<CDentry*> *out_dentries)
3257{
3258 dout(20) << __func__ << dendl;
3259 assert(scrub_infop && scrub_infop->directory_scrubbing);
3260
3261 for (set<dentry_key_t>::iterator i =
3262 scrub_infop->directories_scrubbing.begin();
3263 i != scrub_infop->directories_scrubbing.end();
3264 ++i) {
3265 CDentry *d = lookup(i->name, i->snapid);
3266 assert(d);
3267 out_dentries->push_back(d);
3268 }
3269 for (set<dentry_key_t>::iterator i = scrub_infop->others_scrubbing.begin();
3270 i != scrub_infop->others_scrubbing.end();
3271 ++i) {
3272 CDentry *d = lookup(i->name, i->snapid);
3273 assert(d);
3274 out_dentries->push_back(d);
3275 }
3276}
3277
3278void CDir::scrub_dentry_finished(CDentry *dn)
3279{
3280 dout(20) << __func__ << " on dn " << *dn << dendl;
3281 assert(scrub_infop && scrub_infop->directory_scrubbing);
3282 dentry_key_t dn_key = dn->key();
3283 if (scrub_infop->directories_scrubbing.erase(dn_key)) {
3284 scrub_infop->directories_scrubbed.insert(dn_key);
3285 } else {
3286 assert(scrub_infop->others_scrubbing.count(dn_key));
3287 scrub_infop->others_scrubbing.erase(dn_key);
3288 scrub_infop->others_scrubbed.insert(dn_key);
3289 }
3290}
3291
3292void CDir::scrub_maybe_delete_info()
3293{
3294 if (scrub_infop &&
3295 !scrub_infop->directory_scrubbing &&
3296 !scrub_infop->need_scrub_local &&
3297 !scrub_infop->last_scrub_dirty &&
3298 !scrub_infop->pending_scrub_error &&
3299 scrub_infop->dirty_scrub_stamps.empty()) {
3300 scrub_infop.reset();
3301 }
3302}
3303
3304bool CDir::scrub_local()
3305{
3306 assert(is_complete());
3307 bool rval = check_rstats(true);
3308
3309 scrub_info();
3310 if (rval) {
3311 scrub_infop->last_local.time = ceph_clock_now();
3312 scrub_infop->last_local.version = get_projected_version();
3313 scrub_infop->pending_scrub_error = false;
3314 scrub_infop->last_scrub_dirty = true;
3315 } else {
3316 scrub_infop->pending_scrub_error = true;
3317 if (scrub_infop->header->get_repair())
3318 cache->repair_dirfrag_stats(this);
3319 }
3320 return rval;
3321}
3322
3323std::string CDir::get_path() const
3324{
3325 std::string path;
3326 get_inode()->make_path_string(path, true);
3327 return path;
3328}
3329
3330bool CDir::should_split_fast() const
3331{
3332 // Max size a fragment can be before trigger fast splitting
3333 int fast_limit = g_conf->mds_bal_split_size * g_conf->mds_bal_fragment_fast_factor;
3334
3335 // Fast path: the sum of accounted size and null dentries does not
3336 // exceed threshold: we definitely are not over it.
3337 if (get_frag_size() + get_num_head_null() <= fast_limit) {
3338 return false;
3339 }
3340
3341 // Fast path: the accounted size of the frag exceeds threshold: we
3342 // definitely are over it
3343 if (get_frag_size() > fast_limit) {
3344 return true;
3345 }
3346
3347 int64_t effective_size = 0;
3348
3349 for (const auto &p : items) {
3350 const CDentry *dn = p.second;
3351 if (!dn->get_projected_linkage()->is_null()) {
3352 effective_size++;
3353 }
3354 }
3355
3356 return effective_size > fast_limit;
3357}
3358
181888fb 3359MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);