]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/CDir.cc
import ceph 15.2.13
[ceph.git] / ceph / src / mds / CDir.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
11fdf7f2 15#include <string_view>
7c673cae
FG
16
17#include "include/types.h"
18
19#include "CDir.h"
20#include "CDentry.h"
21#include "CInode.h"
22#include "Mutation.h"
23
24#include "MDSMap.h"
25#include "MDSRank.h"
26#include "MDCache.h"
27#include "Locker.h"
28#include "MDLog.h"
29#include "LogSegment.h"
30
31#include "common/bloom_filter.hpp"
32#include "include/Context.h"
33#include "common/Clock.h"
34
35#include "osdc/Objecter.h"
36
37#include "common/config.h"
11fdf7f2 38#include "include/ceph_assert.h"
7c673cae
FG
39#include "include/compat.h"
40
41#define dout_context g_ceph_context
42#define dout_subsys ceph_subsys_mds
43#undef dout_prefix
44#define dout_prefix *_dout << "mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
45
46int CDir::num_frozen_trees = 0;
47int CDir::num_freezing_trees = 0;
48
11fdf7f2 49class CDirContext : public MDSContext
7c673cae
FG
50{
51protected:
52 CDir *dir;
53 MDSRank* get_mds() override {return dir->cache->mds;}
54
55public:
56 explicit CDirContext(CDir *d) : dir(d) {
11fdf7f2 57 ceph_assert(dir != NULL);
7c673cae
FG
58 }
59};
60
61
62class CDirIOContext : public MDSIOContextBase
63{
64protected:
65 CDir *dir;
66 MDSRank* get_mds() override {return dir->cache->mds;}
67
68public:
69 explicit CDirIOContext(CDir *d) : dir(d) {
11fdf7f2 70 ceph_assert(dir != NULL);
7c673cae
FG
71 }
72};
73
74
75// PINS
76//int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
77
78
79ostream& operator<<(ostream& out, const CDir& dir)
80{
81 out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
82 << " [" << dir.first << ",head]";
83 if (dir.is_auth()) {
84 out << " auth";
85 if (dir.is_replicated())
86 out << dir.get_replicas();
87
88 if (dir.is_projected())
89 out << " pv=" << dir.get_projected_version();
90 out << " v=" << dir.get_version();
91 out << " cv=" << dir.get_committing_version();
92 out << "/" << dir.get_committed_version();
93 } else {
94 mds_authority_t a = dir.authority();
95 out << " rep@" << a.first;
96 if (a.second != CDIR_AUTH_UNKNOWN)
97 out << "," << a.second;
98 out << "." << dir.get_replica_nonce();
99 }
100
101 if (dir.is_rep()) out << " REP";
102
103 if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
104 if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
105 out << " dir_auth=" << dir.get_dir_auth().first;
106 else
107 out << " dir_auth=" << dir.get_dir_auth();
108 }
109
11fdf7f2 110 if (dir.get_auth_pins() || dir.get_dir_auth_pins()) {
7c673cae 111 out << " ap=" << dir.get_auth_pins()
11fdf7f2
TL
112 << "+" << dir.get_dir_auth_pins();
113#ifdef MDS_AUTHPIN_SET
114 dir.print_authpin_set(out);
115#endif
116 }
7c673cae
FG
117
118 out << " state=" << dir.get_state();
119 if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
120 if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
121 if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
122 if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
7c673cae
FG
123 if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
124 if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
125 if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
126 if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
127 if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
128 if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
11fdf7f2
TL
129 if (dir.state_test(CDir::STATE_CREATING)) out << "|creating";
130 if (dir.state_test(CDir::STATE_COMMITTING)) out << "|committing";
131 if (dir.state_test(CDir::STATE_FETCHING)) out << "|fetching";
132 if (dir.state_test(CDir::STATE_EXPORTING)) out << "|exporting";
133 if (dir.state_test(CDir::STATE_IMPORTING)) out << "|importing";
134 if (dir.state_test(CDir::STATE_STICKY)) out << "|sticky";
135 if (dir.state_test(CDir::STATE_DNPINNEDFRAG)) out << "|dnpinnedfrag";
136 if (dir.state_test(CDir::STATE_ASSIMRSTAT)) out << "|assimrstat";
7c673cae
FG
137
138 // fragstat
139 out << " " << dir.fnode.fragstat;
140 if (!(dir.fnode.fragstat == dir.fnode.accounted_fragstat))
141 out << "/" << dir.fnode.accounted_fragstat;
11fdf7f2 142 if (g_conf()->mds_debug_scatterstat && dir.is_projected()) {
7c673cae
FG
143 const fnode_t *pf = dir.get_projected_fnode();
144 out << "->" << pf->fragstat;
145 if (!(pf->fragstat == pf->accounted_fragstat))
146 out << "/" << pf->accounted_fragstat;
147 }
148
149 // rstat
150 out << " " << dir.fnode.rstat;
151 if (!(dir.fnode.rstat == dir.fnode.accounted_rstat))
152 out << "/" << dir.fnode.accounted_rstat;
11fdf7f2 153 if (g_conf()->mds_debug_scatterstat && dir.is_projected()) {
7c673cae
FG
154 const fnode_t *pf = dir.get_projected_fnode();
155 out << "->" << pf->rstat;
156 if (!(pf->rstat == pf->accounted_rstat))
157 out << "/" << pf->accounted_rstat;
158 }
159
160 out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
161 out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
162 if (dir.get_num_dirty())
163 out << " dirty=" << dir.get_num_dirty();
164
165 if (dir.get_num_ref()) {
166 out << " |";
167 dir.print_pin_set(out);
168 }
169
170 out << " " << &dir;
171 return out << "]";
172}
173
174
175void CDir::print(ostream& out)
176{
177 out << *this;
178}
179
180
181
182
183ostream& CDir::print_db_line_prefix(ostream& out)
184{
185 return out << ceph_clock_now() << " mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
186}
187
188
189
190// -------------------------------------------------------------------
191// CDir
192
193CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
194 cache(mdcache), inode(in), frag(fg),
7c673cae 195 dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
b32b8144
FG
196 dirty_dentries(member_offset(CDentry, item_dir_dirty)),
197 item_dirty(this), item_new(this),
9f95a23c
TL
198 lock_caches_with_auth_pins(member_offset(MDLockCache::DirItem, item_dir)),
199 freezing_inodes(member_offset(CInode, item_freezing_inode)),
7c673cae 200 dir_rep(REP_NONE),
11fdf7f2
TL
201 pop_me(mdcache->decayrate),
202 pop_nested(mdcache->decayrate),
203 pop_auth_subtree(mdcache->decayrate),
204 pop_auth_subtree_nested(mdcache->decayrate),
205 pop_spread(mdcache->decayrate),
28e407b8 206 pop_lru_subdirs(member_offset(CInode, item_pop_lru)),
7c673cae
FG
207 dir_auth(CDIR_AUTH_DEFAULT)
208{
7c673cae 209 // auth
11fdf7f2 210 ceph_assert(in->is_dir());
94b18763 211 if (auth) state_set(STATE_AUTH);
7c673cae
FG
212}
213
214/**
215 * Check the recursive statistics on size for consistency.
216 * If mds_debug_scatterstat is enabled, assert for correctness,
217 * otherwise just print out the mismatch and continue.
218 */
219bool CDir::check_rstats(bool scrub)
220{
11fdf7f2 221 if (!g_conf()->mds_debug_scatterstat && !scrub)
7c673cae
FG
222 return true;
223
224 dout(25) << "check_rstats on " << this << dendl;
225 if (!is_complete() || !is_auth() || is_frozen()) {
92f5a8d4
TL
226 dout(3) << "check_rstats " << (scrub ? "(scrub) " : "")
227 << "bailing out -- incomplete or non-auth or frozen dir on "
228 << *this << dendl;
229 return !scrub;
7c673cae
FG
230 }
231
232 frag_info_t frag_info;
233 nest_info_t nest_info;
94b18763 234 for (auto i = items.begin(); i != items.end(); ++i) {
7c673cae
FG
235 if (i->second->last != CEPH_NOSNAP)
236 continue;
237 CDentry::linkage_t *dnl = i->second->get_linkage();
238 if (dnl->is_primary()) {
239 CInode *in = dnl->get_inode();
240 nest_info.add(in->inode.accounted_rstat);
241 if (in->is_dir())
242 frag_info.nsubdirs++;
243 else
244 frag_info.nfiles++;
245 } else if (dnl->is_remote())
246 frag_info.nfiles++;
247 }
248
249 bool good = true;
250 // fragstat
251 if(!frag_info.same_sums(fnode.fragstat)) {
252 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
253 dout(1) << "get_num_head_items() = " << get_num_head_items()
254 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
255 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
256 good = false;
257 } else {
258 dout(20) << "get_num_head_items() = " << get_num_head_items()
259 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
260 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
261 }
262
263 // rstat
264 if (!nest_info.same_sums(fnode.rstat)) {
265 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
266 dout(1) << "total of child dentrys: " << nest_info << dendl;
267 dout(1) << "my rstats: " << fnode.rstat << dendl;
268 good = false;
269 } else {
270 dout(20) << "total of child dentrys: " << nest_info << dendl;
271 dout(20) << "my rstats: " << fnode.rstat << dendl;
272 }
273
274 if (!good) {
275 if (!scrub) {
94b18763 276 for (auto i = items.begin(); i != items.end(); ++i) {
7c673cae
FG
277 CDentry *dn = i->second;
278 if (dn->get_linkage()->is_primary()) {
279 CInode *in = dn->get_linkage()->inode;
280 dout(1) << *dn << " rstat " << in->inode.accounted_rstat << dendl;
281 } else {
282 dout(1) << *dn << dendl;
283 }
284 }
285
11fdf7f2
TL
286 ceph_assert(frag_info.nfiles == fnode.fragstat.nfiles);
287 ceph_assert(frag_info.nsubdirs == fnode.fragstat.nsubdirs);
288 ceph_assert(nest_info.rbytes == fnode.rstat.rbytes);
289 ceph_assert(nest_info.rfiles == fnode.rstat.rfiles);
290 ceph_assert(nest_info.rsubdirs == fnode.rstat.rsubdirs);
7c673cae
FG
291 }
292 }
293 dout(10) << "check_rstats complete on " << this << dendl;
294 return good;
295}
296
11fdf7f2
TL
297void CDir::adjust_num_inodes_with_caps(int d)
298{
299 // FIXME: smarter way to decide if adding 'this' to open file table
300 if (num_inodes_with_caps == 0 && d > 0)
301 cache->open_file_table.add_dirfrag(this);
302 else if (num_inodes_with_caps > 0 && num_inodes_with_caps == -d)
303 cache->open_file_table.remove_dirfrag(this);
304
305 num_inodes_with_caps += d;
306 ceph_assert(num_inodes_with_caps >= 0);
307}
308
309CDentry *CDir::lookup(std::string_view name, snapid_t snap)
7c673cae
FG
310{
311 dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
94b18763 312 auto iter = items.lower_bound(dentry_key_t(snap, name, inode->hash_dentry_name(name)));
7c673cae
FG
313 if (iter == items.end())
314 return 0;
94b18763 315 if (iter->second->get_name() == name &&
7c673cae
FG
316 iter->second->first <= snap &&
317 iter->second->last >= snap) {
318 dout(20) << " hit -> " << iter->first << dendl;
319 return iter->second;
320 }
321 dout(20) << " miss -> " << iter->first << dendl;
322 return 0;
323}
324
11fdf7f2
TL
325CDentry *CDir::lookup_exact_snap(std::string_view name, snapid_t last) {
326 dout(20) << __func__ << " (" << last << ", '" << name << "')" << dendl;
94b18763 327 auto p = items.find(dentry_key_t(last, name, inode->hash_dentry_name(name)));
7c673cae
FG
328 if (p == items.end())
329 return NULL;
330 return p->second;
331}
332
333/***
334 * linking fun
335 */
336
11fdf7f2 337CDentry* CDir::add_null_dentry(std::string_view dname,
7c673cae
FG
338 snapid_t first, snapid_t last)
339{
340 // foreign
11fdf7f2 341 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
342
343 // create dentry
344 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
345 if (is_auth())
346 dn->state_set(CDentry::STATE_AUTH);
31f18b77
FG
347
348 cache->bottom_lru.lru_insert_mid(dn);
349 dn->state_set(CDentry::STATE_BOTTOMLRU);
7c673cae
FG
350
351 dn->dir = this;
352 dn->version = get_projected_version();
353
354 // add to dir
11fdf7f2 355 ceph_assert(items.count(dn->key()) == 0);
94b18763 356 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
357
358 items[dn->key()] = dn;
359 if (last == CEPH_NOSNAP)
360 num_head_null++;
361 else
362 num_snap_null++;
363
364 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
365 dn->get(CDentry::PIN_FRAGMENTING);
366 dn->state_set(CDentry::STATE_FRAGMENTING);
367 }
368
11fdf7f2 369 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
370
371 // pin?
372 if (get_num_any() == 1)
373 get(PIN_CHILD);
374
11fdf7f2 375 ceph_assert(get_num_any() == items.size());
7c673cae
FG
376 return dn;
377}
378
379
11fdf7f2 380CDentry* CDir::add_primary_dentry(std::string_view dname, CInode *in,
7c673cae
FG
381 snapid_t first, snapid_t last)
382{
383 // primary
11fdf7f2 384 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
385
386 // create dentry
387 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
388 if (is_auth())
389 dn->state_set(CDentry::STATE_AUTH);
31f18b77
FG
390 if (is_auth() || !inode->is_stray()) {
391 cache->lru.lru_insert_mid(dn);
392 } else {
393 cache->bottom_lru.lru_insert_mid(dn);
394 dn->state_set(CDentry::STATE_BOTTOMLRU);
395 }
7c673cae
FG
396
397 dn->dir = this;
398 dn->version = get_projected_version();
399
400 // add to dir
11fdf7f2 401 ceph_assert(items.count(dn->key()) == 0);
94b18763 402 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
403
404 items[dn->key()] = dn;
405
406 dn->get_linkage()->inode = in;
7c673cae
FG
407
408 link_inode_work(dn, in);
409
410 if (dn->last == CEPH_NOSNAP)
411 num_head_items++;
412 else
413 num_snap_items++;
414
415 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
416 dn->get(CDentry::PIN_FRAGMENTING);
417 dn->state_set(CDentry::STATE_FRAGMENTING);
418 }
419
11fdf7f2 420 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
421
422 // pin?
423 if (get_num_any() == 1)
424 get(PIN_CHILD);
11fdf7f2 425 ceph_assert(get_num_any() == items.size());
7c673cae
FG
426 return dn;
427}
428
11fdf7f2 429CDentry* CDir::add_remote_dentry(std::string_view dname, inodeno_t ino, unsigned char d_type,
7c673cae
FG
430 snapid_t first, snapid_t last)
431{
432 // foreign
11fdf7f2 433 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
434
435 // create dentry
436 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), ino, d_type, first, last);
437 if (is_auth())
438 dn->state_set(CDentry::STATE_AUTH);
439 cache->lru.lru_insert_mid(dn);
440
441 dn->dir = this;
442 dn->version = get_projected_version();
443
444 // add to dir
11fdf7f2 445 ceph_assert(items.count(dn->key()) == 0);
94b18763 446 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
447
448 items[dn->key()] = dn;
449 if (last == CEPH_NOSNAP)
450 num_head_items++;
451 else
452 num_snap_items++;
453
454 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
455 dn->get(CDentry::PIN_FRAGMENTING);
456 dn->state_set(CDentry::STATE_FRAGMENTING);
457 }
458
11fdf7f2 459 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
460
461 // pin?
462 if (get_num_any() == 1)
463 get(PIN_CHILD);
464
11fdf7f2 465 ceph_assert(get_num_any() == items.size());
7c673cae
FG
466 return dn;
467}
468
469
470
471void CDir::remove_dentry(CDentry *dn)
472{
11fdf7f2 473 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
474
475 // there should be no client leases at this point!
11fdf7f2 476 ceph_assert(dn->client_lease_map.empty());
7c673cae
FG
477
478 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
479 dn->put(CDentry::PIN_FRAGMENTING);
480 dn->state_clear(CDentry::STATE_FRAGMENTING);
481 }
482
483 if (dn->get_linkage()->is_null()) {
484 if (dn->last == CEPH_NOSNAP)
485 num_head_null--;
486 else
487 num_snap_null--;
488 } else {
489 if (dn->last == CEPH_NOSNAP)
490 num_head_items--;
491 else
492 num_snap_items--;
493 }
494
495 if (!dn->get_linkage()->is_null())
496 // detach inode and dentry
497 unlink_inode_work(dn);
498
499 // remove from list
11fdf7f2 500 ceph_assert(items.count(dn->key()) == 1);
7c673cae
FG
501 items.erase(dn->key());
502
503 // clean?
504 if (dn->is_dirty())
505 dn->mark_clean();
506
31f18b77
FG
507 if (dn->state_test(CDentry::STATE_BOTTOMLRU))
508 cache->bottom_lru.lru_remove(dn);
509 else
510 cache->lru.lru_remove(dn);
7c673cae
FG
511 delete dn;
512
513 // unpin?
514 if (get_num_any() == 0)
515 put(PIN_CHILD);
11fdf7f2 516 ceph_assert(get_num_any() == items.size());
7c673cae
FG
517}
518
519void CDir::link_remote_inode(CDentry *dn, CInode *in)
520{
521 link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
522}
523
524void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
525{
11fdf7f2
TL
526 dout(12) << __func__ << " " << *dn << " remote " << ino << dendl;
527 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
528
529 dn->get_linkage()->set_remote(ino, d_type);
530
31f18b77
FG
531 if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
532 cache->bottom_lru.lru_remove(dn);
533 cache->lru.lru_insert_mid(dn);
534 dn->state_clear(CDentry::STATE_BOTTOMLRU);
535 }
536
7c673cae
FG
537 if (dn->last == CEPH_NOSNAP) {
538 num_head_items++;
539 num_head_null--;
540 } else {
541 num_snap_items++;
542 num_snap_null--;
543 }
11fdf7f2 544 ceph_assert(get_num_any() == items.size());
7c673cae
FG
545}
546
547void CDir::link_primary_inode(CDentry *dn, CInode *in)
548{
11fdf7f2
TL
549 dout(12) << __func__ << " " << *dn << " " << *in << dendl;
550 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
551
552 dn->get_linkage()->inode = in;
7c673cae
FG
553
554 link_inode_work(dn, in);
31f18b77
FG
555
556 if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
557 (is_auth() || !inode->is_stray())) {
558 cache->bottom_lru.lru_remove(dn);
559 cache->lru.lru_insert_mid(dn);
560 dn->state_clear(CDentry::STATE_BOTTOMLRU);
561 }
7c673cae
FG
562
563 if (dn->last == CEPH_NOSNAP) {
564 num_head_items++;
565 num_head_null--;
566 } else {
567 num_snap_items++;
568 num_snap_null--;
569 }
570
11fdf7f2 571 ceph_assert(get_num_any() == items.size());
7c673cae
FG
572}
573
574void CDir::link_inode_work( CDentry *dn, CInode *in)
575{
11fdf7f2 576 ceph_assert(dn->get_linkage()->get_inode() == in);
28e407b8 577 in->set_primary_parent(dn);
7c673cae
FG
578
579 // set inode version
580 //in->inode.version = dn->get_version();
581
582 // pin dentry?
583 if (in->get_num_ref())
584 dn->get(CDentry::PIN_INODEPIN);
11fdf7f2
TL
585
586 if (in->state_test(CInode::STATE_TRACKEDBYOFT))
587 inode->mdcache->open_file_table.notify_link(in);
588 if (in->is_any_caps())
589 adjust_num_inodes_with_caps(1);
7c673cae
FG
590
591 // adjust auth pin count
11fdf7f2
TL
592 if (in->auth_pins)
593 dn->adjust_nested_auth_pins(in->auth_pins, NULL);
7c673cae 594
9f95a23c
TL
595 if (in->is_freezing_inode())
596 freezing_inodes.push_back(&in->item_freezing_inode);
597 else if (in->is_frozen_inode() || in->is_frozen_auth_pin())
598 num_frozen_inodes++;
599
7c673cae
FG
600 // verify open snaprealm parent
601 if (in->snaprealm)
602 in->snaprealm->adjust_parent();
603 else if (in->is_any_caps())
604 in->move_to_realm(inode->find_snaprealm());
605}
606
31f18b77 607void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
7c673cae
FG
608{
609 if (dn->get_linkage()->is_primary()) {
11fdf7f2 610 dout(12) << __func__ << " " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
7c673cae 611 } else {
11fdf7f2 612 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
613 }
614
615 unlink_inode_work(dn);
616
31f18b77
FG
617 if (adjust_lru && !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
618 cache->lru.lru_remove(dn);
619 cache->bottom_lru.lru_insert_mid(dn);
620 dn->state_set(CDentry::STATE_BOTTOMLRU);
621 }
622
7c673cae
FG
623 if (dn->last == CEPH_NOSNAP) {
624 num_head_items--;
625 num_head_null++;
626 } else {
627 num_snap_items--;
628 num_snap_null++;
629 }
11fdf7f2 630 ceph_assert(get_num_any() == items.size());
7c673cae
FG
631}
632
633
634void CDir::try_remove_unlinked_dn(CDentry *dn)
635{
11fdf7f2
TL
636 ceph_assert(dn->dir == this);
637 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
638
639 // no pins (besides dirty)?
640 if (dn->get_num_ref() != dn->is_dirty())
641 return;
642
643 // was the dn new?
644 if (dn->is_new()) {
11fdf7f2 645 dout(10) << __func__ << " " << *dn << " in " << *this << dendl;
7c673cae
FG
646 if (dn->is_dirty())
647 dn->mark_clean();
648 remove_dentry(dn);
649
650 // NOTE: we may not have any more dirty dentries, but the fnode
651 // still changed, so the directory must remain dirty.
652 }
653}
654
655
11fdf7f2 656void CDir::unlink_inode_work(CDentry *dn)
7c673cae
FG
657{
658 CInode *in = dn->get_linkage()->get_inode();
659
660 if (dn->get_linkage()->is_remote()) {
661 // remote
662 if (in)
663 dn->unlink_remote(dn->get_linkage());
664
665 dn->get_linkage()->set_remote(0, 0);
666 } else if (dn->get_linkage()->is_primary()) {
667 // primary
668 // unpin dentry?
669 if (in->get_num_ref())
670 dn->put(CDentry::PIN_INODEPIN);
11fdf7f2
TL
671
672 if (in->state_test(CInode::STATE_TRACKEDBYOFT))
673 inode->mdcache->open_file_table.notify_unlink(in);
674 if (in->is_any_caps())
675 adjust_num_inodes_with_caps(-1);
7c673cae
FG
676
677 // unlink auth_pin count
11fdf7f2
TL
678 if (in->auth_pins)
679 dn->adjust_nested_auth_pins(-in->auth_pins, nullptr);
28e407b8 680
9f95a23c
TL
681 if (in->is_freezing_inode())
682 in->item_freezing_inode.remove_myself();
683 else if (in->is_frozen_inode() || in->is_frozen_auth_pin())
684 num_frozen_inodes--;
685
7c673cae
FG
686 // detach inode
687 in->remove_primary_parent(dn);
28e407b8
AA
688 if (in->is_dir())
689 in->item_pop_lru.remove_myself();
7c673cae
FG
690 dn->get_linkage()->inode = 0;
691 } else {
11fdf7f2 692 ceph_assert(!dn->get_linkage()->is_null());
7c673cae
FG
693 }
694}
695
696void CDir::add_to_bloom(CDentry *dn)
697{
11fdf7f2 698 ceph_assert(dn->last == CEPH_NOSNAP);
7c673cae
FG
699 if (!bloom) {
700 /* not create bloom filter for incomplete dir that was added by log replay */
701 if (!is_complete())
702 return;
703
704 /* don't maintain bloom filters in standby replay (saves cycles, and also
705 * avoids need to implement clearing it in EExport for #16924) */
706 if (cache->mds->is_standby_replay()) {
707 return;
708 }
709
710 unsigned size = get_num_head_items() + get_num_snap_items();
711 if (size < 100) size = 100;
712 bloom.reset(new bloom_filter(size, 1.0 / size, 0));
713 }
714 /* This size and false positive probability is completely random.*/
94b18763 715 bloom->insert(dn->get_name().data(), dn->get_name().size());
7c673cae
FG
716}
717
11fdf7f2 718bool CDir::is_in_bloom(std::string_view name)
7c673cae
FG
719{
720 if (!bloom)
721 return false;
94b18763 722 return bloom->contains(name.data(), name.size());
7c673cae
FG
723}
724
725void CDir::remove_null_dentries() {
11fdf7f2 726 dout(12) << __func__ << " " << *this << dendl;
7c673cae 727
94b18763 728 auto p = items.begin();
7c673cae
FG
729 while (p != items.end()) {
730 CDentry *dn = p->second;
731 ++p;
732 if (dn->get_linkage()->is_null() && !dn->is_projected())
733 remove_dentry(dn);
734 }
735
11fdf7f2
TL
736 ceph_assert(num_snap_null == 0);
737 ceph_assert(num_head_null == 0);
738 ceph_assert(get_num_any() == items.size());
7c673cae
FG
739}
740
741/** remove dirty null dentries for deleted directory. the dirfrag will be
742 * deleted soon, so it's safe to not commit dirty dentries.
743 *
744 * This is called when a directory is being deleted, a prerequisite
745 * of which is that its children have been unlinked: we expect to only see
746 * null, unprojected dentries here.
747 */
748void CDir::try_remove_dentries_for_stray()
749{
750 dout(10) << __func__ << dendl;
11fdf7f2 751 ceph_assert(get_parent_dir()->inode->is_stray());
7c673cae
FG
752
753 // clear dirty only when the directory was not snapshotted
754 bool clear_dirty = !inode->snaprealm;
755
94b18763 756 auto p = items.begin();
7c673cae
FG
757 while (p != items.end()) {
758 CDentry *dn = p->second;
759 ++p;
760 if (dn->last == CEPH_NOSNAP) {
11fdf7f2
TL
761 ceph_assert(!dn->is_projected());
762 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
763 if (clear_dirty && dn->is_dirty())
764 dn->mark_clean();
765 // It's OK to remove lease prematurely because we will never link
766 // the dentry to inode again.
767 if (dn->is_any_leases())
768 dn->remove_client_leases(cache->mds->locker);
769 if (dn->get_num_ref() == 0)
770 remove_dentry(dn);
771 } else {
11fdf7f2 772 ceph_assert(!dn->is_projected());
7c673cae
FG
773 CDentry::linkage_t *dnl= dn->get_linkage();
774 CInode *in = NULL;
775 if (dnl->is_primary()) {
776 in = dnl->get_inode();
777 if (clear_dirty && in->is_dirty())
778 in->mark_clean();
779 }
780 if (clear_dirty && dn->is_dirty())
781 dn->mark_clean();
782 if (dn->get_num_ref() == 0) {
783 remove_dentry(dn);
784 if (in)
785 cache->remove_inode(in);
786 }
787 }
788 }
789
790 if (clear_dirty && is_dirty())
791 mark_clean();
792}
793
7c673cae
FG
794bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
795{
11fdf7f2 796 ceph_assert(dn->last != CEPH_NOSNAP);
7c673cae
FG
797 set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
798 CDentry::linkage_t *dnl= dn->get_linkage();
799 CInode *in = 0;
800 if (dnl->is_primary())
801 in = dnl->get_inode();
802 if ((p == snaps.end() || *p > dn->last) &&
803 (dn->get_num_ref() == dn->is_dirty()) &&
804 (!in || in->get_num_ref() == in->is_dirty())) {
805 dout(10) << " purging snapped " << *dn << dendl;
806 if (in && in->is_dirty())
807 in->mark_clean();
808 remove_dentry(dn);
809 if (in) {
810 dout(10) << " purging snapped " << *in << dendl;
811 cache->remove_inode(in);
812 }
813 return true;
814 }
815 return false;
816}
817
818
819void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
820{
11fdf7f2 821 dout(10) << __func__ << " " << snaps << dendl;
7c673cae 822
94b18763 823 auto p = items.begin();
7c673cae
FG
824 while (p != items.end()) {
825 CDentry *dn = p->second;
826 ++p;
827
828 if (dn->last == CEPH_NOSNAP)
829 continue;
830
831 try_trim_snap_dentry(dn, snaps);
832 }
833}
834
835
836/**
837 * steal_dentry -- semi-violently move a dentry from one CDir to another
838 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
839 * on the old CDir corpse; must call finish_old_fragment() when finished.
840 */
841void CDir::steal_dentry(CDentry *dn)
842{
11fdf7f2 843 dout(15) << __func__ << " " << *dn << dendl;
7c673cae
FG
844
845 items[dn->key()] = dn;
846
847 dn->dir->items.erase(dn->key());
848 if (dn->dir->items.empty())
849 dn->dir->put(PIN_CHILD);
850
851 if (get_num_any() == 0)
852 get(PIN_CHILD);
853 if (dn->get_linkage()->is_null()) {
854 if (dn->last == CEPH_NOSNAP)
855 num_head_null++;
856 else
857 num_snap_null++;
858 } else if (dn->last == CEPH_NOSNAP) {
859 num_head_items++;
860
861 if (dn->get_linkage()->is_primary()) {
862 CInode *in = dn->get_linkage()->get_inode();
94b18763 863 auto pi = in->get_projected_inode();
28e407b8 864 if (in->is_dir()) {
7c673cae 865 fnode.fragstat.nsubdirs++;
28e407b8
AA
866 if (in->item_pop_lru.is_on_list())
867 pop_lru_subdirs.push_back(&in->item_pop_lru);
868 } else {
7c673cae 869 fnode.fragstat.nfiles++;
28e407b8 870 }
7c673cae
FG
871 fnode.rstat.rbytes += pi->accounted_rstat.rbytes;
872 fnode.rstat.rfiles += pi->accounted_rstat.rfiles;
873 fnode.rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
11fdf7f2 874 fnode.rstat.rsnaps += pi->accounted_rstat.rsnaps;
7c673cae
FG
875 if (pi->accounted_rstat.rctime > fnode.rstat.rctime)
876 fnode.rstat.rctime = pi->accounted_rstat.rctime;
877
11fdf7f2
TL
878 if (in->is_any_caps())
879 adjust_num_inodes_with_caps(1);
880
7c673cae
FG
881 // move dirty inode rstat to new dirfrag
882 if (in->is_dirty_rstat())
883 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
884 } else if (dn->get_linkage()->is_remote()) {
885 if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
886 fnode.fragstat.nsubdirs++;
887 else
888 fnode.fragstat.nfiles++;
889 }
890 } else {
891 num_snap_items++;
892 if (dn->get_linkage()->is_primary()) {
893 CInode *in = dn->get_linkage()->get_inode();
894 if (in->is_dirty_rstat())
895 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
896 }
897 }
898
11fdf7f2 899 {
7c673cae 900 int dap = dn->get_num_dir_auth_pins();
11fdf7f2
TL
901 if (dap) {
902 adjust_nested_auth_pins(dap, NULL);
903 dn->dir->adjust_nested_auth_pins(-dap, NULL);
904 }
7c673cae
FG
905 }
906
b32b8144
FG
907 if (dn->is_dirty()) {
908 dirty_dentries.push_back(&dn->item_dir_dirty);
7c673cae 909 num_dirty++;
b32b8144 910 }
7c673cae
FG
911
912 dn->dir = this;
913}
914
11fdf7f2 915void CDir::prepare_old_fragment(map<string_snap_t, MDSContext::vec >& dentry_waiters, bool replay)
7c673cae
FG
916{
917 // auth_pin old fragment for duration so that any auth_pinning
918 // during the dentry migration doesn't trigger side effects
919 if (!replay && is_auth())
920 auth_pin(this);
31f18b77
FG
921
922 if (!waiting_on_dentry.empty()) {
94b18763
FG
923 for (const auto &p : waiting_on_dentry) {
924 auto &e = dentry_waiters[p.first];
925 for (const auto &waiter : p.second) {
926 e.push_back(waiter);
927 }
928 }
31f18b77
FG
929 waiting_on_dentry.clear();
930 put(PIN_DNWAITER);
931 }
7c673cae
FG
932}
933
934void CDir::prepare_new_fragment(bool replay)
935{
936 if (!replay && is_auth()) {
937 _freeze_dir();
938 mark_complete();
939 }
31f18b77 940 inode->add_dirfrag(this);
7c673cae
FG
941}
942
11fdf7f2 943void CDir::finish_old_fragment(MDSContext::vec& waiters, bool replay)
7c673cae
FG
944{
945 // take waiters _before_ unfreeze...
946 if (!replay) {
947 take_waiting(WAIT_ANY_MASK, waiters);
948 if (is_auth()) {
949 auth_unpin(this); // pinned in prepare_old_fragment
11fdf7f2 950 ceph_assert(is_frozen_dir());
7c673cae
FG
951 unfreeze_dir();
952 }
953 }
954
11fdf7f2
TL
955 ceph_assert(dir_auth_pins == 0);
956 ceph_assert(auth_pins == 0);
7c673cae
FG
957
958 num_head_items = num_head_null = 0;
959 num_snap_items = num_snap_null = 0;
11fdf7f2 960 adjust_num_inodes_with_caps(-num_inodes_with_caps);
7c673cae
FG
961
962 // this mirrors init_fragment_pins()
963 if (is_auth())
964 clear_replica_map();
965 if (is_dirty())
966 mark_clean();
967 if (state_test(STATE_IMPORTBOUND))
968 put(PIN_IMPORTBOUND);
969 if (state_test(STATE_EXPORTBOUND))
970 put(PIN_EXPORTBOUND);
971 if (is_subtree_root())
972 put(PIN_SUBTREE);
973
974 if (auth_pins > 0)
975 put(PIN_AUTHPIN);
976
11fdf7f2 977 ceph_assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
7c673cae
FG
978}
979
980void CDir::init_fragment_pins()
981{
181888fb 982 if (is_replicated())
7c673cae
FG
983 get(PIN_REPLICATED);
984 if (state_test(STATE_DIRTY))
985 get(PIN_DIRTY);
986 if (state_test(STATE_EXPORTBOUND))
987 get(PIN_EXPORTBOUND);
988 if (state_test(STATE_IMPORTBOUND))
989 get(PIN_IMPORTBOUND);
990 if (is_subtree_root())
991 get(PIN_SUBTREE);
992}
993
9f95a23c 994void CDir::split(int bits, std::vector<CDir*>* subs, MDSContext::vec& waiters, bool replay)
7c673cae
FG
995{
996 dout(10) << "split by " << bits << " bits on " << *this << dendl;
997
11fdf7f2 998 ceph_assert(replay || is_complete() || !is_auth());
7c673cae 999
11fdf7f2 1000 frag_vec_t frags;
7c673cae
FG
1001 frag.split(bits, frags);
1002
1003 vector<CDir*> subfrags(1 << bits);
1004
1005 double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
1006
1007 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1008 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1009
1010 nest_info_t rstatdiff;
1011 frag_info_t fragstatdiff;
1012 if (fnode.accounted_rstat.version == rstat_version)
1013 rstatdiff.add_delta(fnode.accounted_rstat, fnode.rstat);
1014 if (fnode.accounted_fragstat.version == dirstat_version)
1015 fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat);
1016 dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
1017
11fdf7f2 1018 map<string_snap_t, MDSContext::vec > dentry_waiters;
31f18b77 1019 prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
1020
1021 // create subfrag dirs
1022 int n = 0;
11fdf7f2
TL
1023 for (const auto& fg : frags) {
1024 CDir *f = new CDir(inode, fg, cache, is_auth());
7c673cae 1025 f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
181888fb 1026 f->get_replicas() = get_replicas();
7c673cae 1027 f->set_version(get_version());
7c673cae
FG
1028 f->pop_me = pop_me;
1029 f->pop_me.scale(fac);
1030
1031 // FIXME; this is an approximation
1032 f->pop_nested = pop_nested;
1033 f->pop_nested.scale(fac);
1034 f->pop_auth_subtree = pop_auth_subtree;
1035 f->pop_auth_subtree.scale(fac);
1036 f->pop_auth_subtree_nested = pop_auth_subtree_nested;
1037 f->pop_auth_subtree_nested.scale(fac);
1038
11fdf7f2 1039 dout(10) << " subfrag " << fg << " " << *f << dendl;
7c673cae 1040 subfrags[n++] = f;
9f95a23c 1041 subs->push_back(f);
7c673cae
FG
1042
1043 f->set_dir_auth(get_dir_auth());
11fdf7f2 1044 f->freeze_tree_state = freeze_tree_state;
7c673cae 1045 f->prepare_new_fragment(replay);
1adf2230 1046 f->init_fragment_pins();
7c673cae
FG
1047 }
1048
1049 // repartition dentries
1050 while (!items.empty()) {
94b18763 1051 auto p = items.begin();
7c673cae
FG
1052
1053 CDentry *dn = p->second;
94b18763 1054 frag_t subfrag = inode->pick_dirfrag(dn->get_name());
7c673cae
FG
1055 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1056 dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
1057 CDir *f = subfrags[n];
1058 f->steal_dentry(dn);
1059 }
1060
94b18763 1061 for (const auto &p : dentry_waiters) {
31f18b77
FG
1062 frag_t subfrag = inode->pick_dirfrag(p.first.name);
1063 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1064 CDir *f = subfrags[n];
1065
1066 if (f->waiting_on_dentry.empty())
1067 f->get(PIN_DNWAITER);
94b18763
FG
1068 auto &e = f->waiting_on_dentry[p.first];
1069 for (const auto &waiter : p.second) {
1070 e.push_back(waiter);
1071 }
31f18b77
FG
1072 }
1073
7c673cae
FG
1074 // FIXME: handle dirty old rstat
1075
1076 // fix up new frag fragstats
1077 for (int i=0; i<n; i++) {
1078 CDir *f = subfrags[i];
1079 f->fnode.rstat.version = rstat_version;
1080 f->fnode.accounted_rstat = f->fnode.rstat;
1081 f->fnode.fragstat.version = dirstat_version;
1082 f->fnode.accounted_fragstat = f->fnode.fragstat;
1083 dout(10) << " rstat " << f->fnode.rstat << " fragstat " << f->fnode.fragstat
1084 << " on " << *f << dendl;
1085 }
1086
1087 // give any outstanding frag stat differential to first frag
1088 dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1089 << " to " << *subfrags[0] << dendl;
1090 subfrags[0]->fnode.accounted_rstat.add(rstatdiff);
1091 subfrags[0]->fnode.accounted_fragstat.add(fragstatdiff);
1092
1093 finish_old_fragment(waiters, replay);
1094}
1095
9f95a23c 1096void CDir::merge(const std::vector<CDir*>& subs, MDSContext::vec& waiters, bool replay)
7c673cae
FG
1097{
1098 dout(10) << "merge " << subs << dendl;
1099
9f95a23c
TL
1100 ceph_assert(subs.size() > 0);
1101
11fdf7f2
TL
1102 set_dir_auth(subs.front()->get_dir_auth());
1103 freeze_tree_state = subs.front()->freeze_tree_state;
1104
9f95a23c 1105 for (const auto& dir : subs) {
11fdf7f2
TL
1106 ceph_assert(get_dir_auth() == dir->get_dir_auth());
1107 ceph_assert(freeze_tree_state == dir->freeze_tree_state);
7c673cae
FG
1108 }
1109
7c673cae
FG
1110 prepare_new_fragment(replay);
1111
1112 nest_info_t rstatdiff;
1113 frag_info_t fragstatdiff;
1114 bool touched_mtime, touched_chattr;
1115 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1116 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1117
11fdf7f2 1118 map<string_snap_t, MDSContext::vec > dentry_waiters;
31f18b77 1119
9f95a23c 1120 for (const auto& dir : subs) {
7c673cae 1121 dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
11fdf7f2 1122 ceph_assert(!dir->is_auth() || dir->is_complete() || replay);
7c673cae
FG
1123
1124 if (dir->fnode.accounted_rstat.version == rstat_version)
1125 rstatdiff.add_delta(dir->fnode.accounted_rstat, dir->fnode.rstat);
1126 if (dir->fnode.accounted_fragstat.version == dirstat_version)
1127 fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat,
1128 &touched_mtime, &touched_chattr);
1129
31f18b77 1130 dir->prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
1131
1132 // steal dentries
1133 while (!dir->items.empty())
1134 steal_dentry(dir->items.begin()->second);
1135
1136 // merge replica map
181888fb
FG
1137 for (const auto &p : dir->get_replicas()) {
1138 unsigned cur = get_replicas()[p.first];
1139 if (p.second > cur)
1140 get_replicas()[p.first] = p.second;
7c673cae
FG
1141 }
1142
1143 // merge version
1144 if (dir->get_version() > get_version())
1145 set_version(dir->get_version());
1146
1147 // merge state
1148 state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
7c673cae
FG
1149
1150 dir->finish_old_fragment(waiters, replay);
1151 inode->close_dirfrag(dir->get_frag());
1152 }
1153
31f18b77
FG
1154 if (!dentry_waiters.empty()) {
1155 get(PIN_DNWAITER);
94b18763
FG
1156 for (const auto &p : dentry_waiters) {
1157 auto &e = waiting_on_dentry[p.first];
1158 for (const auto &waiter : p.second) {
1159 e.push_back(waiter);
1160 }
31f18b77
FG
1161 }
1162 }
1163
7c673cae
FG
1164 if (is_auth() && !replay)
1165 mark_complete();
1166
1167 // FIXME: merge dirty old rstat
1168 fnode.rstat.version = rstat_version;
1169 fnode.accounted_rstat = fnode.rstat;
1170 fnode.accounted_rstat.add(rstatdiff);
1171
1172 fnode.fragstat.version = dirstat_version;
1173 fnode.accounted_fragstat = fnode.fragstat;
1174 fnode.accounted_fragstat.add(fragstatdiff);
1175
1176 init_fragment_pins();
1177}
1178
1179
1180
1181
1182void CDir::resync_accounted_fragstat()
1183{
1184 fnode_t *pf = get_projected_fnode();
94b18763 1185 auto pi = inode->get_projected_inode();
7c673cae
FG
1186
1187 if (pf->accounted_fragstat.version != pi->dirstat.version) {
1188 pf->fragstat.version = pi->dirstat.version;
11fdf7f2 1189 dout(10) << __func__ << " " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
7c673cae
FG
1190 pf->accounted_fragstat = pf->fragstat;
1191 }
1192}
1193
1194/*
1195 * resync rstat and accounted_rstat with inode
1196 */
1197void CDir::resync_accounted_rstat()
1198{
1199 fnode_t *pf = get_projected_fnode();
94b18763 1200 auto pi = inode->get_projected_inode();
7c673cae
FG
1201
1202 if (pf->accounted_rstat.version != pi->rstat.version) {
1203 pf->rstat.version = pi->rstat.version;
11fdf7f2 1204 dout(10) << __func__ << " " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
7c673cae
FG
1205 pf->accounted_rstat = pf->rstat;
1206 dirty_old_rstat.clear();
1207 }
1208}
1209
1210void CDir::assimilate_dirty_rstat_inodes()
1211{
11fdf7f2 1212 dout(10) << __func__ << dendl;
7c673cae
FG
1213 for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1214 !p.end(); ++p) {
1215 CInode *in = *p;
11fdf7f2 1216 ceph_assert(in->is_auth());
7c673cae
FG
1217 if (in->is_frozen())
1218 continue;
1219
94b18763
FG
1220 auto &pi = in->project_inode();
1221 pi.inode.version = in->pre_dirty();
7c673cae
FG
1222
1223 inode->mdcache->project_rstat_inode_to_frag(in, this, 0, 0, NULL);
1224 }
1225 state_set(STATE_ASSIMRSTAT);
11fdf7f2 1226 dout(10) << __func__ << " done" << dendl;
7c673cae
FG
1227}
1228
1229void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef& mut, EMetaBlob *blob)
1230{
1231 if (!state_test(STATE_ASSIMRSTAT))
1232 return;
1233 state_clear(STATE_ASSIMRSTAT);
11fdf7f2 1234 dout(10) << __func__ << dendl;
7c673cae
FG
1235 elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1236 while (!p.end()) {
1237 CInode *in = *p;
1238 ++p;
1239
1240 if (in->is_frozen())
1241 continue;
1242
1243 CDentry *dn = in->get_projected_parent_dn();
1244
1245 mut->auth_pin(in);
1246 mut->add_projected_inode(in);
1247
1248 in->clear_dirty_rstat();
1249 blob->add_primary_dentry(dn, in, true);
1250 }
1251
1252 if (!dirty_rstat_inodes.empty())
1253 inode->mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
1254}
1255
1256
1257
1258
1259/****************************************
1260 * WAITING
1261 */
1262
11fdf7f2 1263void CDir::add_dentry_waiter(std::string_view dname, snapid_t snapid, MDSContext *c)
7c673cae
FG
1264{
1265 if (waiting_on_dentry.empty())
1266 get(PIN_DNWAITER);
1267 waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
11fdf7f2 1268 dout(10) << __func__ << " dentry " << dname
7c673cae
FG
1269 << " snap " << snapid
1270 << " " << c << " on " << *this << dendl;
1271}
1272
11fdf7f2
TL
1273void CDir::take_dentry_waiting(std::string_view dname, snapid_t first, snapid_t last,
1274 MDSContext::vec& ls)
7c673cae
FG
1275{
1276 if (waiting_on_dentry.empty())
1277 return;
1278
1279 string_snap_t lb(dname, first);
1280 string_snap_t ub(dname, last);
94b18763
FG
1281 auto it = waiting_on_dentry.lower_bound(lb);
1282 while (it != waiting_on_dentry.end() &&
1283 !(ub < it->first)) {
11fdf7f2 1284 dout(10) << __func__ << " " << dname
7c673cae 1285 << " [" << first << "," << last << "] found waiter on snap "
94b18763 1286 << it->first.snapid
7c673cae 1287 << " on " << *this << dendl;
94b18763
FG
1288 for (const auto &waiter : it->second) {
1289 ls.push_back(waiter);
1290 }
1291 waiting_on_dentry.erase(it++);
7c673cae
FG
1292 }
1293
1294 if (waiting_on_dentry.empty())
1295 put(PIN_DNWAITER);
1296}
1297
11fdf7f2 1298void CDir::take_sub_waiting(MDSContext::vec& ls)
7c673cae 1299{
11fdf7f2 1300 dout(10) << __func__ << dendl;
7c673cae 1301 if (!waiting_on_dentry.empty()) {
94b18763
FG
1302 for (const auto &p : waiting_on_dentry) {
1303 for (const auto &waiter : p.second) {
1304 ls.push_back(waiter);
1305 }
1306 }
7c673cae
FG
1307 waiting_on_dentry.clear();
1308 put(PIN_DNWAITER);
1309 }
1310}
1311
1312
1313
11fdf7f2 1314void CDir::add_waiter(uint64_t tag, MDSContext *c)
7c673cae
FG
1315{
1316 // hierarchical?
7c673cae
FG
1317
1318 // at subtree root?
1319 if (tag & WAIT_ATSUBTREEROOT) {
1320 if (!is_subtree_root()) {
1321 // try parent
1322 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1323 inode->parent->dir->add_waiter(tag, c);
1324 return;
1325 }
1326 }
1327
11fdf7f2 1328 ceph_assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
7c673cae
FG
1329
1330 MDSCacheObject::add_waiter(tag, c);
1331}
1332
1333
1334
1335/* NOTE: this checks dentry waiters too */
11fdf7f2 1336void CDir::take_waiting(uint64_t mask, MDSContext::vec& ls)
7c673cae
FG
1337{
1338 if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1339 // take all dentry waiters
94b18763
FG
1340 for (const auto &p : waiting_on_dentry) {
1341 dout(10) << "take_waiting dentry " << p.first.name
1342 << " snap " << p.first.snapid << " on " << *this << dendl;
1343 for (const auto &waiter : p.second) {
1344 ls.push_back(waiter);
1345 }
7c673cae 1346 }
94b18763 1347 waiting_on_dentry.clear();
7c673cae
FG
1348 put(PIN_DNWAITER);
1349 }
1350
1351 // waiting
1352 MDSCacheObject::take_waiting(mask, ls);
1353}
1354
1355
1356void CDir::finish_waiting(uint64_t mask, int result)
1357{
11fdf7f2 1358 dout(11) << __func__ << " mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
7c673cae 1359
11fdf7f2 1360 MDSContext::vec finished;
7c673cae
FG
1361 take_waiting(mask, finished);
1362 if (result < 0)
1363 finish_contexts(g_ceph_context, finished, result);
1364 else
1365 cache->mds->queue_waiters(finished);
1366}
1367
1368
1369
1370// dirty/clean
1371
1372fnode_t *CDir::project_fnode()
1373{
11fdf7f2
TL
1374 ceph_assert(get_version() != 0);
1375 auto &p = projected_fnode.emplace_back(*get_projected_fnode());
7c673cae
FG
1376
1377 if (scrub_infop && scrub_infop->last_scrub_dirty) {
94b18763
FG
1378 p.localized_scrub_stamp = scrub_infop->last_local.time;
1379 p.localized_scrub_version = scrub_infop->last_local.version;
1380 p.recursive_scrub_stamp = scrub_infop->last_recursive.time;
1381 p.recursive_scrub_version = scrub_infop->last_recursive.version;
7c673cae
FG
1382 scrub_infop->last_scrub_dirty = false;
1383 scrub_maybe_delete_info();
1384 }
1385
94b18763
FG
1386 dout(10) << __func__ << " " << &p << dendl;
1387 return &p;
7c673cae
FG
1388}
1389
1390void CDir::pop_and_dirty_projected_fnode(LogSegment *ls)
1391{
11fdf7f2 1392 ceph_assert(!projected_fnode.empty());
94b18763
FG
1393 auto &front = projected_fnode.front();
1394 dout(15) << __func__ << " " << &front << " v" << front.version << dendl;
1395 fnode = front;
7c673cae 1396 _mark_dirty(ls);
7c673cae
FG
1397 projected_fnode.pop_front();
1398}
1399
1400
1401version_t CDir::pre_dirty(version_t min)
1402{
1403 if (min > projected_version)
1404 projected_version = min;
1405 ++projected_version;
11fdf7f2 1406 dout(10) << __func__ << " " << projected_version << dendl;
7c673cae
FG
1407 return projected_version;
1408}
1409
1410void CDir::mark_dirty(version_t pv, LogSegment *ls)
1411{
11fdf7f2
TL
1412 ceph_assert(get_version() < pv);
1413 ceph_assert(pv <= projected_version);
7c673cae
FG
1414 fnode.version = pv;
1415 _mark_dirty(ls);
1416}
1417
1418void CDir::_mark_dirty(LogSegment *ls)
1419{
1420 if (!state_test(STATE_DIRTY)) {
11fdf7f2 1421 dout(10) << __func__ << " (was clean) " << *this << " version " << get_version() << dendl;
7c673cae 1422 _set_dirty_flag();
11fdf7f2 1423 ceph_assert(ls);
7c673cae 1424 } else {
11fdf7f2 1425 dout(10) << __func__ << " (already dirty) " << *this << " version " << get_version() << dendl;
7c673cae
FG
1426 }
1427 if (ls) {
1428 ls->dirty_dirfrags.push_back(&item_dirty);
1429
1430 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1431 if (committed_version == 0 && !item_new.is_on_list())
1432 ls->new_dirfrags.push_back(&item_new);
1433 }
1434}
1435
1436void CDir::mark_new(LogSegment *ls)
1437{
1438 ls->new_dirfrags.push_back(&item_new);
1439 state_clear(STATE_CREATING);
1440
11fdf7f2 1441 MDSContext::vec waiters;
7c673cae
FG
1442 take_waiting(CDir::WAIT_CREATED, waiters);
1443 cache->mds->queue_waiters(waiters);
1444}
1445
1446void CDir::mark_clean()
1447{
11fdf7f2 1448 dout(10) << __func__ << " " << *this << " version " << get_version() << dendl;
7c673cae
FG
1449 if (state_test(STATE_DIRTY)) {
1450 item_dirty.remove_myself();
1451 item_new.remove_myself();
1452
1453 state_clear(STATE_DIRTY);
1454 put(PIN_DIRTY);
1455 }
1456}
1457
1458// caller should hold auth pin of this
1459void CDir::log_mark_dirty()
1460{
b32b8144 1461 if (is_dirty() || projected_version > get_version())
7c673cae
FG
1462 return; // noop if it is already dirty or will be dirty
1463
1464 version_t pv = pre_dirty();
1465 mark_dirty(pv, cache->mds->mdlog->get_current_segment());
1466}
1467
1468void CDir::mark_complete() {
1469 state_set(STATE_COMPLETE);
1470 bloom.reset();
1471}
1472
1473void CDir::first_get()
1474{
1475 inode->get(CInode::PIN_DIRFRAG);
1476}
1477
1478void CDir::last_put()
1479{
1480 inode->put(CInode::PIN_DIRFRAG);
1481}
1482
1483
1484
1485/******************************************************************************
1486 * FETCH and COMMIT
1487 */
1488
1489// -----------------------
1490// FETCH
11fdf7f2 1491void CDir::fetch(MDSContext *c, bool ignore_authpinnability)
7c673cae
FG
1492{
1493 string want;
1494 return fetch(c, want, ignore_authpinnability);
1495}
1496
11fdf7f2 1497void CDir::fetch(MDSContext *c, std::string_view want_dn, bool ignore_authpinnability)
7c673cae
FG
1498{
1499 dout(10) << "fetch on " << *this << dendl;
1500
11fdf7f2
TL
1501 ceph_assert(is_auth());
1502 ceph_assert(!is_complete());
7c673cae
FG
1503
1504 if (!can_auth_pin() && !ignore_authpinnability) {
1505 if (c) {
1506 dout(7) << "fetch waiting for authpinnable" << dendl;
1507 add_waiter(WAIT_UNFREEZE, c);
1508 } else
1509 dout(7) << "fetch not authpinnable and no context" << dendl;
1510 return;
1511 }
1512
1513 // unlinked directory inode shouldn't have any entry
31f18b77
FG
1514 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
1515 !inode->snaprealm) {
7c673cae
FG
1516 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1517 if (get_version() == 0) {
11fdf7f2 1518 ceph_assert(inode->is_auth());
7c673cae
FG
1519 set_version(1);
1520
1521 if (state_test(STATE_REJOINUNDEF)) {
11fdf7f2 1522 ceph_assert(cache->mds->is_rejoin());
7c673cae
FG
1523 state_clear(STATE_REJOINUNDEF);
1524 cache->opened_undef_dirfrag(this);
1525 }
1526 }
1527 mark_complete();
1528
1529 if (c)
1530 cache->mds->queue_waiter(c);
1531 return;
1532 }
1533
1534 if (c) add_waiter(WAIT_COMPLETE, c);
94b18763 1535 if (!want_dn.empty()) wanted_items.insert(mempool::mds_co::string(want_dn));
7c673cae
FG
1536
1537 // already fetching?
1538 if (state_test(CDir::STATE_FETCHING)) {
1539 dout(7) << "already fetching; waiting" << dendl;
1540 return;
1541 }
1542
1543 auth_pin(this);
1544 state_set(CDir::STATE_FETCHING);
1545
1546 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1547
1548 std::set<dentry_key_t> empty;
1549 _omap_fetch(NULL, empty);
1550}
1551
11fdf7f2 1552void CDir::fetch(MDSContext *c, const std::set<dentry_key_t>& keys)
7c673cae
FG
1553{
1554 dout(10) << "fetch " << keys.size() << " keys on " << *this << dendl;
1555
11fdf7f2
TL
1556 ceph_assert(is_auth());
1557 ceph_assert(!is_complete());
7c673cae
FG
1558
1559 if (!can_auth_pin()) {
1560 dout(7) << "fetch keys waiting for authpinnable" << dendl;
1561 add_waiter(WAIT_UNFREEZE, c);
1562 return;
1563 }
1564 if (state_test(CDir::STATE_FETCHING)) {
1565 dout(7) << "fetch keys waiting for full fetch" << dendl;
1566 add_waiter(WAIT_COMPLETE, c);
1567 return;
1568 }
1569
1570 auth_pin(this);
1571 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1572
1573 _omap_fetch(c, keys);
1574}
1575
1576class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
11fdf7f2 1577 MDSContext *fin;
7c673cae 1578public:
7f7e6c64 1579 const version_t omap_version;
7c673cae
FG
1580 bufferlist hdrbl;
1581 bool more = false;
1582 map<string, bufferlist> omap; ///< carry-over from before
1583 map<string, bufferlist> omap_more; ///< new batch
1584 int ret;
7f7e6c64
TL
1585 C_IO_Dir_OMAP_FetchedMore(CDir *d, version_t v, MDSContext *f) :
1586 CDirIOContext(d), fin(f), omap_version(v), ret(0) { }
7c673cae 1587 void finish(int r) {
7f7e6c64
TL
1588 if (omap_version < dir->get_committed_version()) {
1589 omap.clear();
1590 dir->_omap_fetch(fin, {});
1591 return;
1592 }
1593
7c673cae
FG
1594 // merge results
1595 if (omap.empty()) {
1596 omap.swap(omap_more);
1597 } else {
1598 omap.insert(omap_more.begin(), omap_more.end());
1599 }
1600 if (more) {
7f7e6c64 1601 dir->_omap_fetch_more(omap_version, hdrbl, omap, fin);
7c673cae
FG
1602 } else {
1603 dir->_omap_fetched(hdrbl, omap, !fin, r);
1604 if (fin)
1605 fin->complete(r);
1606 }
1607 }
91327a77
AA
1608 void print(ostream& out) const override {
1609 out << "dirfrag_fetch_more(" << dir->dirfrag() << ")";
1610 }
7c673cae
FG
1611};
1612
1613class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
11fdf7f2 1614 MDSContext *fin;
7c673cae 1615public:
7f7e6c64 1616 const version_t omap_version;
7c673cae
FG
1617 bufferlist hdrbl;
1618 bool more = false;
1619 map<string, bufferlist> omap;
1620 bufferlist btbl;
1621 int ret1, ret2, ret3;
1622
11fdf7f2 1623 C_IO_Dir_OMAP_Fetched(CDir *d, MDSContext *f) :
7f7e6c64
TL
1624 CDirIOContext(d), fin(f),
1625 omap_version(d->get_committing_version()),
1626 ret1(0), ret2(0), ret3(0) { }
7c673cae
FG
1627 void finish(int r) override {
1628 // check the correctness of backtrace
1629 if (r >= 0 && ret3 != -ECANCELED)
1630 dir->inode->verify_diri_backtrace(btbl, ret3);
1631 if (r >= 0) r = ret1;
1632 if (r >= 0) r = ret2;
7f7e6c64 1633
7c673cae 1634 if (more) {
7f7e6c64
TL
1635 if (omap_version < dir->get_committed_version()) {
1636 omap.clear();
1637 dir->_omap_fetch(fin, {});
1638 } else {
1639 dir->_omap_fetch_more(omap_version, hdrbl, omap, fin);
1640 }
1641 return;
7c673cae 1642 }
7f7e6c64
TL
1643
1644 dir->_omap_fetched(hdrbl, omap, !fin, r);
1645 if (fin)
1646 fin->complete(r);
1647
7c673cae 1648 }
91327a77
AA
1649 void print(ostream& out) const override {
1650 out << "dirfrag_fetch(" << dir->dirfrag() << ")";
1651 }
7c673cae
FG
1652};
1653
11fdf7f2 1654void CDir::_omap_fetch(MDSContext *c, const std::set<dentry_key_t>& keys)
7c673cae
FG
1655{
1656 C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1657 object_t oid = get_ondisk_object();
1658 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1659 ObjectOperation rd;
1660 rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1661 if (keys.empty()) {
11fdf7f2
TL
1662 ceph_assert(!c);
1663 rd.omap_get_vals("", "", g_conf()->mds_dir_keys_per_op,
7c673cae
FG
1664 &fin->omap, &fin->more, &fin->ret2);
1665 } else {
11fdf7f2 1666 ceph_assert(c);
7c673cae 1667 std::set<std::string> str_keys;
94b18763 1668 for (auto p : keys) {
7c673cae 1669 string str;
94b18763 1670 p.encode(str);
7c673cae
FG
1671 str_keys.insert(str);
1672 }
1673 rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
1674 }
1675 // check the correctness of backtrace
11fdf7f2 1676 if (g_conf()->mds_verify_backtrace > 0 && frag == frag_t()) {
7c673cae
FG
1677 rd.getxattr("parent", &fin->btbl, &fin->ret3);
1678 rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1679 } else {
1680 fin->ret3 = -ECANCELED;
1681 }
1682
1683 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1684 new C_OnFinisher(fin, cache->mds->finisher));
1685}
1686
7f7e6c64
TL
1687void CDir::_omap_fetch_more(version_t omap_version, bufferlist& hdrbl,
1688 map<string, bufferlist>& omap, MDSContext *c)
7c673cae
FG
1689{
1690 // we have more omap keys to fetch!
1691 object_t oid = get_ondisk_object();
1692 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
7f7e6c64 1693 auto fin = new C_IO_Dir_OMAP_FetchedMore(this, omap_version, c);
7c673cae
FG
1694 fin->hdrbl.claim(hdrbl);
1695 fin->omap.swap(omap);
1696 ObjectOperation rd;
1697 rd.omap_get_vals(fin->omap.rbegin()->first,
1698 "", /* filter prefix */
11fdf7f2 1699 g_conf()->mds_dir_keys_per_op,
7c673cae
FG
1700 &fin->omap_more,
1701 &fin->more,
1702 &fin->ret);
1703 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1704 new C_OnFinisher(fin, cache->mds->finisher));
1705}
1706
1707CDentry *CDir::_load_dentry(
11fdf7f2
TL
1708 std::string_view key,
1709 std::string_view dname,
7c673cae
FG
1710 const snapid_t last,
1711 bufferlist &bl,
1712 const int pos,
1713 const std::set<snapid_t> *snaps,
f91f0fd5 1714 double rand_threshold,
28e407b8 1715 bool *force_dirty)
7c673cae 1716{
11fdf7f2 1717 auto q = bl.cbegin();
7c673cae
FG
1718
1719 snapid_t first;
11fdf7f2 1720 decode(first, q);
7c673cae
FG
1721
1722 // marker
1723 char type;
11fdf7f2 1724 decode(type, q);
7c673cae
FG
1725
1726 dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
1727 << " [" << first << "," << last << "]"
1728 << dendl;
1729
1730 bool stale = false;
1731 if (snaps && last != CEPH_NOSNAP) {
1732 set<snapid_t>::const_iterator p = snaps->lower_bound(first);
1733 if (p == snaps->end() || *p > last) {
1734 dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
1735 stale = true;
1736 }
1737 }
1738
1739 /*
1740 * look for existing dentry for _last_ snap, because unlink +
1741 * create may leave a "hole" (epochs during which the dentry
1742 * doesn't exist) but for which no explicit negative dentry is in
1743 * the cache.
1744 */
1745 CDentry *dn;
1746 if (stale)
1747 dn = lookup_exact_snap(dname, last);
1748 else
1749 dn = lookup(dname, last);
1750
1751 if (type == 'L') {
1752 // hard link
1753 inodeno_t ino;
1754 unsigned char d_type;
11fdf7f2
TL
1755 decode(ino, q);
1756 decode(d_type, q);
7c673cae
FG
1757
1758 if (stale) {
1759 if (!dn) {
94b18763 1760 stale_items.insert(mempool::mds_co::string(key));
7c673cae
FG
1761 *force_dirty = true;
1762 }
1763 return dn;
1764 }
1765
1766 if (dn) {
28e407b8
AA
1767 CDentry::linkage_t *dnl = dn->get_linkage();
1768 dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
1769 if (committed_version == 0 &&
1770 dnl->is_remote() &&
1771 dn->is_dirty() &&
1772 ino == dnl->get_remote_ino() &&
1773 d_type == dnl->get_remote_d_type()) {
1774 // see comment below
1775 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1776 dn->mark_clean();
7c673cae
FG
1777 }
1778 } else {
1779 // (remote) link
1780 dn = add_remote_dentry(dname, ino, d_type, first, last);
1781
1782 // link to inode?
1783 CInode *in = cache->get_inode(ino); // we may or may not have it.
1784 if (in) {
1785 dn->link_remote(dn->get_linkage(), in);
1786 dout(12) << "_fetched got remote link " << ino << " which we have " << *in << dendl;
1787 } else {
11fdf7f2 1788 dout(12) << "_fetched got remote link " << ino << " (don't have it)" << dendl;
7c673cae
FG
1789 }
1790 }
1791 }
1792 else if (type == 'I') {
1793 // inode
1794
1795 // Load inode data before looking up or constructing CInode
1796 InodeStore inode_data;
1797 inode_data.decode_bare(q);
1798
1799 if (stale) {
1800 if (!dn) {
94b18763 1801 stale_items.insert(mempool::mds_co::string(key));
7c673cae
FG
1802 *force_dirty = true;
1803 }
1804 return dn;
1805 }
1806
1807 bool undef_inode = false;
1808 if (dn) {
28e407b8
AA
1809 CDentry::linkage_t *dnl = dn->get_linkage();
1810 dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
1811
1812 if (dnl->is_primary()) {
1813 CInode *in = dnl->get_inode();
1814 if (in->state_test(CInode::STATE_REJOINUNDEF)) {
1815 undef_inode = true;
1816 } else if (committed_version == 0 &&
1817 dn->is_dirty() &&
1818 inode_data.inode.ino == in->ino() &&
1819 inode_data.inode.version == in->get_version()) {
1820 /* clean underwater item?
1821 * Underwater item is something that is dirty in our cache from
1822 * journal replay, but was previously flushed to disk before the
1823 * mds failed.
1824 *
1825 * We only do this is committed_version == 0. that implies either
1826 * - this is a fetch after from a clean/empty CDir is created
1827 * (and has no effect, since the dn won't exist); or
1828 * - this is a fetch after _recovery_, which is what we're worried
1829 * about. Items that are marked dirty from the journal should be
1830 * marked clean if they appear on disk.
1831 */
1832 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1833 dn->mark_clean();
1834 dout(10) << "_fetched had underwater inode " << *dnl->get_inode() << ", marking clean" << dendl;
1835 in->mark_clean();
1836 }
1837 }
7c673cae
FG
1838 }
1839
1840 if (!dn || undef_inode) {
1841 // add inode
1842 CInode *in = cache->get_inode(inode_data.inode.ino, last);
1843 if (!in || undef_inode) {
1844 if (undef_inode && in)
1845 in->first = first;
1846 else
1847 in = new CInode(cache, true, first, last);
1848
1849 in->inode = inode_data.inode;
1850 // symlink?
1851 if (in->is_symlink())
1852 in->symlink = inode_data.symlink;
1853
1854 in->dirfragtree.swap(inode_data.dirfragtree);
1855 in->xattrs.swap(inode_data.xattrs);
1856 in->old_inodes.swap(inode_data.old_inodes);
1857 if (!in->old_inodes.empty()) {
1858 snapid_t min_first = in->old_inodes.rbegin()->first + 1;
1859 if (min_first > in->first)
1860 in->first = min_first;
1861 }
1862
1863 in->oldest_snap = inode_data.oldest_snap;
1864 in->decode_snap_blob(inode_data.snap_blob);
1865 if (snaps && !in->snaprealm)
1866 in->purge_stale_snap_data(*snaps);
1867
1868 if (!undef_inode) {
1869 cache->add_inode(in); // add
1870 dn = add_primary_dentry(dname, in, first, last); // link
1871 }
1872 dout(12) << "_fetched got " << *dn << " " << *in << dendl;
1873
1874 if (in->inode.is_dirty_rstat())
1875 in->mark_dirty_rstat();
1876
f91f0fd5 1877 in->maybe_ephemeral_rand(true, rand_threshold);
7c673cae
FG
1878 //in->hack_accessed = false;
1879 //in->hack_load_stamp = ceph_clock_now();
1880 //num_new_inodes_loaded++;
11fdf7f2 1881 } else if (g_conf().get_val<bool>("mds_hack_allow_loading_invalid_metadata")) {
94b18763
FG
1882 dout(20) << "hack: adding duplicate dentry for " << *in << dendl;
1883 dn = add_primary_dentry(dname, in, first, last);
7c673cae
FG
1884 } else {
1885 dout(0) << "_fetched badness: got (but i already had) " << *in
1886 << " mode " << in->inode.mode
1887 << " mtime " << in->inode.mtime << dendl;
1888 string dirpath, inopath;
1889 this->inode->make_path_string(dirpath);
1890 in->make_path_string(inopath);
1891 cache->mds->clog->error() << "loaded dup inode " << inode_data.inode.ino
1892 << " [" << first << "," << last << "] v" << inode_data.inode.version
1893 << " at " << dirpath << "/" << dname
1894 << ", but inode " << in->vino() << " v" << in->inode.version
1895 << " already exists at " << inopath;
1896 return dn;
1897 }
1898 }
1899 } else {
1900 std::ostringstream oss;
1901 oss << "Invalid tag char '" << type << "' pos " << pos;
1902 throw buffer::malformed_input(oss.str());
1903 }
1904
1905 return dn;
1906}
1907
1908void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
1909 bool complete, int r)
1910{
1911 LogChannelRef clog = cache->mds->clog;
1912 dout(10) << "_fetched header " << hdrbl.length() << " bytes "
1913 << omap.size() << " keys for " << *this << dendl;
1914
11fdf7f2
TL
1915 ceph_assert(r == 0 || r == -ENOENT || r == -ENODATA);
1916 ceph_assert(is_auth());
1917 ceph_assert(!is_frozen());
7c673cae
FG
1918
1919 if (hdrbl.length() == 0) {
1920 dout(0) << "_fetched missing object for " << *this << dendl;
1921
1922 clog->error() << "dir " << dirfrag() << " object missing on disk; some "
1923 "files may be lost (" << get_path() << ")";
1924
1925 go_bad(complete);
1926 return;
1927 }
1928
1929 fnode_t got_fnode;
1930 {
11fdf7f2 1931 auto p = hdrbl.cbegin();
7c673cae 1932 try {
11fdf7f2 1933 decode(got_fnode, p);
7c673cae
FG
1934 } catch (const buffer::error &err) {
1935 derr << "Corrupt fnode in dirfrag " << dirfrag()
1936 << ": " << err << dendl;
1937 clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
1938 << err << " (" << get_path() << ")";
1939 go_bad(complete);
1940 return;
1941 }
1942 if (!p.end()) {
1943 clog->warn() << "header buffer of dir " << dirfrag() << " has "
1944 << hdrbl.length() - p.get_off() << " extra bytes ("
1945 << get_path() << ")";
1946 go_bad(complete);
1947 return;
1948 }
1949 }
1950
1951 dout(10) << "_fetched version " << got_fnode.version << dendl;
1952
1953 // take the loaded fnode?
1954 // only if we are a fresh CDir* with no prior state.
1955 if (get_version() == 0) {
11fdf7f2
TL
1956 ceph_assert(!is_projected());
1957 ceph_assert(!state_test(STATE_COMMITTING));
7c673cae
FG
1958 fnode = got_fnode;
1959 projected_version = committing_version = committed_version = got_fnode.version;
1960
1961 if (state_test(STATE_REJOINUNDEF)) {
11fdf7f2 1962 ceph_assert(cache->mds->is_rejoin());
7c673cae
FG
1963 state_clear(STATE_REJOINUNDEF);
1964 cache->opened_undef_dirfrag(this);
1965 }
1966 }
1967
1968 list<CInode*> undef_inodes;
1969
1970 // purge stale snaps?
1971 // only if we have past_parents open!
1972 bool force_dirty = false;
1973 const set<snapid_t> *snaps = NULL;
1974 SnapRealm *realm = inode->find_snaprealm();
1975 if (!realm->have_past_parents_open()) {
1976 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
1977 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
1978 snaps = &realm->get_snaps();
1979 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
1980 << " < " << realm->get_last_destroyed()
1981 << ", snap purge based on " << *snaps << dendl;
1982 if (get_num_snap_items() == 0) {
1983 fnode.snap_purged_thru = realm->get_last_destroyed();
1984 force_dirty = true;
1985 }
1986 }
1987
1988 unsigned pos = omap.size() - 1;
f91f0fd5 1989 double rand_threshold = get_inode()->get_ephemeral_rand();
7c673cae
FG
1990 for (map<string, bufferlist>::reverse_iterator p = omap.rbegin();
1991 p != omap.rend();
1992 ++p, --pos) {
1993 string dname;
1994 snapid_t last;
1995 dentry_key_t::decode_helper(p->first, dname, last);
1996
1997 CDentry *dn = NULL;
1998 try {
1999 dn = _load_dentry(
2000 p->first, dname, last, p->second, pos, snaps,
f91f0fd5 2001 rand_threshold, &force_dirty);
7c673cae
FG
2002 } catch (const buffer::error &err) {
2003 cache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
2004 "dir frag " << dirfrag() << ": "
2005 << err << "(" << get_path() << ")";
2006
2007 // Remember that this dentry is damaged. Subsequent operations
2008 // that try to act directly on it will get their EIOs, but this
2009 // dirfrag as a whole will continue to look okay (minus the
2010 // mysteriously-missing dentry)
2011 go_bad_dentry(last, dname);
2012
2013 // Anyone who was WAIT_DENTRY for this guy will get kicked
2014 // to RetryRequest, and hit the DamageTable-interrogating path.
2015 // Stats will now be bogus because we will think we're complete,
2016 // but have 1 or more missing dentries.
2017 continue;
2018 }
2019
28e407b8
AA
2020 if (!dn)
2021 continue;
7c673cae 2022
28e407b8
AA
2023 CDentry::linkage_t *dnl = dn->get_linkage();
2024 if (dnl->is_primary() && dnl->get_inode()->state_test(CInode::STATE_REJOINUNDEF))
2025 undef_inodes.push_back(dnl->get_inode());
7c673cae 2026
11fdf7f2 2027 if (wanted_items.count(mempool::mds_co::string(dname)) > 0 || !complete) {
28e407b8
AA
2028 dout(10) << " touching wanted dn " << *dn << dendl;
2029 inode->mdcache->touch_dentry(dn);
7c673cae
FG
2030 }
2031 }
2032
2033 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
2034
2035 // mark complete, !fetching
2036 if (complete) {
2037 wanted_items.clear();
2038 mark_complete();
2039 state_clear(STATE_FETCHING);
2040
2041 if (scrub_infop && scrub_infop->need_scrub_local) {
2042 scrub_infop->need_scrub_local = false;
2043 scrub_local();
2044 }
2045 }
2046
2047 // open & force frags
2048 while (!undef_inodes.empty()) {
2049 CInode *in = undef_inodes.front();
2050 undef_inodes.pop_front();
2051 in->state_clear(CInode::STATE_REJOINUNDEF);
2052 cache->opened_undef_inode(in);
2053 }
2054
2055 // dirty myself to remove stale snap dentries
2056 if (force_dirty && !inode->mdcache->is_readonly())
2057 log_mark_dirty();
2058
2059 auth_unpin(this);
2060
2061 if (complete) {
2062 // kick waiters
2063 finish_waiting(WAIT_COMPLETE, 0);
2064 }
2065}
2066
11fdf7f2 2067void CDir::go_bad_dentry(snapid_t last, std::string_view dname)
7c673cae 2068{
94b18763
FG
2069 dout(10) << __func__ << " " << dname << dendl;
2070 std::string path(get_path());
2071 path += "/";
11fdf7f2 2072 path += dname;
7c673cae 2073 const bool fatal = cache->mds->damage_table.notify_dentry(
94b18763 2074 inode->ino(), frag, last, dname, path);
7c673cae
FG
2075 if (fatal) {
2076 cache->mds->damaged();
2077 ceph_abort(); // unreachable, damaged() respawns us
2078 }
2079}
2080
2081void CDir::go_bad(bool complete)
2082{
11fdf7f2 2083 dout(10) << __func__ << " " << frag << dendl;
7c673cae
FG
2084 const bool fatal = cache->mds->damage_table.notify_dirfrag(
2085 inode->ino(), frag, get_path());
2086 if (fatal) {
2087 cache->mds->damaged();
2088 ceph_abort(); // unreachable, damaged() respawns us
2089 }
2090
f91f0fd5
TL
2091 if (complete) {
2092 if (get_version() == 0)
2093 set_version(1);
2094
2095 state_set(STATE_BADFRAG);
2096 mark_complete();
2097 }
2098
2099 state_clear(STATE_FETCHING);
2100 auth_unpin(this);
2101 finish_waiting(WAIT_COMPLETE, -EIO);
7c673cae
FG
2102}
2103
2104// -----------------------
2105// COMMIT
2106
2107/**
2108 * commit
2109 *
2110 * @param want - min version i want committed
2111 * @param c - callback for completion
2112 */
11fdf7f2 2113void CDir::commit(version_t want, MDSContext *c, bool ignore_authpinnability, int op_prio)
7c673cae
FG
2114{
2115 dout(10) << "commit want " << want << " on " << *this << dendl;
2116 if (want == 0) want = get_version();
2117
2118 // preconditions
11fdf7f2
TL
2119 ceph_assert(want <= get_version() || get_version() == 0); // can't commit the future
2120 ceph_assert(want > committed_version); // the caller is stupid
2121 ceph_assert(is_auth());
2122 ceph_assert(ignore_authpinnability || can_auth_pin());
7c673cae 2123
7c673cae
FG
2124 // note: queue up a noop if necessary, so that we always
2125 // get an auth_pin.
2126 if (!c)
2127 c = new C_MDSInternalNoop;
2128
2129 // auth_pin on first waiter
2130 if (waiting_for_commit.empty())
2131 auth_pin(this);
2132 waiting_for_commit[want].push_back(c);
2133
2134 // ok.
2135 _commit(want, op_prio);
2136}
2137
2138class C_IO_Dir_Committed : public CDirIOContext {
2139 version_t version;
2140public:
2141 C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2142 void finish(int r) override {
2143 dir->_committed(r, version);
2144 }
91327a77
AA
2145 void print(ostream& out) const override {
2146 out << "dirfrag_commit(" << dir->dirfrag() << ")";
2147 }
7c673cae
FG
2148};
2149
2150/**
2151 * Flush out the modified dentries in this dir. Keep the bufferlist
2152 * below max_write_size;
2153 */
2154void CDir::_omap_commit(int op_prio)
2155{
11fdf7f2 2156 dout(10) << __func__ << dendl;
7c673cae
FG
2157
2158 unsigned max_write_size = cache->max_dir_commit_size;
2159 unsigned write_size = 0;
2160
2161 if (op_prio < 0)
2162 op_prio = CEPH_MSG_PRIO_DEFAULT;
2163
2164 // snap purge?
2165 const set<snapid_t> *snaps = NULL;
2166 SnapRealm *realm = inode->find_snaprealm();
2167 if (!realm->have_past_parents_open()) {
2168 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
2169 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
2170 snaps = &realm->get_snaps();
2171 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
2172 << " < " << realm->get_last_destroyed()
2173 << ", snap purge based on " << *snaps << dendl;
2174 // fnode.snap_purged_thru = realm->get_last_destroyed();
2175 }
2176
2177 set<string> to_remove;
2178 map<string, bufferlist> to_set;
2179
2180 C_GatherBuilder gather(g_ceph_context,
2181 new C_OnFinisher(new C_IO_Dir_Committed(this,
2182 get_version()),
2183 cache->mds->finisher));
2184
2185 SnapContext snapc;
2186 object_t oid = get_ondisk_object();
2187 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
2188
2189 if (!stale_items.empty()) {
94b18763 2190 for (const auto &p : stale_items) {
11fdf7f2 2191 to_remove.insert(std::string(p));
94b18763 2192 write_size += p.length();
7c673cae
FG
2193 }
2194 stale_items.clear();
2195 }
2196
b32b8144 2197 auto write_one = [&](CDentry *dn) {
7c673cae
FG
2198 string key;
2199 dn->key().encode(key);
2200
2201 if (dn->last != CEPH_NOSNAP &&
2202 snaps && try_trim_snap_dentry(dn, *snaps)) {
2203 dout(10) << " rm " << key << dendl;
2204 write_size += key.length();
2205 to_remove.insert(key);
b32b8144 2206 return;
7c673cae
FG
2207 }
2208
7c673cae 2209 if (dn->get_linkage()->is_null()) {
94b18763 2210 dout(10) << " rm " << dn->get_name() << " " << *dn << dendl;
7c673cae
FG
2211 write_size += key.length();
2212 to_remove.insert(key);
2213 } else {
94b18763 2214 dout(10) << " set " << dn->get_name() << " " << *dn << dendl;
7c673cae
FG
2215 bufferlist dnbl;
2216 _encode_dentry(dn, dnbl, snaps);
2217 write_size += key.length() + dnbl.length();
2218 to_set[key].swap(dnbl);
2219 }
2220
2221 if (write_size >= max_write_size) {
2222 ObjectOperation op;
2223 op.priority = op_prio;
2224
2225 // don't create new dirfrag blindly
f91f0fd5 2226 if (!is_new())
7c673cae
FG
2227 op.stat(NULL, (ceph::real_time*) NULL, NULL);
2228
2229 if (!to_set.empty())
2230 op.omap_set(to_set);
2231 if (!to_remove.empty())
2232 op.omap_rm_keys(to_remove);
2233
2234 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2235 ceph::real_clock::now(),
2236 0, gather.new_sub());
2237
2238 write_size = 0;
2239 to_set.clear();
2240 to_remove.clear();
2241 }
b32b8144
FG
2242 };
2243
f91f0fd5
TL
2244 if (state_test(CDir::STATE_FRAGMENTING) && is_new()) {
2245 assert(committed_version == 0);
b32b8144
FG
2246 for (auto p = items.begin(); p != items.end(); ) {
2247 CDentry *dn = p->second;
2248 ++p;
f91f0fd5 2249 if (dn->get_linkage()->is_null())
b32b8144
FG
2250 continue;
2251 write_one(dn);
2252 }
2253 } else {
2254 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2255 CDentry *dn = *p;
2256 ++p;
2257 write_one(dn);
2258 }
7c673cae
FG
2259 }
2260
2261 ObjectOperation op;
2262 op.priority = op_prio;
2263
2264 // don't create new dirfrag blindly
f91f0fd5 2265 if (!is_new())
7c673cae
FG
2266 op.stat(NULL, (ceph::real_time*)NULL, NULL);
2267
2268 /*
2269 * save the header at the last moment.. If we were to send it off before other
2270 * updates, but die before sending them all, we'd think that the on-disk state
2271 * was fully committed even though it wasn't! However, since the messages are
2272 * strictly ordered between the MDS and the OSD, and since messages to a given
2273 * PG are strictly ordered, if we simply send the message containing the header
2274 * off last, we cannot get our header into an incorrect state.
2275 */
2276 bufferlist header;
11fdf7f2 2277 encode(fnode, header);
7c673cae
FG
2278 op.omap_set_header(header);
2279
2280 if (!to_set.empty())
2281 op.omap_set(to_set);
2282 if (!to_remove.empty())
2283 op.omap_rm_keys(to_remove);
2284
2285 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2286 ceph::real_clock::now(),
2287 0, gather.new_sub());
2288
2289 gather.activate();
2290}
2291
2292void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
2293 const set<snapid_t> *snaps)
2294{
2295 // clear dentry NEW flag, if any. we can no longer silently drop it.
2296 dn->clear_new();
2297
11fdf7f2 2298 encode(dn->first, bl);
7c673cae
FG
2299
2300 // primary or remote?
2301 if (dn->linkage.is_remote()) {
2302 inodeno_t ino = dn->linkage.get_remote_ino();
2303 unsigned char d_type = dn->linkage.get_remote_d_type();
94b18763 2304 dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' remote ino " << ino << dendl;
7c673cae
FG
2305
2306 // marker, name, ino
2307 bl.append('L'); // remote link
11fdf7f2
TL
2308 encode(ino, bl);
2309 encode(d_type, bl);
7c673cae
FG
2310 } else if (dn->linkage.is_primary()) {
2311 // primary link
2312 CInode *in = dn->linkage.get_inode();
11fdf7f2 2313 ceph_assert(in);
7c673cae 2314
94b18763 2315 dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' inode " << *in << dendl;
7c673cae
FG
2316
2317 // marker, name, inode, [symlink string]
2318 bl.append('I'); // inode
2319
2320 if (in->is_multiversion()) {
2321 if (!in->snaprealm) {
2322 if (snaps)
2323 in->purge_stale_snap_data(*snaps);
2324 } else if (in->snaprealm->have_past_parents_open()) {
2325 in->purge_stale_snap_data(in->snaprealm->get_snaps());
2326 }
2327 }
2328
2329 bufferlist snap_blob;
2330 in->encode_snap_blob(snap_blob);
2331 in->encode_bare(bl, cache->mds->mdsmap->get_up_features(), &snap_blob);
2332 } else {
11fdf7f2 2333 ceph_assert(!dn->linkage.is_null());
7c673cae
FG
2334 }
2335}
2336
2337void CDir::_commit(version_t want, int op_prio)
2338{
2339 dout(10) << "_commit want " << want << " on " << *this << dendl;
2340
2341 // we can't commit things in the future.
2342 // (even the projected future.)
11fdf7f2 2343 ceph_assert(want <= get_version() || get_version() == 0);
7c673cae
FG
2344
2345 // check pre+postconditions.
11fdf7f2 2346 ceph_assert(is_auth());
7c673cae
FG
2347
2348 // already committed?
2349 if (committed_version >= want) {
2350 dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2351 return;
2352 }
2353 // already committing >= want?
2354 if (committing_version >= want) {
2355 dout(10) << "already committing " << committing_version << " >= " << want << dendl;
11fdf7f2 2356 ceph_assert(state_test(STATE_COMMITTING));
7c673cae
FG
2357 return;
2358 }
2359
2360 // alrady committed an older version?
2361 if (committing_version > committed_version) {
2362 dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2363 return;
2364 }
2365
2366 // commit.
2367 committing_version = get_version();
2368
2369 // mark committing (if not already)
2370 if (!state_test(STATE_COMMITTING)) {
2371 dout(10) << "marking committing" << dendl;
2372 state_set(STATE_COMMITTING);
2373 }
2374
2375 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_commit);
2376
2377 _omap_commit(op_prio);
2378}
2379
2380
2381/**
2382 * _committed
2383 *
2384 * @param v version i just committed
2385 */
2386void CDir::_committed(int r, version_t v)
2387{
2388 if (r < 0) {
2389 // the directory could be partly purged during MDS failover
2390 if (r == -ENOENT && committed_version == 0 &&
31f18b77 2391 !inode->is_base() && get_parent_dir()->inode->is_stray()) {
7c673cae 2392 r = 0;
31f18b77
FG
2393 if (inode->snaprealm)
2394 inode->state_set(CInode::STATE_MISSINGOBJS);
7c673cae
FG
2395 }
2396 if (r < 0) {
2397 dout(1) << "commit error " << r << " v " << v << dendl;
2398 cache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
2399 << " errno " << r;
2400 cache->mds->handle_write_error(r);
2401 return;
2402 }
2403 }
2404
2405 dout(10) << "_committed v " << v << " on " << *this << dendl;
11fdf7f2 2406 ceph_assert(is_auth());
7c673cae
FG
2407
2408 bool stray = inode->is_stray();
2409
2410 // take note.
11fdf7f2
TL
2411 ceph_assert(v > committed_version);
2412 ceph_assert(v <= committing_version);
7c673cae
FG
2413 committed_version = v;
2414
2415 // _all_ commits done?
2416 if (committing_version == committed_version)
2417 state_clear(CDir::STATE_COMMITTING);
2418
2419 // _any_ commit, even if we've been redirtied, means we're no longer new.
2420 item_new.remove_myself();
2421
2422 // dir clean?
2423 if (committed_version == get_version())
2424 mark_clean();
2425
2426 // dentries clean?
b32b8144
FG
2427 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2428 CDentry *dn = *p;
2429 ++p;
7c673cae
FG
2430
2431 // inode?
2432 if (dn->linkage.is_primary()) {
2433 CInode *in = dn->linkage.get_inode();
11fdf7f2
TL
2434 ceph_assert(in);
2435 ceph_assert(in->is_auth());
7c673cae
FG
2436
2437 if (committed_version >= in->get_version()) {
2438 if (in->is_dirty()) {
2439 dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
2440 in->mark_clean();
2441 }
2442 } else {
2443 dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
11fdf7f2 2444 ceph_assert(in->is_dirty() || in->last < CEPH_NOSNAP); // special case for cow snap items (not predirtied)
7c673cae
FG
2445 }
2446 }
2447
2448 // dentry
2449 if (committed_version >= dn->get_version()) {
b32b8144
FG
2450 dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
2451 dn->mark_clean();
7c673cae 2452
b32b8144
FG
2453 // drop clean null stray dentries immediately
2454 if (stray &&
2455 dn->get_num_ref() == 0 &&
2456 !dn->is_projected() &&
2457 dn->get_linkage()->is_null())
2458 remove_dentry(dn);
7c673cae
FG
2459 } else {
2460 dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
11fdf7f2 2461 ceph_assert(dn->is_dirty());
7c673cae
FG
2462 }
2463 }
2464
2465 // finishers?
2466 bool were_waiters = !waiting_for_commit.empty();
2467
94b18763
FG
2468 auto it = waiting_for_commit.begin();
2469 while (it != waiting_for_commit.end()) {
2470 auto _it = it;
2471 ++_it;
2472 if (it->first > committed_version) {
2473 dout(10) << " there are waiters for " << it->first << ", committing again" << dendl;
2474 _commit(it->first, -1);
7c673cae
FG
2475 break;
2476 }
11fdf7f2 2477 MDSContext::vec t;
94b18763
FG
2478 for (const auto &waiter : it->second)
2479 t.push_back(waiter);
2480 cache->mds->queue_waiters(t);
2481 waiting_for_commit.erase(it);
2482 it = _it;
7c673cae
FG
2483 }
2484
2485 // try drop dentries in this dirfrag if it's about to be purged
31f18b77
FG
2486 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
2487 inode->snaprealm)
7c673cae
FG
2488 cache->maybe_eval_stray(inode, true);
2489
2490 // unpin if we kicked the last waiter.
2491 if (were_waiters &&
2492 waiting_for_commit.empty())
2493 auth_unpin(this);
2494}
2495
2496
2497
2498
2499// IMPORT/EXPORT
2500
2501void CDir::encode_export(bufferlist& bl)
2502{
9f95a23c 2503 ENCODE_START(1, 1, bl);
11fdf7f2
TL
2504 ceph_assert(!is_projected());
2505 encode(first, bl);
2506 encode(fnode, bl);
2507 encode(dirty_old_rstat, bl);
2508 encode(committed_version, bl);
7c673cae 2509
11fdf7f2
TL
2510 encode(state, bl);
2511 encode(dir_rep, bl);
7c673cae 2512
11fdf7f2
TL
2513 encode(pop_me, bl);
2514 encode(pop_auth_subtree, bl);
7c673cae 2515
11fdf7f2
TL
2516 encode(dir_rep_by, bl);
2517 encode(get_replicas(), bl);
7c673cae
FG
2518
2519 get(PIN_TEMPEXPORTING);
9f95a23c 2520 ENCODE_FINISH(bl);
7c673cae
FG
2521}
2522
11fdf7f2 2523void CDir::finish_export()
7c673cae
FG
2524{
2525 state &= MASK_STATE_EXPORT_KEPT;
11fdf7f2
TL
2526 pop_nested.sub(pop_auth_subtree);
2527 pop_auth_subtree_nested.sub(pop_auth_subtree);
2528 pop_me.zero();
2529 pop_auth_subtree.zero();
7c673cae
FG
2530 put(PIN_TEMPEXPORTING);
2531 dirty_old_rstat.clear();
2532}
2533
11fdf7f2 2534void CDir::decode_import(bufferlist::const_iterator& blp, LogSegment *ls)
7c673cae 2535{
9f95a23c 2536 DECODE_START(1, blp);
11fdf7f2
TL
2537 decode(first, blp);
2538 decode(fnode, blp);
2539 decode(dirty_old_rstat, blp);
7c673cae 2540 projected_version = fnode.version;
11fdf7f2 2541 decode(committed_version, blp);
7c673cae
FG
2542 committing_version = committed_version;
2543
2544 unsigned s;
11fdf7f2 2545 decode(s, blp);
7c673cae
FG
2546 state &= MASK_STATE_IMPORT_KEPT;
2547 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
2548
2549 if (is_dirty()) {
2550 get(PIN_DIRTY);
2551 _mark_dirty(ls);
2552 }
2553
11fdf7f2 2554 decode(dir_rep, blp);
7c673cae 2555
11fdf7f2
TL
2556 decode(pop_me, blp);
2557 decode(pop_auth_subtree, blp);
2558 pop_nested.add(pop_auth_subtree);
2559 pop_auth_subtree_nested.add(pop_auth_subtree);
7c673cae 2560
11fdf7f2
TL
2561 decode(dir_rep_by, blp);
2562 decode(get_replicas(), blp);
181888fb 2563 if (is_replicated()) get(PIN_REPLICATED);
7c673cae
FG
2564
2565 replica_nonce = 0; // no longer defined
2566
2567 // did we import some dirty scatterlock data?
2568 if (dirty_old_rstat.size() ||
2569 !(fnode.rstat == fnode.accounted_rstat)) {
2570 cache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
2571 ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
2572 }
2573 if (!(fnode.fragstat == fnode.accounted_fragstat)) {
2574 cache->mds->locker->mark_updated_scatterlock(&inode->filelock);
2575 ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
2576 }
2577 if (is_dirty_dft()) {
2578 if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
2579 inode->dirfragtreelock.is_stable()) {
2580 // clear stale dirtydft
2581 state_clear(STATE_DIRTYDFT);
2582 } else {
2583 cache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
2584 ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
2585 }
2586 }
9f95a23c 2587 DECODE_FINISH(blp);
7c673cae
FG
2588}
2589
11fdf7f2
TL
2590void CDir::abort_import()
2591{
2592 ceph_assert(is_auth());
2593 state_clear(CDir::STATE_AUTH);
2594 remove_bloom();
2595 clear_replica_map();
2596 set_replica_nonce(CDir::EXPORT_NONCE);
2597 if (is_dirty())
2598 mark_clean();
7c673cae 2599
11fdf7f2
TL
2600 pop_nested.sub(pop_auth_subtree);
2601 pop_auth_subtree_nested.sub(pop_auth_subtree);
2602 pop_me.zero();
2603 pop_auth_subtree.zero();
2604}
7c673cae 2605
11fdf7f2
TL
2606void CDir::encode_dirstat(bufferlist& bl, const session_info_t& info, const DirStat& ds) {
2607 if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
2608 ENCODE_START(1, 1, bl);
2609 encode(ds.frag, bl);
2610 encode(ds.auth, bl);
2611 encode(ds.dist, bl);
2612 ENCODE_FINISH(bl);
2613 }
2614 else {
2615 encode(ds.frag, bl);
2616 encode(ds.auth, bl);
2617 encode(ds.dist, bl);
2618 }
2619}
7c673cae
FG
2620
2621/********************************
2622 * AUTHORITY
2623 */
2624
2625/*
2626 * if dir_auth.first == parent, auth is same as inode.
2627 * unless .second != unknown, in which case that sticks.
2628 */
2629mds_authority_t CDir::authority() const
2630{
2631 if (is_subtree_root())
2632 return dir_auth;
2633 else
2634 return inode->authority();
2635}
2636
2637/** is_subtree_root()
2638 * true if this is an auth delegation point.
2639 * that is, dir_auth != default (parent,unknown)
2640 *
2641 * some key observations:
2642 * if i am auth:
2643 * - any region bound will be an export, or frozen.
2644 *
2645 * note that this DOES heed dir_auth.pending
2646 */
2647/*
2648bool CDir::is_subtree_root()
2649{
2650 if (dir_auth == CDIR_AUTH_DEFAULT) {
2651 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2652 //<< " on " << ino() << dendl;
2653 return false;
2654 } else {
2655 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2656 //<< " on " << ino() << dendl;
2657 return true;
2658 }
2659}
2660*/
2661
2662/** contains(x)
2663 * true if we are x, or an ancestor of x
2664 */
2665bool CDir::contains(CDir *x)
2666{
2667 while (1) {
2668 if (x == this)
2669 return true;
2670 x = x->get_inode()->get_projected_parent_dir();
2671 if (x == 0)
2672 return false;
2673 }
2674}
2675
2676
2677
2678/** set_dir_auth
2679 */
11fdf7f2 2680void CDir::set_dir_auth(const mds_authority_t &a)
7c673cae
FG
2681{
2682 dout(10) << "setting dir_auth=" << a
2683 << " from " << dir_auth
2684 << " on " << *this << dendl;
2685
2686 bool was_subtree = is_subtree_root();
2687 bool was_ambiguous = dir_auth.second >= 0;
2688
2689 // set it.
2690 dir_auth = a;
2691
2692 // new subtree root?
2693 if (!was_subtree && is_subtree_root()) {
2694 dout(10) << " new subtree root, adjusting auth_pins" << dendl;
1adf2230 2695
11fdf7f2
TL
2696 if (freeze_tree_state) {
2697 // only by CDir::_freeze_tree()
2698 ceph_assert(is_freezing_tree_root());
2699 }
1adf2230 2700
11fdf7f2 2701 inode->num_subtree_roots++;
7c673cae
FG
2702
2703 // unpin parent of frozen dir/tree?
224ce89b 2704 if (inode->is_auth()) {
11fdf7f2 2705 ceph_assert(!is_frozen_tree_root());
224ce89b
WB
2706 if (is_frozen_dir())
2707 inode->auth_unpin(this);
2708 }
7c673cae
FG
2709 }
2710 if (was_subtree && !is_subtree_root()) {
2711 dout(10) << " old subtree root, adjusting auth_pins" << dendl;
1adf2230
AA
2712
2713 inode->num_subtree_roots--;
7c673cae
FG
2714
2715 // pin parent of frozen dir/tree?
224ce89b 2716 if (inode->is_auth()) {
11fdf7f2 2717 ceph_assert(!is_frozen_tree_root());
224ce89b
WB
2718 if (is_frozen_dir())
2719 inode->auth_pin(this);
2720 }
7c673cae
FG
2721 }
2722
2723 // newly single auth?
2724 if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
11fdf7f2 2725 MDSContext::vec ls;
7c673cae
FG
2726 take_waiting(WAIT_SINGLEAUTH, ls);
2727 cache->mds->queue_waiters(ls);
2728 }
2729}
2730
7c673cae
FG
2731/*****************************************
2732 * AUTH PINS and FREEZING
2733 *
2734 * the basic plan is that auth_pins only exist in auth regions, and they
2735 * prevent a freeze (and subsequent auth change).
2736 *
2737 * however, we also need to prevent a parent from freezing if a child is frozen.
2738 * for that reason, the parent inode of a frozen directory is auth_pinned.
2739 *
2740 * the oddity is when the frozen directory is a subtree root. if that's the case,
2741 * the parent inode isn't frozen. which means that when subtree authority is adjusted
2742 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
2743 * time.
2744 *
2745 */
2746
2747void CDir::auth_pin(void *by)
2748{
2749 if (auth_pins == 0)
2750 get(PIN_AUTHPIN);
2751 auth_pins++;
2752
2753#ifdef MDS_AUTHPIN_SET
2754 auth_pin_set.insert(by);
2755#endif
2756
11fdf7f2 2757 dout(10) << "auth_pin by " << by << " on " << *this << " count now " << auth_pins << dendl;
7c673cae 2758
11fdf7f2
TL
2759 if (freeze_tree_state)
2760 freeze_tree_state->auth_pins += 1;
7c673cae
FG
2761}
2762
2763void CDir::auth_unpin(void *by)
2764{
2765 auth_pins--;
2766
2767#ifdef MDS_AUTHPIN_SET
11fdf7f2
TL
2768 {
2769 auto it = auth_pin_set.find(by);
2770 ceph_assert(it != auth_pin_set.end());
2771 auth_pin_set.erase(it);
2772 }
7c673cae
FG
2773#endif
2774 if (auth_pins == 0)
2775 put(PIN_AUTHPIN);
2776
11fdf7f2
TL
2777 dout(10) << "auth_unpin by " << by << " on " << *this << " count now " << auth_pins << dendl;
2778 ceph_assert(auth_pins >= 0);
2779
2780 if (freeze_tree_state)
2781 freeze_tree_state->auth_pins -= 1;
7c673cae
FG
2782
2783 maybe_finish_freeze(); // pending freeze?
7c673cae
FG
2784}
2785
11fdf7f2 2786void CDir::adjust_nested_auth_pins(int dirinc, void *by)
7c673cae 2787{
11fdf7f2 2788 ceph_assert(dirinc);
7c673cae
FG
2789 dir_auth_pins += dirinc;
2790
11fdf7f2 2791 dout(15) << __func__ << " " << dirinc << " on " << *this
7c673cae 2792 << " by " << by << " count now "
11fdf7f2
TL
2793 << auth_pins << "/" << dir_auth_pins << dendl;
2794 ceph_assert(dir_auth_pins >= 0);
7c673cae 2795
11fdf7f2
TL
2796 if (freeze_tree_state)
2797 freeze_tree_state->auth_pins += dirinc;
7c673cae 2798
11fdf7f2
TL
2799 if (dirinc < 0)
2800 maybe_finish_freeze(); // pending freeze?
7c673cae
FG
2801}
2802
#ifdef MDS_VERIFY_FRAGSTAT
/**
 * Debug check: recount files and subdirs from the dentries of a complete,
 * non-stray dirfrag and abort if the counts disagree with fnode.fragstat.
 */
void CDir::verify_fragstat()
{
  ceph_assert(is_complete());
  if (inode->is_stray())
    return;

  frag_info_t c;
  memset(&c, 0, sizeof(c));

  for (const auto &entry : items) {
    CDentry *dn = entry.second;
    if (dn->is_null())
      continue;

    dout(10) << " " << *dn << dendl;

    if (dn->is_primary()) {
      dout(10) << " " << *dn->inode << dendl;
      if (dn->inode->is_dir())
        c.nsubdirs++;
      else
        c.nfiles++;
    }
    if (dn->is_remote()) {
      if (dn->get_remote_d_type() == DT_DIR)
        c.nsubdirs++;
      else
        c.nfiles++;
    }
  }

  const bool counts_match =
    c.nsubdirs == fnode.fragstat.nsubdirs &&
    c.nfiles == fnode.fragstat.nfiles;

  if (!counts_match) {
    dout(0) << "verify_fragstat failed " << fnode.fragstat << " on " << *this << dendl;
    dout(0) << " i count " << c << dendl;
    ceph_abort();
  } else {
    dout(0) << "verify_fragstat ok " << fnode.fragstat << " on " << *this << dendl;
  }
}
#endif
2848
2849/*****************************************************************************
2850 * FREEZING
2851 */
2852
2853// FREEZE TREE
2854
11fdf7f2
TL
2855void CDir::_walk_tree(std::function<bool(CDir*)> callback)
2856{
11fdf7f2
TL
2857 deque<CDir*> dfq;
2858 dfq.push_back(this);
2859
11fdf7f2
TL
2860 while (!dfq.empty()) {
2861 CDir *dir = dfq.front();
2862 dfq.pop_front();
2863
2864 for (auto& p : *dir) {
2865 CDentry *dn = p.second;
2866 if (!dn->get_linkage()->is_primary())
2867 continue;
2868 CInode *in = dn->get_linkage()->get_inode();
2869 if (!in->is_dir())
2870 continue;
2871
9f95a23c 2872 auto&& dfv = in->get_nested_dirfrags();
11fdf7f2
TL
2873 for (auto& dir : dfv) {
2874 auto ret = callback(dir);
2875 if (ret)
2876 dfq.push_back(dir);
2877 }
11fdf7f2
TL
2878 }
2879 }
2880}
2881
7c673cae
FG
2882bool CDir::freeze_tree()
2883{
11fdf7f2
TL
2884 ceph_assert(!is_frozen());
2885 ceph_assert(!is_freezing());
2886 ceph_assert(!freeze_tree_state);
7c673cae
FG
2887
2888 auth_pin(this);
11fdf7f2
TL
2889
2890 // Travese the subtree to mark dirfrags as 'freezing' (set freeze_tree_state)
2891 // and to accumulate auth pins and record total count in freeze_tree_state.
2892 // when auth unpin an 'freezing' object, the counter in freeze_tree_state also
2893 // gets decreased. Subtree become 'frozen' when the counter reaches zero.
2894 freeze_tree_state = std::make_shared<freeze_tree_state_t>(this);
2895 freeze_tree_state->auth_pins += get_auth_pins() + get_dir_auth_pins();
9f95a23c
TL
2896 if (!lock_caches_with_auth_pins.empty())
2897 cache->mds->locker->invalidate_lock_caches(this);
11fdf7f2
TL
2898
2899 _walk_tree([this](CDir *dir) {
2900 if (dir->freeze_tree_state)
2901 return false;
2902 dir->freeze_tree_state = freeze_tree_state;
2903 freeze_tree_state->auth_pins += dir->get_auth_pins() + dir->get_dir_auth_pins();
9f95a23c
TL
2904 if (!dir->lock_caches_with_auth_pins.empty())
2905 cache->mds->locker->invalidate_lock_caches(dir);
11fdf7f2 2906 return true;
9f95a23c 2907 }
11fdf7f2
TL
2908 );
2909
7c673cae
FG
2910 if (is_freezeable(true)) {
2911 _freeze_tree();
2912 auth_unpin(this);
2913 return true;
2914 } else {
2915 state_set(STATE_FREEZINGTREE);
2916 ++num_freezing_trees;
2917 dout(10) << "freeze_tree waiting " << *this << dendl;
2918 return false;
2919 }
2920}
2921
2922void CDir::_freeze_tree()
2923{
11fdf7f2
TL
2924 dout(10) << __func__ << " " << *this << dendl;
2925 ceph_assert(is_freezeable(true));
7c673cae 2926
11fdf7f2
TL
2927 if (freeze_tree_state) {
2928 ceph_assert(is_auth());
2929 } else {
2930 ceph_assert(!is_auth());
2931 freeze_tree_state = std::make_shared<freeze_tree_state_t>(this);
7c673cae 2932 }
11fdf7f2 2933 freeze_tree_state->frozen = true;
224ce89b
WB
2934
2935 if (is_auth()) {
2936 mds_authority_t auth;
2937 bool was_subtree = is_subtree_root();
2938 if (was_subtree) {
2939 auth = get_dir_auth();
2940 } else {
2941 // temporarily prevent parent subtree from becoming frozen.
2942 inode->auth_pin(this);
2943 // create new subtree
2944 auth = authority();
2945 }
2946
11fdf7f2
TL
2947 _walk_tree([this, &auth] (CDir *dir) {
2948 if (dir->freeze_tree_state != freeze_tree_state) {
2949 inode->mdcache->adjust_subtree_auth(dir, auth);
2950 return false;
2951 }
2952 return true;
2953 }
2954 );
2955
2956 ceph_assert(auth.first >= 0);
2957 ceph_assert(auth.second == CDIR_AUTH_UNKNOWN);
224ce89b
WB
2958 auth.second = auth.first;
2959 inode->mdcache->adjust_subtree_auth(this, auth);
2960 if (!was_subtree)
2961 inode->auth_unpin(this);
11fdf7f2
TL
2962 } else {
2963 // importing subtree ?
2964 _walk_tree([this] (CDir *dir) {
2965 ceph_assert(!dir->freeze_tree_state);
2966 dir->freeze_tree_state = freeze_tree_state;
2967 return true;
2968 }
2969 );
2970 }
2971
2972 // twiddle state
2973 if (state_test(STATE_FREEZINGTREE)) {
2974 state_clear(STATE_FREEZINGTREE);
2975 --num_freezing_trees;
224ce89b
WB
2976 }
2977
7c673cae
FG
2978 state_set(STATE_FROZENTREE);
2979 ++num_frozen_trees;
2980 get(PIN_FROZEN);
7c673cae
FG
2981}
2982
2983void CDir::unfreeze_tree()
2984{
11fdf7f2
TL
2985 dout(10) << __func__ << " " << *this << dendl;
2986
2987 MDSContext::vec unfreeze_waiters;
2988 take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
2989
2990 if (freeze_tree_state) {
2991 _walk_tree([this, &unfreeze_waiters](CDir *dir) {
2992 if (dir->freeze_tree_state != freeze_tree_state)
2993 return false;
2994 dir->freeze_tree_state.reset();
2995 dir->take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
2996 return true;
2997 }
2998 );
2999 }
7c673cae
FG
3000
3001 if (state_test(STATE_FROZENTREE)) {
3002 // frozen. unfreeze.
3003 state_clear(STATE_FROZENTREE);
3004 --num_frozen_trees;
3005
3006 put(PIN_FROZEN);
3007
224ce89b
WB
3008 if (is_auth()) {
3009 // must be subtree
11fdf7f2 3010 ceph_assert(is_subtree_root());
224ce89b
WB
3011 // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
3012 mds_authority_t auth = get_dir_auth();
11fdf7f2
TL
3013 ceph_assert(auth.first >= 0);
3014 ceph_assert(auth.second == auth.first);
224ce89b
WB
3015 auth.second = CDIR_AUTH_UNKNOWN;
3016 inode->mdcache->adjust_subtree_auth(this, auth);
3017 }
11fdf7f2 3018 freeze_tree_state.reset();
7c673cae 3019 } else {
11fdf7f2 3020 ceph_assert(state_test(STATE_FREEZINGTREE));
7c673cae
FG
3021
3022 // freezing. stop it.
7c673cae
FG
3023 state_clear(STATE_FREEZINGTREE);
3024 --num_freezing_trees;
11fdf7f2
TL
3025 freeze_tree_state.reset();
3026
3027 finish_waiting(WAIT_FROZEN, -1);
7c673cae 3028 auth_unpin(this);
7c673cae 3029 }
11fdf7f2
TL
3030
3031 cache->mds->queue_waiters(unfreeze_waiters);
3032}
3033
3034void CDir::adjust_freeze_after_rename(CDir *dir)
3035{
3036 if (!freeze_tree_state || dir->freeze_tree_state != freeze_tree_state)
3037 return;
3038 CDir *newdir = dir->get_inode()->get_parent_dir();
3039 if (newdir == this || newdir->freeze_tree_state == freeze_tree_state)
3040 return;
3041
3042 ceph_assert(!freeze_tree_state->frozen);
3043 ceph_assert(get_dir_auth_pins() > 0);
3044
3045 MDSContext::vec unfreeze_waiters;
3046
3047 auto unfreeze = [this, &unfreeze_waiters](CDir *dir) {
3048 if (dir->freeze_tree_state != freeze_tree_state)
3049 return false;
3050 int dec = dir->get_auth_pins() + dir->get_dir_auth_pins();
3051 // shouldn't become zero because srcdn of rename was auth pinned
3052 ceph_assert(freeze_tree_state->auth_pins > dec);
3053 freeze_tree_state->auth_pins -= dec;
3054 dir->freeze_tree_state.reset();
3055 dir->take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
3056 return true;
3057 };
3058
3059 unfreeze(dir);
3060 dir->_walk_tree(unfreeze);
3061
3062 cache->mds->queue_waiters(unfreeze_waiters);
7c673cae
FG
3063}
3064
91327a77 3065bool CDir::can_auth_pin(int *err_ret) const
7c673cae 3066{
91327a77
AA
3067 int err;
3068 if (!is_auth()) {
3069 err = ERR_NOT_AUTH;
3070 } else if (is_freezing_dir() || is_frozen_dir()) {
3071 err = ERR_FRAGMENTING_DIR;
3072 } else {
3073 auto p = is_freezing_or_frozen_tree();
3074 if (p.first || p.second) {
3075 err = ERR_EXPORTING_TREE;
3076 } else {
3077 err = 0;
3078 }
3079 }
3080 if (err && err_ret)
3081 *err_ret = err;
3082 return !err;
3083}
3084
7c673cae
FG
3085class C_Dir_AuthUnpin : public CDirContext {
3086 public:
3087 explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
3088 void finish(int r) override {
3089 dir->auth_unpin(dir->get_inode());
3090 }
3091};
3092
3093void CDir::maybe_finish_freeze()
3094{
11fdf7f2 3095 if (dir_auth_pins != 0)
7c673cae
FG
3096 return;
3097
3098 // we can freeze the _dir_ even with nested pins...
3099 if (state_test(STATE_FREEZINGDIR)) {
11fdf7f2
TL
3100 if (auth_pins == 1) {
3101 _freeze_dir();
3102 auth_unpin(this);
3103 finish_waiting(WAIT_FROZEN);
3104 }
7c673cae
FG
3105 }
3106
11fdf7f2
TL
3107 if (freeze_tree_state) {
3108 if (freeze_tree_state->frozen ||
3109 freeze_tree_state->auth_pins != 1)
3110 return;
3111
3112 if (freeze_tree_state->dir != this) {
3113 freeze_tree_state->dir->maybe_finish_freeze();
3114 return;
3115 }
3116
3117 ceph_assert(state_test(STATE_FREEZINGTREE));
7c673cae 3118
7c673cae 3119 if (!is_subtree_root() && inode->is_frozen()) {
11fdf7f2 3120 dout(10) << __func__ << " !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
7c673cae
FG
3121 // retake an auth_pin...
3122 auth_pin(inode);
3123 // and release it when the parent inode unfreezes
3124 inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
3125 return;
3126 }
3127
3128 _freeze_tree();
3129 auth_unpin(this);
3130 finish_waiting(WAIT_FROZEN);
3131 }
3132}
3133
3134
3135
3136// FREEZE DIR
3137
3138bool CDir::freeze_dir()
3139{
11fdf7f2
TL
3140 ceph_assert(!is_frozen());
3141 ceph_assert(!is_freezing());
7c673cae
FG
3142
3143 auth_pin(this);
3144 if (is_freezeable_dir(true)) {
3145 _freeze_dir();
3146 auth_unpin(this);
3147 return true;
3148 } else {
3149 state_set(STATE_FREEZINGDIR);
9f95a23c
TL
3150 if (!lock_caches_with_auth_pins.empty())
3151 cache->mds->locker->invalidate_lock_caches(this);
7c673cae
FG
3152 dout(10) << "freeze_dir + wait " << *this << dendl;
3153 return false;
3154 }
3155}
3156
3157void CDir::_freeze_dir()
3158{
11fdf7f2 3159 dout(10) << __func__ << " " << *this << dendl;
7c673cae
FG
3160 //assert(is_freezeable_dir(true));
3161 // not always true during split because the original fragment may have frozen a while
3162 // ago and we're just now getting around to breaking it up.
3163
3164 state_clear(STATE_FREEZINGDIR);
3165 state_set(STATE_FROZENDIR);
3166 get(PIN_FROZEN);
3167
3168 if (is_auth() && !is_subtree_root())
3169 inode->auth_pin(this); // auth_pin for duration of freeze
3170}
3171
3172
3173void CDir::unfreeze_dir()
3174{
11fdf7f2 3175 dout(10) << __func__ << " " << *this << dendl;
7c673cae
FG
3176
3177 if (state_test(STATE_FROZENDIR)) {
3178 state_clear(STATE_FROZENDIR);
3179 put(PIN_FROZEN);
3180
3181 // unpin (may => FREEZEABLE) FIXME: is this order good?
3182 if (is_auth() && !is_subtree_root())
3183 inode->auth_unpin(this);
3184
3185 finish_waiting(WAIT_UNFREEZE);
3186 } else {
3187 finish_waiting(WAIT_FROZEN, -1);
3188
3189 // still freezing. stop.
11fdf7f2 3190 ceph_assert(state_test(STATE_FREEZINGDIR));
7c673cae
FG
3191 state_clear(STATE_FREEZINGDIR);
3192 auth_unpin(this);
3193
3194 finish_waiting(WAIT_UNFREEZE);
3195 }
3196}
3197
9f95a23c
TL
3198void CDir::enable_frozen_inode()
3199{
3200 ceph_assert(frozen_inode_suppressed > 0);
3201 if (--frozen_inode_suppressed == 0) {
3202 for (auto p = freezing_inodes.begin(); !p.end(); ) {
3203 CInode *in = *p;
3204 ++p;
3205 ceph_assert(in->is_freezing_inode());
3206 in->maybe_finish_freeze_inode();
3207 }
3208 }
3209}
3210
7c673cae
FG
3211/**
3212 * Slightly less complete than operator<<, because this is intended
3213 * for identifying a directory and its state rather than for dumping
3214 * debug output.
3215 */
11fdf7f2 3216void CDir::dump(Formatter *f, int flags) const
7c673cae 3217{
11fdf7f2
TL
3218 ceph_assert(f != NULL);
3219 if (flags & DUMP_PATH) {
3220 f->dump_stream("path") << get_path();
3221 }
3222 if (flags & DUMP_DIRFRAG) {
3223 f->dump_stream("dirfrag") << dirfrag();
3224 }
3225 if (flags & DUMP_SNAPID_FIRST) {
3226 f->dump_int("snapid_first", first);
3227 }
3228 if (flags & DUMP_VERSIONS) {
3229 f->dump_stream("projected_version") << get_projected_version();
3230 f->dump_stream("version") << get_version();
3231 f->dump_stream("committing_version") << get_committing_version();
3232 f->dump_stream("committed_version") << get_committed_version();
3233 }
3234 if (flags & DUMP_REP) {
3235 f->dump_bool("is_rep", is_rep());
3236 }
3237 if (flags & DUMP_DIR_AUTH) {
3238 if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
3239 if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
3240 f->dump_stream("dir_auth") << get_dir_auth().first;
3241 } else {
3242 f->dump_stream("dir_auth") << get_dir_auth();
3243 }
7c673cae 3244 } else {
11fdf7f2 3245 f->dump_string("dir_auth", "");
7c673cae 3246 }
11fdf7f2
TL
3247 }
3248 if (flags & DUMP_STATES) {
3249 f->open_array_section("states");
3250 MDSCacheObject::dump_states(f);
3251 if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
3252 if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
3253 if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
3254 if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
3255 if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
3256 if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
3257 if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
3258 if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
3259 f->close_section();
3260 }
3261 if (flags & DUMP_MDS_CACHE_OBJECT) {
3262 MDSCacheObject::dump(f);
3263 }
3264 if (flags & DUMP_ITEMS) {
3265 f->open_array_section("dentries");
3266 for (auto &p : items) {
3267 CDentry *dn = p.second;
3268 f->open_object_section("dentry");
3269 dn->dump(f);
3270 f->close_section();
3271 }
3272 f->close_section();
3273 }
7c673cae
FG
3274}
3275
11fdf7f2 3276void CDir::dump_load(Formatter *f)
28e407b8
AA
3277{
3278 f->dump_stream("path") << get_path();
3279 f->dump_stream("dirfrag") << dirfrag();
3280
3281 f->open_object_section("pop_me");
11fdf7f2 3282 pop_me.dump(f);
28e407b8
AA
3283 f->close_section();
3284
3285 f->open_object_section("pop_nested");
11fdf7f2 3286 pop_nested.dump(f);
28e407b8
AA
3287 f->close_section();
3288
3289 f->open_object_section("pop_auth_subtree");
11fdf7f2 3290 pop_auth_subtree.dump(f);
28e407b8
AA
3291 f->close_section();
3292
3293 f->open_object_section("pop_auth_subtree_nested");
11fdf7f2 3294 pop_auth_subtree_nested.dump(f);
28e407b8
AA
3295 f->close_section();
3296}
3297
7c673cae
FG
3298/****** Scrub Stuff *******/
3299
3300void CDir::scrub_info_create() const
3301{
11fdf7f2 3302 ceph_assert(!scrub_infop);
7c673cae
FG
3303
3304 // break out of const-land to set up implicit initial state
3305 CDir *me = const_cast<CDir*>(this);
3306 fnode_t *fn = me->get_projected_fnode();
3307
3308 std::unique_ptr<scrub_info_t> si(new scrub_info_t());
3309
3310 si->last_recursive.version = si->recursive_start.version =
3311 fn->recursive_scrub_version;
3312 si->last_recursive.time = si->recursive_start.time =
3313 fn->recursive_scrub_stamp;
3314
3315 si->last_local.version = fn->localized_scrub_version;
3316 si->last_local.time = fn->localized_scrub_stamp;
3317
3318 me->scrub_infop.swap(si);
3319}
3320
3321void CDir::scrub_initialize(const ScrubHeaderRefConst& header)
3322{
3323 dout(20) << __func__ << dendl;
11fdf7f2
TL
3324 ceph_assert(is_complete());
3325 ceph_assert(header != nullptr);
7c673cae
FG
3326
3327 // FIXME: weird implicit construction, is someone else meant
3328 // to be calling scrub_info_create first?
3329 scrub_info();
11fdf7f2 3330 ceph_assert(scrub_infop && !scrub_infop->directory_scrubbing);
7c673cae
FG
3331
3332 scrub_infop->recursive_start.version = get_projected_version();
3333 scrub_infop->recursive_start.time = ceph_clock_now();
3334
3335 scrub_infop->directories_to_scrub.clear();
3336 scrub_infop->directories_scrubbing.clear();
3337 scrub_infop->directories_scrubbed.clear();
3338 scrub_infop->others_to_scrub.clear();
3339 scrub_infop->others_scrubbing.clear();
3340 scrub_infop->others_scrubbed.clear();
3341
94b18763 3342 for (auto i = items.begin();
7c673cae
FG
3343 i != items.end();
3344 ++i) {
3345 // TODO: handle snapshot scrubbing
3346 if (i->first.snapid != CEPH_NOSNAP)
3347 continue;
3348
3349 CDentry::linkage_t *dnl = i->second->get_projected_linkage();
3350 if (dnl->is_primary()) {
3351 if (dnl->get_inode()->is_dir())
3352 scrub_infop->directories_to_scrub.insert(i->first);
3353 else
3354 scrub_infop->others_to_scrub.insert(i->first);
3355 } else if (dnl->is_remote()) {
3356 // TODO: check remote linkage
3357 }
3358 }
3359 scrub_infop->directory_scrubbing = true;
3360 scrub_infop->header = header;
3361}
3362
3363void CDir::scrub_finished()
3364{
3365 dout(20) << __func__ << dendl;
11fdf7f2 3366 ceph_assert(scrub_infop && scrub_infop->directory_scrubbing);
7c673cae 3367
11fdf7f2
TL
3368 ceph_assert(scrub_infop->directories_to_scrub.empty());
3369 ceph_assert(scrub_infop->directories_scrubbing.empty());
7c673cae 3370 scrub_infop->directories_scrubbed.clear();
11fdf7f2
TL
3371 ceph_assert(scrub_infop->others_to_scrub.empty());
3372 ceph_assert(scrub_infop->others_scrubbing.empty());
7c673cae
FG
3373 scrub_infop->others_scrubbed.clear();
3374 scrub_infop->directory_scrubbing = false;
3375
3376 scrub_infop->last_recursive = scrub_infop->recursive_start;
3377 scrub_infop->last_scrub_dirty = true;
3378}
3379
94b18763 3380int CDir::_next_dentry_on_set(dentry_key_set &dns, bool missing_okay,
11fdf7f2 3381 MDSContext *cb, CDentry **dnout)
7c673cae
FG
3382{
3383 dentry_key_t dnkey;
3384 CDentry *dn;
3385
3386 while (!dns.empty()) {
3387 set<dentry_key_t>::iterator front = dns.begin();
3388 dnkey = *front;
3389 dn = lookup(dnkey.name);
3390 if (!dn) {
3391 if (!is_complete() &&
3392 (!has_bloom() || is_in_bloom(dnkey.name))) {
3393 // need to re-read this dirfrag
3394 fetch(cb);
3395 return EAGAIN;
3396 }
3397 // okay, we lost it
3398 if (missing_okay) {
3399 dout(15) << " we no longer have directory dentry "
3400 << dnkey.name << ", assuming it got renamed" << dendl;
3401 dns.erase(dnkey);
3402 continue;
3403 } else {
3404 dout(5) << " we lost dentry " << dnkey.name
3405 << ", bailing out because that's impossible!" << dendl;
3406 ceph_abort();
3407 }
3408 }
3409 // okay, we got a dentry
3410 dns.erase(dnkey);
3411
3412 if (dn->get_projected_version() < scrub_infop->last_recursive.version &&
3413 !(scrub_infop->header->get_force())) {
3414 dout(15) << " skip dentry " << dnkey.name
3415 << ", no change since last scrub" << dendl;
3416 continue;
94b18763
FG
3417 }
3418
3419 if (!dn->get_linkage()->is_primary()) {
3420 dout(15) << " skip dentry " << dnkey.name
3421 << ", no longer primary" << dendl;
3422 continue;
7c673cae
FG
3423 }
3424
3425 *dnout = dn;
3426 return 0;
3427 }
3428 *dnout = NULL;
3429 return ENOENT;
3430}
3431
11fdf7f2 3432int CDir::scrub_dentry_next(MDSContext *cb, CDentry **dnout)
7c673cae
FG
3433{
3434 dout(20) << __func__ << dendl;
11fdf7f2 3435 ceph_assert(scrub_infop && scrub_infop->directory_scrubbing);
7c673cae
FG
3436
3437 dout(20) << "trying to scrub directories underneath us" << dendl;
3438 int rval = _next_dentry_on_set(scrub_infop->directories_to_scrub, true,
3439 cb, dnout);
3440 if (rval == 0) {
3441 dout(20) << __func__ << " inserted to directories scrubbing: "
3442 << *dnout << dendl;
3443 scrub_infop->directories_scrubbing.insert((*dnout)->key());
3444 } else if (rval == EAGAIN) {
3445 // we don't need to do anything else
3446 } else { // we emptied out the directory scrub set
11fdf7f2 3447 ceph_assert(rval == ENOENT);
7c673cae
FG
3448 dout(20) << "no directories left, moving on to other kinds of dentries"
3449 << dendl;
3450
3451 rval = _next_dentry_on_set(scrub_infop->others_to_scrub, false, cb, dnout);
3452 if (rval == 0) {
3453 dout(20) << __func__ << " inserted to others scrubbing: "
3454 << *dnout << dendl;
3455 scrub_infop->others_scrubbing.insert((*dnout)->key());
3456 }
3457 }
3458 dout(20) << " returning " << rval << " with dn=" << *dnout << dendl;
3459 return rval;
3460}
3461
9f95a23c 3462std::vector<CDentry*> CDir::scrub_dentries_scrubbing()
7c673cae
FG
3463{
3464 dout(20) << __func__ << dendl;
11fdf7f2 3465 ceph_assert(scrub_infop && scrub_infop->directory_scrubbing);
7c673cae 3466
9f95a23c
TL
3467 std::vector<CDentry*> result;
3468 for (auto& scrub_info : scrub_infop->directories_scrubbing) {
3469 CDentry *d = lookup(scrub_info.name, scrub_info.snapid);
11fdf7f2 3470 ceph_assert(d);
9f95a23c 3471 result.push_back(d);
7c673cae 3472 }
9f95a23c
TL
3473 for (auto& scrub_info : scrub_infop->others_scrubbing) {
3474 CDentry *d = lookup(scrub_info.name, scrub_info.snapid);
11fdf7f2 3475 ceph_assert(d);
9f95a23c 3476 result.push_back(d);
7c673cae 3477 }
9f95a23c 3478 return result;
7c673cae
FG
3479}
3480
3481void CDir::scrub_dentry_finished(CDentry *dn)
3482{
3483 dout(20) << __func__ << " on dn " << *dn << dendl;
11fdf7f2 3484 ceph_assert(scrub_infop && scrub_infop->directory_scrubbing);
7c673cae
FG
3485 dentry_key_t dn_key = dn->key();
3486 if (scrub_infop->directories_scrubbing.erase(dn_key)) {
3487 scrub_infop->directories_scrubbed.insert(dn_key);
3488 } else {
11fdf7f2 3489 ceph_assert(scrub_infop->others_scrubbing.count(dn_key));
7c673cae
FG
3490 scrub_infop->others_scrubbing.erase(dn_key);
3491 scrub_infop->others_scrubbed.insert(dn_key);
3492 }
3493}
3494
3495void CDir::scrub_maybe_delete_info()
3496{
3497 if (scrub_infop &&
3498 !scrub_infop->directory_scrubbing &&
3499 !scrub_infop->need_scrub_local &&
3500 !scrub_infop->last_scrub_dirty &&
3501 !scrub_infop->pending_scrub_error &&
3502 scrub_infop->dirty_scrub_stamps.empty()) {
3503 scrub_infop.reset();
3504 }
3505}
3506
3507bool CDir::scrub_local()
3508{
11fdf7f2 3509 ceph_assert(is_complete());
7c673cae
FG
3510 bool rval = check_rstats(true);
3511
3512 scrub_info();
3513 if (rval) {
3514 scrub_infop->last_local.time = ceph_clock_now();
3515 scrub_infop->last_local.version = get_projected_version();
3516 scrub_infop->pending_scrub_error = false;
3517 scrub_infop->last_scrub_dirty = true;
3518 } else {
3519 scrub_infop->pending_scrub_error = true;
3520 if (scrub_infop->header->get_repair())
3521 cache->repair_dirfrag_stats(this);
3522 }
3523 return rval;
3524}
3525
3526std::string CDir::get_path() const
3527{
3528 std::string path;
3529 get_inode()->make_path_string(path, true);
3530 return path;
3531}
3532
3533bool CDir::should_split_fast() const
3534{
3535 // Max size a fragment can be before trigger fast splitting
11fdf7f2 3536 int fast_limit = g_conf()->mds_bal_split_size * g_conf()->mds_bal_fragment_fast_factor;
7c673cae
FG
3537
3538 // Fast path: the sum of accounted size and null dentries does not
3539 // exceed threshold: we definitely are not over it.
3540 if (get_frag_size() + get_num_head_null() <= fast_limit) {
3541 return false;
3542 }
3543
3544 // Fast path: the accounted size of the frag exceeds threshold: we
3545 // definitely are over it
3546 if (get_frag_size() > fast_limit) {
3547 return true;
3548 }
3549
3550 int64_t effective_size = 0;
3551
3552 for (const auto &p : items) {
3553 const CDentry *dn = p.second;
3554 if (!dn->get_projected_linkage()->is_null()) {
3555 effective_size++;
3556 }
3557 }
3558
3559 return effective_size > fast_limit;
3560}
3561
// Mempool object-factory definitions for CDir (factory name co_dir) and
// CDir::scrub_info_t (factory name scrub_info_t), both in the mds_co pool.
// NOTE(review): the macro is defined elsewhere; presumably it supplies the
// class-specific allocation functions for these two types - confirm.
181888fb 3562MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);
f91f0fd5 3563MEMPOOL_DEFINE_OBJECT_FACTORY(CDir::scrub_info_t, scrub_info_t, mds_co)