]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/CDir.cc
import ceph quincy 17.2.1
[ceph.git] / ceph / src / mds / CDir.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <string_view>
16 #include <algorithm>
17
18 #include "include/types.h"
19
20 #include "CDir.h"
21 #include "CDentry.h"
22 #include "CInode.h"
23 #include "Mutation.h"
24
25 #include "MDSMap.h"
26 #include "MDSRank.h"
27 #include "MDCache.h"
28 #include "Locker.h"
29 #include "MDLog.h"
30 #include "LogSegment.h"
31 #include "MDBalancer.h"
32
33 #include "common/bloom_filter.hpp"
34 #include "include/Context.h"
35 #include "common/Clock.h"
36
37 #include "osdc/Objecter.h"
38
39 #include "common/config.h"
40 #include "include/ceph_assert.h"
41 #include "include/compat.h"
42
43 #define dout_context g_ceph_context
44 #define dout_subsys ceph_subsys_mds
45 #undef dout_prefix
46 #define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
47
48 using namespace std;
49
50 int CDir::num_frozen_trees = 0;
51 int CDir::num_freezing_trees = 0;
52
53 CDir::fnode_const_ptr CDir::empty_fnode = CDir::allocate_fnode();
54
55 class CDirContext : public MDSContext
56 {
57 protected:
58 CDir *dir;
59 MDSRank* get_mds() override {return dir->mdcache->mds;}
60
61 public:
62 explicit CDirContext(CDir *d) : dir(d) {
63 ceph_assert(dir != NULL);
64 }
65 };
66
67
68 class CDirIOContext : public MDSIOContextBase
69 {
70 protected:
71 CDir *dir;
72 MDSRank* get_mds() override {return dir->mdcache->mds;}
73
74 public:
75 explicit CDirIOContext(CDir *d) : dir(d) {
76 ceph_assert(dir != NULL);
77 }
78 };
79
80
81 // PINS
82 //int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
83
84
85 ostream& operator<<(ostream& out, const CDir& dir)
86 {
87 out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
88 << " [" << dir.first << ",head]";
89 if (dir.is_auth()) {
90 out << " auth";
91 if (dir.is_replicated())
92 out << dir.get_replicas();
93
94 if (dir.is_projected())
95 out << " pv=" << dir.get_projected_version();
96 out << " v=" << dir.get_version();
97 out << " cv=" << dir.get_committing_version();
98 out << "/" << dir.get_committed_version();
99 } else {
100 mds_authority_t a = dir.authority();
101 out << " rep@" << a.first;
102 if (a.second != CDIR_AUTH_UNKNOWN)
103 out << "," << a.second;
104 out << "." << dir.get_replica_nonce();
105 }
106
107 if (dir.is_rep()) out << " REP";
108
109 if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
110 if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
111 out << " dir_auth=" << dir.get_dir_auth().first;
112 else
113 out << " dir_auth=" << dir.get_dir_auth();
114 }
115
116 if (dir.get_auth_pins() || dir.get_dir_auth_pins()) {
117 out << " ap=" << dir.get_auth_pins()
118 << "+" << dir.get_dir_auth_pins();
119 #ifdef MDS_AUTHPIN_SET
120 dir.print_authpin_set(out);
121 #endif
122 }
123
124 out << " state=" << dir.get_state();
125 if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
126 if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
127 if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
128 if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
129 if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
130 if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
131 if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
132 if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
133 if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
134 if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
135 if (dir.state_test(CDir::STATE_CREATING)) out << "|creating";
136 if (dir.state_test(CDir::STATE_COMMITTING)) out << "|committing";
137 if (dir.state_test(CDir::STATE_FETCHING)) out << "|fetching";
138 if (dir.state_test(CDir::STATE_EXPORTING)) out << "|exporting";
139 if (dir.state_test(CDir::STATE_IMPORTING)) out << "|importing";
140 if (dir.state_test(CDir::STATE_STICKY)) out << "|sticky";
141 if (dir.state_test(CDir::STATE_DNPINNEDFRAG)) out << "|dnpinnedfrag";
142 if (dir.state_test(CDir::STATE_ASSIMRSTAT)) out << "|assimrstat";
143
144 // fragstat
145 out << " " << dir.get_fnode()->fragstat;
146 if (!(dir.get_fnode()->fragstat == dir.get_fnode()->accounted_fragstat))
147 out << "/" << dir.get_fnode()->accounted_fragstat;
148 if (g_conf()->mds_debug_scatterstat && dir.is_projected()) {
149 const auto& pf = dir.get_projected_fnode();
150 out << "->" << pf->fragstat;
151 if (!(pf->fragstat == pf->accounted_fragstat))
152 out << "/" << pf->accounted_fragstat;
153 }
154
155 // rstat
156 out << " " << dir.get_fnode()->rstat;
157 if (!(dir.get_fnode()->rstat == dir.get_fnode()->accounted_rstat))
158 out << "/" << dir.get_fnode()->accounted_rstat;
159 if (g_conf()->mds_debug_scatterstat && dir.is_projected()) {
160 const auto& pf = dir.get_projected_fnode();
161 out << "->" << pf->rstat;
162 if (!(pf->rstat == pf->accounted_rstat))
163 out << "/" << pf->accounted_rstat;
164 }
165
166 out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
167 out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
168 if (dir.get_num_dirty())
169 out << " dirty=" << dir.get_num_dirty();
170
171 if (dir.get_num_ref()) {
172 out << " |";
173 dir.print_pin_set(out);
174 }
175
176 out << " " << &dir;
177 return out << "]";
178 }
179
180
181 void CDir::print(ostream& out)
182 {
183 out << *this;
184 }
185
186
187
188
189 ostream& CDir::print_db_line_prefix(ostream& out)
190 {
191 return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
192 }
193
194
195
196 // -------------------------------------------------------------------
197 // CDir
198
199 CDir::CDir(CInode *in, frag_t fg, MDCache *mdc, bool auth) :
200 mdcache(mdc), inode(in), frag(fg),
201 dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
202 dirty_dentries(member_offset(CDentry, item_dir_dirty)),
203 item_dirty(this), item_new(this),
204 lock_caches_with_auth_pins(member_offset(MDLockCache::DirItem, item_dir)),
205 freezing_inodes(member_offset(CInode, item_freezing_inode)),
206 dir_rep(REP_NONE),
207 pop_me(mdc->decayrate),
208 pop_nested(mdc->decayrate),
209 pop_auth_subtree(mdc->decayrate),
210 pop_auth_subtree_nested(mdc->decayrate),
211 pop_spread(mdc->decayrate),
212 pop_lru_subdirs(member_offset(CInode, item_pop_lru)),
213 dir_auth(CDIR_AUTH_DEFAULT)
214 {
215 // auth
216 ceph_assert(in->is_dir());
217 if (auth)
218 state_set(STATE_AUTH);
219 }
220
221 /**
222 * Check the recursive statistics on size for consistency.
223 * If mds_debug_scatterstat is enabled, assert for correctness,
224 * otherwise just print out the mismatch and continue.
225 */
226 bool CDir::check_rstats(bool scrub)
227 {
228 if (!g_conf()->mds_debug_scatterstat && !scrub)
229 return true;
230
231 dout(25) << "check_rstats on " << this << dendl;
232 if (!is_complete() || !is_auth() || is_frozen()) {
233 dout(3) << "check_rstats " << (scrub ? "(scrub) " : "")
234 << "bailing out -- incomplete or non-auth or frozen dir on "
235 << *this << dendl;
236 return !scrub;
237 }
238
239 frag_info_t frag_info;
240 nest_info_t nest_info;
241 for (auto i = items.begin(); i != items.end(); ++i) {
242 if (i->second->last != CEPH_NOSNAP)
243 continue;
244 CDentry::linkage_t *dnl = i->second->get_linkage();
245 if (dnl->is_primary()) {
246 CInode *in = dnl->get_inode();
247 nest_info.add(in->get_inode()->accounted_rstat);
248 if (in->is_dir())
249 frag_info.nsubdirs++;
250 else
251 frag_info.nfiles++;
252 } else if (dnl->is_remote())
253 frag_info.nfiles++;
254 }
255
256 bool good = true;
257 // fragstat
258 if(!frag_info.same_sums(fnode->fragstat)) {
259 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
260 dout(1) << "get_num_head_items() = " << get_num_head_items()
261 << "; fnode.fragstat.nfiles=" << fnode->fragstat.nfiles
262 << " fnode.fragstat.nsubdirs=" << fnode->fragstat.nsubdirs << dendl;
263 good = false;
264 } else {
265 dout(20) << "get_num_head_items() = " << get_num_head_items()
266 << "; fnode.fragstat.nfiles=" << fnode->fragstat.nfiles
267 << " fnode.fragstat.nsubdirs=" << fnode->fragstat.nsubdirs << dendl;
268 }
269
270 // rstat
271 if (!nest_info.same_sums(fnode->rstat)) {
272 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
273 dout(1) << "total of child dentries: " << nest_info << dendl;
274 dout(1) << "my rstats: " << fnode->rstat << dendl;
275 good = false;
276 } else {
277 dout(20) << "total of child dentries: " << nest_info << dendl;
278 dout(20) << "my rstats: " << fnode->rstat << dendl;
279 }
280
281 if (!good) {
282 if (!scrub) {
283 for (auto i = items.begin(); i != items.end(); ++i) {
284 CDentry *dn = i->second;
285 if (dn->get_linkage()->is_primary()) {
286 CInode *in = dn->get_linkage()->inode;
287 dout(1) << *dn << " rstat " << in->get_inode()->accounted_rstat << dendl;
288 } else {
289 dout(1) << *dn << dendl;
290 }
291 }
292
293 ceph_assert(frag_info.nfiles == fnode->fragstat.nfiles);
294 ceph_assert(frag_info.nsubdirs == fnode->fragstat.nsubdirs);
295 ceph_assert(nest_info.rbytes == fnode->rstat.rbytes);
296 ceph_assert(nest_info.rfiles == fnode->rstat.rfiles);
297 ceph_assert(nest_info.rsubdirs == fnode->rstat.rsubdirs);
298 }
299 }
300 dout(10) << "check_rstats complete on " << this << dendl;
301 return good;
302 }
303
304 void CDir::adjust_num_inodes_with_caps(int d)
305 {
306 // FIXME: smarter way to decide if adding 'this' to open file table
307 if (num_inodes_with_caps == 0 && d > 0)
308 mdcache->open_file_table.add_dirfrag(this);
309 else if (num_inodes_with_caps > 0 && num_inodes_with_caps == -d)
310 mdcache->open_file_table.remove_dirfrag(this);
311
312 num_inodes_with_caps += d;
313 ceph_assert(num_inodes_with_caps >= 0);
314 }
315
316 CDentry *CDir::lookup(std::string_view name, snapid_t snap)
317 {
318 dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
319 auto iter = items.lower_bound(dentry_key_t(snap, name, inode->hash_dentry_name(name)));
320 if (iter == items.end())
321 return 0;
322 if (iter->second->get_name() == name &&
323 iter->second->first <= snap &&
324 iter->second->last >= snap) {
325 dout(20) << " hit -> " << iter->first << dendl;
326 return iter->second;
327 }
328 dout(20) << " miss -> " << iter->first << dendl;
329 return 0;
330 }
331
332 CDentry *CDir::lookup_exact_snap(std::string_view name, snapid_t last) {
333 dout(20) << __func__ << " (" << last << ", '" << name << "')" << dendl;
334 auto p = items.find(dentry_key_t(last, name, inode->hash_dentry_name(name)));
335 if (p == items.end())
336 return NULL;
337 return p->second;
338 }
339
340 /***
341 * linking fun
342 */
343
344 CDentry* CDir::add_null_dentry(std::string_view dname,
345 snapid_t first, snapid_t last)
346 {
347 // foreign
348 ceph_assert(lookup_exact_snap(dname, last) == 0);
349
350 // create dentry
351 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), "", first, last);
352 if (is_auth())
353 dn->state_set(CDentry::STATE_AUTH);
354
355 mdcache->bottom_lru.lru_insert_mid(dn);
356 dn->state_set(CDentry::STATE_BOTTOMLRU);
357
358 dn->dir = this;
359 dn->version = get_projected_version();
360
361 // add to dir
362 ceph_assert(items.count(dn->key()) == 0);
363 //assert(null_items.count(dn->get_name()) == 0);
364
365 items[dn->key()] = dn;
366 if (last == CEPH_NOSNAP)
367 num_head_null++;
368 else
369 num_snap_null++;
370
371 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
372 dn->get(CDentry::PIN_FRAGMENTING);
373 dn->state_set(CDentry::STATE_FRAGMENTING);
374 }
375
376 dout(12) << __func__ << " " << *dn << dendl;
377
378 // pin?
379 if (get_num_any() == 1)
380 get(PIN_CHILD);
381
382 ceph_assert(get_num_any() == items.size());
383 return dn;
384 }
385
386
387 CDentry* CDir::add_primary_dentry(std::string_view dname, CInode *in,
388 mempool::mds_co::string alternate_name,
389 snapid_t first, snapid_t last)
390 {
391 // primary
392 ceph_assert(lookup_exact_snap(dname, last) == 0);
393
394 // create dentry
395 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), std::move(alternate_name), first, last);
396 if (is_auth())
397 dn->state_set(CDentry::STATE_AUTH);
398 if (is_auth() || !inode->is_stray()) {
399 mdcache->lru.lru_insert_mid(dn);
400 } else {
401 mdcache->bottom_lru.lru_insert_mid(dn);
402 dn->state_set(CDentry::STATE_BOTTOMLRU);
403 }
404
405 dn->dir = this;
406 dn->version = get_projected_version();
407
408 // add to dir
409 ceph_assert(items.count(dn->key()) == 0);
410 //assert(null_items.count(dn->get_name()) == 0);
411
412 items[dn->key()] = dn;
413
414 dn->get_linkage()->inode = in;
415
416 link_inode_work(dn, in);
417
418 if (dn->last == CEPH_NOSNAP)
419 num_head_items++;
420 else
421 num_snap_items++;
422
423 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
424 dn->get(CDentry::PIN_FRAGMENTING);
425 dn->state_set(CDentry::STATE_FRAGMENTING);
426 }
427
428 dout(12) << __func__ << " " << *dn << dendl;
429
430 // pin?
431 if (get_num_any() == 1)
432 get(PIN_CHILD);
433 ceph_assert(get_num_any() == items.size());
434 return dn;
435 }
436
437 CDentry* CDir::add_remote_dentry(std::string_view dname, inodeno_t ino, unsigned char d_type,
438 mempool::mds_co::string alternate_name,
439 snapid_t first, snapid_t last)
440 {
441 // foreign
442 ceph_assert(lookup_exact_snap(dname, last) == 0);
443
444 // create dentry
445 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), std::move(alternate_name), ino, d_type, first, last);
446 if (is_auth())
447 dn->state_set(CDentry::STATE_AUTH);
448 mdcache->lru.lru_insert_mid(dn);
449
450 dn->dir = this;
451 dn->version = get_projected_version();
452
453 // add to dir
454 ceph_assert(items.count(dn->key()) == 0);
455 //assert(null_items.count(dn->get_name()) == 0);
456
457 items[dn->key()] = dn;
458 if (last == CEPH_NOSNAP)
459 num_head_items++;
460 else
461 num_snap_items++;
462
463 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
464 dn->get(CDentry::PIN_FRAGMENTING);
465 dn->state_set(CDentry::STATE_FRAGMENTING);
466 }
467
468 dout(12) << __func__ << " " << *dn << dendl;
469
470 // pin?
471 if (get_num_any() == 1)
472 get(PIN_CHILD);
473
474 ceph_assert(get_num_any() == items.size());
475 return dn;
476 }
477
478
479
480 void CDir::remove_dentry(CDentry *dn)
481 {
482 dout(12) << __func__ << " " << *dn << dendl;
483
484 // there should be no client leases at this point!
485 ceph_assert(dn->client_lease_map.empty());
486
487 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
488 dn->put(CDentry::PIN_FRAGMENTING);
489 dn->state_clear(CDentry::STATE_FRAGMENTING);
490 }
491
492 if (dn->get_linkage()->is_null()) {
493 if (dn->last == CEPH_NOSNAP)
494 num_head_null--;
495 else
496 num_snap_null--;
497 } else {
498 if (dn->last == CEPH_NOSNAP)
499 num_head_items--;
500 else
501 num_snap_items--;
502 }
503
504 if (!dn->get_linkage()->is_null())
505 // detach inode and dentry
506 unlink_inode_work(dn);
507
508 // remove from list
509 ceph_assert(items.count(dn->key()) == 1);
510 items.erase(dn->key());
511
512 // clean?
513 if (dn->is_dirty())
514 dn->mark_clean();
515
516 if (dn->state_test(CDentry::STATE_BOTTOMLRU))
517 mdcache->bottom_lru.lru_remove(dn);
518 else
519 mdcache->lru.lru_remove(dn);
520 delete dn;
521
522 // unpin?
523 if (get_num_any() == 0)
524 put(PIN_CHILD);
525 ceph_assert(get_num_any() == items.size());
526 }
527
528 void CDir::link_remote_inode(CDentry *dn, CInode *in)
529 {
530 link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
531 }
532
533 void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
534 {
535 dout(12) << __func__ << " " << *dn << " remote " << ino << dendl;
536 ceph_assert(dn->get_linkage()->is_null());
537
538 dn->get_linkage()->set_remote(ino, d_type);
539
540 if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
541 mdcache->bottom_lru.lru_remove(dn);
542 mdcache->lru.lru_insert_mid(dn);
543 dn->state_clear(CDentry::STATE_BOTTOMLRU);
544 }
545
546 if (dn->last == CEPH_NOSNAP) {
547 num_head_items++;
548 num_head_null--;
549 } else {
550 num_snap_items++;
551 num_snap_null--;
552 }
553 ceph_assert(get_num_any() == items.size());
554 }
555
556 void CDir::link_primary_inode(CDentry *dn, CInode *in)
557 {
558 dout(12) << __func__ << " " << *dn << " " << *in << dendl;
559 ceph_assert(dn->get_linkage()->is_null());
560
561 dn->get_linkage()->inode = in;
562
563 link_inode_work(dn, in);
564
565 if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
566 (is_auth() || !inode->is_stray())) {
567 mdcache->bottom_lru.lru_remove(dn);
568 mdcache->lru.lru_insert_mid(dn);
569 dn->state_clear(CDentry::STATE_BOTTOMLRU);
570 }
571
572 if (dn->last == CEPH_NOSNAP) {
573 num_head_items++;
574 num_head_null--;
575 } else {
576 num_snap_items++;
577 num_snap_null--;
578 }
579
580 ceph_assert(get_num_any() == items.size());
581 }
582
583 void CDir::link_inode_work( CDentry *dn, CInode *in)
584 {
585 ceph_assert(dn->get_linkage()->get_inode() == in);
586 in->set_primary_parent(dn);
587
588 // set inode version
589 //in->inode.version = dn->get_version();
590
591 // pin dentry?
592 if (in->get_num_ref())
593 dn->get(CDentry::PIN_INODEPIN);
594
595 if (in->state_test(CInode::STATE_TRACKEDBYOFT))
596 mdcache->open_file_table.notify_link(in);
597 if (in->is_any_caps())
598 adjust_num_inodes_with_caps(1);
599
600 // adjust auth pin count
601 if (in->auth_pins)
602 dn->adjust_nested_auth_pins(in->auth_pins, NULL);
603
604 if (in->is_freezing_inode())
605 freezing_inodes.push_back(&in->item_freezing_inode);
606 else if (in->is_frozen_inode() || in->is_frozen_auth_pin())
607 num_frozen_inodes++;
608
609 // verify open snaprealm parent
610 if (in->snaprealm)
611 in->snaprealm->adjust_parent();
612 else if (in->is_any_caps())
613 in->move_to_realm(inode->find_snaprealm());
614 }
615
616 void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
617 {
618 if (dn->get_linkage()->is_primary()) {
619 dout(12) << __func__ << " " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
620 } else {
621 dout(12) << __func__ << " " << *dn << dendl;
622 }
623
624 unlink_inode_work(dn);
625
626 if (adjust_lru && !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
627 mdcache->lru.lru_remove(dn);
628 mdcache->bottom_lru.lru_insert_mid(dn);
629 dn->state_set(CDentry::STATE_BOTTOMLRU);
630 }
631
632 if (dn->last == CEPH_NOSNAP) {
633 num_head_items--;
634 num_head_null++;
635 } else {
636 num_snap_items--;
637 num_snap_null++;
638 }
639 ceph_assert(get_num_any() == items.size());
640 }
641
642
643 void CDir::try_remove_unlinked_dn(CDentry *dn)
644 {
645 ceph_assert(dn->dir == this);
646 ceph_assert(dn->get_linkage()->is_null());
647
648 // no pins (besides dirty)?
649 if (dn->get_num_ref() != dn->is_dirty())
650 return;
651
652 // was the dn new?
653 if (dn->is_new()) {
654 dout(10) << __func__ << " " << *dn << " in " << *this << dendl;
655 if (dn->is_dirty())
656 dn->mark_clean();
657 remove_dentry(dn);
658
659 // NOTE: we may not have any more dirty dentries, but the fnode
660 // still changed, so the directory must remain dirty.
661 }
662 }
663
664
665 void CDir::unlink_inode_work(CDentry *dn)
666 {
667 CInode *in = dn->get_linkage()->get_inode();
668
669 if (dn->get_linkage()->is_remote()) {
670 // remote
671 if (in)
672 dn->unlink_remote(dn->get_linkage());
673
674 dn->get_linkage()->set_remote(0, 0);
675 } else if (dn->get_linkage()->is_primary()) {
676 // primary
677 // unpin dentry?
678 if (in->get_num_ref())
679 dn->put(CDentry::PIN_INODEPIN);
680
681 if (in->state_test(CInode::STATE_TRACKEDBYOFT))
682 mdcache->open_file_table.notify_unlink(in);
683 if (in->is_any_caps())
684 adjust_num_inodes_with_caps(-1);
685
686 // unlink auth_pin count
687 if (in->auth_pins)
688 dn->adjust_nested_auth_pins(-in->auth_pins, nullptr);
689
690 if (in->is_freezing_inode())
691 in->item_freezing_inode.remove_myself();
692 else if (in->is_frozen_inode() || in->is_frozen_auth_pin())
693 num_frozen_inodes--;
694
695 // detach inode
696 in->remove_primary_parent(dn);
697 if (in->is_dir())
698 in->item_pop_lru.remove_myself();
699 dn->get_linkage()->inode = 0;
700 } else {
701 ceph_assert(!dn->get_linkage()->is_null());
702 }
703 }
704
705 void CDir::add_to_bloom(CDentry *dn)
706 {
707 ceph_assert(dn->last == CEPH_NOSNAP);
708 if (!bloom) {
709 /* not create bloom filter for incomplete dir that was added by log replay */
710 if (!is_complete())
711 return;
712
713 /* don't maintain bloom filters in standby replay (saves cycles, and also
714 * avoids need to implement clearing it in EExport for #16924) */
715 if (mdcache->mds->is_standby_replay()) {
716 return;
717 }
718
719 unsigned size = get_num_head_items() + get_num_snap_items();
720 if (size < 100) size = 100;
721 bloom.reset(new bloom_filter(size, 1.0 / size, 0));
722 }
723 /* This size and false positive probability is completely random.*/
724 bloom->insert(dn->get_name().data(), dn->get_name().size());
725 }
726
727 bool CDir::is_in_bloom(std::string_view name)
728 {
729 if (!bloom)
730 return false;
731 return bloom->contains(name.data(), name.size());
732 }
733
734 void CDir::remove_null_dentries() {
735 dout(12) << __func__ << " " << *this << dendl;
736
737 auto p = items.begin();
738 while (p != items.end()) {
739 CDentry *dn = p->second;
740 ++p;
741 if (dn->get_linkage()->is_null() && !dn->is_projected())
742 remove_dentry(dn);
743 }
744
745 ceph_assert(num_snap_null == 0);
746 ceph_assert(num_head_null == 0);
747 ceph_assert(get_num_any() == items.size());
748 }
749
750 /** remove dirty null dentries for deleted directory. the dirfrag will be
751 * deleted soon, so it's safe to not commit dirty dentries.
752 *
753 * This is called when a directory is being deleted, a prerequisite
754 * of which is that its children have been unlinked: we expect to only see
755 * null, unprojected dentries here.
756 */
757 void CDir::try_remove_dentries_for_stray()
758 {
759 dout(10) << __func__ << dendl;
760 ceph_assert(get_parent_dir()->inode->is_stray());
761
762 // clear dirty only when the directory was not snapshotted
763 bool clear_dirty = !inode->snaprealm;
764
765 auto p = items.begin();
766 while (p != items.end()) {
767 CDentry *dn = p->second;
768 ++p;
769 if (dn->last == CEPH_NOSNAP) {
770 ceph_assert(!dn->is_projected());
771 ceph_assert(dn->get_linkage()->is_null());
772 if (clear_dirty && dn->is_dirty())
773 dn->mark_clean();
774 // It's OK to remove lease prematurely because we will never link
775 // the dentry to inode again.
776 if (dn->is_any_leases())
777 dn->remove_client_leases(mdcache->mds->locker);
778 if (dn->get_num_ref() == 0)
779 remove_dentry(dn);
780 } else {
781 ceph_assert(!dn->is_projected());
782 CDentry::linkage_t *dnl= dn->get_linkage();
783 CInode *in = NULL;
784 if (dnl->is_primary()) {
785 in = dnl->get_inode();
786 if (clear_dirty && in->is_dirty())
787 in->mark_clean();
788 }
789 if (clear_dirty && dn->is_dirty())
790 dn->mark_clean();
791 if (dn->get_num_ref() == 0) {
792 remove_dentry(dn);
793 if (in)
794 mdcache->remove_inode(in);
795 }
796 }
797 }
798
799 if (clear_dirty && is_dirty())
800 mark_clean();
801 }
802
803 bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
804 {
805 ceph_assert(dn->last != CEPH_NOSNAP);
806 set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
807 CDentry::linkage_t *dnl= dn->get_linkage();
808 CInode *in = 0;
809 if (dnl->is_primary())
810 in = dnl->get_inode();
811 if ((p == snaps.end() || *p > dn->last) &&
812 (dn->get_num_ref() == dn->is_dirty()) &&
813 (!in || in->get_num_ref() == in->is_dirty())) {
814 dout(10) << " purging snapped " << *dn << dendl;
815 if (in && in->is_dirty())
816 in->mark_clean();
817 remove_dentry(dn);
818 if (in) {
819 dout(10) << " purging snapped " << *in << dendl;
820 mdcache->remove_inode(in);
821 }
822 return true;
823 }
824 return false;
825 }
826
827
828 void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
829 {
830 dout(10) << __func__ << " " << snaps << dendl;
831
832 auto p = items.begin();
833 while (p != items.end()) {
834 CDentry *dn = p->second;
835 ++p;
836
837 if (dn->last == CEPH_NOSNAP)
838 continue;
839
840 try_trim_snap_dentry(dn, snaps);
841 }
842 }
843
844
845 /**
846 * steal_dentry -- semi-violently move a dentry from one CDir to another
847 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
848 * on the old CDir corpse; must call finish_old_fragment() when finished.
849 */
850 void CDir::steal_dentry(CDentry *dn)
851 {
852 dout(15) << __func__ << " " << *dn << dendl;
853
854 items[dn->key()] = dn;
855
856 dn->dir->items.erase(dn->key());
857 if (dn->dir->items.empty())
858 dn->dir->put(PIN_CHILD);
859
860 if (get_num_any() == 0)
861 get(PIN_CHILD);
862 if (dn->get_linkage()->is_null()) {
863 if (dn->last == CEPH_NOSNAP)
864 num_head_null++;
865 else
866 num_snap_null++;
867 } else if (dn->last == CEPH_NOSNAP) {
868 num_head_items++;
869
870 auto _fnode = _get_fnode();
871
872 if (dn->get_linkage()->is_primary()) {
873 CInode *in = dn->get_linkage()->get_inode();
874 const auto& pi = in->get_projected_inode();
875 if (in->is_dir()) {
876 _fnode->fragstat.nsubdirs++;
877 if (in->item_pop_lru.is_on_list())
878 pop_lru_subdirs.push_back(&in->item_pop_lru);
879 } else {
880 _fnode->fragstat.nfiles++;
881 }
882 _fnode->rstat.rbytes += pi->accounted_rstat.rbytes;
883 _fnode->rstat.rfiles += pi->accounted_rstat.rfiles;
884 _fnode->rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
885 _fnode->rstat.rsnaps += pi->accounted_rstat.rsnaps;
886 if (pi->accounted_rstat.rctime > fnode->rstat.rctime)
887 _fnode->rstat.rctime = pi->accounted_rstat.rctime;
888
889 if (in->is_any_caps())
890 adjust_num_inodes_with_caps(1);
891
892 // move dirty inode rstat to new dirfrag
893 if (in->is_dirty_rstat())
894 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
895 } else if (dn->get_linkage()->is_remote()) {
896 if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
897 _fnode->fragstat.nsubdirs++;
898 else
899 _fnode->fragstat.nfiles++;
900 }
901 } else {
902 num_snap_items++;
903 if (dn->get_linkage()->is_primary()) {
904 CInode *in = dn->get_linkage()->get_inode();
905 if (in->is_dirty_rstat())
906 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
907 }
908 }
909
910 {
911 int dap = dn->get_num_dir_auth_pins();
912 if (dap) {
913 adjust_nested_auth_pins(dap, NULL);
914 dn->dir->adjust_nested_auth_pins(-dap, NULL);
915 }
916 }
917
918 if (dn->is_dirty()) {
919 dirty_dentries.push_back(&dn->item_dir_dirty);
920 num_dirty++;
921 }
922
923 dn->dir = this;
924 }
925
926 void CDir::prepare_old_fragment(map<string_snap_t, MDSContext::vec >& dentry_waiters, bool replay)
927 {
928 // auth_pin old fragment for duration so that any auth_pinning
929 // during the dentry migration doesn't trigger side effects
930 if (!replay && is_auth())
931 auth_pin(this);
932
933 if (!waiting_on_dentry.empty()) {
934 for (const auto &p : waiting_on_dentry) {
935 auto &e = dentry_waiters[p.first];
936 for (const auto &waiter : p.second) {
937 e.push_back(waiter);
938 }
939 }
940 waiting_on_dentry.clear();
941 put(PIN_DNWAITER);
942 }
943 }
944
945 void CDir::prepare_new_fragment(bool replay)
946 {
947 if (!replay && is_auth()) {
948 _freeze_dir();
949 mark_complete();
950 }
951 inode->add_dirfrag(this);
952 }
953
954 void CDir::finish_old_fragment(MDSContext::vec& waiters, bool replay)
955 {
956 // take waiters _before_ unfreeze...
957 if (!replay) {
958 take_waiting(WAIT_ANY_MASK, waiters);
959 if (is_auth()) {
960 auth_unpin(this); // pinned in prepare_old_fragment
961 ceph_assert(is_frozen_dir());
962 unfreeze_dir();
963 }
964 }
965
966 ceph_assert(dir_auth_pins == 0);
967 ceph_assert(auth_pins == 0);
968
969 num_head_items = num_head_null = 0;
970 num_snap_items = num_snap_null = 0;
971 adjust_num_inodes_with_caps(-num_inodes_with_caps);
972
973 // this mirrors init_fragment_pins()
974 if (is_auth())
975 clear_replica_map();
976 if (is_dirty())
977 mark_clean();
978 if (state_test(STATE_IMPORTBOUND))
979 put(PIN_IMPORTBOUND);
980 if (state_test(STATE_EXPORTBOUND))
981 put(PIN_EXPORTBOUND);
982 if (is_subtree_root())
983 put(PIN_SUBTREE);
984
985 if (auth_pins > 0)
986 put(PIN_AUTHPIN);
987
988 ceph_assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
989 }
990
991 void CDir::init_fragment_pins()
992 {
993 if (is_replicated())
994 get(PIN_REPLICATED);
995 if (state_test(STATE_DIRTY))
996 get(PIN_DIRTY);
997 if (state_test(STATE_EXPORTBOUND))
998 get(PIN_EXPORTBOUND);
999 if (state_test(STATE_IMPORTBOUND))
1000 get(PIN_IMPORTBOUND);
1001 if (is_subtree_root())
1002 get(PIN_SUBTREE);
1003 }
1004
1005 void CDir::split(int bits, std::vector<CDir*>* subs, MDSContext::vec& waiters, bool replay)
1006 {
1007 dout(10) << "split by " << bits << " bits on " << *this << dendl;
1008
1009 ceph_assert(replay || is_complete() || !is_auth());
1010
1011 frag_vec_t frags;
1012 frag.split(bits, frags);
1013
1014 vector<CDir*> subfrags(1 << bits);
1015
1016 double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
1017
1018 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1019 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1020
1021 nest_info_t rstatdiff;
1022 frag_info_t fragstatdiff;
1023 if (fnode->accounted_rstat.version == rstat_version)
1024 rstatdiff.add_delta(fnode->accounted_rstat, fnode->rstat);
1025 if (fnode->accounted_fragstat.version == dirstat_version)
1026 fragstatdiff.add_delta(fnode->accounted_fragstat, fnode->fragstat);
1027 dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
1028
1029 map<string_snap_t, MDSContext::vec > dentry_waiters;
1030 prepare_old_fragment(dentry_waiters, replay);
1031
1032 // create subfrag dirs
1033 int n = 0;
1034 for (const auto& fg : frags) {
1035 CDir *f = new CDir(inode, fg, mdcache, is_auth());
1036 f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
1037 f->get_replicas() = get_replicas();
1038 f->pop_me = pop_me;
1039 f->pop_me.scale(fac);
1040
1041 // FIXME; this is an approximation
1042 f->pop_nested = pop_nested;
1043 f->pop_nested.scale(fac);
1044 f->pop_auth_subtree = pop_auth_subtree;
1045 f->pop_auth_subtree.scale(fac);
1046 f->pop_auth_subtree_nested = pop_auth_subtree_nested;
1047 f->pop_auth_subtree_nested.scale(fac);
1048
1049 dout(10) << " subfrag " << fg << " " << *f << dendl;
1050 subfrags[n++] = f;
1051 subs->push_back(f);
1052
1053 f->set_dir_auth(get_dir_auth());
1054 f->freeze_tree_state = freeze_tree_state;
1055 f->prepare_new_fragment(replay);
1056 f->init_fragment_pins();
1057 }
1058
1059 // repartition dentries
1060 while (!items.empty()) {
1061 auto p = items.begin();
1062
1063 CDentry *dn = p->second;
1064 frag_t subfrag = inode->pick_dirfrag(dn->get_name());
1065 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1066 dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
1067 CDir *f = subfrags[n];
1068 f->steal_dentry(dn);
1069 }
1070
1071 for (const auto &p : dentry_waiters) {
1072 frag_t subfrag = inode->pick_dirfrag(p.first.name);
1073 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1074 CDir *f = subfrags[n];
1075
1076 if (f->waiting_on_dentry.empty())
1077 f->get(PIN_DNWAITER);
1078 auto &e = f->waiting_on_dentry[p.first];
1079 for (const auto &waiter : p.second) {
1080 e.push_back(waiter);
1081 }
1082 }
1083
1084 // FIXME: handle dirty old rstat
1085
1086 // fix up new frag fragstats
1087 for (int i = 0; i < n; i++) {
1088 CDir *f = subfrags[i];
1089 auto _fnode = f->_get_fnode();
1090 _fnode->version = f->projected_version = get_version();
1091 _fnode->rstat.version = rstat_version;
1092 _fnode->accounted_rstat = _fnode->rstat;
1093 _fnode->fragstat.version = dirstat_version;
1094 _fnode->accounted_fragstat = _fnode->fragstat;
1095 dout(10) << " rstat " << _fnode->rstat << " fragstat " << _fnode->fragstat
1096 << " on " << *f << dendl;
1097
1098 if (i == 0) {
1099 // give any outstanding frag stat differential to first frag
1100 dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1101 << " to " << *subfrags[0] << dendl;
1102 _fnode->accounted_rstat.add(rstatdiff);
1103 _fnode->accounted_fragstat.add(fragstatdiff);
1104 }
1105 }
1106
1107 finish_old_fragment(waiters, replay);
1108 }
1109
/**
 * Merge a set of sub-fragments into this dirfrag.
 *
 * dir_auth and freeze_tree_state are taken from the first sub-fragment (all
 * sub-fragments are asserted to agree with it).  Every dentry is stolen from
 * each sub-fragment, replica maps / versions / kept state bits are merged,
 * and each sub-fragment is then closed on the inode.
 *
 * @param subs     sub-fragments to fold into this dirfrag; must be non-empty
 * @param waiters  handed to finish_old_fragment() of every sub-fragment
 * @param replay   forwarded to the prepare/finish fragment helpers; when set,
 *                 the completeness assertion on sub-fragments is relaxed and
 *                 this dirfrag is not marked complete here
 */
void CDir::merge(const std::vector<CDir*>& subs, MDSContext::vec& waiters, bool replay)
{
  dout(10) << "merge " << subs << dendl;

  ceph_assert(subs.size() > 0);

  // adopt authority and freeze state from the first sub-fragment ...
  set_dir_auth(subs.front()->get_dir_auth());
  freeze_tree_state = subs.front()->freeze_tree_state;

  // ... and insist that every sub-fragment agrees with it
  for (const auto& dir : subs) {
    ceph_assert(get_dir_auth() == dir->get_dir_auth());
    ceph_assert(freeze_tree_state == dir->freeze_tree_state);
  }

  prepare_new_fragment(replay);

  auto _fnode = _get_fnode();

  // accumulated (accounted - actual) deltas of the sub-fragments
  nest_info_t rstatdiff;
  frag_info_t fragstatdiff;
  // out-params for add_delta(); their values are not read in this function
  bool touched_mtime, touched_chattr;
  version_t rstat_version = inode->get_projected_inode()->rstat.version;
  version_t dirstat_version = inode->get_projected_inode()->dirstat.version;

  // dentry waiters collected from the sub-fragments, re-registered below
  map<string_snap_t, MDSContext::vec > dentry_waiters;

  for (const auto& dir : subs) {
    dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
    ceph_assert(!dir->is_auth() || dir->is_complete() || replay);

    // only fold in deltas whose accounted version matches the inode's
    if (dir->get_fnode()->accounted_rstat.version == rstat_version)
      rstatdiff.add_delta(dir->get_fnode()->accounted_rstat, dir->get_fnode()->rstat);
    if (dir->get_fnode()->accounted_fragstat.version == dirstat_version)
      fragstatdiff.add_delta(dir->get_fnode()->accounted_fragstat, dir->get_fnode()->fragstat,
                             &touched_mtime, &touched_chattr);

    dir->prepare_old_fragment(dentry_waiters, replay);

    // steal dentries
    while (!dir->items.empty())
      steal_dentry(dir->items.begin()->second);

    // merge replica map (keep the larger value for each key)
    for (const auto &p : dir->get_replicas()) {
      unsigned cur = get_replicas()[p.first];
      if (p.second > cur)
        get_replicas()[p.first] = p.second;
    }

    // merge version (keep the highest version seen)
    if (dir->get_version() > _fnode->version)
      _fnode->version = projected_version = dir->get_version();

    // merge state
    state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);

    dir->finish_old_fragment(waiters, replay);
    inode->close_dirfrag(dir->get_frag());
  }

  // re-register the collected dentry waiters on this dirfrag; the pin is
  // taken once for the whole set
  if (!dentry_waiters.empty()) {
    get(PIN_DNWAITER);
    for (const auto &p : dentry_waiters) {
      auto &e = waiting_on_dentry[p.first];
      for (const auto &waiter : p.second) {
        e.push_back(waiter);
      }
    }
  }

  if (is_auth() && !replay)
    mark_complete();

  // FIXME: merge dirty old rstat
  // accounted_* = current stat plus the outstanding delta gathered above
  _fnode->rstat.version = rstat_version;
  _fnode->accounted_rstat = _fnode->rstat;
  _fnode->accounted_rstat.add(rstatdiff);

  _fnode->fragstat.version = dirstat_version;
  _fnode->accounted_fragstat = _fnode->fragstat;
  _fnode->accounted_fragstat.add(fragstatdiff);

  init_fragment_pins();
}
1194
1195
1196
1197
1198 void CDir::resync_accounted_fragstat()
1199 {
1200 auto pf = _get_projected_fnode();
1201 const auto& pi = inode->get_projected_inode();
1202
1203 if (pf->accounted_fragstat.version != pi->dirstat.version) {
1204 pf->fragstat.version = pi->dirstat.version;
1205 dout(10) << __func__ << " " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
1206 pf->accounted_fragstat = pf->fragstat;
1207 }
1208 }
1209
1210 /*
1211 * resync rstat and accounted_rstat with inode
1212 */
1213 void CDir::resync_accounted_rstat()
1214 {
1215 auto pf = _get_projected_fnode();
1216 const auto& pi = inode->get_projected_inode();
1217
1218 if (pf->accounted_rstat.version != pi->rstat.version) {
1219 pf->rstat.version = pi->rstat.version;
1220 dout(10) << __func__ << " " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
1221 pf->accounted_rstat = pf->rstat;
1222 dirty_old_rstat.clear();
1223 }
1224 }
1225
1226 void CDir::assimilate_dirty_rstat_inodes(MutationRef& mut)
1227 {
1228 dout(10) << __func__ << dendl;
1229 for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1230 !p.end(); ++p) {
1231 CInode *in = *p;
1232 ceph_assert(in->is_auth());
1233 if (in->is_frozen())
1234 continue;
1235
1236 mut->auth_pin(in);
1237
1238 auto pi = in->project_inode(mut);
1239 pi.inode->version = in->pre_dirty();
1240
1241 mdcache->project_rstat_inode_to_frag(mut, in, this, 0, 0, nullptr);
1242 }
1243 state_set(STATE_ASSIMRSTAT);
1244 dout(10) << __func__ << " done" << dendl;
1245 }
1246
1247 void CDir::assimilate_dirty_rstat_inodes_finish(EMetaBlob *blob)
1248 {
1249 if (!state_test(STATE_ASSIMRSTAT))
1250 return;
1251 state_clear(STATE_ASSIMRSTAT);
1252 dout(10) << __func__ << dendl;
1253 elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1254 while (!p.end()) {
1255 CInode *in = *p;
1256 ++p;
1257
1258 if (in->is_frozen())
1259 continue;
1260
1261 CDentry *dn = in->get_projected_parent_dn();
1262
1263 in->clear_dirty_rstat();
1264 blob->add_primary_dentry(dn, in, true);
1265 }
1266
1267 if (!dirty_rstat_inodes.empty())
1268 mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
1269 }
1270
1271
1272
1273
1274 /****************************************
1275 * WAITING
1276 */
1277
1278 void CDir::add_dentry_waiter(std::string_view dname, snapid_t snapid, MDSContext *c)
1279 {
1280 if (waiting_on_dentry.empty())
1281 get(PIN_DNWAITER);
1282 waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
1283 dout(10) << __func__ << " dentry " << dname
1284 << " snap " << snapid
1285 << " " << c << " on " << *this << dendl;
1286 }
1287
1288 void CDir::take_dentry_waiting(std::string_view dname, snapid_t first, snapid_t last,
1289 MDSContext::vec& ls)
1290 {
1291 if (waiting_on_dentry.empty())
1292 return;
1293
1294 string_snap_t lb(dname, first);
1295 string_snap_t ub(dname, last);
1296 auto it = waiting_on_dentry.lower_bound(lb);
1297 while (it != waiting_on_dentry.end() &&
1298 !(ub < it->first)) {
1299 dout(10) << __func__ << " " << dname
1300 << " [" << first << "," << last << "] found waiter on snap "
1301 << it->first.snapid
1302 << " on " << *this << dendl;
1303 for (const auto &waiter : it->second) {
1304 ls.push_back(waiter);
1305 }
1306 waiting_on_dentry.erase(it++);
1307 }
1308
1309 if (waiting_on_dentry.empty())
1310 put(PIN_DNWAITER);
1311 }
1312
1313 void CDir::take_sub_waiting(MDSContext::vec& ls)
1314 {
1315 dout(10) << __func__ << dendl;
1316 if (!waiting_on_dentry.empty()) {
1317 for (const auto &p : waiting_on_dentry) {
1318 for (const auto &waiter : p.second) {
1319 ls.push_back(waiter);
1320 }
1321 }
1322 waiting_on_dentry.clear();
1323 put(PIN_DNWAITER);
1324 }
1325 }
1326
1327
1328
1329 void CDir::add_waiter(uint64_t tag, MDSContext *c)
1330 {
1331 // hierarchical?
1332
1333 // at subtree root?
1334 if (tag & WAIT_ATSUBTREEROOT) {
1335 if (!is_subtree_root()) {
1336 // try parent
1337 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1338 inode->parent->dir->add_waiter(tag, c);
1339 return;
1340 }
1341 }
1342
1343 ceph_assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
1344
1345 MDSCacheObject::add_waiter(tag, c);
1346 }
1347
1348
1349
1350 /* NOTE: this checks dentry waiters too */
1351 void CDir::take_waiting(uint64_t mask, MDSContext::vec& ls)
1352 {
1353 if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1354 // take all dentry waiters
1355 for (const auto &p : waiting_on_dentry) {
1356 dout(10) << "take_waiting dentry " << p.first.name
1357 << " snap " << p.first.snapid << " on " << *this << dendl;
1358 for (const auto &waiter : p.second) {
1359 ls.push_back(waiter);
1360 }
1361 }
1362 waiting_on_dentry.clear();
1363 put(PIN_DNWAITER);
1364 }
1365
1366 // waiting
1367 MDSCacheObject::take_waiting(mask, ls);
1368 }
1369
1370
1371 void CDir::finish_waiting(uint64_t mask, int result)
1372 {
1373 dout(11) << __func__ << " mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
1374
1375 MDSContext::vec finished;
1376 take_waiting(mask, finished);
1377 if (result < 0)
1378 finish_contexts(g_ceph_context, finished, result);
1379 else
1380 mdcache->mds->queue_waiters(finished);
1381 }
1382
1383
1384
1385 // dirty/clean
1386
1387 CDir::fnode_ptr CDir::project_fnode(const MutationRef& mut)
1388 {
1389 ceph_assert(get_version() != 0);
1390
1391 if (mut && mut->is_projected(this))
1392 return std::const_pointer_cast<fnode_t>(projected_fnode.back());
1393
1394 auto pf = allocate_fnode(*get_projected_fnode());
1395
1396 if (scrub_infop && scrub_infop->last_scrub_dirty) {
1397 pf->localized_scrub_stamp = scrub_infop->last_local.time;
1398 pf->localized_scrub_version = scrub_infop->last_local.version;
1399 pf->recursive_scrub_stamp = scrub_infop->last_recursive.time;
1400 pf->recursive_scrub_version = scrub_infop->last_recursive.version;
1401 scrub_infop->last_scrub_dirty = false;
1402 scrub_maybe_delete_info();
1403 }
1404
1405 projected_fnode.emplace_back(pf);
1406 if (mut)
1407 mut->add_projected_node(this);
1408 dout(10) << __func__ << " " << pf.get() << dendl;
1409 return pf;
1410 }
1411
1412 void CDir::pop_and_dirty_projected_fnode(LogSegment *ls, const MutationRef& mut)
1413 {
1414 ceph_assert(!projected_fnode.empty());
1415 auto pf = std::move(projected_fnode.front());
1416 dout(15) << __func__ << " " << pf.get() << " v" << pf->version << dendl;
1417
1418 projected_fnode.pop_front();
1419 if (mut)
1420 mut->remove_projected_node(this);
1421
1422 reset_fnode(std::move(pf));
1423 _mark_dirty(ls);
1424 }
1425
1426 version_t CDir::pre_dirty(version_t min)
1427 {
1428 if (min > projected_version)
1429 projected_version = min;
1430 ++projected_version;
1431 dout(10) << __func__ << " " << projected_version << dendl;
1432 return projected_version;
1433 }
1434
1435 void CDir::mark_dirty(LogSegment *ls, version_t pv)
1436 {
1437 ceph_assert(is_auth());
1438
1439 if (pv) {
1440 ceph_assert(get_version() < pv);
1441 ceph_assert(pv <= projected_version);
1442 ceph_assert(!projected_fnode.empty() &&
1443 pv <= projected_fnode.front()->version);
1444 }
1445
1446 _mark_dirty(ls);
1447 }
1448
1449 void CDir::_mark_dirty(LogSegment *ls)
1450 {
1451 if (!state_test(STATE_DIRTY)) {
1452 dout(10) << __func__ << " (was clean) " << *this << " version " << get_version() << dendl;
1453 _set_dirty_flag();
1454 ceph_assert(ls);
1455 } else {
1456 dout(10) << __func__ << " (already dirty) " << *this << " version " << get_version() << dendl;
1457 }
1458 if (ls) {
1459 ls->dirty_dirfrags.push_back(&item_dirty);
1460
1461 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1462 if (committed_version == 0 && !item_new.is_on_list())
1463 ls->new_dirfrags.push_back(&item_new);
1464 }
1465 }
1466
1467 void CDir::mark_new(LogSegment *ls)
1468 {
1469 ls->new_dirfrags.push_back(&item_new);
1470 state_clear(STATE_CREATING);
1471
1472 MDSContext::vec waiters;
1473 take_waiting(CDir::WAIT_CREATED, waiters);
1474 mdcache->mds->queue_waiters(waiters);
1475 }
1476
1477 void CDir::mark_clean()
1478 {
1479 dout(10) << __func__ << " " << *this << " version " << get_version() << dendl;
1480 if (state_test(STATE_DIRTY)) {
1481 item_dirty.remove_myself();
1482 item_new.remove_myself();
1483
1484 state_clear(STATE_DIRTY);
1485 put(PIN_DIRTY);
1486 }
1487 }
1488
1489 // caller should hold auth pin of this
1490 void CDir::log_mark_dirty()
1491 {
1492 if (is_dirty() || projected_version > get_version())
1493 return; // noop if it is already dirty or will be dirty
1494
1495 auto _fnode = allocate_fnode(*get_fnode());
1496 _fnode->version = pre_dirty();
1497 reset_fnode(std::move(_fnode));
1498 mark_dirty(mdcache->mds->mdlog->get_current_segment());
1499 }
1500
1501 void CDir::mark_complete() {
1502 state_set(STATE_COMPLETE);
1503 bloom.reset();
1504 }
1505
1506 void CDir::first_get()
1507 {
1508 inode->get(CInode::PIN_DIRFRAG);
1509 }
1510
1511 void CDir::last_put()
1512 {
1513 inode->put(CInode::PIN_DIRFRAG);
1514 }
1515
1516
1517
1518 /******************************************************************************
1519 * FETCH and COMMIT
1520 */
1521
1522 // -----------------------
1523 // FETCH
1524 void CDir::fetch(MDSContext *c, bool ignore_authpinnability)
1525 {
1526 string want;
1527 return fetch(c, want, ignore_authpinnability);
1528 }
1529
1530 void CDir::fetch(MDSContext *c, std::string_view want_dn, bool ignore_authpinnability)
1531 {
1532 dout(10) << "fetch on " << *this << dendl;
1533
1534 ceph_assert(is_auth());
1535 ceph_assert(!is_complete());
1536
1537 if (!can_auth_pin() && !ignore_authpinnability) {
1538 if (c) {
1539 dout(7) << "fetch waiting for authpinnable" << dendl;
1540 add_waiter(WAIT_UNFREEZE, c);
1541 } else
1542 dout(7) << "fetch not authpinnable and no context" << dendl;
1543 return;
1544 }
1545
1546 // unlinked directory inode shouldn't have any entry
1547 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
1548 !inode->snaprealm) {
1549 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1550 if (get_version() == 0) {
1551 ceph_assert(inode->is_auth());
1552 auto _fnode = allocate_fnode();
1553 _fnode->version = 1;
1554 reset_fnode(std::move(_fnode));
1555
1556 if (state_test(STATE_REJOINUNDEF)) {
1557 ceph_assert(mdcache->mds->is_rejoin());
1558 state_clear(STATE_REJOINUNDEF);
1559 mdcache->opened_undef_dirfrag(this);
1560 }
1561 }
1562 mark_complete();
1563
1564 if (c)
1565 mdcache->mds->queue_waiter(c);
1566 return;
1567 }
1568
1569 if (c) add_waiter(WAIT_COMPLETE, c);
1570 if (!want_dn.empty()) wanted_items.insert(mempool::mds_co::string(want_dn));
1571
1572 // already fetching?
1573 if (state_test(CDir::STATE_FETCHING)) {
1574 dout(7) << "already fetching; waiting" << dendl;
1575 return;
1576 }
1577
1578 auth_pin(this);
1579 state_set(CDir::STATE_FETCHING);
1580
1581 if (mdcache->mds->logger) mdcache->mds->logger->inc(l_mds_dir_fetch);
1582
1583 mdcache->mds->balancer->hit_dir(this, META_POP_FETCH);
1584
1585 std::set<dentry_key_t> empty;
1586 _omap_fetch(NULL, empty);
1587 }
1588
1589 void CDir::fetch(MDSContext *c, const std::set<dentry_key_t>& keys)
1590 {
1591 dout(10) << "fetch " << keys.size() << " keys on " << *this << dendl;
1592
1593 ceph_assert(is_auth());
1594 ceph_assert(!is_complete());
1595
1596 if (!can_auth_pin()) {
1597 dout(7) << "fetch keys waiting for authpinnable" << dendl;
1598 add_waiter(WAIT_UNFREEZE, c);
1599 return;
1600 }
1601 if (state_test(CDir::STATE_FETCHING)) {
1602 dout(7) << "fetch keys waiting for full fetch" << dendl;
1603 add_waiter(WAIT_COMPLETE, c);
1604 return;
1605 }
1606
1607 auth_pin(this);
1608 if (mdcache->mds->logger) mdcache->mds->logger->inc(l_mds_dir_fetch);
1609
1610 mdcache->mds->balancer->hit_dir(this, META_POP_FETCH);
1611
1612 _omap_fetch(c, keys);
1613 }
1614
1615 class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
1616 MDSContext *fin;
1617 public:
1618 const version_t omap_version;
1619 bufferlist hdrbl;
1620 bool more = false;
1621 map<string, bufferlist> omap; ///< carry-over from before
1622 map<string, bufferlist> omap_more; ///< new batch
1623 int ret;
1624 C_IO_Dir_OMAP_FetchedMore(CDir *d, version_t v, MDSContext *f) :
1625 CDirIOContext(d), fin(f), omap_version(v), ret(0) { }
1626 void finish(int r) {
1627 if (omap_version < dir->get_committed_version()) {
1628 omap.clear();
1629 dir->_omap_fetch(fin, {});
1630 return;
1631 }
1632
1633 // merge results
1634 if (omap.empty()) {
1635 omap.swap(omap_more);
1636 } else {
1637 omap.insert(omap_more.begin(), omap_more.end());
1638 }
1639 if (more) {
1640 dir->_omap_fetch_more(omap_version, hdrbl, omap, fin);
1641 } else {
1642 dir->_omap_fetched(hdrbl, omap, !fin, r);
1643 if (fin)
1644 fin->complete(r);
1645 }
1646 }
1647 void print(ostream& out) const override {
1648 out << "dirfrag_fetch_more(" << dir->dirfrag() << ")";
1649 }
1650 };
1651
1652 class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
1653 MDSContext *fin;
1654 public:
1655 const version_t omap_version;
1656 bufferlist hdrbl;
1657 bool more = false;
1658 map<string, bufferlist> omap;
1659 bufferlist btbl;
1660 int ret1, ret2, ret3;
1661
1662 C_IO_Dir_OMAP_Fetched(CDir *d, MDSContext *f) :
1663 CDirIOContext(d), fin(f),
1664 omap_version(d->get_committing_version()),
1665 ret1(0), ret2(0), ret3(0) { }
1666 void finish(int r) override {
1667 // check the correctness of backtrace
1668 if (r >= 0 && ret3 != -CEPHFS_ECANCELED)
1669 dir->inode->verify_diri_backtrace(btbl, ret3);
1670 if (r >= 0) r = ret1;
1671 if (r >= 0) r = ret2;
1672
1673 if (more) {
1674 if (omap_version < dir->get_committed_version()) {
1675 omap.clear();
1676 dir->_omap_fetch(fin, {});
1677 } else {
1678 dir->_omap_fetch_more(omap_version, hdrbl, omap, fin);
1679 }
1680 return;
1681 }
1682
1683 dir->_omap_fetched(hdrbl, omap, !fin, r);
1684 if (fin)
1685 fin->complete(r);
1686
1687 }
1688 void print(ostream& out) const override {
1689 out << "dirfrag_fetch(" << dir->dirfrag() << ")";
1690 }
1691 };
1692
1693 void CDir::_omap_fetch(MDSContext *c, const std::set<dentry_key_t>& keys)
1694 {
1695 C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1696 object_t oid = get_ondisk_object();
1697 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
1698 ObjectOperation rd;
1699 rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1700 if (keys.empty()) {
1701 ceph_assert(!c);
1702 rd.omap_get_vals("", "", g_conf()->mds_dir_keys_per_op,
1703 &fin->omap, &fin->more, &fin->ret2);
1704 } else {
1705 ceph_assert(c);
1706 std::set<std::string> str_keys;
1707 for (auto p : keys) {
1708 string str;
1709 p.encode(str);
1710 str_keys.insert(str);
1711 }
1712 rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
1713 }
1714 // check the correctness of backtrace
1715 if (g_conf()->mds_verify_backtrace > 0 && frag == frag_t()) {
1716 rd.getxattr("parent", &fin->btbl, &fin->ret3);
1717 rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1718 } else {
1719 fin->ret3 = -CEPHFS_ECANCELED;
1720 }
1721
1722 mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1723 new C_OnFinisher(fin, mdcache->mds->finisher));
1724 }
1725
1726 void CDir::_omap_fetch_more(version_t omap_version, bufferlist& hdrbl,
1727 map<string, bufferlist>& omap, MDSContext *c)
1728 {
1729 // we have more omap keys to fetch!
1730 object_t oid = get_ondisk_object();
1731 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
1732 auto fin = new C_IO_Dir_OMAP_FetchedMore(this, omap_version, c);
1733 fin->hdrbl = std::move(hdrbl);
1734 fin->omap.swap(omap);
1735 ObjectOperation rd;
1736 rd.omap_get_vals(fin->omap.rbegin()->first,
1737 "", /* filter prefix */
1738 g_conf()->mds_dir_keys_per_op,
1739 &fin->omap_more,
1740 &fin->more,
1741 &fin->ret);
1742 mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1743 new C_OnFinisher(fin, mdcache->mds->finisher));
1744 }
1745
/**
 * Decode one on-disk dentry value and reconcile it with the cache.
 *
 * @param key            raw omap key; recorded in stale_items when the dentry
 *                       is stale and not present in the cache
 * @param dname          dentry name
 * @param last           last snapid covered by the dentry
 * @param bl             encoded value: snapid 'first', a one-char type
 *                       marker, then the type-specific payload
 * @param pos            position, used only in log and error messages
 * @param snaps          if non-null, snap set used to detect stale dentries
 *                       and to purge stale snap data of loaded inodes
 * @param rand_threshold forwarded to CInode::maybe_ephemeral_rand() for
 *                       loaded inodes
 * @param force_dirty    set to true when a stale item is recorded; it is
 *                       dereferenced unconditionally on that path
 * @return the existing or newly added dentry; may be null (stale item with no
 *         cached dentry, or a duplicate inode with no cached dentry)
 * @throws buffer::malformed_input on an unknown type marker; the decode calls
 *         may throw as well (the caller in _omap_fetched() catches
 *         buffer::error)
 */
CDentry *CDir::_load_dentry(
    std::string_view key,
    std::string_view dname,
    const snapid_t last,
    bufferlist &bl,
    const int pos,
    const std::set<snapid_t> *snaps,
    double rand_threshold,
    bool *force_dirty)
{
  auto q = bl.cbegin();

  snapid_t first;
  decode(first, q);

  // marker
  char type;
  decode(type, q);

  // NOTE(review): the quote opened before dname is never closed in this message
  dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
           << " [" << first << "," << last << "]"
           << dendl;

  // a snapped dentry is stale if no snap in 'snaps' falls within [first,last]
  bool stale = false;
  if (snaps && last != CEPH_NOSNAP) {
    set<snapid_t>::const_iterator p = snaps->lower_bound(first);
    if (p == snaps->end() || *p > last) {
      dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
      stale = true;
    }
  }

  /*
   * look for existing dentry for _last_ snap, because unlink +
   * create may leave a "hole" (epochs during which the dentry
   * doesn't exist) but for which no explicit negative dentry is in
   * the cache.
   */
  CDentry *dn;
  if (stale)
    dn = lookup_exact_snap(dname, last);
  else
    dn = lookup(dname, last);

  if (type == 'L' || type == 'l') {
    // hard link
    inodeno_t ino;
    unsigned char d_type;
    mempool::mds_co::string alternate_name;

    // the payload is decoded even for stale items
    CDentry::decode_remote(type, ino, d_type, alternate_name, q);

    if (stale) {
      // not cached: remember the key and force the dirfrag dirty
      if (!dn) {
        stale_items.insert(mempool::mds_co::string(key));
        *force_dirty = true;
      }
      return dn;
    }

    if (dn) {
      CDentry::linkage_t *dnl = dn->get_linkage();
      dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
      if (committed_version == 0 &&
          dnl->is_remote() &&
          dn->is_dirty() &&
          ino == dnl->get_remote_ino() &&
          d_type == dnl->get_remote_d_type() &&
          alternate_name == dn->get_alternate_name()) {
        // see comment below
        dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
        dn->mark_clean();
      }
    } else {
      // (remote) link
      dn = add_remote_dentry(dname, ino, d_type, std::move(alternate_name), first, last);

      // link to inode?
      CInode *in = mdcache->get_inode(ino);   // we may or may not have it.
      if (in) {
        dn->link_remote(dn->get_linkage(), in);
        dout(12) << "_fetched got remote link " << ino << " which we have " << *in << dendl;
      } else {
        dout(12) << "_fetched got remote link " << ino << " (don't have it)" << dendl;
      }
    }
  }
  else if (type == 'I' || type == 'i') {
    InodeStore inode_data;
    mempool::mds_co::string alternate_name;
    // inode
    // Load inode data before looking up or constructing CInode
    if (type == 'i') {
      // versioned encoding; alternate_name is present from struct_v 2 on
      DECODE_START(2, q);
      if (struct_v >= 2) {
        decode(alternate_name, q);
      }
      inode_data.decode(q);
      DECODE_FINISH(q);
    } else {
      inode_data.decode_bare(q);
    }

    if (stale) {
      // not cached: remember the key and force the dirfrag dirty
      if (!dn) {
        stale_items.insert(mempool::mds_co::string(key));
        *force_dirty = true;
      }
      return dn;
    }

    // set when the cached primary inode is in STATE_REJOINUNDEF and has to
    // be filled in from the loaded data
    bool undef_inode = false;
    if (dn) {
      CDentry::linkage_t *dnl = dn->get_linkage();
      dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;

      if (dnl->is_primary()) {
        CInode *in = dnl->get_inode();
        if (in->state_test(CInode::STATE_REJOINUNDEF)) {
          undef_inode = true;
        } else if (committed_version == 0 &&
                   dn->is_dirty() &&
                   inode_data.inode->ino == in->ino() &&
                   inode_data.inode->version == in->get_version()) {
          /* clean underwater item?
           * Underwater item is something that is dirty in our cache from
           * journal replay, but was previously flushed to disk before the
           * mds failed.
           *
           * We only do this is committed_version == 0. that implies either
           * - this is a fetch after from a clean/empty CDir is created
           *   (and has no effect, since the dn won't exist); or
           * - this is a fetch after _recovery_, which is what we're worried
           *   about.  Items that are marked dirty from the journal should be
           *   marked clean if they appear on disk.
           */
          dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
          dn->mark_clean();
          dout(10) << "_fetched had underwater inode " << *dnl->get_inode() << ", marking clean" << dendl;
          in->mark_clean();
        }
      }
    }

    if (!dn || undef_inode) {
      // add inode
      CInode *in = mdcache->get_inode(inode_data.inode->ino, last);
      if (!in || undef_inode) {
        // reuse the undefined inode if there is one, otherwise create it
        if (undef_inode && in)
          in->first = first;
        else
          in = new CInode(mdcache, true, first, last);

        in->reset_inode(std::move(inode_data.inode));
        in->reset_xattrs(std::move(inode_data.xattrs));
        // symlink?
        if (in->is_symlink())
          in->symlink = inode_data.symlink;

        in->dirfragtree.swap(inode_data.dirfragtree);
        in->reset_old_inodes(std::move(inode_data.old_inodes));
        // 'first' must lie beyond the newest old inode
        if (in->is_any_old_inodes()) {
          snapid_t min_first = in->get_old_inodes()->rbegin()->first + 1;
          if (min_first > in->first)
            in->first = min_first;
        }

        in->oldest_snap = inode_data.oldest_snap;
        in->decode_snap_blob(inode_data.snap_blob);
        if (snaps && !in->snaprealm)
          in->purge_stale_snap_data(*snaps);

        if (!undef_inode) {
          mdcache->add_inode(in); // add
          dn = add_primary_dentry(dname, in, std::move(alternate_name), first, last); // link
        }
        dout(12) << "_fetched got " << *dn << " " << *in << dendl;

        if (in->get_inode()->is_dirty_rstat())
          in->mark_dirty_rstat();

        in->maybe_ephemeral_rand(rand_threshold);
        //in->hack_accessed = false;
        //in->hack_load_stamp = ceph_clock_now();
        //num_new_inodes_loaded++;
      } else if (g_conf().get_val<bool>("mds_hack_allow_loading_invalid_metadata")) {
        dout(20) << "hack: adding duplicate dentry for " << *in << dendl;
        dn = add_primary_dentry(dname, in, std::move(alternate_name), first, last);
      } else {
        // the inode is already in cache under another dentry: report it
        // to the cluster log and do not link it here
        dout(0) << "_fetched  badness: got (but i already had) " << *in
                << " mode " << in->get_inode()->mode
                << " mtime " << in->get_inode()->mtime << dendl;
        string dirpath, inopath;
        this->inode->make_path_string(dirpath);
        in->make_path_string(inopath);
        mdcache->mds->clog->error() << "loaded dup inode " << inode_data.inode->ino
          << " [" << first << "," << last << "] v" << inode_data.inode->version
          << " at " << dirpath << "/" << dname
          << ", but inode " << in->vino() << " v" << in->get_version()
          << " already exists at " << inopath;
        return dn;
      }
    }
  } else {
    // unknown marker: report as malformed input
    CachedStackStringStream css;
    *css << "Invalid tag char '" << type << "' pos " << pos;
    throw buffer::malformed_input(css->str());
  }

  return dn;
}
1957
/**
 * Process the result of a dirfrag read: decode the fnode header, load every
 * dentry in @p omap, and wake up waiters.
 *
 * @param hdrbl     omap header holding the encoded fnode; an empty header is
 *                  treated as a missing object
 * @param omap      key -> encoded dentry value
 * @param complete  true for a full fetch: the dirfrag is marked complete,
 *                  STATE_FETCHING is cleared and WAIT_COMPLETE waiters are
 *                  finished
 * @param r         read result; must be 0, -CEPHFS_ENOENT or -CEPHFS_ENODATA
 */
void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
                         bool complete, int r)
{
  LogChannelRef clog = mdcache->mds->clog;
  dout(10) << "_fetched header " << hdrbl.length() << " bytes "
           << omap.size() << " keys for " << *this << dendl;

  ceph_assert(r == 0 || r == -CEPHFS_ENOENT || r == -CEPHFS_ENODATA);
  ceph_assert(is_auth());
  ceph_assert(!is_frozen());

  if (hdrbl.length() == 0) {
    dout(0) << "_fetched missing object for " << *this << dendl;

    clog->error() << "dir " << dirfrag() << " object missing on disk; some "
                     "files may be lost (" << get_path() << ")";

    go_bad(complete);
    return;
  }

  fnode_t got_fnode;
  {
    auto p = hdrbl.cbegin();
    try {
      decode(got_fnode, p);
    } catch (const buffer::error &err) {
      derr << "Corrupt fnode in dirfrag " << dirfrag()
           << ": " << err.what() << dendl;
      clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
                   << err.what() << " (" << get_path() << ")";
      go_bad(complete);
      return;
    }
    // trailing bytes after the fnode are treated as damage, too
    if (!p.end()) {
      clog->warn() << "header buffer of dir " << dirfrag() << " has "
                   << hdrbl.length() - p.get_off() << " extra bytes ("
                   << get_path() << ")";
      go_bad(complete);
      return;
    }
  }

  dout(10) << "_fetched version " << got_fnode.version << dendl;

  // take the loaded fnode?
  // only if we are a fresh CDir* with no prior state.
  if (get_version() == 0) {
    ceph_assert(!is_projected());
    ceph_assert(!state_test(STATE_COMMITTING));
    auto _fnode = allocate_fnode(got_fnode);
    reset_fnode(std::move(_fnode));
    projected_version = committing_version = committed_version = get_version();

    if (state_test(STATE_REJOINUNDEF)) {
      ceph_assert(mdcache->mds->is_rejoin());
      state_clear(STATE_REJOINUNDEF);
      mdcache->opened_undef_dirfrag(this);
    }
  }

  // inodes still in STATE_REJOINUNDEF after loading; handled after the loop
  list<CInode*> undef_inodes;

  // purge stale snaps?
  bool force_dirty = false;
  const set<snapid_t> *snaps = NULL;
  SnapRealm *realm = inode->find_snaprealm();
  if (fnode->snap_purged_thru < realm->get_last_destroyed()) {
    snaps = &realm->get_snaps();
    dout(10) << " snap_purged_thru " << fnode->snap_purged_thru
             << " < " << realm->get_last_destroyed()
             << ", snap purge based on " << *snaps << dendl;
    if (get_num_snap_items() == 0) {
      // no snapped items here: advance snap_purged_thru directly (the field
      // is written through a const_cast) and dirty the dirfrag below
      const_cast<snapid_t&>(fnode->snap_purged_thru) = realm->get_last_destroyed();
      force_dirty = true;
    }
  }

  int count = 0;
  // the map is walked in reverse, so pos counts down from the last index
  // (with an empty map this wraps around, but the loop does not run)
  unsigned pos = omap.size() - 1;
  double rand_threshold = get_inode()->get_ephemeral_rand();
  for (map<string, bufferlist>::reverse_iterator p = omap.rbegin();
       p != omap.rend();
       ++p, --pos) {
    string dname;
    snapid_t last;
    dentry_key_t::decode_helper(p->first, dname, last);

    // reset the heartbeat periodically while loading
    if (!(++count % mdcache->mds->heartbeat_reset_grace(2)))
      mdcache->mds->heartbeat_reset();

    CDentry *dn = NULL;
    try {
      dn = _load_dentry(
            p->first, dname, last, p->second, pos, snaps,
            rand_threshold, &force_dirty);
    } catch (const buffer::error &err) {
      mdcache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
                                    "dir frag " << dirfrag() << ": "
                                 << err.what() << "(" << get_path() << ")";

      // Remember that this dentry is damaged.  Subsequent operations
      // that try to act directly on it will get their CEPHFS_EIOs, but this
      // dirfrag as a whole will continue to look okay (minus the
      // mysteriously-missing dentry)
      go_bad_dentry(last, dname);

      // Anyone who was WAIT_DENTRY for this guy will get kicked
      // to RetryRequest, and hit the DamageTable-interrogating path.
      // Stats will now be bogus because we will think we're complete,
      // but have 1 or more missing dentries.
      continue;
    }

    if (!dn)
      continue;

    CDentry::linkage_t *dnl = dn->get_linkage();
    if (dnl->is_primary() && dnl->get_inode()->state_test(CInode::STATE_REJOINUNDEF))
      undef_inodes.push_back(dnl->get_inode());

    // touch explicitly wanted dentries, and every dentry of a partial fetch
    if (wanted_items.count(mempool::mds_co::string(dname)) > 0 || !complete) {
      dout(10) << " touching wanted dn " << *dn << dendl;
      mdcache->touch_dentry(dn);

      if (!(++count % mdcache->mds->heartbeat_reset_grace(2)))
        mdcache->mds->heartbeat_reset();
    }
  }

  //cache->mds->logger->inc("newin", num_new_inodes_loaded);

  // mark complete, !fetching
  if (complete) {
    wanted_items.clear();
    mark_complete();
    state_clear(STATE_FETCHING);
  }

  // open & force frags
  while (!undef_inodes.empty()) {
    CInode *in = undef_inodes.front();

    undef_inodes.pop_front();
    in->state_clear(CInode::STATE_REJOINUNDEF);
    mdcache->opened_undef_inode(in);

    if (!(++count % mdcache->mds->heartbeat_reset_grace()))
      mdcache->mds->heartbeat_reset();
  }

  // dirty myself to remove stale snap dentries
  if (force_dirty && !mdcache->is_readonly())
    log_mark_dirty();

  auth_unpin(this);

  if (complete) {
    // kick waiters
    finish_waiting(WAIT_COMPLETE, 0);
  }
}
2120
2121 void CDir::go_bad_dentry(snapid_t last, std::string_view dname)
2122 {
2123 dout(10) << __func__ << " " << dname << dendl;
2124 std::string path(get_path());
2125 path += "/";
2126 path += dname;
2127 const bool fatal = mdcache->mds->damage_table.notify_dentry(
2128 inode->ino(), frag, last, dname, path);
2129 if (fatal) {
2130 mdcache->mds->damaged();
2131 ceph_abort(); // unreachable, damaged() respawns us
2132 }
2133 }
2134
2135 void CDir::go_bad(bool complete)
2136 {
2137 dout(10) << __func__ << " " << frag << dendl;
2138 const bool fatal = mdcache->mds->damage_table.notify_dirfrag(
2139 inode->ino(), frag, get_path());
2140 if (fatal) {
2141 mdcache->mds->damaged();
2142 ceph_abort(); // unreachable, damaged() respawns us
2143 }
2144
2145 if (complete) {
2146 if (get_version() == 0) {
2147 auto _fnode = allocate_fnode();
2148 _fnode->version = 1;
2149 reset_fnode(std::move(_fnode));
2150 }
2151
2152 state_set(STATE_BADFRAG);
2153 mark_complete();
2154 }
2155
2156 state_clear(STATE_FETCHING);
2157 auth_unpin(this);
2158 finish_waiting(WAIT_COMPLETE, -CEPHFS_EIO);
2159 }
2160
2161 // -----------------------
2162 // COMMIT
2163
2164 /**
2165 * commit
2166 *
2167 * @param want - min version i want committed
2168 * @param c - callback for completion
2169 */
2170 void CDir::commit(version_t want, MDSContext *c, bool ignore_authpinnability, int op_prio)
2171 {
2172 dout(10) << "commit want " << want << " on " << *this << dendl;
2173 if (want == 0) want = get_version();
2174
2175 // preconditions
2176 ceph_assert(want <= get_version() || get_version() == 0); // can't commit the future
2177 ceph_assert(want > committed_version); // the caller is stupid
2178 ceph_assert(is_auth());
2179 ceph_assert(ignore_authpinnability || can_auth_pin());
2180
2181 // note: queue up a noop if necessary, so that we always
2182 // get an auth_pin.
2183 if (!c)
2184 c = new C_MDSInternalNoop;
2185
2186 // auth_pin on first waiter
2187 if (waiting_for_commit.empty())
2188 auth_pin(this);
2189 waiting_for_commit[want].push_back(c);
2190
2191 // ok.
2192 _commit(want, op_prio);
2193 }
2194
2195 class C_IO_Dir_Committed : public CDirIOContext {
2196 version_t version;
2197 public:
2198 C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2199 void finish(int r) override {
2200 dir->_committed(r, version);
2201 }
2202 void print(ostream& out) const override {
2203 out << "dirfrag_committed(" << dir->dirfrag() << ")";
2204 }
2205 };
2206
2207 class C_IO_Dir_Commit_Ops : public Context {
2208 public:
2209 C_IO_Dir_Commit_Ops(CDir *d, int pr,
2210 vector<CDir::dentry_commit_item> &&s, bufferlist &&bl,
2211 vector<string> &&r,
2212 mempool::mds_co::compact_set<mempool::mds_co::string> &&stales) :
2213 dir(d), op_prio(pr) {
2214 metapool = dir->mdcache->mds->get_metadata_pool();
2215 version = dir->get_version();
2216 is_new = dir->is_new();
2217 to_set.swap(s);
2218 dfts.swap(bl);
2219 to_remove.swap(r);
2220 stale_items.swap(stales);
2221 }
2222
2223 void finish(int r) override {
2224 dir->_omap_commit_ops(r, op_prio, metapool, version, is_new, to_set, dfts,
2225 to_remove, stale_items);
2226 }
2227
2228 private:
2229 CDir *dir;
2230 int op_prio;
2231 int64_t metapool;
2232 version_t version;
2233 bool is_new;
2234 vector<CDir::dentry_commit_item> to_set;
2235 bufferlist dfts;
2236 vector<string> to_remove;
2237 mempool::mds_co::compact_set<mempool::mds_co::string> stale_items;
2238 };
2239
2240 // This is doing the same thing with the InodeStoreBase::encode()
2241 void CDir::_encode_primary_inode_base(dentry_commit_item &item, bufferlist &dfts,
2242 bufferlist &bl)
2243 {
2244 ENCODE_START(6, 4, bl);
2245 encode(*item.inode, bl, item.features);
2246
2247 if (!item.symlink.empty())
2248 encode(item.symlink, bl);
2249
2250 // dirfragtree
2251 dfts.splice(0, item.dft_len, &bl);
2252
2253 if (item.xattrs)
2254 encode(*item.xattrs, bl);
2255 else
2256 encode((__u32)0, bl);
2257
2258 if (item.snaprealm) {
2259 bufferlist snapr_bl;
2260 encode(item.srnode, snapr_bl);
2261 encode(snapr_bl, bl);
2262 } else {
2263 encode(bufferlist(), bl);
2264 }
2265
2266 if (item.old_inodes)
2267 encode(*item.old_inodes, bl, item.features);
2268 else
2269 encode((__u32)0, bl);
2270
2271 encode(item.oldest_snap, bl);
2272 encode(item.damage_flags, bl);
2273 ENCODE_FINISH(bl);
2274 }
2275
// This is not locked by mds_lock
/**
 * Turn the data collected by _omap_commit() into omap write operations
 * on the dirfrag object and submit them.
 *
 * @param r         result handed to the queued context; a negative value
 *                  is passed to handle_write_error_with_lock() and nothing
 *                  is written
 * @param op_prio   priority set on every operation
 * @param metapool  pool id used for the object locator
 * @param version   version reported to _committed() once all ops finish
 * @param _new      when false, every op is prefixed with a stat
 * @param to_set    dentries to encode and set
 * @param dfts      pre-encoded dirfragtrees consumed while encoding to_set
 * @param to_remove keys to remove
 * @param stales    further keys to remove
 *
 * Keys and values are accumulated into batches; a batch is flushed before
 * an item that would push its accounted size past
 * mdcache->max_dir_commit_size.  An item larger than the limit is still
 * sent, in a batch of its own.  The last op also carries the omap header.
 */
void CDir::_omap_commit_ops(int r, int op_prio, int64_t metapool, version_t version, bool _new,
                            vector<dentry_commit_item> &to_set, bufferlist &dfts,
                            vector<string>& to_remove,
                            mempool::mds_co::compact_set<mempool::mds_co::string> &stales)
{
  dout(10) << __func__ << dendl;

  if (r < 0) {
    mdcache->mds->handle_write_error_with_lock(r);
    return;
  }

  // one sub-context per submitted op; when all complete,
  // C_IO_Dir_Committed is run through the mds finisher
  C_GatherBuilder gather(g_ceph_context,
                         new C_OnFinisher(new C_IO_Dir_Committed(this, version),
                                          mdcache->mds->finisher));

  SnapContext snapc;
  object_t oid = get_ondisk_object();
  object_locator_t oloc(metapool);

  // current batch: keys to set and keys to remove
  map<string, bufferlist> _set;
  set<string> _rm;

  unsigned max_write_size = mdcache->max_dir_commit_size;
  unsigned write_size = 0;  // accounted size of the current batch

  // submit the current batch as one op and reset the batch state
  auto commit_one = [&](bool header=false) {
    ObjectOperation op;

    // don't create new dirfrag blindly
    if (!_new)
      op.stat(nullptr, nullptr, nullptr);

    /*
     * save the header at the last moment.. If we were to send it off before
     * other updates, but die before sending them all, we'd think that the
     * on-disk state was fully committed even though it wasn't! However, since
     * the messages are strictly ordered between the MDS and the OSD, and
     * since messages to a given PG are strictly ordered, if we simply send
     * the message containing the header off last, we cannot get our header
     * into an incorrect state.
     */
    if (header) {
      // note: this local deliberately shadows the bool parameter
      bufferlist header;
      encode(*fnode, header);
      op.omap_set_header(header);
    }

    op.priority = op_prio;
    if (!_set.empty())
      op.omap_set(_set);
    if (!_rm.empty())
      op.omap_rm_keys(_rm);
    mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
                                   ceph::real_clock::now(),
                                   0, gather.new_sub());
    write_size = 0;
    _set.clear();
    _rm.clear();
  };

  // shared by all three loops below to pace heartbeat_reset() calls
  int count = 0;

  // stale keys: removal only
  for (auto &key : stales) {
    unsigned size = key.length() + sizeof(__u32);
    if (write_size + size > max_write_size)
      commit_one();

    write_size += size;
    _rm.emplace(key);

    if (!(++count % mdcache->mds->heartbeat_reset_grace(2)))
      mdcache->mds->heartbeat_reset();
  }

  // explicitly removed keys; the strings are moved out of to_remove
  for (auto &key : to_remove) {
    unsigned size = key.length() + sizeof(__u32);
    if (write_size + size > max_write_size)
      commit_one();

    write_size += size;
    _rm.emplace(std::move(key));

    if (!(++count % mdcache->mds->heartbeat_reset_grace(2)))
      mdcache->mds->heartbeat_reset();
  }

  // dentries to set: encode each value into bl, then swap it into the batch
  bufferlist bl;
  using ceph::encode;
  for (auto &item : to_set) {
    encode(item.first, bl);
    if (item.is_remote) {
      // remote link
      CDentry::encode_remote(item.ino, item.d_type, item.alternate_name, bl);
    } else {
      // marker, name, inode, [symlink string]
      bl.append('i');  // inode

      ENCODE_START(2, 1, bl);
      encode(item.alternate_name, bl);
      _encode_primary_inode_base(item, dfts, bl);
      ENCODE_FINISH(bl);
    }

    // size is computed before item.key is moved from below
    unsigned size = item.key.length() + bl.length() + 2 * sizeof(__u32);
    if (write_size + size > max_write_size)
      commit_one();

    write_size += size;
    // swap hands the encoded value to the map and leaves bl with the
    // map slot's previous contents for the next iteration
    _set[std::move(item.key)].swap(bl);

    if (!(++count % mdcache->mds->heartbeat_reset_grace()))
      mdcache->mds->heartbeat_reset();
  }

  // final op: whatever is left in the batch plus the header
  commit_one(true);
  gather.activate();
}
2394
/**
 * Flush out the modified dentries in this dir. Keep the bufferlist
 * below max_write_size;
 *
 * This function only collects what has to be written: every dentry is
 * classified as "remove" or "set", and the result is queued on the mds
 * finisher as a C_IO_Dir_Commit_Ops, which performs the actual writes.
 *
 * @param op_prio  op priority; a negative value selects
 *                 CEPH_MSG_PRIO_DEFAULT
 */
void CDir::_omap_commit(int op_prio)
{
  dout(10) << __func__ << dendl;

  if (op_prio < 0)
    op_prio = CEPH_MSG_PRIO_DEFAULT;

  // snap purge?
  // snaps stays null unless the realm has destroyed snaps newer than
  // what this fnode records as purged
  const set<snapid_t> *snaps = NULL;
  SnapRealm *realm = inode->find_snaprealm();
  if (fnode->snap_purged_thru < realm->get_last_destroyed()) {
    snaps = &realm->get_snaps();
    dout(10) << " snap_purged_thru " << fnode->snap_purged_thru
             << " < " << realm->get_last_destroyed()
             << ", snap purge based on " << *snaps << dendl;
    // fnode.snap_purged_thru = realm->get_last_destroyed();
  }

  // upper bound on the number of dentries visited below
  size_t items_count = 0;
  if (state_test(CDir::STATE_FRAGMENTING) && is_new()) {
    items_count = get_num_head_items() + get_num_snap_items();
  } else {
    for (elist<CDentry*>::iterator it = dirty_dentries.begin(); !it.end(); ++it)
      ++items_count;
  }

  vector<string> to_remove;
  // reserve enough memory; this may be more than is actually needed
  to_remove.reserve(items_count);

  vector<dentry_commit_item> to_set;
  // reserve enough memory; this may be more than is actually needed
  to_set.reserve(items_count);

  // for dir fragtrees
  bufferlist dfts(CEPH_PAGE_SIZE);

  // classify one dentry: append its key to to_remove, or a filled-in
  // commit item to to_set
  auto write_one = [&](CDentry *dn) {
    string key;
    dn->key().encode(key);

    // snapshotted dentry that try_trim_snap_dentry() trimmed
    if (dn->last != CEPH_NOSNAP &&
        snaps && try_trim_snap_dentry(dn, *snaps)) {
      dout(10) << " rm " << key << dendl;
      to_remove.emplace_back(std::move(key));
      return;
    }

    if (dn->get_linkage()->is_null()) {
      dout(10) << " rm " << dn->get_name() << " " << *dn << dendl;
      to_remove.emplace_back(std::move(key));
    } else {
      dout(10) << " set " << dn->get_name() << " " << *dn << dendl;

      uint64_t off = dfts.length();
      // reserve more space when less than 1/8 of a page
      // is left in the current page
      uint64_t left = CEPH_PAGE_SIZE - off % CEPH_PAGE_SIZE;
      if (left < CEPH_PAGE_SIZE / 8)
        dfts.reserve(left + CEPH_PAGE_SIZE);

      auto& item = to_set.emplace_back();
      item.key = std::move(key);
      _parse_dentry(dn, item, snaps, dfts);
      // number of bytes _parse_dentry() appended to dfts for this item
      item.dft_len = dfts.length() - off;
    }
  };

  int count = 0;
  if (state_test(CDir::STATE_FRAGMENTING) && is_new()) {
    // new dirfrag produced by fragmenting: walk every non-null dentry
    ceph_assert(committed_version == 0);
    for (auto p = items.begin(); p != items.end(); ) {
      CDentry *dn = p->second;
      // advance before write_one(), which may trim dn
      ++p;
      if (dn->get_linkage()->is_null())
        continue;
      write_one(dn);

      if (!(++count % mdcache->mds->heartbeat_reset_grace()))
        mdcache->mds->heartbeat_reset();
    }
  } else {
    // otherwise only the dirty dentries are visited
    for (auto p = dirty_dentries.begin(); !p.end(); ) {
      CDentry *dn = *p;
      // advance before write_one(), which may trim dn
      ++p;
      write_one(dn);

      if (!(++count % mdcache->mds->heartbeat_reset_grace()))
        mdcache->mds->heartbeat_reset();
    }
  }

  // hand everything over; the context swaps the containers into itself
  auto c = new C_IO_Dir_Commit_Ops(this, op_prio, std::move(to_set), std::move(dfts),
                                   std::move(to_remove), std::move(stale_items));
  stale_items.clear();
  mdcache->mds->finisher->queue(c);
}
2496
2497 void CDir::_parse_dentry(CDentry *dn, dentry_commit_item &item,
2498 const set<snapid_t> *snaps, bufferlist &bl)
2499 {
2500 // clear dentry NEW flag, if any. we can no longer silently drop it.
2501 dn->clear_new();
2502
2503 item.first = dn->first;
2504
2505 // primary or remote?
2506 auto& linkage = dn->linkage;
2507 item.alternate_name = dn->get_alternate_name();
2508 if (linkage.is_remote()) {
2509 item.is_remote = true;
2510 item.ino = linkage.get_remote_ino();
2511 item.d_type = linkage.get_remote_d_type();
2512 dout(14) << " dn '" << dn->get_name() << "' remote ino " << item.ino << dendl;
2513 } else if (linkage.is_primary()) {
2514 // primary link
2515 CInode *in = linkage.get_inode();
2516 ceph_assert(in);
2517
2518 dout(14) << " dn '" << dn->get_name() << "' inode " << *in << dendl;
2519
2520 if (in->is_multiversion()) {
2521 if (!in->snaprealm) {
2522 if (snaps)
2523 in->purge_stale_snap_data(*snaps);
2524 } else {
2525 in->purge_stale_snap_data(in->snaprealm->get_snaps());
2526 }
2527 }
2528
2529 if (in->snaprealm) {
2530 item.snaprealm = true;
2531 item.srnode = in->snaprealm->srnode;
2532 }
2533 item.features = mdcache->mds->mdsmap->get_up_features();
2534 item.inode = in->inode;
2535 if (in->inode->is_symlink())
2536 item.symlink = in->symlink;
2537 using ceph::encode;
2538 encode(in->dirfragtree, bl);
2539 item.xattrs = in->xattrs;
2540 item.old_inodes = in->old_inodes;
2541 item.oldest_snap = in->oldest_snap;
2542 item.damage_flags = in->damage_flags;
2543 } else {
2544 ceph_assert(!linkage.is_null());
2545 }
2546 }
2547
2548 void CDir::_commit(version_t want, int op_prio)
2549 {
2550 dout(10) << "_commit want " << want << " on " << *this << dendl;
2551
2552 // we can't commit things in the future.
2553 // (even the projected future.)
2554 ceph_assert(want <= get_version() || get_version() == 0);
2555
2556 // check pre+postconditions.
2557 ceph_assert(is_auth());
2558
2559 // already committed?
2560 if (committed_version >= want) {
2561 dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2562 return;
2563 }
2564 // already committing >= want?
2565 if (committing_version >= want) {
2566 dout(10) << "already committing " << committing_version << " >= " << want << dendl;
2567 ceph_assert(state_test(STATE_COMMITTING));
2568 return;
2569 }
2570
2571 // alrady committed an older version?
2572 if (committing_version > committed_version) {
2573 dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2574 return;
2575 }
2576
2577 // commit.
2578 committing_version = get_version();
2579
2580 // mark committing (if not already)
2581 if (!state_test(STATE_COMMITTING)) {
2582 dout(10) << "marking committing" << dendl;
2583 state_set(STATE_COMMITTING);
2584 }
2585
2586 if (mdcache->mds->logger) mdcache->mds->logger->inc(l_mds_dir_commit);
2587
2588 mdcache->mds->balancer->hit_dir(this, META_POP_STORE);
2589
2590 _omap_commit(op_prio);
2591 }
2592
2593
/**
 * _committed
 *
 * Completion of a dirfrag commit: record the committed version, mark
 * the dirfrag and its dentries/inodes clean where their versions are
 * covered, and wake or re-drive the commit waiters.
 *
 * @param r result of the write; negative values are errors
 * @param v version i just committed
 */
void CDir::_committed(int r, version_t v)
{
  if (r < 0) {
    // the directory could be partly purged during MDS failover
    // (ENOENT is tolerated only for a never-committed dirfrag whose
    // parent directory belongs to a stray inode)
    if (r == -CEPHFS_ENOENT && committed_version == 0 &&
        !inode->is_base() && get_parent_dir()->inode->is_stray()) {
      r = 0;
      if (inode->snaprealm)
        inode->state_set(CInode::STATE_MISSINGOBJS);
    }
    if (r < 0) {
      dout(1) << "commit error " << r << " v " << v << dendl;
      mdcache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
                                  << " errno " << r;
      mdcache->mds->handle_write_error(r);
      return;
    }
  }

  dout(10) << "_committed v " << v << " on " << *this << dendl;
  ceph_assert(is_auth());

  bool stray = inode->is_stray();

  // take note.
  ceph_assert(v > committed_version);
  ceph_assert(v <= committing_version);
  committed_version = v;

  // _all_ commits done?
  if (committing_version == committed_version)
    state_clear(CDir::STATE_COMMITTING);

  // _any_ commit, even if we've been redirtied, means we're no longer new.
  item_new.remove_myself();

  // dir clean?
  if (committed_version == get_version())
    mark_clean();

  // shared by both loops below to pace heartbeat_reset() calls
  int count = 0;

  // dentries clean?
  for (auto p = dirty_dentries.begin(); !p.end(); ) {
    CDentry *dn = *p;
    // advance first: dn may be marked clean or removed below
    ++p;

    // inode?
    if (dn->linkage.is_primary()) {
      CInode *in = dn->linkage.get_inode();
      ceph_assert(in);
      ceph_assert(in->is_auth());

      if (committed_version >= in->get_version()) {
        if (in->is_dirty()) {
          dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
          in->mark_clean();
        }
      } else {
        dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
        ceph_assert(in->is_dirty() || in->last < CEPH_NOSNAP);  // special case for cow snap items (not predirtied)
      }
    }

    // dentry
    if (committed_version >= dn->get_version()) {
      dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
      dn->mark_clean();

      // drop clean null stray dentries immediately
      if (stray &&
          dn->get_num_ref() == 0 &&
          !dn->is_projected() &&
          dn->get_linkage()->is_null())
        remove_dentry(dn);
    } else {
      dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
      ceph_assert(dn->is_dirty());
    }

    if (!(++count % mdcache->mds->heartbeat_reset_grace()))
      mdcache->mds->heartbeat_reset();
  }

  // finishers?
  bool were_waiters = !waiting_for_commit.empty();

  // walk the waiters in key (version) order: queue those that are
  // satisfied, and start another commit for the first one that is not
  auto it = waiting_for_commit.begin();
  while (it != waiting_for_commit.end()) {
    // remember the successor, since *it is erased below
    auto _it = it;
    ++_it;
    if (it->first > committed_version) {
      dout(10) << " there are waiters for " << it->first << ", committing again" << dendl;
      _commit(it->first, -1);
      break;
    }
    MDSContext::vec t;
    for (const auto &waiter : it->second)
      t.push_back(waiter);
    mdcache->mds->queue_waiters(t);
    waiting_for_commit.erase(it);
    it = _it;

    if (!(++count % mdcache->mds->heartbeat_reset_grace()))
      mdcache->mds->heartbeat_reset();
  }

  // try drop dentries in this dirfrag if it's about to be purged
  if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
      inode->snaprealm)
    mdcache->maybe_eval_stray(inode, true);

  // unpin if we kicked the last waiter.
  // (this releases the auth_pin taken by commit() for the first waiter)
  if (were_waiters &&
      waiting_for_commit.empty())
    auth_unpin(this);
}
2716
2717
2718
2719
2720 // IMPORT/EXPORT
2721
2722 mds_rank_t CDir::get_export_pin(bool inherit) const
2723 {
2724 mds_rank_t export_pin = inode->get_export_pin(inherit);
2725 if (export_pin == MDS_RANK_EPHEMERAL_DIST)
2726 export_pin = mdcache->hash_into_rank_bucket(ino(), get_frag());
2727 else if (export_pin == MDS_RANK_EPHEMERAL_RAND)
2728 export_pin = mdcache->hash_into_rank_bucket(ino());
2729 return export_pin;
2730 }
2731
2732 bool CDir::is_exportable(mds_rank_t dest) const
2733 {
2734 mds_rank_t export_pin = get_export_pin();
2735 if (export_pin == dest)
2736 return true;
2737 if (export_pin >= 0)
2738 return false;
2739 return true;
2740 }
2741
/**
 * Encode the state of this dirfrag for export.
 *
 * The field order here must stay in step with decode_import().  The
 * dirfrag must not be projected.  As a side effect a PIN_TEMPEXPORTING
 * reference is taken; finish_export() drops it again.
 *
 * @param bl  output buffer
 */
void CDir::encode_export(bufferlist& bl)
{
  ENCODE_START(1, 1, bl);
  ceph_assert(!is_projected());
  encode(first, bl);
  encode(*fnode, bl);
  encode(dirty_old_rstat, bl);
  encode(committed_version, bl);

  encode(state, bl);
  encode(dir_rep, bl);

  // popularity counters
  encode(pop_me, bl);
  encode(pop_auth_subtree, bl);

  encode(dir_rep_by, bl);
  encode(get_replicas(), bl);

  get(PIN_TEMPEXPORTING);
  ENCODE_FINISH(bl);
}
2763
2764 void CDir::finish_export()
2765 {
2766 state &= MASK_STATE_EXPORT_KEPT;
2767 pop_nested.sub(pop_auth_subtree);
2768 pop_auth_subtree_nested.sub(pop_auth_subtree);
2769 pop_me.zero();
2770 pop_auth_subtree.zero();
2771 put(PIN_TEMPEXPORTING);
2772 dirty_old_rstat.clear();
2773 }
2774
/**
 * Decode dirfrag state written by encode_export() and make this
 * dirfrag an auth copy.
 *
 * @param blp  iterator positioned at the encoded dirfrag
 * @param ls   log segment that receives the dirty-list entries created
 *             for imported dirty state
 */
void CDir::decode_import(bufferlist::const_iterator& blp, LogSegment *ls)
{
  DECODE_START(1, blp);
  decode(first, blp);
  {
    // decode into a fresh fnode and install it
    auto _fnode = allocate_fnode();
    decode(*_fnode, blp);
    reset_fnode(std::move(_fnode));
  }
  update_projected_version();

  decode(dirty_old_rstat, blp);
  decode(committed_version, blp);
  // nothing is being committed locally yet
  committing_version = committed_version;

  // keep only our MASK_STATE_IMPORT_KEPT bits, then become auth and
  // adopt the exporter's MASK_STATE_EXPORTED bits
  unsigned s;
  decode(s, blp);
  state &= MASK_STATE_IMPORT_KEPT;
  state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));

  if (is_dirty()) {
    get(PIN_DIRTY);
    _mark_dirty(ls);
  }

  decode(dir_rep, blp);

  // popularity: add the imported subtree load to the nested totals
  decode(pop_me, blp);
  decode(pop_auth_subtree, blp);
  pop_nested.add(pop_auth_subtree);
  pop_auth_subtree_nested.add(pop_auth_subtree);

  decode(dir_rep_by, blp);
  decode(get_replicas(), blp);
  if (is_replicated()) get(PIN_REPLICATED);

  replica_nonce = 0;  // no longer defined

  // did we import some dirty scatterlock data?
  if (dirty_old_rstat.size() ||
      !(fnode->rstat == fnode->accounted_rstat)) {
    mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
    ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
  }
  if (!(fnode->fragstat == fnode->accounted_fragstat)) {
    mdcache->mds->locker->mark_updated_scatterlock(&inode->filelock);
    ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
  }
  if (is_dirty_dft()) {
    if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
        inode->dirfragtreelock.is_stable()) {
      // clear stale dirtydft
      state_clear(STATE_DIRTYDFT);
    } else {
      mdcache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
      ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
    }
  }
  DECODE_FINISH(blp);
}
2835
2836 void CDir::abort_import()
2837 {
2838 ceph_assert(is_auth());
2839 state_clear(CDir::STATE_AUTH);
2840 remove_bloom();
2841 clear_replica_map();
2842 set_replica_nonce(CDir::EXPORT_NONCE);
2843 if (is_dirty())
2844 mark_clean();
2845
2846 pop_nested.sub(pop_auth_subtree);
2847 pop_auth_subtree_nested.sub(pop_auth_subtree);
2848 pop_me.zero();
2849 pop_auth_subtree.zero();
2850 }
2851
2852 void CDir::encode_dirstat(bufferlist& bl, const session_info_t& info, const DirStat& ds) {
2853 if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
2854 ENCODE_START(1, 1, bl);
2855 encode(ds.frag, bl);
2856 encode(ds.auth, bl);
2857 encode(ds.dist, bl);
2858 ENCODE_FINISH(bl);
2859 }
2860 else {
2861 encode(ds.frag, bl);
2862 encode(ds.auth, bl);
2863 encode(ds.dist, bl);
2864 }
2865 }
2866
2867 /********************************
2868 * AUTHORITY
2869 */
2870
2871 /*
2872 * if dir_auth.first == parent, auth is same as inode.
2873 * unless .second != unknown, in which case that sticks.
2874 */
2875 mds_authority_t CDir::authority() const
2876 {
2877 if (is_subtree_root())
2878 return dir_auth;
2879 else
2880 return inode->authority();
2881 }
2882
2883 /** is_subtree_root()
2884 * true if this is an auth delegation point.
2885 * that is, dir_auth != default (parent,unknown)
2886 *
2887 * some key observations:
2888 * if i am auth:
2889 * - any region bound will be an export, or frozen.
2890 *
2891 * note that this DOES heed dir_auth.pending
2892 */
2893 /*
2894 bool CDir::is_subtree_root()
2895 {
2896 if (dir_auth == CDIR_AUTH_DEFAULT) {
2897 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2898 //<< " on " << ino() << dendl;
2899 return false;
2900 } else {
2901 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2902 //<< " on " << ino() << dendl;
2903 return true;
2904 }
2905 }
2906 */
2907
2908 /** contains(x)
2909 * true if we are x, or an ancestor of x
2910 */
2911 bool CDir::contains(CDir *x)
2912 {
2913 while (1) {
2914 if (x == this)
2915 return true;
2916 x = x->get_inode()->get_projected_parent_dir();
2917 if (x == 0)
2918 return false;
2919 }
2920 }
2921
2922 bool CDir::can_rep() const
2923 {
2924 if (!is_rep())
2925 return true;
2926
2927 unsigned mds_num = mdcache->mds->get_mds_map()->get_num_mds(MDSMap::STATE_ACTIVE);
2928 if ((mds_num - 1) > get_replicas().size())
2929 return true;
2930
2931 return false;
2932 }
2933
2934
2935 /** set_dir_auth
2936 */
2937 void CDir::set_dir_auth(const mds_authority_t &a)
2938 {
2939 dout(10) << "setting dir_auth=" << a
2940 << " from " << dir_auth
2941 << " on " << *this << dendl;
2942
2943 bool was_subtree = is_subtree_root();
2944 bool was_ambiguous = dir_auth.second >= 0;
2945
2946 // set it.
2947 dir_auth = a;
2948
2949 // new subtree root?
2950 if (!was_subtree && is_subtree_root()) {
2951 dout(10) << " new subtree root, adjusting auth_pins" << dendl;
2952
2953 if (freeze_tree_state) {
2954 // only by CDir::_freeze_tree()
2955 ceph_assert(is_freezing_tree_root());
2956 }
2957
2958 inode->num_subtree_roots++;
2959
2960 // unpin parent of frozen dir/tree?
2961 if (inode->is_auth()) {
2962 ceph_assert(!is_frozen_tree_root());
2963 if (is_frozen_dir())
2964 inode->auth_unpin(this);
2965 }
2966 }
2967 if (was_subtree && !is_subtree_root()) {
2968 dout(10) << " old subtree root, adjusting auth_pins" << dendl;
2969
2970 inode->num_subtree_roots--;
2971
2972 // pin parent of frozen dir/tree?
2973 if (inode->is_auth()) {
2974 ceph_assert(!is_frozen_tree_root());
2975 if (is_frozen_dir())
2976 inode->auth_pin(this);
2977 }
2978 }
2979
2980 // newly single auth?
2981 if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
2982 MDSContext::vec ls;
2983 take_waiting(WAIT_SINGLEAUTH, ls);
2984 mdcache->mds->queue_waiters(ls);
2985 }
2986 }
2987
2988 /*****************************************
2989 * AUTH PINS and FREEZING
2990 *
2991 * the basic plan is that auth_pins only exist in auth regions, and they
2992 * prevent a freeze (and subsequent auth change).
2993 *
2994 * however, we also need to prevent a parent from freezing if a child is frozen.
2995 * for that reason, the parent inode of a frozen directory is auth_pinned.
2996 *
2997 * the oddity is when the frozen directory is a subtree root. if that's the case,
2998 * the parent inode isn't frozen. which means that when subtree authority is adjusted
2999 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
3000 * time.
3001 *
3002 */
3003
3004 void CDir::auth_pin(void *by)
3005 {
3006 if (auth_pins == 0)
3007 get(PIN_AUTHPIN);
3008 auth_pins++;
3009
3010 #ifdef MDS_AUTHPIN_SET
3011 auth_pin_set.insert(by);
3012 #endif
3013
3014 dout(10) << "auth_pin by " << by << " on " << *this << " count now " << auth_pins << dendl;
3015
3016 if (freeze_tree_state)
3017 freeze_tree_state->auth_pins += 1;
3018 }
3019
3020 void CDir::auth_unpin(void *by)
3021 {
3022 auth_pins--;
3023
3024 #ifdef MDS_AUTHPIN_SET
3025 {
3026 auto it = auth_pin_set.find(by);
3027 ceph_assert(it != auth_pin_set.end());
3028 auth_pin_set.erase(it);
3029 }
3030 #endif
3031 if (auth_pins == 0)
3032 put(PIN_AUTHPIN);
3033
3034 dout(10) << "auth_unpin by " << by << " on " << *this << " count now " << auth_pins << dendl;
3035 ceph_assert(auth_pins >= 0);
3036
3037 if (freeze_tree_state)
3038 freeze_tree_state->auth_pins -= 1;
3039
3040 maybe_finish_freeze(); // pending freeze?
3041 }
3042
3043 void CDir::adjust_nested_auth_pins(int dirinc, void *by)
3044 {
3045 ceph_assert(dirinc);
3046 dir_auth_pins += dirinc;
3047
3048 dout(15) << __func__ << " " << dirinc << " on " << *this
3049 << " by " << by << " count now "
3050 << auth_pins << "/" << dir_auth_pins << dendl;
3051 ceph_assert(dir_auth_pins >= 0);
3052
3053 if (freeze_tree_state)
3054 freeze_tree_state->auth_pins += dirinc;
3055
3056 if (dirinc < 0)
3057 maybe_finish_freeze(); // pending freeze?
3058 }
3059
#ifdef MDS_VERIFY_FRAGSTAT
/**
 * Debug check (only built with MDS_VERIFY_FRAGSTAT): recount files and
 * subdirectories from the dentries of this complete dirfrag and abort
 * if the totals disagree with fnode->fragstat.  Dirfrags of stray
 * inodes are not checked.
 */
void CDir::verify_fragstat()
{
  ceph_assert(is_complete());
  if (inode->is_stray())
    return;

  frag_info_t c;
  memset(&c, 0, sizeof(c));

  for (auto& entry : items) {
    CDentry *dn = entry.second;
    if (dn->is_null())
      continue;

    dout(10) << " " << *dn << dendl;

    // primary link: classify by the linked inode
    if (dn->is_primary()) {
      dout(10) << " " << *dn->inode << dendl;
      if (dn->inode->is_dir())
        c.nsubdirs++;
      else
        c.nfiles++;
    }

    // remote link: classify by the recorded d_type
    if (dn->is_remote()) {
      if (dn->get_remote_d_type() == DT_DIR)
        c.nsubdirs++;
      else
        c.nfiles++;
    }
  }

  const bool matches = c.nsubdirs == fnode->fragstat.nsubdirs &&
                       c.nfiles == fnode->fragstat.nfiles;
  if (!matches) {
    dout(0) << "verify_fragstat failed " << fnode->fragstat << " on " << *this << dendl;
    dout(0) << " i count " << c << dendl;
    ceph_abort();
  } else {
    dout(0) << "verify_fragstat ok " << fnode->fragstat << " on " << *this << dendl;
  }
}
#endif
3105
3106 /*****************************************************************************
3107 * FREEZING
3108 */
3109
3110 // FREEZE TREE
3111
3112 void CDir::_walk_tree(std::function<bool(CDir*)> callback)
3113 {
3114 deque<CDir*> dfq;
3115 dfq.push_back(this);
3116
3117 while (!dfq.empty()) {
3118 CDir *dir = dfq.front();
3119 dfq.pop_front();
3120
3121 for (auto& p : *dir) {
3122 CDentry *dn = p.second;
3123 if (!dn->get_linkage()->is_primary())
3124 continue;
3125 CInode *in = dn->get_linkage()->get_inode();
3126 if (!in->is_dir())
3127 continue;
3128
3129 auto&& dfv = in->get_nested_dirfrags();
3130 for (auto& dir : dfv) {
3131 auto ret = callback(dir);
3132 if (ret)
3133 dfq.push_back(dir);
3134 }
3135 }
3136 }
3137 }
3138
3139 bool CDir::freeze_tree()
3140 {
3141 ceph_assert(!is_frozen());
3142 ceph_assert(!is_freezing());
3143 ceph_assert(!freeze_tree_state);
3144
3145 auth_pin(this);
3146
3147 // Travese the subtree to mark dirfrags as 'freezing' (set freeze_tree_state)
3148 // and to accumulate auth pins and record total count in freeze_tree_state.
3149 // when auth unpin an 'freezing' object, the counter in freeze_tree_state also
3150 // gets decreased. Subtree become 'frozen' when the counter reaches zero.
3151 freeze_tree_state = std::make_shared<freeze_tree_state_t>(this);
3152 freeze_tree_state->auth_pins += get_auth_pins() + get_dir_auth_pins();
3153 if (!lock_caches_with_auth_pins.empty())
3154 mdcache->mds->locker->invalidate_lock_caches(this);
3155
3156 _walk_tree([this](CDir *dir) {
3157 if (dir->freeze_tree_state)
3158 return false;
3159 dir->freeze_tree_state = freeze_tree_state;
3160 freeze_tree_state->auth_pins += dir->get_auth_pins() + dir->get_dir_auth_pins();
3161 if (!dir->lock_caches_with_auth_pins.empty())
3162 mdcache->mds->locker->invalidate_lock_caches(dir);
3163 return true;
3164 }
3165 );
3166
3167 if (is_freezeable(true)) {
3168 _freeze_tree();
3169 auth_unpin(this);
3170 return true;
3171 } else {
3172 state_set(STATE_FREEZINGTREE);
3173 ++num_freezing_trees;
3174 dout(10) << "freeze_tree waiting " << *this << dendl;
3175 return false;
3176 }
3177 }
3178
/**
 * Complete the freeze of the subtree rooted here: mark the shared
 * freeze state frozen, adjust subtree authority (auth case) or attach
 * the freeze state to nested dirfrags (non-auth case), and switch from
 * FREEZINGTREE to FROZENTREE.
 *
 * Requires is_freezeable(true).
 */
void CDir::_freeze_tree()
{
  dout(10) << __func__ << " " << *this << dendl;
  ceph_assert(is_freezeable(true));

  // an existing freeze state is only expected on an auth dirfrag;
  // a non-auth dirfrag creates its state here
  if (freeze_tree_state) {
    ceph_assert(is_auth());
  } else {
    ceph_assert(!is_auth());
    freeze_tree_state = std::make_shared<freeze_tree_state_t>(this);
  }
  freeze_tree_state->frozen = true;

  if (is_auth()) {
    mds_authority_t auth;
    bool was_subtree = is_subtree_root();
    if (was_subtree) {
      auth = get_dir_auth();
    } else {
      // temporarily prevent parent subtree from becoming frozen.
      inode->auth_pin(this);
      // create new subtree
      auth = authority();
    }

    // nested dirfrags that do not share our freeze state become
    // subtree bounds with the same authority; do not descend into them
    _walk_tree([this, &auth] (CDir *dir) {
        if (dir->freeze_tree_state != freeze_tree_state) {
          mdcache->adjust_subtree_auth(dir, auth);
          return false;
        }
        return true;
      }
    );

    // set auth.second to auth.first while the tree is frozen
    // (unfreeze_tree() resets it to CDIR_AUTH_UNKNOWN)
    ceph_assert(auth.first >= 0);
    ceph_assert(auth.second == CDIR_AUTH_UNKNOWN);
    auth.second = auth.first;
    mdcache->adjust_subtree_auth(this, auth);
    if (!was_subtree)
      inode->auth_unpin(this);
  } else {
    // importing subtree ?
    _walk_tree([this] (CDir *dir) {
        ceph_assert(!dir->freeze_tree_state);
        dir->freeze_tree_state = freeze_tree_state;
        return true;
      }
    );
  }

  // twiddle state
  if (state_test(STATE_FREEZINGTREE)) {
    state_clear(STATE_FREEZINGTREE);
    --num_freezing_trees;
  }

  state_set(STATE_FROZENTREE);
  ++num_frozen_trees;
  get(PIN_FROZEN);
}
3239
/**
 * Unfreeze a frozen subtree, or cancel a freeze that is still in
 * progress.
 *
 * WAIT_UNFREEZE waiters of this dirfrag and of every nested dirfrag
 * sharing its freeze state are collected first and queued at the end.
 * The dirfrag must be in either the FROZENTREE or the FREEZINGTREE
 * state.
 */
void CDir::unfreeze_tree()
{
  dout(10) << __func__ << " " << *this << dendl;

  MDSContext::vec unfreeze_waiters;
  take_waiting(WAIT_UNFREEZE, unfreeze_waiters);

  if (freeze_tree_state) {
    // detach the shared state from nested dirfrags; dirfrags with a
    // different state are skipped and not descended into
    _walk_tree([this, &unfreeze_waiters](CDir *dir) {
        if (dir->freeze_tree_state != freeze_tree_state)
          return false;
        dir->freeze_tree_state.reset();
        dir->take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
        return true;
      }
    );
  }

  if (state_test(STATE_FROZENTREE)) {
    // frozen.  unfreeze.
    state_clear(STATE_FROZENTREE);
    --num_frozen_trees;

    put(PIN_FROZEN);

    if (is_auth()) {
      // must be subtree
      ceph_assert(is_subtree_root());
      // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
      mds_authority_t auth = get_dir_auth();
      ceph_assert(auth.first >= 0);
      ceph_assert(auth.second == auth.first);
      auth.second = CDIR_AUTH_UNKNOWN;
      mdcache->adjust_subtree_auth(this, auth);
    }
    freeze_tree_state.reset();
  } else {
    ceph_assert(state_test(STATE_FREEZINGTREE));

    // freezing.  stop it.
    state_clear(STATE_FREEZINGTREE);
    --num_freezing_trees;
    freeze_tree_state.reset();

    // fail the WAIT_FROZEN waiters and drop the auth pin that
    // freeze_tree() left held while waiting
    finish_waiting(WAIT_FROZEN, -1);
    auth_unpin(this);
  }

  mdcache->mds->queue_waiters(unfreeze_waiters);
}
3290
3291 void CDir::adjust_freeze_after_rename(CDir *dir)
3292 {
3293 if (!freeze_tree_state || dir->freeze_tree_state != freeze_tree_state)
3294 return;
3295 CDir *newdir = dir->get_inode()->get_parent_dir();
3296 if (newdir == this || newdir->freeze_tree_state == freeze_tree_state)
3297 return;
3298
3299 ceph_assert(!freeze_tree_state->frozen);
3300 ceph_assert(get_dir_auth_pins() > 0);
3301
3302 MDSContext::vec unfreeze_waiters;
3303
3304 auto unfreeze = [this, &unfreeze_waiters](CDir *dir) {
3305 if (dir->freeze_tree_state != freeze_tree_state)
3306 return false;
3307 int dec = dir->get_auth_pins() + dir->get_dir_auth_pins();
3308 // shouldn't become zero because srcdn of rename was auth pinned
3309 ceph_assert(freeze_tree_state->auth_pins > dec);
3310 freeze_tree_state->auth_pins -= dec;
3311 dir->freeze_tree_state.reset();
3312 dir->take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
3313 return true;
3314 };
3315
3316 unfreeze(dir);
3317 dir->_walk_tree(unfreeze);
3318
3319 mdcache->mds->queue_waiters(unfreeze_waiters);
3320 }
3321
3322 bool CDir::can_auth_pin(int *err_ret) const
3323 {
3324 int err;
3325 if (!is_auth()) {
3326 err = ERR_NOT_AUTH;
3327 } else if (is_freezing_dir() || is_frozen_dir()) {
3328 err = ERR_FRAGMENTING_DIR;
3329 } else {
3330 auto p = is_freezing_or_frozen_tree();
3331 if (p.first || p.second) {
3332 err = ERR_EXPORTING_TREE;
3333 } else {
3334 err = 0;
3335 }
3336 }
3337 if (err && err_ret)
3338 *err_ret = err;
3339 return !err;
3340 }
3341
3342 class C_Dir_AuthUnpin : public CDirContext {
3343 public:
3344 explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
3345 void finish(int r) override {
3346 dir->auth_unpin(dir->get_inode());
3347 }
3348 };
3349
3350 void CDir::maybe_finish_freeze()
3351 {
3352 if (dir_auth_pins != 0)
3353 return;
3354
3355 // we can freeze the _dir_ even with nested pins...
3356 if (state_test(STATE_FREEZINGDIR)) {
3357 if (auth_pins == 1) {
3358 _freeze_dir();
3359 auth_unpin(this);
3360 finish_waiting(WAIT_FROZEN);
3361 }
3362 }
3363
3364 if (freeze_tree_state) {
3365 if (freeze_tree_state->frozen ||
3366 freeze_tree_state->auth_pins != 1)
3367 return;
3368
3369 if (freeze_tree_state->dir != this) {
3370 freeze_tree_state->dir->maybe_finish_freeze();
3371 return;
3372 }
3373
3374 ceph_assert(state_test(STATE_FREEZINGTREE));
3375
3376 if (!is_subtree_root() && inode->is_frozen()) {
3377 dout(10) << __func__ << " !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
3378 // retake an auth_pin...
3379 auth_pin(inode);
3380 // and release it when the parent inode unfreezes
3381 inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
3382 return;
3383 }
3384
3385 _freeze_tree();
3386 auth_unpin(this);
3387 finish_waiting(WAIT_FROZEN);
3388 }
3389 }
3390
3391
3392
3393 // FREEZE DIR
3394
3395 bool CDir::freeze_dir()
3396 {
3397 ceph_assert(!is_frozen());
3398 ceph_assert(!is_freezing());
3399
3400 auth_pin(this);
3401 if (is_freezeable_dir(true)) {
3402 _freeze_dir();
3403 auth_unpin(this);
3404 return true;
3405 } else {
3406 state_set(STATE_FREEZINGDIR);
3407 if (!lock_caches_with_auth_pins.empty())
3408 mdcache->mds->locker->invalidate_lock_caches(this);
3409 dout(10) << "freeze_dir + wait " << *this << dendl;
3410 return false;
3411 }
3412 }
3413
3414 void CDir::_freeze_dir()
3415 {
3416 dout(10) << __func__ << " " << *this << dendl;
3417 //assert(is_freezeable_dir(true));
3418 // not always true during split because the original fragment may have frozen a while
3419 // ago and we're just now getting around to breaking it up.
3420
3421 state_clear(STATE_FREEZINGDIR);
3422 state_set(STATE_FROZENDIR);
3423 get(PIN_FROZEN);
3424
3425 if (is_auth() && !is_subtree_root())
3426 inode->auth_pin(this); // auth_pin for duration of freeze
3427 }
3428
3429
3430 void CDir::unfreeze_dir()
3431 {
3432 dout(10) << __func__ << " " << *this << dendl;
3433
3434 if (state_test(STATE_FROZENDIR)) {
3435 state_clear(STATE_FROZENDIR);
3436 put(PIN_FROZEN);
3437
3438 // unpin (may => FREEZEABLE) FIXME: is this order good?
3439 if (is_auth() && !is_subtree_root())
3440 inode->auth_unpin(this);
3441
3442 finish_waiting(WAIT_UNFREEZE);
3443 } else {
3444 finish_waiting(WAIT_FROZEN, -1);
3445
3446 // still freezing. stop.
3447 ceph_assert(state_test(STATE_FREEZINGDIR));
3448 state_clear(STATE_FREEZINGDIR);
3449 auth_unpin(this);
3450
3451 finish_waiting(WAIT_UNFREEZE);
3452 }
3453 }
3454
3455 void CDir::enable_frozen_inode()
3456 {
3457 ceph_assert(frozen_inode_suppressed > 0);
3458 if (--frozen_inode_suppressed == 0) {
3459 for (auto p = freezing_inodes.begin(); !p.end(); ) {
3460 CInode *in = *p;
3461 ++p;
3462 ceph_assert(in->is_freezing_inode());
3463 in->maybe_finish_freeze_inode();
3464 }
3465 }
3466 }
3467
3468 /**
3469 * Slightly less complete than operator<<, because this is intended
3470 * for identifying a directory and its state rather than for dumping
3471 * debug output.
3472 */
3473 void CDir::dump(Formatter *f, int flags) const
3474 {
3475 ceph_assert(f != NULL);
3476 if (flags & DUMP_PATH) {
3477 f->dump_stream("path") << get_path();
3478 }
3479 if (flags & DUMP_DIRFRAG) {
3480 f->dump_stream("dirfrag") << dirfrag();
3481 }
3482 if (flags & DUMP_SNAPID_FIRST) {
3483 f->dump_int("snapid_first", first);
3484 }
3485 if (flags & DUMP_VERSIONS) {
3486 f->dump_stream("projected_version") << get_projected_version();
3487 f->dump_stream("version") << get_version();
3488 f->dump_stream("committing_version") << get_committing_version();
3489 f->dump_stream("committed_version") << get_committed_version();
3490 }
3491 if (flags & DUMP_REP) {
3492 f->dump_bool("is_rep", is_rep());
3493 }
3494 if (flags & DUMP_DIR_AUTH) {
3495 if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
3496 if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
3497 f->dump_stream("dir_auth") << get_dir_auth().first;
3498 } else {
3499 f->dump_stream("dir_auth") << get_dir_auth();
3500 }
3501 } else {
3502 f->dump_string("dir_auth", "");
3503 }
3504 }
3505 if (flags & DUMP_STATES) {
3506 f->open_array_section("states");
3507 MDSCacheObject::dump_states(f);
3508 if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
3509 if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
3510 if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
3511 if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
3512 if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
3513 if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
3514 if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
3515 if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
3516 f->close_section();
3517 }
3518 if (flags & DUMP_MDS_CACHE_OBJECT) {
3519 MDSCacheObject::dump(f);
3520 }
3521 if (flags & DUMP_ITEMS) {
3522 f->open_array_section("dentries");
3523 for (auto &p : items) {
3524 CDentry *dn = p.second;
3525 f->open_object_section("dentry");
3526 dn->dump(f);
3527 f->close_section();
3528 }
3529 f->close_section();
3530 }
3531 }
3532
3533 void CDir::dump_load(Formatter *f)
3534 {
3535 f->dump_stream("path") << get_path();
3536 f->dump_stream("dirfrag") << dirfrag();
3537
3538 f->open_object_section("pop_me");
3539 pop_me.dump(f);
3540 f->close_section();
3541
3542 f->open_object_section("pop_nested");
3543 pop_nested.dump(f);
3544 f->close_section();
3545
3546 f->open_object_section("pop_auth_subtree");
3547 pop_auth_subtree.dump(f);
3548 f->close_section();
3549
3550 f->open_object_section("pop_auth_subtree_nested");
3551 pop_auth_subtree_nested.dump(f);
3552 f->close_section();
3553 }
3554
3555 /****** Scrub Stuff *******/
3556
3557 void CDir::scrub_info_create() const
3558 {
3559 ceph_assert(!scrub_infop);
3560
3561 // break out of const-land to set up implicit initial state
3562 CDir *me = const_cast<CDir*>(this);
3563 const auto& pf = me->get_projected_fnode();
3564
3565 std::unique_ptr<scrub_info_t> si(new scrub_info_t());
3566
3567 si->last_recursive.version = pf->recursive_scrub_version;
3568 si->last_recursive.time = pf->recursive_scrub_stamp;
3569
3570 si->last_local.version = pf->localized_scrub_version;
3571 si->last_local.time = pf->localized_scrub_stamp;
3572
3573 me->scrub_infop.swap(si);
3574 }
3575
3576 void CDir::scrub_initialize(const ScrubHeaderRef& header)
3577 {
3578 ceph_assert(header);
3579 // FIXME: weird implicit construction, is someone else meant
3580 // to be calling scrub_info_create first?
3581 scrub_info();
3582 scrub_infop->directory_scrubbing = true;
3583 scrub_infop->header = header;
3584 header->inc_num_pending();
3585 }
3586
3587 void CDir::scrub_aborted() {
3588 dout(20) << __func__ << dendl;
3589 ceph_assert(scrub_is_in_progress());
3590
3591 scrub_infop->last_scrub_dirty = false;
3592 scrub_infop->directory_scrubbing = false;
3593 scrub_infop->header->dec_num_pending();
3594 scrub_infop.reset();
3595 }
3596
3597 void CDir::scrub_finished()
3598 {
3599 dout(20) << __func__ << dendl;
3600 ceph_assert(scrub_is_in_progress());
3601
3602 scrub_infop->last_local.time = ceph_clock_now();
3603 scrub_infop->last_local.version = get_version();
3604 if (scrub_infop->header->get_recursive())
3605 scrub_infop->last_recursive = scrub_infop->last_local;
3606
3607 scrub_infop->last_scrub_dirty = true;
3608
3609 scrub_infop->directory_scrubbing = false;
3610 scrub_infop->header->dec_num_pending();
3611 }
3612
3613 void CDir::scrub_maybe_delete_info()
3614 {
3615 if (scrub_infop &&
3616 !scrub_infop->directory_scrubbing &&
3617 !scrub_infop->last_scrub_dirty)
3618 scrub_infop.reset();
3619 }
3620
3621 bool CDir::scrub_local()
3622 {
3623 ceph_assert(is_complete());
3624 bool good = check_rstats(true);
3625 if (!good && scrub_infop->header->get_repair()) {
3626 mdcache->repair_dirfrag_stats(this);
3627 scrub_infop->header->set_repaired();
3628 }
3629 return good;
3630 }
3631
3632 std::string CDir::get_path() const
3633 {
3634 std::string path;
3635 get_inode()->make_path_string(path, true);
3636 return path;
3637 }
3638
3639 bool CDir::should_split_fast() const
3640 {
3641 // Max size a fragment can be before trigger fast splitting
3642 int fast_limit = g_conf()->mds_bal_split_size * g_conf()->mds_bal_fragment_fast_factor;
3643
3644 // Fast path: the sum of accounted size and null dentries does not
3645 // exceed threshold: we definitely are not over it.
3646 if (get_frag_size() + get_num_head_null() <= fast_limit) {
3647 return false;
3648 }
3649
3650 // Fast path: the accounted size of the frag exceeds threshold: we
3651 // definitely are over it
3652 if (get_frag_size() > fast_limit) {
3653 return true;
3654 }
3655
3656 int64_t effective_size = 0;
3657
3658 for (const auto &p : items) {
3659 const CDentry *dn = p.second;
3660 if (!dn->get_projected_linkage()->is_null()) {
3661 effective_size++;
3662 }
3663 }
3664
3665 return effective_size > fast_limit;
3666 }
3667
3668 bool CDir::should_merge() const
3669 {
3670 if (get_frag() == frag_t())
3671 return false;
3672
3673 if (inode->is_ephemeral_dist()) {
3674 unsigned min_frag_bits = mdcache->get_ephemeral_dist_frag_bits();
3675 if (min_frag_bits > 0 && get_frag().bits() < min_frag_bits + 1)
3676 return false;
3677 }
3678
3679 return (int)get_frag_size() < g_conf()->mds_bal_merge_size;
3680 }
3681
// Mempool object-factory definitions for CDir and CDir::scrub_info_t.
// NOTE(review): presumably these place both types in the mds_co pool under
// the factory names co_dir and scrub_info_t -- confirm against the macro
// definition.
MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);
MEMPOOL_DEFINE_OBJECT_FACTORY(CDir::scrub_info_t, scrub_info_t, mds_co)