]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/CDir.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / mds / CDir.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
11fdf7f2 15#include <string_view>
7c673cae
FG
16
17#include "include/types.h"
18
19#include "CDir.h"
20#include "CDentry.h"
21#include "CInode.h"
22#include "Mutation.h"
23
24#include "MDSMap.h"
25#include "MDSRank.h"
26#include "MDCache.h"
27#include "Locker.h"
28#include "MDLog.h"
29#include "LogSegment.h"
30
31#include "common/bloom_filter.hpp"
32#include "include/Context.h"
33#include "common/Clock.h"
34
35#include "osdc/Objecter.h"
36
37#include "common/config.h"
11fdf7f2 38#include "include/ceph_assert.h"
7c673cae
FG
39#include "include/compat.h"
40
41#define dout_context g_ceph_context
42#define dout_subsys ceph_subsys_mds
43#undef dout_prefix
44#define dout_prefix *_dout << "mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
45
46int CDir::num_frozen_trees = 0;
47int CDir::num_freezing_trees = 0;
48
11fdf7f2 49class CDirContext : public MDSContext
7c673cae
FG
50{
51protected:
52 CDir *dir;
53 MDSRank* get_mds() override {return dir->cache->mds;}
54
55public:
56 explicit CDirContext(CDir *d) : dir(d) {
11fdf7f2 57 ceph_assert(dir != NULL);
7c673cae
FG
58 }
59};
60
61
62class CDirIOContext : public MDSIOContextBase
63{
64protected:
65 CDir *dir;
66 MDSRank* get_mds() override {return dir->cache->mds;}
67
68public:
69 explicit CDirIOContext(CDir *d) : dir(d) {
11fdf7f2 70 ceph_assert(dir != NULL);
7c673cae
FG
71 }
72};
73
74
75// PINS
76//int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
77
78
79ostream& operator<<(ostream& out, const CDir& dir)
80{
81 out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
82 << " [" << dir.first << ",head]";
83 if (dir.is_auth()) {
84 out << " auth";
85 if (dir.is_replicated())
86 out << dir.get_replicas();
87
88 if (dir.is_projected())
89 out << " pv=" << dir.get_projected_version();
90 out << " v=" << dir.get_version();
91 out << " cv=" << dir.get_committing_version();
92 out << "/" << dir.get_committed_version();
93 } else {
94 mds_authority_t a = dir.authority();
95 out << " rep@" << a.first;
96 if (a.second != CDIR_AUTH_UNKNOWN)
97 out << "," << a.second;
98 out << "." << dir.get_replica_nonce();
99 }
100
101 if (dir.is_rep()) out << " REP";
102
103 if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
104 if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
105 out << " dir_auth=" << dir.get_dir_auth().first;
106 else
107 out << " dir_auth=" << dir.get_dir_auth();
108 }
109
11fdf7f2 110 if (dir.get_auth_pins() || dir.get_dir_auth_pins()) {
7c673cae 111 out << " ap=" << dir.get_auth_pins()
11fdf7f2
TL
112 << "+" << dir.get_dir_auth_pins();
113#ifdef MDS_AUTHPIN_SET
114 dir.print_authpin_set(out);
115#endif
116 }
7c673cae
FG
117
118 out << " state=" << dir.get_state();
119 if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
120 if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
121 if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
122 if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
7c673cae
FG
123 if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
124 if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
125 if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
126 if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
127 if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
128 if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
11fdf7f2
TL
129 if (dir.state_test(CDir::STATE_CREATING)) out << "|creating";
130 if (dir.state_test(CDir::STATE_COMMITTING)) out << "|committing";
131 if (dir.state_test(CDir::STATE_FETCHING)) out << "|fetching";
132 if (dir.state_test(CDir::STATE_EXPORTING)) out << "|exporting";
133 if (dir.state_test(CDir::STATE_IMPORTING)) out << "|importing";
134 if (dir.state_test(CDir::STATE_STICKY)) out << "|sticky";
135 if (dir.state_test(CDir::STATE_DNPINNEDFRAG)) out << "|dnpinnedfrag";
136 if (dir.state_test(CDir::STATE_ASSIMRSTAT)) out << "|assimrstat";
7c673cae
FG
137
138 // fragstat
139 out << " " << dir.fnode.fragstat;
140 if (!(dir.fnode.fragstat == dir.fnode.accounted_fragstat))
141 out << "/" << dir.fnode.accounted_fragstat;
11fdf7f2 142 if (g_conf()->mds_debug_scatterstat && dir.is_projected()) {
7c673cae
FG
143 const fnode_t *pf = dir.get_projected_fnode();
144 out << "->" << pf->fragstat;
145 if (!(pf->fragstat == pf->accounted_fragstat))
146 out << "/" << pf->accounted_fragstat;
147 }
148
149 // rstat
150 out << " " << dir.fnode.rstat;
151 if (!(dir.fnode.rstat == dir.fnode.accounted_rstat))
152 out << "/" << dir.fnode.accounted_rstat;
11fdf7f2 153 if (g_conf()->mds_debug_scatterstat && dir.is_projected()) {
7c673cae
FG
154 const fnode_t *pf = dir.get_projected_fnode();
155 out << "->" << pf->rstat;
156 if (!(pf->rstat == pf->accounted_rstat))
157 out << "/" << pf->accounted_rstat;
158 }
159
160 out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
161 out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
162 if (dir.get_num_dirty())
163 out << " dirty=" << dir.get_num_dirty();
164
165 if (dir.get_num_ref()) {
166 out << " |";
167 dir.print_pin_set(out);
168 }
169
170 out << " " << &dir;
171 return out << "]";
172}
173
174
175void CDir::print(ostream& out)
176{
177 out << *this;
178}
179
180
181
182
183ostream& CDir::print_db_line_prefix(ostream& out)
184{
185 return out << ceph_clock_now() << " mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
186}
187
188
189
190// -------------------------------------------------------------------
191// CDir
192
193CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
194 cache(mdcache), inode(in), frag(fg),
195 first(2),
196 dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
b32b8144
FG
197 projected_version(0),
198 dirty_dentries(member_offset(CDentry, item_dir_dirty)),
199 item_dirty(this), item_new(this),
7c673cae
FG
200 num_head_items(0), num_head_null(0),
201 num_snap_items(0), num_snap_null(0),
202 num_dirty(0), committing_version(0), committed_version(0),
11fdf7f2 203 dir_auth_pins(0),
7c673cae 204 dir_rep(REP_NONE),
11fdf7f2
TL
205 pop_me(mdcache->decayrate),
206 pop_nested(mdcache->decayrate),
207 pop_auth_subtree(mdcache->decayrate),
208 pop_auth_subtree_nested(mdcache->decayrate),
209 pop_spread(mdcache->decayrate),
28e407b8 210 pop_lru_subdirs(member_offset(CInode, item_pop_lru)),
7c673cae
FG
211 num_dentries_nested(0), num_dentries_auth_subtree(0),
212 num_dentries_auth_subtree_nested(0),
213 dir_auth(CDIR_AUTH_DEFAULT)
214{
7c673cae 215 // auth
11fdf7f2 216 ceph_assert(in->is_dir());
94b18763 217 if (auth) state_set(STATE_AUTH);
7c673cae
FG
218}
219
220/**
221 * Check the recursive statistics on size for consistency.
222 * If mds_debug_scatterstat is enabled, assert for correctness,
223 * otherwise just print out the mismatch and continue.
224 */
225bool CDir::check_rstats(bool scrub)
226{
11fdf7f2 227 if (!g_conf()->mds_debug_scatterstat && !scrub)
7c673cae
FG
228 return true;
229
230 dout(25) << "check_rstats on " << this << dendl;
231 if (!is_complete() || !is_auth() || is_frozen()) {
92f5a8d4
TL
232 dout(3) << "check_rstats " << (scrub ? "(scrub) " : "")
233 << "bailing out -- incomplete or non-auth or frozen dir on "
234 << *this << dendl;
235 return !scrub;
7c673cae
FG
236 }
237
238 frag_info_t frag_info;
239 nest_info_t nest_info;
94b18763 240 for (auto i = items.begin(); i != items.end(); ++i) {
7c673cae
FG
241 if (i->second->last != CEPH_NOSNAP)
242 continue;
243 CDentry::linkage_t *dnl = i->second->get_linkage();
244 if (dnl->is_primary()) {
245 CInode *in = dnl->get_inode();
246 nest_info.add(in->inode.accounted_rstat);
247 if (in->is_dir())
248 frag_info.nsubdirs++;
249 else
250 frag_info.nfiles++;
251 } else if (dnl->is_remote())
252 frag_info.nfiles++;
253 }
254
255 bool good = true;
256 // fragstat
257 if(!frag_info.same_sums(fnode.fragstat)) {
258 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
259 dout(1) << "get_num_head_items() = " << get_num_head_items()
260 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
261 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
262 good = false;
263 } else {
264 dout(20) << "get_num_head_items() = " << get_num_head_items()
265 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
266 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
267 }
268
269 // rstat
270 if (!nest_info.same_sums(fnode.rstat)) {
271 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
272 dout(1) << "total of child dentrys: " << nest_info << dendl;
273 dout(1) << "my rstats: " << fnode.rstat << dendl;
274 good = false;
275 } else {
276 dout(20) << "total of child dentrys: " << nest_info << dendl;
277 dout(20) << "my rstats: " << fnode.rstat << dendl;
278 }
279
280 if (!good) {
281 if (!scrub) {
94b18763 282 for (auto i = items.begin(); i != items.end(); ++i) {
7c673cae
FG
283 CDentry *dn = i->second;
284 if (dn->get_linkage()->is_primary()) {
285 CInode *in = dn->get_linkage()->inode;
286 dout(1) << *dn << " rstat " << in->inode.accounted_rstat << dendl;
287 } else {
288 dout(1) << *dn << dendl;
289 }
290 }
291
11fdf7f2
TL
292 ceph_assert(frag_info.nfiles == fnode.fragstat.nfiles);
293 ceph_assert(frag_info.nsubdirs == fnode.fragstat.nsubdirs);
294 ceph_assert(nest_info.rbytes == fnode.rstat.rbytes);
295 ceph_assert(nest_info.rfiles == fnode.rstat.rfiles);
296 ceph_assert(nest_info.rsubdirs == fnode.rstat.rsubdirs);
7c673cae
FG
297 }
298 }
299 dout(10) << "check_rstats complete on " << this << dendl;
300 return good;
301}
302
11fdf7f2
TL
303void CDir::adjust_num_inodes_with_caps(int d)
304{
305 // FIXME: smarter way to decide if adding 'this' to open file table
306 if (num_inodes_with_caps == 0 && d > 0)
307 cache->open_file_table.add_dirfrag(this);
308 else if (num_inodes_with_caps > 0 && num_inodes_with_caps == -d)
309 cache->open_file_table.remove_dirfrag(this);
310
311 num_inodes_with_caps += d;
312 ceph_assert(num_inodes_with_caps >= 0);
313}
314
315CDentry *CDir::lookup(std::string_view name, snapid_t snap)
7c673cae
FG
316{
317 dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
94b18763 318 auto iter = items.lower_bound(dentry_key_t(snap, name, inode->hash_dentry_name(name)));
7c673cae
FG
319 if (iter == items.end())
320 return 0;
94b18763 321 if (iter->second->get_name() == name &&
7c673cae
FG
322 iter->second->first <= snap &&
323 iter->second->last >= snap) {
324 dout(20) << " hit -> " << iter->first << dendl;
325 return iter->second;
326 }
327 dout(20) << " miss -> " << iter->first << dendl;
328 return 0;
329}
330
11fdf7f2
TL
331CDentry *CDir::lookup_exact_snap(std::string_view name, snapid_t last) {
332 dout(20) << __func__ << " (" << last << ", '" << name << "')" << dendl;
94b18763 333 auto p = items.find(dentry_key_t(last, name, inode->hash_dentry_name(name)));
7c673cae
FG
334 if (p == items.end())
335 return NULL;
336 return p->second;
337}
338
339/***
340 * linking fun
341 */
342
11fdf7f2 343CDentry* CDir::add_null_dentry(std::string_view dname,
7c673cae
FG
344 snapid_t first, snapid_t last)
345{
346 // foreign
11fdf7f2 347 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
348
349 // create dentry
350 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
351 if (is_auth())
352 dn->state_set(CDentry::STATE_AUTH);
31f18b77
FG
353
354 cache->bottom_lru.lru_insert_mid(dn);
355 dn->state_set(CDentry::STATE_BOTTOMLRU);
7c673cae
FG
356
357 dn->dir = this;
358 dn->version = get_projected_version();
359
360 // add to dir
11fdf7f2 361 ceph_assert(items.count(dn->key()) == 0);
94b18763 362 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
363
364 items[dn->key()] = dn;
365 if (last == CEPH_NOSNAP)
366 num_head_null++;
367 else
368 num_snap_null++;
369
370 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
371 dn->get(CDentry::PIN_FRAGMENTING);
372 dn->state_set(CDentry::STATE_FRAGMENTING);
373 }
374
11fdf7f2 375 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
376
377 // pin?
378 if (get_num_any() == 1)
379 get(PIN_CHILD);
380
11fdf7f2 381 ceph_assert(get_num_any() == items.size());
7c673cae
FG
382 return dn;
383}
384
385
11fdf7f2 386CDentry* CDir::add_primary_dentry(std::string_view dname, CInode *in,
7c673cae
FG
387 snapid_t first, snapid_t last)
388{
389 // primary
11fdf7f2 390 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
391
392 // create dentry
393 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
394 if (is_auth())
395 dn->state_set(CDentry::STATE_AUTH);
31f18b77
FG
396 if (is_auth() || !inode->is_stray()) {
397 cache->lru.lru_insert_mid(dn);
398 } else {
399 cache->bottom_lru.lru_insert_mid(dn);
400 dn->state_set(CDentry::STATE_BOTTOMLRU);
401 }
7c673cae
FG
402
403 dn->dir = this;
404 dn->version = get_projected_version();
405
406 // add to dir
11fdf7f2 407 ceph_assert(items.count(dn->key()) == 0);
94b18763 408 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
409
410 items[dn->key()] = dn;
411
412 dn->get_linkage()->inode = in;
7c673cae
FG
413
414 link_inode_work(dn, in);
415
416 if (dn->last == CEPH_NOSNAP)
417 num_head_items++;
418 else
419 num_snap_items++;
420
421 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
422 dn->get(CDentry::PIN_FRAGMENTING);
423 dn->state_set(CDentry::STATE_FRAGMENTING);
424 }
425
11fdf7f2 426 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
427
428 // pin?
429 if (get_num_any() == 1)
430 get(PIN_CHILD);
11fdf7f2 431 ceph_assert(get_num_any() == items.size());
7c673cae
FG
432 return dn;
433}
434
11fdf7f2 435CDentry* CDir::add_remote_dentry(std::string_view dname, inodeno_t ino, unsigned char d_type,
7c673cae
FG
436 snapid_t first, snapid_t last)
437{
438 // foreign
11fdf7f2 439 ceph_assert(lookup_exact_snap(dname, last) == 0);
7c673cae
FG
440
441 // create dentry
442 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), ino, d_type, first, last);
443 if (is_auth())
444 dn->state_set(CDentry::STATE_AUTH);
445 cache->lru.lru_insert_mid(dn);
446
447 dn->dir = this;
448 dn->version = get_projected_version();
449
450 // add to dir
11fdf7f2 451 ceph_assert(items.count(dn->key()) == 0);
94b18763 452 //assert(null_items.count(dn->get_name()) == 0);
7c673cae
FG
453
454 items[dn->key()] = dn;
455 if (last == CEPH_NOSNAP)
456 num_head_items++;
457 else
458 num_snap_items++;
459
460 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
461 dn->get(CDentry::PIN_FRAGMENTING);
462 dn->state_set(CDentry::STATE_FRAGMENTING);
463 }
464
11fdf7f2 465 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
466
467 // pin?
468 if (get_num_any() == 1)
469 get(PIN_CHILD);
470
11fdf7f2 471 ceph_assert(get_num_any() == items.size());
7c673cae
FG
472 return dn;
473}
474
475
476
477void CDir::remove_dentry(CDentry *dn)
478{
11fdf7f2 479 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
480
481 // there should be no client leases at this point!
11fdf7f2 482 ceph_assert(dn->client_lease_map.empty());
7c673cae
FG
483
484 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
485 dn->put(CDentry::PIN_FRAGMENTING);
486 dn->state_clear(CDentry::STATE_FRAGMENTING);
487 }
488
489 if (dn->get_linkage()->is_null()) {
490 if (dn->last == CEPH_NOSNAP)
491 num_head_null--;
492 else
493 num_snap_null--;
494 } else {
495 if (dn->last == CEPH_NOSNAP)
496 num_head_items--;
497 else
498 num_snap_items--;
499 }
500
501 if (!dn->get_linkage()->is_null())
502 // detach inode and dentry
503 unlink_inode_work(dn);
504
505 // remove from list
11fdf7f2 506 ceph_assert(items.count(dn->key()) == 1);
7c673cae
FG
507 items.erase(dn->key());
508
509 // clean?
510 if (dn->is_dirty())
511 dn->mark_clean();
512
31f18b77
FG
513 if (dn->state_test(CDentry::STATE_BOTTOMLRU))
514 cache->bottom_lru.lru_remove(dn);
515 else
516 cache->lru.lru_remove(dn);
7c673cae
FG
517 delete dn;
518
519 // unpin?
520 if (get_num_any() == 0)
521 put(PIN_CHILD);
11fdf7f2 522 ceph_assert(get_num_any() == items.size());
7c673cae
FG
523}
524
525void CDir::link_remote_inode(CDentry *dn, CInode *in)
526{
527 link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
528}
529
530void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
531{
11fdf7f2
TL
532 dout(12) << __func__ << " " << *dn << " remote " << ino << dendl;
533 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
534
535 dn->get_linkage()->set_remote(ino, d_type);
536
31f18b77
FG
537 if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
538 cache->bottom_lru.lru_remove(dn);
539 cache->lru.lru_insert_mid(dn);
540 dn->state_clear(CDentry::STATE_BOTTOMLRU);
541 }
542
7c673cae
FG
543 if (dn->last == CEPH_NOSNAP) {
544 num_head_items++;
545 num_head_null--;
546 } else {
547 num_snap_items++;
548 num_snap_null--;
549 }
11fdf7f2 550 ceph_assert(get_num_any() == items.size());
7c673cae
FG
551}
552
553void CDir::link_primary_inode(CDentry *dn, CInode *in)
554{
11fdf7f2
TL
555 dout(12) << __func__ << " " << *dn << " " << *in << dendl;
556 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
557
558 dn->get_linkage()->inode = in;
7c673cae
FG
559
560 link_inode_work(dn, in);
31f18b77
FG
561
562 if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
563 (is_auth() || !inode->is_stray())) {
564 cache->bottom_lru.lru_remove(dn);
565 cache->lru.lru_insert_mid(dn);
566 dn->state_clear(CDentry::STATE_BOTTOMLRU);
567 }
7c673cae
FG
568
569 if (dn->last == CEPH_NOSNAP) {
570 num_head_items++;
571 num_head_null--;
572 } else {
573 num_snap_items++;
574 num_snap_null--;
575 }
576
11fdf7f2 577 ceph_assert(get_num_any() == items.size());
7c673cae
FG
578}
579
580void CDir::link_inode_work( CDentry *dn, CInode *in)
581{
11fdf7f2 582 ceph_assert(dn->get_linkage()->get_inode() == in);
28e407b8 583 in->set_primary_parent(dn);
7c673cae
FG
584
585 // set inode version
586 //in->inode.version = dn->get_version();
587
588 // pin dentry?
589 if (in->get_num_ref())
590 dn->get(CDentry::PIN_INODEPIN);
11fdf7f2
TL
591
592 if (in->state_test(CInode::STATE_TRACKEDBYOFT))
593 inode->mdcache->open_file_table.notify_link(in);
594 if (in->is_any_caps())
595 adjust_num_inodes_with_caps(1);
7c673cae
FG
596
597 // adjust auth pin count
11fdf7f2
TL
598 if (in->auth_pins)
599 dn->adjust_nested_auth_pins(in->auth_pins, NULL);
7c673cae
FG
600
601 // verify open snaprealm parent
602 if (in->snaprealm)
603 in->snaprealm->adjust_parent();
604 else if (in->is_any_caps())
605 in->move_to_realm(inode->find_snaprealm());
606}
607
31f18b77 608void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
7c673cae
FG
609{
610 if (dn->get_linkage()->is_primary()) {
11fdf7f2 611 dout(12) << __func__ << " " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
7c673cae 612 } else {
11fdf7f2 613 dout(12) << __func__ << " " << *dn << dendl;
7c673cae
FG
614 }
615
616 unlink_inode_work(dn);
617
31f18b77
FG
618 if (adjust_lru && !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
619 cache->lru.lru_remove(dn);
620 cache->bottom_lru.lru_insert_mid(dn);
621 dn->state_set(CDentry::STATE_BOTTOMLRU);
622 }
623
7c673cae
FG
624 if (dn->last == CEPH_NOSNAP) {
625 num_head_items--;
626 num_head_null++;
627 } else {
628 num_snap_items--;
629 num_snap_null++;
630 }
11fdf7f2 631 ceph_assert(get_num_any() == items.size());
7c673cae
FG
632}
633
634
635void CDir::try_remove_unlinked_dn(CDentry *dn)
636{
11fdf7f2
TL
637 ceph_assert(dn->dir == this);
638 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
639
640 // no pins (besides dirty)?
641 if (dn->get_num_ref() != dn->is_dirty())
642 return;
643
644 // was the dn new?
645 if (dn->is_new()) {
11fdf7f2 646 dout(10) << __func__ << " " << *dn << " in " << *this << dendl;
7c673cae
FG
647 if (dn->is_dirty())
648 dn->mark_clean();
649 remove_dentry(dn);
650
651 // NOTE: we may not have any more dirty dentries, but the fnode
652 // still changed, so the directory must remain dirty.
653 }
654}
655
656
11fdf7f2 657void CDir::unlink_inode_work(CDentry *dn)
7c673cae
FG
658{
659 CInode *in = dn->get_linkage()->get_inode();
660
661 if (dn->get_linkage()->is_remote()) {
662 // remote
663 if (in)
664 dn->unlink_remote(dn->get_linkage());
665
666 dn->get_linkage()->set_remote(0, 0);
667 } else if (dn->get_linkage()->is_primary()) {
668 // primary
669 // unpin dentry?
670 if (in->get_num_ref())
671 dn->put(CDentry::PIN_INODEPIN);
11fdf7f2
TL
672
673 if (in->state_test(CInode::STATE_TRACKEDBYOFT))
674 inode->mdcache->open_file_table.notify_unlink(in);
675 if (in->is_any_caps())
676 adjust_num_inodes_with_caps(-1);
7c673cae
FG
677
678 // unlink auth_pin count
11fdf7f2
TL
679 if (in->auth_pins)
680 dn->adjust_nested_auth_pins(-in->auth_pins, nullptr);
28e407b8 681
7c673cae
FG
682 // detach inode
683 in->remove_primary_parent(dn);
28e407b8
AA
684 if (in->is_dir())
685 in->item_pop_lru.remove_myself();
7c673cae
FG
686 dn->get_linkage()->inode = 0;
687 } else {
11fdf7f2 688 ceph_assert(!dn->get_linkage()->is_null());
7c673cae
FG
689 }
690}
691
692void CDir::add_to_bloom(CDentry *dn)
693{
11fdf7f2 694 ceph_assert(dn->last == CEPH_NOSNAP);
7c673cae
FG
695 if (!bloom) {
696 /* not create bloom filter for incomplete dir that was added by log replay */
697 if (!is_complete())
698 return;
699
700 /* don't maintain bloom filters in standby replay (saves cycles, and also
701 * avoids need to implement clearing it in EExport for #16924) */
702 if (cache->mds->is_standby_replay()) {
703 return;
704 }
705
706 unsigned size = get_num_head_items() + get_num_snap_items();
707 if (size < 100) size = 100;
708 bloom.reset(new bloom_filter(size, 1.0 / size, 0));
709 }
710 /* This size and false positive probability is completely random.*/
94b18763 711 bloom->insert(dn->get_name().data(), dn->get_name().size());
7c673cae
FG
712}
713
11fdf7f2 714bool CDir::is_in_bloom(std::string_view name)
7c673cae
FG
715{
716 if (!bloom)
717 return false;
94b18763 718 return bloom->contains(name.data(), name.size());
7c673cae
FG
719}
720
721void CDir::remove_null_dentries() {
11fdf7f2 722 dout(12) << __func__ << " " << *this << dendl;
7c673cae 723
94b18763 724 auto p = items.begin();
7c673cae
FG
725 while (p != items.end()) {
726 CDentry *dn = p->second;
727 ++p;
728 if (dn->get_linkage()->is_null() && !dn->is_projected())
729 remove_dentry(dn);
730 }
731
11fdf7f2
TL
732 ceph_assert(num_snap_null == 0);
733 ceph_assert(num_head_null == 0);
734 ceph_assert(get_num_any() == items.size());
7c673cae
FG
735}
736
737/** remove dirty null dentries for deleted directory. the dirfrag will be
738 * deleted soon, so it's safe to not commit dirty dentries.
739 *
740 * This is called when a directory is being deleted, a prerequisite
741 * of which is that its children have been unlinked: we expect to only see
742 * null, unprojected dentries here.
743 */
744void CDir::try_remove_dentries_for_stray()
745{
746 dout(10) << __func__ << dendl;
11fdf7f2 747 ceph_assert(get_parent_dir()->inode->is_stray());
7c673cae
FG
748
749 // clear dirty only when the directory was not snapshotted
750 bool clear_dirty = !inode->snaprealm;
751
94b18763 752 auto p = items.begin();
7c673cae
FG
753 while (p != items.end()) {
754 CDentry *dn = p->second;
755 ++p;
756 if (dn->last == CEPH_NOSNAP) {
11fdf7f2
TL
757 ceph_assert(!dn->is_projected());
758 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
759 if (clear_dirty && dn->is_dirty())
760 dn->mark_clean();
761 // It's OK to remove lease prematurely because we will never link
762 // the dentry to inode again.
763 if (dn->is_any_leases())
764 dn->remove_client_leases(cache->mds->locker);
765 if (dn->get_num_ref() == 0)
766 remove_dentry(dn);
767 } else {
11fdf7f2 768 ceph_assert(!dn->is_projected());
7c673cae
FG
769 CDentry::linkage_t *dnl= dn->get_linkage();
770 CInode *in = NULL;
771 if (dnl->is_primary()) {
772 in = dnl->get_inode();
773 if (clear_dirty && in->is_dirty())
774 in->mark_clean();
775 }
776 if (clear_dirty && dn->is_dirty())
777 dn->mark_clean();
778 if (dn->get_num_ref() == 0) {
779 remove_dentry(dn);
780 if (in)
781 cache->remove_inode(in);
782 }
783 }
784 }
785
786 if (clear_dirty && is_dirty())
787 mark_clean();
788}
789
7c673cae
FG
790bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
791{
11fdf7f2 792 ceph_assert(dn->last != CEPH_NOSNAP);
7c673cae
FG
793 set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
794 CDentry::linkage_t *dnl= dn->get_linkage();
795 CInode *in = 0;
796 if (dnl->is_primary())
797 in = dnl->get_inode();
798 if ((p == snaps.end() || *p > dn->last) &&
799 (dn->get_num_ref() == dn->is_dirty()) &&
800 (!in || in->get_num_ref() == in->is_dirty())) {
801 dout(10) << " purging snapped " << *dn << dendl;
802 if (in && in->is_dirty())
803 in->mark_clean();
804 remove_dentry(dn);
805 if (in) {
806 dout(10) << " purging snapped " << *in << dendl;
807 cache->remove_inode(in);
808 }
809 return true;
810 }
811 return false;
812}
813
814
815void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
816{
11fdf7f2 817 dout(10) << __func__ << " " << snaps << dendl;
7c673cae 818
94b18763 819 auto p = items.begin();
7c673cae
FG
820 while (p != items.end()) {
821 CDentry *dn = p->second;
822 ++p;
823
824 if (dn->last == CEPH_NOSNAP)
825 continue;
826
827 try_trim_snap_dentry(dn, snaps);
828 }
829}
830
831
832/**
833 * steal_dentry -- semi-violently move a dentry from one CDir to another
834 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
835 * on the old CDir corpse; must call finish_old_fragment() when finished.
836 */
837void CDir::steal_dentry(CDentry *dn)
838{
11fdf7f2 839 dout(15) << __func__ << " " << *dn << dendl;
7c673cae
FG
840
841 items[dn->key()] = dn;
842
843 dn->dir->items.erase(dn->key());
844 if (dn->dir->items.empty())
845 dn->dir->put(PIN_CHILD);
846
847 if (get_num_any() == 0)
848 get(PIN_CHILD);
849 if (dn->get_linkage()->is_null()) {
850 if (dn->last == CEPH_NOSNAP)
851 num_head_null++;
852 else
853 num_snap_null++;
854 } else if (dn->last == CEPH_NOSNAP) {
855 num_head_items++;
856
857 if (dn->get_linkage()->is_primary()) {
858 CInode *in = dn->get_linkage()->get_inode();
94b18763 859 auto pi = in->get_projected_inode();
28e407b8 860 if (in->is_dir()) {
7c673cae 861 fnode.fragstat.nsubdirs++;
28e407b8
AA
862 if (in->item_pop_lru.is_on_list())
863 pop_lru_subdirs.push_back(&in->item_pop_lru);
864 } else {
7c673cae 865 fnode.fragstat.nfiles++;
28e407b8 866 }
7c673cae
FG
867 fnode.rstat.rbytes += pi->accounted_rstat.rbytes;
868 fnode.rstat.rfiles += pi->accounted_rstat.rfiles;
869 fnode.rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
11fdf7f2 870 fnode.rstat.rsnaps += pi->accounted_rstat.rsnaps;
7c673cae
FG
871 if (pi->accounted_rstat.rctime > fnode.rstat.rctime)
872 fnode.rstat.rctime = pi->accounted_rstat.rctime;
873
11fdf7f2
TL
874 if (in->is_any_caps())
875 adjust_num_inodes_with_caps(1);
876
7c673cae
FG
877 // move dirty inode rstat to new dirfrag
878 if (in->is_dirty_rstat())
879 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
880 } else if (dn->get_linkage()->is_remote()) {
881 if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
882 fnode.fragstat.nsubdirs++;
883 else
884 fnode.fragstat.nfiles++;
885 }
886 } else {
887 num_snap_items++;
888 if (dn->get_linkage()->is_primary()) {
889 CInode *in = dn->get_linkage()->get_inode();
890 if (in->is_dirty_rstat())
891 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
892 }
893 }
894
11fdf7f2 895 {
7c673cae 896 int dap = dn->get_num_dir_auth_pins();
11fdf7f2
TL
897 if (dap) {
898 adjust_nested_auth_pins(dap, NULL);
899 dn->dir->adjust_nested_auth_pins(-dap, NULL);
900 }
7c673cae
FG
901 }
902
b32b8144
FG
903 if (dn->is_dirty()) {
904 dirty_dentries.push_back(&dn->item_dir_dirty);
7c673cae 905 num_dirty++;
b32b8144 906 }
7c673cae
FG
907
908 dn->dir = this;
909}
910
11fdf7f2 911void CDir::prepare_old_fragment(map<string_snap_t, MDSContext::vec >& dentry_waiters, bool replay)
7c673cae
FG
912{
913 // auth_pin old fragment for duration so that any auth_pinning
914 // during the dentry migration doesn't trigger side effects
915 if (!replay && is_auth())
916 auth_pin(this);
31f18b77
FG
917
918 if (!waiting_on_dentry.empty()) {
94b18763
FG
919 for (const auto &p : waiting_on_dentry) {
920 auto &e = dentry_waiters[p.first];
921 for (const auto &waiter : p.second) {
922 e.push_back(waiter);
923 }
924 }
31f18b77
FG
925 waiting_on_dentry.clear();
926 put(PIN_DNWAITER);
927 }
7c673cae
FG
928}
929
930void CDir::prepare_new_fragment(bool replay)
931{
932 if (!replay && is_auth()) {
933 _freeze_dir();
934 mark_complete();
935 }
31f18b77 936 inode->add_dirfrag(this);
7c673cae
FG
937}
938
11fdf7f2 939void CDir::finish_old_fragment(MDSContext::vec& waiters, bool replay)
7c673cae
FG
940{
941 // take waiters _before_ unfreeze...
942 if (!replay) {
943 take_waiting(WAIT_ANY_MASK, waiters);
944 if (is_auth()) {
945 auth_unpin(this); // pinned in prepare_old_fragment
11fdf7f2 946 ceph_assert(is_frozen_dir());
7c673cae
FG
947 unfreeze_dir();
948 }
949 }
950
11fdf7f2
TL
951 ceph_assert(dir_auth_pins == 0);
952 ceph_assert(auth_pins == 0);
7c673cae
FG
953
954 num_head_items = num_head_null = 0;
955 num_snap_items = num_snap_null = 0;
11fdf7f2 956 adjust_num_inodes_with_caps(-num_inodes_with_caps);
7c673cae
FG
957
958 // this mirrors init_fragment_pins()
959 if (is_auth())
960 clear_replica_map();
961 if (is_dirty())
962 mark_clean();
963 if (state_test(STATE_IMPORTBOUND))
964 put(PIN_IMPORTBOUND);
965 if (state_test(STATE_EXPORTBOUND))
966 put(PIN_EXPORTBOUND);
967 if (is_subtree_root())
968 put(PIN_SUBTREE);
969
970 if (auth_pins > 0)
971 put(PIN_AUTHPIN);
972
11fdf7f2 973 ceph_assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
7c673cae
FG
974}
975
976void CDir::init_fragment_pins()
977{
181888fb 978 if (is_replicated())
7c673cae
FG
979 get(PIN_REPLICATED);
980 if (state_test(STATE_DIRTY))
981 get(PIN_DIRTY);
982 if (state_test(STATE_EXPORTBOUND))
983 get(PIN_EXPORTBOUND);
984 if (state_test(STATE_IMPORTBOUND))
985 get(PIN_IMPORTBOUND);
986 if (is_subtree_root())
987 get(PIN_SUBTREE);
988}
989
11fdf7f2 990void CDir::split(int bits, list<CDir*>& subs, MDSContext::vec& waiters, bool replay)
7c673cae
FG
991{
992 dout(10) << "split by " << bits << " bits on " << *this << dendl;
993
11fdf7f2 994 ceph_assert(replay || is_complete() || !is_auth());
7c673cae 995
11fdf7f2 996 frag_vec_t frags;
7c673cae
FG
997 frag.split(bits, frags);
998
999 vector<CDir*> subfrags(1 << bits);
1000
1001 double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
1002
1003 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1004 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1005
1006 nest_info_t rstatdiff;
1007 frag_info_t fragstatdiff;
1008 if (fnode.accounted_rstat.version == rstat_version)
1009 rstatdiff.add_delta(fnode.accounted_rstat, fnode.rstat);
1010 if (fnode.accounted_fragstat.version == dirstat_version)
1011 fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat);
1012 dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
1013
11fdf7f2 1014 map<string_snap_t, MDSContext::vec > dentry_waiters;
31f18b77 1015 prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
1016
1017 // create subfrag dirs
1018 int n = 0;
11fdf7f2
TL
1019 for (const auto& fg : frags) {
1020 CDir *f = new CDir(inode, fg, cache, is_auth());
7c673cae 1021 f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
181888fb 1022 f->get_replicas() = get_replicas();
7c673cae 1023 f->set_version(get_version());
7c673cae
FG
1024 f->pop_me = pop_me;
1025 f->pop_me.scale(fac);
1026
1027 // FIXME; this is an approximation
1028 f->pop_nested = pop_nested;
1029 f->pop_nested.scale(fac);
1030 f->pop_auth_subtree = pop_auth_subtree;
1031 f->pop_auth_subtree.scale(fac);
1032 f->pop_auth_subtree_nested = pop_auth_subtree_nested;
1033 f->pop_auth_subtree_nested.scale(fac);
1034
11fdf7f2 1035 dout(10) << " subfrag " << fg << " " << *f << dendl;
7c673cae
FG
1036 subfrags[n++] = f;
1037 subs.push_back(f);
7c673cae
FG
1038
1039 f->set_dir_auth(get_dir_auth());
11fdf7f2 1040 f->freeze_tree_state = freeze_tree_state;
7c673cae 1041 f->prepare_new_fragment(replay);
1adf2230 1042 f->init_fragment_pins();
7c673cae
FG
1043 }
1044
1045 // repartition dentries
1046 while (!items.empty()) {
94b18763 1047 auto p = items.begin();
7c673cae
FG
1048
1049 CDentry *dn = p->second;
94b18763 1050 frag_t subfrag = inode->pick_dirfrag(dn->get_name());
7c673cae
FG
1051 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1052 dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
1053 CDir *f = subfrags[n];
1054 f->steal_dentry(dn);
1055 }
1056
94b18763 1057 for (const auto &p : dentry_waiters) {
31f18b77
FG
1058 frag_t subfrag = inode->pick_dirfrag(p.first.name);
1059 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1060 CDir *f = subfrags[n];
1061
1062 if (f->waiting_on_dentry.empty())
1063 f->get(PIN_DNWAITER);
94b18763
FG
1064 auto &e = f->waiting_on_dentry[p.first];
1065 for (const auto &waiter : p.second) {
1066 e.push_back(waiter);
1067 }
31f18b77
FG
1068 }
1069
7c673cae
FG
1070 // FIXME: handle dirty old rstat
1071
1072 // fix up new frag fragstats
1073 for (int i=0; i<n; i++) {
1074 CDir *f = subfrags[i];
1075 f->fnode.rstat.version = rstat_version;
1076 f->fnode.accounted_rstat = f->fnode.rstat;
1077 f->fnode.fragstat.version = dirstat_version;
1078 f->fnode.accounted_fragstat = f->fnode.fragstat;
1079 dout(10) << " rstat " << f->fnode.rstat << " fragstat " << f->fnode.fragstat
1080 << " on " << *f << dendl;
1081 }
1082
1083 // give any outstanding frag stat differential to first frag
1084 dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1085 << " to " << *subfrags[0] << dendl;
1086 subfrags[0]->fnode.accounted_rstat.add(rstatdiff);
1087 subfrags[0]->fnode.accounted_fragstat.add(fragstatdiff);
1088
1089 finish_old_fragment(waiters, replay);
1090}
1091
11fdf7f2 1092void CDir::merge(list<CDir*>& subs, MDSContext::vec& waiters, bool replay)
7c673cae
FG
1093{
1094 dout(10) << "merge " << subs << dendl;
1095
11fdf7f2
TL
1096 set_dir_auth(subs.front()->get_dir_auth());
1097 freeze_tree_state = subs.front()->freeze_tree_state;
1098
7c673cae 1099 for (auto dir : subs) {
11fdf7f2
TL
1100 ceph_assert(get_dir_auth() == dir->get_dir_auth());
1101 ceph_assert(freeze_tree_state == dir->freeze_tree_state);
7c673cae
FG
1102 }
1103
7c673cae
FG
1104 prepare_new_fragment(replay);
1105
1106 nest_info_t rstatdiff;
1107 frag_info_t fragstatdiff;
1108 bool touched_mtime, touched_chattr;
1109 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1110 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1111
11fdf7f2 1112 map<string_snap_t, MDSContext::vec > dentry_waiters;
31f18b77 1113
7c673cae
FG
1114 for (auto dir : subs) {
1115 dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
11fdf7f2 1116 ceph_assert(!dir->is_auth() || dir->is_complete() || replay);
7c673cae
FG
1117
1118 if (dir->fnode.accounted_rstat.version == rstat_version)
1119 rstatdiff.add_delta(dir->fnode.accounted_rstat, dir->fnode.rstat);
1120 if (dir->fnode.accounted_fragstat.version == dirstat_version)
1121 fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat,
1122 &touched_mtime, &touched_chattr);
1123
31f18b77 1124 dir->prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
1125
1126 // steal dentries
1127 while (!dir->items.empty())
1128 steal_dentry(dir->items.begin()->second);
1129
1130 // merge replica map
181888fb
FG
1131 for (const auto &p : dir->get_replicas()) {
1132 unsigned cur = get_replicas()[p.first];
1133 if (p.second > cur)
1134 get_replicas()[p.first] = p.second;
7c673cae
FG
1135 }
1136
1137 // merge version
1138 if (dir->get_version() > get_version())
1139 set_version(dir->get_version());
1140
1141 // merge state
1142 state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
7c673cae
FG
1143
1144 dir->finish_old_fragment(waiters, replay);
1145 inode->close_dirfrag(dir->get_frag());
1146 }
1147
31f18b77
FG
1148 if (!dentry_waiters.empty()) {
1149 get(PIN_DNWAITER);
94b18763
FG
1150 for (const auto &p : dentry_waiters) {
1151 auto &e = waiting_on_dentry[p.first];
1152 for (const auto &waiter : p.second) {
1153 e.push_back(waiter);
1154 }
31f18b77
FG
1155 }
1156 }
1157
7c673cae
FG
1158 if (is_auth() && !replay)
1159 mark_complete();
1160
1161 // FIXME: merge dirty old rstat
1162 fnode.rstat.version = rstat_version;
1163 fnode.accounted_rstat = fnode.rstat;
1164 fnode.accounted_rstat.add(rstatdiff);
1165
1166 fnode.fragstat.version = dirstat_version;
1167 fnode.accounted_fragstat = fnode.fragstat;
1168 fnode.accounted_fragstat.add(fragstatdiff);
1169
1170 init_fragment_pins();
1171}
1172
1173
1174
1175
1176void CDir::resync_accounted_fragstat()
1177{
1178 fnode_t *pf = get_projected_fnode();
94b18763 1179 auto pi = inode->get_projected_inode();
7c673cae
FG
1180
1181 if (pf->accounted_fragstat.version != pi->dirstat.version) {
1182 pf->fragstat.version = pi->dirstat.version;
11fdf7f2 1183 dout(10) << __func__ << " " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
7c673cae
FG
1184 pf->accounted_fragstat = pf->fragstat;
1185 }
1186}
1187
1188/*
1189 * resync rstat and accounted_rstat with inode
1190 */
1191void CDir::resync_accounted_rstat()
1192{
1193 fnode_t *pf = get_projected_fnode();
94b18763 1194 auto pi = inode->get_projected_inode();
7c673cae
FG
1195
1196 if (pf->accounted_rstat.version != pi->rstat.version) {
1197 pf->rstat.version = pi->rstat.version;
11fdf7f2 1198 dout(10) << __func__ << " " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
7c673cae
FG
1199 pf->accounted_rstat = pf->rstat;
1200 dirty_old_rstat.clear();
1201 }
1202}
1203
1204void CDir::assimilate_dirty_rstat_inodes()
1205{
11fdf7f2 1206 dout(10) << __func__ << dendl;
7c673cae
FG
1207 for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1208 !p.end(); ++p) {
1209 CInode *in = *p;
11fdf7f2 1210 ceph_assert(in->is_auth());
7c673cae
FG
1211 if (in->is_frozen())
1212 continue;
1213
94b18763
FG
1214 auto &pi = in->project_inode();
1215 pi.inode.version = in->pre_dirty();
7c673cae
FG
1216
1217 inode->mdcache->project_rstat_inode_to_frag(in, this, 0, 0, NULL);
1218 }
1219 state_set(STATE_ASSIMRSTAT);
11fdf7f2 1220 dout(10) << __func__ << " done" << dendl;
7c673cae
FG
1221}
1222
1223void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef& mut, EMetaBlob *blob)
1224{
1225 if (!state_test(STATE_ASSIMRSTAT))
1226 return;
1227 state_clear(STATE_ASSIMRSTAT);
11fdf7f2 1228 dout(10) << __func__ << dendl;
7c673cae
FG
1229 elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1230 while (!p.end()) {
1231 CInode *in = *p;
1232 ++p;
1233
1234 if (in->is_frozen())
1235 continue;
1236
1237 CDentry *dn = in->get_projected_parent_dn();
1238
1239 mut->auth_pin(in);
1240 mut->add_projected_inode(in);
1241
1242 in->clear_dirty_rstat();
1243 blob->add_primary_dentry(dn, in, true);
1244 }
1245
1246 if (!dirty_rstat_inodes.empty())
1247 inode->mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
1248}
1249
1250
1251
1252
/****************************************
 * WAITING
 */
1256
11fdf7f2 1257void CDir::add_dentry_waiter(std::string_view dname, snapid_t snapid, MDSContext *c)
7c673cae
FG
1258{
1259 if (waiting_on_dentry.empty())
1260 get(PIN_DNWAITER);
1261 waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
11fdf7f2 1262 dout(10) << __func__ << " dentry " << dname
7c673cae
FG
1263 << " snap " << snapid
1264 << " " << c << " on " << *this << dendl;
1265}
1266
11fdf7f2
TL
1267void CDir::take_dentry_waiting(std::string_view dname, snapid_t first, snapid_t last,
1268 MDSContext::vec& ls)
7c673cae
FG
1269{
1270 if (waiting_on_dentry.empty())
1271 return;
1272
1273 string_snap_t lb(dname, first);
1274 string_snap_t ub(dname, last);
94b18763
FG
1275 auto it = waiting_on_dentry.lower_bound(lb);
1276 while (it != waiting_on_dentry.end() &&
1277 !(ub < it->first)) {
11fdf7f2 1278 dout(10) << __func__ << " " << dname
7c673cae 1279 << " [" << first << "," << last << "] found waiter on snap "
94b18763 1280 << it->first.snapid
7c673cae 1281 << " on " << *this << dendl;
94b18763
FG
1282 for (const auto &waiter : it->second) {
1283 ls.push_back(waiter);
1284 }
1285 waiting_on_dentry.erase(it++);
7c673cae
FG
1286 }
1287
1288 if (waiting_on_dentry.empty())
1289 put(PIN_DNWAITER);
1290}
1291
11fdf7f2 1292void CDir::take_sub_waiting(MDSContext::vec& ls)
7c673cae 1293{
11fdf7f2 1294 dout(10) << __func__ << dendl;
7c673cae 1295 if (!waiting_on_dentry.empty()) {
94b18763
FG
1296 for (const auto &p : waiting_on_dentry) {
1297 for (const auto &waiter : p.second) {
1298 ls.push_back(waiter);
1299 }
1300 }
7c673cae
FG
1301 waiting_on_dentry.clear();
1302 put(PIN_DNWAITER);
1303 }
1304}
1305
1306
1307
11fdf7f2 1308void CDir::add_waiter(uint64_t tag, MDSContext *c)
7c673cae
FG
1309{
1310 // hierarchical?
7c673cae
FG
1311
1312 // at subtree root?
1313 if (tag & WAIT_ATSUBTREEROOT) {
1314 if (!is_subtree_root()) {
1315 // try parent
1316 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1317 inode->parent->dir->add_waiter(tag, c);
1318 return;
1319 }
1320 }
1321
11fdf7f2 1322 ceph_assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
7c673cae
FG
1323
1324 MDSCacheObject::add_waiter(tag, c);
1325}
1326
1327
1328
1329/* NOTE: this checks dentry waiters too */
11fdf7f2 1330void CDir::take_waiting(uint64_t mask, MDSContext::vec& ls)
7c673cae
FG
1331{
1332 if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1333 // take all dentry waiters
94b18763
FG
1334 for (const auto &p : waiting_on_dentry) {
1335 dout(10) << "take_waiting dentry " << p.first.name
1336 << " snap " << p.first.snapid << " on " << *this << dendl;
1337 for (const auto &waiter : p.second) {
1338 ls.push_back(waiter);
1339 }
7c673cae 1340 }
94b18763 1341 waiting_on_dentry.clear();
7c673cae
FG
1342 put(PIN_DNWAITER);
1343 }
1344
1345 // waiting
1346 MDSCacheObject::take_waiting(mask, ls);
1347}
1348
1349
1350void CDir::finish_waiting(uint64_t mask, int result)
1351{
11fdf7f2 1352 dout(11) << __func__ << " mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
7c673cae 1353
11fdf7f2 1354 MDSContext::vec finished;
7c673cae
FG
1355 take_waiting(mask, finished);
1356 if (result < 0)
1357 finish_contexts(g_ceph_context, finished, result);
1358 else
1359 cache->mds->queue_waiters(finished);
1360}
1361
1362
1363
// dirty/clean
1365
1366fnode_t *CDir::project_fnode()
1367{
11fdf7f2
TL
1368 ceph_assert(get_version() != 0);
1369 auto &p = projected_fnode.emplace_back(*get_projected_fnode());
7c673cae
FG
1370
1371 if (scrub_infop && scrub_infop->last_scrub_dirty) {
94b18763
FG
1372 p.localized_scrub_stamp = scrub_infop->last_local.time;
1373 p.localized_scrub_version = scrub_infop->last_local.version;
1374 p.recursive_scrub_stamp = scrub_infop->last_recursive.time;
1375 p.recursive_scrub_version = scrub_infop->last_recursive.version;
7c673cae
FG
1376 scrub_infop->last_scrub_dirty = false;
1377 scrub_maybe_delete_info();
1378 }
1379
94b18763
FG
1380 dout(10) << __func__ << " " << &p << dendl;
1381 return &p;
7c673cae
FG
1382}
1383
1384void CDir::pop_and_dirty_projected_fnode(LogSegment *ls)
1385{
11fdf7f2 1386 ceph_assert(!projected_fnode.empty());
94b18763
FG
1387 auto &front = projected_fnode.front();
1388 dout(15) << __func__ << " " << &front << " v" << front.version << dendl;
1389 fnode = front;
7c673cae 1390 _mark_dirty(ls);
7c673cae
FG
1391 projected_fnode.pop_front();
1392}
1393
1394
1395version_t CDir::pre_dirty(version_t min)
1396{
1397 if (min > projected_version)
1398 projected_version = min;
1399 ++projected_version;
11fdf7f2 1400 dout(10) << __func__ << " " << projected_version << dendl;
7c673cae
FG
1401 return projected_version;
1402}
1403
1404void CDir::mark_dirty(version_t pv, LogSegment *ls)
1405{
11fdf7f2
TL
1406 ceph_assert(get_version() < pv);
1407 ceph_assert(pv <= projected_version);
7c673cae
FG
1408 fnode.version = pv;
1409 _mark_dirty(ls);
1410}
1411
1412void CDir::_mark_dirty(LogSegment *ls)
1413{
1414 if (!state_test(STATE_DIRTY)) {
11fdf7f2 1415 dout(10) << __func__ << " (was clean) " << *this << " version " << get_version() << dendl;
7c673cae 1416 _set_dirty_flag();
11fdf7f2 1417 ceph_assert(ls);
7c673cae 1418 } else {
11fdf7f2 1419 dout(10) << __func__ << " (already dirty) " << *this << " version " << get_version() << dendl;
7c673cae
FG
1420 }
1421 if (ls) {
1422 ls->dirty_dirfrags.push_back(&item_dirty);
1423
1424 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1425 if (committed_version == 0 && !item_new.is_on_list())
1426 ls->new_dirfrags.push_back(&item_new);
1427 }
1428}
1429
1430void CDir::mark_new(LogSegment *ls)
1431{
1432 ls->new_dirfrags.push_back(&item_new);
1433 state_clear(STATE_CREATING);
1434
11fdf7f2 1435 MDSContext::vec waiters;
7c673cae
FG
1436 take_waiting(CDir::WAIT_CREATED, waiters);
1437 cache->mds->queue_waiters(waiters);
1438}
1439
1440void CDir::mark_clean()
1441{
11fdf7f2 1442 dout(10) << __func__ << " " << *this << " version " << get_version() << dendl;
7c673cae
FG
1443 if (state_test(STATE_DIRTY)) {
1444 item_dirty.remove_myself();
1445 item_new.remove_myself();
1446
1447 state_clear(STATE_DIRTY);
1448 put(PIN_DIRTY);
1449 }
1450}
1451
1452// caller should hold auth pin of this
1453void CDir::log_mark_dirty()
1454{
b32b8144 1455 if (is_dirty() || projected_version > get_version())
7c673cae
FG
1456 return; // noop if it is already dirty or will be dirty
1457
1458 version_t pv = pre_dirty();
1459 mark_dirty(pv, cache->mds->mdlog->get_current_segment());
1460}
1461
1462void CDir::mark_complete() {
1463 state_set(STATE_COMPLETE);
1464 bloom.reset();
1465}
1466
1467void CDir::first_get()
1468{
1469 inode->get(CInode::PIN_DIRFRAG);
1470}
1471
1472void CDir::last_put()
1473{
1474 inode->put(CInode::PIN_DIRFRAG);
1475}
1476
1477
1478
/******************************************************************************
 * FETCH and COMMIT
 */

// -----------------------
// FETCH
11fdf7f2 1485void CDir::fetch(MDSContext *c, bool ignore_authpinnability)
7c673cae
FG
1486{
1487 string want;
1488 return fetch(c, want, ignore_authpinnability);
1489}
1490
11fdf7f2 1491void CDir::fetch(MDSContext *c, std::string_view want_dn, bool ignore_authpinnability)
7c673cae
FG
1492{
1493 dout(10) << "fetch on " << *this << dendl;
1494
11fdf7f2
TL
1495 ceph_assert(is_auth());
1496 ceph_assert(!is_complete());
7c673cae
FG
1497
1498 if (!can_auth_pin() && !ignore_authpinnability) {
1499 if (c) {
1500 dout(7) << "fetch waiting for authpinnable" << dendl;
1501 add_waiter(WAIT_UNFREEZE, c);
1502 } else
1503 dout(7) << "fetch not authpinnable and no context" << dendl;
1504 return;
1505 }
1506
1507 // unlinked directory inode shouldn't have any entry
31f18b77
FG
1508 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
1509 !inode->snaprealm) {
7c673cae
FG
1510 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1511 if (get_version() == 0) {
11fdf7f2 1512 ceph_assert(inode->is_auth());
7c673cae
FG
1513 set_version(1);
1514
1515 if (state_test(STATE_REJOINUNDEF)) {
11fdf7f2 1516 ceph_assert(cache->mds->is_rejoin());
7c673cae
FG
1517 state_clear(STATE_REJOINUNDEF);
1518 cache->opened_undef_dirfrag(this);
1519 }
1520 }
1521 mark_complete();
1522
1523 if (c)
1524 cache->mds->queue_waiter(c);
1525 return;
1526 }
1527
1528 if (c) add_waiter(WAIT_COMPLETE, c);
94b18763 1529 if (!want_dn.empty()) wanted_items.insert(mempool::mds_co::string(want_dn));
7c673cae
FG
1530
1531 // already fetching?
1532 if (state_test(CDir::STATE_FETCHING)) {
1533 dout(7) << "already fetching; waiting" << dendl;
1534 return;
1535 }
1536
1537 auth_pin(this);
1538 state_set(CDir::STATE_FETCHING);
1539
1540 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1541
1542 std::set<dentry_key_t> empty;
1543 _omap_fetch(NULL, empty);
1544}
1545
11fdf7f2 1546void CDir::fetch(MDSContext *c, const std::set<dentry_key_t>& keys)
7c673cae
FG
1547{
1548 dout(10) << "fetch " << keys.size() << " keys on " << *this << dendl;
1549
11fdf7f2
TL
1550 ceph_assert(is_auth());
1551 ceph_assert(!is_complete());
7c673cae
FG
1552
1553 if (!can_auth_pin()) {
1554 dout(7) << "fetch keys waiting for authpinnable" << dendl;
1555 add_waiter(WAIT_UNFREEZE, c);
1556 return;
1557 }
1558 if (state_test(CDir::STATE_FETCHING)) {
1559 dout(7) << "fetch keys waiting for full fetch" << dendl;
1560 add_waiter(WAIT_COMPLETE, c);
1561 return;
1562 }
1563
1564 auth_pin(this);
1565 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1566
1567 _omap_fetch(c, keys);
1568}
1569
1570class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
11fdf7f2 1571 MDSContext *fin;
7c673cae
FG
1572public:
1573 bufferlist hdrbl;
1574 bool more = false;
1575 map<string, bufferlist> omap; ///< carry-over from before
1576 map<string, bufferlist> omap_more; ///< new batch
1577 int ret;
11fdf7f2 1578 C_IO_Dir_OMAP_FetchedMore(CDir *d, MDSContext *f) :
7c673cae
FG
1579 CDirIOContext(d), fin(f), ret(0) { }
1580 void finish(int r) {
1581 // merge results
1582 if (omap.empty()) {
1583 omap.swap(omap_more);
1584 } else {
1585 omap.insert(omap_more.begin(), omap_more.end());
1586 }
1587 if (more) {
1588 dir->_omap_fetch_more(hdrbl, omap, fin);
1589 } else {
1590 dir->_omap_fetched(hdrbl, omap, !fin, r);
1591 if (fin)
1592 fin->complete(r);
1593 }
1594 }
91327a77
AA
1595 void print(ostream& out) const override {
1596 out << "dirfrag_fetch_more(" << dir->dirfrag() << ")";
1597 }
7c673cae
FG
1598};
1599
1600class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
11fdf7f2 1601 MDSContext *fin;
7c673cae
FG
1602public:
1603 bufferlist hdrbl;
1604 bool more = false;
1605 map<string, bufferlist> omap;
1606 bufferlist btbl;
1607 int ret1, ret2, ret3;
1608
11fdf7f2 1609 C_IO_Dir_OMAP_Fetched(CDir *d, MDSContext *f) :
7c673cae
FG
1610 CDirIOContext(d), fin(f), ret1(0), ret2(0), ret3(0) { }
1611 void finish(int r) override {
1612 // check the correctness of backtrace
1613 if (r >= 0 && ret3 != -ECANCELED)
1614 dir->inode->verify_diri_backtrace(btbl, ret3);
1615 if (r >= 0) r = ret1;
1616 if (r >= 0) r = ret2;
1617 if (more) {
1618 dir->_omap_fetch_more(hdrbl, omap, fin);
1619 } else {
1620 dir->_omap_fetched(hdrbl, omap, !fin, r);
1621 if (fin)
1622 fin->complete(r);
1623 }
1624 }
91327a77
AA
1625 void print(ostream& out) const override {
1626 out << "dirfrag_fetch(" << dir->dirfrag() << ")";
1627 }
7c673cae
FG
1628};
1629
11fdf7f2 1630void CDir::_omap_fetch(MDSContext *c, const std::set<dentry_key_t>& keys)
7c673cae
FG
1631{
1632 C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1633 object_t oid = get_ondisk_object();
1634 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1635 ObjectOperation rd;
1636 rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1637 if (keys.empty()) {
11fdf7f2
TL
1638 ceph_assert(!c);
1639 rd.omap_get_vals("", "", g_conf()->mds_dir_keys_per_op,
7c673cae
FG
1640 &fin->omap, &fin->more, &fin->ret2);
1641 } else {
11fdf7f2 1642 ceph_assert(c);
7c673cae 1643 std::set<std::string> str_keys;
94b18763 1644 for (auto p : keys) {
7c673cae 1645 string str;
94b18763 1646 p.encode(str);
7c673cae
FG
1647 str_keys.insert(str);
1648 }
1649 rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
1650 }
1651 // check the correctness of backtrace
11fdf7f2 1652 if (g_conf()->mds_verify_backtrace > 0 && frag == frag_t()) {
7c673cae
FG
1653 rd.getxattr("parent", &fin->btbl, &fin->ret3);
1654 rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1655 } else {
1656 fin->ret3 = -ECANCELED;
1657 }
1658
1659 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1660 new C_OnFinisher(fin, cache->mds->finisher));
1661}
1662
1663void CDir::_omap_fetch_more(
1664 bufferlist& hdrbl,
1665 map<string, bufferlist>& omap,
11fdf7f2 1666 MDSContext *c)
7c673cae
FG
1667{
1668 // we have more omap keys to fetch!
1669 object_t oid = get_ondisk_object();
1670 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1671 C_IO_Dir_OMAP_FetchedMore *fin = new C_IO_Dir_OMAP_FetchedMore(this, c);
1672 fin->hdrbl.claim(hdrbl);
1673 fin->omap.swap(omap);
1674 ObjectOperation rd;
1675 rd.omap_get_vals(fin->omap.rbegin()->first,
1676 "", /* filter prefix */
11fdf7f2 1677 g_conf()->mds_dir_keys_per_op,
7c673cae
FG
1678 &fin->omap_more,
1679 &fin->more,
1680 &fin->ret);
1681 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1682 new C_OnFinisher(fin, cache->mds->finisher));
1683}
1684
1685CDentry *CDir::_load_dentry(
11fdf7f2
TL
1686 std::string_view key,
1687 std::string_view dname,
7c673cae
FG
1688 const snapid_t last,
1689 bufferlist &bl,
1690 const int pos,
1691 const std::set<snapid_t> *snaps,
28e407b8 1692 bool *force_dirty)
7c673cae 1693{
11fdf7f2 1694 auto q = bl.cbegin();
7c673cae
FG
1695
1696 snapid_t first;
11fdf7f2 1697 decode(first, q);
7c673cae
FG
1698
1699 // marker
1700 char type;
11fdf7f2 1701 decode(type, q);
7c673cae
FG
1702
1703 dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
1704 << " [" << first << "," << last << "]"
1705 << dendl;
1706
1707 bool stale = false;
1708 if (snaps && last != CEPH_NOSNAP) {
1709 set<snapid_t>::const_iterator p = snaps->lower_bound(first);
1710 if (p == snaps->end() || *p > last) {
1711 dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
1712 stale = true;
1713 }
1714 }
1715
1716 /*
1717 * look for existing dentry for _last_ snap, because unlink +
1718 * create may leave a "hole" (epochs during which the dentry
1719 * doesn't exist) but for which no explicit negative dentry is in
1720 * the cache.
1721 */
1722 CDentry *dn;
1723 if (stale)
1724 dn = lookup_exact_snap(dname, last);
1725 else
1726 dn = lookup(dname, last);
1727
1728 if (type == 'L') {
1729 // hard link
1730 inodeno_t ino;
1731 unsigned char d_type;
11fdf7f2
TL
1732 decode(ino, q);
1733 decode(d_type, q);
7c673cae
FG
1734
1735 if (stale) {
1736 if (!dn) {
94b18763 1737 stale_items.insert(mempool::mds_co::string(key));
7c673cae
FG
1738 *force_dirty = true;
1739 }
1740 return dn;
1741 }
1742
1743 if (dn) {
28e407b8
AA
1744 CDentry::linkage_t *dnl = dn->get_linkage();
1745 dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
1746 if (committed_version == 0 &&
1747 dnl->is_remote() &&
1748 dn->is_dirty() &&
1749 ino == dnl->get_remote_ino() &&
1750 d_type == dnl->get_remote_d_type()) {
1751 // see comment below
1752 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1753 dn->mark_clean();
7c673cae
FG
1754 }
1755 } else {
1756 // (remote) link
1757 dn = add_remote_dentry(dname, ino, d_type, first, last);
1758
1759 // link to inode?
1760 CInode *in = cache->get_inode(ino); // we may or may not have it.
1761 if (in) {
1762 dn->link_remote(dn->get_linkage(), in);
1763 dout(12) << "_fetched got remote link " << ino << " which we have " << *in << dendl;
1764 } else {
11fdf7f2 1765 dout(12) << "_fetched got remote link " << ino << " (don't have it)" << dendl;
7c673cae
FG
1766 }
1767 }
1768 }
1769 else if (type == 'I') {
1770 // inode
1771
1772 // Load inode data before looking up or constructing CInode
1773 InodeStore inode_data;
1774 inode_data.decode_bare(q);
1775
1776 if (stale) {
1777 if (!dn) {
94b18763 1778 stale_items.insert(mempool::mds_co::string(key));
7c673cae
FG
1779 *force_dirty = true;
1780 }
1781 return dn;
1782 }
1783
1784 bool undef_inode = false;
1785 if (dn) {
28e407b8
AA
1786 CDentry::linkage_t *dnl = dn->get_linkage();
1787 dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
1788
1789 if (dnl->is_primary()) {
1790 CInode *in = dnl->get_inode();
1791 if (in->state_test(CInode::STATE_REJOINUNDEF)) {
1792 undef_inode = true;
1793 } else if (committed_version == 0 &&
1794 dn->is_dirty() &&
1795 inode_data.inode.ino == in->ino() &&
1796 inode_data.inode.version == in->get_version()) {
1797 /* clean underwater item?
1798 * Underwater item is something that is dirty in our cache from
1799 * journal replay, but was previously flushed to disk before the
1800 * mds failed.
1801 *
1802 * We only do this is committed_version == 0. that implies either
1803 * - this is a fetch after from a clean/empty CDir is created
1804 * (and has no effect, since the dn won't exist); or
1805 * - this is a fetch after _recovery_, which is what we're worried
1806 * about. Items that are marked dirty from the journal should be
1807 * marked clean if they appear on disk.
1808 */
1809 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1810 dn->mark_clean();
1811 dout(10) << "_fetched had underwater inode " << *dnl->get_inode() << ", marking clean" << dendl;
1812 in->mark_clean();
1813 }
1814 }
7c673cae
FG
1815 }
1816
1817 if (!dn || undef_inode) {
1818 // add inode
1819 CInode *in = cache->get_inode(inode_data.inode.ino, last);
1820 if (!in || undef_inode) {
1821 if (undef_inode && in)
1822 in->first = first;
1823 else
1824 in = new CInode(cache, true, first, last);
1825
1826 in->inode = inode_data.inode;
1827 // symlink?
1828 if (in->is_symlink())
1829 in->symlink = inode_data.symlink;
1830
1831 in->dirfragtree.swap(inode_data.dirfragtree);
1832 in->xattrs.swap(inode_data.xattrs);
1833 in->old_inodes.swap(inode_data.old_inodes);
1834 if (!in->old_inodes.empty()) {
1835 snapid_t min_first = in->old_inodes.rbegin()->first + 1;
1836 if (min_first > in->first)
1837 in->first = min_first;
1838 }
1839
1840 in->oldest_snap = inode_data.oldest_snap;
1841 in->decode_snap_blob(inode_data.snap_blob);
1842 if (snaps && !in->snaprealm)
1843 in->purge_stale_snap_data(*snaps);
1844
1845 if (!undef_inode) {
1846 cache->add_inode(in); // add
1847 dn = add_primary_dentry(dname, in, first, last); // link
1848 }
1849 dout(12) << "_fetched got " << *dn << " " << *in << dendl;
1850
1851 if (in->inode.is_dirty_rstat())
1852 in->mark_dirty_rstat();
1853
1854 //in->hack_accessed = false;
1855 //in->hack_load_stamp = ceph_clock_now();
1856 //num_new_inodes_loaded++;
11fdf7f2 1857 } else if (g_conf().get_val<bool>("mds_hack_allow_loading_invalid_metadata")) {
94b18763
FG
1858 dout(20) << "hack: adding duplicate dentry for " << *in << dendl;
1859 dn = add_primary_dentry(dname, in, first, last);
7c673cae
FG
1860 } else {
1861 dout(0) << "_fetched badness: got (but i already had) " << *in
1862 << " mode " << in->inode.mode
1863 << " mtime " << in->inode.mtime << dendl;
1864 string dirpath, inopath;
1865 this->inode->make_path_string(dirpath);
1866 in->make_path_string(inopath);
1867 cache->mds->clog->error() << "loaded dup inode " << inode_data.inode.ino
1868 << " [" << first << "," << last << "] v" << inode_data.inode.version
1869 << " at " << dirpath << "/" << dname
1870 << ", but inode " << in->vino() << " v" << in->inode.version
1871 << " already exists at " << inopath;
1872 return dn;
1873 }
1874 }
1875 } else {
1876 std::ostringstream oss;
1877 oss << "Invalid tag char '" << type << "' pos " << pos;
1878 throw buffer::malformed_input(oss.str());
1879 }
1880
1881 return dn;
1882}
1883
1884void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
1885 bool complete, int r)
1886{
1887 LogChannelRef clog = cache->mds->clog;
1888 dout(10) << "_fetched header " << hdrbl.length() << " bytes "
1889 << omap.size() << " keys for " << *this << dendl;
1890
11fdf7f2
TL
1891 ceph_assert(r == 0 || r == -ENOENT || r == -ENODATA);
1892 ceph_assert(is_auth());
1893 ceph_assert(!is_frozen());
7c673cae
FG
1894
1895 if (hdrbl.length() == 0) {
1896 dout(0) << "_fetched missing object for " << *this << dendl;
1897
1898 clog->error() << "dir " << dirfrag() << " object missing on disk; some "
1899 "files may be lost (" << get_path() << ")";
1900
1901 go_bad(complete);
1902 return;
1903 }
1904
1905 fnode_t got_fnode;
1906 {
11fdf7f2 1907 auto p = hdrbl.cbegin();
7c673cae 1908 try {
11fdf7f2 1909 decode(got_fnode, p);
7c673cae
FG
1910 } catch (const buffer::error &err) {
1911 derr << "Corrupt fnode in dirfrag " << dirfrag()
1912 << ": " << err << dendl;
1913 clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
1914 << err << " (" << get_path() << ")";
1915 go_bad(complete);
1916 return;
1917 }
1918 if (!p.end()) {
1919 clog->warn() << "header buffer of dir " << dirfrag() << " has "
1920 << hdrbl.length() - p.get_off() << " extra bytes ("
1921 << get_path() << ")";
1922 go_bad(complete);
1923 return;
1924 }
1925 }
1926
1927 dout(10) << "_fetched version " << got_fnode.version << dendl;
1928
1929 // take the loaded fnode?
1930 // only if we are a fresh CDir* with no prior state.
1931 if (get_version() == 0) {
11fdf7f2
TL
1932 ceph_assert(!is_projected());
1933 ceph_assert(!state_test(STATE_COMMITTING));
7c673cae
FG
1934 fnode = got_fnode;
1935 projected_version = committing_version = committed_version = got_fnode.version;
1936
1937 if (state_test(STATE_REJOINUNDEF)) {
11fdf7f2 1938 ceph_assert(cache->mds->is_rejoin());
7c673cae
FG
1939 state_clear(STATE_REJOINUNDEF);
1940 cache->opened_undef_dirfrag(this);
1941 }
1942 }
1943
1944 list<CInode*> undef_inodes;
1945
1946 // purge stale snaps?
1947 // only if we have past_parents open!
1948 bool force_dirty = false;
1949 const set<snapid_t> *snaps = NULL;
1950 SnapRealm *realm = inode->find_snaprealm();
1951 if (!realm->have_past_parents_open()) {
1952 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
1953 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
1954 snaps = &realm->get_snaps();
1955 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
1956 << " < " << realm->get_last_destroyed()
1957 << ", snap purge based on " << *snaps << dendl;
1958 if (get_num_snap_items() == 0) {
1959 fnode.snap_purged_thru = realm->get_last_destroyed();
1960 force_dirty = true;
1961 }
1962 }
1963
1964 unsigned pos = omap.size() - 1;
1965 for (map<string, bufferlist>::reverse_iterator p = omap.rbegin();
1966 p != omap.rend();
1967 ++p, --pos) {
1968 string dname;
1969 snapid_t last;
1970 dentry_key_t::decode_helper(p->first, dname, last);
1971
1972 CDentry *dn = NULL;
1973 try {
1974 dn = _load_dentry(
1975 p->first, dname, last, p->second, pos, snaps,
28e407b8 1976 &force_dirty);
7c673cae
FG
1977 } catch (const buffer::error &err) {
1978 cache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
1979 "dir frag " << dirfrag() << ": "
1980 << err << "(" << get_path() << ")";
1981
1982 // Remember that this dentry is damaged. Subsequent operations
1983 // that try to act directly on it will get their EIOs, but this
1984 // dirfrag as a whole will continue to look okay (minus the
1985 // mysteriously-missing dentry)
1986 go_bad_dentry(last, dname);
1987
1988 // Anyone who was WAIT_DENTRY for this guy will get kicked
1989 // to RetryRequest, and hit the DamageTable-interrogating path.
1990 // Stats will now be bogus because we will think we're complete,
1991 // but have 1 or more missing dentries.
1992 continue;
1993 }
1994
28e407b8
AA
1995 if (!dn)
1996 continue;
7c673cae 1997
28e407b8
AA
1998 CDentry::linkage_t *dnl = dn->get_linkage();
1999 if (dnl->is_primary() && dnl->get_inode()->state_test(CInode::STATE_REJOINUNDEF))
2000 undef_inodes.push_back(dnl->get_inode());
7c673cae 2001
11fdf7f2 2002 if (wanted_items.count(mempool::mds_co::string(dname)) > 0 || !complete) {
28e407b8
AA
2003 dout(10) << " touching wanted dn " << *dn << dendl;
2004 inode->mdcache->touch_dentry(dn);
7c673cae
FG
2005 }
2006 }
2007
2008 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
2009
2010 // mark complete, !fetching
2011 if (complete) {
2012 wanted_items.clear();
2013 mark_complete();
2014 state_clear(STATE_FETCHING);
2015
2016 if (scrub_infop && scrub_infop->need_scrub_local) {
2017 scrub_infop->need_scrub_local = false;
2018 scrub_local();
2019 }
2020 }
2021
2022 // open & force frags
2023 while (!undef_inodes.empty()) {
2024 CInode *in = undef_inodes.front();
2025 undef_inodes.pop_front();
2026 in->state_clear(CInode::STATE_REJOINUNDEF);
2027 cache->opened_undef_inode(in);
2028 }
2029
2030 // dirty myself to remove stale snap dentries
2031 if (force_dirty && !inode->mdcache->is_readonly())
2032 log_mark_dirty();
2033
2034 auth_unpin(this);
2035
2036 if (complete) {
2037 // kick waiters
2038 finish_waiting(WAIT_COMPLETE, 0);
2039 }
2040}
2041
2042void CDir::_go_bad()
2043{
2044 if (get_version() == 0)
2045 set_version(1);
2046 state_set(STATE_BADFRAG);
2047 // mark complete, !fetching
2048 mark_complete();
2049 state_clear(STATE_FETCHING);
2050 auth_unpin(this);
2051
2052 // kick waiters
2053 finish_waiting(WAIT_COMPLETE, -EIO);
2054}
2055
11fdf7f2 2056void CDir::go_bad_dentry(snapid_t last, std::string_view dname)
7c673cae 2057{
94b18763
FG
2058 dout(10) << __func__ << " " << dname << dendl;
2059 std::string path(get_path());
2060 path += "/";
11fdf7f2 2061 path += dname;
7c673cae 2062 const bool fatal = cache->mds->damage_table.notify_dentry(
94b18763 2063 inode->ino(), frag, last, dname, path);
7c673cae
FG
2064 if (fatal) {
2065 cache->mds->damaged();
2066 ceph_abort(); // unreachable, damaged() respawns us
2067 }
2068}
2069
2070void CDir::go_bad(bool complete)
2071{
11fdf7f2 2072 dout(10) << __func__ << " " << frag << dendl;
7c673cae
FG
2073 const bool fatal = cache->mds->damage_table.notify_dirfrag(
2074 inode->ino(), frag, get_path());
2075 if (fatal) {
2076 cache->mds->damaged();
2077 ceph_abort(); // unreachable, damaged() respawns us
2078 }
2079
2080 if (complete)
2081 _go_bad();
2082 else
2083 auth_unpin(this);
2084}
2085
2086// -----------------------
2087// COMMIT
2088
2089/**
2090 * commit
2091 *
2092 * @param want - min version i want committed
2093 * @param c - callback for completion
2094 */
11fdf7f2 2095void CDir::commit(version_t want, MDSContext *c, bool ignore_authpinnability, int op_prio)
7c673cae
FG
2096{
2097 dout(10) << "commit want " << want << " on " << *this << dendl;
2098 if (want == 0) want = get_version();
2099
2100 // preconditions
11fdf7f2
TL
2101 ceph_assert(want <= get_version() || get_version() == 0); // can't commit the future
2102 ceph_assert(want > committed_version); // the caller is stupid
2103 ceph_assert(is_auth());
2104 ceph_assert(ignore_authpinnability || can_auth_pin());
7c673cae 2105
7c673cae
FG
2106 // note: queue up a noop if necessary, so that we always
2107 // get an auth_pin.
2108 if (!c)
2109 c = new C_MDSInternalNoop;
2110
2111 // auth_pin on first waiter
2112 if (waiting_for_commit.empty())
2113 auth_pin(this);
2114 waiting_for_commit[want].push_back(c);
2115
2116 // ok.
2117 _commit(want, op_prio);
2118}
2119
2120class C_IO_Dir_Committed : public CDirIOContext {
2121 version_t version;
2122public:
2123 C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2124 void finish(int r) override {
2125 dir->_committed(r, version);
2126 }
91327a77
AA
2127 void print(ostream& out) const override {
2128 out << "dirfrag_commit(" << dir->dirfrag() << ")";
2129 }
7c673cae
FG
2130};
2131
2132/**
2133 * Flush out the modified dentries in this dir. Keep the bufferlist
2134 * below max_write_size;
2135 */
2136void CDir::_omap_commit(int op_prio)
2137{
11fdf7f2 2138 dout(10) << __func__ << dendl;
7c673cae
FG
2139
2140 unsigned max_write_size = cache->max_dir_commit_size;
2141 unsigned write_size = 0;
2142
2143 if (op_prio < 0)
2144 op_prio = CEPH_MSG_PRIO_DEFAULT;
2145
2146 // snap purge?
2147 const set<snapid_t> *snaps = NULL;
2148 SnapRealm *realm = inode->find_snaprealm();
2149 if (!realm->have_past_parents_open()) {
2150 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
2151 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
2152 snaps = &realm->get_snaps();
2153 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
2154 << " < " << realm->get_last_destroyed()
2155 << ", snap purge based on " << *snaps << dendl;
2156 // fnode.snap_purged_thru = realm->get_last_destroyed();
2157 }
2158
2159 set<string> to_remove;
2160 map<string, bufferlist> to_set;
2161
2162 C_GatherBuilder gather(g_ceph_context,
2163 new C_OnFinisher(new C_IO_Dir_Committed(this,
2164 get_version()),
2165 cache->mds->finisher));
2166
2167 SnapContext snapc;
2168 object_t oid = get_ondisk_object();
2169 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
2170
2171 if (!stale_items.empty()) {
94b18763 2172 for (const auto &p : stale_items) {
11fdf7f2 2173 to_remove.insert(std::string(p));
94b18763 2174 write_size += p.length();
7c673cae
FG
2175 }
2176 stale_items.clear();
2177 }
2178
b32b8144 2179 auto write_one = [&](CDentry *dn) {
7c673cae
FG
2180 string key;
2181 dn->key().encode(key);
2182
2183 if (dn->last != CEPH_NOSNAP &&
2184 snaps && try_trim_snap_dentry(dn, *snaps)) {
2185 dout(10) << " rm " << key << dendl;
2186 write_size += key.length();
2187 to_remove.insert(key);
b32b8144 2188 return;
7c673cae
FG
2189 }
2190
7c673cae 2191 if (dn->get_linkage()->is_null()) {
94b18763 2192 dout(10) << " rm " << dn->get_name() << " " << *dn << dendl;
7c673cae
FG
2193 write_size += key.length();
2194 to_remove.insert(key);
2195 } else {
94b18763 2196 dout(10) << " set " << dn->get_name() << " " << *dn << dendl;
7c673cae
FG
2197 bufferlist dnbl;
2198 _encode_dentry(dn, dnbl, snaps);
2199 write_size += key.length() + dnbl.length();
2200 to_set[key].swap(dnbl);
2201 }
2202
2203 if (write_size >= max_write_size) {
2204 ObjectOperation op;
2205 op.priority = op_prio;
2206
2207 // don't create new dirfrag blindly
2208 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2209 op.stat(NULL, (ceph::real_time*) NULL, NULL);
2210
2211 if (!to_set.empty())
2212 op.omap_set(to_set);
2213 if (!to_remove.empty())
2214 op.omap_rm_keys(to_remove);
2215
2216 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2217 ceph::real_clock::now(),
2218 0, gather.new_sub());
2219
2220 write_size = 0;
2221 to_set.clear();
2222 to_remove.clear();
2223 }
b32b8144
FG
2224 };
2225
2226 if (state_test(CDir::STATE_FRAGMENTING)) {
2227 for (auto p = items.begin(); p != items.end(); ) {
2228 CDentry *dn = p->second;
2229 ++p;
2230 if (!dn->is_dirty() && dn->get_linkage()->is_null())
2231 continue;
2232 write_one(dn);
2233 }
2234 } else {
2235 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2236 CDentry *dn = *p;
2237 ++p;
2238 write_one(dn);
2239 }
7c673cae
FG
2240 }
2241
2242 ObjectOperation op;
2243 op.priority = op_prio;
2244
2245 // don't create new dirfrag blindly
2246 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2247 op.stat(NULL, (ceph::real_time*)NULL, NULL);
2248
2249 /*
2250 * save the header at the last moment.. If we were to send it off before other
2251 * updates, but die before sending them all, we'd think that the on-disk state
2252 * was fully committed even though it wasn't! However, since the messages are
2253 * strictly ordered between the MDS and the OSD, and since messages to a given
2254 * PG are strictly ordered, if we simply send the message containing the header
2255 * off last, we cannot get our header into an incorrect state.
2256 */
2257 bufferlist header;
11fdf7f2 2258 encode(fnode, header);
7c673cae
FG
2259 op.omap_set_header(header);
2260
2261 if (!to_set.empty())
2262 op.omap_set(to_set);
2263 if (!to_remove.empty())
2264 op.omap_rm_keys(to_remove);
2265
2266 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2267 ceph::real_clock::now(),
2268 0, gather.new_sub());
2269
2270 gather.activate();
2271}
2272
2273void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
2274 const set<snapid_t> *snaps)
2275{
2276 // clear dentry NEW flag, if any. we can no longer silently drop it.
2277 dn->clear_new();
2278
11fdf7f2 2279 encode(dn->first, bl);
7c673cae
FG
2280
2281 // primary or remote?
2282 if (dn->linkage.is_remote()) {
2283 inodeno_t ino = dn->linkage.get_remote_ino();
2284 unsigned char d_type = dn->linkage.get_remote_d_type();
94b18763 2285 dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' remote ino " << ino << dendl;
7c673cae
FG
2286
2287 // marker, name, ino
2288 bl.append('L'); // remote link
11fdf7f2
TL
2289 encode(ino, bl);
2290 encode(d_type, bl);
7c673cae
FG
2291 } else if (dn->linkage.is_primary()) {
2292 // primary link
2293 CInode *in = dn->linkage.get_inode();
11fdf7f2 2294 ceph_assert(in);
7c673cae 2295
94b18763 2296 dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' inode " << *in << dendl;
7c673cae
FG
2297
2298 // marker, name, inode, [symlink string]
2299 bl.append('I'); // inode
2300
2301 if (in->is_multiversion()) {
2302 if (!in->snaprealm) {
2303 if (snaps)
2304 in->purge_stale_snap_data(*snaps);
2305 } else if (in->snaprealm->have_past_parents_open()) {
2306 in->purge_stale_snap_data(in->snaprealm->get_snaps());
2307 }
2308 }
2309
2310 bufferlist snap_blob;
2311 in->encode_snap_blob(snap_blob);
2312 in->encode_bare(bl, cache->mds->mdsmap->get_up_features(), &snap_blob);
2313 } else {
11fdf7f2 2314 ceph_assert(!dn->linkage.is_null());
7c673cae
FG
2315 }
2316}
2317
2318void CDir::_commit(version_t want, int op_prio)
2319{
2320 dout(10) << "_commit want " << want << " on " << *this << dendl;
2321
2322 // we can't commit things in the future.
2323 // (even the projected future.)
11fdf7f2 2324 ceph_assert(want <= get_version() || get_version() == 0);
7c673cae
FG
2325
2326 // check pre+postconditions.
11fdf7f2 2327 ceph_assert(is_auth());
7c673cae
FG
2328
2329 // already committed?
2330 if (committed_version >= want) {
2331 dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2332 return;
2333 }
2334 // already committing >= want?
2335 if (committing_version >= want) {
2336 dout(10) << "already committing " << committing_version << " >= " << want << dendl;
11fdf7f2 2337 ceph_assert(state_test(STATE_COMMITTING));
7c673cae
FG
2338 return;
2339 }
2340
2341 // alrady committed an older version?
2342 if (committing_version > committed_version) {
2343 dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2344 return;
2345 }
2346
2347 // commit.
2348 committing_version = get_version();
2349
2350 // mark committing (if not already)
2351 if (!state_test(STATE_COMMITTING)) {
2352 dout(10) << "marking committing" << dendl;
2353 state_set(STATE_COMMITTING);
2354 }
2355
2356 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_commit);
2357
2358 _omap_commit(op_prio);
2359}
2360
2361
2362/**
2363 * _committed
2364 *
2365 * @param v version i just committed
2366 */
2367void CDir::_committed(int r, version_t v)
2368{
2369 if (r < 0) {
2370 // the directory could be partly purged during MDS failover
2371 if (r == -ENOENT && committed_version == 0 &&
31f18b77 2372 !inode->is_base() && get_parent_dir()->inode->is_stray()) {
7c673cae 2373 r = 0;
31f18b77
FG
2374 if (inode->snaprealm)
2375 inode->state_set(CInode::STATE_MISSINGOBJS);
7c673cae
FG
2376 }
2377 if (r < 0) {
2378 dout(1) << "commit error " << r << " v " << v << dendl;
2379 cache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
2380 << " errno " << r;
2381 cache->mds->handle_write_error(r);
2382 return;
2383 }
2384 }
2385
2386 dout(10) << "_committed v " << v << " on " << *this << dendl;
11fdf7f2 2387 ceph_assert(is_auth());
7c673cae
FG
2388
2389 bool stray = inode->is_stray();
2390
2391 // take note.
11fdf7f2
TL
2392 ceph_assert(v > committed_version);
2393 ceph_assert(v <= committing_version);
7c673cae
FG
2394 committed_version = v;
2395
2396 // _all_ commits done?
2397 if (committing_version == committed_version)
2398 state_clear(CDir::STATE_COMMITTING);
2399
2400 // _any_ commit, even if we've been redirtied, means we're no longer new.
2401 item_new.remove_myself();
2402
2403 // dir clean?
2404 if (committed_version == get_version())
2405 mark_clean();
2406
2407 // dentries clean?
b32b8144
FG
2408 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2409 CDentry *dn = *p;
2410 ++p;
7c673cae
FG
2411
2412 // inode?
2413 if (dn->linkage.is_primary()) {
2414 CInode *in = dn->linkage.get_inode();
11fdf7f2
TL
2415 ceph_assert(in);
2416 ceph_assert(in->is_auth());
7c673cae
FG
2417
2418 if (committed_version >= in->get_version()) {
2419 if (in->is_dirty()) {
2420 dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
2421 in->mark_clean();
2422 }
2423 } else {
2424 dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
11fdf7f2 2425 ceph_assert(in->is_dirty() || in->last < CEPH_NOSNAP); // special case for cow snap items (not predirtied)
7c673cae
FG
2426 }
2427 }
2428
2429 // dentry
2430 if (committed_version >= dn->get_version()) {
b32b8144
FG
2431 dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
2432 dn->mark_clean();
7c673cae 2433
b32b8144
FG
2434 // drop clean null stray dentries immediately
2435 if (stray &&
2436 dn->get_num_ref() == 0 &&
2437 !dn->is_projected() &&
2438 dn->get_linkage()->is_null())
2439 remove_dentry(dn);
7c673cae
FG
2440 } else {
2441 dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
11fdf7f2 2442 ceph_assert(dn->is_dirty());
7c673cae
FG
2443 }
2444 }
2445
2446 // finishers?
2447 bool were_waiters = !waiting_for_commit.empty();
2448
94b18763
FG
2449 auto it = waiting_for_commit.begin();
2450 while (it != waiting_for_commit.end()) {
2451 auto _it = it;
2452 ++_it;
2453 if (it->first > committed_version) {
2454 dout(10) << " there are waiters for " << it->first << ", committing again" << dendl;
2455 _commit(it->first, -1);
7c673cae
FG
2456 break;
2457 }
11fdf7f2 2458 MDSContext::vec t;
94b18763
FG
2459 for (const auto &waiter : it->second)
2460 t.push_back(waiter);
2461 cache->mds->queue_waiters(t);
2462 waiting_for_commit.erase(it);
2463 it = _it;
7c673cae
FG
2464 }
2465
2466 // try drop dentries in this dirfrag if it's about to be purged
31f18b77
FG
2467 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
2468 inode->snaprealm)
7c673cae
FG
2469 cache->maybe_eval_stray(inode, true);
2470
2471 // unpin if we kicked the last waiter.
2472 if (were_waiters &&
2473 waiting_for_commit.empty())
2474 auth_unpin(this);
2475}
2476
2477
2478
2479
2480// IMPORT/EXPORT
2481
2482void CDir::encode_export(bufferlist& bl)
2483{
11fdf7f2
TL
2484 ceph_assert(!is_projected());
2485 encode(first, bl);
2486 encode(fnode, bl);
2487 encode(dirty_old_rstat, bl);
2488 encode(committed_version, bl);
7c673cae 2489
11fdf7f2
TL
2490 encode(state, bl);
2491 encode(dir_rep, bl);
7c673cae 2492
11fdf7f2
TL
2493 encode(pop_me, bl);
2494 encode(pop_auth_subtree, bl);
7c673cae 2495
11fdf7f2
TL
2496 encode(dir_rep_by, bl);
2497 encode(get_replicas(), bl);
7c673cae
FG
2498
2499 get(PIN_TEMPEXPORTING);
2500}
2501
11fdf7f2 2502void CDir::finish_export()
7c673cae
FG
2503{
2504 state &= MASK_STATE_EXPORT_KEPT;
11fdf7f2
TL
2505 pop_nested.sub(pop_auth_subtree);
2506 pop_auth_subtree_nested.sub(pop_auth_subtree);
2507 pop_me.zero();
2508 pop_auth_subtree.zero();
7c673cae
FG
2509 put(PIN_TEMPEXPORTING);
2510 dirty_old_rstat.clear();
2511}
2512
11fdf7f2 2513void CDir::decode_import(bufferlist::const_iterator& blp, LogSegment *ls)
7c673cae 2514{
11fdf7f2
TL
2515 decode(first, blp);
2516 decode(fnode, blp);
2517 decode(dirty_old_rstat, blp);
7c673cae 2518 projected_version = fnode.version;
11fdf7f2 2519 decode(committed_version, blp);
7c673cae
FG
2520 committing_version = committed_version;
2521
2522 unsigned s;
11fdf7f2 2523 decode(s, blp);
7c673cae
FG
2524 state &= MASK_STATE_IMPORT_KEPT;
2525 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
2526
2527 if (is_dirty()) {
2528 get(PIN_DIRTY);
2529 _mark_dirty(ls);
2530 }
2531
11fdf7f2 2532 decode(dir_rep, blp);
7c673cae 2533
11fdf7f2
TL
2534 decode(pop_me, blp);
2535 decode(pop_auth_subtree, blp);
2536 pop_nested.add(pop_auth_subtree);
2537 pop_auth_subtree_nested.add(pop_auth_subtree);
7c673cae 2538
11fdf7f2
TL
2539 decode(dir_rep_by, blp);
2540 decode(get_replicas(), blp);
181888fb 2541 if (is_replicated()) get(PIN_REPLICATED);
7c673cae
FG
2542
2543 replica_nonce = 0; // no longer defined
2544
2545 // did we import some dirty scatterlock data?
2546 if (dirty_old_rstat.size() ||
2547 !(fnode.rstat == fnode.accounted_rstat)) {
2548 cache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
2549 ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
2550 }
2551 if (!(fnode.fragstat == fnode.accounted_fragstat)) {
2552 cache->mds->locker->mark_updated_scatterlock(&inode->filelock);
2553 ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
2554 }
2555 if (is_dirty_dft()) {
2556 if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
2557 inode->dirfragtreelock.is_stable()) {
2558 // clear stale dirtydft
2559 state_clear(STATE_DIRTYDFT);
2560 } else {
2561 cache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
2562 ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
2563 }
2564 }
2565}
2566
11fdf7f2
TL
2567void CDir::abort_import()
2568{
2569 ceph_assert(is_auth());
2570 state_clear(CDir::STATE_AUTH);
2571 remove_bloom();
2572 clear_replica_map();
2573 set_replica_nonce(CDir::EXPORT_NONCE);
2574 if (is_dirty())
2575 mark_clean();
7c673cae 2576
11fdf7f2
TL
2577 pop_nested.sub(pop_auth_subtree);
2578 pop_auth_subtree_nested.sub(pop_auth_subtree);
2579 pop_me.zero();
2580 pop_auth_subtree.zero();
2581}
7c673cae 2582
11fdf7f2
TL
2583void CDir::encode_dirstat(bufferlist& bl, const session_info_t& info, const DirStat& ds) {
2584 if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
2585 ENCODE_START(1, 1, bl);
2586 encode(ds.frag, bl);
2587 encode(ds.auth, bl);
2588 encode(ds.dist, bl);
2589 ENCODE_FINISH(bl);
2590 }
2591 else {
2592 encode(ds.frag, bl);
2593 encode(ds.auth, bl);
2594 encode(ds.dist, bl);
2595 }
2596}
7c673cae
FG
2597
2598/********************************
2599 * AUTHORITY
2600 */
2601
2602/*
2603 * if dir_auth.first == parent, auth is same as inode.
2604 * unless .second != unknown, in which case that sticks.
2605 */
2606mds_authority_t CDir::authority() const
2607{
2608 if (is_subtree_root())
2609 return dir_auth;
2610 else
2611 return inode->authority();
2612}
2613
2614/** is_subtree_root()
2615 * true if this is an auth delegation point.
2616 * that is, dir_auth != default (parent,unknown)
2617 *
2618 * some key observations:
2619 * if i am auth:
2620 * - any region bound will be an export, or frozen.
2621 *
2622 * note that this DOES heed dir_auth.pending
2623 */
2624/*
2625bool CDir::is_subtree_root()
2626{
2627 if (dir_auth == CDIR_AUTH_DEFAULT) {
2628 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2629 //<< " on " << ino() << dendl;
2630 return false;
2631 } else {
2632 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2633 //<< " on " << ino() << dendl;
2634 return true;
2635 }
2636}
2637*/
2638
2639/** contains(x)
2640 * true if we are x, or an ancestor of x
2641 */
2642bool CDir::contains(CDir *x)
2643{
2644 while (1) {
2645 if (x == this)
2646 return true;
2647 x = x->get_inode()->get_projected_parent_dir();
2648 if (x == 0)
2649 return false;
2650 }
2651}
2652
2653
2654
2655/** set_dir_auth
2656 */
11fdf7f2 2657void CDir::set_dir_auth(const mds_authority_t &a)
7c673cae
FG
2658{
2659 dout(10) << "setting dir_auth=" << a
2660 << " from " << dir_auth
2661 << " on " << *this << dendl;
2662
2663 bool was_subtree = is_subtree_root();
2664 bool was_ambiguous = dir_auth.second >= 0;
2665
2666 // set it.
2667 dir_auth = a;
2668
2669 // new subtree root?
2670 if (!was_subtree && is_subtree_root()) {
2671 dout(10) << " new subtree root, adjusting auth_pins" << dendl;
1adf2230 2672
11fdf7f2
TL
2673 if (freeze_tree_state) {
2674 // only by CDir::_freeze_tree()
2675 ceph_assert(is_freezing_tree_root());
2676 }
1adf2230 2677
11fdf7f2 2678 inode->num_subtree_roots++;
7c673cae
FG
2679
2680 // unpin parent of frozen dir/tree?
224ce89b 2681 if (inode->is_auth()) {
11fdf7f2 2682 ceph_assert(!is_frozen_tree_root());
224ce89b
WB
2683 if (is_frozen_dir())
2684 inode->auth_unpin(this);
2685 }
7c673cae
FG
2686 }
2687 if (was_subtree && !is_subtree_root()) {
2688 dout(10) << " old subtree root, adjusting auth_pins" << dendl;
1adf2230
AA
2689
2690 inode->num_subtree_roots--;
7c673cae
FG
2691
2692 // pin parent of frozen dir/tree?
224ce89b 2693 if (inode->is_auth()) {
11fdf7f2 2694 ceph_assert(!is_frozen_tree_root());
224ce89b
WB
2695 if (is_frozen_dir())
2696 inode->auth_pin(this);
2697 }
7c673cae
FG
2698 }
2699
2700 // newly single auth?
2701 if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
11fdf7f2 2702 MDSContext::vec ls;
7c673cae
FG
2703 take_waiting(WAIT_SINGLEAUTH, ls);
2704 cache->mds->queue_waiters(ls);
2705 }
2706}
2707
7c673cae
FG
2708/*****************************************
2709 * AUTH PINS and FREEZING
2710 *
2711 * the basic plan is that auth_pins only exist in auth regions, and they
2712 * prevent a freeze (and subsequent auth change).
2713 *
2714 * however, we also need to prevent a parent from freezing if a child is frozen.
2715 * for that reason, the parent inode of a frozen directory is auth_pinned.
2716 *
2717 * the oddity is when the frozen directory is a subtree root. if that's the case,
2718 * the parent inode isn't frozen. which means that when subtree authority is adjusted
2719 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
2720 * time.
2721 *
2722 */
2723
2724void CDir::auth_pin(void *by)
2725{
2726 if (auth_pins == 0)
2727 get(PIN_AUTHPIN);
2728 auth_pins++;
2729
2730#ifdef MDS_AUTHPIN_SET
2731 auth_pin_set.insert(by);
2732#endif
2733
11fdf7f2 2734 dout(10) << "auth_pin by " << by << " on " << *this << " count now " << auth_pins << dendl;
7c673cae 2735
11fdf7f2
TL
2736 if (freeze_tree_state)
2737 freeze_tree_state->auth_pins += 1;
7c673cae
FG
2738}
2739
2740void CDir::auth_unpin(void *by)
2741{
2742 auth_pins--;
2743
2744#ifdef MDS_AUTHPIN_SET
11fdf7f2
TL
2745 {
2746 auto it = auth_pin_set.find(by);
2747 ceph_assert(it != auth_pin_set.end());
2748 auth_pin_set.erase(it);
2749 }
7c673cae
FG
2750#endif
2751 if (auth_pins == 0)
2752 put(PIN_AUTHPIN);
2753
11fdf7f2
TL
2754 dout(10) << "auth_unpin by " << by << " on " << *this << " count now " << auth_pins << dendl;
2755 ceph_assert(auth_pins >= 0);
2756
2757 if (freeze_tree_state)
2758 freeze_tree_state->auth_pins -= 1;
7c673cae
FG
2759
2760 maybe_finish_freeze(); // pending freeze?
7c673cae
FG
2761}
2762
11fdf7f2 2763void CDir::adjust_nested_auth_pins(int dirinc, void *by)
7c673cae 2764{
11fdf7f2 2765 ceph_assert(dirinc);
7c673cae
FG
2766 dir_auth_pins += dirinc;
2767
11fdf7f2 2768 dout(15) << __func__ << " " << dirinc << " on " << *this
7c673cae 2769 << " by " << by << " count now "
11fdf7f2
TL
2770 << auth_pins << "/" << dir_auth_pins << dendl;
2771 ceph_assert(dir_auth_pins >= 0);
7c673cae 2772
11fdf7f2
TL
2773 if (freeze_tree_state)
2774 freeze_tree_state->auth_pins += dirinc;
7c673cae 2775
11fdf7f2
TL
2776 if (dirinc < 0)
2777 maybe_finish_freeze(); // pending freeze?
7c673cae
FG
2778}
2779
#ifdef MDS_VERIFY_FRAGSTAT
/**
 * verify_fragstat
 *
 * Debug check (only built with MDS_VERIFY_FRAGSTAT): recount files and
 * subdirs from the dentries of this complete dirfrag and abort if the
 * counts disagree with fnode.fragstat.  Stray dirs are not checked.
 */
void CDir::verify_fragstat()
{
  ceph_assert(is_complete());
  if (inode->is_stray())
    return;

  frag_info_t counted;
  memset(&counted, 0, sizeof(counted));

  for (const auto &item : items) {
    CDentry *dn = item.second;
    if (dn->is_null())
      continue;

    dout(10) << " " << *dn << dendl;

    // primary: the linked inode tells us dir vs file
    if (dn->is_primary()) {
      dout(10) << " " << *dn->inode << dendl;
      if (dn->inode->is_dir())
	counted.nsubdirs++;
      else
	counted.nfiles++;
    }

    // remote: go by the recorded d_type
    if (dn->is_remote()) {
      if (dn->get_remote_d_type() == DT_DIR)
	counted.nsubdirs++;
      else
	counted.nfiles++;
    }
  }

  const bool matches =
    counted.nsubdirs == fnode.fragstat.nsubdirs &&
    counted.nfiles == fnode.fragstat.nfiles;
  if (matches) {
    dout(0) << "verify_fragstat ok " << fnode.fragstat << " on " << *this << dendl;
    return;
  }

  dout(0) << "verify_fragstat failed " << fnode.fragstat << " on " << *this << dendl;
  dout(0) << " i count " << counted << dendl;
  ceph_abort();
}
#endif
2825
2826/*****************************************************************************
2827 * FREEZING
2828 */
2829
2830// FREEZE TREE
2831
11fdf7f2
TL
2832void CDir::_walk_tree(std::function<bool(CDir*)> callback)
2833{
2834
2835 deque<CDir*> dfq;
2836 dfq.push_back(this);
2837
2838 vector<CDir*> dfv;
2839 while (!dfq.empty()) {
2840 CDir *dir = dfq.front();
2841 dfq.pop_front();
2842
2843 for (auto& p : *dir) {
2844 CDentry *dn = p.second;
2845 if (!dn->get_linkage()->is_primary())
2846 continue;
2847 CInode *in = dn->get_linkage()->get_inode();
2848 if (!in->is_dir())
2849 continue;
2850
2851 in->get_nested_dirfrags(dfv);
2852 for (auto& dir : dfv) {
2853 auto ret = callback(dir);
2854 if (ret)
2855 dfq.push_back(dir);
2856 }
2857 dfv.clear();
2858 }
2859 }
2860}
2861
7c673cae
FG
2862bool CDir::freeze_tree()
2863{
11fdf7f2
TL
2864 ceph_assert(!is_frozen());
2865 ceph_assert(!is_freezing());
2866 ceph_assert(!freeze_tree_state);
7c673cae
FG
2867
2868 auth_pin(this);
11fdf7f2
TL
2869
2870 // Travese the subtree to mark dirfrags as 'freezing' (set freeze_tree_state)
2871 // and to accumulate auth pins and record total count in freeze_tree_state.
2872 // when auth unpin an 'freezing' object, the counter in freeze_tree_state also
2873 // gets decreased. Subtree become 'frozen' when the counter reaches zero.
2874 freeze_tree_state = std::make_shared<freeze_tree_state_t>(this);
2875 freeze_tree_state->auth_pins += get_auth_pins() + get_dir_auth_pins();
2876
2877 _walk_tree([this](CDir *dir) {
2878 if (dir->freeze_tree_state)
2879 return false;
2880 dir->freeze_tree_state = freeze_tree_state;
2881 freeze_tree_state->auth_pins += dir->get_auth_pins() + dir->get_dir_auth_pins();
2882 return true;
2883 }
2884 );
2885
7c673cae
FG
2886 if (is_freezeable(true)) {
2887 _freeze_tree();
2888 auth_unpin(this);
2889 return true;
2890 } else {
2891 state_set(STATE_FREEZINGTREE);
2892 ++num_freezing_trees;
2893 dout(10) << "freeze_tree waiting " << *this << dendl;
2894 return false;
2895 }
2896}
2897
2898void CDir::_freeze_tree()
2899{
11fdf7f2
TL
2900 dout(10) << __func__ << " " << *this << dendl;
2901 ceph_assert(is_freezeable(true));
7c673cae 2902
11fdf7f2
TL
2903 if (freeze_tree_state) {
2904 ceph_assert(is_auth());
2905 } else {
2906 ceph_assert(!is_auth());
2907 freeze_tree_state = std::make_shared<freeze_tree_state_t>(this);
7c673cae 2908 }
11fdf7f2 2909 freeze_tree_state->frozen = true;
224ce89b
WB
2910
2911 if (is_auth()) {
2912 mds_authority_t auth;
2913 bool was_subtree = is_subtree_root();
2914 if (was_subtree) {
2915 auth = get_dir_auth();
2916 } else {
2917 // temporarily prevent parent subtree from becoming frozen.
2918 inode->auth_pin(this);
2919 // create new subtree
2920 auth = authority();
2921 }
2922
11fdf7f2
TL
2923 _walk_tree([this, &auth] (CDir *dir) {
2924 if (dir->freeze_tree_state != freeze_tree_state) {
2925 inode->mdcache->adjust_subtree_auth(dir, auth);
2926 return false;
2927 }
2928 return true;
2929 }
2930 );
2931
2932 ceph_assert(auth.first >= 0);
2933 ceph_assert(auth.second == CDIR_AUTH_UNKNOWN);
224ce89b
WB
2934 auth.second = auth.first;
2935 inode->mdcache->adjust_subtree_auth(this, auth);
2936 if (!was_subtree)
2937 inode->auth_unpin(this);
11fdf7f2
TL
2938 } else {
2939 // importing subtree ?
2940 _walk_tree([this] (CDir *dir) {
2941 ceph_assert(!dir->freeze_tree_state);
2942 dir->freeze_tree_state = freeze_tree_state;
2943 return true;
2944 }
2945 );
2946 }
2947
2948 // twiddle state
2949 if (state_test(STATE_FREEZINGTREE)) {
2950 state_clear(STATE_FREEZINGTREE);
2951 --num_freezing_trees;
224ce89b
WB
2952 }
2953
7c673cae
FG
2954 state_set(STATE_FROZENTREE);
2955 ++num_frozen_trees;
2956 get(PIN_FROZEN);
7c673cae
FG
2957}
2958
2959void CDir::unfreeze_tree()
2960{
11fdf7f2
TL
2961 dout(10) << __func__ << " " << *this << dendl;
2962
2963 MDSContext::vec unfreeze_waiters;
2964 take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
2965
2966 if (freeze_tree_state) {
2967 _walk_tree([this, &unfreeze_waiters](CDir *dir) {
2968 if (dir->freeze_tree_state != freeze_tree_state)
2969 return false;
2970 dir->freeze_tree_state.reset();
2971 dir->take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
2972 return true;
2973 }
2974 );
2975 }
7c673cae
FG
2976
2977 if (state_test(STATE_FROZENTREE)) {
2978 // frozen. unfreeze.
2979 state_clear(STATE_FROZENTREE);
2980 --num_frozen_trees;
2981
2982 put(PIN_FROZEN);
2983
224ce89b
WB
2984 if (is_auth()) {
2985 // must be subtree
11fdf7f2 2986 ceph_assert(is_subtree_root());
224ce89b
WB
2987 // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
2988 mds_authority_t auth = get_dir_auth();
11fdf7f2
TL
2989 ceph_assert(auth.first >= 0);
2990 ceph_assert(auth.second == auth.first);
224ce89b
WB
2991 auth.second = CDIR_AUTH_UNKNOWN;
2992 inode->mdcache->adjust_subtree_auth(this, auth);
2993 }
11fdf7f2 2994 freeze_tree_state.reset();
7c673cae 2995 } else {
11fdf7f2 2996 ceph_assert(state_test(STATE_FREEZINGTREE));
7c673cae
FG
2997
2998 // freezing. stop it.
7c673cae
FG
2999 state_clear(STATE_FREEZINGTREE);
3000 --num_freezing_trees;
11fdf7f2
TL
3001 freeze_tree_state.reset();
3002
3003 finish_waiting(WAIT_FROZEN, -1);
7c673cae 3004 auth_unpin(this);
7c673cae 3005 }
11fdf7f2
TL
3006
3007 cache->mds->queue_waiters(unfreeze_waiters);
3008}
3009
3010void CDir::adjust_freeze_after_rename(CDir *dir)
3011{
3012 if (!freeze_tree_state || dir->freeze_tree_state != freeze_tree_state)
3013 return;
3014 CDir *newdir = dir->get_inode()->get_parent_dir();
3015 if (newdir == this || newdir->freeze_tree_state == freeze_tree_state)
3016 return;
3017
3018 ceph_assert(!freeze_tree_state->frozen);
3019 ceph_assert(get_dir_auth_pins() > 0);
3020
3021 MDSContext::vec unfreeze_waiters;
3022
3023 auto unfreeze = [this, &unfreeze_waiters](CDir *dir) {
3024 if (dir->freeze_tree_state != freeze_tree_state)
3025 return false;
3026 int dec = dir->get_auth_pins() + dir->get_dir_auth_pins();
3027 // shouldn't become zero because srcdn of rename was auth pinned
3028 ceph_assert(freeze_tree_state->auth_pins > dec);
3029 freeze_tree_state->auth_pins -= dec;
3030 dir->freeze_tree_state.reset();
3031 dir->take_waiting(WAIT_UNFREEZE, unfreeze_waiters);
3032 return true;
3033 };
3034
3035 unfreeze(dir);
3036 dir->_walk_tree(unfreeze);
3037
3038 cache->mds->queue_waiters(unfreeze_waiters);
7c673cae
FG
3039}
3040
91327a77 3041bool CDir::can_auth_pin(int *err_ret) const
7c673cae 3042{
91327a77
AA
3043 int err;
3044 if (!is_auth()) {
3045 err = ERR_NOT_AUTH;
3046 } else if (is_freezing_dir() || is_frozen_dir()) {
3047 err = ERR_FRAGMENTING_DIR;
3048 } else {
3049 auto p = is_freezing_or_frozen_tree();
3050 if (p.first || p.second) {
3051 err = ERR_EXPORTING_TREE;
3052 } else {
3053 err = 0;
3054 }
3055 }
3056 if (err && err_ret)
3057 *err_ret = err;
3058 return !err;
3059}
3060
7c673cae
FG
3061class C_Dir_AuthUnpin : public CDirContext {
3062 public:
3063 explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
3064 void finish(int r) override {
3065 dir->auth_unpin(dir->get_inode());
3066 }
3067};
3068
3069void CDir::maybe_finish_freeze()
3070{
11fdf7f2 3071 if (dir_auth_pins != 0)
7c673cae
FG
3072 return;
3073
3074 // we can freeze the _dir_ even with nested pins...
3075 if (state_test(STATE_FREEZINGDIR)) {
11fdf7f2
TL
3076 if (auth_pins == 1) {
3077 _freeze_dir();
3078 auth_unpin(this);
3079 finish_waiting(WAIT_FROZEN);
3080 }
7c673cae
FG
3081 }
3082
11fdf7f2
TL
3083 if (freeze_tree_state) {
3084 if (freeze_tree_state->frozen ||
3085 freeze_tree_state->auth_pins != 1)
3086 return;
3087
3088 if (freeze_tree_state->dir != this) {
3089 freeze_tree_state->dir->maybe_finish_freeze();
3090 return;
3091 }
3092
3093 ceph_assert(state_test(STATE_FREEZINGTREE));
7c673cae 3094
7c673cae 3095 if (!is_subtree_root() && inode->is_frozen()) {
11fdf7f2 3096 dout(10) << __func__ << " !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
7c673cae
FG
3097 // retake an auth_pin...
3098 auth_pin(inode);
3099 // and release it when the parent inode unfreezes
3100 inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
3101 return;
3102 }
3103
3104 _freeze_tree();
3105 auth_unpin(this);
3106 finish_waiting(WAIT_FROZEN);
3107 }
3108}
3109
3110
3111
3112// FREEZE DIR
3113
3114bool CDir::freeze_dir()
3115{
11fdf7f2
TL
3116 ceph_assert(!is_frozen());
3117 ceph_assert(!is_freezing());
7c673cae
FG
3118
3119 auth_pin(this);
3120 if (is_freezeable_dir(true)) {
3121 _freeze_dir();
3122 auth_unpin(this);
3123 return true;
3124 } else {
3125 state_set(STATE_FREEZINGDIR);
3126 dout(10) << "freeze_dir + wait " << *this << dendl;
3127 return false;
3128 }
3129}
3130
3131void CDir::_freeze_dir()
3132{
11fdf7f2 3133 dout(10) << __func__ << " " << *this << dendl;
7c673cae
FG
3134 //assert(is_freezeable_dir(true));
3135 // not always true during split because the original fragment may have frozen a while
3136 // ago and we're just now getting around to breaking it up.
3137
3138 state_clear(STATE_FREEZINGDIR);
3139 state_set(STATE_FROZENDIR);
3140 get(PIN_FROZEN);
3141
3142 if (is_auth() && !is_subtree_root())
3143 inode->auth_pin(this); // auth_pin for duration of freeze
3144}
3145
3146
3147void CDir::unfreeze_dir()
3148{
11fdf7f2 3149 dout(10) << __func__ << " " << *this << dendl;
7c673cae
FG
3150
3151 if (state_test(STATE_FROZENDIR)) {
3152 state_clear(STATE_FROZENDIR);
3153 put(PIN_FROZEN);
3154
3155 // unpin (may => FREEZEABLE) FIXME: is this order good?
3156 if (is_auth() && !is_subtree_root())
3157 inode->auth_unpin(this);
3158
3159 finish_waiting(WAIT_UNFREEZE);
3160 } else {
3161 finish_waiting(WAIT_FROZEN, -1);
3162
3163 // still freezing. stop.
11fdf7f2 3164 ceph_assert(state_test(STATE_FREEZINGDIR));
7c673cae
FG
3165 state_clear(STATE_FREEZINGDIR);
3166 auth_unpin(this);
3167
3168 finish_waiting(WAIT_UNFREEZE);
3169 }
3170}
3171
3172/**
3173 * Slightly less complete than operator<<, because this is intended
3174 * for identifying a directory and its state rather than for dumping
3175 * debug output.
3176 */
11fdf7f2 3177void CDir::dump(Formatter *f, int flags) const
7c673cae 3178{
11fdf7f2
TL
3179 ceph_assert(f != NULL);
3180 if (flags & DUMP_PATH) {
3181 f->dump_stream("path") << get_path();
3182 }
3183 if (flags & DUMP_DIRFRAG) {
3184 f->dump_stream("dirfrag") << dirfrag();
3185 }
3186 if (flags & DUMP_SNAPID_FIRST) {
3187 f->dump_int("snapid_first", first);
3188 }
3189 if (flags & DUMP_VERSIONS) {
3190 f->dump_stream("projected_version") << get_projected_version();
3191 f->dump_stream("version") << get_version();
3192 f->dump_stream("committing_version") << get_committing_version();
3193 f->dump_stream("committed_version") << get_committed_version();
3194 }
3195 if (flags & DUMP_REP) {
3196 f->dump_bool("is_rep", is_rep());
3197 }
3198 if (flags & DUMP_DIR_AUTH) {
3199 if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
3200 if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
3201 f->dump_stream("dir_auth") << get_dir_auth().first;
3202 } else {
3203 f->dump_stream("dir_auth") << get_dir_auth();
3204 }
7c673cae 3205 } else {
11fdf7f2 3206 f->dump_string("dir_auth", "");
7c673cae 3207 }
11fdf7f2
TL
3208 }
3209 if (flags & DUMP_STATES) {
3210 f->open_array_section("states");
3211 MDSCacheObject::dump_states(f);
3212 if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
3213 if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
3214 if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
3215 if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
3216 if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
3217 if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
3218 if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
3219 if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
3220 f->close_section();
3221 }
3222 if (flags & DUMP_MDS_CACHE_OBJECT) {
3223 MDSCacheObject::dump(f);
3224 }
3225 if (flags & DUMP_ITEMS) {
3226 f->open_array_section("dentries");
3227 for (auto &p : items) {
3228 CDentry *dn = p.second;
3229 f->open_object_section("dentry");
3230 dn->dump(f);
3231 f->close_section();
3232 }
3233 f->close_section();
3234 }
7c673cae
FG
3235}
3236
11fdf7f2 3237void CDir::dump_load(Formatter *f)
28e407b8
AA
3238{
3239 f->dump_stream("path") << get_path();
3240 f->dump_stream("dirfrag") << dirfrag();
3241
3242 f->open_object_section("pop_me");
11fdf7f2 3243 pop_me.dump(f);
28e407b8
AA
3244 f->close_section();
3245
3246 f->open_object_section("pop_nested");
11fdf7f2 3247 pop_nested.dump(f);
28e407b8
AA
3248 f->close_section();
3249
3250 f->open_object_section("pop_auth_subtree");
11fdf7f2 3251 pop_auth_subtree.dump(f);
28e407b8
AA
3252 f->close_section();
3253
3254 f->open_object_section("pop_auth_subtree_nested");
11fdf7f2 3255 pop_auth_subtree_nested.dump(f);
28e407b8
AA
3256 f->close_section();
3257}
3258
7c673cae
FG
3259/****** Scrub Stuff *******/
3260
3261void CDir::scrub_info_create() const
3262{
11fdf7f2 3263 ceph_assert(!scrub_infop);
7c673cae
FG
3264
3265 // break out of const-land to set up implicit initial state
3266 CDir *me = const_cast<CDir*>(this);
3267 fnode_t *fn = me->get_projected_fnode();
3268
3269 std::unique_ptr<scrub_info_t> si(new scrub_info_t());
3270
3271 si->last_recursive.version = si->recursive_start.version =
3272 fn->recursive_scrub_version;
3273 si->last_recursive.time = si->recursive_start.time =
3274 fn->recursive_scrub_stamp;
3275
3276 si->last_local.version = fn->localized_scrub_version;
3277 si->last_local.time = fn->localized_scrub_stamp;
3278
3279 me->scrub_infop.swap(si);
3280}
3281
3282void CDir::scrub_initialize(const ScrubHeaderRefConst& header)
3283{
3284 dout(20) << __func__ << dendl;
11fdf7f2
TL
3285 ceph_assert(is_complete());
3286 ceph_assert(header != nullptr);
7c673cae
FG
3287
3288 // FIXME: weird implicit construction, is someone else meant
3289 // to be calling scrub_info_create first?
3290 scrub_info();
11fdf7f2 3291 ceph_assert(scrub_infop && !scrub_infop->directory_scrubbing);
7c673cae
FG
3292
3293 scrub_infop->recursive_start.version = get_projected_version();
3294 scrub_infop->recursive_start.time = ceph_clock_now();
3295
3296 scrub_infop->directories_to_scrub.clear();
3297 scrub_infop->directories_scrubbing.clear();
3298 scrub_infop->directories_scrubbed.clear();
3299 scrub_infop->others_to_scrub.clear();
3300 scrub_infop->others_scrubbing.clear();
3301 scrub_infop->others_scrubbed.clear();
3302
94b18763 3303 for (auto i = items.begin();
7c673cae
FG
3304 i != items.end();
3305 ++i) {
3306 // TODO: handle snapshot scrubbing
3307 if (i->first.snapid != CEPH_NOSNAP)
3308 continue;
3309
3310 CDentry::linkage_t *dnl = i->second->get_projected_linkage();
3311 if (dnl->is_primary()) {
3312 if (dnl->get_inode()->is_dir())
3313 scrub_infop->directories_to_scrub.insert(i->first);
3314 else
3315 scrub_infop->others_to_scrub.insert(i->first);
3316 } else if (dnl->is_remote()) {
3317 // TODO: check remote linkage
3318 }
3319 }
3320 scrub_infop->directory_scrubbing = true;
3321 scrub_infop->header = header;
3322}
3323
3324void CDir::scrub_finished()
3325{
3326 dout(20) << __func__ << dendl;
11fdf7f2 3327 ceph_assert(scrub_infop && scrub_infop->directory_scrubbing);
7c673cae 3328
11fdf7f2
TL
3329 ceph_assert(scrub_infop->directories_to_scrub.empty());
3330 ceph_assert(scrub_infop->directories_scrubbing.empty());
7c673cae 3331 scrub_infop->directories_scrubbed.clear();
11fdf7f2
TL
3332 ceph_assert(scrub_infop->others_to_scrub.empty());
3333 ceph_assert(scrub_infop->others_scrubbing.empty());
7c673cae
FG
3334 scrub_infop->others_scrubbed.clear();
3335 scrub_infop->directory_scrubbing = false;
3336
3337 scrub_infop->last_recursive = scrub_infop->recursive_start;
3338 scrub_infop->last_scrub_dirty = true;
3339}
3340
94b18763 3341int CDir::_next_dentry_on_set(dentry_key_set &dns, bool missing_okay,
11fdf7f2 3342 MDSContext *cb, CDentry **dnout)
7c673cae
FG
3343{
3344 dentry_key_t dnkey;
3345 CDentry *dn;
3346
3347 while (!dns.empty()) {
3348 set<dentry_key_t>::iterator front = dns.begin();
3349 dnkey = *front;
3350 dn = lookup(dnkey.name);
3351 if (!dn) {
3352 if (!is_complete() &&
3353 (!has_bloom() || is_in_bloom(dnkey.name))) {
3354 // need to re-read this dirfrag
3355 fetch(cb);
3356 return EAGAIN;
3357 }
3358 // okay, we lost it
3359 if (missing_okay) {
3360 dout(15) << " we no longer have directory dentry "
3361 << dnkey.name << ", assuming it got renamed" << dendl;
3362 dns.erase(dnkey);
3363 continue;
3364 } else {
3365 dout(5) << " we lost dentry " << dnkey.name
3366 << ", bailing out because that's impossible!" << dendl;
3367 ceph_abort();
3368 }
3369 }
3370 // okay, we got a dentry
3371 dns.erase(dnkey);
3372
3373 if (dn->get_projected_version() < scrub_infop->last_recursive.version &&
3374 !(scrub_infop->header->get_force())) {
3375 dout(15) << " skip dentry " << dnkey.name
3376 << ", no change since last scrub" << dendl;
3377 continue;
94b18763
FG
3378 }
3379
3380 if (!dn->get_linkage()->is_primary()) {
3381 dout(15) << " skip dentry " << dnkey.name
3382 << ", no longer primary" << dendl;
3383 continue;
7c673cae
FG
3384 }
3385
3386 *dnout = dn;
3387 return 0;
3388 }
3389 *dnout = NULL;
3390 return ENOENT;
3391}
3392
11fdf7f2 3393int CDir::scrub_dentry_next(MDSContext *cb, CDentry **dnout)
7c673cae
FG
3394{
3395 dout(20) << __func__ << dendl;
11fdf7f2 3396 ceph_assert(scrub_infop && scrub_infop->directory_scrubbing);
7c673cae
FG
3397
3398 dout(20) << "trying to scrub directories underneath us" << dendl;
3399 int rval = _next_dentry_on_set(scrub_infop->directories_to_scrub, true,
3400 cb, dnout);
3401 if (rval == 0) {
3402 dout(20) << __func__ << " inserted to directories scrubbing: "
3403 << *dnout << dendl;
3404 scrub_infop->directories_scrubbing.insert((*dnout)->key());
3405 } else if (rval == EAGAIN) {
3406 // we don't need to do anything else
3407 } else { // we emptied out the directory scrub set
11fdf7f2 3408 ceph_assert(rval == ENOENT);
7c673cae
FG
3409 dout(20) << "no directories left, moving on to other kinds of dentries"
3410 << dendl;
3411
3412 rval = _next_dentry_on_set(scrub_infop->others_to_scrub, false, cb, dnout);
3413 if (rval == 0) {
3414 dout(20) << __func__ << " inserted to others scrubbing: "
3415 << *dnout << dendl;
3416 scrub_infop->others_scrubbing.insert((*dnout)->key());
3417 }
3418 }
3419 dout(20) << " returning " << rval << " with dn=" << *dnout << dendl;
3420 return rval;
3421}
3422
3423void CDir::scrub_dentries_scrubbing(list<CDentry*> *out_dentries)
3424{
3425 dout(20) << __func__ << dendl;
11fdf7f2 3426 ceph_assert(scrub_infop && scrub_infop->directory_scrubbing);
7c673cae
FG
3427
3428 for (set<dentry_key_t>::iterator i =
3429 scrub_infop->directories_scrubbing.begin();
3430 i != scrub_infop->directories_scrubbing.end();
3431 ++i) {
3432 CDentry *d = lookup(i->name, i->snapid);
11fdf7f2 3433 ceph_assert(d);
7c673cae
FG
3434 out_dentries->push_back(d);
3435 }
3436 for (set<dentry_key_t>::iterator i = scrub_infop->others_scrubbing.begin();
3437 i != scrub_infop->others_scrubbing.end();
3438 ++i) {
3439 CDentry *d = lookup(i->name, i->snapid);
11fdf7f2 3440 ceph_assert(d);
7c673cae
FG
3441 out_dentries->push_back(d);
3442 }
3443}
3444
3445void CDir::scrub_dentry_finished(CDentry *dn)
3446{
3447 dout(20) << __func__ << " on dn " << *dn << dendl;
11fdf7f2 3448 ceph_assert(scrub_infop && scrub_infop->directory_scrubbing);
7c673cae
FG
3449 dentry_key_t dn_key = dn->key();
3450 if (scrub_infop->directories_scrubbing.erase(dn_key)) {
3451 scrub_infop->directories_scrubbed.insert(dn_key);
3452 } else {
11fdf7f2 3453 ceph_assert(scrub_infop->others_scrubbing.count(dn_key));
7c673cae
FG
3454 scrub_infop->others_scrubbing.erase(dn_key);
3455 scrub_infop->others_scrubbed.insert(dn_key);
3456 }
3457}
3458
3459void CDir::scrub_maybe_delete_info()
3460{
3461 if (scrub_infop &&
3462 !scrub_infop->directory_scrubbing &&
3463 !scrub_infop->need_scrub_local &&
3464 !scrub_infop->last_scrub_dirty &&
3465 !scrub_infop->pending_scrub_error &&
3466 scrub_infop->dirty_scrub_stamps.empty()) {
3467 scrub_infop.reset();
3468 }
3469}
3470
3471bool CDir::scrub_local()
3472{
11fdf7f2 3473 ceph_assert(is_complete());
7c673cae
FG
3474 bool rval = check_rstats(true);
3475
3476 scrub_info();
3477 if (rval) {
3478 scrub_infop->last_local.time = ceph_clock_now();
3479 scrub_infop->last_local.version = get_projected_version();
3480 scrub_infop->pending_scrub_error = false;
3481 scrub_infop->last_scrub_dirty = true;
3482 } else {
3483 scrub_infop->pending_scrub_error = true;
3484 if (scrub_infop->header->get_repair())
3485 cache->repair_dirfrag_stats(this);
3486 }
3487 return rval;
3488}
3489
3490std::string CDir::get_path() const
3491{
3492 std::string path;
3493 get_inode()->make_path_string(path, true);
3494 return path;
3495}
3496
3497bool CDir::should_split_fast() const
3498{
3499 // Max size a fragment can be before trigger fast splitting
11fdf7f2 3500 int fast_limit = g_conf()->mds_bal_split_size * g_conf()->mds_bal_fragment_fast_factor;
7c673cae
FG
3501
3502 // Fast path: the sum of accounted size and null dentries does not
3503 // exceed threshold: we definitely are not over it.
3504 if (get_frag_size() + get_num_head_null() <= fast_limit) {
3505 return false;
3506 }
3507
3508 // Fast path: the accounted size of the frag exceeds threshold: we
3509 // definitely are over it
3510 if (get_frag_size() > fast_limit) {
3511 return true;
3512 }
3513
3514 int64_t effective_size = 0;
3515
3516 for (const auto &p : items) {
3517 const CDentry *dn = p.second;
3518 if (!dn->get_projected_linkage()->is_null()) {
3519 effective_size++;
3520 }
3521 }
3522
3523 return effective_size > fast_limit;
3524}
3525
// Mempool object factory definition for CDir.
// NOTE(review): presumably this accounts CDir allocations under the
// `co_dir` type in the `mds_co` pool -- confirm against the macro definition.
181888fb 3526MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);