]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/CDir.cc
import ceph 16.2.6
[ceph.git] / ceph / src / mds / CDir.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
11fdf7f2 15#include <string_view>
f67539c2 16#include <algorithm>
7c673cae
FG
17
18#include "include/types.h"
19
20#include "CDir.h"
21#include "CDentry.h"
22#include "CInode.h"
23#include "Mutation.h"
24
25#include "MDSMap.h"
26#include "MDSRank.h"
27#include "MDCache.h"
28#include "Locker.h"
29#include "MDLog.h"
30#include "LogSegment.h"
522d829b 31#include "MDBalancer.h"
7c673cae
FG
32
33#include "common/bloom_filter.hpp"
34#include "include/Context.h"
35#include "common/Clock.h"
36
37#include "osdc/Objecter.h"
38
39#include "common/config.h"
11fdf7f2 40#include "include/ceph_assert.h"
7c673cae
FG
41#include "include/compat.h"
42
43#define dout_context g_ceph_context
44#define dout_subsys ceph_subsys_mds
45#undef dout_prefix
f67539c2 46#define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
7c673cae
FG
47
// Out-of-class definitions of CDir's static data members.
// Both tree counters start at zero.
48int CDir::num_frozen_trees = 0;
49int CDir::num_freezing_trees = 0;
50
f67539c2
TL
// empty_fnode is initialised once, during static initialisation, with the
// result of CDir::allocate_fnode().
51CDir::fnode_const_ptr CDir::empty_fnode = CDir::allocate_fnode();
52
11fdf7f2 53class CDirContext : public MDSContext
7c673cae
FG
54{
55protected:
56 CDir *dir;
f67539c2 57 MDSRank* get_mds() override {return dir->mdcache->mds;}
7c673cae
FG
58
59public:
60 explicit CDirContext(CDir *d) : dir(d) {
11fdf7f2 61 ceph_assert(dir != NULL);
7c673cae
FG
62 }
63};
64
65
66class CDirIOContext : public MDSIOContextBase
67{
68protected:
69 CDir *dir;
f67539c2 70 MDSRank* get_mds() override {return dir->mdcache->mds;}
7c673cae
FG
71
72public:
73 explicit CDirIOContext(CDir *d) : dir(d) {
11fdf7f2 74 ceph_assert(dir != NULL);
7c673cae
FG
75 }
76};
77
78
79// PINS
80//int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
81
82
83ostream& operator<<(ostream& out, const CDir& dir)
84{
85 out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
86 << " [" << dir.first << ",head]";
87 if (dir.is_auth()) {
88 out << " auth";
89 if (dir.is_replicated())
90 out << dir.get_replicas();
91
92 if (dir.is_projected())
93 out << " pv=" << dir.get_projected_version();
94 out << " v=" << dir.get_version();
95 out << " cv=" << dir.get_committing_version();
96 out << "/" << dir.get_committed_version();
97 } else {
98 mds_authority_t a = dir.authority();
99 out << " rep@" << a.first;
100 if (a.second != CDIR_AUTH_UNKNOWN)
101 out << "," << a.second;
102 out << "." << dir.get_replica_nonce();
103 }
104
105 if (dir.is_rep()) out << " REP";
106
107 if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
108 if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
109 out << " dir_auth=" << dir.get_dir_auth().first;
110 else
111 out << " dir_auth=" << dir.get_dir_auth();
112 }
113
11fdf7f2 114 if (dir.get_auth_pins() || dir.get_dir_auth_pins()) {
7c673cae 115 out << " ap=" << dir.get_auth_pins()
11fdf7f2
TL
116 << "+" << dir.get_dir_auth_pins();
117#ifdef MDS_AUTHPIN_SET
118 dir.print_authpin_set(out);
119#endif
120 }
7c673cae
FG
121
122 out << " state=" << dir.get_state();
123 if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
124 if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
125 if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
126 if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
7c673cae
FG
127 if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
128 if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
129 if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
130 if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
131 if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
132 if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
11fdf7f2
TL
133 if (dir.state_test(CDir::STATE_CREATING)) out << "|creating";
134 if (dir.state_test(CDir::STATE_COMMITTING)) out << "|committing";
135 if (dir.state_test(CDir::STATE_FETCHING)) out << "|fetching";
136 if (dir.state_test(CDir::STATE_EXPORTING)) out << "|exporting";
137 if (dir.state_test(CDir::STATE_IMPORTING)) out << "|importing";
138 if (dir.state_test(CDir::STATE_STICKY)) out << "|sticky";
139 if (dir.state_test(CDir::STATE_DNPINNEDFRAG)) out << "|dnpinnedfrag";
140 if (dir.state_test(CDir::STATE_ASSIMRSTAT)) out << "|assimrstat";
7c673cae
FG
141
142 // fragstat
f67539c2
TL
143 out << " " << dir.get_fnode()->fragstat;
144 if (!(dir.get_fnode()->fragstat == dir.get_fnode()->accounted_fragstat))
145 out << "/" << dir.get_fnode()->accounted_fragstat;
11fdf7f2 146 if (g_conf()->mds_debug_scatterstat && dir.is_projected()) {
f67539c2 147 const auto& pf = dir.get_projected_fnode();
7c673cae
FG
148 out << "->" << pf->fragstat;
149 if (!(pf->fragstat == pf->accounted_fragstat))
150 out << "/" << pf->accounted_fragstat;
151 }
152
153 // rstat
f67539c2
TL
154 out << " " << dir.get_fnode()->rstat;
155 if (!(dir.get_fnode()->rstat == dir.get_fnode()->accounted_rstat))
156 out << "/" << dir.get_fnode()->accounted_rstat;
11fdf7f2 157 if (g_conf()->mds_debug_scatterstat && dir.is_projected()) {
f67539c2 158 const auto& pf = dir.get_projected_fnode();
7c673cae
FG
159 out << "->" << pf->rstat;
160 if (!(pf->rstat == pf->accounted_rstat))
161 out << "/" << pf->accounted_rstat;
162 }
163
164 out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
165 out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
166 if (dir.get_num_dirty())
167 out << " dirty=" << dir.get_num_dirty();
168
169 if (dir.get_num_ref()) {
170 out << " |";
171 dir.print_pin_set(out);
172 }
173
174 out << " " << &dir;
175 return out << "]";
176}
177
178
179void CDir::print(ostream& out)
180{
181 out << *this;
182}
183
184
185
186
187ostream& CDir::print_db_line_prefix(ostream& out)
188{
f67539c2 189 return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
7c673cae
FG
190}
191
192
193
194// -------------------------------------------------------------------
195// CDir
196
f67539c2
TL
197CDir::CDir(CInode *in, frag_t fg, MDCache *mdc, bool auth) :
198 mdcache(mdc), inode(in), frag(fg),
7c673cae 199 dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
b32b8144
FG
200 dirty_dentries(member_offset(CDentry, item_dir_dirty)),
201 item_dirty(this), item_new(this),
9f95a23c
TL
202 lock_caches_with_auth_pins(member_offset(MDLockCache::DirItem, item_dir)),
203 freezing_inodes(member_offset(CInode, item_freezing_inode)),
7c673cae 204 dir_rep(REP_NONE),
f67539c2
TL
205 pop_me(mdc->decayrate),
206 pop_nested(mdc->decayrate),
207 pop_auth_subtree(mdc->decayrate),
208 pop_auth_subtree_nested(mdc->decayrate),
209 pop_spread(mdc->decayrate),
28e407b8 210 pop_lru_subdirs(member_offset(CInode, item_pop_lru)),
7c673cae
FG
211 dir_auth(CDIR_AUTH_DEFAULT)
212{
7c673cae 213 // auth
11fdf7f2 214 ceph_assert(in->is_dir());
f67539c2
TL
215 if (auth)
216 state_set(STATE_AUTH);
7c673cae
FG
217}
218
219/**
220 * Check the recursive statistics on size for consistency.
221 * If mds_debug_scatterstat is enabled, assert for correctness,
222 * otherwise just print out the mismatch and continue.
223 */
224bool CDir::check_rstats(bool scrub)
225{
11fdf7f2 226 if (!g_conf()->mds_debug_scatterstat && !scrub)
7c673cae
FG
227 return true;
228
229 dout(25) << "check_rstats on " << this << dendl;
230 if (!is_complete() || !is_auth() || is_frozen()) {
92f5a8d4
TL
231 dout(3) << "check_rstats " << (scrub ? "(scrub) " : "")
232 << "bailing out -- incomplete or non-auth or frozen dir on "
233 << *this << dendl;
234 return !scrub;
7c673cae
FG
235 }
236
237 frag_info_t frag_info;
238 nest_info_t nest_info;
94b18763 239 for (auto i = items.begin(); i != items.end(); ++i) {
7c673cae
FG
240 if (i->second->last != CEPH_NOSNAP)
241 continue;
242 CDentry::linkage_t *dnl = i->second->get_linkage();
243 if (dnl->is_primary()) {
244 CInode *in = dnl->get_inode();
f67539c2 245 nest_info.add(in->get_inode()->accounted_rstat);
7c673cae
FG
246 if (in->is_dir())
247 frag_info.nsubdirs++;
248 else
249 frag_info.nfiles++;
250 } else if (dnl->is_remote())
251 frag_info.nfiles++;
252 }
253
254 bool good = true;
255 // fragstat
f67539c2 256 if(!frag_info.same_sums(fnode->fragstat)) {
7c673cae
FG
257 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
258 dout(1) << "get_num_head_items() = " << get_num_head_items()
f67539c2
TL
259 << "; fnode.fragstat.nfiles=" << fnode->fragstat.nfiles
260 << " fnode.fragstat.nsubdirs=" << fnode->fragstat.nsubdirs << dendl;
7c673cae
FG
261 good = false;
262 } else {
263 dout(20) << "get_num_head_items() = " << get_num_head_items()
f67539c2
TL
264 << "; fnode.fragstat.nfiles=" << fnode->fragstat.nfiles
265 << " fnode.fragstat.nsubdirs=" << fnode->fragstat.nsubdirs << dendl;
7c673cae
FG
266 }
267
268 // rstat
f67539c2 269 if (!nest_info.same_sums(fnode->rstat)) {
7c673cae 270 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
f67539c2
TL
271 dout(1) << "total of child dentries: " << nest_info << dendl;
272 dout(1) << "my rstats: " << fnode->rstat << dendl;
7c673cae
FG
273 good = false;
274 } else {
f67539c2
TL
275 dout(20) << "total of child dentries: " << nest_info << dendl;
276 dout(20) << "my rstats: " << fnode->rstat << dendl;
7c673cae
FG
277 }
278
279 if (!good) {
280 if (!scrub) {
94b18763 281 for (auto i = items.begin(); i != items.end(); ++i) {
7c673cae
FG
282 CDentry *dn = i->second;
283 if (dn->get_linkage()->is_primary()) {
284 CInode *in = dn->get_linkage()->inode;
f67539c2 285 dout(1) << *dn << " rstat " << in->get_inode()->accounted_rstat << dendl;
7c673cae
FG
286 } else {
287 dout(1) << *dn << dendl;
288 }
289 }
290
f67539c2
TL
291 ceph_assert(frag_info.nfiles == fnode->fragstat.nfiles);
292 ceph_assert(frag_info.nsubdirs == fnode->fragstat.nsubdirs);
293 ceph_assert(nest_info.rbytes == fnode->rstat.rbytes);
294 ceph_assert(nest_info.rfiles == fnode->rstat.rfiles);
295 ceph_assert(nest_info.rsubdirs == fnode->rstat.rsubdirs);
7c673cae
FG
296 }
297 }
298 dout(10) << "check_rstats complete on " << this << dendl;
299 return good;
300}
301
11fdf7f2
TL
302void CDir::adjust_num_inodes_with_caps(int d)
303{
304 // FIXME: smarter way to decide if adding 'this' to open file table
305 if (num_inodes_with_caps == 0 && d > 0)
f67539c2 306 mdcache->open_file_table.add_dirfrag(this);
11fdf7f2 307 else if (num_inodes_with_caps > 0 && num_inodes_with_caps == -d)
f67539c2 308 mdcache->open_file_table.remove_dirfrag(this);
11fdf7f2
TL
309
310 num_inodes_with_caps += d;
311 ceph_assert(num_inodes_with_caps >= 0);
312}
313
314CDentry *CDir::lookup(std::string_view name, snapid_t snap)
7c673cae
FG
315{
316 dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
94b18763 317 auto iter = items.lower_bound(dentry_key_t(snap, name, inode->hash_dentry_name(name)));
7c673cae
FG
318 if (iter == items.end())
319 return 0;
94b18763 320 if (iter->second->get_name() == name &&
7c673cae
FG
321 iter->second->first <= snap &&
322 iter->second->last >= snap) {
323 dout(20) << " hit -> " << iter->first << dendl;
324 return iter->second;
325 }
326 dout(20) << " miss -> " << iter->first << dendl;
327 return 0;
328}
329
11fdf7f2
TL
330CDentry *CDir::lookup_exact_snap(std::string_view name, snapid_t last) {
331 dout(20) << __func__ << " (" << last << ", '" << name << "')" << dendl;
94b18763 332 auto p = items.find(dentry_key_t(last, name, inode->hash_dentry_name(name)));
7c673cae
FG
333 if (p == items.end())
334 return NULL;
335 return p->second;
336}
337
338/***
339 * linking fun
340 */
341
11fdf7f2 342CDentry* CDir::add_null_dentry(std::string_view dname,
7c673cae
FG
343 snapid_t first, snapid_t last)
344{
345 // foreign
11fdf7f2 346 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
347
348 // create dentry
f67539c2 349 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), "", first, last);
7c673cae
FG
350 if (is_auth())
351 dn->state_set(CDentry::STATE_AUTH);
31f18b77 352
f67539c2 353 mdcache->bottom_lru.lru_insert_mid(dn);
31f18b77 354 dn->state_set(CDentry::STATE_BOTTOMLRU);
7c673cae
FG
355
356 dn->dir = this;
357 dn->version = get_projected_version();
358
359 // add to dir
11fdf7f2 360 ceph_assert(items.count(dn->key()) == 0);
94b18763 361 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
362
363 items[dn->key()] = dn;
364 if (last == CEPH_NOSNAP)
365 num_head_null++;
366 else
367 num_snap_null++;
368
369 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
370 dn->get(CDentry::PIN_FRAGMENTING);
371 dn->state_set(CDentry::STATE_FRAGMENTING);
372 }
373
11fdf7f2 374 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
375
376 // pin?
377 if (get_num_any() == 1)
378 get(PIN_CHILD);
379
11fdf7f2 380 ceph_assert(get_num_any() == items.size());
7c673cae
FG
381 return dn;
382}
383
384
11fdf7f2 385CDentry* CDir::add_primary_dentry(std::string_view dname, CInode *in,
f67539c2 386 mempool::mds_co::string alternate_name,
7c673cae
FG
387 snapid_t first, snapid_t last)
388{
389 // primary
11fdf7f2 390 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
391
392 // create dentry
f67539c2 393 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), std::move(alternate_name), first, last);
7c673cae
FG
394 if (is_auth())
395 dn->state_set(CDentry::STATE_AUTH);
31f18b77 396 if (is_auth() || !inode->is_stray()) {
f67539c2 397 mdcache->lru.lru_insert_mid(dn);
31f18b77 398 } else {
f67539c2 399 mdcache->bottom_lru.lru_insert_mid(dn);
31f18b77
FG
400 dn->state_set(CDentry::STATE_BOTTOMLRU);
401 }
7c673cae
FG
402
403 dn->dir = this;
404 dn->version = get_projected_version();
405
406 // add to dir
11fdf7f2 407 ceph_assert(items.count(dn->key()) == 0);
94b18763 408 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
409
410 items[dn->key()] = dn;
411
412 dn->get_linkage()->inode = in;
7c673cae
FG
413
414 link_inode_work(dn, in);
415
416 if (dn->last == CEPH_NOSNAP)
417 num_head_items++;
418 else
419 num_snap_items++;
420
421 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
422 dn->get(CDentry::PIN_FRAGMENTING);
423 dn->state_set(CDentry::STATE_FRAGMENTING);
424 }
425
11fdf7f2 426 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
427
428 // pin?
429 if (get_num_any() == 1)
430 get(PIN_CHILD);
11fdf7f2 431 ceph_assert(get_num_any() == items.size());
7c673cae
FG
432 return dn;
433}
434
11fdf7f2 435CDentry* CDir::add_remote_dentry(std::string_view dname, inodeno_t ino, unsigned char d_type,
f67539c2 436 mempool::mds_co::string alternate_name,
7c673cae
FG
437 snapid_t first, snapid_t last)
438{
439 // foreign
11fdf7f2 440 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
441
442 // create dentry
f67539c2 443 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), std::move(alternate_name), ino, d_type, first, last);
7c673cae
FG
444 if (is_auth())
445 dn->state_set(CDentry::STATE_AUTH);
f67539c2 446 mdcache->lru.lru_insert_mid(dn);
7c673cae
FG
447
448 dn->dir = this;
449 dn->version = get_projected_version();
450
451 // add to dir
11fdf7f2 452 ceph_assert(items.count(dn->key()) == 0);
94b18763 453 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
454
455 items[dn->key()] = dn;
456 if (last == CEPH_NOSNAP)
457 num_head_items++;
458 else
459 num_snap_items++;
460
461 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
462 dn->get(CDentry::PIN_FRAGMENTING);
463 dn->state_set(CDentry::STATE_FRAGMENTING);
464 }
465
11fdf7f2 466 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
467
468 // pin?
469 if (get_num_any() == 1)
470 get(PIN_CHILD);
471
11fdf7f2 472 ceph_assert(get_num_any() == items.size());
7c673cae
FG
473 return dn;
474}
475
476
477
478void CDir::remove_dentry(CDentry *dn)
479{
11fdf7f2 480 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
481
482 // there should be no client leases at this point!
11fdf7f2 483 ceph_assert(dn->client_lease_map.empty());
7c673cae
FG
484
485 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
486 dn->put(CDentry::PIN_FRAGMENTING);
487 dn->state_clear(CDentry::STATE_FRAGMENTING);
488 }
489
490 if (dn->get_linkage()->is_null()) {
491 if (dn->last == CEPH_NOSNAP)
492 num_head_null--;
493 else
494 num_snap_null--;
495 } else {
496 if (dn->last == CEPH_NOSNAP)
497 num_head_items--;
498 else
499 num_snap_items--;
500 }
501
502 if (!dn->get_linkage()->is_null())
503 // detach inode and dentry
504 unlink_inode_work(dn);
505
506 // remove from list
11fdf7f2 507 ceph_assert(items.count(dn->key()) == 1);
7c673cae
FG
508 items.erase(dn->key());
509
510 // clean?
511 if (dn->is_dirty())
512 dn->mark_clean();
513
31f18b77 514 if (dn->state_test(CDentry::STATE_BOTTOMLRU))
f67539c2 515 mdcache->bottom_lru.lru_remove(dn);
31f18b77 516 else
f67539c2 517 mdcache->lru.lru_remove(dn);
7c673cae
FG
518 delete dn;
519
520 // unpin?
521 if (get_num_any() == 0)
522 put(PIN_CHILD);
11fdf7f2 523 ceph_assert(get_num_any() == items.size());
7c673cae
FG
524}
525
526void CDir::link_remote_inode(CDentry *dn, CInode *in)
527{
528 link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
529}
530
531void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
532{
11fdf7f2
TL
533 dout(12) << __func__ << " " << *dn << " remote " << ino << dendl;
534 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
535
536 dn->get_linkage()->set_remote(ino, d_type);
537
31f18b77 538 if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
f67539c2
TL
539 mdcache->bottom_lru.lru_remove(dn);
540 mdcache->lru.lru_insert_mid(dn);
31f18b77
FG
541 dn->state_clear(CDentry::STATE_BOTTOMLRU);
542 }
543
7c673cae
FG
544 if (dn->last == CEPH_NOSNAP) {
545 num_head_items++;
546 num_head_null--;
547 } else {
548 num_snap_items++;
549 num_snap_null--;
550 }
11fdf7f2 551 ceph_assert(get_num_any() == items.size());
7c673cae
FG
552}
553
554void CDir::link_primary_inode(CDentry *dn, CInode *in)
555{
11fdf7f2
TL
556 dout(12) << __func__ << " " << *dn << " " << *in << dendl;
557 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
558
559 dn->get_linkage()->inode = in;
7c673cae
FG
560
561 link_inode_work(dn, in);
31f18b77
FG
562
563 if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
564 (is_auth() || !inode->is_stray())) {
f67539c2
TL
565 mdcache->bottom_lru.lru_remove(dn);
566 mdcache->lru.lru_insert_mid(dn);
31f18b77
FG
567 dn->state_clear(CDentry::STATE_BOTTOMLRU);
568 }
7c673cae
FG
569
570 if (dn->last == CEPH_NOSNAP) {
571 num_head_items++;
572 num_head_null--;
573 } else {
574 num_snap_items++;
575 num_snap_null--;
576 }
577
11fdf7f2 578 ceph_assert(get_num_any() == items.size());
7c673cae
FG
579}
580
581void CDir::link_inode_work( CDentry *dn, CInode *in)
582{
11fdf7f2 583 ceph_assert(dn->get_linkage()->get_inode() == in);
28e407b8 584 in->set_primary_parent(dn);
7c673cae
FG
585
586 // set inode version
587 //in->inode.version = dn->get_version();
588
589 // pin dentry?
590 if (in->get_num_ref())
591 dn->get(CDentry::PIN_INODEPIN);
11fdf7f2
TL
592
593 if (in->state_test(CInode::STATE_TRACKEDBYOFT))
f67539c2 594 mdcache->open_file_table.notify_link(in);
11fdf7f2
TL
595 if (in->is_any_caps())
596 adjust_num_inodes_with_caps(1);
7c673cae
FG
597
598 // adjust auth pin count
11fdf7f2
TL
599 if (in->auth_pins)
600 dn->adjust_nested_auth_pins(in->auth_pins, NULL);
7c673cae 601
9f95a23c
TL
602 if (in->is_freezing_inode())
603 freezing_inodes.push_back(&in->item_freezing_inode);
604 else if (in->is_frozen_inode() || in->is_frozen_auth_pin())
605 num_frozen_inodes++;
606
7c673cae
FG
607 // verify open snaprealm parent
608 if (in->snaprealm)
609 in->snaprealm->adjust_parent();
610 else if (in->is_any_caps())
611 in->move_to_realm(inode->find_snaprealm());
612}
613
31f18b77 614void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
7c673cae
FG
615{
616 if (dn->get_linkage()->is_primary()) {
11fdf7f2 617 dout(12) << __func__ << " " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
7c673cae 618 } else {
11fdf7f2 619 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
620 }
621
622 unlink_inode_work(dn);
623
31f18b77 624 if (adjust_lru && !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
f67539c2
TL
625 mdcache->lru.lru_remove(dn);
626 mdcache->bottom_lru.lru_insert_mid(dn);
31f18b77
FG
627 dn->state_set(CDentry::STATE_BOTTOMLRU);
628 }
629
7c673cae
FG
630 if (dn->last == CEPH_NOSNAP) {
631 num_head_items--;
632 num_head_null++;
633 } else {
634 num_snap_items--;
635 num_snap_null++;
636 }
11fdf7f2 637 ceph_assert(get_num_any() == items.size());
7c673cae
FG
638}
639
640
641void CDir::try_remove_unlinked_dn(CDentry *dn)
642{
11fdf7f2
TL
643 ceph_assert(dn->dir == this);
644 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
645
646 // no pins (besides dirty)?
647 if (dn->get_num_ref() != dn->is_dirty())
648 return;
649
650 // was the dn new?
651 if (dn->is_new()) {
11fdf7f2 652 dout(10) << __func__ << " " << *dn << " in " << *this << dendl;
7c673cae
FG
653 if (dn->is_dirty())
654 dn->mark_clean();
655 remove_dentry(dn);
656
657 // NOTE: we may not have any more dirty dentries, but the fnode
658 // still changed, so the directory must remain dirty.
659 }
660}
661
662
11fdf7f2 663void CDir::unlink_inode_work(CDentry *dn)
7c673cae
FG
664{
665 CInode *in = dn->get_linkage()->get_inode();
666
667 if (dn->get_linkage()->is_remote()) {
668 // remote
669 if (in)
670 dn->unlink_remote(dn->get_linkage());
671
672 dn->get_linkage()->set_remote(0, 0);
673 } else if (dn->get_linkage()->is_primary()) {
674 // primary
675 // unpin dentry?
676 if (in->get_num_ref())
677 dn->put(CDentry::PIN_INODEPIN);
11fdf7f2
TL
678
679 if (in->state_test(CInode::STATE_TRACKEDBYOFT))
f67539c2 680 mdcache->open_file_table.notify_unlink(in);
11fdf7f2
TL
681 if (in->is_any_caps())
682 adjust_num_inodes_with_caps(-1);
7c673cae
FG
683
684 // unlink auth_pin count
11fdf7f2
TL
685 if (in->auth_pins)
686 dn->adjust_nested_auth_pins(-in->auth_pins, nullptr);
28e407b8 687
9f95a23c
TL
688 if (in->is_freezing_inode())
689 in->item_freezing_inode.remove_myself();
690 else if (in->is_frozen_inode() || in->is_frozen_auth_pin())
691 num_frozen_inodes--;
692
7c673cae
FG
693 // detach inode
694 in->remove_primary_parent(dn);
28e407b8
AA
695 if (in->is_dir())
696 in->item_pop_lru.remove_myself();
7c673cae
FG
697 dn->get_linkage()->inode = 0;
698 } else {
11fdf7f2 699 ceph_assert(!dn->get_linkage()->is_null());
7c673cae
FG
700 }
701}
702
703void CDir::add_to_bloom(CDentry *dn)
704{
11fdf7f2 705 ceph_assert(dn->last == CEPH_NOSNAP);
7c673cae
FG
706 if (!bloom) {
707 /* not create bloom filter for incomplete dir that was added by log replay */
708 if (!is_complete())
709 return;
710
711 /* don't maintain bloom filters in standby replay (saves cycles, and also
712 * avoids need to implement clearing it in EExport for #16924) */
f67539c2 713 if (mdcache->mds->is_standby_replay()) {
7c673cae
FG
714 return;
715 }
716
717 unsigned size = get_num_head_items() + get_num_snap_items();
718 if (size < 100) size = 100;
719 bloom.reset(new bloom_filter(size, 1.0 / size, 0));
720 }
721 /* This size and false positive probability is completely random.*/
94b18763 722 bloom->insert(dn->get_name().data(), dn->get_name().size());
7c673cae
FG
723}
724
11fdf7f2 725bool CDir::is_in_bloom(std::string_view name)
7c673cae
FG
726{
727 if (!bloom)
728 return false;
94b18763 729 return bloom->contains(name.data(), name.size());
7c673cae
FG
730}
731
732void CDir::remove_null_dentries() {
11fdf7f2 733 dout(12) << __func__ << " " << *this << dendl;
7c673cae 734
94b18763 735 auto p = items.begin();
7c673cae
FG
736 while (p != items.end()) {
737 CDentry *dn = p->second;
738 ++p;
739 if (dn->get_linkage()->is_null() && !dn->is_projected())
740 remove_dentry(dn);
741 }
742
11fdf7f2
TL
743 ceph_assert(num_snap_null == 0);
744 ceph_assert(num_head_null == 0);
745 ceph_assert(get_num_any() == items.size());
7c673cae
FG
746}
747
748/** remove dirty null dentries for deleted directory. the dirfrag will be
749 * deleted soon, so it's safe to not commit dirty dentries.
750 *
751 * This is called when a directory is being deleted, a prerequisite
752 * of which is that its children have been unlinked: we expect to only see
753 * null, unprojected dentries here.
754 */
755void CDir::try_remove_dentries_for_stray()
756{
757 dout(10) << __func__ << dendl;
11fdf7f2 758 ceph_assert(get_parent_dir()->inode->is_stray());
7c673cae
FG
759
760 // clear dirty only when the directory was not snapshotted
761 bool clear_dirty = !inode->snaprealm;
762
94b18763 763 auto p = items.begin();
7c673cae
FG
764 while (p != items.end()) {
765 CDentry *dn = p->second;
766 ++p;
767 if (dn->last == CEPH_NOSNAP) {
11fdf7f2
TL
768 ceph_assert(!dn->is_projected());
769 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
770 if (clear_dirty && dn->is_dirty())
771 dn->mark_clean();
772 // It's OK to remove lease prematurely because we will never link
773 // the dentry to inode again.
774 if (dn->is_any_leases())
f67539c2 775 dn->remove_client_leases(mdcache->mds->locker);
7c673cae
FG
776 if (dn->get_num_ref() == 0)
777 remove_dentry(dn);
778 } else {
11fdf7f2 779 ceph_assert(!dn->is_projected());
7c673cae
FG
780 CDentry::linkage_t *dnl= dn->get_linkage();
781 CInode *in = NULL;
782 if (dnl->is_primary()) {
783 in = dnl->get_inode();
784 if (clear_dirty && in->is_dirty())
785 in->mark_clean();
786 }
787 if (clear_dirty && dn->is_dirty())
788 dn->mark_clean();
789 if (dn->get_num_ref() == 0) {
790 remove_dentry(dn);
791 if (in)
f67539c2 792 mdcache->remove_inode(in);
7c673cae
FG
793 }
794 }
795 }
796
797 if (clear_dirty && is_dirty())
798 mark_clean();
799}
800
7c673cae
FG
801bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
802{
11fdf7f2 803 ceph_assert(dn->last != CEPH_NOSNAP);
7c673cae
FG
804 set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
805 CDentry::linkage_t *dnl= dn->get_linkage();
806 CInode *in = 0;
807 if (dnl->is_primary())
808 in = dnl->get_inode();
809 if ((p == snaps.end() || *p > dn->last) &&
810 (dn->get_num_ref() == dn->is_dirty()) &&
811 (!in || in->get_num_ref() == in->is_dirty())) {
812 dout(10) << " purging snapped " << *dn << dendl;
813 if (in && in->is_dirty())
814 in->mark_clean();
815 remove_dentry(dn);
816 if (in) {
817 dout(10) << " purging snapped " << *in << dendl;
f67539c2 818 mdcache->remove_inode(in);
7c673cae
FG
819 }
820 return true;
821 }
822 return false;
823}
824
825
826void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
827{
11fdf7f2 828 dout(10) << __func__ << " " << snaps << dendl;
7c673cae 829
94b18763 830 auto p = items.begin();
7c673cae
FG
831 while (p != items.end()) {
832 CDentry *dn = p->second;
833 ++p;
834
835 if (dn->last == CEPH_NOSNAP)
836 continue;
837
838 try_trim_snap_dentry(dn, snaps);
839 }
840}
841
842
843/**
844 * steal_dentry -- semi-violently move a dentry from one CDir to another
845 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
846 * on the old CDir corpse; must call finish_old_fragment() when finished.
847 */
848void CDir::steal_dentry(CDentry *dn)
849{
11fdf7f2 850 dout(15) << __func__ << " " << *dn << dendl;
7c673cae
FG
851
852 items[dn->key()] = dn;
853
854 dn->dir->items.erase(dn->key());
855 if (dn->dir->items.empty())
856 dn->dir->put(PIN_CHILD);
857
858 if (get_num_any() == 0)
859 get(PIN_CHILD);
860 if (dn->get_linkage()->is_null()) {
861 if (dn->last == CEPH_NOSNAP)
862 num_head_null++;
863 else
864 num_snap_null++;
865 } else if (dn->last == CEPH_NOSNAP) {
866 num_head_items++;
867
f67539c2
TL
868 auto _fnode = _get_fnode();
869
7c673cae
FG
870 if (dn->get_linkage()->is_primary()) {
871 CInode *in = dn->get_linkage()->get_inode();
f67539c2 872 const auto& pi = in->get_projected_inode();
28e407b8 873 if (in->is_dir()) {
f67539c2 874 _fnode->fragstat.nsubdirs++;
28e407b8
AA
875 if (in->item_pop_lru.is_on_list())
876 pop_lru_subdirs.push_back(&in->item_pop_lru);
877 } else {
f67539c2 878 _fnode->fragstat.nfiles++;
28e407b8 879 }
f67539c2
TL
880 _fnode->rstat.rbytes += pi->accounted_rstat.rbytes;
881 _fnode->rstat.rfiles += pi->accounted_rstat.rfiles;
882 _fnode->rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
883 _fnode->rstat.rsnaps += pi->accounted_rstat.rsnaps;
884 if (pi->accounted_rstat.rctime > fnode->rstat.rctime)
885 _fnode->rstat.rctime = pi->accounted_rstat.rctime;
7c673cae 886
11fdf7f2
TL
887 if (in->is_any_caps())
888 adjust_num_inodes_with_caps(1);
889
7c673cae
FG
890 // move dirty inode rstat to new dirfrag
891 if (in->is_dirty_rstat())
892 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
893 } else if (dn->get_linkage()->is_remote()) {
894 if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
f67539c2 895 _fnode->fragstat.nsubdirs++;
7c673cae 896 else
f67539c2 897 _fnode->fragstat.nfiles++;
7c673cae
FG
898 }
899 } else {
900 num_snap_items++;
901 if (dn->get_linkage()->is_primary()) {
902 CInode *in = dn->get_linkage()->get_inode();
903 if (in->is_dirty_rstat())
904 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
905 }
906 }
907
11fdf7f2 908 {
7c673cae 909 int dap = dn->get_num_dir_auth_pins();
11fdf7f2
TL
910 if (dap) {
911 adjust_nested_auth_pins(dap, NULL);
912 dn->dir->adjust_nested_auth_pins(-dap, NULL);
913 }
7c673cae
FG
914 }
915
b32b8144
FG
916 if (dn->is_dirty()) {
917 dirty_dentries.push_back(&dn->item_dir_dirty);
7c673cae 918 num_dirty++;
b32b8144 919 }
7c673cae
FG
920
921 dn->dir = this;
922}
923
11fdf7f2 924void CDir::prepare_old_fragment(map<string_snap_t, MDSContext::vec >& dentry_waiters, bool replay)
7c673cae
FG
925{
926 // auth_pin old fragment for duration so that any auth_pinning
927 // during the dentry migration doesn't trigger side effects
928 if (!replay && is_auth())
929 auth_pin(this);
31f18b77
FG
930
931 if (!waiting_on_dentry.empty()) {
94b18763
FG
932 for (const auto &p : waiting_on_dentry) {
933 auto &e = dentry_waiters[p.first];
934 for (const auto &waiter : p.second) {
935 e.push_back(waiter);
936 }
937 }
31f18b77
FG
938 waiting_on_dentry.clear();
939 put(PIN_DNWAITER);
940 }
7c673cae
FG
941}
942
943void CDir::prepare_new_fragment(bool replay)
944{
945 if (!replay && is_auth()) {
946 _freeze_dir();
947 mark_complete();
948 }
31f18b77 949 inode->add_dirfrag(this);
7c673cae
FG
950}
951
11fdf7f2 952void CDir::finish_old_fragment(MDSContext::vec& waiters, bool replay)
7c673cae
FG
953{
954 // take waiters _before_ unfreeze...
955 if (!replay) {
956 take_waiting(WAIT_ANY_MASK, waiters);
957 if (is_auth()) {
958 auth_unpin(this); // pinned in prepare_old_fragment
11fdf7f2 959 ceph_assert(is_frozen_dir());
7c673cae
FG
960 unfreeze_dir();
961 }
962 }
963
11fdf7f2
TL
964 ceph_assert(dir_auth_pins == 0);
965 ceph_assert(auth_pins == 0);
7c673cae
FG
966
967 num_head_items = num_head_null = 0;
968 num_snap_items = num_snap_null = 0;
11fdf7f2 969 adjust_num_inodes_with_caps(-num_inodes_with_caps);
7c673cae
FG
970
971 // this mirrors init_fragment_pins()
972 if (is_auth())
973 clear_replica_map();
974 if (is_dirty())
975 mark_clean();
976 if (state_test(STATE_IMPORTBOUND))
977 put(PIN_IMPORTBOUND);
978 if (state_test(STATE_EXPORTBOUND))
979 put(PIN_EXPORTBOUND);
980 if (is_subtree_root())
981 put(PIN_SUBTREE);
982
983 if (auth_pins > 0)
984 put(PIN_AUTHPIN);
985
11fdf7f2 986 ceph_assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
7c673cae
FG
987}
988
989void CDir::init_fragment_pins()
990{
181888fb 991 if (is_replicated())
7c673cae
FG
992 get(PIN_REPLICATED);
993 if (state_test(STATE_DIRTY))
994 get(PIN_DIRTY);
995 if (state_test(STATE_EXPORTBOUND))
996 get(PIN_EXPORTBOUND);
997 if (state_test(STATE_IMPORTBOUND))
998 get(PIN_IMPORTBOUND);
999 if (is_subtree_root())
1000 get(PIN_SUBTREE);
1001}
1002
9f95a23c 1003void CDir::split(int bits, std::vector<CDir*>* subs, MDSContext::vec& waiters, bool replay)
7c673cae
FG
1004{
1005 dout(10) << "split by " << bits << " bits on " << *this << dendl;
1006
11fdf7f2 1007 ceph_assert(replay || is_complete() || !is_auth());
7c673cae 1008
11fdf7f2 1009 frag_vec_t frags;
7c673cae
FG
1010 frag.split(bits, frags);
1011
1012 vector<CDir*> subfrags(1 << bits);
1013
1014 double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
1015
1016 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1017 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1018
1019 nest_info_t rstatdiff;
1020 frag_info_t fragstatdiff;
f67539c2
TL
1021 if (fnode->accounted_rstat.version == rstat_version)
1022 rstatdiff.add_delta(fnode->accounted_rstat, fnode->rstat);
1023 if (fnode->accounted_fragstat.version == dirstat_version)
1024 fragstatdiff.add_delta(fnode->accounted_fragstat, fnode->fragstat);
7c673cae
FG
1025 dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
1026
11fdf7f2 1027 map<string_snap_t, MDSContext::vec > dentry_waiters;
31f18b77 1028 prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
1029
1030 // create subfrag dirs
1031 int n = 0;
11fdf7f2 1032 for (const auto& fg : frags) {
f67539c2 1033 CDir *f = new CDir(inode, fg, mdcache, is_auth());
7c673cae 1034 f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
181888fb 1035 f->get_replicas() = get_replicas();
7c673cae
FG
1036 f->pop_me = pop_me;
1037 f->pop_me.scale(fac);
1038
1039 // FIXME; this is an approximation
1040 f->pop_nested = pop_nested;
1041 f->pop_nested.scale(fac);
1042 f->pop_auth_subtree = pop_auth_subtree;
1043 f->pop_auth_subtree.scale(fac);
1044 f->pop_auth_subtree_nested = pop_auth_subtree_nested;
1045 f->pop_auth_subtree_nested.scale(fac);
1046
11fdf7f2 1047 dout(10) << " subfrag " << fg << " " << *f << dendl;
7c673cae 1048 subfrags[n++] = f;
9f95a23c 1049 subs->push_back(f);
7c673cae
FG
1050
1051 f->set_dir_auth(get_dir_auth());
11fdf7f2 1052 f->freeze_tree_state = freeze_tree_state;
7c673cae 1053 f->prepare_new_fragment(replay);
1adf2230 1054 f->init_fragment_pins();
7c673cae
FG
1055 }
1056
1057 // repartition dentries
1058 while (!items.empty()) {
94b18763 1059 auto p = items.begin();
7c673cae
FG
1060
1061 CDentry *dn = p->second;
94b18763 1062 frag_t subfrag = inode->pick_dirfrag(dn->get_name());
7c673cae
FG
1063 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1064 dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
1065 CDir *f = subfrags[n];
1066 f->steal_dentry(dn);
1067 }
1068
94b18763 1069 for (const auto &p : dentry_waiters) {
31f18b77
FG
1070 frag_t subfrag = inode->pick_dirfrag(p.first.name);
1071 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1072 CDir *f = subfrags[n];
1073
1074 if (f->waiting_on_dentry.empty())
1075 f->get(PIN_DNWAITER);
94b18763
FG
1076 auto &e = f->waiting_on_dentry[p.first];
1077 for (const auto &waiter : p.second) {
1078 e.push_back(waiter);
1079 }
31f18b77
FG
1080 }
1081
7c673cae
FG
1082 // FIXME: handle dirty old rstat
1083
1084 // fix up new frag fragstats
f67539c2 1085 for (int i = 0; i < n; i++) {
7c673cae 1086 CDir *f = subfrags[i];
f67539c2
TL
1087 auto _fnode = f->_get_fnode();
1088 _fnode->version = f->projected_version = get_version();
1089 _fnode->rstat.version = rstat_version;
1090 _fnode->accounted_rstat = _fnode->rstat;
1091 _fnode->fragstat.version = dirstat_version;
1092 _fnode->accounted_fragstat = _fnode->fragstat;
1093 dout(10) << " rstat " << _fnode->rstat << " fragstat " << _fnode->fragstat
7c673cae 1094 << " on " << *f << dendl;
7c673cae 1095
f67539c2
TL
1096 if (i == 0) {
1097 // give any outstanding frag stat differential to first frag
1098 dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1099 << " to " << *subfrags[0] << dendl;
1100 _fnode->accounted_rstat.add(rstatdiff);
1101 _fnode->accounted_fragstat.add(fragstatdiff);
1102 }
1103 }
7c673cae
FG
1104
1105 finish_old_fragment(waiters, replay);
1106}
1107
9f95a23c 1108void CDir::merge(const std::vector<CDir*>& subs, MDSContext::vec& waiters, bool replay)
7c673cae
FG
1109{
1110 dout(10) << "merge " << subs << dendl;
1111
9f95a23c
TL
1112 ceph_assert(subs.size() > 0);
1113
11fdf7f2
TL
1114 set_dir_auth(subs.front()->get_dir_auth());
1115 freeze_tree_state = subs.front()->freeze_tree_state;
1116
9f95a23c 1117 for (const auto& dir : subs) {
11fdf7f2
TL
1118 ceph_assert(get_dir_auth() == dir->get_dir_auth());
1119 ceph_assert(freeze_tree_state == dir->freeze_tree_state);
7c673cae
FG
1120 }
1121
7c673cae
FG
1122 prepare_new_fragment(replay);
1123
f67539c2
TL
1124 auto _fnode = _get_fnode();
1125
7c673cae
FG
1126 nest_info_t rstatdiff;
1127 frag_info_t fragstatdiff;
1128 bool touched_mtime, touched_chattr;
1129 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1130 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1131
11fdf7f2 1132 map<string_snap_t, MDSContext::vec > dentry_waiters;
31f18b77 1133
9f95a23c 1134 for (const auto& dir : subs) {
7c673cae 1135 dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
11fdf7f2 1136 ceph_assert(!dir->is_auth() || dir->is_complete() || replay);
7c673cae 1137
f67539c2
TL
1138 if (dir->get_fnode()->accounted_rstat.version == rstat_version)
1139 rstatdiff.add_delta(dir->get_fnode()->accounted_rstat, dir->get_fnode()->rstat);
1140 if (dir->get_fnode()->accounted_fragstat.version == dirstat_version)
1141 fragstatdiff.add_delta(dir->get_fnode()->accounted_fragstat, dir->get_fnode()->fragstat,
7c673cae
FG
1142 &touched_mtime, &touched_chattr);
1143
31f18b77 1144 dir->prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
1145
1146 // steal dentries
1147 while (!dir->items.empty())
1148 steal_dentry(dir->items.begin()->second);
1149
1150 // merge replica map
181888fb
FG
1151 for (const auto &p : dir->get_replicas()) {
1152 unsigned cur = get_replicas()[p.first];
1153 if (p.second > cur)
1154 get_replicas()[p.first] = p.second;
7c673cae
FG
1155 }
1156
1157 // merge version
f67539c2
TL
1158 if (dir->get_version() > _fnode->version)
1159 _fnode->version = projected_version = dir->get_version();
7c673cae
FG
1160
1161 // merge state
1162 state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
7c673cae
FG
1163
1164 dir->finish_old_fragment(waiters, replay);
1165 inode->close_dirfrag(dir->get_frag());
1166 }
1167
31f18b77
FG
1168 if (!dentry_waiters.empty()) {
1169 get(PIN_DNWAITER);
94b18763
FG
1170 for (const auto &p : dentry_waiters) {
1171 auto &e = waiting_on_dentry[p.first];
1172 for (const auto &waiter : p.second) {
1173 e.push_back(waiter);
1174 }
31f18b77
FG
1175 }
1176 }
1177
7c673cae
FG
1178 if (is_auth() && !replay)
1179 mark_complete();
1180
1181 // FIXME: merge dirty old rstat
f67539c2
TL
1182 _fnode->rstat.version = rstat_version;
1183 _fnode->accounted_rstat = _fnode->rstat;
1184 _fnode->accounted_rstat.add(rstatdiff);
7c673cae 1185
f67539c2
TL
1186 _fnode->fragstat.version = dirstat_version;
1187 _fnode->accounted_fragstat = _fnode->fragstat;
1188 _fnode->accounted_fragstat.add(fragstatdiff);
7c673cae
FG
1189
1190 init_fragment_pins();
1191}
1192
1193
1194
1195
1196void CDir::resync_accounted_fragstat()
1197{
f67539c2
TL
1198 auto pf = _get_projected_fnode();
1199 const auto& pi = inode->get_projected_inode();
7c673cae
FG
1200
1201 if (pf->accounted_fragstat.version != pi->dirstat.version) {
1202 pf->fragstat.version = pi->dirstat.version;
11fdf7f2 1203 dout(10) << __func__ << " " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
7c673cae
FG
1204 pf->accounted_fragstat = pf->fragstat;
1205 }
1206}
1207
1208/*
1209 * resync rstat and accounted_rstat with inode
1210 */
1211void CDir::resync_accounted_rstat()
1212{
f67539c2
TL
1213 auto pf = _get_projected_fnode();
1214 const auto& pi = inode->get_projected_inode();
7c673cae
FG
1215
1216 if (pf->accounted_rstat.version != pi->rstat.version) {
1217 pf->rstat.version = pi->rstat.version;
11fdf7f2 1218 dout(10) << __func__ << " " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
7c673cae
FG
1219 pf->accounted_rstat = pf->rstat;
1220 dirty_old_rstat.clear();
1221 }
1222}
1223
f67539c2 1224void CDir::assimilate_dirty_rstat_inodes(MutationRef& mut)
7c673cae 1225{
11fdf7f2 1226 dout(10) << __func__ << dendl;
7c673cae
FG
1227 for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1228 !p.end(); ++p) {
1229 CInode *in = *p;
11fdf7f2 1230 ceph_assert(in->is_auth());
7c673cae
FG
1231 if (in->is_frozen())
1232 continue;
1233
f67539c2
TL
1234 mut->auth_pin(in);
1235
1236 auto pi = in->project_inode(mut);
1237 pi.inode->version = in->pre_dirty();
7c673cae 1238
f67539c2 1239 mdcache->project_rstat_inode_to_frag(mut, in, this, 0, 0, nullptr);
7c673cae
FG
1240 }
1241 state_set(STATE_ASSIMRSTAT);
11fdf7f2 1242 dout(10) << __func__ << " done" << dendl;
7c673cae
FG
1243}
1244
f67539c2 1245void CDir::assimilate_dirty_rstat_inodes_finish(EMetaBlob *blob)
7c673cae
FG
1246{
1247 if (!state_test(STATE_ASSIMRSTAT))
1248 return;
1249 state_clear(STATE_ASSIMRSTAT);
11fdf7f2 1250 dout(10) << __func__ << dendl;
7c673cae
FG
1251 elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1252 while (!p.end()) {
1253 CInode *in = *p;
1254 ++p;
1255
1256 if (in->is_frozen())
1257 continue;
1258
1259 CDentry *dn = in->get_projected_parent_dn();
1260
7c673cae
FG
1261 in->clear_dirty_rstat();
1262 blob->add_primary_dentry(dn, in, true);
1263 }
1264
1265 if (!dirty_rstat_inodes.empty())
f67539c2 1266 mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
7c673cae
FG
1267}
1268
1269
1270
1271
1272/****************************************
1273 * WAITING
1274 */
1275
11fdf7f2 1276void CDir::add_dentry_waiter(std::string_view dname, snapid_t snapid, MDSContext *c)
7c673cae
FG
1277{
1278 if (waiting_on_dentry.empty())
1279 get(PIN_DNWAITER);
1280 waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
11fdf7f2 1281 dout(10) << __func__ << " dentry " << dname
7c673cae
FG
1282 << " snap " << snapid
1283 << " " << c << " on " << *this << dendl;
1284}
1285
11fdf7f2
TL
1286void CDir::take_dentry_waiting(std::string_view dname, snapid_t first, snapid_t last,
1287 MDSContext::vec& ls)
7c673cae
FG
1288{
1289 if (waiting_on_dentry.empty())
1290 return;
1291
1292 string_snap_t lb(dname, first);
1293 string_snap_t ub(dname, last);
94b18763
FG
1294 auto it = waiting_on_dentry.lower_bound(lb);
1295 while (it != waiting_on_dentry.end() &&
1296 !(ub < it->first)) {
11fdf7f2 1297 dout(10) << __func__ << " " << dname
7c673cae 1298 << " [" << first << "," << last << "] found waiter on snap "
94b18763 1299 << it->first.snapid
7c673cae 1300 << " on " << *this << dendl;
94b18763
FG
1301 for (const auto &waiter : it->second) {
1302 ls.push_back(waiter);
1303 }
1304 waiting_on_dentry.erase(it++);
7c673cae
FG
1305 }
1306
1307 if (waiting_on_dentry.empty())
1308 put(PIN_DNWAITER);
1309}
1310
11fdf7f2 1311void CDir::take_sub_waiting(MDSContext::vec& ls)
7c673cae 1312{
11fdf7f2 1313 dout(10) << __func__ << dendl;
7c673cae 1314 if (!waiting_on_dentry.empty()) {
94b18763
FG
1315 for (const auto &p : waiting_on_dentry) {
1316 for (const auto &waiter : p.second) {
1317 ls.push_back(waiter);
1318 }
1319 }
7c673cae
FG
1320 waiting_on_dentry.clear();
1321 put(PIN_DNWAITER);
1322 }
1323}
1324
1325
1326
11fdf7f2 1327void CDir::add_waiter(uint64_t tag, MDSContext *c)
7c673cae
FG
1328{
1329 // hierarchical?
7c673cae
FG
1330
1331 // at subtree root?
1332 if (tag & WAIT_ATSUBTREEROOT) {
1333 if (!is_subtree_root()) {
1334 // try parent
1335 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1336 inode->parent->dir->add_waiter(tag, c);
1337 return;
1338 }
1339 }
1340
11fdf7f2 1341 ceph_assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
7c673cae
FG
1342
1343 MDSCacheObject::add_waiter(tag, c);
1344}
1345
1346
1347
1348/* NOTE: this checks dentry waiters too */
11fdf7f2 1349void CDir::take_waiting(uint64_t mask, MDSContext::vec& ls)
7c673cae
FG
1350{
1351 if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1352 // take all dentry waiters
94b18763
FG
1353 for (const auto &p : waiting_on_dentry) {
1354 dout(10) << "take_waiting dentry " << p.first.name
1355 << " snap " << p.first.snapid << " on " << *this << dendl;
1356 for (const auto &waiter : p.second) {
1357 ls.push_back(waiter);
1358 }
7c673cae 1359 }
94b18763 1360 waiting_on_dentry.clear();
7c673cae
FG
1361 put(PIN_DNWAITER);
1362 }
1363
1364 // waiting
1365 MDSCacheObject::take_waiting(mask, ls);
1366}
1367
1368
1369void CDir::finish_waiting(uint64_t mask, int result)
1370{
11fdf7f2 1371 dout(11) << __func__ << " mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
7c673cae 1372
11fdf7f2 1373 MDSContext::vec finished;
7c673cae
FG
1374 take_waiting(mask, finished);
1375 if (result < 0)
1376 finish_contexts(g_ceph_context, finished, result);
1377 else
f67539c2 1378 mdcache->mds->queue_waiters(finished);
7c673cae
FG
1379}
1380
1381
1382
1383// dirty/clean
1384
f67539c2 1385CDir::fnode_ptr CDir::project_fnode(const MutationRef& mut)
7c673cae 1386{
11fdf7f2 1387 ceph_assert(get_version() != 0);
f67539c2
TL
1388
1389 if (mut && mut->is_projected(this))
1390 return std::const_pointer_cast<fnode_t>(projected_fnode.back());
1391
1392 auto pf = allocate_fnode(*get_projected_fnode());
7c673cae
FG
1393
1394 if (scrub_infop && scrub_infop->last_scrub_dirty) {
f67539c2
TL
1395 pf->localized_scrub_stamp = scrub_infop->last_local.time;
1396 pf->localized_scrub_version = scrub_infop->last_local.version;
1397 pf->recursive_scrub_stamp = scrub_infop->last_recursive.time;
1398 pf->recursive_scrub_version = scrub_infop->last_recursive.version;
7c673cae
FG
1399 scrub_infop->last_scrub_dirty = false;
1400 scrub_maybe_delete_info();
1401 }
1402
f67539c2
TL
1403 projected_fnode.emplace_back(pf);
1404 if (mut)
1405 mut->add_projected_node(this);
1406 dout(10) << __func__ << " " << pf.get() << dendl;
1407 return pf;
7c673cae
FG
1408}
1409
f67539c2 1410void CDir::pop_and_dirty_projected_fnode(LogSegment *ls, const MutationRef& mut)
7c673cae 1411{
11fdf7f2 1412 ceph_assert(!projected_fnode.empty());
f67539c2
TL
1413 auto pf = std::move(projected_fnode.front());
1414 dout(15) << __func__ << " " << pf.get() << " v" << pf->version << dendl;
1415
7c673cae 1416 projected_fnode.pop_front();
f67539c2
TL
1417 if (mut)
1418 mut->remove_projected_node(this);
7c673cae 1419
f67539c2
TL
1420 reset_fnode(std::move(pf));
1421 _mark_dirty(ls);
1422}
7c673cae
FG
1423
1424version_t CDir::pre_dirty(version_t min)
1425{
1426 if (min > projected_version)
1427 projected_version = min;
1428 ++projected_version;
11fdf7f2 1429 dout(10) << __func__ << " " << projected_version << dendl;
7c673cae
FG
1430 return projected_version;
1431}
1432
f67539c2 1433void CDir::mark_dirty(LogSegment *ls, version_t pv)
7c673cae 1434{
f67539c2
TL
1435 ceph_assert(is_auth());
1436
1437 if (pv) {
1438 ceph_assert(get_version() < pv);
1439 ceph_assert(pv <= projected_version);
1440 ceph_assert(!projected_fnode.empty() &&
1441 pv <= projected_fnode.front()->version);
1442 }
1443
7c673cae
FG
1444 _mark_dirty(ls);
1445}
1446
1447void CDir::_mark_dirty(LogSegment *ls)
1448{
1449 if (!state_test(STATE_DIRTY)) {
11fdf7f2 1450 dout(10) << __func__ << " (was clean) " << *this << " version " << get_version() << dendl;
7c673cae 1451 _set_dirty_flag();
11fdf7f2 1452 ceph_assert(ls);
7c673cae 1453 } else {
11fdf7f2 1454 dout(10) << __func__ << " (already dirty) " << *this << " version " << get_version() << dendl;
7c673cae
FG
1455 }
1456 if (ls) {
1457 ls->dirty_dirfrags.push_back(&item_dirty);
1458
1459 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1460 if (committed_version == 0 && !item_new.is_on_list())
1461 ls->new_dirfrags.push_back(&item_new);
1462 }
1463}
1464
1465void CDir::mark_new(LogSegment *ls)
1466{
1467 ls->new_dirfrags.push_back(&item_new);
1468 state_clear(STATE_CREATING);
1469
11fdf7f2 1470 MDSContext::vec waiters;
7c673cae 1471 take_waiting(CDir::WAIT_CREATED, waiters);
f67539c2 1472 mdcache->mds->queue_waiters(waiters);
7c673cae
FG
1473}
1474
1475void CDir::mark_clean()
1476{
11fdf7f2 1477 dout(10) << __func__ << " " << *this << " version " << get_version() << dendl;
7c673cae
FG
1478 if (state_test(STATE_DIRTY)) {
1479 item_dirty.remove_myself();
1480 item_new.remove_myself();
1481
1482 state_clear(STATE_DIRTY);
1483 put(PIN_DIRTY);
1484 }
1485}
1486
1487// caller should hold auth pin of this
1488void CDir::log_mark_dirty()
1489{
b32b8144 1490 if (is_dirty() || projected_version > get_version())
7c673cae
FG
1491 return; // noop if it is already dirty or will be dirty
1492
f67539c2
TL
1493 auto _fnode = allocate_fnode(*get_fnode());
1494 _fnode->version = pre_dirty();
1495 reset_fnode(std::move(_fnode));
1496 mark_dirty(mdcache->mds->mdlog->get_current_segment());
7c673cae
FG
1497}
1498
1499void CDir::mark_complete() {
1500 state_set(STATE_COMPLETE);
1501 bloom.reset();
1502}
1503
1504void CDir::first_get()
1505{
1506 inode->get(CInode::PIN_DIRFRAG);
1507}
1508
1509void CDir::last_put()
1510{
1511 inode->put(CInode::PIN_DIRFRAG);
1512}
1513
1514
1515
1516/******************************************************************************
1517 * FETCH and COMMIT
1518 */
1519
1520// -----------------------
1521// FETCH
11fdf7f2 1522void CDir::fetch(MDSContext *c, bool ignore_authpinnability)
7c673cae
FG
1523{
1524 string want;
1525 return fetch(c, want, ignore_authpinnability);
1526}
1527
11fdf7f2 1528void CDir::fetch(MDSContext *c, std::string_view want_dn, bool ignore_authpinnability)
7c673cae
FG
1529{
1530 dout(10) << "fetch on " << *this << dendl;
1531
11fdf7f2
TL
1532 ceph_assert(is_auth());
1533 ceph_assert(!is_complete());
7c673cae
FG
1534
1535 if (!can_auth_pin() && !ignore_authpinnability) {
1536 if (c) {
1537 dout(7) << "fetch waiting for authpinnable" << dendl;
1538 add_waiter(WAIT_UNFREEZE, c);
1539 } else
1540 dout(7) << "fetch not authpinnable and no context" << dendl;
1541 return;
1542 }
1543
1544 // unlinked directory inode shouldn't have any entry
31f18b77
FG
1545 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
1546 !inode->snaprealm) {
7c673cae
FG
1547 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1548 if (get_version() == 0) {
11fdf7f2 1549 ceph_assert(inode->is_auth());
f67539c2
TL
1550 auto _fnode = allocate_fnode();
1551 _fnode->version = 1;
1552 reset_fnode(std::move(_fnode));
7c673cae
FG
1553
1554 if (state_test(STATE_REJOINUNDEF)) {
f67539c2 1555 ceph_assert(mdcache->mds->is_rejoin());
7c673cae 1556 state_clear(STATE_REJOINUNDEF);
f67539c2 1557 mdcache->opened_undef_dirfrag(this);
7c673cae
FG
1558 }
1559 }
1560 mark_complete();
1561
1562 if (c)
f67539c2 1563 mdcache->mds->queue_waiter(c);
7c673cae
FG
1564 return;
1565 }
1566
1567 if (c) add_waiter(WAIT_COMPLETE, c);
94b18763 1568 if (!want_dn.empty()) wanted_items.insert(mempool::mds_co::string(want_dn));
7c673cae
FG
1569
1570 // already fetching?
1571 if (state_test(CDir::STATE_FETCHING)) {
1572 dout(7) << "already fetching; waiting" << dendl;
1573 return;
1574 }
1575
1576 auth_pin(this);
1577 state_set(CDir::STATE_FETCHING);
1578
f67539c2 1579 if (mdcache->mds->logger) mdcache->mds->logger->inc(l_mds_dir_fetch);
7c673cae 1580
522d829b
TL
1581 mdcache->mds->balancer->hit_dir(this, META_POP_FETCH);
1582
7c673cae
FG
1583 std::set<dentry_key_t> empty;
1584 _omap_fetch(NULL, empty);
1585}
1586
11fdf7f2 1587void CDir::fetch(MDSContext *c, const std::set<dentry_key_t>& keys)
7c673cae
FG
1588{
1589 dout(10) << "fetch " << keys.size() << " keys on " << *this << dendl;
1590
11fdf7f2
TL
1591 ceph_assert(is_auth());
1592 ceph_assert(!is_complete());
7c673cae
FG
1593
1594 if (!can_auth_pin()) {
1595 dout(7) << "fetch keys waiting for authpinnable" << dendl;
1596 add_waiter(WAIT_UNFREEZE, c);
1597 return;
1598 }
1599 if (state_test(CDir::STATE_FETCHING)) {
1600 dout(7) << "fetch keys waiting for full fetch" << dendl;
1601 add_waiter(WAIT_COMPLETE, c);
1602 return;
1603 }
1604
1605 auth_pin(this);
f67539c2 1606 if (mdcache->mds->logger) mdcache->mds->logger->inc(l_mds_dir_fetch);
7c673cae 1607
522d829b
TL
1608 mdcache->mds->balancer->hit_dir(this, META_POP_FETCH);
1609
7c673cae
FG
1610 _omap_fetch(c, keys);
1611}
1612
1613class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
11fdf7f2 1614 MDSContext *fin;
7c673cae 1615public:
f67539c2 1616 const version_t omap_version;
7c673cae
FG
1617 bufferlist hdrbl;
1618 bool more = false;
1619 map<string, bufferlist> omap; ///< carry-over from before
1620 map<string, bufferlist> omap_more; ///< new batch
1621 int ret;
f67539c2
TL
1622 C_IO_Dir_OMAP_FetchedMore(CDir *d, version_t v, MDSContext *f) :
1623 CDirIOContext(d), fin(f), omap_version(v), ret(0) { }
7c673cae 1624 void finish(int r) {
f67539c2
TL
1625 if (omap_version < dir->get_committed_version()) {
1626 omap.clear();
1627 dir->_omap_fetch(fin, {});
1628 return;
1629 }
1630
7c673cae
FG
1631 // merge results
1632 if (omap.empty()) {
1633 omap.swap(omap_more);
1634 } else {
1635 omap.insert(omap_more.begin(), omap_more.end());
1636 }
1637 if (more) {
f67539c2 1638 dir->_omap_fetch_more(omap_version, hdrbl, omap, fin);
7c673cae
FG
1639 } else {
1640 dir->_omap_fetched(hdrbl, omap, !fin, r);
1641 if (fin)
1642 fin->complete(r);
1643 }
1644 }
91327a77
AA
1645 void print(ostream& out) const override {
1646 out << "dirfrag_fetch_more(" << dir->dirfrag() << ")";
1647 }
7c673cae
FG
1648};
1649
1650class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
11fdf7f2 1651 MDSContext *fin;
7c673cae 1652public:
f67539c2 1653 const version_t omap_version;
7c673cae
FG
1654 bufferlist hdrbl;
1655 bool more = false;
1656 map<string, bufferlist> omap;
1657 bufferlist btbl;
1658 int ret1, ret2, ret3;
1659
11fdf7f2 1660 C_IO_Dir_OMAP_Fetched(CDir *d, MDSContext *f) :
f67539c2
TL
1661 CDirIOContext(d), fin(f),
1662 omap_version(d->get_committing_version()),
1663 ret1(0), ret2(0), ret3(0) { }
7c673cae
FG
1664 void finish(int r) override {
1665 // check the correctness of backtrace
f67539c2 1666 if (r >= 0 && ret3 != -CEPHFS_ECANCELED)
7c673cae
FG
1667 dir->inode->verify_diri_backtrace(btbl, ret3);
1668 if (r >= 0) r = ret1;
1669 if (r >= 0) r = ret2;
f67539c2 1670
7c673cae 1671 if (more) {
f67539c2
TL
1672 if (omap_version < dir->get_committed_version()) {
1673 omap.clear();
1674 dir->_omap_fetch(fin, {});
1675 } else {
1676 dir->_omap_fetch_more(omap_version, hdrbl, omap, fin);
1677 }
1678 return;
7c673cae 1679 }
f67539c2
TL
1680
1681 dir->_omap_fetched(hdrbl, omap, !fin, r);
1682 if (fin)
1683 fin->complete(r);
1684
7c673cae 1685 }
91327a77
AA
1686 void print(ostream& out) const override {
1687 out << "dirfrag_fetch(" << dir->dirfrag() << ")";
1688 }
7c673cae
FG
1689};
1690
11fdf7f2 1691void CDir::_omap_fetch(MDSContext *c, const std::set<dentry_key_t>& keys)
7c673cae
FG
1692{
1693 C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1694 object_t oid = get_ondisk_object();
f67539c2 1695 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
7c673cae
FG
1696 ObjectOperation rd;
1697 rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1698 if (keys.empty()) {
11fdf7f2
TL
1699 ceph_assert(!c);
1700 rd.omap_get_vals("", "", g_conf()->mds_dir_keys_per_op,
7c673cae
FG
1701 &fin->omap, &fin->more, &fin->ret2);
1702 } else {
11fdf7f2 1703 ceph_assert(c);
7c673cae 1704 std::set<std::string> str_keys;
94b18763 1705 for (auto p : keys) {
7c673cae 1706 string str;
94b18763 1707 p.encode(str);
7c673cae
FG
1708 str_keys.insert(str);
1709 }
1710 rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
1711 }
1712 // check the correctness of backtrace
11fdf7f2 1713 if (g_conf()->mds_verify_backtrace > 0 && frag == frag_t()) {
7c673cae
FG
1714 rd.getxattr("parent", &fin->btbl, &fin->ret3);
1715 rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1716 } else {
f67539c2 1717 fin->ret3 = -CEPHFS_ECANCELED;
7c673cae
FG
1718 }
1719
f67539c2
TL
1720 mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1721 new C_OnFinisher(fin, mdcache->mds->finisher));
7c673cae
FG
1722}
1723
f67539c2
TL
1724void CDir::_omap_fetch_more(version_t omap_version, bufferlist& hdrbl,
1725 map<string, bufferlist>& omap, MDSContext *c)
7c673cae
FG
1726{
1727 // we have more omap keys to fetch!
1728 object_t oid = get_ondisk_object();
f67539c2
TL
1729 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
1730 auto fin = new C_IO_Dir_OMAP_FetchedMore(this, omap_version, c);
1731 fin->hdrbl = std::move(hdrbl);
7c673cae
FG
1732 fin->omap.swap(omap);
1733 ObjectOperation rd;
1734 rd.omap_get_vals(fin->omap.rbegin()->first,
1735 "", /* filter prefix */
11fdf7f2 1736 g_conf()->mds_dir_keys_per_op,
7c673cae
FG
1737 &fin->omap_more,
1738 &fin->more,
1739 &fin->ret);
f67539c2
TL
1740 mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1741 new C_OnFinisher(fin, mdcache->mds->finisher));
7c673cae
FG
1742}
1743
1744CDentry *CDir::_load_dentry(
11fdf7f2
TL
1745 std::string_view key,
1746 std::string_view dname,
7c673cae
FG
1747 const snapid_t last,
1748 bufferlist &bl,
1749 const int pos,
1750 const std::set<snapid_t> *snaps,
f91f0fd5 1751 double rand_threshold,
28e407b8 1752 bool *force_dirty)
7c673cae 1753{
11fdf7f2 1754 auto q = bl.cbegin();
7c673cae
FG
1755
1756 snapid_t first;
11fdf7f2 1757 decode(first, q);
7c673cae
FG
1758
1759 // marker
1760 char type;
11fdf7f2 1761 decode(type, q);
7c673cae
FG
1762
1763 dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
1764 << " [" << first << "," << last << "]"
1765 << dendl;
1766
1767 bool stale = false;
1768 if (snaps && last != CEPH_NOSNAP) {
1769 set<snapid_t>::const_iterator p = snaps->lower_bound(first);
1770 if (p == snaps->end() || *p > last) {
1771 dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
1772 stale = true;
1773 }
1774 }
f67539c2 1775
7c673cae
FG
1776 /*
1777 * look for existing dentry for _last_ snap, because unlink +
1778 * create may leave a "hole" (epochs during which the dentry
1779 * doesn't exist) but for which no explicit negative dentry is in
1780 * the cache.
1781 */
1782 CDentry *dn;
1783 if (stale)
1784 dn = lookup_exact_snap(dname, last);
1785 else
1786 dn = lookup(dname, last);
1787
f67539c2 1788 if (type == 'L' || type == 'l') {
7c673cae
FG
1789 // hard link
1790 inodeno_t ino;
1791 unsigned char d_type;
f67539c2
TL
1792 mempool::mds_co::string alternate_name;
1793
1794 CDentry::decode_remote(type, ino, d_type, alternate_name, q);
7c673cae
FG
1795
1796 if (stale) {
1797 if (!dn) {
94b18763 1798 stale_items.insert(mempool::mds_co::string(key));
7c673cae
FG
1799 *force_dirty = true;
1800 }
1801 return dn;
1802 }
1803
1804 if (dn) {
28e407b8
AA
1805 CDentry::linkage_t *dnl = dn->get_linkage();
1806 dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
1807 if (committed_version == 0 &&
1808 dnl->is_remote() &&
1809 dn->is_dirty() &&
1810 ino == dnl->get_remote_ino() &&
f67539c2
TL
1811 d_type == dnl->get_remote_d_type() &&
1812 alternate_name == dn->get_alternate_name()) {
28e407b8
AA
1813 // see comment below
1814 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1815 dn->mark_clean();
7c673cae
FG
1816 }
1817 } else {
1818 // (remote) link
f67539c2
TL
1819 dn = add_remote_dentry(dname, ino, d_type, std::move(alternate_name), first, last);
1820
7c673cae 1821 // link to inode?
f67539c2 1822 CInode *in = mdcache->get_inode(ino); // we may or may not have it.
7c673cae
FG
1823 if (in) {
1824 dn->link_remote(dn->get_linkage(), in);
1825 dout(12) << "_fetched got remote link " << ino << " which we have " << *in << dendl;
1826 } else {
11fdf7f2 1827 dout(12) << "_fetched got remote link " << ino << " (don't have it)" << dendl;
7c673cae
FG
1828 }
1829 }
f67539c2
TL
1830 }
1831 else if (type == 'I' || type == 'i') {
1832 InodeStore inode_data;
1833 mempool::mds_co::string alternate_name;
7c673cae 1834 // inode
7c673cae 1835 // Load inode data before looking up or constructing CInode
f67539c2
TL
1836 if (type == 'i') {
1837 DECODE_START(2, q);
1838 if (struct_v >= 2) {
1839 decode(alternate_name, q);
1840 }
1841 inode_data.decode(q);
1842 DECODE_FINISH(q);
1843 } else {
1844 inode_data.decode_bare(q);
1845 }
1846
7c673cae
FG
1847 if (stale) {
1848 if (!dn) {
94b18763 1849 stale_items.insert(mempool::mds_co::string(key));
7c673cae
FG
1850 *force_dirty = true;
1851 }
1852 return dn;
1853 }
1854
1855 bool undef_inode = false;
1856 if (dn) {
28e407b8
AA
1857 CDentry::linkage_t *dnl = dn->get_linkage();
1858 dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
1859
1860 if (dnl->is_primary()) {
1861 CInode *in = dnl->get_inode();
1862 if (in->state_test(CInode::STATE_REJOINUNDEF)) {
1863 undef_inode = true;
1864 } else if (committed_version == 0 &&
1865 dn->is_dirty() &&
f67539c2
TL
1866 inode_data.inode->ino == in->ino() &&
1867 inode_data.inode->version == in->get_version()) {
28e407b8
AA
1868 /* clean underwater item?
1869 * Underwater item is something that is dirty in our cache from
1870 * journal replay, but was previously flushed to disk before the
1871 * mds failed.
1872 *
1873 * We only do this is committed_version == 0. that implies either
1874 * - this is a fetch after from a clean/empty CDir is created
1875 * (and has no effect, since the dn won't exist); or
1876 * - this is a fetch after _recovery_, which is what we're worried
1877 * about. Items that are marked dirty from the journal should be
1878 * marked clean if they appear on disk.
1879 */
1880 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1881 dn->mark_clean();
1882 dout(10) << "_fetched had underwater inode " << *dnl->get_inode() << ", marking clean" << dendl;
1883 in->mark_clean();
1884 }
1885 }
7c673cae
FG
1886 }
1887
1888 if (!dn || undef_inode) {
1889 // add inode
f67539c2 1890 CInode *in = mdcache->get_inode(inode_data.inode->ino, last);
7c673cae
FG
1891 if (!in || undef_inode) {
1892 if (undef_inode && in)
1893 in->first = first;
1894 else
f67539c2 1895 in = new CInode(mdcache, true, first, last);
7c673cae 1896
f67539c2
TL
1897 in->reset_inode(std::move(inode_data.inode));
1898 in->reset_xattrs(std::move(inode_data.xattrs));
7c673cae
FG
1899 // symlink?
1900 if (in->is_symlink())
1901 in->symlink = inode_data.symlink;
1902
1903 in->dirfragtree.swap(inode_data.dirfragtree);
f67539c2
TL
1904 in->reset_old_inodes(std::move(inode_data.old_inodes));
1905 if (in->is_any_old_inodes()) {
1906 snapid_t min_first = in->get_old_inodes()->rbegin()->first + 1;
7c673cae
FG
1907 if (min_first > in->first)
1908 in->first = min_first;
1909 }
1910
1911 in->oldest_snap = inode_data.oldest_snap;
1912 in->decode_snap_blob(inode_data.snap_blob);
1913 if (snaps && !in->snaprealm)
1914 in->purge_stale_snap_data(*snaps);
1915
1916 if (!undef_inode) {
f67539c2
TL
1917 mdcache->add_inode(in); // add
1918 dn = add_primary_dentry(dname, in, std::move(alternate_name), first, last); // link
7c673cae
FG
1919 }
1920 dout(12) << "_fetched got " << *dn << " " << *in << dendl;
1921
f67539c2 1922 if (in->get_inode()->is_dirty_rstat())
7c673cae
FG
1923 in->mark_dirty_rstat();
1924
f67539c2 1925 in->maybe_ephemeral_rand(rand_threshold);
7c673cae
FG
1926 //in->hack_accessed = false;
1927 //in->hack_load_stamp = ceph_clock_now();
1928 //num_new_inodes_loaded++;
11fdf7f2 1929 } else if (g_conf().get_val<bool>("mds_hack_allow_loading_invalid_metadata")) {
94b18763 1930 dout(20) << "hack: adding duplicate dentry for " << *in << dendl;
f67539c2 1931 dn = add_primary_dentry(dname, in, std::move(alternate_name), first, last);
7c673cae
FG
1932 } else {
1933 dout(0) << "_fetched badness: got (but i already had) " << *in
f67539c2
TL
1934 << " mode " << in->get_inode()->mode
1935 << " mtime " << in->get_inode()->mtime << dendl;
7c673cae
FG
1936 string dirpath, inopath;
1937 this->inode->make_path_string(dirpath);
1938 in->make_path_string(inopath);
f67539c2
TL
1939 mdcache->mds->clog->error() << "loaded dup inode " << inode_data.inode->ino
1940 << " [" << first << "," << last << "] v" << inode_data.inode->version
7c673cae 1941 << " at " << dirpath << "/" << dname
f67539c2 1942 << ", but inode " << in->vino() << " v" << in->get_version()
7c673cae
FG
1943 << " already exists at " << inopath;
1944 return dn;
1945 }
1946 }
1947 } else {
f67539c2
TL
1948 CachedStackStringStream css;
1949 *css << "Invalid tag char '" << type << "' pos " << pos;
1950 throw buffer::malformed_input(css->str());
7c673cae
FG
1951 }
1952
1953 return dn;
1954}
1955
1956void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
1957 bool complete, int r)
1958{
f67539c2 1959 LogChannelRef clog = mdcache->mds->clog;
7c673cae
FG
1960 dout(10) << "_fetched header " << hdrbl.length() << " bytes "
1961 << omap.size() << " keys for " << *this << dendl;
1962
f67539c2 1963 ceph_assert(r == 0 || r == -CEPHFS_ENOENT || r == -CEPHFS_ENODATA);
11fdf7f2
TL
1964 ceph_assert(is_auth());
1965 ceph_assert(!is_frozen());
7c673cae
FG
1966
1967 if (hdrbl.length() == 0) {
1968 dout(0) << "_fetched missing object for " << *this << dendl;
1969
1970 clog->error() << "dir " << dirfrag() << " object missing on disk; some "
1971 "files may be lost (" << get_path() << ")";
1972
1973 go_bad(complete);
1974 return;
1975 }
1976
1977 fnode_t got_fnode;
1978 {
11fdf7f2 1979 auto p = hdrbl.cbegin();
7c673cae 1980 try {
11fdf7f2 1981 decode(got_fnode, p);
7c673cae
FG
1982 } catch (const buffer::error &err) {
1983 derr << "Corrupt fnode in dirfrag " << dirfrag()
f67539c2 1984 << ": " << err.what() << dendl;
7c673cae 1985 clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
f67539c2 1986 << err.what() << " (" << get_path() << ")";
7c673cae
FG
1987 go_bad(complete);
1988 return;
1989 }
1990 if (!p.end()) {
1991 clog->warn() << "header buffer of dir " << dirfrag() << " has "
1992 << hdrbl.length() - p.get_off() << " extra bytes ("
1993 << get_path() << ")";
1994 go_bad(complete);
1995 return;
1996 }
1997 }
1998
1999 dout(10) << "_fetched version " << got_fnode.version << dendl;
2000
2001 // take the loaded fnode?
2002 // only if we are a fresh CDir* with no prior state.
2003 if (get_version() == 0) {
11fdf7f2
TL
2004 ceph_assert(!is_projected());
2005 ceph_assert(!state_test(STATE_COMMITTING));
f67539c2
TL
2006 auto _fnode = allocate_fnode(got_fnode);
2007 reset_fnode(std::move(_fnode));
2008 projected_version = committing_version = committed_version = get_version();
7c673cae
FG
2009
2010 if (state_test(STATE_REJOINUNDEF)) {
f67539c2 2011 ceph_assert(mdcache->mds->is_rejoin());
7c673cae 2012 state_clear(STATE_REJOINUNDEF);
f67539c2 2013 mdcache->opened_undef_dirfrag(this);
7c673cae
FG
2014 }
2015 }
2016
2017 list<CInode*> undef_inodes;
2018
2019 // purge stale snaps?
7c673cae
FG
2020 bool force_dirty = false;
2021 const set<snapid_t> *snaps = NULL;
2022 SnapRealm *realm = inode->find_snaprealm();
f67539c2 2023 if (fnode->snap_purged_thru < realm->get_last_destroyed()) {
7c673cae 2024 snaps = &realm->get_snaps();
f67539c2 2025 dout(10) << " snap_purged_thru " << fnode->snap_purged_thru
7c673cae
FG
2026 << " < " << realm->get_last_destroyed()
2027 << ", snap purge based on " << *snaps << dendl;
2028 if (get_num_snap_items() == 0) {
f67539c2 2029 const_cast<snapid_t&>(fnode->snap_purged_thru) = realm->get_last_destroyed();
7c673cae
FG
2030 force_dirty = true;
2031 }
2032 }
2033
2034 unsigned pos = omap.size() - 1;
f91f0fd5 2035 double rand_threshold = get_inode()->get_ephemeral_rand();
7c673cae
FG
2036 for (map<string, bufferlist>::reverse_iterator p = omap.rbegin();
2037 p != omap.rend();
2038 ++p, --pos) {
2039 string dname;
2040 snapid_t last;
2041 dentry_key_t::decode_helper(p->first, dname, last);
2042
2043 CDentry *dn = NULL;
2044 try {
2045 dn = _load_dentry(
2046 p->first, dname, last, p->second, pos, snaps,
f91f0fd5 2047 rand_threshold, &force_dirty);
7c673cae 2048 } catch (const buffer::error &err) {
f67539c2 2049 mdcache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
7c673cae 2050 "dir frag " << dirfrag() << ": "
f67539c2 2051 << err.what() << "(" << get_path() << ")";
7c673cae
FG
2052
2053 // Remember that this dentry is damaged. Subsequent operations
f67539c2 2054 // that try to act directly on it will get their CEPHFS_EIOs, but this
7c673cae
FG
2055 // dirfrag as a whole will continue to look okay (minus the
2056 // mysteriously-missing dentry)
2057 go_bad_dentry(last, dname);
2058
2059 // Anyone who was WAIT_DENTRY for this guy will get kicked
2060 // to RetryRequest, and hit the DamageTable-interrogating path.
2061 // Stats will now be bogus because we will think we're complete,
2062 // but have 1 or more missing dentries.
2063 continue;
2064 }
2065
28e407b8
AA
2066 if (!dn)
2067 continue;
7c673cae 2068
28e407b8
AA
2069 CDentry::linkage_t *dnl = dn->get_linkage();
2070 if (dnl->is_primary() && dnl->get_inode()->state_test(CInode::STATE_REJOINUNDEF))
2071 undef_inodes.push_back(dnl->get_inode());
7c673cae 2072
11fdf7f2 2073 if (wanted_items.count(mempool::mds_co::string(dname)) > 0 || !complete) {
28e407b8 2074 dout(10) << " touching wanted dn " << *dn << dendl;
f67539c2 2075 mdcache->touch_dentry(dn);
7c673cae
FG
2076 }
2077 }
2078
2079 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
2080
2081 // mark complete, !fetching
2082 if (complete) {
2083 wanted_items.clear();
2084 mark_complete();
2085 state_clear(STATE_FETCHING);
7c673cae
FG
2086 }
2087
2088 // open & force frags
2089 while (!undef_inodes.empty()) {
2090 CInode *in = undef_inodes.front();
2091 undef_inodes.pop_front();
2092 in->state_clear(CInode::STATE_REJOINUNDEF);
f67539c2 2093 mdcache->opened_undef_inode(in);
7c673cae
FG
2094 }
2095
2096 // dirty myself to remove stale snap dentries
f67539c2 2097 if (force_dirty && !mdcache->is_readonly())
7c673cae
FG
2098 log_mark_dirty();
2099
2100 auth_unpin(this);
2101
2102 if (complete) {
2103 // kick waiters
2104 finish_waiting(WAIT_COMPLETE, 0);
2105 }
2106}
2107
11fdf7f2 2108void CDir::go_bad_dentry(snapid_t last, std::string_view dname)
7c673cae 2109{
94b18763
FG
2110 dout(10) << __func__ << " " << dname << dendl;
2111 std::string path(get_path());
2112 path += "/";
11fdf7f2 2113 path += dname;
f67539c2 2114 const bool fatal = mdcache->mds->damage_table.notify_dentry(
94b18763 2115 inode->ino(), frag, last, dname, path);
7c673cae 2116 if (fatal) {
f67539c2 2117 mdcache->mds->damaged();
7c673cae
FG
2118 ceph_abort(); // unreachable, damaged() respawns us
2119 }
2120}
2121
2122void CDir::go_bad(bool complete)
2123{
11fdf7f2 2124 dout(10) << __func__ << " " << frag << dendl;
f67539c2 2125 const bool fatal = mdcache->mds->damage_table.notify_dirfrag(
7c673cae
FG
2126 inode->ino(), frag, get_path());
2127 if (fatal) {
f67539c2 2128 mdcache->mds->damaged();
7c673cae
FG
2129 ceph_abort(); // unreachable, damaged() respawns us
2130 }
2131
f91f0fd5 2132 if (complete) {
f67539c2
TL
2133 if (get_version() == 0) {
2134 auto _fnode = allocate_fnode();
2135 _fnode->version = 1;
2136 reset_fnode(std::move(_fnode));
2137 }
f91f0fd5
TL
2138
2139 state_set(STATE_BADFRAG);
2140 mark_complete();
2141 }
2142
2143 state_clear(STATE_FETCHING);
2144 auth_unpin(this);
f67539c2 2145 finish_waiting(WAIT_COMPLETE, -CEPHFS_EIO);
7c673cae
FG
2146}
2147
2148// -----------------------
2149// COMMIT
2150
2151/**
2152 * commit
2153 *
2154 * @param want - min version i want committed
2155 * @param c - callback for completion
2156 */
11fdf7f2 2157void CDir::commit(version_t want, MDSContext *c, bool ignore_authpinnability, int op_prio)
7c673cae
FG
2158{
2159 dout(10) << "commit want " << want << " on " << *this << dendl;
2160 if (want == 0) want = get_version();
2161
2162 // preconditions
11fdf7f2
TL
2163 ceph_assert(want <= get_version() || get_version() == 0); // can't commit the future
2164 ceph_assert(want > committed_version); // the caller is stupid
2165 ceph_assert(is_auth());
2166 ceph_assert(ignore_authpinnability || can_auth_pin());
7c673cae 2167
7c673cae
FG
2168 // note: queue up a noop if necessary, so that we always
2169 // get an auth_pin.
2170 if (!c)
2171 c = new C_MDSInternalNoop;
2172
2173 // auth_pin on first waiter
2174 if (waiting_for_commit.empty())
2175 auth_pin(this);
2176 waiting_for_commit[want].push_back(c);
2177
2178 // ok.
2179 _commit(want, op_prio);
2180}
2181
2182class C_IO_Dir_Committed : public CDirIOContext {
2183 version_t version;
2184public:
2185 C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2186 void finish(int r) override {
2187 dir->_committed(r, version);
2188 }
91327a77 2189 void print(ostream& out) const override {
f67539c2
TL
2190 out << "dirfrag_committed(" << dir->dirfrag() << ")";
2191 }
2192};
2193
2194class C_IO_Dir_Commit_Ops : public Context {
2195public:
2196 C_IO_Dir_Commit_Ops(CDir *d, int pr,
2197 vector<CDir::dentry_commit_item> &&s, bufferlist &&bl,
2198 vector<string> &&r,
2199 mempool::mds_co::compact_set<mempool::mds_co::string> &&stales) :
2200 dir(d), op_prio(pr) {
b3b6e05e 2201 metapool = dir->mdcache->mds->get_metadata_pool();
f67539c2
TL
2202 version = dir->get_version();
2203 is_new = dir->is_new();
2204 to_set.swap(s);
2205 dfts.swap(bl);
2206 to_remove.swap(r);
2207 stale_items.swap(stales);
91327a77 2208 }
f67539c2
TL
2209
2210 void finish(int r) override {
2211 dir->_omap_commit_ops(r, op_prio, metapool, version, is_new, to_set, dfts,
2212 to_remove, stale_items);
2213 }
2214
2215private:
2216 CDir *dir;
2217 int op_prio;
2218 int64_t metapool;
2219 version_t version;
2220 bool is_new;
2221 vector<CDir::dentry_commit_item> to_set;
2222 bufferlist dfts;
2223 vector<string> to_remove;
2224 mempool::mds_co::compact_set<mempool::mds_co::string> stale_items;
7c673cae
FG
2225};
2226
f67539c2
TL
2227// This is doing the same thing with the InodeStoreBase::encode()
2228void CDir::_encode_primary_inode_base(dentry_commit_item &item, bufferlist &dfts,
2229 bufferlist &bl)
2230{
2231 ENCODE_START(6, 4, bl);
2232 encode(*item.inode, bl, item.features);
2233
2234 if (!item.symlink.empty())
2235 encode(item.symlink, bl);
2236
2237 // dirfragtree
2238 dfts.splice(0, item.dft_len, &bl);
2239
2240 if (item.xattrs)
2241 encode(*item.xattrs, bl);
2242 else
2243 encode((__u32)0, bl);
2244
2245 if (item.snaprealm) {
2246 bufferlist snapr_bl;
2247 encode(item.srnode, snapr_bl);
2248 encode(snapr_bl, bl);
2249 } else {
2250 encode(bufferlist(), bl);
2251 }
2252
2253 if (item.old_inodes)
2254 encode(*item.old_inodes, bl, item.features);
2255 else
2256 encode((__u32)0, bl);
2257
2258 encode(item.oldest_snap, bl);
2259 encode(item.damage_flags, bl);
2260 ENCODE_FINISH(bl);
2261}
2262
2263// This is not locked by mds_lock
2264void CDir::_omap_commit_ops(int r, int op_prio, int64_t metapool, version_t version, bool _new,
2265 vector<dentry_commit_item> &to_set, bufferlist &dfts,
2266 vector<string>& to_remove,
2267 mempool::mds_co::compact_set<mempool::mds_co::string> &stales)
2268{
2269 dout(10) << __func__ << dendl;
2270
2271 if (r < 0) {
2272 mdcache->mds->handle_write_error_with_lock(r);
2273 return;
2274 }
2275
2276 C_GatherBuilder gather(g_ceph_context,
2277 new C_OnFinisher(new C_IO_Dir_Committed(this, version),
2278 mdcache->mds->finisher));
2279
2280 SnapContext snapc;
2281 object_t oid = get_ondisk_object();
2282 object_locator_t oloc(metapool);
2283
2284 map<string, bufferlist> _set;
2285 set<string> _rm;
2286
2287 unsigned max_write_size = mdcache->max_dir_commit_size;
2288 unsigned write_size = 0;
2289
2290 auto commit_one = [&](bool header=false) {
2291 ObjectOperation op;
2292
2293 // don't create new dirfrag blindly
2294 if (!_new)
2295 op.stat(nullptr, nullptr, nullptr);
2296
2297 /*
2298 * save the header at the last moment.. If we were to send it off before
2299 * other updates, but die before sending them all, we'd think that the
2300 * on-disk state was fully committed even though it wasn't! However, since
2301 * the messages are strictly ordered between the MDS and the OSD, and
2302 * since messages to a given PG are strictly ordered, if we simply send
2303 * the message containing the header off last, we cannot get our header
2304 * into an incorrect state.
2305 */
2306 if (header) {
2307 bufferlist header;
2308 encode(*fnode, header);
2309 op.omap_set_header(header);
2310 }
2311
2312 op.priority = op_prio;
2313 if (!_set.empty())
2314 op.omap_set(_set);
2315 if (!_rm.empty())
2316 op.omap_rm_keys(_rm);
2317 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
2318 ceph::real_clock::now(),
2319 0, gather.new_sub());
2320 write_size = 0;
2321 _set.clear();
2322 _rm.clear();
2323 };
2324
2325 for (auto &key : stales) {
2326 unsigned size = key.length() + sizeof(__u32);
2327 if (write_size + size > max_write_size)
2328 commit_one();
2329
2330 write_size += size;
2331 _rm.emplace(key);
2332 }
2333
2334 for (auto &key : to_remove) {
2335 unsigned size = key.length() + sizeof(__u32);
2336 if (write_size + size > max_write_size)
2337 commit_one();
2338
2339 write_size += size;
2340 _rm.emplace(std::move(key));
2341 }
2342
2343 uint64_t off = 0;
2344 bufferlist bl;
2345 using ceph::encode;
2346 for (auto &item : to_set) {
2347 encode(item.first, bl);
2348 if (item.is_remote) {
2349 // remote link
2350 CDentry::encode_remote(item.ino, item.d_type, item.alternate_name, bl);
2351 } else {
2352 // marker, name, inode, [symlink string]
2353 bl.append('i'); // inode
2354
2355 ENCODE_START(2, 1, bl);
2356 encode(item.alternate_name, bl);
2357 _encode_primary_inode_base(item, dfts, bl);
2358 ENCODE_FINISH(bl);
2359 }
2360 off += item.dft_len;
2361
2362 unsigned size = item.key.length() + bl.length() + 2 * sizeof(__u32);
2363 if (write_size + size > max_write_size)
2364 commit_one();
2365
2366 write_size += size;
2367 _set[std::move(item.key)].swap(bl);
2368 }
2369
2370 commit_one(true);
2371 gather.activate();
2372}
2373
7c673cae
FG
2374/**
2375 * Flush out the modified dentries in this dir. Keep the bufferlist
2376 * below max_write_size;
2377 */
2378void CDir::_omap_commit(int op_prio)
2379{
11fdf7f2 2380 dout(10) << __func__ << dendl;
7c673cae 2381
7c673cae
FG
2382 if (op_prio < 0)
2383 op_prio = CEPH_MSG_PRIO_DEFAULT;
2384
2385 // snap purge?
2386 const set<snapid_t> *snaps = NULL;
2387 SnapRealm *realm = inode->find_snaprealm();
f67539c2 2388 if (fnode->snap_purged_thru < realm->get_last_destroyed()) {
7c673cae 2389 snaps = &realm->get_snaps();
f67539c2 2390 dout(10) << " snap_purged_thru " << fnode->snap_purged_thru
7c673cae
FG
2391 << " < " << realm->get_last_destroyed()
2392 << ", snap purge based on " << *snaps << dendl;
2393 // fnode.snap_purged_thru = realm->get_last_destroyed();
2394 }
2395
f67539c2
TL
2396 size_t count = 0;
2397 if (state_test(CDir::STATE_FRAGMENTING) && is_new()) {
2398 count = get_num_head_items() + get_num_snap_items();
2399 } else {
2400 for (elist<CDentry*>::iterator it = dirty_dentries.begin(); !it.end(); ++it)
2401 ++count;
2402 }
7c673cae 2403
f67539c2
TL
2404 vector<string> to_remove;
2405 // reverve enough memories, which maybe larger than the actually needed
2406 to_remove.reserve(count);
7c673cae 2407
f67539c2
TL
2408 vector<dentry_commit_item> to_set;
2409 // reverve enough memories, which maybe larger than the actually needed
2410 to_set.reserve(count);
7c673cae 2411
f67539c2
TL
2412 // for dir fragtrees
2413 bufferlist dfts(CEPH_PAGE_SIZE);
7c673cae 2414
b32b8144 2415 auto write_one = [&](CDentry *dn) {
7c673cae
FG
2416 string key;
2417 dn->key().encode(key);
2418
2419 if (dn->last != CEPH_NOSNAP &&
2420 snaps && try_trim_snap_dentry(dn, *snaps)) {
2421 dout(10) << " rm " << key << dendl;
f67539c2 2422 to_remove.emplace_back(std::move(key));
b32b8144 2423 return;
7c673cae
FG
2424 }
2425
7c673cae 2426 if (dn->get_linkage()->is_null()) {
94b18763 2427 dout(10) << " rm " << dn->get_name() << " " << *dn << dendl;
f67539c2 2428 to_remove.emplace_back(std::move(key));
7c673cae 2429 } else {
94b18763 2430 dout(10) << " set " << dn->get_name() << " " << *dn << dendl;
7c673cae 2431
f67539c2
TL
2432 uint64_t off = dfts.length();
2433 // try to reserve new size if there has less
2434 // than 1/8 page space
2435 uint64_t left = CEPH_PAGE_SIZE - off % CEPH_PAGE_SIZE;
2436 if (left < CEPH_PAGE_SIZE / 8)
2437 dfts.reserve(left + CEPH_PAGE_SIZE);
7c673cae 2438
f67539c2
TL
2439 auto& item = to_set.emplace_back();
2440 item.key = std::move(key);
2441 _parse_dentry(dn, item, snaps, dfts);
2442 item.dft_len = dfts.length() - off;
7c673cae 2443 }
b32b8144
FG
2444 };
2445
f91f0fd5
TL
2446 if (state_test(CDir::STATE_FRAGMENTING) && is_new()) {
2447 assert(committed_version == 0);
b32b8144
FG
2448 for (auto p = items.begin(); p != items.end(); ) {
2449 CDentry *dn = p->second;
2450 ++p;
f91f0fd5 2451 if (dn->get_linkage()->is_null())
b32b8144
FG
2452 continue;
2453 write_one(dn);
2454 }
2455 } else {
2456 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2457 CDentry *dn = *p;
2458 ++p;
2459 write_one(dn);
2460 }
7c673cae
FG
2461 }
2462
f67539c2
TL
2463 auto c = new C_IO_Dir_Commit_Ops(this, op_prio, std::move(to_set), std::move(dfts),
2464 std::move(to_remove), std::move(stale_items));
2465 stale_items.clear();
2466 mdcache->mds->finisher->queue(c);
7c673cae
FG
2467}
2468
f67539c2
TL
2469void CDir::_parse_dentry(CDentry *dn, dentry_commit_item &item,
2470 const set<snapid_t> *snaps, bufferlist &bl)
7c673cae
FG
2471{
2472 // clear dentry NEW flag, if any. we can no longer silently drop it.
2473 dn->clear_new();
2474
f67539c2 2475 item.first = dn->first;
7c673cae
FG
2476
2477 // primary or remote?
f67539c2
TL
2478 auto& linkage = dn->linkage;
2479 item.alternate_name = dn->get_alternate_name();
2480 if (linkage.is_remote()) {
2481 item.is_remote = true;
2482 item.ino = linkage.get_remote_ino();
2483 item.d_type = linkage.get_remote_d_type();
2484 dout(14) << " dn '" << dn->get_name() << "' remote ino " << item.ino << dendl;
2485 } else if (linkage.is_primary()) {
7c673cae 2486 // primary link
f67539c2 2487 CInode *in = linkage.get_inode();
11fdf7f2 2488 ceph_assert(in);
f67539c2
TL
2489
2490 dout(14) << " dn '" << dn->get_name() << "' inode " << *in << dendl;
7c673cae
FG
2491
2492 if (in->is_multiversion()) {
2493 if (!in->snaprealm) {
2494 if (snaps)
2495 in->purge_stale_snap_data(*snaps);
f67539c2 2496 } else {
7c673cae
FG
2497 in->purge_stale_snap_data(in->snaprealm->get_snaps());
2498 }
2499 }
2500
f67539c2
TL
2501 if (in->snaprealm) {
2502 item.snaprealm = true;
2503 item.srnode = in->snaprealm->srnode;
2504 }
2505 item.features = mdcache->mds->mdsmap->get_up_features();
2506 item.inode = in->inode;
2507 if (in->inode->is_symlink())
2508 item.symlink = in->symlink;
2509 using ceph::encode;
2510 encode(in->dirfragtree, bl);
2511 item.xattrs = in->xattrs;
2512 item.old_inodes = in->old_inodes;
2513 item.oldest_snap = in->oldest_snap;
2514 item.damage_flags = in->damage_flags;
7c673cae 2515 } else {
f67539c2 2516 ceph_assert(!linkage.is_null());
7c673cae
FG
2517 }
2518}
2519
2520void CDir::_commit(version_t want, int op_prio)
2521{
2522 dout(10) << "_commit want " << want << " on " << *this << dendl;
2523
2524 // we can't commit things in the future.
2525 // (even the projected future.)
11fdf7f2 2526 ceph_assert(want <= get_version() || get_version() == 0);
7c673cae
FG
2527
2528 // check pre+postconditions.
11fdf7f2 2529 ceph_assert(is_auth());
7c673cae
FG
2530
2531 // already committed?
2532 if (committed_version >= want) {
2533 dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2534 return;
2535 }
2536 // already committing >= want?
2537 if (committing_version >= want) {
2538 dout(10) << "already committing " << committing_version << " >= " << want << dendl;
11fdf7f2 2539 ceph_assert(state_test(STATE_COMMITTING));
7c673cae
FG
2540 return;
2541 }
2542
2543 // alrady committed an older version?
2544 if (committing_version > committed_version) {
2545 dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2546 return;
2547 }
2548
2549 // commit.
2550 committing_version = get_version();
2551
2552 // mark committing (if not already)
2553 if (!state_test(STATE_COMMITTING)) {
2554 dout(10) << "marking committing" << dendl;
2555 state_set(STATE_COMMITTING);
2556 }
2557
f67539c2 2558 if (mdcache->mds->logger) mdcache->mds->logger->inc(l_mds_dir_commit);
7c673cae 2559
522d829b
TL
2560 mdcache->mds->balancer->hit_dir(this, META_POP_STORE);
2561
7c673cae
FG
2562 _omap_commit(op_prio);
2563}
2564
2565
2566/**
2567 * _committed
2568 *
2569 * @param v version i just committed
2570 */
2571void CDir::_committed(int r, version_t v)
2572{
2573 if (r < 0) {
2574 // the directory could be partly purged during MDS failover
f67539c2 2575 if (r == -CEPHFS_ENOENT && committed_version == 0 &&
31f18b77 2576 !inode->is_base() && get_parent_dir()->inode->is_stray()) {
7c673cae 2577 r = 0;
31f18b77
FG
2578 if (inode->snaprealm)
2579 inode->state_set(CInode::STATE_MISSINGOBJS);
7c673cae
FG
2580 }
2581 if (r < 0) {
2582 dout(1) << "commit error " << r << " v " << v << dendl;
f67539c2 2583 mdcache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
7c673cae 2584 << " errno " << r;
f67539c2 2585 mdcache->mds->handle_write_error(r);
7c673cae
FG
2586 return;
2587 }
2588 }
2589
2590 dout(10) << "_committed v " << v << " on " << *this << dendl;
11fdf7f2 2591 ceph_assert(is_auth());
7c673cae
FG
2592
2593 bool stray = inode->is_stray();
2594
2595 // take note.
11fdf7f2
TL
2596 ceph_assert(v > committed_version);
2597 ceph_assert(v <= committing_version);
7c673cae
FG
2598 committed_version = v;
2599
2600 // _all_ commits done?
2601 if (committing_version == committed_version)
2602 state_clear(CDir::STATE_COMMITTING);
2603
2604 // _any_ commit, even if we've been redirtied, means we're no longer new.
2605 item_new.remove_myself();
2606
2607 // dir clean?
2608 if (committed_version == get_version())
2609 mark_clean();
2610
2611 // dentries clean?
b32b8144
FG
2612 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2613 CDentry *dn = *p;
2614 ++p;
7c673cae
FG
2615
2616 // inode?
2617 if (dn->linkage.is_primary()) {
2618 CInode *in = dn->linkage.get_inode();
11fdf7f2
TL
2619 ceph_assert(in);
2620 ceph_assert(in->is_auth());
7c673cae
FG
2621
2622 if (committed_version >= in->get_version()) {
2623 if (in->is_dirty()) {
2624 dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
2625 in->mark_clean();
2626 }
2627 } else {
2628 dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
11fdf7f2 2629 ceph_assert(in->is_dirty() || in->last < CEPH_NOSNAP); // special case for cow snap items (not predirtied)
7c673cae
FG
2630 }
2631 }
2632
2633 // dentry
2634 if (committed_version >= dn->get_version()) {
b32b8144
FG
2635 dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
2636 dn->mark_clean();
7c673cae 2637
b32b8144
FG
2638 // drop clean null stray dentries immediately
2639 if (stray &&
2640 dn->get_num_ref() == 0 &&
2641 !dn->is_projected() &&
2642 dn->get_linkage()->is_null())
2643 remove_dentry(dn);
7c673cae
FG
2644 } else {
2645 dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
11fdf7f2 2646 ceph_assert(dn->is_dirty());
7c673cae
FG
2647 }
2648 }
2649
2650 // finishers?
2651 bool were_waiters = !waiting_for_commit.empty();
2652
94b18763
FG
2653 auto it = waiting_for_commit.begin();
2654 while (it != waiting_for_commit.end()) {
2655 auto _it = it;
2656 ++_it;
2657 if (it->first > committed_version) {
2658 dout(10) << " there are waiters for " << it->first << ", committing again" << dendl;
2659 _commit(it->first, -1);
7c673cae
FG
2660 break;
2661 }
11fdf7f2 2662 MDSContext::vec t;
94b18763
FG
2663 for (const auto &waiter : it->second)
2664 t.push_back(waiter);
f67539c2 2665 mdcache->mds->queue_waiters(t);
94b18763
FG
2666 waiting_for_commit.erase(it);
2667 it = _it;
7c673cae
FG
2668 }
2669
2670 // try drop dentries in this dirfrag if it's about to be purged
31f18b77
FG
2671 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
2672 inode->snaprealm)
f67539c2 2673 mdcache->maybe_eval_stray(inode, true);
7c673cae
FG
2674
2675 // unpin if we kicked the last waiter.
2676 if (were_waiters &&
2677 waiting_for_commit.empty())
2678 auth_unpin(this);
2679}
2680
2681
2682
2683
2684// IMPORT/EXPORT
2685
f67539c2
TL
2686mds_rank_t CDir::get_export_pin(bool inherit) const
2687{
2688 mds_rank_t export_pin = inode->get_export_pin(inherit);
2689 if (export_pin == MDS_RANK_EPHEMERAL_DIST)
2690 export_pin = mdcache->hash_into_rank_bucket(ino(), get_frag());
2691 else if (export_pin == MDS_RANK_EPHEMERAL_RAND)
2692 export_pin = mdcache->hash_into_rank_bucket(ino());
2693 return export_pin;
2694}
2695
2696bool CDir::is_exportable(mds_rank_t dest) const
2697{
2698 mds_rank_t export_pin = get_export_pin();
2699 if (export_pin == dest)
2700 return true;
2701 if (export_pin >= 0)
2702 return false;
2703 return true;
2704}
2705
7c673cae
FG
2706void CDir::encode_export(bufferlist& bl)
2707{
9f95a23c 2708 ENCODE_START(1, 1, bl);
11fdf7f2
TL
2709 ceph_assert(!is_projected());
2710 encode(first, bl);
f67539c2 2711 encode(*fnode, bl);
11fdf7f2
TL
2712 encode(dirty_old_rstat, bl);
2713 encode(committed_version, bl);
7c673cae 2714
11fdf7f2
TL
2715 encode(state, bl);
2716 encode(dir_rep, bl);
7c673cae 2717
11fdf7f2
TL
2718 encode(pop_me, bl);
2719 encode(pop_auth_subtree, bl);
7c673cae 2720
11fdf7f2
TL
2721 encode(dir_rep_by, bl);
2722 encode(get_replicas(), bl);
7c673cae
FG
2723
2724 get(PIN_TEMPEXPORTING);
9f95a23c 2725 ENCODE_FINISH(bl);
7c673cae
FG
2726}
2727
11fdf7f2 2728void CDir::finish_export()
7c673cae
FG
2729{
2730 state &= MASK_STATE_EXPORT_KEPT;
11fdf7f2
TL
2731 pop_nested.sub(pop_auth_subtree);
2732 pop_auth_subtree_nested.sub(pop_auth_subtree);
2733 pop_me.zero();
2734 pop_auth_subtree.zero();
7c673cae
FG
2735 put(PIN_TEMPEXPORTING);
2736 dirty_old_rstat.clear();
2737}
2738
11fdf7f2 2739void CDir::decode_import(bufferlist::const_iterator& blp, LogSegment *ls)
7c673cae 2740{
9f95a23c 2741 DECODE_START(1, blp);
11fdf7f2 2742 decode(first, blp);
f67539c2
TL
2743 {
2744 auto _fnode = allocate_fnode();
2745 decode(*_fnode, blp);
2746 reset_fnode(std::move(_fnode));
2747 }
2748 update_projected_version();
2749
11fdf7f2 2750 decode(dirty_old_rstat, blp);
11fdf7f2 2751 decode(committed_version, blp);
7c673cae
FG
2752 committing_version = committed_version;
2753
2754 unsigned s;
11fdf7f2 2755 decode(s, blp);
7c673cae
FG
2756 state &= MASK_STATE_IMPORT_KEPT;
2757 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
2758
2759 if (is_dirty()) {
2760 get(PIN_DIRTY);
2761 _mark_dirty(ls);
2762 }
2763
11fdf7f2 2764 decode(dir_rep, blp);
7c673cae 2765
11fdf7f2
TL
2766 decode(pop_me, blp);
2767 decode(pop_auth_subtree, blp);
2768 pop_nested.add(pop_auth_subtree);
2769 pop_auth_subtree_nested.add(pop_auth_subtree);
7c673cae 2770
11fdf7f2
TL
2771 decode(dir_rep_by, blp);
2772 decode(get_replicas(), blp);
181888fb 2773 if (is_replicated()) get(PIN_REPLICATED);
7c673cae
FG
2774
2775 replica_nonce = 0; // no longer defined
2776
2777 // did we import some dirty scatterlock data?
2778 if (dirty_old_rstat.size() ||
f67539c2
TL
2779 !(fnode->rstat == fnode->accounted_rstat)) {
2780 mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
7c673cae
FG
2781 ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
2782 }
f67539c2
TL
2783 if (!(fnode->fragstat == fnode->accounted_fragstat)) {
2784 mdcache->mds->locker->mark_updated_scatterlock(&inode->filelock);
7c673cae
FG
2785 ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
2786 }
2787 if (is_dirty_dft()) {
2788 if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
2789 inode->dirfragtreelock.is_stable()) {
2790 // clear stale dirtydft
2791 state_clear(STATE_DIRTYDFT);
2792 } else {
f67539c2 2793 mdcache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
7c673cae
FG
2794 ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
2795 }
2796 }
9f95a23c 2797 DECODE_FINISH(blp);
7c673cae
FG
2798}
2799
11fdf7f2
TL
2800void CDir::abort_import()
2801{
2802 ceph_assert(is_auth());
2803 state_clear(CDir::STATE_AUTH);
2804 remove_bloom();
2805 clear_replica_map();
2806 set_replica_nonce(CDir::EXPORT_NONCE);
2807 if (is_dirty())
2808 mark_clean();
7c673cae 2809
11fdf7f2
TL
2810 pop_nested.sub(pop_auth_subtree);
2811 pop_auth_subtree_nested.sub(pop_auth_subtree);
2812 pop_me.zero();
2813 pop_auth_subtree.zero();
2814}
7c673cae 2815
11fdf7f2
TL
2816void CDir::encode_dirstat(bufferlist& bl, const session_info_t& info, const DirStat& ds) {
2817 if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
2818 ENCODE_START(1, 1, bl);
2819 encode(ds.frag, bl);
2820 encode(ds.auth, bl);
2821 encode(ds.dist, bl);
2822 ENCODE_FINISH(bl);
2823 }
2824 else {
2825 encode(ds.frag, bl);
2826 encode(ds.auth, bl);
2827 encode(ds.dist, bl);
2828 }
2829}
7c673cae
FG
2830
2831/********************************
2832 * AUTHORITY
2833 */
2834
2835/*
2836 * if dir_auth.first == parent, auth is same as inode.
2837 * unless .second != unknown, in which case that sticks.
2838 */
2839mds_authority_t CDir::authority() const
2840{
2841 if (is_subtree_root())
2842 return dir_auth;
2843 else
2844 return inode->authority();
2845}
2846
2847/** is_subtree_root()
2848 * true if this is an auth delegation point.
2849 * that is, dir_auth != default (parent,unknown)
2850 *
2851 * some key observations:
2852 * if i am auth:
2853 * - any region bound will be an export, or frozen.
2854 *
2855 * note that this DOES heed dir_auth.pending
2856 */
2857/*
2858bool CDir::is_subtree_root()
2859{
2860 if (dir_auth == CDIR_AUTH_DEFAULT) {
2861 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2862 //<< " on " << ino() << dendl;
2863 return false;
2864 } else {
2865 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2866 //<< " on " << ino() << dendl;
2867 return true;
2868 }
2869}
2870*/
2871
2872/** contains(x)
2873 * true if we are x, or an ancestor of x
2874 */
2875bool CDir::contains(CDir *x)
2876{
2877 while (1) {
2878 if (x == this)
2879 return true;
2880 x = x->get_inode()->get_projected_parent_dir();
2881 if (x == 0)
2882 return false;
2883 }
2884}
2885
f67539c2
TL
2886bool CDir::can_rep() const
2887{
2888 if (!is_rep())
2889 return true;
2890
2891 unsigned mds_num = mdcache->mds->get_mds_map()->get_num_mds(MDSMap::STATE_ACTIVE);
2892 if ((mds_num - 1) > get_replicas().size())
2893 return true;
2894
2895 return false;
2896}
7c673cae
FG
2897
2898
2899/** set_dir_auth
2900 */
11fdf7f2 2901void CDir::set_dir_auth(const mds_authority_t &a)
7c673cae
FG
2902{
2903 dout(10) << "setting dir_auth=" << a
2904 << " from " << dir_auth
2905 << " on " << *this << dendl;
2906
2907 bool was_subtree = is_subtree_root();
2908 bool was_ambiguous = dir_auth.second >= 0;
2909
2910 // set it.
2911 dir_auth = a;
2912
2913 // new subtree root?
2914 if (!was_subtree && is_subtree_root()) {
2915 dout(10) << " new subtree root, adjusting auth_pins" << dendl;
1adf2230 2916
11fdf7f2
TL
2917 if (freeze_tree_state) {
2918 // only by CDir::_freeze_tree()
2919 ceph_assert(is_freezing_tree_root());
2920 }
1adf2230 2921
11fdf7f2 2922 inode->num_subtree_roots++;
7c673cae
FG
2923
2924 // unpin parent of frozen dir/tree?
224ce89b 2925 if (inode->is_auth()) {
11fdf7f2 2926 ceph_assert(!is_frozen_tree_root());
224ce89b
WB
2927 if (is_frozen_dir())
2928 inode->auth_unpin(this);
2929 }
7c673cae
FG
2930 }
2931 if (was_subtree && !is_subtree_root()) {
2932 dout(10) << " old subtree root, adjusting auth_pins" << dendl;
1adf2230
AA
2933
2934 inode->num_subtree_roots--;
7c673cae
FG
2935
2936 // pin parent of frozen dir/tree?
224ce89b 2937 if (inode->is_auth()) {
11fdf7f2 2938 ceph_assert(!is_frozen_tree_root());
224ce89b
WB
2939 if (is_frozen_dir())
2940 inode->auth_pin(this);
2941 }
7c673cae
FG
2942 }
2943
2944 // newly single auth?
2945 if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
11fdf7f2 2946 MDSContext::vec ls;
7c673cae 2947 take_waiting(WAIT_SINGLEAUTH, ls);
f67539c2 2948 mdcache->mds->queue_waiters(ls);
7c673cae
FG
2949 }
2950}
2951
7c673cae
FG
2952/*****************************************
2953 * AUTH PINS and FREEZING
2954 *
2955 * the basic plan is that auth_pins only exist in auth regions, and they
2956 * prevent a freeze (and subsequent auth change).
2957 *
2958 * however, we also need to prevent a parent from freezing if a child is frozen.
2959 * for that reason, the parent inode of a frozen directory is auth_pinned.
2960 *
2961 * the oddity is when the frozen directory is a subtree root. if that's the case,
2962 * the parent inode isn't frozen. which means that when subtree authority is adjusted
2963 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
2964 * time.
2965 *
2966 */
2967
2968void CDir::auth_pin(void *by)
2969{
2970 if (auth_pins == 0)
2971 get(PIN_AUTHPIN);
2972 auth_pins++;
2973
2974#ifdef MDS_AUTHPIN_SET
2975 auth_pin_set.insert(by);
2976#endif
2977
11fdf7f2 2978 dout(10) << "auth_pin by " << by << " on " << *this << " count now " << auth_pins << dendl;
7c673cae 2979
11fdf7f2
TL
2980 if (freeze_tree_state)
2981 freeze_tree_state->auth_pins += 1;
7c673cae
FG
2982}
2983
2984void CDir::auth_unpin(void *by)
2985{
2986 auth_pins--;
2987
2988#ifdef MDS_AUTHPIN_SET
11fdf7f2
TL
2989 {
2990 auto it = auth_pin_set.find(by);
2991 ceph_assert(it != auth_pin_set.end());
2992 auth_pin_set.erase(it);
2993 }
7c673cae
FG
2994#endif
2995 if (auth_pins == 0)
2996 put(PIN_AUTHPIN);
2997
11fdf7f2
TL
2998 dout(10) << "auth_unpin by " << by << " on " << *this << " count now " << auth_pins << dendl;
2999 ceph_assert(auth_pins >= 0);
3000
3001 if (freeze_tree_state)
3002 freeze_tree_state->auth_pins -= 1;
7c673cae
FG
3003
3004 maybe_finish_freeze(); // pending freeze?
7c673cae
FG
3005}
3006
11fdf7f2 3007void CDir::adjust_nested_auth_pins(int dirinc, void *by)
7c673cae 3008{
11fdf7f2 3009 ceph_assert(dirinc);
7c673cae
FG
3010 dir_auth_pins += dirinc;
3011
11fdf7f2 3012 dout(15) << __func__ << " " << dirinc << " on " << *this
7c673cae 3013 << " by " << by << " count now "
11fdf7f2
TL
3014 << auth_pins << "/" << dir_auth_pins << dendl;
3015 ceph_assert(dir_auth_pins >= 0);
7c673cae 3016
11fdf7f2
TL
3017 if (freeze_tree_state)
3018 freeze_tree_state->auth_pins += dirinc;
7c673cae 3019
11fdf7f2
TL
3020 if (dirinc < 0)
3021 maybe_finish_freeze(); // pending freeze?
7c673cae
FG
3022}
3023
#ifdef MDS_VERIFY_FRAGSTAT
/**
 * Debug-only check (built when MDS_VERIFY_FRAGSTAT is defined): recount the
 * files and subdirectories among this dirfrag's non-null dentries and abort
 * if the totals disagree with fnode->fragstat. Skipped for stray inodes.
 */
void CDir::verify_fragstat()
{
  ceph_assert(is_complete());
  if (inode->is_stray())
    return;

  frag_info_t c;
  memset(&c, 0, sizeof(c));

  for (auto& entry : items) {
    CDentry *dn = entry.second;
    if (dn->is_null())
      continue;

    dout(10) << " " << *dn << dendl;

    if (dn->is_primary()) {
      dout(10) << " " << *dn->inode << dendl;
      if (dn->inode->is_dir())
        c.nsubdirs++;
      else
        c.nfiles++;
    }
    if (dn->is_remote()) {
      // remote dentries are classified by their recorded d_type
      if (dn->get_remote_d_type() == DT_DIR)
        c.nsubdirs++;
      else
        c.nfiles++;
    }
  }

  const bool matches = c.nsubdirs == fnode->fragstat.nsubdirs &&
                       c.nfiles == fnode->fragstat.nfiles;
  if (matches) {
    dout(0) << "verify_fragstat ok " << fnode->fragstat << " on " << *this << dendl;
  } else {
    dout(0) << "verify_fragstat failed " << fnode->fragstat << " on " << *this << dendl;
    dout(0) << " i count " << c << dendl;
    ceph_abort();
  }
}
#endif
3069
3070/*****************************************************************************
3071 * FREEZING
3072 */
3073
3074// FREEZE TREE
3075
11fdf7f2
TL
3076void CDir::_walk_tree(std::function<bool(CDir*)> callback)
3077{
11fdf7f2
TL
3078 deque<CDir*> dfq;
3079 dfq.push_back(this);
3080
11fdf7f2
TL
3081 while (!dfq.empty()) {
3082 CDir *dir = dfq.front();
3083 dfq.pop_front();
3084
3085 for (auto& p : *dir) {
3086 CDentry *dn = p.second;
3087 if (!dn->get_linkage()->is_primary())
3088 continue;
3089 CInode *in = dn->get_linkage()->get_inode();
3090 if (!in->is_dir())
3091 continue;
3092
9f95a23c 3093 auto&& dfv = in->get_nested_dirfrags();
11fdf7f2
TL
3094 for (auto& dir : dfv) {
3095 auto ret = callback(dir);
3096 if (ret)
3097 dfq.push_back(dir);
3098 }
11fdf7f2
TL
3099 }
3100 }
3101}
3102
7c673cae
FG
3103bool CDir::freeze_tree()
3104{
11fdf7f2
TL
3105 ceph_assert(!is_frozen());
3106 ceph_assert(!is_freezing());
3107 ceph_assert(!freeze_tree_state);
7c673cae
FG
3108
3109 auth_pin(this);
11fdf7f2
TL
3110
3111 // Travese the subtree to mark dirfrags as 'freezing' (set freeze_tree_state)
3112 // and to accumulate auth pins and record total count in freeze_tree_state.
3113 // when auth unpin an 'freezing' object, the counter in freeze_tree_state also
3114 // gets decreased. Subtree become 'frozen' when the counter reaches zero.
3115 freeze_tree_state = std::make_shared<freeze_tree_state_t>(this);
3116 freeze_tree_state->auth_pins += get_auth_pins() + get_dir_auth_pins();
9f95a23c 3117 if (!lock_caches_with_auth_pins.empty())
f67539c2 3118 mdcache->mds->locker->invalidate_lock_caches(this);
11fdf7f2
TL
3119
3120 _walk_tree([this](CDir *dir) {
3121 if (dir->freeze_tree_state)
3122 return false;
3123 dir->freeze_tree_state = freeze_tree_state;
3124 freeze_tree_state->auth_pins += dir->get_auth_pins() + dir->get_dir_auth_pins();
9f95a23c 3125 if (!dir->lock_caches_with_auth_pins.empty())
f67539c2 3126 mdcache->mds->locker->invalidate_lock_caches(dir);
11fdf7f2 3127 return true;
9f95a23c 3128 }
11fdf7f2
TL
3129 );
3130
7c673cae
FG
3131 if (is_freezeable(true)) {
3132 _freeze_tree();
3133 auth_unpin(this);
3134 return true;
3135 } else {
3136 state_set(STATE_FREEZINGTREE);
3137 ++num_freezing_trees;
3138 dout(10) << "freeze_tree waiting " << *this << dendl;
3139 return false;
3140 }
3141}
3142
3143void CDir::_freeze_tree()
3144{
11fdf7f2
TL
3145 dout(10) << __func__ << " " << *this << dendl;
3146 ceph_assert(is_freezeable(true));
7c673cae 3147
11fdf7f2
TL
3148 if (freeze_tree_state) {
3149 ceph_assert(is_auth());
3150 } else {
3151 ceph_assert(!is_auth());
3152 freeze_tree_state = std::make_shared<freeze_tree_state_t>(this);
7c673cae 3153 }
11fdf7f2 3154 freeze_tree_state->frozen = true;
224ce89b
WB
3155
3156 if (is_auth()) {
3157 mds_authority_t auth;
3158 bool was_subtree = is_subtree_root();
3159 if (was_subtree) {
3160 auth = get_dir_auth();
3161 } else {
3162 // temporarily prevent parent subtree from becoming frozen.
3163 inode->auth_pin(this);
3164 // create new subtree
3165 auth = authority();
3166 }
3167
11fdf7f2
TL
3168 _walk_tree([this, &auth] (CDir *dir) {
3169 if (dir->freeze_tree_state != freeze_tree_state) {
f67539c2 3170 mdcache->adjust_subtree_auth(dir, auth);
11fdf7f2
TL
3171 return false;
3172 }
3173 return true;
3174 }
3175 );
3176
3177 ceph_assert(auth.first >= 0);
3178 ceph_assert(auth.second == CDIR_AUTH_UNKNOWN);
224ce89b 3179 auth.second = auth.first;
f67539c2 3180 mdcache->adjust_subtree_auth(this, auth);
224ce89b
WB
3181 if (!was_subtree)
3182 inode->auth_unpin(this);
11fdf7f2
TL
3183 } else {
3184 // importing subtree ?
3185 _walk_tree([this] (CDir *dir) {
3186 ceph_assert(!dir->freeze_tree_state);
3187 dir->freeze_tree_state = freeze_tree_state;
3188 return true;
3189 }
3190 );
3191 }
3192
3193 // twiddle state
3194 if (state_test(STATE_FREEZINGTREE)) {
3195 state_clear(STATE_FREEZINGTREE);
3196 --num_freezing_trees;
224ce89b
WB
3197 }
3198
7c673cae
FG
3199 state_set(STATE_FROZENTREE);
3200 ++num_frozen_trees;
3201 get(PIN_FROZEN);
7c673cae
FG
3202}
3203
3204void CDir::unfreeze_tree()
3205{
11fdf7f2
TL
3206 dout(10) << __func__ << " " << *this << dendl;
3207
3208 MDSContext::vec unfreeze_waiters;
3209 take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
3210
3211 if (freeze_tree_state) {
3212 _walk_tree([this, &unfreeze_waiters](CDir *dir) {
3213 if (dir->freeze_tree_state != freeze_tree_state)
3214 return false;
3215 dir->freeze_tree_state.reset();
3216 dir->take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
3217 return true;
3218 }
3219 );
3220 }
7c673cae
FG
3221
3222 if (state_test(STATE_FROZENTREE)) {
3223 // frozen. unfreeze.
3224 state_clear(STATE_FROZENTREE);
3225 --num_frozen_trees;
3226
3227 put(PIN_FROZEN);
3228
224ce89b
WB
3229 if (is_auth()) {
3230 // must be subtree
11fdf7f2 3231 ceph_assert(is_subtree_root());
224ce89b
WB
3232 // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
3233 mds_authority_t auth = get_dir_auth();
11fdf7f2
TL
3234 ceph_assert(auth.first >= 0);
3235 ceph_assert(auth.second == auth.first);
224ce89b 3236 auth.second = CDIR_AUTH_UNKNOWN;
f67539c2 3237 mdcache->adjust_subtree_auth(this, auth);
224ce89b 3238 }
11fdf7f2 3239 freeze_tree_state.reset();
7c673cae 3240 } else {
11fdf7f2 3241 ceph_assert(state_test(STATE_FREEZINGTREE));
7c673cae
FG
3242
3243 // freezing. stop it.
7c673cae
FG
3244 state_clear(STATE_FREEZINGTREE);
3245 --num_freezing_trees;
11fdf7f2
TL
3246 freeze_tree_state.reset();
3247
3248 finish_waiting(WAIT_FROZEN, -1);
7c673cae 3249 auth_unpin(this);
7c673cae 3250 }
11fdf7f2 3251
f67539c2 3252 mdcache->mds->queue_waiters(unfreeze_waiters);
11fdf7f2
TL
3253}
3254
3255void CDir::adjust_freeze_after_rename(CDir *dir)
3256{
3257 if (!freeze_tree_state || dir->freeze_tree_state != freeze_tree_state)
3258 return;
3259 CDir *newdir = dir->get_inode()->get_parent_dir();
3260 if (newdir == this || newdir->freeze_tree_state == freeze_tree_state)
3261 return;
3262
3263 ceph_assert(!freeze_tree_state->frozen);
3264 ceph_assert(get_dir_auth_pins() > 0);
3265
3266 MDSContext::vec unfreeze_waiters;
3267
3268 auto unfreeze = [this, &unfreeze_waiters](CDir *dir) {
3269 if (dir->freeze_tree_state != freeze_tree_state)
3270 return false;
3271 int dec = dir->get_auth_pins() + dir->get_dir_auth_pins();
3272 // shouldn't become zero because srcdn of rename was auth pinned
3273 ceph_assert(freeze_tree_state->auth_pins > dec);
3274 freeze_tree_state->auth_pins -= dec;
3275 dir->freeze_tree_state.reset();
3276 dir->take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
3277 return true;
3278 };
3279
3280 unfreeze(dir);
3281 dir->_walk_tree(unfreeze);
3282
f67539c2 3283 mdcache->mds->queue_waiters(unfreeze_waiters);
7c673cae
FG
3284}
3285
91327a77 3286bool CDir::can_auth_pin(int *err_ret) const
7c673cae 3287{
91327a77
AA
3288 int err;
3289 if (!is_auth()) {
3290 err = ERR_NOT_AUTH;
3291 } else if (is_freezing_dir() || is_frozen_dir()) {
3292 err = ERR_FRAGMENTING_DIR;
3293 } else {
3294 auto p = is_freezing_or_frozen_tree();
3295 if (p.first || p.second) {
3296 err = ERR_EXPORTING_TREE;
3297 } else {
3298 err = 0;
3299 }
3300 }
3301 if (err && err_ret)
3302 *err_ret = err;
3303 return !err;
3304}
3305
7c673cae
FG
3306class C_Dir_AuthUnpin : public CDirContext {
3307 public:
3308 explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
3309 void finish(int r) override {
3310 dir->auth_unpin(dir->get_inode());
3311 }
3312};
3313
3314void CDir::maybe_finish_freeze()
3315{
11fdf7f2 3316 if (dir_auth_pins != 0)
7c673cae
FG
3317 return;
3318
3319 // we can freeze the _dir_ even with nested pins...
3320 if (state_test(STATE_FREEZINGDIR)) {
11fdf7f2
TL
3321 if (auth_pins == 1) {
3322 _freeze_dir();
3323 auth_unpin(this);
3324 finish_waiting(WAIT_FROZEN);
3325 }
7c673cae
FG
3326 }
3327
11fdf7f2
TL
3328 if (freeze_tree_state) {
3329 if (freeze_tree_state->frozen ||
3330 freeze_tree_state->auth_pins != 1)
3331 return;
3332
3333 if (freeze_tree_state->dir != this) {
3334 freeze_tree_state->dir->maybe_finish_freeze();
3335 return;
3336 }
3337
3338 ceph_assert(state_test(STATE_FREEZINGTREE));
7c673cae 3339
7c673cae 3340 if (!is_subtree_root() && inode->is_frozen()) {
11fdf7f2 3341 dout(10) << __func__ << " !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
7c673cae
FG
3342 // retake an auth_pin...
3343 auth_pin(inode);
3344 // and release it when the parent inode unfreezes
3345 inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
3346 return;
3347 }
3348
3349 _freeze_tree();
3350 auth_unpin(this);
3351 finish_waiting(WAIT_FROZEN);
3352 }
3353}
3354
3355
3356
3357// FREEZE DIR
3358
3359bool CDir::freeze_dir()
3360{
11fdf7f2
TL
3361 ceph_assert(!is_frozen());
3362 ceph_assert(!is_freezing());
7c673cae
FG
3363
3364 auth_pin(this);
3365 if (is_freezeable_dir(true)) {
3366 _freeze_dir();
3367 auth_unpin(this);
3368 return true;
3369 } else {
3370 state_set(STATE_FREEZINGDIR);
9f95a23c 3371 if (!lock_caches_with_auth_pins.empty())
f67539c2 3372 mdcache->mds->locker->invalidate_lock_caches(this);
7c673cae
FG
3373 dout(10) << "freeze_dir + wait " << *this << dendl;
3374 return false;
3375 }
3376}
3377
3378void CDir::_freeze_dir()
3379{
11fdf7f2 3380 dout(10) << __func__ << " " << *this << dendl;
7c673cae
FG
3381 //assert(is_freezeable_dir(true));
3382 // not always true during split because the original fragment may have frozen a while
3383 // ago and we're just now getting around to breaking it up.
3384
3385 state_clear(STATE_FREEZINGDIR);
3386 state_set(STATE_FROZENDIR);
3387 get(PIN_FROZEN);
3388
3389 if (is_auth() && !is_subtree_root())
3390 inode->auth_pin(this); // auth_pin for duration of freeze
3391}
3392
3393
3394void CDir::unfreeze_dir()
3395{
11fdf7f2 3396 dout(10) << __func__ << " " << *this << dendl;
7c673cae
FG
3397
3398 if (state_test(STATE_FROZENDIR)) {
3399 state_clear(STATE_FROZENDIR);
3400 put(PIN_FROZEN);
3401
3402 // unpin (may => FREEZEABLE) FIXME: is this order good?
3403 if (is_auth() && !is_subtree_root())
3404 inode->auth_unpin(this);
3405
3406 finish_waiting(WAIT_UNFREEZE);
3407 } else {
3408 finish_waiting(WAIT_FROZEN, -1);
3409
3410 // still freezing. stop.
11fdf7f2 3411 ceph_assert(state_test(STATE_FREEZINGDIR));
7c673cae
FG
3412 state_clear(STATE_FREEZINGDIR);
3413 auth_unpin(this);
3414
3415 finish_waiting(WAIT_UNFREEZE);
3416 }
3417}
3418
9f95a23c
TL
3419void CDir::enable_frozen_inode()
3420{
3421 ceph_assert(frozen_inode_suppressed > 0);
3422 if (--frozen_inode_suppressed == 0) {
3423 for (auto p = freezing_inodes.begin(); !p.end(); ) {
3424 CInode *in = *p;
3425 ++p;
3426 ceph_assert(in->is_freezing_inode());
3427 in->maybe_finish_freeze_inode();
3428 }
3429 }
3430}
3431
7c673cae
FG
3432/**
3433 * Slightly less complete than operator<<, because this is intended
3434 * for identifying a directory and its state rather than for dumping
3435 * debug output.
3436 */
11fdf7f2 3437void CDir::dump(Formatter *f, int flags) const
7c673cae 3438{
11fdf7f2
TL
3439 ceph_assert(f != NULL);
3440 if (flags & DUMP_PATH) {
3441 f->dump_stream("path") << get_path();
3442 }
3443 if (flags & DUMP_DIRFRAG) {
3444 f->dump_stream("dirfrag") << dirfrag();
3445 }
3446 if (flags & DUMP_SNAPID_FIRST) {
3447 f->dump_int("snapid_first", first);
3448 }
3449 if (flags & DUMP_VERSIONS) {
3450 f->dump_stream("projected_version") << get_projected_version();
3451 f->dump_stream("version") << get_version();
3452 f->dump_stream("committing_version") << get_committing_version();
3453 f->dump_stream("committed_version") << get_committed_version();
3454 }
3455 if (flags & DUMP_REP) {
3456 f->dump_bool("is_rep", is_rep());
3457 }
3458 if (flags & DUMP_DIR_AUTH) {
3459 if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
3460 if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
3461 f->dump_stream("dir_auth") << get_dir_auth().first;
3462 } else {
3463 f->dump_stream("dir_auth") << get_dir_auth();
3464 }
7c673cae 3465 } else {
11fdf7f2 3466 f->dump_string("dir_auth", "");
7c673cae 3467 }
11fdf7f2
TL
3468 }
3469 if (flags & DUMP_STATES) {
3470 f->open_array_section("states");
3471 MDSCacheObject::dump_states(f);
3472 if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
3473 if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
3474 if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
3475 if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
3476 if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
3477 if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
3478 if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
3479 if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
3480 f->close_section();
3481 }
3482 if (flags & DUMP_MDS_CACHE_OBJECT) {
3483 MDSCacheObject::dump(f);
3484 }
3485 if (flags & DUMP_ITEMS) {
3486 f->open_array_section("dentries");
3487 for (auto &p : items) {
3488 CDentry *dn = p.second;
3489 f->open_object_section("dentry");
3490 dn->dump(f);
3491 f->close_section();
3492 }
3493 f->close_section();
3494 }
7c673cae
FG
3495}
3496
11fdf7f2 3497void CDir::dump_load(Formatter *f)
28e407b8
AA
3498{
3499 f->dump_stream("path") << get_path();
3500 f->dump_stream("dirfrag") << dirfrag();
3501
3502 f->open_object_section("pop_me");
11fdf7f2 3503 pop_me.dump(f);
28e407b8
AA
3504 f->close_section();
3505
3506 f->open_object_section("pop_nested");
11fdf7f2 3507 pop_nested.dump(f);
28e407b8
AA
3508 f->close_section();
3509
3510 f->open_object_section("pop_auth_subtree");
11fdf7f2 3511 pop_auth_subtree.dump(f);
28e407b8
AA
3512 f->close_section();
3513
3514 f->open_object_section("pop_auth_subtree_nested");
11fdf7f2 3515 pop_auth_subtree_nested.dump(f);
28e407b8
AA
3516 f->close_section();
3517}
3518
7c673cae
FG
3519/****** Scrub Stuff *******/
3520
3521void CDir::scrub_info_create() const
3522{
11fdf7f2 3523 ceph_assert(!scrub_infop);
7c673cae
FG
3524
3525 // break out of const-land to set up implicit initial state
3526 CDir *me = const_cast<CDir*>(this);
f67539c2 3527 const auto& pf = me->get_projected_fnode();
7c673cae
FG
3528
3529 std::unique_ptr<scrub_info_t> si(new scrub_info_t());
3530
f67539c2
TL
3531 si->last_recursive.version = pf->recursive_scrub_version;
3532 si->last_recursive.time = pf->recursive_scrub_stamp;
7c673cae 3533
f67539c2
TL
3534 si->last_local.version = pf->localized_scrub_version;
3535 si->last_local.time = pf->localized_scrub_stamp;
7c673cae
FG
3536
3537 me->scrub_infop.swap(si);
3538}
3539
f67539c2 3540void CDir::scrub_initialize(const ScrubHeaderRef& header)
7c673cae 3541{
f67539c2 3542 ceph_assert(header);
7c673cae
FG
3543 // FIXME: weird implicit construction, is someone else meant
3544 // to be calling scrub_info_create first?
3545 scrub_info();
7c673cae
FG
3546 scrub_infop->directory_scrubbing = true;
3547 scrub_infop->header = header;
f67539c2 3548 header->inc_num_pending();
7c673cae
FG
3549}
3550
f67539c2 3551void CDir::scrub_aborted() {
7c673cae 3552 dout(20) << __func__ << dendl;
f67539c2 3553 ceph_assert(scrub_is_in_progress());
7c673cae 3554
f67539c2
TL
3555 scrub_infop->last_scrub_dirty = false;
3556 scrub_infop->directory_scrubbing = false;
3557 scrub_infop->header->dec_num_pending();
3558 scrub_infop.reset();
7c673cae
FG
3559}
3560
f67539c2 3561void CDir::scrub_finished()
7c673cae
FG
3562{
3563 dout(20) << __func__ << dendl;
f67539c2 3564 ceph_assert(scrub_is_in_progress());
7c673cae 3565
f67539c2
TL
3566 scrub_infop->last_local.time = ceph_clock_now();
3567 scrub_infop->last_local.version = get_version();
3568 if (scrub_infop->header->get_recursive())
3569 scrub_infop->last_recursive = scrub_infop->last_local;
7c673cae 3570
f67539c2 3571 scrub_infop->last_scrub_dirty = true;
7c673cae 3572
f67539c2
TL
3573 scrub_infop->directory_scrubbing = false;
3574 scrub_infop->header->dec_num_pending();
7c673cae
FG
3575}
3576
3577void CDir::scrub_maybe_delete_info()
3578{
3579 if (scrub_infop &&
3580 !scrub_infop->directory_scrubbing &&
f67539c2 3581 !scrub_infop->last_scrub_dirty)
7c673cae 3582 scrub_infop.reset();
7c673cae
FG
3583}
3584
3585bool CDir::scrub_local()
3586{
11fdf7f2 3587 ceph_assert(is_complete());
f67539c2
TL
3588 bool good = check_rstats(true);
3589 if (!good && scrub_infop->header->get_repair()) {
3590 mdcache->repair_dirfrag_stats(this);
3591 scrub_infop->header->set_repaired();
7c673cae 3592 }
f67539c2 3593 return good;
7c673cae
FG
3594}
3595
3596std::string CDir::get_path() const
3597{
3598 std::string path;
3599 get_inode()->make_path_string(path, true);
3600 return path;
3601}
3602
3603bool CDir::should_split_fast() const
3604{
3605 // Max size a fragment can be before trigger fast splitting
11fdf7f2 3606 int fast_limit = g_conf()->mds_bal_split_size * g_conf()->mds_bal_fragment_fast_factor;
7c673cae
FG
3607
3608 // Fast path: the sum of accounted size and null dentries does not
3609 // exceed threshold: we definitely are not over it.
3610 if (get_frag_size() + get_num_head_null() <= fast_limit) {
3611 return false;
3612 }
3613
3614 // Fast path: the accounted size of the frag exceeds threshold: we
3615 // definitely are over it
3616 if (get_frag_size() > fast_limit) {
3617 return true;
3618 }
3619
3620 int64_t effective_size = 0;
3621
3622 for (const auto &p : items) {
3623 const CDentry *dn = p.second;
3624 if (!dn->get_projected_linkage()->is_null()) {
3625 effective_size++;
3626 }
3627 }
3628
3629 return effective_size > fast_limit;
3630}
3631
f67539c2
TL
3632bool CDir::should_merge() const
3633{
3634 if (get_frag() == frag_t())
3635 return false;
3636
3637 if (inode->is_ephemeral_dist()) {
3638 unsigned min_frag_bits = mdcache->get_ephemeral_dist_frag_bits();
3639 if (min_frag_bits > 0 && get_frag().bits() < min_frag_bits + 1)
3640 return false;
3641 }
3642
3643 return (int)get_frag_size() < g_conf()->mds_bal_merge_size;
3644}
3645
181888fb 3646MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);
f91f0fd5 3647MEMPOOL_DEFINE_OBJECT_FACTORY(CDir::scrub_info_t, scrub_info_t, mds_co)