]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/CDir.cc
update sources to v12.2.1
[ceph.git] / ceph / src / mds / CDir.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include "include/types.h"
17
18#include "CDir.h"
19#include "CDentry.h"
20#include "CInode.h"
21#include "Mutation.h"
22
23#include "MDSMap.h"
24#include "MDSRank.h"
25#include "MDCache.h"
26#include "Locker.h"
27#include "MDLog.h"
28#include "LogSegment.h"
29
30#include "common/bloom_filter.hpp"
31#include "include/Context.h"
32#include "common/Clock.h"
33
34#include "osdc/Objecter.h"
35
36#include "common/config.h"
37#include "include/assert.h"
38#include "include/compat.h"
39
40#define dout_context g_ceph_context
41#define dout_subsys ceph_subsys_mds
42#undef dout_prefix
43#define dout_prefix *_dout << "mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
44
45int CDir::num_frozen_trees = 0;
46int CDir::num_freezing_trees = 0;
47
48class CDirContext : public MDSInternalContextBase
49{
50protected:
51 CDir *dir;
52 MDSRank* get_mds() override {return dir->cache->mds;}
53
54public:
55 explicit CDirContext(CDir *d) : dir(d) {
56 assert(dir != NULL);
57 }
58};
59
60
61class CDirIOContext : public MDSIOContextBase
62{
63protected:
64 CDir *dir;
65 MDSRank* get_mds() override {return dir->cache->mds;}
66
67public:
68 explicit CDirIOContext(CDir *d) : dir(d) {
69 assert(dir != NULL);
70 }
71};
72
73
74// PINS
75//int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
76
77
78ostream& operator<<(ostream& out, const CDir& dir)
79{
80 out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
81 << " [" << dir.first << ",head]";
82 if (dir.is_auth()) {
83 out << " auth";
84 if (dir.is_replicated())
85 out << dir.get_replicas();
86
87 if (dir.is_projected())
88 out << " pv=" << dir.get_projected_version();
89 out << " v=" << dir.get_version();
90 out << " cv=" << dir.get_committing_version();
91 out << "/" << dir.get_committed_version();
92 } else {
93 mds_authority_t a = dir.authority();
94 out << " rep@" << a.first;
95 if (a.second != CDIR_AUTH_UNKNOWN)
96 out << "," << a.second;
97 out << "." << dir.get_replica_nonce();
98 }
99
100 if (dir.is_rep()) out << " REP";
101
102 if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
103 if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
104 out << " dir_auth=" << dir.get_dir_auth().first;
105 else
106 out << " dir_auth=" << dir.get_dir_auth();
107 }
108
109 if (dir.get_cum_auth_pins())
110 out << " ap=" << dir.get_auth_pins()
111 << "+" << dir.get_dir_auth_pins()
112 << "+" << dir.get_nested_auth_pins();
113
114 out << " state=" << dir.get_state();
115 if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
116 if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
117 if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
118 if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
119 //if (dir.state_test(CDir::STATE_FROZENTREELEAF)) out << "|frozentreeleaf";
120 if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
121 if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
122 if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
123 if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
124 if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
125 if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
126
127 // fragstat
128 out << " " << dir.fnode.fragstat;
129 if (!(dir.fnode.fragstat == dir.fnode.accounted_fragstat))
130 out << "/" << dir.fnode.accounted_fragstat;
131 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
132 const fnode_t *pf = dir.get_projected_fnode();
133 out << "->" << pf->fragstat;
134 if (!(pf->fragstat == pf->accounted_fragstat))
135 out << "/" << pf->accounted_fragstat;
136 }
137
138 // rstat
139 out << " " << dir.fnode.rstat;
140 if (!(dir.fnode.rstat == dir.fnode.accounted_rstat))
141 out << "/" << dir.fnode.accounted_rstat;
142 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
143 const fnode_t *pf = dir.get_projected_fnode();
144 out << "->" << pf->rstat;
145 if (!(pf->rstat == pf->accounted_rstat))
146 out << "/" << pf->accounted_rstat;
147 }
148
149 out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
150 out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
151 if (dir.get_num_dirty())
152 out << " dirty=" << dir.get_num_dirty();
153
154 if (dir.get_num_ref()) {
155 out << " |";
156 dir.print_pin_set(out);
157 }
158
159 out << " " << &dir;
160 return out << "]";
161}
162
163
164void CDir::print(ostream& out)
165{
166 out << *this;
167}
168
169
170
171
172ostream& CDir::print_db_line_prefix(ostream& out)
173{
174 return out << ceph_clock_now() << " mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
175}
176
177
178
179// -------------------------------------------------------------------
180// CDir
181
182CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
183 cache(mdcache), inode(in), frag(fg),
184 first(2),
185 dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
186 projected_version(0), item_dirty(this), item_new(this),
187 num_head_items(0), num_head_null(0),
188 num_snap_items(0), num_snap_null(0),
189 num_dirty(0), committing_version(0), committed_version(0),
190 dir_auth_pins(0), request_pins(0),
191 dir_rep(REP_NONE),
192 pop_me(ceph_clock_now()),
193 pop_nested(ceph_clock_now()),
194 pop_auth_subtree(ceph_clock_now()),
195 pop_auth_subtree_nested(ceph_clock_now()),
196 num_dentries_nested(0), num_dentries_auth_subtree(0),
197 num_dentries_auth_subtree_nested(0),
198 dir_auth(CDIR_AUTH_DEFAULT)
199{
200 state = STATE_INITIAL;
201
202 memset(&fnode, 0, sizeof(fnode));
203
204 // auth
205 assert(in->is_dir());
206 if (auth)
207 state |= STATE_AUTH;
208}
209
210/**
211 * Check the recursive statistics on size for consistency.
212 * If mds_debug_scatterstat is enabled, assert for correctness,
213 * otherwise just print out the mismatch and continue.
214 */
215bool CDir::check_rstats(bool scrub)
216{
217 if (!g_conf->mds_debug_scatterstat && !scrub)
218 return true;
219
220 dout(25) << "check_rstats on " << this << dendl;
221 if (!is_complete() || !is_auth() || is_frozen()) {
222 assert(!scrub);
223 dout(10) << "check_rstats bailing out -- incomplete or non-auth or frozen dir!" << dendl;
224 return true;
225 }
226
227 frag_info_t frag_info;
228 nest_info_t nest_info;
229 for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
230 if (i->second->last != CEPH_NOSNAP)
231 continue;
232 CDentry::linkage_t *dnl = i->second->get_linkage();
233 if (dnl->is_primary()) {
234 CInode *in = dnl->get_inode();
235 nest_info.add(in->inode.accounted_rstat);
236 if (in->is_dir())
237 frag_info.nsubdirs++;
238 else
239 frag_info.nfiles++;
240 } else if (dnl->is_remote())
241 frag_info.nfiles++;
242 }
243
244 bool good = true;
245 // fragstat
246 if(!frag_info.same_sums(fnode.fragstat)) {
247 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
248 dout(1) << "get_num_head_items() = " << get_num_head_items()
249 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
250 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
251 good = false;
252 } else {
253 dout(20) << "get_num_head_items() = " << get_num_head_items()
254 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
255 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
256 }
257
258 // rstat
259 if (!nest_info.same_sums(fnode.rstat)) {
260 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
261 dout(1) << "total of child dentrys: " << nest_info << dendl;
262 dout(1) << "my rstats: " << fnode.rstat << dendl;
263 good = false;
264 } else {
265 dout(20) << "total of child dentrys: " << nest_info << dendl;
266 dout(20) << "my rstats: " << fnode.rstat << dendl;
267 }
268
269 if (!good) {
270 if (!scrub) {
271 for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
272 CDentry *dn = i->second;
273 if (dn->get_linkage()->is_primary()) {
274 CInode *in = dn->get_linkage()->inode;
275 dout(1) << *dn << " rstat " << in->inode.accounted_rstat << dendl;
276 } else {
277 dout(1) << *dn << dendl;
278 }
279 }
280
281 assert(frag_info.nfiles == fnode.fragstat.nfiles);
282 assert(frag_info.nsubdirs == fnode.fragstat.nsubdirs);
283 assert(nest_info.rbytes == fnode.rstat.rbytes);
284 assert(nest_info.rfiles == fnode.rstat.rfiles);
285 assert(nest_info.rsubdirs == fnode.rstat.rsubdirs);
286 }
287 }
288 dout(10) << "check_rstats complete on " << this << dendl;
289 return good;
290}
291
292CDentry *CDir::lookup(const string& name, snapid_t snap)
293{
294 dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
295 map_t::iterator iter = items.lower_bound(dentry_key_t(snap, name.c_str(),
296 inode->hash_dentry_name(name)));
297 if (iter == items.end())
298 return 0;
299 if (iter->second->name == name &&
300 iter->second->first <= snap &&
301 iter->second->last >= snap) {
302 dout(20) << " hit -> " << iter->first << dendl;
303 return iter->second;
304 }
305 dout(20) << " miss -> " << iter->first << dendl;
306 return 0;
307}
308
309CDentry *CDir::lookup_exact_snap(const string& name, snapid_t last) {
310 map_t::iterator p = items.find(dentry_key_t(last, name.c_str(),
311 inode->hash_dentry_name(name)));
312 if (p == items.end())
313 return NULL;
314 return p->second;
315}
316
317/***
318 * linking fun
319 */
320
321CDentry* CDir::add_null_dentry(const string& dname,
322 snapid_t first, snapid_t last)
323{
324 // foreign
325 assert(lookup_exact_snap(dname, last) == 0);
326
327 // create dentry
328 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
329 if (is_auth())
330 dn->state_set(CDentry::STATE_AUTH);
31f18b77
FG
331
332 cache->bottom_lru.lru_insert_mid(dn);
333 dn->state_set(CDentry::STATE_BOTTOMLRU);
7c673cae
FG
334
335 dn->dir = this;
336 dn->version = get_projected_version();
337
338 // add to dir
339 assert(items.count(dn->key()) == 0);
340 //assert(null_items.count(dn->name) == 0);
341
342 items[dn->key()] = dn;
343 if (last == CEPH_NOSNAP)
344 num_head_null++;
345 else
346 num_snap_null++;
347
348 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
349 dn->get(CDentry::PIN_FRAGMENTING);
350 dn->state_set(CDentry::STATE_FRAGMENTING);
351 }
352
353 dout(12) << "add_null_dentry " << *dn << dendl;
354
355 // pin?
356 if (get_num_any() == 1)
357 get(PIN_CHILD);
358
359 assert(get_num_any() == items.size());
360 return dn;
361}
362
363
364CDentry* CDir::add_primary_dentry(const string& dname, CInode *in,
365 snapid_t first, snapid_t last)
366{
367 // primary
368 assert(lookup_exact_snap(dname, last) == 0);
369
370 // create dentry
371 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
372 if (is_auth())
373 dn->state_set(CDentry::STATE_AUTH);
31f18b77
FG
374 if (is_auth() || !inode->is_stray()) {
375 cache->lru.lru_insert_mid(dn);
376 } else {
377 cache->bottom_lru.lru_insert_mid(dn);
378 dn->state_set(CDentry::STATE_BOTTOMLRU);
379 }
7c673cae
FG
380
381 dn->dir = this;
382 dn->version = get_projected_version();
383
384 // add to dir
385 assert(items.count(dn->key()) == 0);
386 //assert(null_items.count(dn->name) == 0);
387
388 items[dn->key()] = dn;
389
390 dn->get_linkage()->inode = in;
391 in->set_primary_parent(dn);
392
393 link_inode_work(dn, in);
394
395 if (dn->last == CEPH_NOSNAP)
396 num_head_items++;
397 else
398 num_snap_items++;
399
400 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
401 dn->get(CDentry::PIN_FRAGMENTING);
402 dn->state_set(CDentry::STATE_FRAGMENTING);
403 }
404
405 dout(12) << "add_primary_dentry " << *dn << dendl;
406
407 // pin?
408 if (get_num_any() == 1)
409 get(PIN_CHILD);
410 assert(get_num_any() == items.size());
411 return dn;
412}
413
414CDentry* CDir::add_remote_dentry(const string& dname, inodeno_t ino, unsigned char d_type,
415 snapid_t first, snapid_t last)
416{
417 // foreign
418 assert(lookup_exact_snap(dname, last) == 0);
419
420 // create dentry
421 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), ino, d_type, first, last);
422 if (is_auth())
423 dn->state_set(CDentry::STATE_AUTH);
424 cache->lru.lru_insert_mid(dn);
425
426 dn->dir = this;
427 dn->version = get_projected_version();
428
429 // add to dir
430 assert(items.count(dn->key()) == 0);
431 //assert(null_items.count(dn->name) == 0);
432
433 items[dn->key()] = dn;
434 if (last == CEPH_NOSNAP)
435 num_head_items++;
436 else
437 num_snap_items++;
438
439 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
440 dn->get(CDentry::PIN_FRAGMENTING);
441 dn->state_set(CDentry::STATE_FRAGMENTING);
442 }
443
444 dout(12) << "add_remote_dentry " << *dn << dendl;
445
446 // pin?
447 if (get_num_any() == 1)
448 get(PIN_CHILD);
449
450 assert(get_num_any() == items.size());
451 return dn;
452}
453
454
455
456void CDir::remove_dentry(CDentry *dn)
457{
458 dout(12) << "remove_dentry " << *dn << dendl;
459
460 // there should be no client leases at this point!
461 assert(dn->client_lease_map.empty());
462
463 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
464 dn->put(CDentry::PIN_FRAGMENTING);
465 dn->state_clear(CDentry::STATE_FRAGMENTING);
466 }
467
468 if (dn->get_linkage()->is_null()) {
469 if (dn->last == CEPH_NOSNAP)
470 num_head_null--;
471 else
472 num_snap_null--;
473 } else {
474 if (dn->last == CEPH_NOSNAP)
475 num_head_items--;
476 else
477 num_snap_items--;
478 }
479
480 if (!dn->get_linkage()->is_null())
481 // detach inode and dentry
482 unlink_inode_work(dn);
483
484 // remove from list
485 assert(items.count(dn->key()) == 1);
486 items.erase(dn->key());
487
488 // clean?
489 if (dn->is_dirty())
490 dn->mark_clean();
491
31f18b77
FG
492 if (dn->state_test(CDentry::STATE_BOTTOMLRU))
493 cache->bottom_lru.lru_remove(dn);
494 else
495 cache->lru.lru_remove(dn);
7c673cae
FG
496 delete dn;
497
498 // unpin?
499 if (get_num_any() == 0)
500 put(PIN_CHILD);
501 assert(get_num_any() == items.size());
502}
503
504void CDir::link_remote_inode(CDentry *dn, CInode *in)
505{
506 link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
507}
508
509void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
510{
511 dout(12) << "link_remote_inode " << *dn << " remote " << ino << dendl;
512 assert(dn->get_linkage()->is_null());
513
514 dn->get_linkage()->set_remote(ino, d_type);
515
31f18b77
FG
516 if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
517 cache->bottom_lru.lru_remove(dn);
518 cache->lru.lru_insert_mid(dn);
519 dn->state_clear(CDentry::STATE_BOTTOMLRU);
520 }
521
7c673cae
FG
522 if (dn->last == CEPH_NOSNAP) {
523 num_head_items++;
524 num_head_null--;
525 } else {
526 num_snap_items++;
527 num_snap_null--;
528 }
529 assert(get_num_any() == items.size());
530}
531
532void CDir::link_primary_inode(CDentry *dn, CInode *in)
533{
534 dout(12) << "link_primary_inode " << *dn << " " << *in << dendl;
535 assert(dn->get_linkage()->is_null());
536
537 dn->get_linkage()->inode = in;
538 in->set_primary_parent(dn);
539
540 link_inode_work(dn, in);
31f18b77
FG
541
542 if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
543 (is_auth() || !inode->is_stray())) {
544 cache->bottom_lru.lru_remove(dn);
545 cache->lru.lru_insert_mid(dn);
546 dn->state_clear(CDentry::STATE_BOTTOMLRU);
547 }
7c673cae
FG
548
549 if (dn->last == CEPH_NOSNAP) {
550 num_head_items++;
551 num_head_null--;
552 } else {
553 num_snap_items++;
554 num_snap_null--;
555 }
556
557 assert(get_num_any() == items.size());
558}
559
560void CDir::link_inode_work( CDentry *dn, CInode *in)
561{
562 assert(dn->get_linkage()->get_inode() == in);
563 assert(in->get_parent_dn() == dn);
564
565 // set inode version
566 //in->inode.version = dn->get_version();
567
568 // pin dentry?
569 if (in->get_num_ref())
570 dn->get(CDentry::PIN_INODEPIN);
571
572 // adjust auth pin count
573 if (in->auth_pins + in->nested_auth_pins)
574 dn->adjust_nested_auth_pins(in->auth_pins + in->nested_auth_pins, in->auth_pins, NULL);
575
576 // verify open snaprealm parent
577 if (in->snaprealm)
578 in->snaprealm->adjust_parent();
579 else if (in->is_any_caps())
580 in->move_to_realm(inode->find_snaprealm());
581}
582
31f18b77 583void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
7c673cae
FG
584{
585 if (dn->get_linkage()->is_primary()) {
586 dout(12) << "unlink_inode " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
587 } else {
588 dout(12) << "unlink_inode " << *dn << dendl;
589 }
590
591 unlink_inode_work(dn);
592
31f18b77
FG
593 if (adjust_lru && !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
594 cache->lru.lru_remove(dn);
595 cache->bottom_lru.lru_insert_mid(dn);
596 dn->state_set(CDentry::STATE_BOTTOMLRU);
597 }
598
7c673cae
FG
599 if (dn->last == CEPH_NOSNAP) {
600 num_head_items--;
601 num_head_null++;
602 } else {
603 num_snap_items--;
604 num_snap_null++;
605 }
606 assert(get_num_any() == items.size());
607}
608
609
610void CDir::try_remove_unlinked_dn(CDentry *dn)
611{
612 assert(dn->dir == this);
613 assert(dn->get_linkage()->is_null());
614
615 // no pins (besides dirty)?
616 if (dn->get_num_ref() != dn->is_dirty())
617 return;
618
619 // was the dn new?
620 if (dn->is_new()) {
621 dout(10) << "try_remove_unlinked_dn " << *dn << " in " << *this << dendl;
622 if (dn->is_dirty())
623 dn->mark_clean();
624 remove_dentry(dn);
625
626 // NOTE: we may not have any more dirty dentries, but the fnode
627 // still changed, so the directory must remain dirty.
628 }
629}
630
631
632void CDir::unlink_inode_work( CDentry *dn )
633{
634 CInode *in = dn->get_linkage()->get_inode();
635
636 if (dn->get_linkage()->is_remote()) {
637 // remote
638 if (in)
639 dn->unlink_remote(dn->get_linkage());
640
641 dn->get_linkage()->set_remote(0, 0);
642 } else if (dn->get_linkage()->is_primary()) {
643 // primary
644 // unpin dentry?
645 if (in->get_num_ref())
646 dn->put(CDentry::PIN_INODEPIN);
647
648 // unlink auth_pin count
649 if (in->auth_pins + in->nested_auth_pins)
650 dn->adjust_nested_auth_pins(0 - (in->auth_pins + in->nested_auth_pins), 0 - in->auth_pins, NULL);
651
652 // detach inode
653 in->remove_primary_parent(dn);
654 dn->get_linkage()->inode = 0;
655 } else {
656 assert(!dn->get_linkage()->is_null());
657 }
658}
659
660void CDir::add_to_bloom(CDentry *dn)
661{
662 assert(dn->last == CEPH_NOSNAP);
663 if (!bloom) {
664 /* not create bloom filter for incomplete dir that was added by log replay */
665 if (!is_complete())
666 return;
667
668 /* don't maintain bloom filters in standby replay (saves cycles, and also
669 * avoids need to implement clearing it in EExport for #16924) */
670 if (cache->mds->is_standby_replay()) {
671 return;
672 }
673
674 unsigned size = get_num_head_items() + get_num_snap_items();
675 if (size < 100) size = 100;
676 bloom.reset(new bloom_filter(size, 1.0 / size, 0));
677 }
678 /* This size and false positive probability is completely random.*/
679 bloom->insert(dn->name.c_str(), dn->name.size());
680}
681
682bool CDir::is_in_bloom(const string& name)
683{
684 if (!bloom)
685 return false;
686 return bloom->contains(name.c_str(), name.size());
687}
688
689void CDir::remove_null_dentries() {
690 dout(12) << "remove_null_dentries " << *this << dendl;
691
692 CDir::map_t::iterator p = items.begin();
693 while (p != items.end()) {
694 CDentry *dn = p->second;
695 ++p;
696 if (dn->get_linkage()->is_null() && !dn->is_projected())
697 remove_dentry(dn);
698 }
699
700 assert(num_snap_null == 0);
701 assert(num_head_null == 0);
702 assert(get_num_any() == items.size());
703}
704
705/** remove dirty null dentries for deleted directory. the dirfrag will be
706 * deleted soon, so it's safe to not commit dirty dentries.
707 *
708 * This is called when a directory is being deleted, a prerequisite
709 * of which is that its children have been unlinked: we expect to only see
710 * null, unprojected dentries here.
711 */
712void CDir::try_remove_dentries_for_stray()
713{
714 dout(10) << __func__ << dendl;
31f18b77 715 assert(get_parent_dir()->inode->is_stray());
7c673cae
FG
716
717 // clear dirty only when the directory was not snapshotted
718 bool clear_dirty = !inode->snaprealm;
719
720 CDir::map_t::iterator p = items.begin();
721 while (p != items.end()) {
722 CDentry *dn = p->second;
723 ++p;
724 if (dn->last == CEPH_NOSNAP) {
725 assert(!dn->is_projected());
726 assert(dn->get_linkage()->is_null());
727 if (clear_dirty && dn->is_dirty())
728 dn->mark_clean();
729 // It's OK to remove lease prematurely because we will never link
730 // the dentry to inode again.
731 if (dn->is_any_leases())
732 dn->remove_client_leases(cache->mds->locker);
733 if (dn->get_num_ref() == 0)
734 remove_dentry(dn);
735 } else {
736 assert(!dn->is_projected());
737 CDentry::linkage_t *dnl= dn->get_linkage();
738 CInode *in = NULL;
739 if (dnl->is_primary()) {
740 in = dnl->get_inode();
741 if (clear_dirty && in->is_dirty())
742 in->mark_clean();
743 }
744 if (clear_dirty && dn->is_dirty())
745 dn->mark_clean();
746 if (dn->get_num_ref() == 0) {
747 remove_dentry(dn);
748 if (in)
749 cache->remove_inode(in);
750 }
751 }
752 }
753
754 if (clear_dirty && is_dirty())
755 mark_clean();
756}
757
7c673cae
FG
758bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
759{
760 assert(dn->last != CEPH_NOSNAP);
761 set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
762 CDentry::linkage_t *dnl= dn->get_linkage();
763 CInode *in = 0;
764 if (dnl->is_primary())
765 in = dnl->get_inode();
766 if ((p == snaps.end() || *p > dn->last) &&
767 (dn->get_num_ref() == dn->is_dirty()) &&
768 (!in || in->get_num_ref() == in->is_dirty())) {
769 dout(10) << " purging snapped " << *dn << dendl;
770 if (in && in->is_dirty())
771 in->mark_clean();
772 remove_dentry(dn);
773 if (in) {
774 dout(10) << " purging snapped " << *in << dendl;
775 cache->remove_inode(in);
776 }
777 return true;
778 }
779 return false;
780}
781
782
783void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
784{
785 dout(10) << "purge_stale_snap_data " << snaps << dendl;
786
787 CDir::map_t::iterator p = items.begin();
788 while (p != items.end()) {
789 CDentry *dn = p->second;
790 ++p;
791
792 if (dn->last == CEPH_NOSNAP)
793 continue;
794
795 try_trim_snap_dentry(dn, snaps);
796 }
797}
798
799
800/**
801 * steal_dentry -- semi-violently move a dentry from one CDir to another
802 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
803 * on the old CDir corpse; must call finish_old_fragment() when finished.
804 */
805void CDir::steal_dentry(CDentry *dn)
806{
807 dout(15) << "steal_dentry " << *dn << dendl;
808
809 items[dn->key()] = dn;
810
811 dn->dir->items.erase(dn->key());
812 if (dn->dir->items.empty())
813 dn->dir->put(PIN_CHILD);
814
815 if (get_num_any() == 0)
816 get(PIN_CHILD);
817 if (dn->get_linkage()->is_null()) {
818 if (dn->last == CEPH_NOSNAP)
819 num_head_null++;
820 else
821 num_snap_null++;
822 } else if (dn->last == CEPH_NOSNAP) {
823 num_head_items++;
824
825 if (dn->get_linkage()->is_primary()) {
826 CInode *in = dn->get_linkage()->get_inode();
827 inode_t *pi = in->get_projected_inode();
828 if (dn->get_linkage()->get_inode()->is_dir())
829 fnode.fragstat.nsubdirs++;
830 else
831 fnode.fragstat.nfiles++;
832 fnode.rstat.rbytes += pi->accounted_rstat.rbytes;
833 fnode.rstat.rfiles += pi->accounted_rstat.rfiles;
834 fnode.rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
835 fnode.rstat.rsnaprealms += pi->accounted_rstat.rsnaprealms;
836 if (pi->accounted_rstat.rctime > fnode.rstat.rctime)
837 fnode.rstat.rctime = pi->accounted_rstat.rctime;
838
839 // move dirty inode rstat to new dirfrag
840 if (in->is_dirty_rstat())
841 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
842 } else if (dn->get_linkage()->is_remote()) {
843 if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
844 fnode.fragstat.nsubdirs++;
845 else
846 fnode.fragstat.nfiles++;
847 }
848 } else {
849 num_snap_items++;
850 if (dn->get_linkage()->is_primary()) {
851 CInode *in = dn->get_linkage()->get_inode();
852 if (in->is_dirty_rstat())
853 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
854 }
855 }
856
857 if (dn->auth_pins || dn->nested_auth_pins) {
858 // use the helpers here to maintain the auth_pin invariants on the dir inode
859 int ap = dn->get_num_auth_pins() + dn->get_num_nested_auth_pins();
860 int dap = dn->get_num_dir_auth_pins();
861 assert(dap <= ap);
862 adjust_nested_auth_pins(ap, dap, NULL);
863 dn->dir->adjust_nested_auth_pins(-ap, -dap, NULL);
864 }
865
866 if (dn->is_dirty())
867 num_dirty++;
868
869 dn->dir = this;
870}
871
31f18b77 872void CDir::prepare_old_fragment(map<string_snap_t, std::list<MDSInternalContextBase*> >& dentry_waiters, bool replay)
7c673cae
FG
873{
874 // auth_pin old fragment for duration so that any auth_pinning
875 // during the dentry migration doesn't trigger side effects
876 if (!replay && is_auth())
877 auth_pin(this);
31f18b77
FG
878
879 if (!waiting_on_dentry.empty()) {
880 for (auto p = waiting_on_dentry.begin(); p != waiting_on_dentry.end(); ++p)
881 dentry_waiters[p->first].swap(p->second);
882 waiting_on_dentry.clear();
883 put(PIN_DNWAITER);
884 }
7c673cae
FG
885}
886
887void CDir::prepare_new_fragment(bool replay)
888{
889 if (!replay && is_auth()) {
890 _freeze_dir();
891 mark_complete();
892 }
31f18b77 893 inode->add_dirfrag(this);
7c673cae
FG
894}
895
896void CDir::finish_old_fragment(list<MDSInternalContextBase*>& waiters, bool replay)
897{
898 // take waiters _before_ unfreeze...
899 if (!replay) {
900 take_waiting(WAIT_ANY_MASK, waiters);
901 if (is_auth()) {
902 auth_unpin(this); // pinned in prepare_old_fragment
903 assert(is_frozen_dir());
904 unfreeze_dir();
905 }
906 }
907
908 assert(nested_auth_pins == 0);
909 assert(dir_auth_pins == 0);
910 assert(auth_pins == 0);
911
912 num_head_items = num_head_null = 0;
913 num_snap_items = num_snap_null = 0;
914
915 // this mirrors init_fragment_pins()
916 if (is_auth())
917 clear_replica_map();
918 if (is_dirty())
919 mark_clean();
920 if (state_test(STATE_IMPORTBOUND))
921 put(PIN_IMPORTBOUND);
922 if (state_test(STATE_EXPORTBOUND))
923 put(PIN_EXPORTBOUND);
924 if (is_subtree_root())
925 put(PIN_SUBTREE);
926
927 if (auth_pins > 0)
928 put(PIN_AUTHPIN);
929
930 assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
931}
932
933void CDir::init_fragment_pins()
934{
181888fb 935 if (is_replicated())
7c673cae
FG
936 get(PIN_REPLICATED);
937 if (state_test(STATE_DIRTY))
938 get(PIN_DIRTY);
939 if (state_test(STATE_EXPORTBOUND))
940 get(PIN_EXPORTBOUND);
941 if (state_test(STATE_IMPORTBOUND))
942 get(PIN_IMPORTBOUND);
943 if (is_subtree_root())
944 get(PIN_SUBTREE);
945}
946
947void CDir::split(int bits, list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
948{
949 dout(10) << "split by " << bits << " bits on " << *this << dendl;
950
951 assert(replay || is_complete() || !is_auth());
952
953 list<frag_t> frags;
954 frag.split(bits, frags);
955
956 vector<CDir*> subfrags(1 << bits);
957
958 double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
959
960 version_t rstat_version = inode->get_projected_inode()->rstat.version;
961 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
962
963 nest_info_t rstatdiff;
964 frag_info_t fragstatdiff;
965 if (fnode.accounted_rstat.version == rstat_version)
966 rstatdiff.add_delta(fnode.accounted_rstat, fnode.rstat);
967 if (fnode.accounted_fragstat.version == dirstat_version)
968 fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat);
969 dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
970
31f18b77
FG
971 map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
972 prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
973
974 // create subfrag dirs
975 int n = 0;
976 for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
977 CDir *f = new CDir(inode, *p, cache, is_auth());
978 f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
181888fb 979 f->get_replicas() = get_replicas();
7c673cae
FG
980 f->dir_auth = dir_auth;
981 f->init_fragment_pins();
982 f->set_version(get_version());
983
984 f->pop_me = pop_me;
985 f->pop_me.scale(fac);
986
987 // FIXME; this is an approximation
988 f->pop_nested = pop_nested;
989 f->pop_nested.scale(fac);
990 f->pop_auth_subtree = pop_auth_subtree;
991 f->pop_auth_subtree.scale(fac);
992 f->pop_auth_subtree_nested = pop_auth_subtree_nested;
993 f->pop_auth_subtree_nested.scale(fac);
994
995 dout(10) << " subfrag " << *p << " " << *f << dendl;
996 subfrags[n++] = f;
997 subs.push_back(f);
7c673cae
FG
998
999 f->set_dir_auth(get_dir_auth());
1000 f->prepare_new_fragment(replay);
1001 }
1002
1003 // repartition dentries
1004 while (!items.empty()) {
1005 CDir::map_t::iterator p = items.begin();
1006
1007 CDentry *dn = p->second;
1008 frag_t subfrag = inode->pick_dirfrag(dn->name);
1009 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1010 dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
1011 CDir *f = subfrags[n];
1012 f->steal_dentry(dn);
1013 }
1014
31f18b77
FG
1015 for (auto& p : dentry_waiters) {
1016 frag_t subfrag = inode->pick_dirfrag(p.first.name);
1017 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1018 CDir *f = subfrags[n];
1019
1020 if (f->waiting_on_dentry.empty())
1021 f->get(PIN_DNWAITER);
1022 f->waiting_on_dentry[p.first].swap(p.second);
1023 }
1024
7c673cae
FG
1025 // FIXME: handle dirty old rstat
1026
1027 // fix up new frag fragstats
1028 for (int i=0; i<n; i++) {
1029 CDir *f = subfrags[i];
1030 f->fnode.rstat.version = rstat_version;
1031 f->fnode.accounted_rstat = f->fnode.rstat;
1032 f->fnode.fragstat.version = dirstat_version;
1033 f->fnode.accounted_fragstat = f->fnode.fragstat;
1034 dout(10) << " rstat " << f->fnode.rstat << " fragstat " << f->fnode.fragstat
1035 << " on " << *f << dendl;
1036 }
1037
1038 // give any outstanding frag stat differential to first frag
1039 dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1040 << " to " << *subfrags[0] << dendl;
1041 subfrags[0]->fnode.accounted_rstat.add(rstatdiff);
1042 subfrags[0]->fnode.accounted_fragstat.add(fragstatdiff);
1043
1044 finish_old_fragment(waiters, replay);
1045}
1046
1047void CDir::merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
1048{
1049 dout(10) << "merge " << subs << dendl;
1050
1051 mds_authority_t new_auth = CDIR_AUTH_DEFAULT;
1052 for (auto dir : subs) {
1053 if (dir->get_dir_auth() != CDIR_AUTH_DEFAULT &&
1054 dir->get_dir_auth() != new_auth) {
1055 assert(new_auth == CDIR_AUTH_DEFAULT);
1056 new_auth = dir->get_dir_auth();
1057 }
1058 }
1059
1060 set_dir_auth(new_auth);
1061 prepare_new_fragment(replay);
1062
1063 nest_info_t rstatdiff;
1064 frag_info_t fragstatdiff;
1065 bool touched_mtime, touched_chattr;
1066 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1067 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1068
31f18b77
FG
1069 map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
1070
7c673cae
FG
1071 for (auto dir : subs) {
1072 dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
1073 assert(!dir->is_auth() || dir->is_complete() || replay);
1074
1075 if (dir->fnode.accounted_rstat.version == rstat_version)
1076 rstatdiff.add_delta(dir->fnode.accounted_rstat, dir->fnode.rstat);
1077 if (dir->fnode.accounted_fragstat.version == dirstat_version)
1078 fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat,
1079 &touched_mtime, &touched_chattr);
1080
31f18b77 1081 dir->prepare_old_fragment(dentry_waiters, replay);
7c673cae
FG
1082
1083 // steal dentries
1084 while (!dir->items.empty())
1085 steal_dentry(dir->items.begin()->second);
1086
1087 // merge replica map
181888fb
FG
1088 for (const auto &p : dir->get_replicas()) {
1089 unsigned cur = get_replicas()[p.first];
1090 if (p.second > cur)
1091 get_replicas()[p.first] = p.second;
7c673cae
FG
1092 }
1093
1094 // merge version
1095 if (dir->get_version() > get_version())
1096 set_version(dir->get_version());
1097
1098 // merge state
1099 state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
7c673cae
FG
1100
1101 dir->finish_old_fragment(waiters, replay);
1102 inode->close_dirfrag(dir->get_frag());
1103 }
1104
31f18b77
FG
1105 if (!dentry_waiters.empty()) {
1106 get(PIN_DNWAITER);
1107 for (auto& p : dentry_waiters) {
1108 waiting_on_dentry[p.first].swap(p.second);
1109 }
1110 }
1111
7c673cae
FG
1112 if (is_auth() && !replay)
1113 mark_complete();
1114
1115 // FIXME: merge dirty old rstat
1116 fnode.rstat.version = rstat_version;
1117 fnode.accounted_rstat = fnode.rstat;
1118 fnode.accounted_rstat.add(rstatdiff);
1119
1120 fnode.fragstat.version = dirstat_version;
1121 fnode.accounted_fragstat = fnode.fragstat;
1122 fnode.accounted_fragstat.add(fragstatdiff);
1123
1124 init_fragment_pins();
1125}
1126
1127
1128
1129
1130void CDir::resync_accounted_fragstat()
1131{
1132 fnode_t *pf = get_projected_fnode();
1133 inode_t *pi = inode->get_projected_inode();
1134
1135 if (pf->accounted_fragstat.version != pi->dirstat.version) {
1136 pf->fragstat.version = pi->dirstat.version;
1137 dout(10) << "resync_accounted_fragstat " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
1138 pf->accounted_fragstat = pf->fragstat;
1139 }
1140}
1141
1142/*
1143 * resync rstat and accounted_rstat with inode
1144 */
1145void CDir::resync_accounted_rstat()
1146{
1147 fnode_t *pf = get_projected_fnode();
1148 inode_t *pi = inode->get_projected_inode();
1149
1150 if (pf->accounted_rstat.version != pi->rstat.version) {
1151 pf->rstat.version = pi->rstat.version;
1152 dout(10) << "resync_accounted_rstat " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
1153 pf->accounted_rstat = pf->rstat;
1154 dirty_old_rstat.clear();
1155 }
1156}
1157
1158void CDir::assimilate_dirty_rstat_inodes()
1159{
1160 dout(10) << "assimilate_dirty_rstat_inodes" << dendl;
1161 for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1162 !p.end(); ++p) {
1163 CInode *in = *p;
1164 assert(in->is_auth());
1165 if (in->is_frozen())
1166 continue;
1167
1168 inode_t *pi = in->project_inode();
1169 pi->version = in->pre_dirty();
1170
1171 inode->mdcache->project_rstat_inode_to_frag(in, this, 0, 0, NULL);
1172 }
1173 state_set(STATE_ASSIMRSTAT);
1174 dout(10) << "assimilate_dirty_rstat_inodes done" << dendl;
1175}
1176
1177void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef& mut, EMetaBlob *blob)
1178{
1179 if (!state_test(STATE_ASSIMRSTAT))
1180 return;
1181 state_clear(STATE_ASSIMRSTAT);
1182 dout(10) << "assimilate_dirty_rstat_inodes_finish" << dendl;
1183 elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1184 while (!p.end()) {
1185 CInode *in = *p;
1186 ++p;
1187
1188 if (in->is_frozen())
1189 continue;
1190
1191 CDentry *dn = in->get_projected_parent_dn();
1192
1193 mut->auth_pin(in);
1194 mut->add_projected_inode(in);
1195
1196 in->clear_dirty_rstat();
1197 blob->add_primary_dentry(dn, in, true);
1198 }
1199
1200 if (!dirty_rstat_inodes.empty())
1201 inode->mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
1202}
1203
1204
1205
1206
1207/****************************************
1208 * WAITING
1209 */
1210
1211void CDir::add_dentry_waiter(const string& dname, snapid_t snapid, MDSInternalContextBase *c)
1212{
1213 if (waiting_on_dentry.empty())
1214 get(PIN_DNWAITER);
1215 waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
1216 dout(10) << "add_dentry_waiter dentry " << dname
1217 << " snap " << snapid
1218 << " " << c << " on " << *this << dendl;
1219}
1220
1221void CDir::take_dentry_waiting(const string& dname, snapid_t first, snapid_t last,
1222 list<MDSInternalContextBase*>& ls)
1223{
1224 if (waiting_on_dentry.empty())
1225 return;
1226
1227 string_snap_t lb(dname, first);
1228 string_snap_t ub(dname, last);
1229 compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.lower_bound(lb);
1230 while (p != waiting_on_dentry.end() &&
1231 !(ub < p->first)) {
1232 dout(10) << "take_dentry_waiting dentry " << dname
1233 << " [" << first << "," << last << "] found waiter on snap "
1234 << p->first.snapid
1235 << " on " << *this << dendl;
1236 ls.splice(ls.end(), p->second);
1237 waiting_on_dentry.erase(p++);
1238 }
1239
1240 if (waiting_on_dentry.empty())
1241 put(PIN_DNWAITER);
1242}
1243
1244void CDir::take_sub_waiting(list<MDSInternalContextBase*>& ls)
1245{
1246 dout(10) << "take_sub_waiting" << dendl;
1247 if (!waiting_on_dentry.empty()) {
1248 for (compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
1249 p != waiting_on_dentry.end();
1250 ++p)
1251 ls.splice(ls.end(), p->second);
1252 waiting_on_dentry.clear();
1253 put(PIN_DNWAITER);
1254 }
1255}
1256
1257
1258
1259void CDir::add_waiter(uint64_t tag, MDSInternalContextBase *c)
1260{
1261 // hierarchical?
1262
1263 // at free root?
1264 if (tag & WAIT_ATFREEZEROOT) {
1265 if (!(is_freezing_tree_root() || is_frozen_tree_root() ||
1266 is_freezing_dir() || is_frozen_dir())) {
1267 // try parent
1268 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATFREEZEROOT, " << *this << " is not root, trying parent" << dendl;
1269 inode->parent->dir->add_waiter(tag, c);
1270 return;
1271 }
1272 }
1273
1274 // at subtree root?
1275 if (tag & WAIT_ATSUBTREEROOT) {
1276 if (!is_subtree_root()) {
1277 // try parent
1278 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1279 inode->parent->dir->add_waiter(tag, c);
1280 return;
1281 }
1282 }
1283
1284 assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
1285
1286 MDSCacheObject::add_waiter(tag, c);
1287}
1288
1289
1290
1291/* NOTE: this checks dentry waiters too */
1292void CDir::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
1293{
1294 if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1295 // take all dentry waiters
1296 while (!waiting_on_dentry.empty()) {
1297 compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
1298 dout(10) << "take_waiting dentry " << p->first.name
1299 << " snap " << p->first.snapid << " on " << *this << dendl;
1300 ls.splice(ls.end(), p->second);
1301 waiting_on_dentry.erase(p);
1302 }
1303 put(PIN_DNWAITER);
1304 }
1305
1306 // waiting
1307 MDSCacheObject::take_waiting(mask, ls);
1308}
1309
1310
1311void CDir::finish_waiting(uint64_t mask, int result)
1312{
1313 dout(11) << "finish_waiting mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
1314
1315 list<MDSInternalContextBase*> finished;
1316 take_waiting(mask, finished);
1317 if (result < 0)
1318 finish_contexts(g_ceph_context, finished, result);
1319 else
1320 cache->mds->queue_waiters(finished);
1321}
1322
1323
1324
1325// dirty/clean
1326
1327fnode_t *CDir::project_fnode()
1328{
1329 assert(get_version() != 0);
1330 fnode_t *p = new fnode_t;
1331 *p = *get_projected_fnode();
1332 projected_fnode.push_back(p);
1333
1334 if (scrub_infop && scrub_infop->last_scrub_dirty) {
1335 p->localized_scrub_stamp = scrub_infop->last_local.time;
1336 p->localized_scrub_version = scrub_infop->last_local.version;
1337 p->recursive_scrub_stamp = scrub_infop->last_recursive.time;
1338 p->recursive_scrub_version = scrub_infop->last_recursive.version;
1339 scrub_infop->last_scrub_dirty = false;
1340 scrub_maybe_delete_info();
1341 }
1342
1343 dout(10) << "project_fnode " << p << dendl;
1344 return p;
1345}
1346
1347void CDir::pop_and_dirty_projected_fnode(LogSegment *ls)
1348{
1349 assert(!projected_fnode.empty());
1350 dout(15) << "pop_and_dirty_projected_fnode " << projected_fnode.front()
1351 << " v" << projected_fnode.front()->version << dendl;
1352 fnode = *projected_fnode.front();
1353 _mark_dirty(ls);
1354 delete projected_fnode.front();
1355 projected_fnode.pop_front();
1356}
1357
1358
1359version_t CDir::pre_dirty(version_t min)
1360{
1361 if (min > projected_version)
1362 projected_version = min;
1363 ++projected_version;
1364 dout(10) << "pre_dirty " << projected_version << dendl;
1365 return projected_version;
1366}
1367
1368void CDir::mark_dirty(version_t pv, LogSegment *ls)
1369{
1370 assert(get_version() < pv);
1371 assert(pv <= projected_version);
1372 fnode.version = pv;
1373 _mark_dirty(ls);
1374}
1375
1376void CDir::_mark_dirty(LogSegment *ls)
1377{
1378 if (!state_test(STATE_DIRTY)) {
1379 dout(10) << "mark_dirty (was clean) " << *this << " version " << get_version() << dendl;
1380 _set_dirty_flag();
1381 assert(ls);
1382 } else {
1383 dout(10) << "mark_dirty (already dirty) " << *this << " version " << get_version() << dendl;
1384 }
1385 if (ls) {
1386 ls->dirty_dirfrags.push_back(&item_dirty);
1387
1388 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1389 if (committed_version == 0 && !item_new.is_on_list())
1390 ls->new_dirfrags.push_back(&item_new);
1391 }
1392}
1393
1394void CDir::mark_new(LogSegment *ls)
1395{
1396 ls->new_dirfrags.push_back(&item_new);
1397 state_clear(STATE_CREATING);
1398
1399 list<MDSInternalContextBase*> waiters;
1400 take_waiting(CDir::WAIT_CREATED, waiters);
1401 cache->mds->queue_waiters(waiters);
1402}
1403
1404void CDir::mark_clean()
1405{
1406 dout(10) << "mark_clean " << *this << " version " << get_version() << dendl;
1407 if (state_test(STATE_DIRTY)) {
1408 item_dirty.remove_myself();
1409 item_new.remove_myself();
1410
1411 state_clear(STATE_DIRTY);
1412 put(PIN_DIRTY);
1413 }
1414}
1415
1416// caller should hold auth pin of this
1417void CDir::log_mark_dirty()
1418{
1419 if (is_dirty() || is_projected())
1420 return; // noop if it is already dirty or will be dirty
1421
1422 version_t pv = pre_dirty();
1423 mark_dirty(pv, cache->mds->mdlog->get_current_segment());
1424}
1425
1426void CDir::mark_complete() {
1427 state_set(STATE_COMPLETE);
1428 bloom.reset();
1429}
1430
1431void CDir::first_get()
1432{
1433 inode->get(CInode::PIN_DIRFRAG);
1434}
1435
1436void CDir::last_put()
1437{
1438 inode->put(CInode::PIN_DIRFRAG);
1439}
1440
1441
1442
1443/******************************************************************************
1444 * FETCH and COMMIT
1445 */
1446
1447// -----------------------
1448// FETCH
1449void CDir::fetch(MDSInternalContextBase *c, bool ignore_authpinnability)
1450{
1451 string want;
1452 return fetch(c, want, ignore_authpinnability);
1453}
1454
1455void CDir::fetch(MDSInternalContextBase *c, const string& want_dn, bool ignore_authpinnability)
1456{
1457 dout(10) << "fetch on " << *this << dendl;
1458
1459 assert(is_auth());
1460 assert(!is_complete());
1461
1462 if (!can_auth_pin() && !ignore_authpinnability) {
1463 if (c) {
1464 dout(7) << "fetch waiting for authpinnable" << dendl;
1465 add_waiter(WAIT_UNFREEZE, c);
1466 } else
1467 dout(7) << "fetch not authpinnable and no context" << dendl;
1468 return;
1469 }
1470
1471 // unlinked directory inode shouldn't have any entry
31f18b77
FG
1472 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
1473 !inode->snaprealm) {
7c673cae
FG
1474 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1475 if (get_version() == 0) {
31f18b77 1476 assert(inode->is_auth());
7c673cae
FG
1477 set_version(1);
1478
1479 if (state_test(STATE_REJOINUNDEF)) {
1480 assert(cache->mds->is_rejoin());
1481 state_clear(STATE_REJOINUNDEF);
1482 cache->opened_undef_dirfrag(this);
1483 }
1484 }
1485 mark_complete();
1486
1487 if (c)
1488 cache->mds->queue_waiter(c);
1489 return;
1490 }
1491
1492 if (c) add_waiter(WAIT_COMPLETE, c);
1493 if (!want_dn.empty()) wanted_items.insert(want_dn);
1494
1495 // already fetching?
1496 if (state_test(CDir::STATE_FETCHING)) {
1497 dout(7) << "already fetching; waiting" << dendl;
1498 return;
1499 }
1500
1501 auth_pin(this);
1502 state_set(CDir::STATE_FETCHING);
1503
1504 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1505
1506 std::set<dentry_key_t> empty;
1507 _omap_fetch(NULL, empty);
1508}
1509
1510void CDir::fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1511{
1512 dout(10) << "fetch " << keys.size() << " keys on " << *this << dendl;
1513
1514 assert(is_auth());
1515 assert(!is_complete());
1516
1517 if (!can_auth_pin()) {
1518 dout(7) << "fetch keys waiting for authpinnable" << dendl;
1519 add_waiter(WAIT_UNFREEZE, c);
1520 return;
1521 }
1522 if (state_test(CDir::STATE_FETCHING)) {
1523 dout(7) << "fetch keys waiting for full fetch" << dendl;
1524 add_waiter(WAIT_COMPLETE, c);
1525 return;
1526 }
1527
1528 auth_pin(this);
1529 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1530
1531 _omap_fetch(c, keys);
1532}
1533
1534class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
1535 MDSInternalContextBase *fin;
1536public:
1537 bufferlist hdrbl;
1538 bool more = false;
1539 map<string, bufferlist> omap; ///< carry-over from before
1540 map<string, bufferlist> omap_more; ///< new batch
1541 int ret;
1542 C_IO_Dir_OMAP_FetchedMore(CDir *d, MDSInternalContextBase *f) :
1543 CDirIOContext(d), fin(f), ret(0) { }
1544 void finish(int r) {
1545 // merge results
1546 if (omap.empty()) {
1547 omap.swap(omap_more);
1548 } else {
1549 omap.insert(omap_more.begin(), omap_more.end());
1550 }
1551 if (more) {
1552 dir->_omap_fetch_more(hdrbl, omap, fin);
1553 } else {
1554 dir->_omap_fetched(hdrbl, omap, !fin, r);
1555 if (fin)
1556 fin->complete(r);
1557 }
1558 }
1559};
1560
1561class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
1562 MDSInternalContextBase *fin;
1563public:
1564 bufferlist hdrbl;
1565 bool more = false;
1566 map<string, bufferlist> omap;
1567 bufferlist btbl;
1568 int ret1, ret2, ret3;
1569
1570 C_IO_Dir_OMAP_Fetched(CDir *d, MDSInternalContextBase *f) :
1571 CDirIOContext(d), fin(f), ret1(0), ret2(0), ret3(0) { }
1572 void finish(int r) override {
1573 // check the correctness of backtrace
1574 if (r >= 0 && ret3 != -ECANCELED)
1575 dir->inode->verify_diri_backtrace(btbl, ret3);
1576 if (r >= 0) r = ret1;
1577 if (r >= 0) r = ret2;
1578 if (more) {
1579 dir->_omap_fetch_more(hdrbl, omap, fin);
1580 } else {
1581 dir->_omap_fetched(hdrbl, omap, !fin, r);
1582 if (fin)
1583 fin->complete(r);
1584 }
1585 }
1586};
1587
1588void CDir::_omap_fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1589{
1590 C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1591 object_t oid = get_ondisk_object();
1592 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1593 ObjectOperation rd;
1594 rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1595 if (keys.empty()) {
1596 assert(!c);
1597 rd.omap_get_vals("", "", g_conf->mds_dir_keys_per_op,
1598 &fin->omap, &fin->more, &fin->ret2);
1599 } else {
1600 assert(c);
1601 std::set<std::string> str_keys;
1602 for (auto p = keys.begin(); p != keys.end(); ++p) {
1603 string str;
1604 p->encode(str);
1605 str_keys.insert(str);
1606 }
1607 rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
1608 }
1609 // check the correctness of backtrace
1610 if (g_conf->mds_verify_backtrace > 0 && frag == frag_t()) {
1611 rd.getxattr("parent", &fin->btbl, &fin->ret3);
1612 rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1613 } else {
1614 fin->ret3 = -ECANCELED;
1615 }
1616
1617 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1618 new C_OnFinisher(fin, cache->mds->finisher));
1619}
1620
1621void CDir::_omap_fetch_more(
1622 bufferlist& hdrbl,
1623 map<string, bufferlist>& omap,
1624 MDSInternalContextBase *c)
1625{
1626 // we have more omap keys to fetch!
1627 object_t oid = get_ondisk_object();
1628 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1629 C_IO_Dir_OMAP_FetchedMore *fin = new C_IO_Dir_OMAP_FetchedMore(this, c);
1630 fin->hdrbl.claim(hdrbl);
1631 fin->omap.swap(omap);
1632 ObjectOperation rd;
1633 rd.omap_get_vals(fin->omap.rbegin()->first,
1634 "", /* filter prefix */
1635 g_conf->mds_dir_keys_per_op,
1636 &fin->omap_more,
1637 &fin->more,
1638 &fin->ret);
1639 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1640 new C_OnFinisher(fin, cache->mds->finisher));
1641}
1642
1643CDentry *CDir::_load_dentry(
1644 const std::string &key,
1645 const std::string &dname,
1646 const snapid_t last,
1647 bufferlist &bl,
1648 const int pos,
1649 const std::set<snapid_t> *snaps,
1650 bool *force_dirty,
1651 list<CInode*> *undef_inodes)
1652{
1653 bufferlist::iterator q = bl.begin();
1654
1655 snapid_t first;
1656 ::decode(first, q);
1657
1658 // marker
1659 char type;
1660 ::decode(type, q);
1661
1662 dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
1663 << " [" << first << "," << last << "]"
1664 << dendl;
1665
1666 bool stale = false;
1667 if (snaps && last != CEPH_NOSNAP) {
1668 set<snapid_t>::const_iterator p = snaps->lower_bound(first);
1669 if (p == snaps->end() || *p > last) {
1670 dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
1671 stale = true;
1672 }
1673 }
1674
1675 /*
1676 * look for existing dentry for _last_ snap, because unlink +
1677 * create may leave a "hole" (epochs during which the dentry
1678 * doesn't exist) but for which no explicit negative dentry is in
1679 * the cache.
1680 */
1681 CDentry *dn;
1682 if (stale)
1683 dn = lookup_exact_snap(dname, last);
1684 else
1685 dn = lookup(dname, last);
1686
1687 if (type == 'L') {
1688 // hard link
1689 inodeno_t ino;
1690 unsigned char d_type;
1691 ::decode(ino, q);
1692 ::decode(d_type, q);
1693
1694 if (stale) {
1695 if (!dn) {
1696 stale_items.insert(key);
1697 *force_dirty = true;
1698 }
1699 return dn;
1700 }
1701
1702 if (dn) {
1703 if (dn->get_linkage()->get_inode() == 0) {
1704 dout(12) << "_fetched had NEG dentry " << *dn << dendl;
1705 } else {
1706 dout(12) << "_fetched had dentry " << *dn << dendl;
1707 }
1708 } else {
1709 // (remote) link
1710 dn = add_remote_dentry(dname, ino, d_type, first, last);
1711
1712 // link to inode?
1713 CInode *in = cache->get_inode(ino); // we may or may not have it.
1714 if (in) {
1715 dn->link_remote(dn->get_linkage(), in);
1716 dout(12) << "_fetched got remote link " << ino << " which we have " << *in << dendl;
1717 } else {
1718 dout(12) << "_fetched got remote link " << ino << " (dont' have it)" << dendl;
1719 }
1720 }
1721 }
1722 else if (type == 'I') {
1723 // inode
1724
1725 // Load inode data before looking up or constructing CInode
1726 InodeStore inode_data;
1727 inode_data.decode_bare(q);
1728
1729 if (stale) {
1730 if (!dn) {
1731 stale_items.insert(key);
1732 *force_dirty = true;
1733 }
1734 return dn;
1735 }
1736
1737 bool undef_inode = false;
1738 if (dn) {
1739 CInode *in = dn->get_linkage()->get_inode();
1740 if (in) {
1741 dout(12) << "_fetched had dentry " << *dn << dendl;
1742 if (in->state_test(CInode::STATE_REJOINUNDEF)) {
1743 undef_inodes->push_back(in);
1744 undef_inode = true;
1745 }
1746 } else
1747 dout(12) << "_fetched had NEG dentry " << *dn << dendl;
1748 }
1749
1750 if (!dn || undef_inode) {
1751 // add inode
1752 CInode *in = cache->get_inode(inode_data.inode.ino, last);
1753 if (!in || undef_inode) {
1754 if (undef_inode && in)
1755 in->first = first;
1756 else
1757 in = new CInode(cache, true, first, last);
1758
1759 in->inode = inode_data.inode;
1760 // symlink?
1761 if (in->is_symlink())
1762 in->symlink = inode_data.symlink;
1763
1764 in->dirfragtree.swap(inode_data.dirfragtree);
1765 in->xattrs.swap(inode_data.xattrs);
1766 in->old_inodes.swap(inode_data.old_inodes);
1767 if (!in->old_inodes.empty()) {
1768 snapid_t min_first = in->old_inodes.rbegin()->first + 1;
1769 if (min_first > in->first)
1770 in->first = min_first;
1771 }
1772
1773 in->oldest_snap = inode_data.oldest_snap;
1774 in->decode_snap_blob(inode_data.snap_blob);
1775 if (snaps && !in->snaprealm)
1776 in->purge_stale_snap_data(*snaps);
1777
1778 if (!undef_inode) {
1779 cache->add_inode(in); // add
1780 dn = add_primary_dentry(dname, in, first, last); // link
1781 }
1782 dout(12) << "_fetched got " << *dn << " " << *in << dendl;
1783
1784 if (in->inode.is_dirty_rstat())
1785 in->mark_dirty_rstat();
1786
1787 //in->hack_accessed = false;
1788 //in->hack_load_stamp = ceph_clock_now();
1789 //num_new_inodes_loaded++;
1790 } else {
1791 dout(0) << "_fetched badness: got (but i already had) " << *in
1792 << " mode " << in->inode.mode
1793 << " mtime " << in->inode.mtime << dendl;
1794 string dirpath, inopath;
1795 this->inode->make_path_string(dirpath);
1796 in->make_path_string(inopath);
1797 cache->mds->clog->error() << "loaded dup inode " << inode_data.inode.ino
1798 << " [" << first << "," << last << "] v" << inode_data.inode.version
1799 << " at " << dirpath << "/" << dname
1800 << ", but inode " << in->vino() << " v" << in->inode.version
1801 << " already exists at " << inopath;
1802 return dn;
1803 }
1804 }
1805 } else {
1806 std::ostringstream oss;
1807 oss << "Invalid tag char '" << type << "' pos " << pos;
1808 throw buffer::malformed_input(oss.str());
1809 }
1810
1811 return dn;
1812}
1813
1814void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
1815 bool complete, int r)
1816{
1817 LogChannelRef clog = cache->mds->clog;
1818 dout(10) << "_fetched header " << hdrbl.length() << " bytes "
1819 << omap.size() << " keys for " << *this << dendl;
1820
1821 assert(r == 0 || r == -ENOENT || r == -ENODATA);
1822 assert(is_auth());
1823 assert(!is_frozen());
1824
1825 if (hdrbl.length() == 0) {
1826 dout(0) << "_fetched missing object for " << *this << dendl;
1827
1828 clog->error() << "dir " << dirfrag() << " object missing on disk; some "
1829 "files may be lost (" << get_path() << ")";
1830
1831 go_bad(complete);
1832 return;
1833 }
1834
1835 fnode_t got_fnode;
1836 {
1837 bufferlist::iterator p = hdrbl.begin();
1838 try {
1839 ::decode(got_fnode, p);
1840 } catch (const buffer::error &err) {
1841 derr << "Corrupt fnode in dirfrag " << dirfrag()
1842 << ": " << err << dendl;
1843 clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
1844 << err << " (" << get_path() << ")";
1845 go_bad(complete);
1846 return;
1847 }
1848 if (!p.end()) {
1849 clog->warn() << "header buffer of dir " << dirfrag() << " has "
1850 << hdrbl.length() - p.get_off() << " extra bytes ("
1851 << get_path() << ")";
1852 go_bad(complete);
1853 return;
1854 }
1855 }
1856
1857 dout(10) << "_fetched version " << got_fnode.version << dendl;
1858
1859 // take the loaded fnode?
1860 // only if we are a fresh CDir* with no prior state.
1861 if (get_version() == 0) {
1862 assert(!is_projected());
1863 assert(!state_test(STATE_COMMITTING));
1864 fnode = got_fnode;
1865 projected_version = committing_version = committed_version = got_fnode.version;
1866
1867 if (state_test(STATE_REJOINUNDEF)) {
1868 assert(cache->mds->is_rejoin());
1869 state_clear(STATE_REJOINUNDEF);
1870 cache->opened_undef_dirfrag(this);
1871 }
1872 }
1873
1874 list<CInode*> undef_inodes;
1875
1876 // purge stale snaps?
1877 // only if we have past_parents open!
1878 bool force_dirty = false;
1879 const set<snapid_t> *snaps = NULL;
1880 SnapRealm *realm = inode->find_snaprealm();
1881 if (!realm->have_past_parents_open()) {
1882 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
1883 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
1884 snaps = &realm->get_snaps();
1885 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
1886 << " < " << realm->get_last_destroyed()
1887 << ", snap purge based on " << *snaps << dendl;
1888 if (get_num_snap_items() == 0) {
1889 fnode.snap_purged_thru = realm->get_last_destroyed();
1890 force_dirty = true;
1891 }
1892 }
1893
1894 unsigned pos = omap.size() - 1;
1895 for (map<string, bufferlist>::reverse_iterator p = omap.rbegin();
1896 p != omap.rend();
1897 ++p, --pos) {
1898 string dname;
1899 snapid_t last;
1900 dentry_key_t::decode_helper(p->first, dname, last);
1901
1902 CDentry *dn = NULL;
1903 try {
1904 dn = _load_dentry(
1905 p->first, dname, last, p->second, pos, snaps,
1906 &force_dirty, &undef_inodes);
1907 } catch (const buffer::error &err) {
1908 cache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
1909 "dir frag " << dirfrag() << ": "
1910 << err << "(" << get_path() << ")";
1911
1912 // Remember that this dentry is damaged. Subsequent operations
1913 // that try to act directly on it will get their EIOs, but this
1914 // dirfrag as a whole will continue to look okay (minus the
1915 // mysteriously-missing dentry)
1916 go_bad_dentry(last, dname);
1917
1918 // Anyone who was WAIT_DENTRY for this guy will get kicked
1919 // to RetryRequest, and hit the DamageTable-interrogating path.
1920 // Stats will now be bogus because we will think we're complete,
1921 // but have 1 or more missing dentries.
1922 continue;
1923 }
1924
1925 if (dn && (wanted_items.count(dname) > 0 || !complete)) {
1926 dout(10) << " touching wanted dn " << *dn << dendl;
1927 inode->mdcache->touch_dentry(dn);
1928 }
1929
1930 /** clean underwater item?
1931 * Underwater item is something that is dirty in our cache from
1932 * journal replay, but was previously flushed to disk before the
1933 * mds failed.
1934 *
1935 * We only do this is committed_version == 0. that implies either
1936 * - this is a fetch after from a clean/empty CDir is created
1937 * (and has no effect, since the dn won't exist); or
1938 * - this is a fetch after _recovery_, which is what we're worried
1939 * about. Items that are marked dirty from the journal should be
1940 * marked clean if they appear on disk.
1941 */
1942 if (committed_version == 0 &&
1943 dn &&
1944 dn->get_version() <= got_fnode.version &&
1945 dn->is_dirty()) {
1946 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1947 dn->mark_clean();
1948
1949 if (dn->get_linkage()->is_primary()) {
1950 assert(dn->get_linkage()->get_inode()->get_version() <= got_fnode.version);
1951 dout(10) << "_fetched had underwater inode " << *dn->get_linkage()->get_inode() << ", marking clean" << dendl;
1952 dn->get_linkage()->get_inode()->mark_clean();
1953 }
1954 }
1955 }
1956
1957 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
1958
1959 // mark complete, !fetching
1960 if (complete) {
1961 wanted_items.clear();
1962 mark_complete();
1963 state_clear(STATE_FETCHING);
1964
1965 if (scrub_infop && scrub_infop->need_scrub_local) {
1966 scrub_infop->need_scrub_local = false;
1967 scrub_local();
1968 }
1969 }
1970
1971 // open & force frags
1972 while (!undef_inodes.empty()) {
1973 CInode *in = undef_inodes.front();
1974 undef_inodes.pop_front();
1975 in->state_clear(CInode::STATE_REJOINUNDEF);
1976 cache->opened_undef_inode(in);
1977 }
1978
1979 // dirty myself to remove stale snap dentries
1980 if (force_dirty && !inode->mdcache->is_readonly())
1981 log_mark_dirty();
1982
1983 auth_unpin(this);
1984
1985 if (complete) {
1986 // kick waiters
1987 finish_waiting(WAIT_COMPLETE, 0);
1988 }
1989}
1990
1991void CDir::_go_bad()
1992{
1993 if (get_version() == 0)
1994 set_version(1);
1995 state_set(STATE_BADFRAG);
1996 // mark complete, !fetching
1997 mark_complete();
1998 state_clear(STATE_FETCHING);
1999 auth_unpin(this);
2000
2001 // kick waiters
2002 finish_waiting(WAIT_COMPLETE, -EIO);
2003}
2004
2005void CDir::go_bad_dentry(snapid_t last, const std::string &dname)
2006{
2007 dout(10) << "go_bad_dentry " << dname << dendl;
2008 const bool fatal = cache->mds->damage_table.notify_dentry(
2009 inode->ino(), frag, last, dname, get_path() + "/" + dname);
2010 if (fatal) {
2011 cache->mds->damaged();
2012 ceph_abort(); // unreachable, damaged() respawns us
2013 }
2014}
2015
2016void CDir::go_bad(bool complete)
2017{
2018 dout(10) << "go_bad " << frag << dendl;
2019 const bool fatal = cache->mds->damage_table.notify_dirfrag(
2020 inode->ino(), frag, get_path());
2021 if (fatal) {
2022 cache->mds->damaged();
2023 ceph_abort(); // unreachable, damaged() respawns us
2024 }
2025
2026 if (complete)
2027 _go_bad();
2028 else
2029 auth_unpin(this);
2030}
2031
2032// -----------------------
2033// COMMIT
2034
2035/**
2036 * commit
2037 *
2038 * @param want - min version i want committed
2039 * @param c - callback for completion
2040 */
2041void CDir::commit(version_t want, MDSInternalContextBase *c, bool ignore_authpinnability, int op_prio)
2042{
2043 dout(10) << "commit want " << want << " on " << *this << dendl;
2044 if (want == 0) want = get_version();
2045
2046 // preconditions
2047 assert(want <= get_version() || get_version() == 0); // can't commit the future
2048 assert(want > committed_version); // the caller is stupid
2049 assert(is_auth());
2050 assert(ignore_authpinnability || can_auth_pin());
2051
7c673cae
FG
2052 // note: queue up a noop if necessary, so that we always
2053 // get an auth_pin.
2054 if (!c)
2055 c = new C_MDSInternalNoop;
2056
2057 // auth_pin on first waiter
2058 if (waiting_for_commit.empty())
2059 auth_pin(this);
2060 waiting_for_commit[want].push_back(c);
2061
2062 // ok.
2063 _commit(want, op_prio);
2064}
2065
2066class C_IO_Dir_Committed : public CDirIOContext {
2067 version_t version;
2068public:
2069 C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2070 void finish(int r) override {
2071 dir->_committed(r, version);
2072 }
2073};
2074
2075/**
2076 * Flush out the modified dentries in this dir. Keep the bufferlist
2077 * below max_write_size;
2078 */
2079void CDir::_omap_commit(int op_prio)
2080{
2081 dout(10) << "_omap_commit" << dendl;
2082
2083 unsigned max_write_size = cache->max_dir_commit_size;
2084 unsigned write_size = 0;
2085
2086 if (op_prio < 0)
2087 op_prio = CEPH_MSG_PRIO_DEFAULT;
2088
2089 // snap purge?
2090 const set<snapid_t> *snaps = NULL;
2091 SnapRealm *realm = inode->find_snaprealm();
2092 if (!realm->have_past_parents_open()) {
2093 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
2094 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
2095 snaps = &realm->get_snaps();
2096 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
2097 << " < " << realm->get_last_destroyed()
2098 << ", snap purge based on " << *snaps << dendl;
2099 // fnode.snap_purged_thru = realm->get_last_destroyed();
2100 }
2101
2102 set<string> to_remove;
2103 map<string, bufferlist> to_set;
2104
2105 C_GatherBuilder gather(g_ceph_context,
2106 new C_OnFinisher(new C_IO_Dir_Committed(this,
2107 get_version()),
2108 cache->mds->finisher));
2109
2110 SnapContext snapc;
2111 object_t oid = get_ondisk_object();
2112 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
2113
2114 if (!stale_items.empty()) {
2115 for (compact_set<string>::iterator p = stale_items.begin();
2116 p != stale_items.end();
2117 ++p) {
2118 to_remove.insert(*p);
2119 write_size += (*p).length();
2120 }
2121 stale_items.clear();
2122 }
2123
2124 for (map_t::iterator p = items.begin();
2125 p != items.end(); ) {
2126 CDentry *dn = p->second;
2127 ++p;
2128
2129 string key;
2130 dn->key().encode(key);
2131
2132 if (dn->last != CEPH_NOSNAP &&
2133 snaps && try_trim_snap_dentry(dn, *snaps)) {
2134 dout(10) << " rm " << key << dendl;
2135 write_size += key.length();
2136 to_remove.insert(key);
2137 continue;
2138 }
2139
2140 if (!dn->is_dirty() &&
2141 (!dn->state_test(CDentry::STATE_FRAGMENTING) || dn->get_linkage()->is_null()))
2142 continue; // skip clean dentries
2143
2144 if (dn->get_linkage()->is_null()) {
2145 dout(10) << " rm " << dn->name << " " << *dn << dendl;
2146 write_size += key.length();
2147 to_remove.insert(key);
2148 } else {
2149 dout(10) << " set " << dn->name << " " << *dn << dendl;
2150 bufferlist dnbl;
2151 _encode_dentry(dn, dnbl, snaps);
2152 write_size += key.length() + dnbl.length();
2153 to_set[key].swap(dnbl);
2154 }
2155
2156 if (write_size >= max_write_size) {
2157 ObjectOperation op;
2158 op.priority = op_prio;
2159
2160 // don't create new dirfrag blindly
2161 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2162 op.stat(NULL, (ceph::real_time*) NULL, NULL);
2163
2164 if (!to_set.empty())
2165 op.omap_set(to_set);
2166 if (!to_remove.empty())
2167 op.omap_rm_keys(to_remove);
2168
2169 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2170 ceph::real_clock::now(),
2171 0, gather.new_sub());
2172
2173 write_size = 0;
2174 to_set.clear();
2175 to_remove.clear();
2176 }
2177 }
2178
2179 ObjectOperation op;
2180 op.priority = op_prio;
2181
2182 // don't create new dirfrag blindly
2183 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2184 op.stat(NULL, (ceph::real_time*)NULL, NULL);
2185
2186 /*
2187 * save the header at the last moment.. If we were to send it off before other
2188 * updates, but die before sending them all, we'd think that the on-disk state
2189 * was fully committed even though it wasn't! However, since the messages are
2190 * strictly ordered between the MDS and the OSD, and since messages to a given
2191 * PG are strictly ordered, if we simply send the message containing the header
2192 * off last, we cannot get our header into an incorrect state.
2193 */
2194 bufferlist header;
2195 ::encode(fnode, header);
2196 op.omap_set_header(header);
2197
2198 if (!to_set.empty())
2199 op.omap_set(to_set);
2200 if (!to_remove.empty())
2201 op.omap_rm_keys(to_remove);
2202
2203 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2204 ceph::real_clock::now(),
2205 0, gather.new_sub());
2206
2207 gather.activate();
2208}
2209
2210void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
2211 const set<snapid_t> *snaps)
2212{
2213 // clear dentry NEW flag, if any. we can no longer silently drop it.
2214 dn->clear_new();
2215
2216 ::encode(dn->first, bl);
2217
2218 // primary or remote?
2219 if (dn->linkage.is_remote()) {
2220 inodeno_t ino = dn->linkage.get_remote_ino();
2221 unsigned char d_type = dn->linkage.get_remote_d_type();
2222 dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' remote ino " << ino << dendl;
2223
2224 // marker, name, ino
2225 bl.append('L'); // remote link
2226 ::encode(ino, bl);
2227 ::encode(d_type, bl);
2228 } else if (dn->linkage.is_primary()) {
2229 // primary link
2230 CInode *in = dn->linkage.get_inode();
2231 assert(in);
2232
2233 dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' inode " << *in << dendl;
2234
2235 // marker, name, inode, [symlink string]
2236 bl.append('I'); // inode
2237
2238 if (in->is_multiversion()) {
2239 if (!in->snaprealm) {
2240 if (snaps)
2241 in->purge_stale_snap_data(*snaps);
2242 } else if (in->snaprealm->have_past_parents_open()) {
2243 in->purge_stale_snap_data(in->snaprealm->get_snaps());
2244 }
2245 }
2246
2247 bufferlist snap_blob;
2248 in->encode_snap_blob(snap_blob);
2249 in->encode_bare(bl, cache->mds->mdsmap->get_up_features(), &snap_blob);
2250 } else {
2251 assert(!dn->linkage.is_null());
2252 }
2253}
2254
2255void CDir::_commit(version_t want, int op_prio)
2256{
2257 dout(10) << "_commit want " << want << " on " << *this << dendl;
2258
2259 // we can't commit things in the future.
2260 // (even the projected future.)
2261 assert(want <= get_version() || get_version() == 0);
2262
2263 // check pre+postconditions.
2264 assert(is_auth());
2265
2266 // already committed?
2267 if (committed_version >= want) {
2268 dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2269 return;
2270 }
2271 // already committing >= want?
2272 if (committing_version >= want) {
2273 dout(10) << "already committing " << committing_version << " >= " << want << dendl;
2274 assert(state_test(STATE_COMMITTING));
2275 return;
2276 }
2277
2278 // alrady committed an older version?
2279 if (committing_version > committed_version) {
2280 dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2281 return;
2282 }
2283
2284 // commit.
2285 committing_version = get_version();
2286
2287 // mark committing (if not already)
2288 if (!state_test(STATE_COMMITTING)) {
2289 dout(10) << "marking committing" << dendl;
2290 state_set(STATE_COMMITTING);
2291 }
2292
2293 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_commit);
2294
2295 _omap_commit(op_prio);
2296}
2297
2298
2299/**
2300 * _committed
2301 *
2302 * @param v version i just committed
2303 */
2304void CDir::_committed(int r, version_t v)
2305{
2306 if (r < 0) {
2307 // the directory could be partly purged during MDS failover
2308 if (r == -ENOENT && committed_version == 0 &&
31f18b77 2309 !inode->is_base() && get_parent_dir()->inode->is_stray()) {
7c673cae 2310 r = 0;
31f18b77
FG
2311 if (inode->snaprealm)
2312 inode->state_set(CInode::STATE_MISSINGOBJS);
7c673cae
FG
2313 }
2314 if (r < 0) {
2315 dout(1) << "commit error " << r << " v " << v << dendl;
2316 cache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
2317 << " errno " << r;
2318 cache->mds->handle_write_error(r);
2319 return;
2320 }
2321 }
2322
2323 dout(10) << "_committed v " << v << " on " << *this << dendl;
2324 assert(is_auth());
2325
2326 bool stray = inode->is_stray();
2327
2328 // take note.
2329 assert(v > committed_version);
2330 assert(v <= committing_version);
2331 committed_version = v;
2332
2333 // _all_ commits done?
2334 if (committing_version == committed_version)
2335 state_clear(CDir::STATE_COMMITTING);
2336
2337 // _any_ commit, even if we've been redirtied, means we're no longer new.
2338 item_new.remove_myself();
2339
2340 // dir clean?
2341 if (committed_version == get_version())
2342 mark_clean();
2343
2344 // dentries clean?
2345 for (map_t::iterator it = items.begin();
2346 it != items.end(); ) {
2347 CDentry *dn = it->second;
2348 ++it;
2349
2350 // inode?
2351 if (dn->linkage.is_primary()) {
2352 CInode *in = dn->linkage.get_inode();
2353 assert(in);
2354 assert(in->is_auth());
2355
2356 if (committed_version >= in->get_version()) {
2357 if (in->is_dirty()) {
2358 dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
2359 in->mark_clean();
2360 }
2361 } else {
2362 dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
2363 assert(in->is_dirty() || in->last < CEPH_NOSNAP); // special case for cow snap items (not predirtied)
2364 }
2365 }
2366
2367 // dentry
2368 if (committed_version >= dn->get_version()) {
2369 if (dn->is_dirty()) {
2370 dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
2371 dn->mark_clean();
2372
2373 // drop clean null stray dentries immediately
2374 if (stray &&
2375 dn->get_num_ref() == 0 &&
2376 !dn->is_projected() &&
2377 dn->get_linkage()->is_null())
2378 remove_dentry(dn);
2379 }
2380 } else {
2381 dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
2382 }
2383 }
2384
2385 // finishers?
2386 bool were_waiters = !waiting_for_commit.empty();
2387
2388 compact_map<version_t, list<MDSInternalContextBase*> >::iterator p = waiting_for_commit.begin();
2389 while (p != waiting_for_commit.end()) {
2390 compact_map<version_t, list<MDSInternalContextBase*> >::iterator n = p;
2391 ++n;
2392 if (p->first > committed_version) {
2393 dout(10) << " there are waiters for " << p->first << ", committing again" << dendl;
2394 _commit(p->first, -1);
2395 break;
2396 }
2397 cache->mds->queue_waiters(p->second);
2398 waiting_for_commit.erase(p);
2399 p = n;
2400 }
2401
2402 // try drop dentries in this dirfrag if it's about to be purged
31f18b77
FG
2403 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
2404 inode->snaprealm)
7c673cae
FG
2405 cache->maybe_eval_stray(inode, true);
2406
2407 // unpin if we kicked the last waiter.
2408 if (were_waiters &&
2409 waiting_for_commit.empty())
2410 auth_unpin(this);
2411}
2412
2413
2414
2415
2416// IMPORT/EXPORT
2417
2418void CDir::encode_export(bufferlist& bl)
2419{
2420 assert(!is_projected());
2421 ::encode(first, bl);
2422 ::encode(fnode, bl);
2423 ::encode(dirty_old_rstat, bl);
2424 ::encode(committed_version, bl);
2425
2426 ::encode(state, bl);
2427 ::encode(dir_rep, bl);
2428
2429 ::encode(pop_me, bl);
2430 ::encode(pop_auth_subtree, bl);
2431
2432 ::encode(dir_rep_by, bl);
181888fb 2433 ::encode(get_replicas(), bl);
7c673cae
FG
2434
2435 get(PIN_TEMPEXPORTING);
2436}
2437
2438void CDir::finish_export(utime_t now)
2439{
2440 state &= MASK_STATE_EXPORT_KEPT;
2441 pop_auth_subtree_nested.sub(now, cache->decayrate, pop_auth_subtree);
2442 pop_me.zero(now);
2443 pop_auth_subtree.zero(now);
2444 put(PIN_TEMPEXPORTING);
2445 dirty_old_rstat.clear();
2446}
2447
2448void CDir::decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls)
2449{
2450 ::decode(first, blp);
2451 ::decode(fnode, blp);
2452 ::decode(dirty_old_rstat, blp);
2453 projected_version = fnode.version;
2454 ::decode(committed_version, blp);
2455 committing_version = committed_version;
2456
2457 unsigned s;
2458 ::decode(s, blp);
2459 state &= MASK_STATE_IMPORT_KEPT;
2460 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
2461
2462 if (is_dirty()) {
2463 get(PIN_DIRTY);
2464 _mark_dirty(ls);
2465 }
2466
2467 ::decode(dir_rep, blp);
2468
2469 ::decode(pop_me, now, blp);
2470 ::decode(pop_auth_subtree, now, blp);
2471 pop_auth_subtree_nested.add(now, cache->decayrate, pop_auth_subtree);
2472
2473 ::decode(dir_rep_by, blp);
181888fb
FG
2474 ::decode(get_replicas(), blp);
2475 if (is_replicated()) get(PIN_REPLICATED);
7c673cae
FG
2476
2477 replica_nonce = 0; // no longer defined
2478
2479 // did we import some dirty scatterlock data?
2480 if (dirty_old_rstat.size() ||
2481 !(fnode.rstat == fnode.accounted_rstat)) {
2482 cache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
2483 ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
2484 }
2485 if (!(fnode.fragstat == fnode.accounted_fragstat)) {
2486 cache->mds->locker->mark_updated_scatterlock(&inode->filelock);
2487 ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
2488 }
2489 if (is_dirty_dft()) {
2490 if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
2491 inode->dirfragtreelock.is_stable()) {
2492 // clear stale dirtydft
2493 state_clear(STATE_DIRTYDFT);
2494 } else {
2495 cache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
2496 ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
2497 }
2498 }
2499}
2500
2501
2502
2503
2504/********************************
2505 * AUTHORITY
2506 */
2507
2508/*
2509 * if dir_auth.first == parent, auth is same as inode.
2510 * unless .second != unknown, in which case that sticks.
2511 */
2512mds_authority_t CDir::authority() const
2513{
2514 if (is_subtree_root())
2515 return dir_auth;
2516 else
2517 return inode->authority();
2518}
2519
2520/** is_subtree_root()
2521 * true if this is an auth delegation point.
2522 * that is, dir_auth != default (parent,unknown)
2523 *
2524 * some key observations:
2525 * if i am auth:
2526 * - any region bound will be an export, or frozen.
2527 *
2528 * note that this DOES heed dir_auth.pending
2529 */
2530/*
2531bool CDir::is_subtree_root()
2532{
2533 if (dir_auth == CDIR_AUTH_DEFAULT) {
2534 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2535 //<< " on " << ino() << dendl;
2536 return false;
2537 } else {
2538 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2539 //<< " on " << ino() << dendl;
2540 return true;
2541 }
2542}
2543*/
2544
2545/** contains(x)
2546 * true if we are x, or an ancestor of x
2547 */
2548bool CDir::contains(CDir *x)
2549{
2550 while (1) {
2551 if (x == this)
2552 return true;
2553 x = x->get_inode()->get_projected_parent_dir();
2554 if (x == 0)
2555 return false;
2556 }
2557}
2558
2559
2560
2561/** set_dir_auth
2562 */
2563void CDir::set_dir_auth(mds_authority_t a)
2564{
2565 dout(10) << "setting dir_auth=" << a
2566 << " from " << dir_auth
2567 << " on " << *this << dendl;
2568
2569 bool was_subtree = is_subtree_root();
2570 bool was_ambiguous = dir_auth.second >= 0;
2571
2572 // set it.
2573 dir_auth = a;
2574
2575 // new subtree root?
2576 if (!was_subtree && is_subtree_root()) {
2577 dout(10) << " new subtree root, adjusting auth_pins" << dendl;
2578
2579 // adjust nested auth pins
2580 if (get_cum_auth_pins())
2581 inode->adjust_nested_auth_pins(-1, NULL);
2582
2583 // unpin parent of frozen dir/tree?
224ce89b
WB
2584 if (inode->is_auth()) {
2585 assert(!is_frozen_tree_root());
2586 if (is_frozen_dir())
2587 inode->auth_unpin(this);
2588 }
7c673cae
FG
2589 }
2590 if (was_subtree && !is_subtree_root()) {
2591 dout(10) << " old subtree root, adjusting auth_pins" << dendl;
2592
2593 // adjust nested auth pins
2594 if (get_cum_auth_pins())
2595 inode->adjust_nested_auth_pins(1, NULL);
2596
2597 // pin parent of frozen dir/tree?
224ce89b
WB
2598 if (inode->is_auth()) {
2599 assert(!is_frozen_tree_root());
2600 if (is_frozen_dir())
2601 inode->auth_pin(this);
2602 }
7c673cae
FG
2603 }
2604
2605 // newly single auth?
2606 if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
2607 list<MDSInternalContextBase*> ls;
2608 take_waiting(WAIT_SINGLEAUTH, ls);
2609 cache->mds->queue_waiters(ls);
2610 }
2611}
2612
2613
2614/*****************************************
2615 * AUTH PINS and FREEZING
2616 *
2617 * the basic plan is that auth_pins only exist in auth regions, and they
2618 * prevent a freeze (and subsequent auth change).
2619 *
2620 * however, we also need to prevent a parent from freezing if a child is frozen.
2621 * for that reason, the parent inode of a frozen directory is auth_pinned.
2622 *
2623 * the oddity is when the frozen directory is a subtree root. if that's the case,
2624 * the parent inode isn't frozen. which means that when subtree authority is adjusted
2625 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
2626 * time.
2627 *
2628 */
2629
2630void CDir::auth_pin(void *by)
2631{
2632 if (auth_pins == 0)
2633 get(PIN_AUTHPIN);
2634 auth_pins++;
2635
2636#ifdef MDS_AUTHPIN_SET
2637 auth_pin_set.insert(by);
2638#endif
2639
2640 dout(10) << "auth_pin by " << by
2641 << " on " << *this
2642 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2643
2644 // nest pins?
2645 if (!is_subtree_root() &&
2646 get_cum_auth_pins() == 1)
2647 inode->adjust_nested_auth_pins(1, by);
2648}
2649
2650void CDir::auth_unpin(void *by)
2651{
2652 auth_pins--;
2653
2654#ifdef MDS_AUTHPIN_SET
2655 assert(auth_pin_set.count(by));
2656 auth_pin_set.erase(auth_pin_set.find(by));
2657#endif
2658 if (auth_pins == 0)
2659 put(PIN_AUTHPIN);
2660
2661 dout(10) << "auth_unpin by " << by
2662 << " on " << *this
2663 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2664 assert(auth_pins >= 0);
2665
2666 int newcum = get_cum_auth_pins();
2667
2668 maybe_finish_freeze(); // pending freeze?
2669
2670 // nest?
2671 if (!is_subtree_root() &&
2672 newcum == 0)
2673 inode->adjust_nested_auth_pins(-1, by);
2674}
2675
2676void CDir::adjust_nested_auth_pins(int inc, int dirinc, void *by)
2677{
2678 assert(inc);
2679 nested_auth_pins += inc;
2680 dir_auth_pins += dirinc;
2681
2682 dout(15) << "adjust_nested_auth_pins " << inc << "/" << dirinc << " on " << *this
2683 << " by " << by << " count now "
2684 << auth_pins << " + " << nested_auth_pins << dendl;
2685 assert(nested_auth_pins >= 0);
2686 assert(dir_auth_pins >= 0);
2687
2688 int newcum = get_cum_auth_pins();
2689
2690 maybe_finish_freeze(); // pending freeze?
2691
2692 // nest?
2693 if (!is_subtree_root()) {
2694 if (newcum == 0)
2695 inode->adjust_nested_auth_pins(-1, by);
2696 else if (newcum == inc)
2697 inode->adjust_nested_auth_pins(1, by);
2698 }
2699}
2700
2701#ifdef MDS_VERIFY_FRAGSTAT
2702void CDir::verify_fragstat()
2703{
2704 assert(is_complete());
2705 if (inode->is_stray())
2706 return;
2707
2708 frag_info_t c;
2709 memset(&c, 0, sizeof(c));
2710
2711 for (map_t::iterator it = items.begin();
2712 it != items.end();
2713 ++it) {
2714 CDentry *dn = it->second;
2715 if (dn->is_null())
2716 continue;
2717
2718 dout(10) << " " << *dn << dendl;
2719 if (dn->is_primary())
2720 dout(10) << " " << *dn->inode << dendl;
2721
2722 if (dn->is_primary()) {
2723 if (dn->inode->is_dir())
2724 c.nsubdirs++;
2725 else
2726 c.nfiles++;
2727 }
2728 if (dn->is_remote()) {
2729 if (dn->get_remote_d_type() == DT_DIR)
2730 c.nsubdirs++;
2731 else
2732 c.nfiles++;
2733 }
2734 }
2735
2736 if (c.nsubdirs != fnode.fragstat.nsubdirs ||
2737 c.nfiles != fnode.fragstat.nfiles) {
2738 dout(0) << "verify_fragstat failed " << fnode.fragstat << " on " << *this << dendl;
2739 dout(0) << " i count " << c << dendl;
2740 ceph_abort();
2741 } else {
2742 dout(0) << "verify_fragstat ok " << fnode.fragstat << " on " << *this << dendl;
2743 }
2744}
2745#endif
2746
2747/*****************************************************************************
2748 * FREEZING
2749 */
2750
2751// FREEZE TREE
2752
2753bool CDir::freeze_tree()
2754{
2755 assert(!is_frozen());
2756 assert(!is_freezing());
2757
2758 auth_pin(this);
2759 if (is_freezeable(true)) {
2760 _freeze_tree();
2761 auth_unpin(this);
2762 return true;
2763 } else {
2764 state_set(STATE_FREEZINGTREE);
2765 ++num_freezing_trees;
2766 dout(10) << "freeze_tree waiting " << *this << dendl;
2767 return false;
2768 }
2769}
2770
2771void CDir::_freeze_tree()
2772{
2773 dout(10) << "_freeze_tree " << *this << dendl;
2774 assert(is_freezeable(true));
2775
2776 // twiddle state
2777 if (state_test(STATE_FREEZINGTREE)) {
2778 state_clear(STATE_FREEZINGTREE); // actually, this may get set again by next context?
2779 --num_freezing_trees;
2780 }
224ce89b
WB
2781
2782 if (is_auth()) {
2783 mds_authority_t auth;
2784 bool was_subtree = is_subtree_root();
2785 if (was_subtree) {
2786 auth = get_dir_auth();
2787 } else {
2788 // temporarily prevent parent subtree from becoming frozen.
2789 inode->auth_pin(this);
2790 // create new subtree
2791 auth = authority();
2792 }
2793
2794 assert(auth.first >= 0);
2795 assert(auth.second == CDIR_AUTH_UNKNOWN);
2796 auth.second = auth.first;
2797 inode->mdcache->adjust_subtree_auth(this, auth);
2798 if (!was_subtree)
2799 inode->auth_unpin(this);
2800 }
2801
7c673cae
FG
2802 state_set(STATE_FROZENTREE);
2803 ++num_frozen_trees;
2804 get(PIN_FROZEN);
7c673cae
FG
2805}
2806
2807void CDir::unfreeze_tree()
2808{
2809 dout(10) << "unfreeze_tree " << *this << dendl;
2810
2811 if (state_test(STATE_FROZENTREE)) {
2812 // frozen. unfreeze.
2813 state_clear(STATE_FROZENTREE);
2814 --num_frozen_trees;
2815
2816 put(PIN_FROZEN);
2817
224ce89b
WB
2818 if (is_auth()) {
2819 // must be subtree
2820 assert(is_subtree_root());
2821 // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
2822 mds_authority_t auth = get_dir_auth();
2823 assert(auth.first >= 0);
2824 assert(auth.second == auth.first);
2825 auth.second = CDIR_AUTH_UNKNOWN;
2826 inode->mdcache->adjust_subtree_auth(this, auth);
2827 }
7c673cae
FG
2828
2829 // waiters?
2830 finish_waiting(WAIT_UNFREEZE);
2831 } else {
2832 finish_waiting(WAIT_FROZEN, -1);
2833
2834 // freezing. stop it.
2835 assert(state_test(STATE_FREEZINGTREE));
2836 state_clear(STATE_FREEZINGTREE);
2837 --num_freezing_trees;
2838 auth_unpin(this);
2839
2840 finish_waiting(WAIT_UNFREEZE);
2841 }
2842}
2843
2844bool CDir::is_freezing_tree() const
2845{
2846 if (num_freezing_trees == 0)
2847 return false;
2848 const CDir *dir = this;
2849 while (1) {
2850 if (dir->is_freezing_tree_root()) return true;
2851 if (dir->is_subtree_root()) return false;
2852 if (dir->inode->parent)
2853 dir = dir->inode->parent->dir;
2854 else
2855 return false; // root on replica
2856 }
2857}
2858
2859bool CDir::is_frozen_tree() const
2860{
2861 if (num_frozen_trees == 0)
2862 return false;
2863 const CDir *dir = this;
2864 while (1) {
2865 if (dir->is_frozen_tree_root()) return true;
2866 if (dir->is_subtree_root()) return false;
2867 if (dir->inode->parent)
2868 dir = dir->inode->parent->dir;
2869 else
2870 return false; // root on replica
2871 }
2872}
2873
2874CDir *CDir::get_frozen_tree_root()
2875{
2876 assert(is_frozen());
2877 CDir *dir = this;
2878 while (1) {
2879 if (dir->is_frozen_tree_root())
2880 return dir;
2881 if (dir->inode->parent)
2882 dir = dir->inode->parent->dir;
2883 else
2884 ceph_abort();
2885 }
2886}
2887
2888class C_Dir_AuthUnpin : public CDirContext {
2889 public:
2890 explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
2891 void finish(int r) override {
2892 dir->auth_unpin(dir->get_inode());
2893 }
2894};
2895
2896void CDir::maybe_finish_freeze()
2897{
2898 if (auth_pins != 1 || dir_auth_pins != 0)
2899 return;
2900
2901 // we can freeze the _dir_ even with nested pins...
2902 if (state_test(STATE_FREEZINGDIR)) {
2903 _freeze_dir();
2904 auth_unpin(this);
2905 finish_waiting(WAIT_FROZEN);
2906 }
2907
2908 if (nested_auth_pins != 0)
2909 return;
2910
2911 if (state_test(STATE_FREEZINGTREE)) {
2912 if (!is_subtree_root() && inode->is_frozen()) {
2913 dout(10) << "maybe_finish_freeze !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
2914 // retake an auth_pin...
2915 auth_pin(inode);
2916 // and release it when the parent inode unfreezes
2917 inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
2918 return;
2919 }
2920
2921 _freeze_tree();
2922 auth_unpin(this);
2923 finish_waiting(WAIT_FROZEN);
2924 }
2925}
2926
2927
2928
2929// FREEZE DIR
2930
2931bool CDir::freeze_dir()
2932{
2933 assert(!is_frozen());
2934 assert(!is_freezing());
2935
2936 auth_pin(this);
2937 if (is_freezeable_dir(true)) {
2938 _freeze_dir();
2939 auth_unpin(this);
2940 return true;
2941 } else {
2942 state_set(STATE_FREEZINGDIR);
2943 dout(10) << "freeze_dir + wait " << *this << dendl;
2944 return false;
2945 }
2946}
2947
2948void CDir::_freeze_dir()
2949{
2950 dout(10) << "_freeze_dir " << *this << dendl;
2951 //assert(is_freezeable_dir(true));
2952 // not always true during split because the original fragment may have frozen a while
2953 // ago and we're just now getting around to breaking it up.
2954
2955 state_clear(STATE_FREEZINGDIR);
2956 state_set(STATE_FROZENDIR);
2957 get(PIN_FROZEN);
2958
2959 if (is_auth() && !is_subtree_root())
2960 inode->auth_pin(this); // auth_pin for duration of freeze
2961}
2962
2963
2964void CDir::unfreeze_dir()
2965{
2966 dout(10) << "unfreeze_dir " << *this << dendl;
2967
2968 if (state_test(STATE_FROZENDIR)) {
2969 state_clear(STATE_FROZENDIR);
2970 put(PIN_FROZEN);
2971
2972 // unpin (may => FREEZEABLE) FIXME: is this order good?
2973 if (is_auth() && !is_subtree_root())
2974 inode->auth_unpin(this);
2975
2976 finish_waiting(WAIT_UNFREEZE);
2977 } else {
2978 finish_waiting(WAIT_FROZEN, -1);
2979
2980 // still freezing. stop.
2981 assert(state_test(STATE_FREEZINGDIR));
2982 state_clear(STATE_FREEZINGDIR);
2983 auth_unpin(this);
2984
2985 finish_waiting(WAIT_UNFREEZE);
2986 }
2987}
2988
2989/**
2990 * Slightly less complete than operator<<, because this is intended
2991 * for identifying a directory and its state rather than for dumping
2992 * debug output.
2993 */
2994void CDir::dump(Formatter *f) const
2995{
2996 assert(f != NULL);
2997
2998 f->dump_stream("path") << get_path();
2999
3000 f->dump_stream("dirfrag") << dirfrag();
3001 f->dump_int("snapid_first", first);
3002
3003 f->dump_stream("projected_version") << get_projected_version();
3004 f->dump_stream("version") << get_version();
3005 f->dump_stream("committing_version") << get_committing_version();
3006 f->dump_stream("committed_version") << get_committed_version();
3007
3008 f->dump_bool("is_rep", is_rep());
3009
3010 if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
3011 if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
3012 f->dump_stream("dir_auth") << get_dir_auth().first;
3013 } else {
3014 f->dump_stream("dir_auth") << get_dir_auth();
3015 }
3016 } else {
3017 f->dump_string("dir_auth", "");
3018 }
3019
3020 f->open_array_section("states");
3021 MDSCacheObject::dump_states(f);
3022 if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
3023 if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
3024 if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
3025 if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
3026 if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
3027 if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
3028 if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
3029 if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
3030 f->close_section();
3031
3032 MDSCacheObject::dump(f);
3033}
3034
3035/****** Scrub Stuff *******/
3036
3037void CDir::scrub_info_create() const
3038{
3039 assert(!scrub_infop);
3040
3041 // break out of const-land to set up implicit initial state
3042 CDir *me = const_cast<CDir*>(this);
3043 fnode_t *fn = me->get_projected_fnode();
3044
3045 std::unique_ptr<scrub_info_t> si(new scrub_info_t());
3046
3047 si->last_recursive.version = si->recursive_start.version =
3048 fn->recursive_scrub_version;
3049 si->last_recursive.time = si->recursive_start.time =
3050 fn->recursive_scrub_stamp;
3051
3052 si->last_local.version = fn->localized_scrub_version;
3053 si->last_local.time = fn->localized_scrub_stamp;
3054
3055 me->scrub_infop.swap(si);
3056}
3057
3058void CDir::scrub_initialize(const ScrubHeaderRefConst& header)
3059{
3060 dout(20) << __func__ << dendl;
3061 assert(is_complete());
3062 assert(header != nullptr);
3063
3064 // FIXME: weird implicit construction, is someone else meant
3065 // to be calling scrub_info_create first?
3066 scrub_info();
3067 assert(scrub_infop && !scrub_infop->directory_scrubbing);
3068
3069 scrub_infop->recursive_start.version = get_projected_version();
3070 scrub_infop->recursive_start.time = ceph_clock_now();
3071
3072 scrub_infop->directories_to_scrub.clear();
3073 scrub_infop->directories_scrubbing.clear();
3074 scrub_infop->directories_scrubbed.clear();
3075 scrub_infop->others_to_scrub.clear();
3076 scrub_infop->others_scrubbing.clear();
3077 scrub_infop->others_scrubbed.clear();
3078
3079 for (map_t::iterator i = items.begin();
3080 i != items.end();
3081 ++i) {
3082 // TODO: handle snapshot scrubbing
3083 if (i->first.snapid != CEPH_NOSNAP)
3084 continue;
3085
3086 CDentry::linkage_t *dnl = i->second->get_projected_linkage();
3087 if (dnl->is_primary()) {
3088 if (dnl->get_inode()->is_dir())
3089 scrub_infop->directories_to_scrub.insert(i->first);
3090 else
3091 scrub_infop->others_to_scrub.insert(i->first);
3092 } else if (dnl->is_remote()) {
3093 // TODO: check remote linkage
3094 }
3095 }
3096 scrub_infop->directory_scrubbing = true;
3097 scrub_infop->header = header;
3098}
3099
3100void CDir::scrub_finished()
3101{
3102 dout(20) << __func__ << dendl;
3103 assert(scrub_infop && scrub_infop->directory_scrubbing);
3104
3105 assert(scrub_infop->directories_to_scrub.empty());
3106 assert(scrub_infop->directories_scrubbing.empty());
3107 scrub_infop->directories_scrubbed.clear();
3108 assert(scrub_infop->others_to_scrub.empty());
3109 assert(scrub_infop->others_scrubbing.empty());
3110 scrub_infop->others_scrubbed.clear();
3111 scrub_infop->directory_scrubbing = false;
3112
3113 scrub_infop->last_recursive = scrub_infop->recursive_start;
3114 scrub_infop->last_scrub_dirty = true;
3115}
3116
3117int CDir::_next_dentry_on_set(set<dentry_key_t>& dns, bool missing_okay,
3118 MDSInternalContext *cb, CDentry **dnout)
3119{
3120 dentry_key_t dnkey;
3121 CDentry *dn;
3122
3123 while (!dns.empty()) {
3124 set<dentry_key_t>::iterator front = dns.begin();
3125 dnkey = *front;
3126 dn = lookup(dnkey.name);
3127 if (!dn) {
3128 if (!is_complete() &&
3129 (!has_bloom() || is_in_bloom(dnkey.name))) {
3130 // need to re-read this dirfrag
3131 fetch(cb);
3132 return EAGAIN;
3133 }
3134 // okay, we lost it
3135 if (missing_okay) {
3136 dout(15) << " we no longer have directory dentry "
3137 << dnkey.name << ", assuming it got renamed" << dendl;
3138 dns.erase(dnkey);
3139 continue;
3140 } else {
3141 dout(5) << " we lost dentry " << dnkey.name
3142 << ", bailing out because that's impossible!" << dendl;
3143 ceph_abort();
3144 }
3145 }
3146 // okay, we got a dentry
3147 dns.erase(dnkey);
3148
3149 if (dn->get_projected_version() < scrub_infop->last_recursive.version &&
3150 !(scrub_infop->header->get_force())) {
3151 dout(15) << " skip dentry " << dnkey.name
3152 << ", no change since last scrub" << dendl;
3153 continue;
3154 }
3155
3156 *dnout = dn;
3157 return 0;
3158 }
3159 *dnout = NULL;
3160 return ENOENT;
3161}
3162
3163int CDir::scrub_dentry_next(MDSInternalContext *cb, CDentry **dnout)
3164{
3165 dout(20) << __func__ << dendl;
3166 assert(scrub_infop && scrub_infop->directory_scrubbing);
3167
3168 dout(20) << "trying to scrub directories underneath us" << dendl;
3169 int rval = _next_dentry_on_set(scrub_infop->directories_to_scrub, true,
3170 cb, dnout);
3171 if (rval == 0) {
3172 dout(20) << __func__ << " inserted to directories scrubbing: "
3173 << *dnout << dendl;
3174 scrub_infop->directories_scrubbing.insert((*dnout)->key());
3175 } else if (rval == EAGAIN) {
3176 // we don't need to do anything else
3177 } else { // we emptied out the directory scrub set
3178 assert(rval == ENOENT);
3179 dout(20) << "no directories left, moving on to other kinds of dentries"
3180 << dendl;
3181
3182 rval = _next_dentry_on_set(scrub_infop->others_to_scrub, false, cb, dnout);
3183 if (rval == 0) {
3184 dout(20) << __func__ << " inserted to others scrubbing: "
3185 << *dnout << dendl;
3186 scrub_infop->others_scrubbing.insert((*dnout)->key());
3187 }
3188 }
3189 dout(20) << " returning " << rval << " with dn=" << *dnout << dendl;
3190 return rval;
3191}
3192
3193void CDir::scrub_dentries_scrubbing(list<CDentry*> *out_dentries)
3194{
3195 dout(20) << __func__ << dendl;
3196 assert(scrub_infop && scrub_infop->directory_scrubbing);
3197
3198 for (set<dentry_key_t>::iterator i =
3199 scrub_infop->directories_scrubbing.begin();
3200 i != scrub_infop->directories_scrubbing.end();
3201 ++i) {
3202 CDentry *d = lookup(i->name, i->snapid);
3203 assert(d);
3204 out_dentries->push_back(d);
3205 }
3206 for (set<dentry_key_t>::iterator i = scrub_infop->others_scrubbing.begin();
3207 i != scrub_infop->others_scrubbing.end();
3208 ++i) {
3209 CDentry *d = lookup(i->name, i->snapid);
3210 assert(d);
3211 out_dentries->push_back(d);
3212 }
3213}
3214
3215void CDir::scrub_dentry_finished(CDentry *dn)
3216{
3217 dout(20) << __func__ << " on dn " << *dn << dendl;
3218 assert(scrub_infop && scrub_infop->directory_scrubbing);
3219 dentry_key_t dn_key = dn->key();
3220 if (scrub_infop->directories_scrubbing.erase(dn_key)) {
3221 scrub_infop->directories_scrubbed.insert(dn_key);
3222 } else {
3223 assert(scrub_infop->others_scrubbing.count(dn_key));
3224 scrub_infop->others_scrubbing.erase(dn_key);
3225 scrub_infop->others_scrubbed.insert(dn_key);
3226 }
3227}
3228
3229void CDir::scrub_maybe_delete_info()
3230{
3231 if (scrub_infop &&
3232 !scrub_infop->directory_scrubbing &&
3233 !scrub_infop->need_scrub_local &&
3234 !scrub_infop->last_scrub_dirty &&
3235 !scrub_infop->pending_scrub_error &&
3236 scrub_infop->dirty_scrub_stamps.empty()) {
3237 scrub_infop.reset();
3238 }
3239}
3240
3241bool CDir::scrub_local()
3242{
3243 assert(is_complete());
3244 bool rval = check_rstats(true);
3245
3246 scrub_info();
3247 if (rval) {
3248 scrub_infop->last_local.time = ceph_clock_now();
3249 scrub_infop->last_local.version = get_projected_version();
3250 scrub_infop->pending_scrub_error = false;
3251 scrub_infop->last_scrub_dirty = true;
3252 } else {
3253 scrub_infop->pending_scrub_error = true;
3254 if (scrub_infop->header->get_repair())
3255 cache->repair_dirfrag_stats(this);
3256 }
3257 return rval;
3258}
3259
3260std::string CDir::get_path() const
3261{
3262 std::string path;
3263 get_inode()->make_path_string(path, true);
3264 return path;
3265}
3266
3267bool CDir::should_split_fast() const
3268{
3269 // Max size a fragment can be before trigger fast splitting
3270 int fast_limit = g_conf->mds_bal_split_size * g_conf->mds_bal_fragment_fast_factor;
3271
3272 // Fast path: the sum of accounted size and null dentries does not
3273 // exceed threshold: we definitely are not over it.
3274 if (get_frag_size() + get_num_head_null() <= fast_limit) {
3275 return false;
3276 }
3277
3278 // Fast path: the accounted size of the frag exceeds threshold: we
3279 // definitely are over it
3280 if (get_frag_size() > fast_limit) {
3281 return true;
3282 }
3283
3284 int64_t effective_size = 0;
3285
3286 for (const auto &p : items) {
3287 const CDentry *dn = p.second;
3288 if (!dn->get_projected_linkage()->is_null()) {
3289 effective_size++;
3290 }
3291 }
3292
3293 return effective_size > fast_limit;
3294}
3295
181888fb 3296MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);