]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/CDir.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / mds / CDir.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
11fdf7f2 15#include <string_view>
f67539c2 16#include <algorithm>
7c673cae
FG
17
18#include "include/types.h"
19
20#include "CDir.h"
21#include "CDentry.h"
22#include "CInode.h"
23#include "Mutation.h"
24
25#include "MDSMap.h"
26#include "MDSRank.h"
27#include "MDCache.h"
28#include "Locker.h"
29#include "MDLog.h"
30#include "LogSegment.h"
522d829b 31#include "MDBalancer.h"
1e59de90 32#include "SnapClient.h"
7c673cae
FG
33
34#include "common/bloom_filter.hpp"
1e59de90 35#include "common/likely.h"
7c673cae
FG
36#include "include/Context.h"
37#include "common/Clock.h"
38
39#include "osdc/Objecter.h"
40
41#include "common/config.h"
11fdf7f2 42#include "include/ceph_assert.h"
7c673cae
FG
43#include "include/compat.h"
44
// Debug-output setup for this file: dout() lines use the global context and
// the mds subsystem, and are prefixed with the MDS node id and the dirfrag of
// the CDir that emits them (the prefix expands inside CDir member functions).
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix \
  *_dout << "mds." << mdcache->mds->get_nodeid() \
         << ".cache.dir(" << this->dirfrag() << ") "
7c673cae 49
20effc67
TL
50using namespace std;
51
7c673cae
FG
52int CDir::num_frozen_trees = 0;
53int CDir::num_freezing_trees = 0;
54
f67539c2
TL
55CDir::fnode_const_ptr CDir::empty_fnode = CDir::allocate_fnode();
56
11fdf7f2 57class CDirContext : public MDSContext
7c673cae
FG
58{
59protected:
60 CDir *dir;
f67539c2 61 MDSRank* get_mds() override {return dir->mdcache->mds;}
7c673cae
FG
62
63public:
64 explicit CDirContext(CDir *d) : dir(d) {
11fdf7f2 65 ceph_assert(dir != NULL);
7c673cae
FG
66 }
67};
68
69
70class CDirIOContext : public MDSIOContextBase
71{
72protected:
73 CDir *dir;
f67539c2 74 MDSRank* get_mds() override {return dir->mdcache->mds;}
7c673cae
FG
75
76public:
77 explicit CDirIOContext(CDir *d) : dir(d) {
11fdf7f2 78 ceph_assert(dir != NULL);
7c673cae
FG
79 }
80};
81
82
83// PINS
84//int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
85
86
87ostream& operator<<(ostream& out, const CDir& dir)
88{
89 out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
90 << " [" << dir.first << ",head]";
91 if (dir.is_auth()) {
92 out << " auth";
93 if (dir.is_replicated())
94 out << dir.get_replicas();
95
96 if (dir.is_projected())
97 out << " pv=" << dir.get_projected_version();
98 out << " v=" << dir.get_version();
99 out << " cv=" << dir.get_committing_version();
100 out << "/" << dir.get_committed_version();
101 } else {
102 mds_authority_t a = dir.authority();
103 out << " rep@" << a.first;
104 if (a.second != CDIR_AUTH_UNKNOWN)
105 out << "," << a.second;
106 out << "." << dir.get_replica_nonce();
107 }
108
109 if (dir.is_rep()) out << " REP";
110
111 if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
112 if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
113 out << " dir_auth=" << dir.get_dir_auth().first;
114 else
115 out << " dir_auth=" << dir.get_dir_auth();
116 }
117
11fdf7f2 118 if (dir.get_auth_pins() || dir.get_dir_auth_pins()) {
7c673cae 119 out << " ap=" << dir.get_auth_pins()
11fdf7f2
TL
120 << "+" << dir.get_dir_auth_pins();
121#ifdef MDS_AUTHPIN_SET
122 dir.print_authpin_set(out);
123#endif
124 }
7c673cae
FG
125
126 out << " state=" << dir.get_state();
127 if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
128 if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
129 if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
130 if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
7c673cae
FG
131 if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
132 if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
133 if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
134 if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
135 if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
136 if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
11fdf7f2
TL
137 if (dir.state_test(CDir::STATE_CREATING)) out << "|creating";
138 if (dir.state_test(CDir::STATE_COMMITTING)) out << "|committing";
139 if (dir.state_test(CDir::STATE_FETCHING)) out << "|fetching";
140 if (dir.state_test(CDir::STATE_EXPORTING)) out << "|exporting";
141 if (dir.state_test(CDir::STATE_IMPORTING)) out << "|importing";
142 if (dir.state_test(CDir::STATE_STICKY)) out << "|sticky";
143 if (dir.state_test(CDir::STATE_DNPINNEDFRAG)) out << "|dnpinnedfrag";
144 if (dir.state_test(CDir::STATE_ASSIMRSTAT)) out << "|assimrstat";
7c673cae
FG
145
146 // fragstat
f67539c2
TL
147 out << " " << dir.get_fnode()->fragstat;
148 if (!(dir.get_fnode()->fragstat == dir.get_fnode()->accounted_fragstat))
149 out << "/" << dir.get_fnode()->accounted_fragstat;
11fdf7f2 150 if (g_conf()->mds_debug_scatterstat && dir.is_projected()) {
f67539c2 151 const auto& pf = dir.get_projected_fnode();
7c673cae
FG
152 out << "->" << pf->fragstat;
153 if (!(pf->fragstat == pf->accounted_fragstat))
154 out << "/" << pf->accounted_fragstat;
155 }
156
157 // rstat
f67539c2
TL
158 out << " " << dir.get_fnode()->rstat;
159 if (!(dir.get_fnode()->rstat == dir.get_fnode()->accounted_rstat))
160 out << "/" << dir.get_fnode()->accounted_rstat;
11fdf7f2 161 if (g_conf()->mds_debug_scatterstat && dir.is_projected()) {
f67539c2 162 const auto& pf = dir.get_projected_fnode();
7c673cae
FG
163 out << "->" << pf->rstat;
164 if (!(pf->rstat == pf->accounted_rstat))
165 out << "/" << pf->accounted_rstat;
166 }
167
168 out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
169 out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
170 if (dir.get_num_dirty())
171 out << " dirty=" << dir.get_num_dirty();
172
173 if (dir.get_num_ref()) {
174 out << " |";
175 dir.print_pin_set(out);
176 }
177
178 out << " " << &dir;
179 return out << "]";
180}
181
182
aee94f69 183void CDir::print(ostream& out) const
7c673cae
FG
184{
185 out << *this;
186}
187
188
189
190
aee94f69 191ostream& CDir::print_db_line_prefix(ostream& out) const
7c673cae 192{
f67539c2 193 return out << ceph_clock_now() << " mds." << mdcache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
7c673cae
FG
194}
195
196
197
198// -------------------------------------------------------------------
199// CDir
200
f67539c2
TL
201CDir::CDir(CInode *in, frag_t fg, MDCache *mdc, bool auth) :
202 mdcache(mdc), inode(in), frag(fg),
7c673cae 203 dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
b32b8144
FG
204 dirty_dentries(member_offset(CDentry, item_dir_dirty)),
205 item_dirty(this), item_new(this),
9f95a23c
TL
206 lock_caches_with_auth_pins(member_offset(MDLockCache::DirItem, item_dir)),
207 freezing_inodes(member_offset(CInode, item_freezing_inode)),
7c673cae 208 dir_rep(REP_NONE),
f67539c2
TL
209 pop_me(mdc->decayrate),
210 pop_nested(mdc->decayrate),
211 pop_auth_subtree(mdc->decayrate),
212 pop_auth_subtree_nested(mdc->decayrate),
28e407b8 213 pop_lru_subdirs(member_offset(CInode, item_pop_lru)),
7c673cae
FG
214 dir_auth(CDIR_AUTH_DEFAULT)
215{
7c673cae 216 // auth
11fdf7f2 217 ceph_assert(in->is_dir());
f67539c2
TL
218 if (auth)
219 state_set(STATE_AUTH);
7c673cae
FG
220}
221
222/**
223 * Check the recursive statistics on size for consistency.
224 * If mds_debug_scatterstat is enabled, assert for correctness,
225 * otherwise just print out the mismatch and continue.
226 */
227bool CDir::check_rstats(bool scrub)
228{
11fdf7f2 229 if (!g_conf()->mds_debug_scatterstat && !scrub)
7c673cae
FG
230 return true;
231
232 dout(25) << "check_rstats on " << this << dendl;
233 if (!is_complete() || !is_auth() || is_frozen()) {
92f5a8d4
TL
234 dout(3) << "check_rstats " << (scrub ? "(scrub) " : "")
235 << "bailing out -- incomplete or non-auth or frozen dir on "
236 << *this << dendl;
237 return !scrub;
7c673cae
FG
238 }
239
240 frag_info_t frag_info;
241 nest_info_t nest_info;
94b18763 242 for (auto i = items.begin(); i != items.end(); ++i) {
7c673cae
FG
243 if (i->second->last != CEPH_NOSNAP)
244 continue;
245 CDentry::linkage_t *dnl = i->second->get_linkage();
246 if (dnl->is_primary()) {
247 CInode *in = dnl->get_inode();
f67539c2 248 nest_info.add(in->get_inode()->accounted_rstat);
7c673cae
FG
249 if (in->is_dir())
250 frag_info.nsubdirs++;
251 else
252 frag_info.nfiles++;
253 } else if (dnl->is_remote())
254 frag_info.nfiles++;
255 }
256
257 bool good = true;
258 // fragstat
f67539c2 259 if(!frag_info.same_sums(fnode->fragstat)) {
7c673cae
FG
260 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
261 dout(1) << "get_num_head_items() = " << get_num_head_items()
f67539c2
TL
262 << "; fnode.fragstat.nfiles=" << fnode->fragstat.nfiles
263 << " fnode.fragstat.nsubdirs=" << fnode->fragstat.nsubdirs << dendl;
7c673cae
FG
264 good = false;
265 } else {
266 dout(20) << "get_num_head_items() = " << get_num_head_items()
f67539c2
TL
267 << "; fnode.fragstat.nfiles=" << fnode->fragstat.nfiles
268 << " fnode.fragstat.nsubdirs=" << fnode->fragstat.nsubdirs << dendl;
7c673cae
FG
269 }
270
271 // rstat
f67539c2 272 if (!nest_info.same_sums(fnode->rstat)) {
7c673cae 273 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
f67539c2
TL
274 dout(1) << "total of child dentries: " << nest_info << dendl;
275 dout(1) << "my rstats: " << fnode->rstat << dendl;
7c673cae
FG
276 good = false;
277 } else {
f67539c2
TL
278 dout(20) << "total of child dentries: " << nest_info << dendl;
279 dout(20) << "my rstats: " << fnode->rstat << dendl;
7c673cae
FG
280 }
281
282 if (!good) {
283 if (!scrub) {
94b18763 284 for (auto i = items.begin(); i != items.end(); ++i) {
7c673cae
FG
285 CDentry *dn = i->second;
286 if (dn->get_linkage()->is_primary()) {
287 CInode *in = dn->get_linkage()->inode;
f67539c2 288 dout(1) << *dn << " rstat " << in->get_inode()->accounted_rstat << dendl;
7c673cae
FG
289 } else {
290 dout(1) << *dn << dendl;
291 }
292 }
293
f67539c2
TL
294 ceph_assert(frag_info.nfiles == fnode->fragstat.nfiles);
295 ceph_assert(frag_info.nsubdirs == fnode->fragstat.nsubdirs);
296 ceph_assert(nest_info.rbytes == fnode->rstat.rbytes);
297 ceph_assert(nest_info.rfiles == fnode->rstat.rfiles);
298 ceph_assert(nest_info.rsubdirs == fnode->rstat.rsubdirs);
7c673cae
FG
299 }
300 }
301 dout(10) << "check_rstats complete on " << this << dendl;
302 return good;
303}
304
11fdf7f2
TL
305void CDir::adjust_num_inodes_with_caps(int d)
306{
307 // FIXME: smarter way to decide if adding 'this' to open file table
308 if (num_inodes_with_caps == 0 && d > 0)
f67539c2 309 mdcache->open_file_table.add_dirfrag(this);
11fdf7f2 310 else if (num_inodes_with_caps > 0 && num_inodes_with_caps == -d)
f67539c2 311 mdcache->open_file_table.remove_dirfrag(this);
11fdf7f2
TL
312
313 num_inodes_with_caps += d;
314 ceph_assert(num_inodes_with_caps >= 0);
315}
316
317CDentry *CDir::lookup(std::string_view name, snapid_t snap)
7c673cae 318{
1e59de90 319 dout(20) << "lookup (" << name << ", '" << snap << "')" << dendl;
94b18763 320 auto iter = items.lower_bound(dentry_key_t(snap, name, inode->hash_dentry_name(name)));
7c673cae
FG
321 if (iter == items.end())
322 return 0;
94b18763 323 if (iter->second->get_name() == name &&
7c673cae
FG
324 iter->second->first <= snap &&
325 iter->second->last >= snap) {
326 dout(20) << " hit -> " << iter->first << dendl;
327 return iter->second;
328 }
329 dout(20) << " miss -> " << iter->first << dendl;
330 return 0;
331}
332
11fdf7f2
TL
333CDentry *CDir::lookup_exact_snap(std::string_view name, snapid_t last) {
334 dout(20) << __func__ << " (" << last << ", '" << name << "')" << dendl;
94b18763 335 auto p = items.find(dentry_key_t(last, name, inode->hash_dentry_name(name)));
7c673cae
FG
336 if (p == items.end())
337 return NULL;
338 return p->second;
339}
340
1e59de90
TL
341void CDir::adjust_dentry_lru(CDentry *dn)
342{
343 bool bottom_lru;
344 if (dn->get_linkage()->is_primary()) {
345 bottom_lru = !is_auth() && inode->is_stray();
346 } else if (dn->get_linkage()->is_remote()) {
347 bottom_lru = false;
348 } else {
349 bottom_lru = !is_auth();
350 }
351 if (bottom_lru) {
352 if (!dn->state_test(CDentry::STATE_BOTTOMLRU)) {
353 mdcache->lru.lru_remove(dn);
354 mdcache->bottom_lru.lru_insert_mid(dn);
355 dn->state_set(CDentry::STATE_BOTTOMLRU);
356 }
357 } else {
358 if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
359 mdcache->bottom_lru.lru_remove(dn);
360 mdcache->lru.lru_insert_mid(dn);
361 dn->state_clear(CDentry::STATE_BOTTOMLRU);
362 }
363 }
364}
365
7c673cae
FG
366/***
367 * linking fun
368 */
369
11fdf7f2 370CDentry* CDir::add_null_dentry(std::string_view dname,
7c673cae
FG
371 snapid_t first, snapid_t last)
372{
373 // foreign
11fdf7f2 374 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
375
376 // create dentry
f67539c2 377 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), "", first, last);
7c673cae
FG
378 dn->dir = this;
379 dn->version = get_projected_version();
1e59de90
TL
380 dn->check_corruption(true);
381 if (is_auth()) {
382 dn->state_set(CDentry::STATE_AUTH);
383 mdcache->lru.lru_insert_mid(dn);
384 } else {
385 mdcache->bottom_lru.lru_insert_mid(dn);
386 dn->state_set(CDentry::STATE_BOTTOMLRU);
387 }
7c673cae
FG
388
389 // add to dir
11fdf7f2 390 ceph_assert(items.count(dn->key()) == 0);
94b18763 391 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
392
393 items[dn->key()] = dn;
394 if (last == CEPH_NOSNAP)
395 num_head_null++;
396 else
397 num_snap_null++;
398
399 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
400 dn->get(CDentry::PIN_FRAGMENTING);
401 dn->state_set(CDentry::STATE_FRAGMENTING);
402 }
403
11fdf7f2 404 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
405
406 // pin?
407 if (get_num_any() == 1)
408 get(PIN_CHILD);
409
11fdf7f2 410 ceph_assert(get_num_any() == items.size());
7c673cae
FG
411 return dn;
412}
413
414
11fdf7f2 415CDentry* CDir::add_primary_dentry(std::string_view dname, CInode *in,
f67539c2 416 mempool::mds_co::string alternate_name,
7c673cae
FG
417 snapid_t first, snapid_t last)
418{
419 // primary
11fdf7f2 420 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
421
422 // create dentry
f67539c2 423 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), std::move(alternate_name), first, last);
1e59de90
TL
424 dn->dir = this;
425 dn->version = get_projected_version();
426 dn->check_corruption(true);
7c673cae
FG
427 if (is_auth())
428 dn->state_set(CDentry::STATE_AUTH);
31f18b77 429 if (is_auth() || !inode->is_stray()) {
f67539c2 430 mdcache->lru.lru_insert_mid(dn);
31f18b77 431 } else {
f67539c2 432 mdcache->bottom_lru.lru_insert_mid(dn);
31f18b77
FG
433 dn->state_set(CDentry::STATE_BOTTOMLRU);
434 }
7c673cae 435
7c673cae 436 // add to dir
11fdf7f2 437 ceph_assert(items.count(dn->key()) == 0);
94b18763 438 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
439
440 items[dn->key()] = dn;
441
442 dn->get_linkage()->inode = in;
7c673cae
FG
443
444 link_inode_work(dn, in);
445
446 if (dn->last == CEPH_NOSNAP)
447 num_head_items++;
448 else
449 num_snap_items++;
450
451 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
452 dn->get(CDentry::PIN_FRAGMENTING);
453 dn->state_set(CDentry::STATE_FRAGMENTING);
454 }
455
11fdf7f2 456 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
457
458 // pin?
459 if (get_num_any() == 1)
460 get(PIN_CHILD);
11fdf7f2 461 ceph_assert(get_num_any() == items.size());
7c673cae
FG
462 return dn;
463}
464
11fdf7f2 465CDentry* CDir::add_remote_dentry(std::string_view dname, inodeno_t ino, unsigned char d_type,
f67539c2 466 mempool::mds_co::string alternate_name,
7c673cae
FG
467 snapid_t first, snapid_t last)
468{
469 // foreign
11fdf7f2 470 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
471
472 // create dentry
f67539c2 473 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), std::move(alternate_name), ino, d_type, first, last);
1e59de90
TL
474 dn->dir = this;
475 dn->version = get_projected_version();
476 dn->check_corruption(true);
7c673cae
FG
477 if (is_auth())
478 dn->state_set(CDentry::STATE_AUTH);
f67539c2 479 mdcache->lru.lru_insert_mid(dn);
7c673cae
FG
480
481 // add to dir
11fdf7f2 482 ceph_assert(items.count(dn->key()) == 0);
94b18763 483 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
484
485 items[dn->key()] = dn;
486 if (last == CEPH_NOSNAP)
487 num_head_items++;
488 else
489 num_snap_items++;
490
491 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
492 dn->get(CDentry::PIN_FRAGMENTING);
493 dn->state_set(CDentry::STATE_FRAGMENTING);
494 }
495
11fdf7f2 496 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
497
498 // pin?
499 if (get_num_any() == 1)
500 get(PIN_CHILD);
501
11fdf7f2 502 ceph_assert(get_num_any() == items.size());
7c673cae
FG
503 return dn;
504}
505
506
507
508void CDir::remove_dentry(CDentry *dn)
509{
11fdf7f2 510 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
511
512 // there should be no client leases at this point!
11fdf7f2 513 ceph_assert(dn->client_lease_map.empty());
7c673cae
FG
514
515 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
516 dn->put(CDentry::PIN_FRAGMENTING);
517 dn->state_clear(CDentry::STATE_FRAGMENTING);
518 }
519
520 if (dn->get_linkage()->is_null()) {
521 if (dn->last == CEPH_NOSNAP)
522 num_head_null--;
523 else
524 num_snap_null--;
525 } else {
526 if (dn->last == CEPH_NOSNAP)
527 num_head_items--;
528 else
529 num_snap_items--;
530 }
531
532 if (!dn->get_linkage()->is_null())
533 // detach inode and dentry
534 unlink_inode_work(dn);
535
536 // remove from list
11fdf7f2 537 ceph_assert(items.count(dn->key()) == 1);
7c673cae
FG
538 items.erase(dn->key());
539
540 // clean?
541 if (dn->is_dirty())
542 dn->mark_clean();
543
31f18b77 544 if (dn->state_test(CDentry::STATE_BOTTOMLRU))
f67539c2 545 mdcache->bottom_lru.lru_remove(dn);
31f18b77 546 else
f67539c2 547 mdcache->lru.lru_remove(dn);
7c673cae
FG
548 delete dn;
549
550 // unpin?
551 if (get_num_any() == 0)
552 put(PIN_CHILD);
11fdf7f2 553 ceph_assert(get_num_any() == items.size());
7c673cae
FG
554}
555
556void CDir::link_remote_inode(CDentry *dn, CInode *in)
557{
558 link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
559}
560
561void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
562{
11fdf7f2
TL
563 dout(12) << __func__ << " " << *dn << " remote " << ino << dendl;
564 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
565
566 dn->get_linkage()->set_remote(ino, d_type);
567
31f18b77 568 if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
f67539c2
TL
569 mdcache->bottom_lru.lru_remove(dn);
570 mdcache->lru.lru_insert_mid(dn);
31f18b77
FG
571 dn->state_clear(CDentry::STATE_BOTTOMLRU);
572 }
573
7c673cae
FG
574 if (dn->last == CEPH_NOSNAP) {
575 num_head_items++;
576 num_head_null--;
577 } else {
578 num_snap_items++;
579 num_snap_null--;
580 }
11fdf7f2 581 ceph_assert(get_num_any() == items.size());
7c673cae
FG
582}
583
584void CDir::link_primary_inode(CDentry *dn, CInode *in)
585{
11fdf7f2
TL
586 dout(12) << __func__ << " " << *dn << " " << *in << dendl;
587 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
588
589 dn->get_linkage()->inode = in;
7c673cae
FG
590
591 link_inode_work(dn, in);
31f18b77
FG
592
593 if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
594 (is_auth() || !inode->is_stray())) {
f67539c2
TL
595 mdcache->bottom_lru.lru_remove(dn);
596 mdcache->lru.lru_insert_mid(dn);
31f18b77
FG
597 dn->state_clear(CDentry::STATE_BOTTOMLRU);
598 }
7c673cae
FG
599
600 if (dn->last == CEPH_NOSNAP) {
601 num_head_items++;
602 num_head_null--;
603 } else {
604 num_snap_items++;
605 num_snap_null--;
606 }
607
11fdf7f2 608 ceph_assert(get_num_any() == items.size());
7c673cae
FG
609}
610
611void CDir::link_inode_work( CDentry *dn, CInode *in)
612{
11fdf7f2 613 ceph_assert(dn->get_linkage()->get_inode() == in);
28e407b8 614 in->set_primary_parent(dn);
7c673cae
FG
615
616 // set inode version
617 //in->inode.version = dn->get_version();
618
619 // pin dentry?
620 if (in->get_num_ref())
621 dn->get(CDentry::PIN_INODEPIN);
11fdf7f2
TL
622
623 if (in->state_test(CInode::STATE_TRACKEDBYOFT))
f67539c2 624 mdcache->open_file_table.notify_link(in);
11fdf7f2
TL
625 if (in->is_any_caps())
626 adjust_num_inodes_with_caps(1);
7c673cae
FG
627
628 // adjust auth pin count
11fdf7f2
TL
629 if (in->auth_pins)
630 dn->adjust_nested_auth_pins(in->auth_pins, NULL);
7c673cae 631
9f95a23c
TL
632 if (in->is_freezing_inode())
633 freezing_inodes.push_back(&in->item_freezing_inode);
634 else if (in->is_frozen_inode() || in->is_frozen_auth_pin())
635 num_frozen_inodes++;
636
7c673cae
FG
637 // verify open snaprealm parent
638 if (in->snaprealm)
639 in->snaprealm->adjust_parent();
640 else if (in->is_any_caps())
641 in->move_to_realm(inode->find_snaprealm());
642}
643
31f18b77 644void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
7c673cae
FG
645{
646 if (dn->get_linkage()->is_primary()) {
11fdf7f2 647 dout(12) << __func__ << " " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
7c673cae 648 } else {
11fdf7f2 649 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
650 }
651
652 unlink_inode_work(dn);
653
1e59de90
TL
654 if (adjust_lru && !is_auth() &&
655 !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
f67539c2
TL
656 mdcache->lru.lru_remove(dn);
657 mdcache->bottom_lru.lru_insert_mid(dn);
31f18b77
FG
658 dn->state_set(CDentry::STATE_BOTTOMLRU);
659 }
660
7c673cae
FG
661 if (dn->last == CEPH_NOSNAP) {
662 num_head_items--;
663 num_head_null++;
664 } else {
665 num_snap_items--;
666 num_snap_null++;
667 }
11fdf7f2 668 ceph_assert(get_num_any() == items.size());
7c673cae
FG
669}
670
7c673cae
FG
671void CDir::try_remove_unlinked_dn(CDentry *dn)
672{
11fdf7f2
TL
673 ceph_assert(dn->dir == this);
674 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
675
676 // no pins (besides dirty)?
677 if (dn->get_num_ref() != dn->is_dirty())
678 return;
679
680 // was the dn new?
681 if (dn->is_new()) {
11fdf7f2 682 dout(10) << __func__ << " " << *dn << " in " << *this << dendl;
7c673cae
FG
683 if (dn->is_dirty())
684 dn->mark_clean();
685 remove_dentry(dn);
686
687 // NOTE: we may not have any more dirty dentries, but the fnode
688 // still changed, so the directory must remain dirty.
689 }
690}
691
692
11fdf7f2 693void CDir::unlink_inode_work(CDentry *dn)
7c673cae
FG
694{
695 CInode *in = dn->get_linkage()->get_inode();
696
697 if (dn->get_linkage()->is_remote()) {
698 // remote
699 if (in)
700 dn->unlink_remote(dn->get_linkage());
701
702 dn->get_linkage()->set_remote(0, 0);
703 } else if (dn->get_linkage()->is_primary()) {
704 // primary
705 // unpin dentry?
706 if (in->get_num_ref())
707 dn->put(CDentry::PIN_INODEPIN);
11fdf7f2
TL
708
709 if (in->state_test(CInode::STATE_TRACKEDBYOFT))
f67539c2 710 mdcache->open_file_table.notify_unlink(in);
11fdf7f2
TL
711 if (in->is_any_caps())
712 adjust_num_inodes_with_caps(-1);
7c673cae
FG
713
714 // unlink auth_pin count
11fdf7f2
TL
715 if (in->auth_pins)
716 dn->adjust_nested_auth_pins(-in->auth_pins, nullptr);
28e407b8 717
9f95a23c
TL
718 if (in->is_freezing_inode())
719 in->item_freezing_inode.remove_myself();
720 else if (in->is_frozen_inode() || in->is_frozen_auth_pin())
721 num_frozen_inodes--;
722
7c673cae
FG
723 // detach inode
724 in->remove_primary_parent(dn);
28e407b8
AA
725 if (in->is_dir())
726 in->item_pop_lru.remove_myself();
7c673cae
FG
727 dn->get_linkage()->inode = 0;
728 } else {
11fdf7f2 729 ceph_assert(!dn->get_linkage()->is_null());
7c673cae
FG
730 }
731}
732
733void CDir::add_to_bloom(CDentry *dn)
734{
11fdf7f2 735 ceph_assert(dn->last == CEPH_NOSNAP);
7c673cae
FG
736 if (!bloom) {
737 /* not create bloom filter for incomplete dir that was added by log replay */
738 if (!is_complete())
739 return;
740
741 /* don't maintain bloom filters in standby replay (saves cycles, and also
742 * avoids need to implement clearing it in EExport for #16924) */
f67539c2 743 if (mdcache->mds->is_standby_replay()) {
7c673cae
FG
744 return;
745 }
746
747 unsigned size = get_num_head_items() + get_num_snap_items();
748 if (size < 100) size = 100;
749 bloom.reset(new bloom_filter(size, 1.0 / size, 0));
750 }
751 /* This size and false positive probability is completely random.*/
94b18763 752 bloom->insert(dn->get_name().data(), dn->get_name().size());
7c673cae
FG
753}
754
11fdf7f2 755bool CDir::is_in_bloom(std::string_view name)
7c673cae
FG
756{
757 if (!bloom)
758 return false;
94b18763 759 return bloom->contains(name.data(), name.size());
7c673cae
FG
760}
761
762void CDir::remove_null_dentries() {
11fdf7f2 763 dout(12) << __func__ << " " << *this << dendl;
7c673cae 764
94b18763 765 auto p = items.begin();
7c673cae
FG
766 while (p != items.end()) {
767 CDentry *dn = p->second;
768 ++p;
769 if (dn->get_linkage()->is_null() && !dn->is_projected())
770 remove_dentry(dn);
771 }
772
11fdf7f2
TL
773 ceph_assert(num_snap_null == 0);
774 ceph_assert(num_head_null == 0);
775 ceph_assert(get_num_any() == items.size());
7c673cae
FG
776}
777
778/** remove dirty null dentries for deleted directory. the dirfrag will be
779 * deleted soon, so it's safe to not commit dirty dentries.
780 *
781 * This is called when a directory is being deleted, a prerequisite
782 * of which is that its children have been unlinked: we expect to only see
783 * null, unprojected dentries here.
784 */
785void CDir::try_remove_dentries_for_stray()
786{
787 dout(10) << __func__ << dendl;
11fdf7f2 788 ceph_assert(get_parent_dir()->inode->is_stray());
7c673cae
FG
789
790 // clear dirty only when the directory was not snapshotted
791 bool clear_dirty = !inode->snaprealm;
792
94b18763 793 auto p = items.begin();
7c673cae
FG
794 while (p != items.end()) {
795 CDentry *dn = p->second;
796 ++p;
797 if (dn->last == CEPH_NOSNAP) {
11fdf7f2
TL
798 ceph_assert(!dn->is_projected());
799 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
800 if (clear_dirty && dn->is_dirty())
801 dn->mark_clean();
802 // It's OK to remove lease prematurely because we will never link
803 // the dentry to inode again.
804 if (dn->is_any_leases())
f67539c2 805 dn->remove_client_leases(mdcache->mds->locker);
7c673cae
FG
806 if (dn->get_num_ref() == 0)
807 remove_dentry(dn);
808 } else {
11fdf7f2 809 ceph_assert(!dn->is_projected());
7c673cae
FG
810 CDentry::linkage_t *dnl= dn->get_linkage();
811 CInode *in = NULL;
812 if (dnl->is_primary()) {
813 in = dnl->get_inode();
814 if (clear_dirty && in->is_dirty())
815 in->mark_clean();
816 }
817 if (clear_dirty && dn->is_dirty())
818 dn->mark_clean();
819 if (dn->get_num_ref() == 0) {
820 remove_dentry(dn);
821 if (in)
f67539c2 822 mdcache->remove_inode(in);
7c673cae
FG
823 }
824 }
825 }
826
827 if (clear_dirty && is_dirty())
828 mark_clean();
829}
830
7c673cae
FG
831bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
832{
1e59de90
TL
833 if (dn->last == CEPH_NOSNAP) {
834 return false;
835 }
7c673cae
FG
836 set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
837 CDentry::linkage_t *dnl= dn->get_linkage();
838 CInode *in = 0;
839 if (dnl->is_primary())
840 in = dnl->get_inode();
841 if ((p == snaps.end() || *p > dn->last) &&
842 (dn->get_num_ref() == dn->is_dirty()) &&
843 (!in || in->get_num_ref() == in->is_dirty())) {
844 dout(10) << " purging snapped " << *dn << dendl;
845 if (in && in->is_dirty())
846 in->mark_clean();
847 remove_dentry(dn);
848 if (in) {
849 dout(10) << " purging snapped " << *in << dendl;
f67539c2 850 mdcache->remove_inode(in);
7c673cae
FG
851 }
852 return true;
853 }
854 return false;
855}
856
857
7c673cae
FG
858/**
859 * steal_dentry -- semi-violently move a dentry from one CDir to another
860 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
861 * on the old CDir corpse; must call finish_old_fragment() when finished.
862 */
863void CDir::steal_dentry(CDentry *dn)
864{
11fdf7f2 865 dout(15) << __func__ << " " << *dn << dendl;
7c673cae
FG
866
867 items[dn->key()] = dn;
868
869 dn->dir->items.erase(dn->key());
870 if (dn->dir->items.empty())
871 dn->dir->put(PIN_CHILD);
872
873 if (get_num_any() == 0)
874 get(PIN_CHILD);
875 if (dn->get_linkage()->is_null()) {
876 if (dn->last == CEPH_NOSNAP)
877 num_head_null++;
878 else
879 num_snap_null++;
880 } else if (dn->last == CEPH_NOSNAP) {
881 num_head_items++;
882
f67539c2
TL
883 auto _fnode = _get_fnode();
884
7c673cae
FG
885 if (dn->get_linkage()->is_primary()) {
886 CInode *in = dn->get_linkage()->get_inode();
f67539c2 887 const auto& pi = in->get_projected_inode();
28e407b8 888 if (in->is_dir()) {
f67539c2 889 _fnode->fragstat.nsubdirs++;
28e407b8
AA
890 if (in->item_pop_lru.is_on_list())
891 pop_lru_subdirs.push_back(&in->item_pop_lru);
892 } else {
f67539c2 893 _fnode->fragstat.nfiles++;
28e407b8 894 }
f67539c2
TL
895 _fnode->rstat.rbytes += pi->accounted_rstat.rbytes;
896 _fnode->rstat.rfiles += pi->accounted_rstat.rfiles;
897 _fnode->rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
898 _fnode->rstat.rsnaps += pi->accounted_rstat.rsnaps;
899 if (pi->accounted_rstat.rctime > fnode->rstat.rctime)
900 _fnode->rstat.rctime = pi->accounted_rstat.rctime;
7c673cae 901
11fdf7f2
TL
902 if (in->is_any_caps())
903 adjust_num_inodes_with_caps(1);
904
7c673cae
FG
905 // move dirty inode rstat to new dirfrag
906 if (in->is_dirty_rstat())
907 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
908 } else if (dn->get_linkage()->is_remote()) {
909 if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
f67539c2 910 _fnode->fragstat.nsubdirs++;
7c673cae 911 else
f67539c2 912 _fnode->fragstat.nfiles++;
7c673cae
FG
913 }
914 } else {
915 num_snap_items++;
916 if (dn->get_linkage()->is_primary()) {
917 CInode *in = dn->get_linkage()->get_inode();
918 if (in->is_dirty_rstat())
919 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
920 }
921 }
922
11fdf7f2 923 {
7c673cae 924 int dap = dn->get_num_dir_auth_pins();
11fdf7f2
TL
925 if (dap) {
926 adjust_nested_auth_pins(dap, NULL);
927 dn->dir->adjust_nested_auth_pins(-dap, NULL);
928 }
7c673cae
FG
929 }
930
b32b8144
FG
931 if (dn->is_dirty()) {
932 dirty_dentries.push_back(&dn->item_dir_dirty);
7c673cae 933 num_dirty++;
b32b8144 934 }
7c673cae
FG
935
936 dn->dir = this;
937}
938
11fdf7f2 939void CDir::prepare_old_fragment(map<string_snap_t, MDSContext::vec >& dentry_waiters, bool replay)
7c673cae
FG
940{
941 // auth_pin old fragment for duration so that any auth_pinning
942 // during the dentry migration doesn't trigger side effects
943 if (!replay && is_auth())
944 auth_pin(this);
31f18b77
FG
945
946 if (!waiting_on_dentry.empty()) {
94b18763 947 for (const auto &p : waiting_on_dentry) {
1e59de90
TL
948 std::copy(p.second.begin(), p.second.end(),
949 std::back_inserter(dentry_waiters[p.first]));
94b18763 950 }
31f18b77
FG
951 waiting_on_dentry.clear();
952 put(PIN_DNWAITER);
953 }
7c673cae
FG
954}
955
956void CDir::prepare_new_fragment(bool replay)
957{
958 if (!replay && is_auth()) {
959 _freeze_dir();
960 mark_complete();
961 }
31f18b77 962 inode->add_dirfrag(this);
7c673cae
FG
963}
964
11fdf7f2 965void CDir::finish_old_fragment(MDSContext::vec& waiters, bool replay)
7c673cae
FG
966{
967 // take waiters _before_ unfreeze...
968 if (!replay) {
969 take_waiting(WAIT_ANY_MASK, waiters);
970 if (is_auth()) {
971 auth_unpin(this); // pinned in prepare_old_fragment
11fdf7f2 972 ceph_assert(is_frozen_dir());
7c673cae
FG
973 unfreeze_dir();
974 }
975 }
976
11fdf7f2
TL
977 ceph_assert(dir_auth_pins == 0);
978 ceph_assert(auth_pins == 0);
7c673cae
FG
979
980 num_head_items = num_head_null = 0;
981 num_snap_items = num_snap_null = 0;
11fdf7f2 982 adjust_num_inodes_with_caps(-num_inodes_with_caps);
7c673cae
FG
983
984 // this mirrors init_fragment_pins()
985 if (is_auth())
986 clear_replica_map();
987 if (is_dirty())
988 mark_clean();
989 if (state_test(STATE_IMPORTBOUND))
990 put(PIN_IMPORTBOUND);
991 if (state_test(STATE_EXPORTBOUND))
992 put(PIN_EXPORTBOUND);
993 if (is_subtree_root())
994 put(PIN_SUBTREE);
995
996 if (auth_pins > 0)
997 put(PIN_AUTHPIN);
998
11fdf7f2 999 ceph_assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
7c673cae
FG
1000}
1001
1002void CDir::init_fragment_pins()
1003{
181888fb 1004 if (is_replicated())
7c673cae
FG
1005 get(PIN_REPLICATED);
1006 if (state_test(STATE_DIRTY))
1007 get(PIN_DIRTY);
1008 if (state_test(STATE_EXPORTBOUND))
1009 get(PIN_EXPORTBOUND);
1010 if (state_test(STATE_IMPORTBOUND))
1011 get(PIN_IMPORTBOUND);
1012 if (is_subtree_root())
1013 get(PIN_SUBTREE);
1014}
1015
9f95a23c 1016void CDir::split(int bits, std::vector<CDir*>* subs, MDSContext::vec& waiters, bool replay)
7c673cae
FG
1017{
1018 dout(10) << "split by " << bits << " bits on " << *this << dendl;
1019
11fdf7f2 1020 ceph_assert(replay || is_complete() || !is_auth());
7c673cae 1021
11fdf7f2 1022 frag_vec_t frags;
7c673cae
FG
1023 frag.split(bits, frags);
1024
1025 vector<CDir*> subfrags(1 << bits);
1026
1027 double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
1028
1029 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1030 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1031
1032 nest_info_t rstatdiff;
1033 frag_info_t fragstatdiff;
f67539c2
TL
1034 if (fnode->accounted_rstat.version == rstat_version)
1035 rstatdiff.add_delta(fnode->accounted_rstat, fnode->rstat);
1036 if (fnode->accounted_fragstat.version == dirstat_version)
1037 fragstatdiff.add_delta(fnode->accounted_fragstat, fnode->fragstat);
7c673cae
FG
1038 dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
1039
11fdf7f2 1040 map<string_snap_t, MDSContext::vec > dentry_waiters;
31f18b77 1041 prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
1042
1043 // create subfrag dirs
1044 int n = 0;
11fdf7f2 1045 for (const auto& fg : frags) {
f67539c2 1046 CDir *f = new CDir(inode, fg, mdcache, is_auth());
7c673cae 1047 f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
181888fb 1048 f->get_replicas() = get_replicas();
7c673cae
FG
1049 f->pop_me = pop_me;
1050 f->pop_me.scale(fac);
1051
1052 // FIXME; this is an approximation
1053 f->pop_nested = pop_nested;
1054 f->pop_nested.scale(fac);
1055 f->pop_auth_subtree = pop_auth_subtree;
1056 f->pop_auth_subtree.scale(fac);
1057 f->pop_auth_subtree_nested = pop_auth_subtree_nested;
1058 f->pop_auth_subtree_nested.scale(fac);
1059
11fdf7f2 1060 dout(10) << " subfrag " << fg << " " << *f << dendl;
7c673cae 1061 subfrags[n++] = f;
9f95a23c 1062 subs->push_back(f);
7c673cae
FG
1063
1064 f->set_dir_auth(get_dir_auth());
11fdf7f2 1065 f->freeze_tree_state = freeze_tree_state;
7c673cae 1066 f->prepare_new_fragment(replay);
1adf2230 1067 f->init_fragment_pins();
7c673cae
FG
1068 }
1069
1070 // repartition dentries
1071 while (!items.empty()) {
94b18763 1072 auto p = items.begin();
7c673cae
FG
1073
1074 CDentry *dn = p->second;
94b18763 1075 frag_t subfrag = inode->pick_dirfrag(dn->get_name());
7c673cae
FG
1076 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1077 dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
1078 CDir *f = subfrags[n];
1079 f->steal_dentry(dn);
1080 }
1081
94b18763 1082 for (const auto &p : dentry_waiters) {
31f18b77
FG
1083 frag_t subfrag = inode->pick_dirfrag(p.first.name);
1084 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1085 CDir *f = subfrags[n];
1086
1087 if (f->waiting_on_dentry.empty())
1088 f->get(PIN_DNWAITER);
1e59de90
TL
1089 std::copy(p.second.begin(), p.second.end(),
1090 std::back_inserter(f->waiting_on_dentry[p.first]));
31f18b77
FG
1091 }
1092
7c673cae
FG
1093 // FIXME: handle dirty old rstat
1094
1095 // fix up new frag fragstats
f67539c2 1096 for (int i = 0; i < n; i++) {
7c673cae 1097 CDir *f = subfrags[i];
f67539c2
TL
1098 auto _fnode = f->_get_fnode();
1099 _fnode->version = f->projected_version = get_version();
1100 _fnode->rstat.version = rstat_version;
1101 _fnode->accounted_rstat = _fnode->rstat;
1102 _fnode->fragstat.version = dirstat_version;
1103 _fnode->accounted_fragstat = _fnode->fragstat;
1104 dout(10) << " rstat " << _fnode->rstat << " fragstat " << _fnode->fragstat
7c673cae 1105 << " on " << *f << dendl;
7c673cae 1106
f67539c2
TL
1107 if (i == 0) {
1108 // give any outstanding frag stat differential to first frag
1109 dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1110 << " to " << *subfrags[0] << dendl;
1111 _fnode->accounted_rstat.add(rstatdiff);
1112 _fnode->accounted_fragstat.add(fragstatdiff);
1113 }
1114 }
7c673cae
FG
1115
1116 finish_old_fragment(waiters, replay);
1117}
1118
9f95a23c 1119void CDir::merge(const std::vector<CDir*>& subs, MDSContext::vec& waiters, bool replay)
7c673cae
FG
1120{
1121 dout(10) << "merge " << subs << dendl;
1122
9f95a23c
TL
1123 ceph_assert(subs.size() > 0);
1124
11fdf7f2
TL
1125 set_dir_auth(subs.front()->get_dir_auth());
1126 freeze_tree_state = subs.front()->freeze_tree_state;
1127
9f95a23c 1128 for (const auto& dir : subs) {
11fdf7f2
TL
1129 ceph_assert(get_dir_auth() == dir->get_dir_auth());
1130 ceph_assert(freeze_tree_state == dir->freeze_tree_state);
7c673cae
FG
1131 }
1132
7c673cae
FG
1133 prepare_new_fragment(replay);
1134
f67539c2
TL
1135 auto _fnode = _get_fnode();
1136
7c673cae
FG
1137 nest_info_t rstatdiff;
1138 frag_info_t fragstatdiff;
1139 bool touched_mtime, touched_chattr;
1140 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1141 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1142
11fdf7f2 1143 map<string_snap_t, MDSContext::vec > dentry_waiters;
31f18b77 1144
9f95a23c 1145 for (const auto& dir : subs) {
7c673cae 1146 dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
11fdf7f2 1147 ceph_assert(!dir->is_auth() || dir->is_complete() || replay);
7c673cae 1148
f67539c2
TL
1149 if (dir->get_fnode()->accounted_rstat.version == rstat_version)
1150 rstatdiff.add_delta(dir->get_fnode()->accounted_rstat, dir->get_fnode()->rstat);
1151 if (dir->get_fnode()->accounted_fragstat.version == dirstat_version)
1152 fragstatdiff.add_delta(dir->get_fnode()->accounted_fragstat, dir->get_fnode()->fragstat,
7c673cae
FG
1153 &touched_mtime, &touched_chattr);
1154
31f18b77 1155 dir->prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
1156
1157 // steal dentries
1158 while (!dir->items.empty())
1159 steal_dentry(dir->items.begin()->second);
1160
1161 // merge replica map
181888fb
FG
1162 for (const auto &p : dir->get_replicas()) {
1163 unsigned cur = get_replicas()[p.first];
1164 if (p.second > cur)
1165 get_replicas()[p.first] = p.second;
7c673cae
FG
1166 }
1167
1168 // merge version
f67539c2
TL
1169 if (dir->get_version() > _fnode->version)
1170 _fnode->version = projected_version = dir->get_version();
7c673cae
FG
1171
1172 // merge state
1173 state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
7c673cae
FG
1174
1175 dir->finish_old_fragment(waiters, replay);
1176 inode->close_dirfrag(dir->get_frag());
1177 }
1178
31f18b77
FG
1179 if (!dentry_waiters.empty()) {
1180 get(PIN_DNWAITER);
94b18763 1181 for (const auto &p : dentry_waiters) {
1e59de90
TL
1182 std::copy(p.second.begin(), p.second.end(),
1183 std::back_inserter(waiting_on_dentry[p.first]));
31f18b77
FG
1184 }
1185 }
1186
7c673cae
FG
1187 if (is_auth() && !replay)
1188 mark_complete();
1189
1190 // FIXME: merge dirty old rstat
f67539c2
TL
1191 _fnode->rstat.version = rstat_version;
1192 _fnode->accounted_rstat = _fnode->rstat;
1193 _fnode->accounted_rstat.add(rstatdiff);
7c673cae 1194
f67539c2
TL
1195 _fnode->fragstat.version = dirstat_version;
1196 _fnode->accounted_fragstat = _fnode->fragstat;
1197 _fnode->accounted_fragstat.add(fragstatdiff);
7c673cae
FG
1198
1199 init_fragment_pins();
1200}
1201
1202
1203
1204
1205void CDir::resync_accounted_fragstat()
1206{
f67539c2
TL
1207 auto pf = _get_projected_fnode();
1208 const auto& pi = inode->get_projected_inode();
7c673cae
FG
1209
1210 if (pf->accounted_fragstat.version != pi->dirstat.version) {
1211 pf->fragstat.version = pi->dirstat.version;
11fdf7f2 1212 dout(10) << __func__ << " " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
7c673cae
FG
1213 pf->accounted_fragstat = pf->fragstat;
1214 }
1215}
1216
1217/*
1218 * resync rstat and accounted_rstat with inode
1219 */
1220void CDir::resync_accounted_rstat()
1221{
f67539c2
TL
1222 auto pf = _get_projected_fnode();
1223 const auto& pi = inode->get_projected_inode();
7c673cae
FG
1224
1225 if (pf->accounted_rstat.version != pi->rstat.version) {
1226 pf->rstat.version = pi->rstat.version;
11fdf7f2 1227 dout(10) << __func__ << " " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
7c673cae
FG
1228 pf->accounted_rstat = pf->rstat;
1229 dirty_old_rstat.clear();
1230 }
1231}
1232
f67539c2 1233void CDir::assimilate_dirty_rstat_inodes(MutationRef& mut)
7c673cae 1234{
11fdf7f2 1235 dout(10) << __func__ << dendl;
7c673cae
FG
1236 for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1237 !p.end(); ++p) {
1238 CInode *in = *p;
11fdf7f2 1239 ceph_assert(in->is_auth());
7c673cae
FG
1240 if (in->is_frozen())
1241 continue;
1242
f67539c2
TL
1243 mut->auth_pin(in);
1244
1245 auto pi = in->project_inode(mut);
1246 pi.inode->version = in->pre_dirty();
7c673cae 1247
f67539c2 1248 mdcache->project_rstat_inode_to_frag(mut, in, this, 0, 0, nullptr);
7c673cae
FG
1249 }
1250 state_set(STATE_ASSIMRSTAT);
11fdf7f2 1251 dout(10) << __func__ << " done" << dendl;
7c673cae
FG
1252}
1253
f67539c2 1254void CDir::assimilate_dirty_rstat_inodes_finish(EMetaBlob *blob)
7c673cae
FG
1255{
1256 if (!state_test(STATE_ASSIMRSTAT))
1257 return;
1258 state_clear(STATE_ASSIMRSTAT);
11fdf7f2 1259 dout(10) << __func__ << dendl;
7c673cae
FG
1260 elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1261 while (!p.end()) {
1262 CInode *in = *p;
1263 ++p;
1264
1265 if (in->is_frozen())
1266 continue;
1267
1268 CDentry *dn = in->get_projected_parent_dn();
1269
7c673cae
FG
1270 in->clear_dirty_rstat();
1271 blob->add_primary_dentry(dn, in, true);
1272 }
1273
1274 if (!dirty_rstat_inodes.empty())
f67539c2 1275 mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
7c673cae
FG
1276}
1277
1278
1279
1280
1281/****************************************
1282 * WAITING
1283 */
1284
11fdf7f2 1285void CDir::add_dentry_waiter(std::string_view dname, snapid_t snapid, MDSContext *c)
7c673cae
FG
1286{
1287 if (waiting_on_dentry.empty())
1288 get(PIN_DNWAITER);
1289 waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
11fdf7f2 1290 dout(10) << __func__ << " dentry " << dname
7c673cae
FG
1291 << " snap " << snapid
1292 << " " << c << " on " << *this << dendl;
1293}
1294
11fdf7f2
TL
1295void CDir::take_dentry_waiting(std::string_view dname, snapid_t first, snapid_t last,
1296 MDSContext::vec& ls)
7c673cae
FG
1297{
1298 if (waiting_on_dentry.empty())
1299 return;
1300
1301 string_snap_t lb(dname, first);
1302 string_snap_t ub(dname, last);
94b18763
FG
1303 auto it = waiting_on_dentry.lower_bound(lb);
1304 while (it != waiting_on_dentry.end() &&
1305 !(ub < it->first)) {
11fdf7f2 1306 dout(10) << __func__ << " " << dname
7c673cae 1307 << " [" << first << "," << last << "] found waiter on snap "
94b18763 1308 << it->first.snapid
7c673cae 1309 << " on " << *this << dendl;
1e59de90 1310 std::copy(it->second.begin(), it->second.end(), std::back_inserter(ls));
94b18763 1311 waiting_on_dentry.erase(it++);
7c673cae
FG
1312 }
1313
1314 if (waiting_on_dentry.empty())
1315 put(PIN_DNWAITER);
1316}
1317
11fdf7f2 1318void CDir::add_waiter(uint64_t tag, MDSContext *c)
7c673cae
FG
1319{
1320 // hierarchical?
7c673cae
FG
1321
1322 // at subtree root?
1323 if (tag & WAIT_ATSUBTREEROOT) {
1324 if (!is_subtree_root()) {
1325 // try parent
1326 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1327 inode->parent->dir->add_waiter(tag, c);
1328 return;
1329 }
1330 }
1331
11fdf7f2 1332 ceph_assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
7c673cae
FG
1333
1334 MDSCacheObject::add_waiter(tag, c);
1335}
1336
1337
1338
1339/* NOTE: this checks dentry waiters too */
11fdf7f2 1340void CDir::take_waiting(uint64_t mask, MDSContext::vec& ls)
7c673cae
FG
1341{
1342 if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1343 // take all dentry waiters
94b18763
FG
1344 for (const auto &p : waiting_on_dentry) {
1345 dout(10) << "take_waiting dentry " << p.first.name
1346 << " snap " << p.first.snapid << " on " << *this << dendl;
1e59de90 1347 std::copy(p.second.begin(), p.second.end(), std::back_inserter(ls));
7c673cae 1348 }
94b18763 1349 waiting_on_dentry.clear();
7c673cae
FG
1350 put(PIN_DNWAITER);
1351 }
1352
1353 // waiting
1354 MDSCacheObject::take_waiting(mask, ls);
1355}
1356
1357
1358void CDir::finish_waiting(uint64_t mask, int result)
1359{
11fdf7f2 1360 dout(11) << __func__ << " mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
7c673cae 1361
11fdf7f2 1362 MDSContext::vec finished;
7c673cae
FG
1363 take_waiting(mask, finished);
1364 if (result < 0)
1365 finish_contexts(g_ceph_context, finished, result);
1366 else
f67539c2 1367 mdcache->mds->queue_waiters(finished);
7c673cae
FG
1368}
1369
1370
1371
1372// dirty/clean
1373
f67539c2 1374CDir::fnode_ptr CDir::project_fnode(const MutationRef& mut)
7c673cae 1375{
11fdf7f2 1376 ceph_assert(get_version() != 0);
f67539c2
TL
1377
1378 if (mut && mut->is_projected(this))
1379 return std::const_pointer_cast<fnode_t>(projected_fnode.back());
1380
1381 auto pf = allocate_fnode(*get_projected_fnode());
7c673cae
FG
1382
1383 if (scrub_infop && scrub_infop->last_scrub_dirty) {
f67539c2
TL
1384 pf->localized_scrub_stamp = scrub_infop->last_local.time;
1385 pf->localized_scrub_version = scrub_infop->last_local.version;
1386 pf->recursive_scrub_stamp = scrub_infop->last_recursive.time;
1387 pf->recursive_scrub_version = scrub_infop->last_recursive.version;
7c673cae
FG
1388 scrub_infop->last_scrub_dirty = false;
1389 scrub_maybe_delete_info();
1390 }
1391
f67539c2
TL
1392 projected_fnode.emplace_back(pf);
1393 if (mut)
1394 mut->add_projected_node(this);
1395 dout(10) << __func__ << " " << pf.get() << dendl;
1396 return pf;
7c673cae
FG
1397}
1398
f67539c2 1399void CDir::pop_and_dirty_projected_fnode(LogSegment *ls, const MutationRef& mut)
7c673cae 1400{
11fdf7f2 1401 ceph_assert(!projected_fnode.empty());
f67539c2
TL
1402 auto pf = std::move(projected_fnode.front());
1403 dout(15) << __func__ << " " << pf.get() << " v" << pf->version << dendl;
1404
7c673cae 1405 projected_fnode.pop_front();
f67539c2
TL
1406 if (mut)
1407 mut->remove_projected_node(this);
7c673cae 1408
f67539c2
TL
1409 reset_fnode(std::move(pf));
1410 _mark_dirty(ls);
1411}
7c673cae
FG
1412
1413version_t CDir::pre_dirty(version_t min)
1414{
1415 if (min > projected_version)
1416 projected_version = min;
1417 ++projected_version;
11fdf7f2 1418 dout(10) << __func__ << " " << projected_version << dendl;
7c673cae
FG
1419 return projected_version;
1420}
1421
f67539c2 1422void CDir::mark_dirty(LogSegment *ls, version_t pv)
7c673cae 1423{
f67539c2
TL
1424 ceph_assert(is_auth());
1425
1426 if (pv) {
1427 ceph_assert(get_version() < pv);
1428 ceph_assert(pv <= projected_version);
1429 ceph_assert(!projected_fnode.empty() &&
1430 pv <= projected_fnode.front()->version);
1431 }
1432
7c673cae
FG
1433 _mark_dirty(ls);
1434}
1435
1436void CDir::_mark_dirty(LogSegment *ls)
1437{
1438 if (!state_test(STATE_DIRTY)) {
11fdf7f2 1439 dout(10) << __func__ << " (was clean) " << *this << " version " << get_version() << dendl;
7c673cae 1440 _set_dirty_flag();
11fdf7f2 1441 ceph_assert(ls);
7c673cae 1442 } else {
11fdf7f2 1443 dout(10) << __func__ << " (already dirty) " << *this << " version " << get_version() << dendl;
7c673cae
FG
1444 }
1445 if (ls) {
1446 ls->dirty_dirfrags.push_back(&item_dirty);
1447
1448 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1449 if (committed_version == 0 && !item_new.is_on_list())
1450 ls->new_dirfrags.push_back(&item_new);
1451 }
1452}
1453
1454void CDir::mark_new(LogSegment *ls)
1455{
1456 ls->new_dirfrags.push_back(&item_new);
1457 state_clear(STATE_CREATING);
1458
11fdf7f2 1459 MDSContext::vec waiters;
7c673cae 1460 take_waiting(CDir::WAIT_CREATED, waiters);
f67539c2 1461 mdcache->mds->queue_waiters(waiters);
7c673cae
FG
1462}
1463
2a845540
TL
1464void CDir::set_fresh_fnode(fnode_const_ptr&& ptr) {
1465 ceph_assert(inode->is_auth());
1466 ceph_assert(!is_projected());
1467 ceph_assert(!state_test(STATE_COMMITTING));
1468 reset_fnode(std::move(ptr));
1469 projected_version = committing_version = committed_version = get_version();
1470
1471 if (state_test(STATE_REJOINUNDEF)) {
1472 ceph_assert(mdcache->mds->is_rejoin());
1473 state_clear(STATE_REJOINUNDEF);
1474 mdcache->opened_undef_dirfrag(this);
1475 }
1476}
1477
7c673cae
FG
1478void CDir::mark_clean()
1479{
11fdf7f2 1480 dout(10) << __func__ << " " << *this << " version " << get_version() << dendl;
7c673cae
FG
1481 if (state_test(STATE_DIRTY)) {
1482 item_dirty.remove_myself();
1483 item_new.remove_myself();
1484
1485 state_clear(STATE_DIRTY);
1486 put(PIN_DIRTY);
1487 }
1488}
1489
1490// caller should hold auth pin of this
1491void CDir::log_mark_dirty()
1492{
b32b8144 1493 if (is_dirty() || projected_version > get_version())
7c673cae
FG
1494 return; // noop if it is already dirty or will be dirty
1495
f67539c2
TL
1496 auto _fnode = allocate_fnode(*get_fnode());
1497 _fnode->version = pre_dirty();
1498 reset_fnode(std::move(_fnode));
1499 mark_dirty(mdcache->mds->mdlog->get_current_segment());
7c673cae
FG
1500}
1501
1502void CDir::mark_complete() {
1503 state_set(STATE_COMPLETE);
1504 bloom.reset();
1505}
1506
1507void CDir::first_get()
1508{
1509 inode->get(CInode::PIN_DIRFRAG);
1510}
1511
1512void CDir::last_put()
1513{
1514 inode->put(CInode::PIN_DIRFRAG);
1515}
1516
1517
1518
1519/******************************************************************************
1520 * FETCH and COMMIT
1521 */
1522
1523// -----------------------
1524// FETCH
1e59de90
TL
1525void CDir::fetch(std::string_view dname, snapid_t last,
1526 MDSContext *c, bool ignore_authpinnability)
7c673cae 1527{
1e59de90
TL
1528 if (dname.empty())
1529 dout(10) << "fetch on " << *this << dendl;
1530 else
1531 dout(10) << "fetch key(" << dname << ", '" << last << "')" << dendl;
7c673cae 1532
11fdf7f2
TL
1533 ceph_assert(is_auth());
1534 ceph_assert(!is_complete());
7c673cae 1535
1e59de90 1536 if (!ignore_authpinnability && !can_auth_pin()) {
7c673cae
FG
1537 if (c) {
1538 dout(7) << "fetch waiting for authpinnable" << dendl;
1539 add_waiter(WAIT_UNFREEZE, c);
1540 } else
1541 dout(7) << "fetch not authpinnable and no context" << dendl;
1542 return;
1543 }
1544
1545 // unlinked directory inode shouldn't have any entry
1e59de90
TL
1546 if (CDir *pdir = get_parent_dir();
1547 pdir && pdir->inode->is_stray() && !inode->snaprealm) {
7c673cae
FG
1548 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1549 if (get_version() == 0) {
f67539c2
TL
1550 auto _fnode = allocate_fnode();
1551 _fnode->version = 1;
2a845540 1552 set_fresh_fnode(std::move(_fnode));
7c673cae
FG
1553 }
1554 mark_complete();
1555
1556 if (c)
f67539c2 1557 mdcache->mds->queue_waiter(c);
7c673cae
FG
1558 return;
1559 }
1560
1e59de90
TL
1561 // FIXME: to fetch a snap dentry, we need to get omap key in range
1562 // [(name, last), (name, CEPH_NOSNAP))
1563 if (!dname.empty() && last == CEPH_NOSNAP && !g_conf().get_val<bool>("mds_dir_prefetch")) {
1564 dentry_key_t key(last, dname, inode->hash_dentry_name(dname));
1565 fetch_keys({key}, c);
1566 return;
1567 }
1568
1569 if (c)
1570 add_waiter(WAIT_COMPLETE, c);
7c673cae
FG
1571
1572 // already fetching?
1573 if (state_test(CDir::STATE_FETCHING)) {
1574 dout(7) << "already fetching; waiting" << dendl;
1575 return;
1576 }
1577
1578 auth_pin(this);
1579 state_set(CDir::STATE_FETCHING);
1580
1e59de90 1581 _omap_fetch(nullptr, nullptr);
7c673cae 1582
1e59de90
TL
1583 if (mdcache->mds->logger)
1584 mdcache->mds->logger->inc(l_mds_dir_fetch_complete);
522d829b 1585 mdcache->mds->balancer->hit_dir(this, META_POP_FETCH);
7c673cae
FG
1586}
1587
1e59de90 1588void CDir::fetch_keys(const std::vector<dentry_key_t>& keys, MDSContext *c)
7c673cae 1589{
1e59de90 1590 dout(10) << __func__ << " " << keys.size() << " keys on " << *this << dendl;
11fdf7f2
TL
1591 ceph_assert(is_auth());
1592 ceph_assert(!is_complete());
7c673cae 1593
1e59de90
TL
1594 if (CDir *pdir = get_parent_dir();
1595 pdir && pdir->inode->is_stray() && !inode->snaprealm) {
1596 fetch(c, true);
7c673cae
FG
1597 return;
1598 }
1e59de90
TL
1599
1600 MDSContext::vec_alloc<mempool::mds_co::pool_allocator> *fallback_waiting = nullptr;
1601 std::set<std::string> str_keys;
1602 for (auto& key : keys) {
1603 ceph_assert(key.snapid == CEPH_NOSNAP);
1604 if (waiting_on_dentry.empty())
1605 get(PIN_DNWAITER);
1606 auto em = waiting_on_dentry.emplace(std::piecewise_construct,
1607 std::forward_as_tuple(key.name, key.snapid),
1608 std::forward_as_tuple());
1609 if (!em.second) {
1610 if (!fallback_waiting)
1611 fallback_waiting = &em.first->second;
1612 continue;
1613 }
1614
1615 if (c) {
1616 em.first->second.push_back(c);
1617 c = nullptr;
1618 }
1619
1620 string str;
1621 key.encode(str);
1622 str_keys.emplace(std::move(str));
1623 }
1624
1625 if (str_keys.empty()) {
1626 if (c && fallback_waiting) {
1627 fallback_waiting->push_back(c);
1628 c = nullptr;
1629 }
1630
1631 if (get_version() > 0) {
1632 dout(7) << "fetch keys, all are already being fetched" << dendl;
1633 ceph_assert(!c);
1634 return;
1635 }
1636 }
1637
7c673cae 1638 if (state_test(CDir::STATE_FETCHING)) {
1e59de90
TL
1639 dout(7) << "fetch keys, waiting for full fetch" << dendl;
1640 if (c)
1641 add_waiter(WAIT_COMPLETE, c);
7c673cae
FG
1642 return;
1643 }
1644
1645 auth_pin(this);
1e59de90 1646 _omap_fetch(&str_keys, c);
7c673cae 1647
1e59de90
TL
1648 if (mdcache->mds->logger)
1649 mdcache->mds->logger->inc(l_mds_dir_fetch_keys);
522d829b 1650 mdcache->mds->balancer->hit_dir(this, META_POP_FETCH);
7c673cae
FG
1651}
1652
1653class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
11fdf7f2 1654 MDSContext *fin;
7c673cae 1655public:
f67539c2 1656 const version_t omap_version;
7c673cae
FG
1657 bufferlist hdrbl;
1658 bool more = false;
1659 map<string, bufferlist> omap; ///< carry-over from before
1660 map<string, bufferlist> omap_more; ///< new batch
1661 int ret;
f67539c2
TL
1662 C_IO_Dir_OMAP_FetchedMore(CDir *d, version_t v, MDSContext *f) :
1663 CDirIOContext(d), fin(f), omap_version(v), ret(0) { }
7c673cae 1664 void finish(int r) {
f67539c2
TL
1665 if (omap_version < dir->get_committed_version()) {
1666 omap.clear();
1e59de90 1667 dir->_omap_fetch(nullptr, fin);
f67539c2
TL
1668 return;
1669 }
1670
7c673cae
FG
1671 // merge results
1672 if (omap.empty()) {
1673 omap.swap(omap_more);
1674 } else {
1675 omap.insert(omap_more.begin(), omap_more.end());
1676 }
1677 if (more) {
f67539c2 1678 dir->_omap_fetch_more(omap_version, hdrbl, omap, fin);
7c673cae 1679 } else {
1e59de90 1680 dir->_omap_fetched(hdrbl, omap, true, {}, r);
7c673cae
FG
1681 if (fin)
1682 fin->complete(r);
1683 }
1684 }
91327a77
AA
1685 void print(ostream& out) const override {
1686 out << "dirfrag_fetch_more(" << dir->dirfrag() << ")";
1687 }
7c673cae
FG
1688};
1689
1690class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
11fdf7f2 1691 MDSContext *fin;
7c673cae 1692public:
f67539c2 1693 const version_t omap_version;
1e59de90
TL
1694 bool complete = true;
1695 std::set<string> keys;
7c673cae
FG
1696 bufferlist hdrbl;
1697 bool more = false;
1698 map<string, bufferlist> omap;
1699 bufferlist btbl;
1700 int ret1, ret2, ret3;
1701
11fdf7f2 1702 C_IO_Dir_OMAP_Fetched(CDir *d, MDSContext *f) :
f67539c2
TL
1703 CDirIOContext(d), fin(f),
1704 omap_version(d->get_committing_version()),
1705 ret1(0), ret2(0), ret3(0) { }
7c673cae
FG
1706 void finish(int r) override {
1707 // check the correctness of backtrace
f67539c2 1708 if (r >= 0 && ret3 != -CEPHFS_ECANCELED)
7c673cae
FG
1709 dir->inode->verify_diri_backtrace(btbl, ret3);
1710 if (r >= 0) r = ret1;
1711 if (r >= 0) r = ret2;
f67539c2 1712
7c673cae 1713 if (more) {
f67539c2 1714 if (omap_version < dir->get_committed_version()) {
1e59de90 1715 dir->_omap_fetch(nullptr, fin);
f67539c2 1716 } else {
1e59de90 1717 dir->_omap_fetch_more(omap_version, hdrbl, omap, fin);
f67539c2
TL
1718 }
1719 return;
7c673cae 1720 }
f67539c2 1721
1e59de90 1722 dir->_omap_fetched(hdrbl, omap, complete, keys, r);
f67539c2
TL
1723 if (fin)
1724 fin->complete(r);
7c673cae 1725 }
91327a77
AA
1726 void print(ostream& out) const override {
1727 out << "dirfrag_fetch(" << dir->dirfrag() << ")";
1728 }
7c673cae
FG
1729};
1730
1e59de90 1731void CDir::_omap_fetch(std::set<string> *keys, MDSContext *c)
7c673cae
FG
1732{
1733 C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1734 object_t oid = get_ondisk_object();
f67539c2 1735 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
7c673cae
FG
1736 ObjectOperation rd;
1737 rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1e59de90
TL
1738 if (keys) {
1739 fin->complete = false;
1740 fin->keys.swap(*keys);
1741 rd.omap_get_vals_by_keys(fin->keys, &fin->omap, &fin->ret2);
1742 } else {
11fdf7f2
TL
1743 ceph_assert(!c);
1744 rd.omap_get_vals("", "", g_conf()->mds_dir_keys_per_op,
7c673cae 1745 &fin->omap, &fin->more, &fin->ret2);
7c673cae
FG
1746 }
1747 // check the correctness of backtrace
11fdf7f2 1748 if (g_conf()->mds_verify_backtrace > 0 && frag == frag_t()) {
7c673cae
FG
1749 rd.getxattr("parent", &fin->btbl, &fin->ret3);
1750 rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1751 } else {
f67539c2 1752 fin->ret3 = -CEPHFS_ECANCELED;
7c673cae
FG
1753 }
1754
f67539c2
TL
1755 mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1756 new C_OnFinisher(fin, mdcache->mds->finisher));
7c673cae
FG
1757}
1758
f67539c2
TL
1759void CDir::_omap_fetch_more(version_t omap_version, bufferlist& hdrbl,
1760 map<string, bufferlist>& omap, MDSContext *c)
7c673cae
FG
1761{
1762 // we have more omap keys to fetch!
1763 object_t oid = get_ondisk_object();
f67539c2
TL
1764 object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
1765 auto fin = new C_IO_Dir_OMAP_FetchedMore(this, omap_version, c);
1766 fin->hdrbl = std::move(hdrbl);
7c673cae
FG
1767 fin->omap.swap(omap);
1768 ObjectOperation rd;
1769 rd.omap_get_vals(fin->omap.rbegin()->first,
1770 "", /* filter prefix */
11fdf7f2 1771 g_conf()->mds_dir_keys_per_op,
7c673cae
FG
1772 &fin->omap_more,
1773 &fin->more,
1774 &fin->ret);
f67539c2
TL
1775 mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1776 new C_OnFinisher(fin, mdcache->mds->finisher));
7c673cae
FG
1777}
1778
1779CDentry *CDir::_load_dentry(
11fdf7f2
TL
1780 std::string_view key,
1781 std::string_view dname,
7c673cae
FG
1782 const snapid_t last,
1783 bufferlist &bl,
1784 const int pos,
1785 const std::set<snapid_t> *snaps,
f91f0fd5 1786 double rand_threshold,
28e407b8 1787 bool *force_dirty)
7c673cae 1788{
11fdf7f2 1789 auto q = bl.cbegin();
7c673cae
FG
1790
1791 snapid_t first;
11fdf7f2 1792 decode(first, q);
7c673cae
FG
1793
1794 // marker
1795 char type;
11fdf7f2 1796 decode(type, q);
7c673cae
FG
1797
1798 dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
1799 << " [" << first << "," << last << "]"
1800 << dendl;
1801
1802 bool stale = false;
1803 if (snaps && last != CEPH_NOSNAP) {
1804 set<snapid_t>::const_iterator p = snaps->lower_bound(first);
1805 if (p == snaps->end() || *p > last) {
1806 dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
1807 stale = true;
1808 }
1809 }
f67539c2 1810
7c673cae
FG
1811 /*
1812 * look for existing dentry for _last_ snap, because unlink +
1813 * create may leave a "hole" (epochs during which the dentry
1814 * doesn't exist) but for which no explicit negative dentry is in
1815 * the cache.
1816 */
1817 CDentry *dn;
1818 if (stale)
1819 dn = lookup_exact_snap(dname, last);
1820 else
1821 dn = lookup(dname, last);
1822
f67539c2 1823 if (type == 'L' || type == 'l') {
7c673cae
FG
1824 // hard link
1825 inodeno_t ino;
1826 unsigned char d_type;
f67539c2
TL
1827 mempool::mds_co::string alternate_name;
1828
1829 CDentry::decode_remote(type, ino, d_type, alternate_name, q);
7c673cae
FG
1830
1831 if (stale) {
1832 if (!dn) {
94b18763 1833 stale_items.insert(mempool::mds_co::string(key));
7c673cae
FG
1834 *force_dirty = true;
1835 }
1836 return dn;
1837 }
1838
1839 if (dn) {
28e407b8
AA
1840 CDentry::linkage_t *dnl = dn->get_linkage();
1841 dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
1842 if (committed_version == 0 &&
1843 dnl->is_remote() &&
1844 dn->is_dirty() &&
1845 ino == dnl->get_remote_ino() &&
f67539c2
TL
1846 d_type == dnl->get_remote_d_type() &&
1847 alternate_name == dn->get_alternate_name()) {
28e407b8
AA
1848 // see comment below
1849 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1850 dn->mark_clean();
7c673cae
FG
1851 }
1852 } else {
1853 // (remote) link
f67539c2
TL
1854 dn = add_remote_dentry(dname, ino, d_type, std::move(alternate_name), first, last);
1855
7c673cae 1856 // link to inode?
f67539c2 1857 CInode *in = mdcache->get_inode(ino); // we may or may not have it.
7c673cae
FG
1858 if (in) {
1859 dn->link_remote(dn->get_linkage(), in);
1860 dout(12) << "_fetched got remote link " << ino << " which we have " << *in << dendl;
1861 } else {
11fdf7f2 1862 dout(12) << "_fetched got remote link " << ino << " (don't have it)" << dendl;
7c673cae
FG
1863 }
1864 }
f67539c2
TL
1865 }
1866 else if (type == 'I' || type == 'i') {
1867 InodeStore inode_data;
1868 mempool::mds_co::string alternate_name;
7c673cae 1869 // inode
7c673cae 1870 // Load inode data before looking up or constructing CInode
f67539c2
TL
1871 if (type == 'i') {
1872 DECODE_START(2, q);
1873 if (struct_v >= 2) {
1874 decode(alternate_name, q);
1875 }
1876 inode_data.decode(q);
1877 DECODE_FINISH(q);
1878 } else {
1879 inode_data.decode_bare(q);
1880 }
1881
7c673cae
FG
1882 if (stale) {
1883 if (!dn) {
94b18763 1884 stale_items.insert(mempool::mds_co::string(key));
7c673cae
FG
1885 *force_dirty = true;
1886 }
1887 return dn;
1888 }
1889
1890 bool undef_inode = false;
1891 if (dn) {
28e407b8
AA
1892 CDentry::linkage_t *dnl = dn->get_linkage();
1893 dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
1894
1895 if (dnl->is_primary()) {
1896 CInode *in = dnl->get_inode();
1897 if (in->state_test(CInode::STATE_REJOINUNDEF)) {
1898 undef_inode = true;
1899 } else if (committed_version == 0 &&
1900 dn->is_dirty() &&
f67539c2
TL
1901 inode_data.inode->ino == in->ino() &&
1902 inode_data.inode->version == in->get_version()) {
28e407b8
AA
1903 /* clean underwater item?
1904 * Underwater item is something that is dirty in our cache from
1905 * journal replay, but was previously flushed to disk before the
1906 * mds failed.
1907 *
1908 * We only do this is committed_version == 0. that implies either
1909 * - this is a fetch after from a clean/empty CDir is created
1910 * (and has no effect, since the dn won't exist); or
1911 * - this is a fetch after _recovery_, which is what we're worried
1912 * about. Items that are marked dirty from the journal should be
1913 * marked clean if they appear on disk.
1914 */
1915 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1916 dn->mark_clean();
1917 dout(10) << "_fetched had underwater inode " << *dnl->get_inode() << ", marking clean" << dendl;
1918 in->mark_clean();
1919 }
1920 }
7c673cae
FG
1921 }
1922
1923 if (!dn || undef_inode) {
1924 // add inode
f67539c2 1925 CInode *in = mdcache->get_inode(inode_data.inode->ino, last);
7c673cae
FG
1926 if (!in || undef_inode) {
1927 if (undef_inode && in)
1928 in->first = first;
1929 else
f67539c2 1930 in = new CInode(mdcache, true, first, last);
7c673cae 1931
f67539c2
TL
1932 in->reset_inode(std::move(inode_data.inode));
1933 in->reset_xattrs(std::move(inode_data.xattrs));
7c673cae
FG
1934 // symlink?
1935 if (in->is_symlink())
1936 in->symlink = inode_data.symlink;
1937
1938 in->dirfragtree.swap(inode_data.dirfragtree);
f67539c2
TL
1939 in->reset_old_inodes(std::move(inode_data.old_inodes));
1940 if (in->is_any_old_inodes()) {
1941 snapid_t min_first = in->get_old_inodes()->rbegin()->first + 1;
7c673cae
FG
1942 if (min_first > in->first)
1943 in->first = min_first;
1944 }
1945
1946 in->oldest_snap = inode_data.oldest_snap;
1947 in->decode_snap_blob(inode_data.snap_blob);
1948 if (snaps && !in->snaprealm)
1949 in->purge_stale_snap_data(*snaps);
1950
1951 if (!undef_inode) {
f67539c2 1952 mdcache->add_inode(in); // add
05a536ef 1953 mdcache->insert_taken_inos(in->ino());
f67539c2 1954 dn = add_primary_dentry(dname, in, std::move(alternate_name), first, last); // link
7c673cae
FG
1955 }
1956 dout(12) << "_fetched got " << *dn << " " << *in << dendl;
1957
f67539c2 1958 if (in->get_inode()->is_dirty_rstat())
7c673cae
FG
1959 in->mark_dirty_rstat();
1960
f67539c2 1961 in->maybe_ephemeral_rand(rand_threshold);
7c673cae
FG
1962 //in->hack_accessed = false;
1963 //in->hack_load_stamp = ceph_clock_now();
1964 //num_new_inodes_loaded++;
11fdf7f2 1965 } else if (g_conf().get_val<bool>("mds_hack_allow_loading_invalid_metadata")) {
94b18763 1966 dout(20) << "hack: adding duplicate dentry for " << *in << dendl;
f67539c2 1967 dn = add_primary_dentry(dname, in, std::move(alternate_name), first, last);
7c673cae
FG
1968 } else {
1969 dout(0) << "_fetched badness: got (but i already had) " << *in
f67539c2
TL
1970 << " mode " << in->get_inode()->mode
1971 << " mtime " << in->get_inode()->mtime << dendl;
7c673cae
FG
1972 string dirpath, inopath;
1973 this->inode->make_path_string(dirpath);
1974 in->make_path_string(inopath);
f67539c2
TL
1975 mdcache->mds->clog->error() << "loaded dup inode " << inode_data.inode->ino
1976 << " [" << first << "," << last << "] v" << inode_data.inode->version
7c673cae 1977 << " at " << dirpath << "/" << dname
f67539c2 1978 << ", but inode " << in->vino() << " v" << in->get_version()
7c673cae
FG
1979 << " already exists at " << inopath;
1980 return dn;
1981 }
1982 }
1983 } else {
f67539c2
TL
1984 CachedStackStringStream css;
1985 *css << "Invalid tag char '" << type << "' pos " << pos;
1986 throw buffer::malformed_input(css->str());
7c673cae
FG
1987 }
1988
1989 return dn;
1990}
1991
1992void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
1e59de90 1993 bool complete, const std::set<string>& keys, int r)
7c673cae 1994{
f67539c2 1995 LogChannelRef clog = mdcache->mds->clog;
7c673cae
FG
1996 dout(10) << "_fetched header " << hdrbl.length() << " bytes "
1997 << omap.size() << " keys for " << *this << dendl;
1998
f67539c2 1999 ceph_assert(r == 0 || r == -CEPHFS_ENOENT || r == -CEPHFS_ENODATA);
11fdf7f2
TL
2000 ceph_assert(is_auth());
2001 ceph_assert(!is_frozen());
7c673cae
FG
2002
2003 if (hdrbl.length() == 0) {
2004 dout(0) << "_fetched missing object for " << *this << dendl;
2005
2006 clog->error() << "dir " << dirfrag() << " object missing on disk; some "
2007 "files may be lost (" << get_path() << ")";
2008
2009 go_bad(complete);
2010 return;
2011 }
2012
2013 fnode_t got_fnode;
2014 {
11fdf7f2 2015 auto p = hdrbl.cbegin();
7c673cae 2016 try {
11fdf7f2 2017 decode(got_fnode, p);
7c673cae
FG
2018 } catch (const buffer::error &err) {
2019 derr << "Corrupt fnode in dirfrag " << dirfrag()
f67539c2 2020 << ": " << err.what() << dendl;
7c673cae 2021 clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
f67539c2 2022 << err.what() << " (" << get_path() << ")";
7c673cae
FG
2023 go_bad(complete);
2024 return;
2025 }
2026 if (!p.end()) {
2027 clog->warn() << "header buffer of dir " << dirfrag() << " has "
2028 << hdrbl.length() - p.get_off() << " extra bytes ("
2029 << get_path() << ")";
2030 go_bad(complete);
2031 return;
2032 }
2033 }
2034
2035 dout(10) << "_fetched version " << got_fnode.version << dendl;
2036
2037 // take the loaded fnode?
2038 // only if we are a fresh CDir* with no prior state.
2039 if (get_version() == 0) {
2a845540 2040 set_fresh_fnode(allocate_fnode(got_fnode));
7c673cae
FG
2041 }
2042
2043 list<CInode*> undef_inodes;
2044
2045 // purge stale snaps?
7c673cae
FG
2046 bool force_dirty = false;
2047 const set<snapid_t> *snaps = NULL;
2048 SnapRealm *realm = inode->find_snaprealm();
f67539c2 2049 if (fnode->snap_purged_thru < realm->get_last_destroyed()) {
7c673cae 2050 snaps = &realm->get_snaps();
f67539c2 2051 dout(10) << " snap_purged_thru " << fnode->snap_purged_thru
7c673cae
FG
2052 << " < " << realm->get_last_destroyed()
2053 << ", snap purge based on " << *snaps << dendl;
2054 if (get_num_snap_items() == 0) {
f67539c2 2055 const_cast<snapid_t&>(fnode->snap_purged_thru) = realm->get_last_destroyed();
7c673cae
FG
2056 force_dirty = true;
2057 }
2058 }
2059
1e59de90
TL
2060
2061 MDSContext::vec finished;
2062 std::vector<string_snap_t> null_keys;
2063
2064 auto k_it = keys.rbegin();
2065 auto w_it = waiting_on_dentry.rbegin();
2066 std::string_view last_name = "";
2067
2068 auto proc_waiters = [&](const string_snap_t& key) {
2069 bool touch = false;
2070 if (last_name < key.name) {
2071 // string_snap_t and key string are not in the same order
2072 w_it = decltype(w_it)(waiting_on_dentry.upper_bound(key));
2073 }
2074 while (w_it != waiting_on_dentry.rend()) {
2075 int cmp = w_it->first.compare(key);
2076 if (cmp < 0)
2077 break;
2078 if (cmp == 0) {
2079 touch = true;
2080 std::copy(w_it->second.begin(), w_it->second.end(),
2081 std::back_inserter(finished));
2082 waiting_on_dentry.erase(std::next(w_it).base());
2083 if (waiting_on_dentry.empty())
2084 put(PIN_DNWAITER);
2085 break;
2086 }
2087 ++w_it;
2088 }
2089 return touch;
2090 };
2091 auto proc_nulls_and_waiters = [&](const string& str_key, const string_snap_t& key) {
2092 bool touch = false;
2093 int count = 0;
2094
2095 while (k_it != keys.rend()) {
2096 int cmp = k_it->compare(str_key);
2097 if (cmp < 0)
2098 break;
2099 if (cmp == 0) {
2100 touch = true;
2101 proc_waiters(key);
2102 ++k_it;
2103 break;
2104 }
2105 string_snap_t n_key;
2106 dentry_key_t::decode_helper(*k_it, n_key.name, n_key.snapid);
2107 ceph_assert(n_key.snapid == CEPH_NOSNAP);
2108 proc_waiters(n_key);
2109 last_name = std::string_view(k_it->c_str(), n_key.name.length());
2110 null_keys.emplace_back(std::move(n_key));
2111 ++k_it;
2112
2113 if (!(++count % mdcache->mds->heartbeat_reset_grace()))
2114 mdcache->mds->heartbeat_reset();
2115 }
2116 return touch;
2117 };
2118
33c7a0ef 2119 int count = 0;
7c673cae 2120 unsigned pos = omap.size() - 1;
f91f0fd5 2121 double rand_threshold = get_inode()->get_ephemeral_rand();
1e59de90
TL
2122 for (auto p = omap.rbegin(); p != omap.rend(); ++p, --pos) {
2123 string_snap_t key;
2124 dentry_key_t::decode_helper(p->first, key.name, key.snapid);
2125 bool touch = false;
2126
2127 if (key.snapid == CEPH_NOSNAP) {
2128 if (complete) {
2129 touch = proc_waiters(key);
2130 } else {
2131 touch = proc_nulls_and_waiters(p->first, key);
2132 }
2133 last_name = std::string_view(p->first.c_str(), key.name.length());
2134 }
7c673cae 2135
1e59de90 2136 if (!(++count % mdcache->mds->heartbeat_reset_grace()))
33c7a0ef
TL
2137 mdcache->mds->heartbeat_reset();
2138
1e59de90 2139 CDentry *dn = nullptr;
7c673cae
FG
2140 try {
2141 dn = _load_dentry(
1e59de90 2142 p->first, key.name, key.snapid, p->second, pos, snaps,
f91f0fd5 2143 rand_threshold, &force_dirty);
7c673cae 2144 } catch (const buffer::error &err) {
1e59de90 2145 mdcache->mds->clog->warn() << "Corrupt dentry '" << key.name << "' in "
7c673cae 2146 "dir frag " << dirfrag() << ": "
f67539c2 2147 << err.what() << "(" << get_path() << ")";
7c673cae
FG
2148
2149 // Remember that this dentry is damaged. Subsequent operations
f67539c2 2150 // that try to act directly on it will get their CEPHFS_EIOs, but this
7c673cae
FG
2151 // dirfrag as a whole will continue to look okay (minus the
2152 // mysteriously-missing dentry)
1e59de90 2153 go_bad_dentry(key.snapid, key.name);
7c673cae
FG
2154
2155 // Anyone who was WAIT_DENTRY for this guy will get kicked
2156 // to RetryRequest, and hit the DamageTable-interrogating path.
2157 // Stats will now be bogus because we will think we're complete,
2158 // but have 1 or more missing dentries.
2159 continue;
2160 }
2161
28e407b8
AA
2162 if (!dn)
2163 continue;
7c673cae 2164
1e59de90
TL
2165 if (touch) {
2166 dout(10) << " touching wanted dn " << *dn << dendl;
2167 mdcache->touch_dentry(dn);
2168 }
2169
28e407b8
AA
2170 CDentry::linkage_t *dnl = dn->get_linkage();
2171 if (dnl->is_primary() && dnl->get_inode()->state_test(CInode::STATE_REJOINUNDEF))
2172 undef_inodes.push_back(dnl->get_inode());
1e59de90 2173 }
7c673cae 2174
1e59de90
TL
2175 if (complete) {
2176 if (!waiting_on_dentry.empty()) {
2177 for (auto &p : waiting_on_dentry) {
2178 std::copy(p.second.begin(), p.second.end(), std::back_inserter(finished));
2179 if (p.first.snapid == CEPH_NOSNAP)
2180 null_keys.emplace_back(p.first);
2181 }
2182 waiting_on_dentry.clear();
2183 put(PIN_DNWAITER);
2184 }
2185 } else {
2186 proc_nulls_and_waiters("", string_snap_t());
2187 }
2188
2189 if (!null_keys.empty()) {
2190 snapid_t first = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
2191 for (auto& key : null_keys) {
2192 CDentry* dn = lookup(key.name, key.snapid);
2193 if (dn) {
2194 dout(12) << "_fetched got null for key " << key << ", have " << *dn << dendl;
2195 } else {
2196 dn = add_null_dentry(key.name, first, key.snapid);
2197 dout(12) << "_fetched got null for key " << key << ", added " << *dn << dendl;
2198 }
f67539c2 2199 mdcache->touch_dentry(dn);
33c7a0ef
TL
2200
2201 if (!(++count % mdcache->mds->heartbeat_reset_grace(2)))
2202 mdcache->mds->heartbeat_reset();
7c673cae
FG
2203 }
2204 }
2205
2206 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
2207
2208 // mark complete, !fetching
2209 if (complete) {
7c673cae
FG
2210 mark_complete();
2211 state_clear(STATE_FETCHING);
1e59de90 2212 take_waiting(WAIT_COMPLETE, finished);
7c673cae
FG
2213 }
2214
2215 // open & force frags
2216 while (!undef_inodes.empty()) {
2217 CInode *in = undef_inodes.front();
33c7a0ef 2218
7c673cae
FG
2219 undef_inodes.pop_front();
2220 in->state_clear(CInode::STATE_REJOINUNDEF);
f67539c2 2221 mdcache->opened_undef_inode(in);
33c7a0ef
TL
2222
2223 if (!(++count % mdcache->mds->heartbeat_reset_grace()))
2224 mdcache->mds->heartbeat_reset();
7c673cae
FG
2225 }
2226
2227 // dirty myself to remove stale snap dentries
f67539c2 2228 if (force_dirty && !mdcache->is_readonly())
7c673cae
FG
2229 log_mark_dirty();
2230
2231 auth_unpin(this);
2232
1e59de90
TL
2233 if (!finished.empty())
2234 mdcache->mds->queue_waiters(finished);
7c673cae
FG
2235}
2236
11fdf7f2 2237void CDir::go_bad_dentry(snapid_t last, std::string_view dname)
7c673cae 2238{
94b18763
FG
2239 dout(10) << __func__ << " " << dname << dendl;
2240 std::string path(get_path());
2241 path += "/";
11fdf7f2 2242 path += dname;
f67539c2 2243 const bool fatal = mdcache->mds->damage_table.notify_dentry(
94b18763 2244 inode->ino(), frag, last, dname, path);
7c673cae 2245 if (fatal) {
f67539c2 2246 mdcache->mds->damaged();
7c673cae
FG
2247 ceph_abort(); // unreachable, damaged() respawns us
2248 }
2249}
2250
2251void CDir::go_bad(bool complete)
2252{
11fdf7f2 2253 dout(10) << __func__ << " " << frag << dendl;
f67539c2 2254 const bool fatal = mdcache->mds->damage_table.notify_dirfrag(
7c673cae
FG
2255 inode->ino(), frag, get_path());
2256 if (fatal) {
f67539c2 2257 mdcache->mds->damaged();
7c673cae
FG
2258 ceph_abort(); // unreachable, damaged() respawns us
2259 }
2260
f91f0fd5 2261 if (complete) {
f67539c2
TL
2262 if (get_version() == 0) {
2263 auto _fnode = allocate_fnode();
2264 _fnode->version = 1;
2265 reset_fnode(std::move(_fnode));
2266 }
f91f0fd5
TL
2267
2268 state_set(STATE_BADFRAG);
2269 mark_complete();
2270 }
2271
2272 state_clear(STATE_FETCHING);
2273 auth_unpin(this);
f67539c2 2274 finish_waiting(WAIT_COMPLETE, -CEPHFS_EIO);
7c673cae
FG
2275}
2276
2277// -----------------------
2278// COMMIT
2279
2280/**
2281 * commit
2282 *
2283 * @param want - min version i want committed
2284 * @param c - callback for completion
2285 */
11fdf7f2 2286void CDir::commit(version_t want, MDSContext *c, bool ignore_authpinnability, int op_prio)
7c673cae
FG
2287{
2288 dout(10) << "commit want " << want << " on " << *this << dendl;
2289 if (want == 0) want = get_version();
2290
2291 // preconditions
11fdf7f2
TL
2292 ceph_assert(want <= get_version() || get_version() == 0); // can't commit the future
2293 ceph_assert(want > committed_version); // the caller is stupid
2294 ceph_assert(is_auth());
2295 ceph_assert(ignore_authpinnability || can_auth_pin());
7c673cae 2296
7c673cae
FG
2297 // note: queue up a noop if necessary, so that we always
2298 // get an auth_pin.
2299 if (!c)
2300 c = new C_MDSInternalNoop;
2301
2302 // auth_pin on first waiter
2303 if (waiting_for_commit.empty())
2304 auth_pin(this);
2305 waiting_for_commit[want].push_back(c);
2306
2307 // ok.
2308 _commit(want, op_prio);
2309}
2310
2311class C_IO_Dir_Committed : public CDirIOContext {
2312 version_t version;
2313public:
2314 C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2315 void finish(int r) override {
2316 dir->_committed(r, version);
2317 }
91327a77 2318 void print(ostream& out) const override {
f67539c2
TL
2319 out << "dirfrag_committed(" << dir->dirfrag() << ")";
2320 }
2321};
2322
2323class C_IO_Dir_Commit_Ops : public Context {
2324public:
2325 C_IO_Dir_Commit_Ops(CDir *d, int pr,
2326 vector<CDir::dentry_commit_item> &&s, bufferlist &&bl,
2327 vector<string> &&r,
2328 mempool::mds_co::compact_set<mempool::mds_co::string> &&stales) :
2329 dir(d), op_prio(pr) {
b3b6e05e 2330 metapool = dir->mdcache->mds->get_metadata_pool();
f67539c2
TL
2331 version = dir->get_version();
2332 is_new = dir->is_new();
2333 to_set.swap(s);
2334 dfts.swap(bl);
2335 to_remove.swap(r);
2336 stale_items.swap(stales);
91327a77 2337 }
f67539c2
TL
2338
2339 void finish(int r) override {
2340 dir->_omap_commit_ops(r, op_prio, metapool, version, is_new, to_set, dfts,
2341 to_remove, stale_items);
2342 }
2343
2344private:
2345 CDir *dir;
2346 int op_prio;
2347 int64_t metapool;
2348 version_t version;
2349 bool is_new;
2350 vector<CDir::dentry_commit_item> to_set;
2351 bufferlist dfts;
2352 vector<string> to_remove;
2353 mempool::mds_co::compact_set<mempool::mds_co::string> stale_items;
7c673cae
FG
2354};
2355
f67539c2
TL
2356// This is doing the same thing with the InodeStoreBase::encode()
2357void CDir::_encode_primary_inode_base(dentry_commit_item &item, bufferlist &dfts,
2358 bufferlist &bl)
2359{
2360 ENCODE_START(6, 4, bl);
2361 encode(*item.inode, bl, item.features);
2362
2363 if (!item.symlink.empty())
2364 encode(item.symlink, bl);
2365
2366 // dirfragtree
2367 dfts.splice(0, item.dft_len, &bl);
2368
2369 if (item.xattrs)
2370 encode(*item.xattrs, bl);
2371 else
2372 encode((__u32)0, bl);
2373
2374 if (item.snaprealm) {
2375 bufferlist snapr_bl;
2376 encode(item.srnode, snapr_bl);
2377 encode(snapr_bl, bl);
2378 } else {
2379 encode(bufferlist(), bl);
2380 }
2381
2382 if (item.old_inodes)
2383 encode(*item.old_inodes, bl, item.features);
2384 else
2385 encode((__u32)0, bl);
2386
2387 encode(item.oldest_snap, bl);
2388 encode(item.damage_flags, bl);
2389 ENCODE_FINISH(bl);
2390}
2391
2392// This is not locked by mds_lock
2393void CDir::_omap_commit_ops(int r, int op_prio, int64_t metapool, version_t version, bool _new,
2394 vector<dentry_commit_item> &to_set, bufferlist &dfts,
2395 vector<string>& to_remove,
2396 mempool::mds_co::compact_set<mempool::mds_co::string> &stales)
2397{
2398 dout(10) << __func__ << dendl;
2399
2400 if (r < 0) {
2401 mdcache->mds->handle_write_error_with_lock(r);
2402 return;
2403 }
2404
2405 C_GatherBuilder gather(g_ceph_context,
2406 new C_OnFinisher(new C_IO_Dir_Committed(this, version),
2407 mdcache->mds->finisher));
2408
2409 SnapContext snapc;
2410 object_t oid = get_ondisk_object();
2411 object_locator_t oloc(metapool);
2412
2413 map<string, bufferlist> _set;
2414 set<string> _rm;
2415
2416 unsigned max_write_size = mdcache->max_dir_commit_size;
2417 unsigned write_size = 0;
2418
2419 auto commit_one = [&](bool header=false) {
2420 ObjectOperation op;
2421
39ae355f
TL
2422 /*
2423 * Shouldn't submit empty op to Rados, which could cause
2424 * the cephfs to become readonly.
2425 */
2426 ceph_assert(header || !_set.empty() || !_rm.empty());
2427
2428
f67539c2
TL
2429 // don't create new dirfrag blindly
2430 if (!_new)
2431 op.stat(nullptr, nullptr, nullptr);
2432
2433 /*
2434 * save the header at the last moment.. If we were to send it off before
2435 * other updates, but die before sending them all, we'd think that the
2436 * on-disk state was fully committed even though it wasn't! However, since
2437 * the messages are strictly ordered between the MDS and the OSD, and
2438 * since messages to a given PG are strictly ordered, if we simply send
2439 * the message containing the header off last, we cannot get our header
2440 * into an incorrect state.
2441 */
2442 if (header) {
2443 bufferlist header;
2444 encode(*fnode, header);
2445 op.omap_set_header(header);
2446 }
2447
2448 op.priority = op_prio;
2449 if (!_set.empty())
2450 op.omap_set(_set);
2451 if (!_rm.empty())
2452 op.omap_rm_keys(_rm);
2453 mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
2454 ceph::real_clock::now(),
2455 0, gather.new_sub());
2456 write_size = 0;
2457 _set.clear();
2458 _rm.clear();
2459 };
2460
33c7a0ef 2461 int count = 0;
f67539c2
TL
2462 for (auto &key : stales) {
2463 unsigned size = key.length() + sizeof(__u32);
39ae355f 2464 if (write_size > 0 && write_size + size > max_write_size)
f67539c2
TL
2465 commit_one();
2466
2467 write_size += size;
2468 _rm.emplace(key);
33c7a0ef
TL
2469
2470 if (!(++count % mdcache->mds->heartbeat_reset_grace(2)))
2471 mdcache->mds->heartbeat_reset();
f67539c2
TL
2472 }
2473
2474 for (auto &key : to_remove) {
2475 unsigned size = key.length() + sizeof(__u32);
39ae355f 2476 if (write_size > 0 && write_size + size > max_write_size)
f67539c2
TL
2477 commit_one();
2478
2479 write_size += size;
2480 _rm.emplace(std::move(key));
33c7a0ef
TL
2481
2482 if (!(++count % mdcache->mds->heartbeat_reset_grace(2)))
2483 mdcache->mds->heartbeat_reset();
f67539c2
TL
2484 }
2485
f67539c2
TL
2486 bufferlist bl;
2487 using ceph::encode;
2488 for (auto &item : to_set) {
2489 encode(item.first, bl);
2490 if (item.is_remote) {
2491 // remote link
2492 CDentry::encode_remote(item.ino, item.d_type, item.alternate_name, bl);
2493 } else {
2494 // marker, name, inode, [symlink string]
2495 bl.append('i'); // inode
2496
2497 ENCODE_START(2, 1, bl);
2498 encode(item.alternate_name, bl);
2499 _encode_primary_inode_base(item, dfts, bl);
2500 ENCODE_FINISH(bl);
2501 }
f67539c2
TL
2502
2503 unsigned size = item.key.length() + bl.length() + 2 * sizeof(__u32);
39ae355f 2504 if (write_size > 0 && write_size + size > max_write_size)
f67539c2
TL
2505 commit_one();
2506
2507 write_size += size;
2508 _set[std::move(item.key)].swap(bl);
33c7a0ef
TL
2509
2510 if (!(++count % mdcache->mds->heartbeat_reset_grace()))
2511 mdcache->mds->heartbeat_reset();
f67539c2
TL
2512 }
2513
2514 commit_one(true);
2515 gather.activate();
2516}
2517
7c673cae
FG
2518/**
2519 * Flush out the modified dentries in this dir. Keep the bufferlist
2520 * below max_write_size;
2521 */
2522void CDir::_omap_commit(int op_prio)
2523{
11fdf7f2 2524 dout(10) << __func__ << dendl;
7c673cae 2525
7c673cae
FG
2526 if (op_prio < 0)
2527 op_prio = CEPH_MSG_PRIO_DEFAULT;
2528
2529 // snap purge?
2530 const set<snapid_t> *snaps = NULL;
2531 SnapRealm *realm = inode->find_snaprealm();
f67539c2 2532 if (fnode->snap_purged_thru < realm->get_last_destroyed()) {
7c673cae 2533 snaps = &realm->get_snaps();
f67539c2 2534 dout(10) << " snap_purged_thru " << fnode->snap_purged_thru
7c673cae
FG
2535 << " < " << realm->get_last_destroyed()
2536 << ", snap purge based on " << *snaps << dendl;
2537 // fnode.snap_purged_thru = realm->get_last_destroyed();
2538 }
2539
33c7a0ef 2540 size_t items_count = 0;
f67539c2 2541 if (state_test(CDir::STATE_FRAGMENTING) && is_new()) {
33c7a0ef 2542 items_count = get_num_head_items() + get_num_snap_items();
f67539c2
TL
2543 } else {
2544 for (elist<CDentry*>::iterator it = dirty_dentries.begin(); !it.end(); ++it)
33c7a0ef 2545 ++items_count;
f67539c2 2546 }
7c673cae 2547
f67539c2
TL
2548 vector<string> to_remove;
2549 // reverve enough memories, which maybe larger than the actually needed
33c7a0ef 2550 to_remove.reserve(items_count);
7c673cae 2551
f67539c2
TL
2552 vector<dentry_commit_item> to_set;
2553 // reverve enough memories, which maybe larger than the actually needed
33c7a0ef 2554 to_set.reserve(items_count);
7c673cae 2555
f67539c2
TL
2556 // for dir fragtrees
2557 bufferlist dfts(CEPH_PAGE_SIZE);
7c673cae 2558
b32b8144 2559 auto write_one = [&](CDentry *dn) {
7c673cae
FG
2560 string key;
2561 dn->key().encode(key);
2562
1e59de90
TL
2563 if (!dn->corrupt_first_loaded) {
2564 dn->check_corruption(false);
2565 }
2566
2567 if (snaps && try_trim_snap_dentry(dn, *snaps)) {
7c673cae 2568 dout(10) << " rm " << key << dendl;
f67539c2 2569 to_remove.emplace_back(std::move(key));
b32b8144 2570 return;
7c673cae
FG
2571 }
2572
7c673cae 2573 if (dn->get_linkage()->is_null()) {
94b18763 2574 dout(10) << " rm " << dn->get_name() << " " << *dn << dendl;
f67539c2 2575 to_remove.emplace_back(std::move(key));
7c673cae 2576 } else {
94b18763 2577 dout(10) << " set " << dn->get_name() << " " << *dn << dendl;
7c673cae 2578
f67539c2
TL
2579 uint64_t off = dfts.length();
2580 // try to reserve new size if there has less
2581 // than 1/8 page space
2582 uint64_t left = CEPH_PAGE_SIZE - off % CEPH_PAGE_SIZE;
2583 if (left < CEPH_PAGE_SIZE / 8)
2584 dfts.reserve(left + CEPH_PAGE_SIZE);
7c673cae 2585
f67539c2
TL
2586 auto& item = to_set.emplace_back();
2587 item.key = std::move(key);
2588 _parse_dentry(dn, item, snaps, dfts);
2589 item.dft_len = dfts.length() - off;
7c673cae 2590 }
b32b8144
FG
2591 };
2592
33c7a0ef 2593 int count = 0;
f91f0fd5 2594 if (state_test(CDir::STATE_FRAGMENTING) && is_new()) {
20effc67 2595 ceph_assert(committed_version == 0);
b32b8144
FG
2596 for (auto p = items.begin(); p != items.end(); ) {
2597 CDentry *dn = p->second;
2598 ++p;
f91f0fd5 2599 if (dn->get_linkage()->is_null())
b32b8144
FG
2600 continue;
2601 write_one(dn);
33c7a0ef
TL
2602
2603 if (!(++count % mdcache->mds->heartbeat_reset_grace()))
2604 mdcache->mds->heartbeat_reset();
b32b8144
FG
2605 }
2606 } else {
2607 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2608 CDentry *dn = *p;
2609 ++p;
2610 write_one(dn);
33c7a0ef
TL
2611
2612 if (!(++count % mdcache->mds->heartbeat_reset_grace()))
2613 mdcache->mds->heartbeat_reset();
b32b8144 2614 }
7c673cae
FG
2615 }
2616
f67539c2
TL
2617 auto c = new C_IO_Dir_Commit_Ops(this, op_prio, std::move(to_set), std::move(dfts),
2618 std::move(to_remove), std::move(stale_items));
2619 stale_items.clear();
2620 mdcache->mds->finisher->queue(c);
7c673cae
FG
2621}
2622
f67539c2
TL
2623void CDir::_parse_dentry(CDentry *dn, dentry_commit_item &item,
2624 const set<snapid_t> *snaps, bufferlist &bl)
7c673cae
FG
2625{
2626 // clear dentry NEW flag, if any. we can no longer silently drop it.
2627 dn->clear_new();
2628
f67539c2 2629 item.first = dn->first;
7c673cae
FG
2630
2631 // primary or remote?
f67539c2
TL
2632 auto& linkage = dn->linkage;
2633 item.alternate_name = dn->get_alternate_name();
2634 if (linkage.is_remote()) {
2635 item.is_remote = true;
2636 item.ino = linkage.get_remote_ino();
2637 item.d_type = linkage.get_remote_d_type();
2638 dout(14) << " dn '" << dn->get_name() << "' remote ino " << item.ino << dendl;
2639 } else if (linkage.is_primary()) {
7c673cae 2640 // primary link
f67539c2 2641 CInode *in = linkage.get_inode();
11fdf7f2 2642 ceph_assert(in);
f67539c2
TL
2643
2644 dout(14) << " dn '" << dn->get_name() << "' inode " << *in << dendl;
7c673cae
FG
2645
2646 if (in->is_multiversion()) {
2647 if (!in->snaprealm) {
2648 if (snaps)
2649 in->purge_stale_snap_data(*snaps);
f67539c2 2650 } else {
7c673cae
FG
2651 in->purge_stale_snap_data(in->snaprealm->get_snaps());
2652 }
2653 }
2654
f67539c2
TL
2655 if (in->snaprealm) {
2656 item.snaprealm = true;
2657 item.srnode = in->snaprealm->srnode;
2658 }
2659 item.features = mdcache->mds->mdsmap->get_up_features();
2660 item.inode = in->inode;
2661 if (in->inode->is_symlink())
2662 item.symlink = in->symlink;
2663 using ceph::encode;
2664 encode(in->dirfragtree, bl);
2665 item.xattrs = in->xattrs;
2666 item.old_inodes = in->old_inodes;
2667 item.oldest_snap = in->oldest_snap;
2668 item.damage_flags = in->damage_flags;
7c673cae 2669 } else {
f67539c2 2670 ceph_assert(!linkage.is_null());
7c673cae
FG
2671 }
2672}
2673
2674void CDir::_commit(version_t want, int op_prio)
2675{
2676 dout(10) << "_commit want " << want << " on " << *this << dendl;
2677
2678 // we can't commit things in the future.
2679 // (even the projected future.)
11fdf7f2 2680 ceph_assert(want <= get_version() || get_version() == 0);
7c673cae
FG
2681
2682 // check pre+postconditions.
11fdf7f2 2683 ceph_assert(is_auth());
7c673cae
FG
2684
2685 // already committed?
2686 if (committed_version >= want) {
2687 dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2688 return;
2689 }
2690 // already committing >= want?
2691 if (committing_version >= want) {
2692 dout(10) << "already committing " << committing_version << " >= " << want << dendl;
11fdf7f2 2693 ceph_assert(state_test(STATE_COMMITTING));
7c673cae
FG
2694 return;
2695 }
2696
2697 // alrady committed an older version?
2698 if (committing_version > committed_version) {
2699 dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2700 return;
2701 }
2702
2703 // commit.
2704 committing_version = get_version();
2705
2706 // mark committing (if not already)
2707 if (!state_test(STATE_COMMITTING)) {
2708 dout(10) << "marking committing" << dendl;
2709 state_set(STATE_COMMITTING);
2710 }
2711
f67539c2 2712 if (mdcache->mds->logger) mdcache->mds->logger->inc(l_mds_dir_commit);
7c673cae 2713
522d829b
TL
2714 mdcache->mds->balancer->hit_dir(this, META_POP_STORE);
2715
7c673cae
FG
2716 _omap_commit(op_prio);
2717}
2718
2719
2720/**
2721 * _committed
2722 *
2723 * @param v version i just committed
2724 */
2725void CDir::_committed(int r, version_t v)
2726{
2727 if (r < 0) {
2728 // the directory could be partly purged during MDS failover
f67539c2 2729 if (r == -CEPHFS_ENOENT && committed_version == 0 &&
31f18b77 2730 !inode->is_base() && get_parent_dir()->inode->is_stray()) {
7c673cae 2731 r = 0;
31f18b77
FG
2732 if (inode->snaprealm)
2733 inode->state_set(CInode::STATE_MISSINGOBJS);
7c673cae
FG
2734 }
2735 if (r < 0) {
2736 dout(1) << "commit error " << r << " v " << v << dendl;
f67539c2 2737 mdcache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
7c673cae 2738 << " errno " << r;
f67539c2 2739 mdcache->mds->handle_write_error(r);
7c673cae
FG
2740 return;
2741 }
2742 }
2743
2744 dout(10) << "_committed v " << v << " on " << *this << dendl;
11fdf7f2 2745 ceph_assert(is_auth());
7c673cae
FG
2746
2747 bool stray = inode->is_stray();
2748
2749 // take note.
11fdf7f2
TL
2750 ceph_assert(v > committed_version);
2751 ceph_assert(v <= committing_version);
7c673cae
FG
2752 committed_version = v;
2753
2754 // _all_ commits done?
2755 if (committing_version == committed_version)
2756 state_clear(CDir::STATE_COMMITTING);
2757
2758 // _any_ commit, even if we've been redirtied, means we're no longer new.
2759 item_new.remove_myself();
2760
2761 // dir clean?
2762 if (committed_version == get_version())
2763 mark_clean();
2764
33c7a0ef
TL
2765 int count = 0;
2766
7c673cae 2767 // dentries clean?
b32b8144
FG
2768 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2769 CDentry *dn = *p;
2770 ++p;
7c673cae
FG
2771
2772 // inode?
2773 if (dn->linkage.is_primary()) {
2774 CInode *in = dn->linkage.get_inode();
11fdf7f2
TL
2775 ceph_assert(in);
2776 ceph_assert(in->is_auth());
7c673cae
FG
2777
2778 if (committed_version >= in->get_version()) {
2779 if (in->is_dirty()) {
2780 dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
2781 in->mark_clean();
2782 }
2783 } else {
2784 dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
11fdf7f2 2785 ceph_assert(in->is_dirty() || in->last < CEPH_NOSNAP); // special case for cow snap items (not predirtied)
7c673cae
FG
2786 }
2787 }
2788
2789 // dentry
2790 if (committed_version >= dn->get_version()) {
b32b8144
FG
2791 dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
2792 dn->mark_clean();
7c673cae 2793
b32b8144
FG
2794 // drop clean null stray dentries immediately
2795 if (stray &&
2796 dn->get_num_ref() == 0 &&
2797 !dn->is_projected() &&
2798 dn->get_linkage()->is_null())
2799 remove_dentry(dn);
7c673cae
FG
2800 } else {
2801 dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
11fdf7f2 2802 ceph_assert(dn->is_dirty());
7c673cae 2803 }
33c7a0ef
TL
2804
2805 if (!(++count % mdcache->mds->heartbeat_reset_grace()))
2806 mdcache->mds->heartbeat_reset();
7c673cae
FG
2807 }
2808
2809 // finishers?
2810 bool were_waiters = !waiting_for_commit.empty();
33c7a0ef 2811
94b18763
FG
2812 auto it = waiting_for_commit.begin();
2813 while (it != waiting_for_commit.end()) {
2814 auto _it = it;
2815 ++_it;
2816 if (it->first > committed_version) {
2817 dout(10) << " there are waiters for " << it->first << ", committing again" << dendl;
2818 _commit(it->first, -1);
7c673cae
FG
2819 break;
2820 }
11fdf7f2 2821 MDSContext::vec t;
94b18763
FG
2822 for (const auto &waiter : it->second)
2823 t.push_back(waiter);
f67539c2 2824 mdcache->mds->queue_waiters(t);
94b18763
FG
2825 waiting_for_commit.erase(it);
2826 it = _it;
33c7a0ef
TL
2827
2828 if (!(++count % mdcache->mds->heartbeat_reset_grace()))
2829 mdcache->mds->heartbeat_reset();
2830 }
7c673cae
FG
2831
2832 // try drop dentries in this dirfrag if it's about to be purged
31f18b77
FG
2833 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
2834 inode->snaprealm)
f67539c2 2835 mdcache->maybe_eval_stray(inode, true);
7c673cae
FG
2836
2837 // unpin if we kicked the last waiter.
2838 if (were_waiters &&
2839 waiting_for_commit.empty())
2840 auth_unpin(this);
2841}
2842
2843
2844
2845
2846// IMPORT/EXPORT
2847
f67539c2
TL
2848mds_rank_t CDir::get_export_pin(bool inherit) const
2849{
2850 mds_rank_t export_pin = inode->get_export_pin(inherit);
2851 if (export_pin == MDS_RANK_EPHEMERAL_DIST)
2852 export_pin = mdcache->hash_into_rank_bucket(ino(), get_frag());
2853 else if (export_pin == MDS_RANK_EPHEMERAL_RAND)
2854 export_pin = mdcache->hash_into_rank_bucket(ino());
2855 return export_pin;
2856}
2857
2858bool CDir::is_exportable(mds_rank_t dest) const
2859{
2860 mds_rank_t export_pin = get_export_pin();
2861 if (export_pin == dest)
2862 return true;
2863 if (export_pin >= 0)
2864 return false;
2865 return true;
2866}
2867
7c673cae
FG
2868void CDir::encode_export(bufferlist& bl)
2869{
9f95a23c 2870 ENCODE_START(1, 1, bl);
11fdf7f2
TL
2871 ceph_assert(!is_projected());
2872 encode(first, bl);
f67539c2 2873 encode(*fnode, bl);
11fdf7f2
TL
2874 encode(dirty_old_rstat, bl);
2875 encode(committed_version, bl);
7c673cae 2876
11fdf7f2
TL
2877 encode(state, bl);
2878 encode(dir_rep, bl);
7c673cae 2879
11fdf7f2
TL
2880 encode(pop_me, bl);
2881 encode(pop_auth_subtree, bl);
7c673cae 2882
11fdf7f2
TL
2883 encode(dir_rep_by, bl);
2884 encode(get_replicas(), bl);
7c673cae
FG
2885
2886 get(PIN_TEMPEXPORTING);
9f95a23c 2887 ENCODE_FINISH(bl);
7c673cae
FG
2888}
2889
11fdf7f2 2890void CDir::finish_export()
7c673cae
FG
2891{
2892 state &= MASK_STATE_EXPORT_KEPT;
11fdf7f2
TL
2893 pop_nested.sub(pop_auth_subtree);
2894 pop_auth_subtree_nested.sub(pop_auth_subtree);
2895 pop_me.zero();
2896 pop_auth_subtree.zero();
7c673cae
FG
2897 put(PIN_TEMPEXPORTING);
2898 dirty_old_rstat.clear();
2899}
2900
11fdf7f2 2901void CDir::decode_import(bufferlist::const_iterator& blp, LogSegment *ls)
7c673cae 2902{
9f95a23c 2903 DECODE_START(1, blp);
11fdf7f2 2904 decode(first, blp);
f67539c2
TL
2905 {
2906 auto _fnode = allocate_fnode();
2907 decode(*_fnode, blp);
2908 reset_fnode(std::move(_fnode));
2909 }
2910 update_projected_version();
2911
11fdf7f2 2912 decode(dirty_old_rstat, blp);
11fdf7f2 2913 decode(committed_version, blp);
7c673cae
FG
2914 committing_version = committed_version;
2915
2916 unsigned s;
11fdf7f2 2917 decode(s, blp);
7c673cae
FG
2918 state &= MASK_STATE_IMPORT_KEPT;
2919 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
2920
2921 if (is_dirty()) {
2922 get(PIN_DIRTY);
2923 _mark_dirty(ls);
2924 }
2925
11fdf7f2 2926 decode(dir_rep, blp);
7c673cae 2927
11fdf7f2
TL
2928 decode(pop_me, blp);
2929 decode(pop_auth_subtree, blp);
2930 pop_nested.add(pop_auth_subtree);
2931 pop_auth_subtree_nested.add(pop_auth_subtree);
7c673cae 2932
11fdf7f2
TL
2933 decode(dir_rep_by, blp);
2934 decode(get_replicas(), blp);
181888fb 2935 if (is_replicated()) get(PIN_REPLICATED);
7c673cae
FG
2936
2937 replica_nonce = 0; // no longer defined
2938
2939 // did we import some dirty scatterlock data?
2940 if (dirty_old_rstat.size() ||
f67539c2
TL
2941 !(fnode->rstat == fnode->accounted_rstat)) {
2942 mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
7c673cae
FG
2943 ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
2944 }
f67539c2
TL
2945 if (!(fnode->fragstat == fnode->accounted_fragstat)) {
2946 mdcache->mds->locker->mark_updated_scatterlock(&inode->filelock);
7c673cae
FG
2947 ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
2948 }
2949 if (is_dirty_dft()) {
2950 if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
2951 inode->dirfragtreelock.is_stable()) {
2952 // clear stale dirtydft
2953 state_clear(STATE_DIRTYDFT);
2954 } else {
f67539c2 2955 mdcache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
7c673cae
FG
2956 ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
2957 }
2958 }
9f95a23c 2959 DECODE_FINISH(blp);
7c673cae
FG
2960}
2961
11fdf7f2
TL
2962void CDir::abort_import()
2963{
2964 ceph_assert(is_auth());
2965 state_clear(CDir::STATE_AUTH);
2966 remove_bloom();
2967 clear_replica_map();
2968 set_replica_nonce(CDir::EXPORT_NONCE);
2969 if (is_dirty())
2970 mark_clean();
7c673cae 2971
11fdf7f2
TL
2972 pop_nested.sub(pop_auth_subtree);
2973 pop_auth_subtree_nested.sub(pop_auth_subtree);
2974 pop_me.zero();
2975 pop_auth_subtree.zero();
2976}
7c673cae 2977
11fdf7f2
TL
2978void CDir::encode_dirstat(bufferlist& bl, const session_info_t& info, const DirStat& ds) {
2979 if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
2980 ENCODE_START(1, 1, bl);
2981 encode(ds.frag, bl);
2982 encode(ds.auth, bl);
2983 encode(ds.dist, bl);
2984 ENCODE_FINISH(bl);
2985 }
2986 else {
2987 encode(ds.frag, bl);
2988 encode(ds.auth, bl);
2989 encode(ds.dist, bl);
2990 }
2991}
7c673cae
FG
2992
2993/********************************
2994 * AUTHORITY
2995 */
2996
2997/*
2998 * if dir_auth.first == parent, auth is same as inode.
2999 * unless .second != unknown, in which case that sticks.
3000 */
3001mds_authority_t CDir::authority() const
3002{
3003 if (is_subtree_root())
3004 return dir_auth;
3005 else
3006 return inode->authority();
3007}
3008
3009/** is_subtree_root()
3010 * true if this is an auth delegation point.
3011 * that is, dir_auth != default (parent,unknown)
3012 *
3013 * some key observations:
3014 * if i am auth:
3015 * - any region bound will be an export, or frozen.
3016 *
3017 * note that this DOES heed dir_auth.pending
3018 */
3019/*
3020bool CDir::is_subtree_root()
3021{
3022 if (dir_auth == CDIR_AUTH_DEFAULT) {
3023 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
3024 //<< " on " << ino() << dendl;
3025 return false;
3026 } else {
3027 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
3028 //<< " on " << ino() << dendl;
3029 return true;
3030 }
3031}
3032*/
3033
3034/** contains(x)
3035 * true if we are x, or an ancestor of x
3036 */
3037bool CDir::contains(CDir *x)
3038{
3039 while (1) {
3040 if (x == this)
3041 return true;
3042 x = x->get_inode()->get_projected_parent_dir();
3043 if (x == 0)
3044 return false;
3045 }
3046}
3047
f67539c2
TL
3048bool CDir::can_rep() const
3049{
3050 if (!is_rep())
3051 return true;
3052
3053 unsigned mds_num = mdcache->mds->get_mds_map()->get_num_mds(MDSMap::STATE_ACTIVE);
3054 if ((mds_num - 1) > get_replicas().size())
3055 return true;
3056
3057 return false;
3058}
7c673cae
FG
3059
3060
3061/** set_dir_auth
3062 */
11fdf7f2 3063void CDir::set_dir_auth(const mds_authority_t &a)
7c673cae
FG
3064{
3065 dout(10) << "setting dir_auth=" << a
3066 << " from " << dir_auth
3067 << " on " << *this << dendl;
3068
3069 bool was_subtree = is_subtree_root();
3070 bool was_ambiguous = dir_auth.second >= 0;
3071
3072 // set it.
3073 dir_auth = a;
3074
3075 // new subtree root?
3076 if (!was_subtree && is_subtree_root()) {
3077 dout(10) << " new subtree root, adjusting auth_pins" << dendl;
1adf2230 3078
11fdf7f2
TL
3079 if (freeze_tree_state) {
3080 // only by CDir::_freeze_tree()
3081 ceph_assert(is_freezing_tree_root());
3082 }
1adf2230 3083
11fdf7f2 3084 inode->num_subtree_roots++;
7c673cae
FG
3085
3086 // unpin parent of frozen dir/tree?
224ce89b 3087 if (inode->is_auth()) {
11fdf7f2 3088 ceph_assert(!is_frozen_tree_root());
224ce89b
WB
3089 if (is_frozen_dir())
3090 inode->auth_unpin(this);
3091 }
7c673cae
FG
3092 }
3093 if (was_subtree && !is_subtree_root()) {
3094 dout(10) << " old subtree root, adjusting auth_pins" << dendl;
1adf2230
AA
3095
3096 inode->num_subtree_roots--;
7c673cae
FG
3097
3098 // pin parent of frozen dir/tree?
224ce89b 3099 if (inode->is_auth()) {
11fdf7f2 3100 ceph_assert(!is_frozen_tree_root());
224ce89b
WB
3101 if (is_frozen_dir())
3102 inode->auth_pin(this);
3103 }
7c673cae
FG
3104 }
3105
3106 // newly single auth?
3107 if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
11fdf7f2 3108 MDSContext::vec ls;
7c673cae 3109 take_waiting(WAIT_SINGLEAUTH, ls);
f67539c2 3110 mdcache->mds->queue_waiters(ls);
7c673cae
FG
3111 }
3112}
3113
7c673cae
FG
3114/*****************************************
3115 * AUTH PINS and FREEZING
3116 *
3117 * the basic plan is that auth_pins only exist in auth regions, and they
3118 * prevent a freeze (and subsequent auth change).
3119 *
3120 * however, we also need to prevent a parent from freezing if a child is frozen.
3121 * for that reason, the parent inode of a frozen directory is auth_pinned.
3122 *
3123 * the oddity is when the frozen directory is a subtree root. if that's the case,
3124 * the parent inode isn't frozen. which means that when subtree authority is adjusted
3125 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
3126 * time.
3127 *
3128 */
3129
3130void CDir::auth_pin(void *by)
3131{
3132 if (auth_pins == 0)
3133 get(PIN_AUTHPIN);
3134 auth_pins++;
3135
3136#ifdef MDS_AUTHPIN_SET
3137 auth_pin_set.insert(by);
3138#endif
3139
11fdf7f2 3140 dout(10) << "auth_pin by " << by << " on " << *this << " count now " << auth_pins << dendl;
7c673cae 3141
11fdf7f2
TL
3142 if (freeze_tree_state)
3143 freeze_tree_state->auth_pins += 1;
7c673cae
FG
3144}
3145
3146void CDir::auth_unpin(void *by)
3147{
3148 auth_pins--;
3149
3150#ifdef MDS_AUTHPIN_SET
11fdf7f2
TL
3151 {
3152 auto it = auth_pin_set.find(by);
3153 ceph_assert(it != auth_pin_set.end());
3154 auth_pin_set.erase(it);
3155 }
7c673cae
FG
3156#endif
3157 if (auth_pins == 0)
3158 put(PIN_AUTHPIN);
3159
11fdf7f2
TL
3160 dout(10) << "auth_unpin by " << by << " on " << *this << " count now " << auth_pins << dendl;
3161 ceph_assert(auth_pins >= 0);
3162
3163 if (freeze_tree_state)
3164 freeze_tree_state->auth_pins -= 1;
7c673cae
FG
3165
3166 maybe_finish_freeze(); // pending freeze?
7c673cae
FG
3167}
3168
11fdf7f2 3169void CDir::adjust_nested_auth_pins(int dirinc, void *by)
7c673cae 3170{
11fdf7f2 3171 ceph_assert(dirinc);
7c673cae
FG
3172 dir_auth_pins += dirinc;
3173
11fdf7f2 3174 dout(15) << __func__ << " " << dirinc << " on " << *this
7c673cae 3175 << " by " << by << " count now "
11fdf7f2
TL
3176 << auth_pins << "/" << dir_auth_pins << dendl;
3177 ceph_assert(dir_auth_pins >= 0);
7c673cae 3178
11fdf7f2
TL
3179 if (freeze_tree_state)
3180 freeze_tree_state->auth_pins += dirinc;
7c673cae 3181
11fdf7f2
TL
3182 if (dirinc < 0)
3183 maybe_finish_freeze(); // pending freeze?
7c673cae
FG
3184}
3185
#ifdef MDS_VERIFY_FRAGSTAT
/**
 * Debug check (only built with MDS_VERIFY_FRAGSTAT): recount files and
 * subdirectories from the dentries of a complete dirfrag and abort if
 * the totals disagree with fnode->fragstat. Stray directories are
 * skipped.
 */
void CDir::verify_fragstat()
{
  ceph_assert(is_complete());
  if (inode->is_stray())
    return;

  frag_info_t c;
  memset(&c, 0, sizeof(c));

  for (const auto &entry : items) {
    CDentry *dn = entry.second;
    if (dn->is_null())
      continue;

    dout(10) << " " << *dn << dendl;

    if (dn->is_primary()) {
      dout(10) << " " << *dn->inode << dendl;
      if (dn->inode->is_dir())
        c.nsubdirs++;
      else
        c.nfiles++;
    }
    if (dn->is_remote()) {
      // remote links are classified by their recorded d_type
      if (dn->get_remote_d_type() == DT_DIR)
        c.nsubdirs++;
      else
        c.nfiles++;
    }
  }

  const bool matches = c.nsubdirs == fnode->fragstat.nsubdirs &&
                       c.nfiles == fnode->fragstat.nfiles;
  if (!matches) {
    dout(0) << "verify_fragstat failed " << fnode->fragstat << " on " << *this << dendl;
    dout(0) << " i count " << c << dendl;
    ceph_abort();
  } else {
    dout(0) << "verify_fragstat ok " << fnode->fragstat << " on " << *this << dendl;
  }
}
#endif
3231
3232/*****************************************************************************
3233 * FREEZING
3234 */
3235
3236// FREEZE TREE
3237
11fdf7f2
TL
3238void CDir::_walk_tree(std::function<bool(CDir*)> callback)
3239{
11fdf7f2
TL
3240 deque<CDir*> dfq;
3241 dfq.push_back(this);
3242
11fdf7f2
TL
3243 while (!dfq.empty()) {
3244 CDir *dir = dfq.front();
3245 dfq.pop_front();
3246
3247 for (auto& p : *dir) {
3248 CDentry *dn = p.second;
3249 if (!dn->get_linkage()->is_primary())
3250 continue;
3251 CInode *in = dn->get_linkage()->get_inode();
3252 if (!in->is_dir())
3253 continue;
3254
9f95a23c 3255 auto&& dfv = in->get_nested_dirfrags();
11fdf7f2
TL
3256 for (auto& dir : dfv) {
3257 auto ret = callback(dir);
3258 if (ret)
3259 dfq.push_back(dir);
3260 }
11fdf7f2
TL
3261 }
3262 }
3263}
3264
7c673cae
FG
3265bool CDir::freeze_tree()
3266{
11fdf7f2
TL
3267 ceph_assert(!is_frozen());
3268 ceph_assert(!is_freezing());
3269 ceph_assert(!freeze_tree_state);
7c673cae
FG
3270
3271 auth_pin(this);
11fdf7f2
TL
3272
3273 // Travese the subtree to mark dirfrags as 'freezing' (set freeze_tree_state)
3274 // and to accumulate auth pins and record total count in freeze_tree_state.
3275 // when auth unpin an 'freezing' object, the counter in freeze_tree_state also
3276 // gets decreased. Subtree become 'frozen' when the counter reaches zero.
3277 freeze_tree_state = std::make_shared<freeze_tree_state_t>(this);
3278 freeze_tree_state->auth_pins += get_auth_pins() + get_dir_auth_pins();
9f95a23c 3279 if (!lock_caches_with_auth_pins.empty())
f67539c2 3280 mdcache->mds->locker->invalidate_lock_caches(this);
11fdf7f2
TL
3281
3282 _walk_tree([this](CDir *dir) {
3283 if (dir->freeze_tree_state)
3284 return false;
3285 dir->freeze_tree_state = freeze_tree_state;
3286 freeze_tree_state->auth_pins += dir->get_auth_pins() + dir->get_dir_auth_pins();
9f95a23c 3287 if (!dir->lock_caches_with_auth_pins.empty())
f67539c2 3288 mdcache->mds->locker->invalidate_lock_caches(dir);
11fdf7f2 3289 return true;
9f95a23c 3290 }
11fdf7f2
TL
3291 );
3292
7c673cae
FG
3293 if (is_freezeable(true)) {
3294 _freeze_tree();
3295 auth_unpin(this);
3296 return true;
3297 } else {
3298 state_set(STATE_FREEZINGTREE);
3299 ++num_freezing_trees;
3300 dout(10) << "freeze_tree waiting " << *this << dendl;
3301 return false;
3302 }
3303}
3304
3305void CDir::_freeze_tree()
3306{
11fdf7f2
TL
3307 dout(10) << __func__ << " " << *this << dendl;
3308 ceph_assert(is_freezeable(true));
7c673cae 3309
11fdf7f2
TL
3310 if (freeze_tree_state) {
3311 ceph_assert(is_auth());
3312 } else {
3313 ceph_assert(!is_auth());
3314 freeze_tree_state = std::make_shared<freeze_tree_state_t>(this);
7c673cae 3315 }
11fdf7f2 3316 freeze_tree_state->frozen = true;
224ce89b
WB
3317
3318 if (is_auth()) {
3319 mds_authority_t auth;
3320 bool was_subtree = is_subtree_root();
3321 if (was_subtree) {
3322 auth = get_dir_auth();
3323 } else {
3324 // temporarily prevent parent subtree from becoming frozen.
3325 inode->auth_pin(this);
3326 // create new subtree
3327 auth = authority();
3328 }
3329
11fdf7f2
TL
3330 _walk_tree([this, &auth] (CDir *dir) {
3331 if (dir->freeze_tree_state != freeze_tree_state) {
f67539c2 3332 mdcache->adjust_subtree_auth(dir, auth);
11fdf7f2
TL
3333 return false;
3334 }
3335 return true;
3336 }
3337 );
3338
3339 ceph_assert(auth.first >= 0);
3340 ceph_assert(auth.second == CDIR_AUTH_UNKNOWN);
224ce89b 3341 auth.second = auth.first;
f67539c2 3342 mdcache->adjust_subtree_auth(this, auth);
224ce89b
WB
3343 if (!was_subtree)
3344 inode->auth_unpin(this);
11fdf7f2
TL
3345 } else {
3346 // importing subtree ?
3347 _walk_tree([this] (CDir *dir) {
3348 ceph_assert(!dir->freeze_tree_state);
3349 dir->freeze_tree_state = freeze_tree_state;
3350 return true;
3351 }
3352 );
3353 }
3354
3355 // twiddle state
3356 if (state_test(STATE_FREEZINGTREE)) {
3357 state_clear(STATE_FREEZINGTREE);
3358 --num_freezing_trees;
224ce89b
WB
3359 }
3360
7c673cae
FG
3361 state_set(STATE_FROZENTREE);
3362 ++num_frozen_trees;
3363 get(PIN_FROZEN);
7c673cae
FG
3364}
3365
3366void CDir::unfreeze_tree()
3367{
11fdf7f2
TL
3368 dout(10) << __func__ << " " << *this << dendl;
3369
3370 MDSContext::vec unfreeze_waiters;
3371 take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
3372
3373 if (freeze_tree_state) {
3374 _walk_tree([this, &unfreeze_waiters](CDir *dir) {
3375 if (dir->freeze_tree_state != freeze_tree_state)
3376 return false;
3377 dir->freeze_tree_state.reset();
3378 dir->take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
3379 return true;
3380 }
3381 );
3382 }
7c673cae
FG
3383
3384 if (state_test(STATE_FROZENTREE)) {
3385 // frozen. unfreeze.
3386 state_clear(STATE_FROZENTREE);
3387 --num_frozen_trees;
3388
3389 put(PIN_FROZEN);
3390
224ce89b
WB
3391 if (is_auth()) {
3392 // must be subtree
11fdf7f2 3393 ceph_assert(is_subtree_root());
224ce89b
WB
3394 // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
3395 mds_authority_t auth = get_dir_auth();
11fdf7f2
TL
3396 ceph_assert(auth.first >= 0);
3397 ceph_assert(auth.second == auth.first);
224ce89b 3398 auth.second = CDIR_AUTH_UNKNOWN;
f67539c2 3399 mdcache->adjust_subtree_auth(this, auth);
224ce89b 3400 }
11fdf7f2 3401 freeze_tree_state.reset();
7c673cae 3402 } else {
11fdf7f2 3403 ceph_assert(state_test(STATE_FREEZINGTREE));
7c673cae
FG
3404
3405 // freezing. stop it.
7c673cae
FG
3406 state_clear(STATE_FREEZINGTREE);
3407 --num_freezing_trees;
11fdf7f2
TL
3408 freeze_tree_state.reset();
3409
3410 finish_waiting(WAIT_FROZEN, -1);
7c673cae 3411 auth_unpin(this);
7c673cae 3412 }
11fdf7f2 3413
f67539c2 3414 mdcache->mds->queue_waiters(unfreeze_waiters);
11fdf7f2
TL
3415}
3416
3417void CDir::adjust_freeze_after_rename(CDir *dir)
3418{
3419 if (!freeze_tree_state || dir->freeze_tree_state != freeze_tree_state)
3420 return;
3421 CDir *newdir = dir->get_inode()->get_parent_dir();
3422 if (newdir == this || newdir->freeze_tree_state == freeze_tree_state)
3423 return;
3424
3425 ceph_assert(!freeze_tree_state->frozen);
3426 ceph_assert(get_dir_auth_pins() > 0);
3427
3428 MDSContext::vec unfreeze_waiters;
3429
3430 auto unfreeze = [this, &unfreeze_waiters](CDir *dir) {
3431 if (dir->freeze_tree_state != freeze_tree_state)
3432 return false;
3433 int dec = dir->get_auth_pins() + dir->get_dir_auth_pins();
3434 // shouldn't become zero because srcdn of rename was auth pinned
3435 ceph_assert(freeze_tree_state->auth_pins > dec);
3436 freeze_tree_state->auth_pins -= dec;
3437 dir->freeze_tree_state.reset();
3438 dir->take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
3439 return true;
3440 };
3441
3442 unfreeze(dir);
3443 dir->_walk_tree(unfreeze);
3444
f67539c2 3445 mdcache->mds->queue_waiters(unfreeze_waiters);
7c673cae
FG
3446}
3447
91327a77 3448bool CDir::can_auth_pin(int *err_ret) const
7c673cae 3449{
91327a77
AA
3450 int err;
3451 if (!is_auth()) {
3452 err = ERR_NOT_AUTH;
3453 } else if (is_freezing_dir() || is_frozen_dir()) {
3454 err = ERR_FRAGMENTING_DIR;
3455 } else {
3456 auto p = is_freezing_or_frozen_tree();
3457 if (p.first || p.second) {
3458 err = ERR_EXPORTING_TREE;
3459 } else {
3460 err = 0;
3461 }
3462 }
3463 if (err && err_ret)
3464 *err_ret = err;
3465 return !err;
3466}
3467
7c673cae
FG
3468class C_Dir_AuthUnpin : public CDirContext {
3469 public:
3470 explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
3471 void finish(int r) override {
3472 dir->auth_unpin(dir->get_inode());
3473 }
3474};
3475
3476void CDir::maybe_finish_freeze()
3477{
11fdf7f2 3478 if (dir_auth_pins != 0)
7c673cae
FG
3479 return;
3480
3481 // we can freeze the _dir_ even with nested pins...
3482 if (state_test(STATE_FREEZINGDIR)) {
11fdf7f2
TL
3483 if (auth_pins == 1) {
3484 _freeze_dir();
3485 auth_unpin(this);
3486 finish_waiting(WAIT_FROZEN);
3487 }
7c673cae
FG
3488 }
3489
11fdf7f2
TL
3490 if (freeze_tree_state) {
3491 if (freeze_tree_state->frozen ||
3492 freeze_tree_state->auth_pins != 1)
3493 return;
3494
3495 if (freeze_tree_state->dir != this) {
3496 freeze_tree_state->dir->maybe_finish_freeze();
3497 return;
3498 }
3499
3500 ceph_assert(state_test(STATE_FREEZINGTREE));
7c673cae 3501
7c673cae 3502 if (!is_subtree_root() && inode->is_frozen()) {
11fdf7f2 3503 dout(10) << __func__ << " !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
7c673cae
FG
3504 // retake an auth_pin...
3505 auth_pin(inode);
3506 // and release it when the parent inode unfreezes
3507 inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
3508 return;
3509 }
3510
3511 _freeze_tree();
3512 auth_unpin(this);
3513 finish_waiting(WAIT_FROZEN);
3514 }
3515}
3516
3517
3518
3519// FREEZE DIR
3520
3521bool CDir::freeze_dir()
3522{
11fdf7f2
TL
3523 ceph_assert(!is_frozen());
3524 ceph_assert(!is_freezing());
7c673cae
FG
3525
3526 auth_pin(this);
3527 if (is_freezeable_dir(true)) {
3528 _freeze_dir();
3529 auth_unpin(this);
3530 return true;
3531 } else {
3532 state_set(STATE_FREEZINGDIR);
9f95a23c 3533 if (!lock_caches_with_auth_pins.empty())
f67539c2 3534 mdcache->mds->locker->invalidate_lock_caches(this);
7c673cae
FG
3535 dout(10) << "freeze_dir + wait " << *this << dendl;
3536 return false;
3537 }
3538}
3539
3540void CDir::_freeze_dir()
3541{
11fdf7f2 3542 dout(10) << __func__ << " " << *this << dendl;
7c673cae
FG
3543 //assert(is_freezeable_dir(true));
3544 // not always true during split because the original fragment may have frozen a while
3545 // ago and we're just now getting around to breaking it up.
3546
3547 state_clear(STATE_FREEZINGDIR);
3548 state_set(STATE_FROZENDIR);
3549 get(PIN_FROZEN);
3550
3551 if (is_auth() && !is_subtree_root())
3552 inode->auth_pin(this); // auth_pin for duration of freeze
3553}
3554
3555
3556void CDir::unfreeze_dir()
3557{
11fdf7f2 3558 dout(10) << __func__ << " " << *this << dendl;
7c673cae
FG
3559
3560 if (state_test(STATE_FROZENDIR)) {
3561 state_clear(STATE_FROZENDIR);
3562 put(PIN_FROZEN);
3563
3564 // unpin (may => FREEZEABLE) FIXME: is this order good?
3565 if (is_auth() && !is_subtree_root())
3566 inode->auth_unpin(this);
3567
3568 finish_waiting(WAIT_UNFREEZE);
3569 } else {
3570 finish_waiting(WAIT_FROZEN, -1);
3571
3572 // still freezing. stop.
11fdf7f2 3573 ceph_assert(state_test(STATE_FREEZINGDIR));
7c673cae
FG
3574 state_clear(STATE_FREEZINGDIR);
3575 auth_unpin(this);
3576
3577 finish_waiting(WAIT_UNFREEZE);
3578 }
3579}
3580
9f95a23c
TL
3581void CDir::enable_frozen_inode()
3582{
3583 ceph_assert(frozen_inode_suppressed > 0);
3584 if (--frozen_inode_suppressed == 0) {
3585 for (auto p = freezing_inodes.begin(); !p.end(); ) {
3586 CInode *in = *p;
3587 ++p;
3588 ceph_assert(in->is_freezing_inode());
3589 in->maybe_finish_freeze_inode();
3590 }
3591 }
3592}
3593
7c673cae
FG
3594/**
3595 * Slightly less complete than operator<<, because this is intended
3596 * for identifying a directory and its state rather than for dumping
3597 * debug output.
3598 */
11fdf7f2 3599void CDir::dump(Formatter *f, int flags) const
7c673cae 3600{
11fdf7f2
TL
3601 ceph_assert(f != NULL);
3602 if (flags & DUMP_PATH) {
3603 f->dump_stream("path") << get_path();
3604 }
3605 if (flags & DUMP_DIRFRAG) {
3606 f->dump_stream("dirfrag") << dirfrag();
3607 }
3608 if (flags & DUMP_SNAPID_FIRST) {
3609 f->dump_int("snapid_first", first);
3610 }
3611 if (flags & DUMP_VERSIONS) {
3612 f->dump_stream("projected_version") << get_projected_version();
3613 f->dump_stream("version") << get_version();
3614 f->dump_stream("committing_version") << get_committing_version();
3615 f->dump_stream("committed_version") << get_committed_version();
3616 }
3617 if (flags & DUMP_REP) {
3618 f->dump_bool("is_rep", is_rep());
3619 }
3620 if (flags & DUMP_DIR_AUTH) {
3621 if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
3622 if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
3623 f->dump_stream("dir_auth") << get_dir_auth().first;
3624 } else {
3625 f->dump_stream("dir_auth") << get_dir_auth();
3626 }
7c673cae 3627 } else {
11fdf7f2 3628 f->dump_string("dir_auth", "");
7c673cae 3629 }
11fdf7f2
TL
3630 }
3631 if (flags & DUMP_STATES) {
3632 f->open_array_section("states");
3633 MDSCacheObject::dump_states(f);
3634 if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
3635 if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
3636 if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
3637 if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
3638 if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
3639 if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
3640 if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
3641 if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
3642 f->close_section();
3643 }
3644 if (flags & DUMP_MDS_CACHE_OBJECT) {
3645 MDSCacheObject::dump(f);
3646 }
3647 if (flags & DUMP_ITEMS) {
3648 f->open_array_section("dentries");
3649 for (auto &p : items) {
3650 CDentry *dn = p.second;
3651 f->open_object_section("dentry");
3652 dn->dump(f);
3653 f->close_section();
3654 }
3655 f->close_section();
3656 }
7c673cae
FG
3657}
3658
11fdf7f2 3659void CDir::dump_load(Formatter *f)
28e407b8
AA
3660{
3661 f->dump_stream("path") << get_path();
3662 f->dump_stream("dirfrag") << dirfrag();
3663
3664 f->open_object_section("pop_me");
11fdf7f2 3665 pop_me.dump(f);
28e407b8
AA
3666 f->close_section();
3667
3668 f->open_object_section("pop_nested");
11fdf7f2 3669 pop_nested.dump(f);
28e407b8
AA
3670 f->close_section();
3671
3672 f->open_object_section("pop_auth_subtree");
11fdf7f2 3673 pop_auth_subtree.dump(f);
28e407b8
AA
3674 f->close_section();
3675
3676 f->open_object_section("pop_auth_subtree_nested");
11fdf7f2 3677 pop_auth_subtree_nested.dump(f);
28e407b8
AA
3678 f->close_section();
3679}
3680
7c673cae
FG
3681/****** Scrub Stuff *******/
3682
3683void CDir::scrub_info_create() const
3684{
11fdf7f2 3685 ceph_assert(!scrub_infop);
7c673cae
FG
3686
3687 // break out of const-land to set up implicit initial state
3688 CDir *me = const_cast<CDir*>(this);
f67539c2 3689 const auto& pf = me->get_projected_fnode();
7c673cae
FG
3690
3691 std::unique_ptr<scrub_info_t> si(new scrub_info_t());
3692
f67539c2
TL
3693 si->last_recursive.version = pf->recursive_scrub_version;
3694 si->last_recursive.time = pf->recursive_scrub_stamp;
7c673cae 3695
f67539c2
TL
3696 si->last_local.version = pf->localized_scrub_version;
3697 si->last_local.time = pf->localized_scrub_stamp;
7c673cae
FG
3698
3699 me->scrub_infop.swap(si);
3700}
3701
f67539c2 3702void CDir::scrub_initialize(const ScrubHeaderRef& header)
7c673cae 3703{
f67539c2 3704 ceph_assert(header);
7c673cae
FG
3705 // FIXME: weird implicit construction, is someone else meant
3706 // to be calling scrub_info_create first?
3707 scrub_info();
7c673cae
FG
3708 scrub_infop->directory_scrubbing = true;
3709 scrub_infop->header = header;
f67539c2 3710 header->inc_num_pending();
7c673cae
FG
3711}
3712
f67539c2 3713void CDir::scrub_aborted() {
7c673cae 3714 dout(20) << __func__ << dendl;
f67539c2 3715 ceph_assert(scrub_is_in_progress());
7c673cae 3716
f67539c2
TL
3717 scrub_infop->last_scrub_dirty = false;
3718 scrub_infop->directory_scrubbing = false;
3719 scrub_infop->header->dec_num_pending();
3720 scrub_infop.reset();
7c673cae
FG
3721}
3722
f67539c2 3723void CDir::scrub_finished()
7c673cae
FG
3724{
3725 dout(20) << __func__ << dendl;
f67539c2 3726 ceph_assert(scrub_is_in_progress());
7c673cae 3727
f67539c2
TL
3728 scrub_infop->last_local.time = ceph_clock_now();
3729 scrub_infop->last_local.version = get_version();
3730 if (scrub_infop->header->get_recursive())
3731 scrub_infop->last_recursive = scrub_infop->last_local;
7c673cae 3732
f67539c2 3733 scrub_infop->last_scrub_dirty = true;
7c673cae 3734
f67539c2
TL
3735 scrub_infop->directory_scrubbing = false;
3736 scrub_infop->header->dec_num_pending();
7c673cae
FG
3737}
3738
3739void CDir::scrub_maybe_delete_info()
3740{
3741 if (scrub_infop &&
3742 !scrub_infop->directory_scrubbing &&
f67539c2 3743 !scrub_infop->last_scrub_dirty)
7c673cae 3744 scrub_infop.reset();
7c673cae
FG
3745}
3746
3747bool CDir::scrub_local()
3748{
11fdf7f2 3749 ceph_assert(is_complete());
f67539c2
TL
3750 bool good = check_rstats(true);
3751 if (!good && scrub_infop->header->get_repair()) {
3752 mdcache->repair_dirfrag_stats(this);
3753 scrub_infop->header->set_repaired();
1e59de90 3754 good = true;
7c673cae 3755 }
f67539c2 3756 return good;
7c673cae
FG
3757}
3758
3759std::string CDir::get_path() const
3760{
3761 std::string path;
3762 get_inode()->make_path_string(path, true);
3763 return path;
3764}
3765
3766bool CDir::should_split_fast() const
3767{
3768 // Max size a fragment can be before trigger fast splitting
11fdf7f2 3769 int fast_limit = g_conf()->mds_bal_split_size * g_conf()->mds_bal_fragment_fast_factor;
7c673cae
FG
3770
3771 // Fast path: the sum of accounted size and null dentries does not
3772 // exceed threshold: we definitely are not over it.
3773 if (get_frag_size() + get_num_head_null() <= fast_limit) {
3774 return false;
3775 }
3776
3777 // Fast path: the accounted size of the frag exceeds threshold: we
3778 // definitely are over it
3779 if (get_frag_size() > fast_limit) {
3780 return true;
3781 }
3782
3783 int64_t effective_size = 0;
3784
3785 for (const auto &p : items) {
3786 const CDentry *dn = p.second;
3787 if (!dn->get_projected_linkage()->is_null()) {
3788 effective_size++;
3789 }
3790 }
3791
3792 return effective_size > fast_limit;
3793}
3794
f67539c2
TL
3795bool CDir::should_merge() const
3796{
3797 if (get_frag() == frag_t())
3798 return false;
3799
3800 if (inode->is_ephemeral_dist()) {
3801 unsigned min_frag_bits = mdcache->get_ephemeral_dist_frag_bits();
3802 if (min_frag_bits > 0 && get_frag().bits() < min_frag_bits + 1)
3803 return false;
3804 }
3805
39ae355f 3806 return ((int)get_frag_size() + (int)get_num_snap_items()) < g_conf()->mds_bal_merge_size;
f67539c2
TL
3807}
3808
// Mempool object factory definitions for CDir and CDir::scrub_info_t.
// NOTE(review): presumably both are accounted against the mds_co pool
// named in the last macro argument -- confirm against the macro.
MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);
MEMPOOL_DEFINE_OBJECT_FACTORY(CDir::scrub_info_t, scrub_info_t, mds_co)