]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/CDir.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / mds / CDir.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include "include/types.h"
17
18#include "CDir.h"
19#include "CDentry.h"
20#include "CInode.h"
21#include "Mutation.h"
22
23#include "MDSMap.h"
24#include "MDSRank.h"
25#include "MDCache.h"
26#include "Locker.h"
27#include "MDLog.h"
28#include "LogSegment.h"
29
30#include "common/bloom_filter.hpp"
31#include "include/Context.h"
32#include "common/Clock.h"
33
34#include "osdc/Objecter.h"
35
36#include "common/config.h"
37#include "include/assert.h"
38#include "include/compat.h"
39
40#define dout_context g_ceph_context
41#define dout_subsys ceph_subsys_mds
42#undef dout_prefix
43#define dout_prefix *_dout << "mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
44
// Out-of-class definitions of CDir's static counters; both start at zero.
// NOTE(review): presumably tallies of frozen / freezing subtrees -- confirm
// against the declarations and the code that updates them.
int CDir::num_frozen_trees = 0;
int CDir::num_freezing_trees = 0;
47
48class CDirContext : public MDSInternalContextBase
49{
50protected:
51 CDir *dir;
52 MDSRank* get_mds() override {return dir->cache->mds;}
53
54public:
55 explicit CDirContext(CDir *d) : dir(d) {
56 assert(dir != NULL);
57 }
58};
59
60
61class CDirIOContext : public MDSIOContextBase
62{
63protected:
64 CDir *dir;
65 MDSRank* get_mds() override {return dir->cache->mds;}
66
67public:
68 explicit CDirIOContext(CDir *d) : dir(d) {
69 assert(dir != NULL);
70 }
71};
72
73
74// PINS
75//int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
76
77
78ostream& operator<<(ostream& out, const CDir& dir)
79{
80 out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
81 << " [" << dir.first << ",head]";
82 if (dir.is_auth()) {
83 out << " auth";
84 if (dir.is_replicated())
85 out << dir.get_replicas();
86
87 if (dir.is_projected())
88 out << " pv=" << dir.get_projected_version();
89 out << " v=" << dir.get_version();
90 out << " cv=" << dir.get_committing_version();
91 out << "/" << dir.get_committed_version();
92 } else {
93 mds_authority_t a = dir.authority();
94 out << " rep@" << a.first;
95 if (a.second != CDIR_AUTH_UNKNOWN)
96 out << "," << a.second;
97 out << "." << dir.get_replica_nonce();
98 }
99
100 if (dir.is_rep()) out << " REP";
101
102 if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
103 if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
104 out << " dir_auth=" << dir.get_dir_auth().first;
105 else
106 out << " dir_auth=" << dir.get_dir_auth();
107 }
108
109 if (dir.get_cum_auth_pins())
110 out << " ap=" << dir.get_auth_pins()
111 << "+" << dir.get_dir_auth_pins()
112 << "+" << dir.get_nested_auth_pins();
113
114 out << " state=" << dir.get_state();
115 if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
116 if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
117 if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
118 if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
119 //if (dir.state_test(CDir::STATE_FROZENTREELEAF)) out << "|frozentreeleaf";
120 if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
121 if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
122 if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
123 if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
124 if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
125 if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
126
127 // fragstat
128 out << " " << dir.fnode.fragstat;
129 if (!(dir.fnode.fragstat == dir.fnode.accounted_fragstat))
130 out << "/" << dir.fnode.accounted_fragstat;
131 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
132 const fnode_t *pf = dir.get_projected_fnode();
133 out << "->" << pf->fragstat;
134 if (!(pf->fragstat == pf->accounted_fragstat))
135 out << "/" << pf->accounted_fragstat;
136 }
137
138 // rstat
139 out << " " << dir.fnode.rstat;
140 if (!(dir.fnode.rstat == dir.fnode.accounted_rstat))
141 out << "/" << dir.fnode.accounted_rstat;
142 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
143 const fnode_t *pf = dir.get_projected_fnode();
144 out << "->" << pf->rstat;
145 if (!(pf->rstat == pf->accounted_rstat))
146 out << "/" << pf->accounted_rstat;
147 }
148
149 out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
150 out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
151 if (dir.get_num_dirty())
152 out << " dirty=" << dir.get_num_dirty();
153
154 if (dir.get_num_ref()) {
155 out << " |";
156 dir.print_pin_set(out);
157 }
158
159 out << " " << &dir;
160 return out << "]";
161}
162
163
164void CDir::print(ostream& out)
165{
166 out << *this;
167}
168
169
170
171
172ostream& CDir::print_db_line_prefix(ostream& out)
173{
174 return out << ceph_clock_now() << " mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
175}
176
177
178
179// -------------------------------------------------------------------
180// CDir
181
/**
 * Construct dirfrag `fg` of directory inode `in`.
 *
 * Counters, versions and pin counts start at zero, `first` starts at 2,
 * dir_rep at REP_NONE, dir_auth at CDIR_AUTH_DEFAULT, and the four
 * popularity members are seeded with the current clock value.
 *
 * @param in       inode this fragment belongs to; asserted to be a directory
 * @param fg       fragment identifier stored in `frag`
 * @param mdcache  cache pointer stored in `cache`
 * @param auth     when true, STATE_AUTH is OR'ed into the initial state
 */
CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
  cache(mdcache), inode(in), frag(fg),
  first(2),
  dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
  projected_version(0),  item_dirty(this), item_new(this),
  num_head_items(0), num_head_null(0),
  num_snap_items(0), num_snap_null(0),
  num_dirty(0), committing_version(0), committed_version(0),
  dir_auth_pins(0), request_pins(0),
  dir_rep(REP_NONE),
  pop_me(ceph_clock_now()),
  pop_nested(ceph_clock_now()),
  pop_auth_subtree(ceph_clock_now()),
  pop_auth_subtree_nested(ceph_clock_now()),
  num_dentries_nested(0), num_dentries_auth_subtree(0),
  num_dentries_auth_subtree_nested(0),
  dir_auth(CDIR_AUTH_DEFAULT)
{
  state = STATE_INITIAL;

  // zero every byte of the fnode
  // NOTE(review): assumes fnode's type tolerates a raw memset -- confirm
  memset(&fnode, 0, sizeof(fnode));

  // auth
  assert(in->is_dir());
  if (auth) 
    state |= STATE_AUTH;
}
209
210/**
211 * Check the recursive statistics on size for consistency.
212 * If mds_debug_scatterstat is enabled, assert for correctness,
213 * otherwise just print out the mismatch and continue.
214 */
215bool CDir::check_rstats(bool scrub)
216{
217 if (!g_conf->mds_debug_scatterstat && !scrub)
218 return true;
219
220 dout(25) << "check_rstats on " << this << dendl;
221 if (!is_complete() || !is_auth() || is_frozen()) {
222 assert(!scrub);
223 dout(10) << "check_rstats bailing out -- incomplete or non-auth or frozen dir!" << dendl;
224 return true;
225 }
226
227 frag_info_t frag_info;
228 nest_info_t nest_info;
229 for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
230 if (i->second->last != CEPH_NOSNAP)
231 continue;
232 CDentry::linkage_t *dnl = i->second->get_linkage();
233 if (dnl->is_primary()) {
234 CInode *in = dnl->get_inode();
235 nest_info.add(in->inode.accounted_rstat);
236 if (in->is_dir())
237 frag_info.nsubdirs++;
238 else
239 frag_info.nfiles++;
240 } else if (dnl->is_remote())
241 frag_info.nfiles++;
242 }
243
244 bool good = true;
245 // fragstat
246 if(!frag_info.same_sums(fnode.fragstat)) {
247 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
248 dout(1) << "get_num_head_items() = " << get_num_head_items()
249 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
250 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
251 good = false;
252 } else {
253 dout(20) << "get_num_head_items() = " << get_num_head_items()
254 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
255 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
256 }
257
258 // rstat
259 if (!nest_info.same_sums(fnode.rstat)) {
260 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
261 dout(1) << "total of child dentrys: " << nest_info << dendl;
262 dout(1) << "my rstats: " << fnode.rstat << dendl;
263 good = false;
264 } else {
265 dout(20) << "total of child dentrys: " << nest_info << dendl;
266 dout(20) << "my rstats: " << fnode.rstat << dendl;
267 }
268
269 if (!good) {
270 if (!scrub) {
271 for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
272 CDentry *dn = i->second;
273 if (dn->get_linkage()->is_primary()) {
274 CInode *in = dn->get_linkage()->inode;
275 dout(1) << *dn << " rstat " << in->inode.accounted_rstat << dendl;
276 } else {
277 dout(1) << *dn << dendl;
278 }
279 }
280
281 assert(frag_info.nfiles == fnode.fragstat.nfiles);
282 assert(frag_info.nsubdirs == fnode.fragstat.nsubdirs);
283 assert(nest_info.rbytes == fnode.rstat.rbytes);
284 assert(nest_info.rfiles == fnode.rstat.rfiles);
285 assert(nest_info.rsubdirs == fnode.rstat.rsubdirs);
286 }
287 }
288 dout(10) << "check_rstats complete on " << this << dendl;
289 return good;
290}
291
292CDentry *CDir::lookup(const string& name, snapid_t snap)
293{
294 dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
295 map_t::iterator iter = items.lower_bound(dentry_key_t(snap, name.c_str(),
296 inode->hash_dentry_name(name)));
297 if (iter == items.end())
298 return 0;
299 if (iter->second->name == name &&
300 iter->second->first <= snap &&
301 iter->second->last >= snap) {
302 dout(20) << " hit -> " << iter->first << dendl;
303 return iter->second;
304 }
305 dout(20) << " miss -> " << iter->first << dendl;
306 return 0;
307}
308
309CDentry *CDir::lookup_exact_snap(const string& name, snapid_t last) {
310 map_t::iterator p = items.find(dentry_key_t(last, name.c_str(),
311 inode->hash_dentry_name(name)));
312 if (p == items.end())
313 return NULL;
314 return p->second;
315}
316
317/***
318 * linking fun
319 */
320
321CDentry* CDir::add_null_dentry(const string& dname,
322 snapid_t first, snapid_t last)
323{
324 // foreign
325 assert(lookup_exact_snap(dname, last) == 0);
326
327 // create dentry
328 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
329 if (is_auth())
330 dn->state_set(CDentry::STATE_AUTH);
331 cache->lru.lru_insert_mid(dn);
332
333 dn->dir = this;
334 dn->version = get_projected_version();
335
336 // add to dir
337 assert(items.count(dn->key()) == 0);
338 //assert(null_items.count(dn->name) == 0);
339
340 items[dn->key()] = dn;
341 if (last == CEPH_NOSNAP)
342 num_head_null++;
343 else
344 num_snap_null++;
345
346 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
347 dn->get(CDentry::PIN_FRAGMENTING);
348 dn->state_set(CDentry::STATE_FRAGMENTING);
349 }
350
351 dout(12) << "add_null_dentry " << *dn << dendl;
352
353 // pin?
354 if (get_num_any() == 1)
355 get(PIN_CHILD);
356
357 assert(get_num_any() == items.size());
358 return dn;
359}
360
361
362CDentry* CDir::add_primary_dentry(const string& dname, CInode *in,
363 snapid_t first, snapid_t last)
364{
365 // primary
366 assert(lookup_exact_snap(dname, last) == 0);
367
368 // create dentry
369 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
370 if (is_auth())
371 dn->state_set(CDentry::STATE_AUTH);
372 cache->lru.lru_insert_mid(dn);
373
374 dn->dir = this;
375 dn->version = get_projected_version();
376
377 // add to dir
378 assert(items.count(dn->key()) == 0);
379 //assert(null_items.count(dn->name) == 0);
380
381 items[dn->key()] = dn;
382
383 dn->get_linkage()->inode = in;
384 in->set_primary_parent(dn);
385
386 link_inode_work(dn, in);
387
388 if (dn->last == CEPH_NOSNAP)
389 num_head_items++;
390 else
391 num_snap_items++;
392
393 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
394 dn->get(CDentry::PIN_FRAGMENTING);
395 dn->state_set(CDentry::STATE_FRAGMENTING);
396 }
397
398 dout(12) << "add_primary_dentry " << *dn << dendl;
399
400 // pin?
401 if (get_num_any() == 1)
402 get(PIN_CHILD);
403 assert(get_num_any() == items.size());
404 return dn;
405}
406
407CDentry* CDir::add_remote_dentry(const string& dname, inodeno_t ino, unsigned char d_type,
408 snapid_t first, snapid_t last)
409{
410 // foreign
411 assert(lookup_exact_snap(dname, last) == 0);
412
413 // create dentry
414 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), ino, d_type, first, last);
415 if (is_auth())
416 dn->state_set(CDentry::STATE_AUTH);
417 cache->lru.lru_insert_mid(dn);
418
419 dn->dir = this;
420 dn->version = get_projected_version();
421
422 // add to dir
423 assert(items.count(dn->key()) == 0);
424 //assert(null_items.count(dn->name) == 0);
425
426 items[dn->key()] = dn;
427 if (last == CEPH_NOSNAP)
428 num_head_items++;
429 else
430 num_snap_items++;
431
432 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
433 dn->get(CDentry::PIN_FRAGMENTING);
434 dn->state_set(CDentry::STATE_FRAGMENTING);
435 }
436
437 dout(12) << "add_remote_dentry " << *dn << dendl;
438
439 // pin?
440 if (get_num_any() == 1)
441 get(PIN_CHILD);
442
443 assert(get_num_any() == items.size());
444 return dn;
445}
446
447
448
449void CDir::remove_dentry(CDentry *dn)
450{
451 dout(12) << "remove_dentry " << *dn << dendl;
452
453 // there should be no client leases at this point!
454 assert(dn->client_lease_map.empty());
455
456 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
457 dn->put(CDentry::PIN_FRAGMENTING);
458 dn->state_clear(CDentry::STATE_FRAGMENTING);
459 }
460
461 if (dn->get_linkage()->is_null()) {
462 if (dn->last == CEPH_NOSNAP)
463 num_head_null--;
464 else
465 num_snap_null--;
466 } else {
467 if (dn->last == CEPH_NOSNAP)
468 num_head_items--;
469 else
470 num_snap_items--;
471 }
472
473 if (!dn->get_linkage()->is_null())
474 // detach inode and dentry
475 unlink_inode_work(dn);
476
477 // remove from list
478 assert(items.count(dn->key()) == 1);
479 items.erase(dn->key());
480
481 // clean?
482 if (dn->is_dirty())
483 dn->mark_clean();
484
485 cache->lru.lru_remove(dn);
486 delete dn;
487
488 // unpin?
489 if (get_num_any() == 0)
490 put(PIN_CHILD);
491 assert(get_num_any() == items.size());
492}
493
494void CDir::link_remote_inode(CDentry *dn, CInode *in)
495{
496 link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
497}
498
499void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
500{
501 dout(12) << "link_remote_inode " << *dn << " remote " << ino << dendl;
502 assert(dn->get_linkage()->is_null());
503
504 dn->get_linkage()->set_remote(ino, d_type);
505
506 if (dn->last == CEPH_NOSNAP) {
507 num_head_items++;
508 num_head_null--;
509 } else {
510 num_snap_items++;
511 num_snap_null--;
512 }
513 assert(get_num_any() == items.size());
514}
515
516void CDir::link_primary_inode(CDentry *dn, CInode *in)
517{
518 dout(12) << "link_primary_inode " << *dn << " " << *in << dendl;
519 assert(dn->get_linkage()->is_null());
520
521 dn->get_linkage()->inode = in;
522 in->set_primary_parent(dn);
523
524 link_inode_work(dn, in);
525
526 if (dn->last == CEPH_NOSNAP) {
527 num_head_items++;
528 num_head_null--;
529 } else {
530 num_snap_items++;
531 num_snap_null--;
532 }
533
534 assert(get_num_any() == items.size());
535}
536
537void CDir::link_inode_work( CDentry *dn, CInode *in)
538{
539 assert(dn->get_linkage()->get_inode() == in);
540 assert(in->get_parent_dn() == dn);
541
542 // set inode version
543 //in->inode.version = dn->get_version();
544
545 // pin dentry?
546 if (in->get_num_ref())
547 dn->get(CDentry::PIN_INODEPIN);
548
549 // adjust auth pin count
550 if (in->auth_pins + in->nested_auth_pins)
551 dn->adjust_nested_auth_pins(in->auth_pins + in->nested_auth_pins, in->auth_pins, NULL);
552
553 // verify open snaprealm parent
554 if (in->snaprealm)
555 in->snaprealm->adjust_parent();
556 else if (in->is_any_caps())
557 in->move_to_realm(inode->find_snaprealm());
558}
559
560void CDir::unlink_inode(CDentry *dn)
561{
562 if (dn->get_linkage()->is_primary()) {
563 dout(12) << "unlink_inode " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
564 } else {
565 dout(12) << "unlink_inode " << *dn << dendl;
566 }
567
568 unlink_inode_work(dn);
569
570 if (dn->last == CEPH_NOSNAP) {
571 num_head_items--;
572 num_head_null++;
573 } else {
574 num_snap_items--;
575 num_snap_null++;
576 }
577 assert(get_num_any() == items.size());
578}
579
580
581void CDir::try_remove_unlinked_dn(CDentry *dn)
582{
583 assert(dn->dir == this);
584 assert(dn->get_linkage()->is_null());
585
586 // no pins (besides dirty)?
587 if (dn->get_num_ref() != dn->is_dirty())
588 return;
589
590 // was the dn new?
591 if (dn->is_new()) {
592 dout(10) << "try_remove_unlinked_dn " << *dn << " in " << *this << dendl;
593 if (dn->is_dirty())
594 dn->mark_clean();
595 remove_dentry(dn);
596
597 // NOTE: we may not have any more dirty dentries, but the fnode
598 // still changed, so the directory must remain dirty.
599 }
600}
601
602
603void CDir::unlink_inode_work( CDentry *dn )
604{
605 CInode *in = dn->get_linkage()->get_inode();
606
607 if (dn->get_linkage()->is_remote()) {
608 // remote
609 if (in)
610 dn->unlink_remote(dn->get_linkage());
611
612 dn->get_linkage()->set_remote(0, 0);
613 } else if (dn->get_linkage()->is_primary()) {
614 // primary
615 // unpin dentry?
616 if (in->get_num_ref())
617 dn->put(CDentry::PIN_INODEPIN);
618
619 // unlink auth_pin count
620 if (in->auth_pins + in->nested_auth_pins)
621 dn->adjust_nested_auth_pins(0 - (in->auth_pins + in->nested_auth_pins), 0 - in->auth_pins, NULL);
622
623 // detach inode
624 in->remove_primary_parent(dn);
625 dn->get_linkage()->inode = 0;
626 } else {
627 assert(!dn->get_linkage()->is_null());
628 }
629}
630
631void CDir::add_to_bloom(CDentry *dn)
632{
633 assert(dn->last == CEPH_NOSNAP);
634 if (!bloom) {
635 /* not create bloom filter for incomplete dir that was added by log replay */
636 if (!is_complete())
637 return;
638
639 /* don't maintain bloom filters in standby replay (saves cycles, and also
640 * avoids need to implement clearing it in EExport for #16924) */
641 if (cache->mds->is_standby_replay()) {
642 return;
643 }
644
645 unsigned size = get_num_head_items() + get_num_snap_items();
646 if (size < 100) size = 100;
647 bloom.reset(new bloom_filter(size, 1.0 / size, 0));
648 }
649 /* This size and false positive probability is completely random.*/
650 bloom->insert(dn->name.c_str(), dn->name.size());
651}
652
653bool CDir::is_in_bloom(const string& name)
654{
655 if (!bloom)
656 return false;
657 return bloom->contains(name.c_str(), name.size());
658}
659
660void CDir::remove_null_dentries() {
661 dout(12) << "remove_null_dentries " << *this << dendl;
662
663 CDir::map_t::iterator p = items.begin();
664 while (p != items.end()) {
665 CDentry *dn = p->second;
666 ++p;
667 if (dn->get_linkage()->is_null() && !dn->is_projected())
668 remove_dentry(dn);
669 }
670
671 assert(num_snap_null == 0);
672 assert(num_head_null == 0);
673 assert(get_num_any() == items.size());
674}
675
676/** remove dirty null dentries for deleted directory. the dirfrag will be
677 * deleted soon, so it's safe to not commit dirty dentries.
678 *
679 * This is called when a directory is being deleted, a prerequisite
680 * of which is that its children have been unlinked: we expect to only see
681 * null, unprojected dentries here.
682 */
683void CDir::try_remove_dentries_for_stray()
684{
685 dout(10) << __func__ << dendl;
686 assert(inode->inode.nlink == 0);
687
688 // clear dirty only when the directory was not snapshotted
689 bool clear_dirty = !inode->snaprealm;
690
691 CDir::map_t::iterator p = items.begin();
692 while (p != items.end()) {
693 CDentry *dn = p->second;
694 ++p;
695 if (dn->last == CEPH_NOSNAP) {
696 assert(!dn->is_projected());
697 assert(dn->get_linkage()->is_null());
698 if (clear_dirty && dn->is_dirty())
699 dn->mark_clean();
700 // It's OK to remove lease prematurely because we will never link
701 // the dentry to inode again.
702 if (dn->is_any_leases())
703 dn->remove_client_leases(cache->mds->locker);
704 if (dn->get_num_ref() == 0)
705 remove_dentry(dn);
706 } else {
707 assert(!dn->is_projected());
708 CDentry::linkage_t *dnl= dn->get_linkage();
709 CInode *in = NULL;
710 if (dnl->is_primary()) {
711 in = dnl->get_inode();
712 if (clear_dirty && in->is_dirty())
713 in->mark_clean();
714 }
715 if (clear_dirty && dn->is_dirty())
716 dn->mark_clean();
717 if (dn->get_num_ref() == 0) {
718 remove_dentry(dn);
719 if (in)
720 cache->remove_inode(in);
721 }
722 }
723 }
724
725 if (clear_dirty && is_dirty())
726 mark_clean();
727}
728
729void CDir::touch_dentries_bottom() {
730 dout(12) << "touch_dentries_bottom " << *this << dendl;
731
732 for (CDir::map_t::iterator p = items.begin();
733 p != items.end();
734 ++p)
735 inode->mdcache->touch_dentry_bottom(p->second);
736}
737
738bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
739{
740 assert(dn->last != CEPH_NOSNAP);
741 set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
742 CDentry::linkage_t *dnl= dn->get_linkage();
743 CInode *in = 0;
744 if (dnl->is_primary())
745 in = dnl->get_inode();
746 if ((p == snaps.end() || *p > dn->last) &&
747 (dn->get_num_ref() == dn->is_dirty()) &&
748 (!in || in->get_num_ref() == in->is_dirty())) {
749 dout(10) << " purging snapped " << *dn << dendl;
750 if (in && in->is_dirty())
751 in->mark_clean();
752 remove_dentry(dn);
753 if (in) {
754 dout(10) << " purging snapped " << *in << dendl;
755 cache->remove_inode(in);
756 }
757 return true;
758 }
759 return false;
760}
761
762
763void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
764{
765 dout(10) << "purge_stale_snap_data " << snaps << dendl;
766
767 CDir::map_t::iterator p = items.begin();
768 while (p != items.end()) {
769 CDentry *dn = p->second;
770 ++p;
771
772 if (dn->last == CEPH_NOSNAP)
773 continue;
774
775 try_trim_snap_dentry(dn, snaps);
776 }
777}
778
779
780/**
781 * steal_dentry -- semi-violently move a dentry from one CDir to another
782 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
783 * on the old CDir corpse; must call finish_old_fragment() when finished.
784 */
785void CDir::steal_dentry(CDentry *dn)
786{
787 dout(15) << "steal_dentry " << *dn << dendl;
788
789 items[dn->key()] = dn;
790
791 dn->dir->items.erase(dn->key());
792 if (dn->dir->items.empty())
793 dn->dir->put(PIN_CHILD);
794
795 if (get_num_any() == 0)
796 get(PIN_CHILD);
797 if (dn->get_linkage()->is_null()) {
798 if (dn->last == CEPH_NOSNAP)
799 num_head_null++;
800 else
801 num_snap_null++;
802 } else if (dn->last == CEPH_NOSNAP) {
803 num_head_items++;
804
805 if (dn->get_linkage()->is_primary()) {
806 CInode *in = dn->get_linkage()->get_inode();
807 inode_t *pi = in->get_projected_inode();
808 if (dn->get_linkage()->get_inode()->is_dir())
809 fnode.fragstat.nsubdirs++;
810 else
811 fnode.fragstat.nfiles++;
812 fnode.rstat.rbytes += pi->accounted_rstat.rbytes;
813 fnode.rstat.rfiles += pi->accounted_rstat.rfiles;
814 fnode.rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
815 fnode.rstat.rsnaprealms += pi->accounted_rstat.rsnaprealms;
816 if (pi->accounted_rstat.rctime > fnode.rstat.rctime)
817 fnode.rstat.rctime = pi->accounted_rstat.rctime;
818
819 // move dirty inode rstat to new dirfrag
820 if (in->is_dirty_rstat())
821 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
822 } else if (dn->get_linkage()->is_remote()) {
823 if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
824 fnode.fragstat.nsubdirs++;
825 else
826 fnode.fragstat.nfiles++;
827 }
828 } else {
829 num_snap_items++;
830 if (dn->get_linkage()->is_primary()) {
831 CInode *in = dn->get_linkage()->get_inode();
832 if (in->is_dirty_rstat())
833 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
834 }
835 }
836
837 if (dn->auth_pins || dn->nested_auth_pins) {
838 // use the helpers here to maintain the auth_pin invariants on the dir inode
839 int ap = dn->get_num_auth_pins() + dn->get_num_nested_auth_pins();
840 int dap = dn->get_num_dir_auth_pins();
841 assert(dap <= ap);
842 adjust_nested_auth_pins(ap, dap, NULL);
843 dn->dir->adjust_nested_auth_pins(-ap, -dap, NULL);
844 }
845
846 if (dn->is_dirty())
847 num_dirty++;
848
849 dn->dir = this;
850}
851
852void CDir::prepare_old_fragment(bool replay)
853{
854 // auth_pin old fragment for duration so that any auth_pinning
855 // during the dentry migration doesn't trigger side effects
856 if (!replay && is_auth())
857 auth_pin(this);
858}
859
860void CDir::prepare_new_fragment(bool replay)
861{
862 if (!replay && is_auth()) {
863 _freeze_dir();
864 mark_complete();
865 }
866}
867
868void CDir::finish_old_fragment(list<MDSInternalContextBase*>& waiters, bool replay)
869{
870 // take waiters _before_ unfreeze...
871 if (!replay) {
872 take_waiting(WAIT_ANY_MASK, waiters);
873 if (is_auth()) {
874 auth_unpin(this); // pinned in prepare_old_fragment
875 assert(is_frozen_dir());
876 unfreeze_dir();
877 }
878 }
879
880 assert(nested_auth_pins == 0);
881 assert(dir_auth_pins == 0);
882 assert(auth_pins == 0);
883
884 num_head_items = num_head_null = 0;
885 num_snap_items = num_snap_null = 0;
886
887 // this mirrors init_fragment_pins()
888 if (is_auth())
889 clear_replica_map();
890 if (is_dirty())
891 mark_clean();
892 if (state_test(STATE_IMPORTBOUND))
893 put(PIN_IMPORTBOUND);
894 if (state_test(STATE_EXPORTBOUND))
895 put(PIN_EXPORTBOUND);
896 if (is_subtree_root())
897 put(PIN_SUBTREE);
898
899 if (auth_pins > 0)
900 put(PIN_AUTHPIN);
901
902 assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
903}
904
905void CDir::init_fragment_pins()
906{
907 if (!replica_map.empty())
908 get(PIN_REPLICATED);
909 if (state_test(STATE_DIRTY))
910 get(PIN_DIRTY);
911 if (state_test(STATE_EXPORTBOUND))
912 get(PIN_EXPORTBOUND);
913 if (state_test(STATE_IMPORTBOUND))
914 get(PIN_IMPORTBOUND);
915 if (is_subtree_root())
916 get(PIN_SUBTREE);
917}
918
919void CDir::split(int bits, list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
920{
921 dout(10) << "split by " << bits << " bits on " << *this << dendl;
922
923 assert(replay || is_complete() || !is_auth());
924
925 list<frag_t> frags;
926 frag.split(bits, frags);
927
928 vector<CDir*> subfrags(1 << bits);
929
930 double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
931
932 version_t rstat_version = inode->get_projected_inode()->rstat.version;
933 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
934
935 nest_info_t rstatdiff;
936 frag_info_t fragstatdiff;
937 if (fnode.accounted_rstat.version == rstat_version)
938 rstatdiff.add_delta(fnode.accounted_rstat, fnode.rstat);
939 if (fnode.accounted_fragstat.version == dirstat_version)
940 fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat);
941 dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
942
943 prepare_old_fragment(replay);
944
945 // create subfrag dirs
946 int n = 0;
947 for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
948 CDir *f = new CDir(inode, *p, cache, is_auth());
949 f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
950 f->replica_map = replica_map;
951 f->dir_auth = dir_auth;
952 f->init_fragment_pins();
953 f->set_version(get_version());
954
955 f->pop_me = pop_me;
956 f->pop_me.scale(fac);
957
958 // FIXME; this is an approximation
959 f->pop_nested = pop_nested;
960 f->pop_nested.scale(fac);
961 f->pop_auth_subtree = pop_auth_subtree;
962 f->pop_auth_subtree.scale(fac);
963 f->pop_auth_subtree_nested = pop_auth_subtree_nested;
964 f->pop_auth_subtree_nested.scale(fac);
965
966 dout(10) << " subfrag " << *p << " " << *f << dendl;
967 subfrags[n++] = f;
968 subs.push_back(f);
969 inode->add_dirfrag(f);
970
971 f->set_dir_auth(get_dir_auth());
972 f->prepare_new_fragment(replay);
973 }
974
975 // repartition dentries
976 while (!items.empty()) {
977 CDir::map_t::iterator p = items.begin();
978
979 CDentry *dn = p->second;
980 frag_t subfrag = inode->pick_dirfrag(dn->name);
981 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
982 dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
983 CDir *f = subfrags[n];
984 f->steal_dentry(dn);
985 }
986
987 // FIXME: handle dirty old rstat
988
989 // fix up new frag fragstats
990 for (int i=0; i<n; i++) {
991 CDir *f = subfrags[i];
992 f->fnode.rstat.version = rstat_version;
993 f->fnode.accounted_rstat = f->fnode.rstat;
994 f->fnode.fragstat.version = dirstat_version;
995 f->fnode.accounted_fragstat = f->fnode.fragstat;
996 dout(10) << " rstat " << f->fnode.rstat << " fragstat " << f->fnode.fragstat
997 << " on " << *f << dendl;
998 }
999
1000 // give any outstanding frag stat differential to first frag
1001 dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1002 << " to " << *subfrags[0] << dendl;
1003 subfrags[0]->fnode.accounted_rstat.add(rstatdiff);
1004 subfrags[0]->fnode.accounted_fragstat.add(fragstatdiff);
1005
1006 finish_old_fragment(waiters, replay);
1007}
1008
1009void CDir::merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
1010{
1011 dout(10) << "merge " << subs << dendl;
1012
1013 mds_authority_t new_auth = CDIR_AUTH_DEFAULT;
1014 for (auto dir : subs) {
1015 if (dir->get_dir_auth() != CDIR_AUTH_DEFAULT &&
1016 dir->get_dir_auth() != new_auth) {
1017 assert(new_auth == CDIR_AUTH_DEFAULT);
1018 new_auth = dir->get_dir_auth();
1019 }
1020 }
1021
1022 set_dir_auth(new_auth);
1023 prepare_new_fragment(replay);
1024
1025 nest_info_t rstatdiff;
1026 frag_info_t fragstatdiff;
1027 bool touched_mtime, touched_chattr;
1028 version_t rstat_version = inode->get_projected_inode()->rstat.version;
1029 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
1030
1031 for (auto dir : subs) {
1032 dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
1033 assert(!dir->is_auth() || dir->is_complete() || replay);
1034
1035 if (dir->fnode.accounted_rstat.version == rstat_version)
1036 rstatdiff.add_delta(dir->fnode.accounted_rstat, dir->fnode.rstat);
1037 if (dir->fnode.accounted_fragstat.version == dirstat_version)
1038 fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat,
1039 &touched_mtime, &touched_chattr);
1040
1041 dir->prepare_old_fragment(replay);
1042
1043 // steal dentries
1044 while (!dir->items.empty())
1045 steal_dentry(dir->items.begin()->second);
1046
1047 // merge replica map
1048 for (compact_map<mds_rank_t,unsigned>::iterator p = dir->replicas_begin();
1049 p != dir->replicas_end();
1050 ++p) {
1051 unsigned cur = replica_map[p->first];
1052 if (p->second > cur)
1053 replica_map[p->first] = p->second;
1054 }
1055
1056 // merge version
1057 if (dir->get_version() > get_version())
1058 set_version(dir->get_version());
1059
1060 // merge state
1061 state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
1062 dir_auth = dir->dir_auth;
1063
1064 dir->finish_old_fragment(waiters, replay);
1065 inode->close_dirfrag(dir->get_frag());
1066 }
1067
1068 if (is_auth() && !replay)
1069 mark_complete();
1070
1071 // FIXME: merge dirty old rstat
1072 fnode.rstat.version = rstat_version;
1073 fnode.accounted_rstat = fnode.rstat;
1074 fnode.accounted_rstat.add(rstatdiff);
1075
1076 fnode.fragstat.version = dirstat_version;
1077 fnode.accounted_fragstat = fnode.fragstat;
1078 fnode.accounted_fragstat.add(fragstatdiff);
1079
1080 init_fragment_pins();
1081}
1082
1083
1084
1085
1086void CDir::resync_accounted_fragstat()
1087{
1088 fnode_t *pf = get_projected_fnode();
1089 inode_t *pi = inode->get_projected_inode();
1090
1091 if (pf->accounted_fragstat.version != pi->dirstat.version) {
1092 pf->fragstat.version = pi->dirstat.version;
1093 dout(10) << "resync_accounted_fragstat " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
1094 pf->accounted_fragstat = pf->fragstat;
1095 }
1096}
1097
1098/*
1099 * resync rstat and accounted_rstat with inode
1100 */
1101void CDir::resync_accounted_rstat()
1102{
1103 fnode_t *pf = get_projected_fnode();
1104 inode_t *pi = inode->get_projected_inode();
1105
1106 if (pf->accounted_rstat.version != pi->rstat.version) {
1107 pf->rstat.version = pi->rstat.version;
1108 dout(10) << "resync_accounted_rstat " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
1109 pf->accounted_rstat = pf->rstat;
1110 dirty_old_rstat.clear();
1111 }
1112}
1113
1114void CDir::assimilate_dirty_rstat_inodes()
1115{
1116 dout(10) << "assimilate_dirty_rstat_inodes" << dendl;
1117 for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1118 !p.end(); ++p) {
1119 CInode *in = *p;
1120 assert(in->is_auth());
1121 if (in->is_frozen())
1122 continue;
1123
1124 inode_t *pi = in->project_inode();
1125 pi->version = in->pre_dirty();
1126
1127 inode->mdcache->project_rstat_inode_to_frag(in, this, 0, 0, NULL);
1128 }
1129 state_set(STATE_ASSIMRSTAT);
1130 dout(10) << "assimilate_dirty_rstat_inodes done" << dendl;
1131}
1132
1133void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef& mut, EMetaBlob *blob)
1134{
1135 if (!state_test(STATE_ASSIMRSTAT))
1136 return;
1137 state_clear(STATE_ASSIMRSTAT);
1138 dout(10) << "assimilate_dirty_rstat_inodes_finish" << dendl;
1139 elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1140 while (!p.end()) {
1141 CInode *in = *p;
1142 ++p;
1143
1144 if (in->is_frozen())
1145 continue;
1146
1147 CDentry *dn = in->get_projected_parent_dn();
1148
1149 mut->auth_pin(in);
1150 mut->add_projected_inode(in);
1151
1152 in->clear_dirty_rstat();
1153 blob->add_primary_dentry(dn, in, true);
1154 }
1155
1156 if (!dirty_rstat_inodes.empty())
1157 inode->mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
1158}
1159
1160
1161
1162
1163/****************************************
1164 * WAITING
1165 */
1166
1167void CDir::add_dentry_waiter(const string& dname, snapid_t snapid, MDSInternalContextBase *c)
1168{
1169 if (waiting_on_dentry.empty())
1170 get(PIN_DNWAITER);
1171 waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
1172 dout(10) << "add_dentry_waiter dentry " << dname
1173 << " snap " << snapid
1174 << " " << c << " on " << *this << dendl;
1175}
1176
1177void CDir::take_dentry_waiting(const string& dname, snapid_t first, snapid_t last,
1178 list<MDSInternalContextBase*>& ls)
1179{
1180 if (waiting_on_dentry.empty())
1181 return;
1182
1183 string_snap_t lb(dname, first);
1184 string_snap_t ub(dname, last);
1185 compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.lower_bound(lb);
1186 while (p != waiting_on_dentry.end() &&
1187 !(ub < p->first)) {
1188 dout(10) << "take_dentry_waiting dentry " << dname
1189 << " [" << first << "," << last << "] found waiter on snap "
1190 << p->first.snapid
1191 << " on " << *this << dendl;
1192 ls.splice(ls.end(), p->second);
1193 waiting_on_dentry.erase(p++);
1194 }
1195
1196 if (waiting_on_dentry.empty())
1197 put(PIN_DNWAITER);
1198}
1199
1200void CDir::take_sub_waiting(list<MDSInternalContextBase*>& ls)
1201{
1202 dout(10) << "take_sub_waiting" << dendl;
1203 if (!waiting_on_dentry.empty()) {
1204 for (compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
1205 p != waiting_on_dentry.end();
1206 ++p)
1207 ls.splice(ls.end(), p->second);
1208 waiting_on_dentry.clear();
1209 put(PIN_DNWAITER);
1210 }
1211}
1212
1213
1214
1215void CDir::add_waiter(uint64_t tag, MDSInternalContextBase *c)
1216{
1217 // hierarchical?
1218
1219 // at free root?
1220 if (tag & WAIT_ATFREEZEROOT) {
1221 if (!(is_freezing_tree_root() || is_frozen_tree_root() ||
1222 is_freezing_dir() || is_frozen_dir())) {
1223 // try parent
1224 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATFREEZEROOT, " << *this << " is not root, trying parent" << dendl;
1225 inode->parent->dir->add_waiter(tag, c);
1226 return;
1227 }
1228 }
1229
1230 // at subtree root?
1231 if (tag & WAIT_ATSUBTREEROOT) {
1232 if (!is_subtree_root()) {
1233 // try parent
1234 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1235 inode->parent->dir->add_waiter(tag, c);
1236 return;
1237 }
1238 }
1239
1240 assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
1241
1242 MDSCacheObject::add_waiter(tag, c);
1243}
1244
1245
1246
1247/* NOTE: this checks dentry waiters too */
1248void CDir::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
1249{
1250 if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1251 // take all dentry waiters
1252 while (!waiting_on_dentry.empty()) {
1253 compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
1254 dout(10) << "take_waiting dentry " << p->first.name
1255 << " snap " << p->first.snapid << " on " << *this << dendl;
1256 ls.splice(ls.end(), p->second);
1257 waiting_on_dentry.erase(p);
1258 }
1259 put(PIN_DNWAITER);
1260 }
1261
1262 // waiting
1263 MDSCacheObject::take_waiting(mask, ls);
1264}
1265
1266
1267void CDir::finish_waiting(uint64_t mask, int result)
1268{
1269 dout(11) << "finish_waiting mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
1270
1271 list<MDSInternalContextBase*> finished;
1272 take_waiting(mask, finished);
1273 if (result < 0)
1274 finish_contexts(g_ceph_context, finished, result);
1275 else
1276 cache->mds->queue_waiters(finished);
1277}
1278
1279
1280
1281// dirty/clean
1282
1283fnode_t *CDir::project_fnode()
1284{
1285 assert(get_version() != 0);
1286 fnode_t *p = new fnode_t;
1287 *p = *get_projected_fnode();
1288 projected_fnode.push_back(p);
1289
1290 if (scrub_infop && scrub_infop->last_scrub_dirty) {
1291 p->localized_scrub_stamp = scrub_infop->last_local.time;
1292 p->localized_scrub_version = scrub_infop->last_local.version;
1293 p->recursive_scrub_stamp = scrub_infop->last_recursive.time;
1294 p->recursive_scrub_version = scrub_infop->last_recursive.version;
1295 scrub_infop->last_scrub_dirty = false;
1296 scrub_maybe_delete_info();
1297 }
1298
1299 dout(10) << "project_fnode " << p << dendl;
1300 return p;
1301}
1302
1303void CDir::pop_and_dirty_projected_fnode(LogSegment *ls)
1304{
1305 assert(!projected_fnode.empty());
1306 dout(15) << "pop_and_dirty_projected_fnode " << projected_fnode.front()
1307 << " v" << projected_fnode.front()->version << dendl;
1308 fnode = *projected_fnode.front();
1309 _mark_dirty(ls);
1310 delete projected_fnode.front();
1311 projected_fnode.pop_front();
1312}
1313
1314
1315version_t CDir::pre_dirty(version_t min)
1316{
1317 if (min > projected_version)
1318 projected_version = min;
1319 ++projected_version;
1320 dout(10) << "pre_dirty " << projected_version << dendl;
1321 return projected_version;
1322}
1323
1324void CDir::mark_dirty(version_t pv, LogSegment *ls)
1325{
1326 assert(get_version() < pv);
1327 assert(pv <= projected_version);
1328 fnode.version = pv;
1329 _mark_dirty(ls);
1330}
1331
1332void CDir::_mark_dirty(LogSegment *ls)
1333{
1334 if (!state_test(STATE_DIRTY)) {
1335 dout(10) << "mark_dirty (was clean) " << *this << " version " << get_version() << dendl;
1336 _set_dirty_flag();
1337 assert(ls);
1338 } else {
1339 dout(10) << "mark_dirty (already dirty) " << *this << " version " << get_version() << dendl;
1340 }
1341 if (ls) {
1342 ls->dirty_dirfrags.push_back(&item_dirty);
1343
1344 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1345 if (committed_version == 0 && !item_new.is_on_list())
1346 ls->new_dirfrags.push_back(&item_new);
1347 }
1348}
1349
1350void CDir::mark_new(LogSegment *ls)
1351{
1352 ls->new_dirfrags.push_back(&item_new);
1353 state_clear(STATE_CREATING);
1354
1355 list<MDSInternalContextBase*> waiters;
1356 take_waiting(CDir::WAIT_CREATED, waiters);
1357 cache->mds->queue_waiters(waiters);
1358}
1359
1360void CDir::mark_clean()
1361{
1362 dout(10) << "mark_clean " << *this << " version " << get_version() << dendl;
1363 if (state_test(STATE_DIRTY)) {
1364 item_dirty.remove_myself();
1365 item_new.remove_myself();
1366
1367 state_clear(STATE_DIRTY);
1368 put(PIN_DIRTY);
1369 }
1370}
1371
1372// caller should hold auth pin of this
1373void CDir::log_mark_dirty()
1374{
1375 if (is_dirty() || is_projected())
1376 return; // noop if it is already dirty or will be dirty
1377
1378 version_t pv = pre_dirty();
1379 mark_dirty(pv, cache->mds->mdlog->get_current_segment());
1380}
1381
1382void CDir::mark_complete() {
1383 state_set(STATE_COMPLETE);
1384 bloom.reset();
1385}
1386
1387void CDir::first_get()
1388{
1389 inode->get(CInode::PIN_DIRFRAG);
1390}
1391
1392void CDir::last_put()
1393{
1394 inode->put(CInode::PIN_DIRFRAG);
1395}
1396
1397
1398
1399/******************************************************************************
1400 * FETCH and COMMIT
1401 */
1402
1403// -----------------------
1404// FETCH
1405void CDir::fetch(MDSInternalContextBase *c, bool ignore_authpinnability)
1406{
1407 string want;
1408 return fetch(c, want, ignore_authpinnability);
1409}
1410
1411void CDir::fetch(MDSInternalContextBase *c, const string& want_dn, bool ignore_authpinnability)
1412{
1413 dout(10) << "fetch on " << *this << dendl;
1414
1415 assert(is_auth());
1416 assert(!is_complete());
1417
1418 if (!can_auth_pin() && !ignore_authpinnability) {
1419 if (c) {
1420 dout(7) << "fetch waiting for authpinnable" << dendl;
1421 add_waiter(WAIT_UNFREEZE, c);
1422 } else
1423 dout(7) << "fetch not authpinnable and no context" << dendl;
1424 return;
1425 }
1426
1427 // unlinked directory inode shouldn't have any entry
1428 if (inode->inode.nlink == 0 && !inode->snaprealm) {
1429 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1430 if (get_version() == 0) {
1431 set_version(1);
1432
1433 if (state_test(STATE_REJOINUNDEF)) {
1434 assert(cache->mds->is_rejoin());
1435 state_clear(STATE_REJOINUNDEF);
1436 cache->opened_undef_dirfrag(this);
1437 }
1438 }
1439 mark_complete();
1440
1441 if (c)
1442 cache->mds->queue_waiter(c);
1443 return;
1444 }
1445
1446 if (c) add_waiter(WAIT_COMPLETE, c);
1447 if (!want_dn.empty()) wanted_items.insert(want_dn);
1448
1449 // already fetching?
1450 if (state_test(CDir::STATE_FETCHING)) {
1451 dout(7) << "already fetching; waiting" << dendl;
1452 return;
1453 }
1454
1455 auth_pin(this);
1456 state_set(CDir::STATE_FETCHING);
1457
1458 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1459
1460 std::set<dentry_key_t> empty;
1461 _omap_fetch(NULL, empty);
1462}
1463
1464void CDir::fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1465{
1466 dout(10) << "fetch " << keys.size() << " keys on " << *this << dendl;
1467
1468 assert(is_auth());
1469 assert(!is_complete());
1470
1471 if (!can_auth_pin()) {
1472 dout(7) << "fetch keys waiting for authpinnable" << dendl;
1473 add_waiter(WAIT_UNFREEZE, c);
1474 return;
1475 }
1476 if (state_test(CDir::STATE_FETCHING)) {
1477 dout(7) << "fetch keys waiting for full fetch" << dendl;
1478 add_waiter(WAIT_COMPLETE, c);
1479 return;
1480 }
1481
1482 auth_pin(this);
1483 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1484
1485 _omap_fetch(c, keys);
1486}
1487
1488class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
1489 MDSInternalContextBase *fin;
1490public:
1491 bufferlist hdrbl;
1492 bool more = false;
1493 map<string, bufferlist> omap; ///< carry-over from before
1494 map<string, bufferlist> omap_more; ///< new batch
1495 int ret;
1496 C_IO_Dir_OMAP_FetchedMore(CDir *d, MDSInternalContextBase *f) :
1497 CDirIOContext(d), fin(f), ret(0) { }
1498 void finish(int r) {
1499 // merge results
1500 if (omap.empty()) {
1501 omap.swap(omap_more);
1502 } else {
1503 omap.insert(omap_more.begin(), omap_more.end());
1504 }
1505 if (more) {
1506 dir->_omap_fetch_more(hdrbl, omap, fin);
1507 } else {
1508 dir->_omap_fetched(hdrbl, omap, !fin, r);
1509 if (fin)
1510 fin->complete(r);
1511 }
1512 }
1513};
1514
1515class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
1516 MDSInternalContextBase *fin;
1517public:
1518 bufferlist hdrbl;
1519 bool more = false;
1520 map<string, bufferlist> omap;
1521 bufferlist btbl;
1522 int ret1, ret2, ret3;
1523
1524 C_IO_Dir_OMAP_Fetched(CDir *d, MDSInternalContextBase *f) :
1525 CDirIOContext(d), fin(f), ret1(0), ret2(0), ret3(0) { }
1526 void finish(int r) override {
1527 // check the correctness of backtrace
1528 if (r >= 0 && ret3 != -ECANCELED)
1529 dir->inode->verify_diri_backtrace(btbl, ret3);
1530 if (r >= 0) r = ret1;
1531 if (r >= 0) r = ret2;
1532 if (more) {
1533 dir->_omap_fetch_more(hdrbl, omap, fin);
1534 } else {
1535 dir->_omap_fetched(hdrbl, omap, !fin, r);
1536 if (fin)
1537 fin->complete(r);
1538 }
1539 }
1540};
1541
1542void CDir::_omap_fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1543{
1544 C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1545 object_t oid = get_ondisk_object();
1546 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1547 ObjectOperation rd;
1548 rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1549 if (keys.empty()) {
1550 assert(!c);
1551 rd.omap_get_vals("", "", g_conf->mds_dir_keys_per_op,
1552 &fin->omap, &fin->more, &fin->ret2);
1553 } else {
1554 assert(c);
1555 std::set<std::string> str_keys;
1556 for (auto p = keys.begin(); p != keys.end(); ++p) {
1557 string str;
1558 p->encode(str);
1559 str_keys.insert(str);
1560 }
1561 rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
1562 }
1563 // check the correctness of backtrace
1564 if (g_conf->mds_verify_backtrace > 0 && frag == frag_t()) {
1565 rd.getxattr("parent", &fin->btbl, &fin->ret3);
1566 rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1567 } else {
1568 fin->ret3 = -ECANCELED;
1569 }
1570
1571 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1572 new C_OnFinisher(fin, cache->mds->finisher));
1573}
1574
1575void CDir::_omap_fetch_more(
1576 bufferlist& hdrbl,
1577 map<string, bufferlist>& omap,
1578 MDSInternalContextBase *c)
1579{
1580 // we have more omap keys to fetch!
1581 object_t oid = get_ondisk_object();
1582 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1583 C_IO_Dir_OMAP_FetchedMore *fin = new C_IO_Dir_OMAP_FetchedMore(this, c);
1584 fin->hdrbl.claim(hdrbl);
1585 fin->omap.swap(omap);
1586 ObjectOperation rd;
1587 rd.omap_get_vals(fin->omap.rbegin()->first,
1588 "", /* filter prefix */
1589 g_conf->mds_dir_keys_per_op,
1590 &fin->omap_more,
1591 &fin->more,
1592 &fin->ret);
1593 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1594 new C_OnFinisher(fin, cache->mds->finisher));
1595}
1596
1597CDentry *CDir::_load_dentry(
1598 const std::string &key,
1599 const std::string &dname,
1600 const snapid_t last,
1601 bufferlist &bl,
1602 const int pos,
1603 const std::set<snapid_t> *snaps,
1604 bool *force_dirty,
1605 list<CInode*> *undef_inodes)
1606{
1607 bufferlist::iterator q = bl.begin();
1608
1609 snapid_t first;
1610 ::decode(first, q);
1611
1612 // marker
1613 char type;
1614 ::decode(type, q);
1615
1616 dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
1617 << " [" << first << "," << last << "]"
1618 << dendl;
1619
1620 bool stale = false;
1621 if (snaps && last != CEPH_NOSNAP) {
1622 set<snapid_t>::const_iterator p = snaps->lower_bound(first);
1623 if (p == snaps->end() || *p > last) {
1624 dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
1625 stale = true;
1626 }
1627 }
1628
1629 /*
1630 * look for existing dentry for _last_ snap, because unlink +
1631 * create may leave a "hole" (epochs during which the dentry
1632 * doesn't exist) but for which no explicit negative dentry is in
1633 * the cache.
1634 */
1635 CDentry *dn;
1636 if (stale)
1637 dn = lookup_exact_snap(dname, last);
1638 else
1639 dn = lookup(dname, last);
1640
1641 if (type == 'L') {
1642 // hard link
1643 inodeno_t ino;
1644 unsigned char d_type;
1645 ::decode(ino, q);
1646 ::decode(d_type, q);
1647
1648 if (stale) {
1649 if (!dn) {
1650 stale_items.insert(key);
1651 *force_dirty = true;
1652 }
1653 return dn;
1654 }
1655
1656 if (dn) {
1657 if (dn->get_linkage()->get_inode() == 0) {
1658 dout(12) << "_fetched had NEG dentry " << *dn << dendl;
1659 } else {
1660 dout(12) << "_fetched had dentry " << *dn << dendl;
1661 }
1662 } else {
1663 // (remote) link
1664 dn = add_remote_dentry(dname, ino, d_type, first, last);
1665
1666 // link to inode?
1667 CInode *in = cache->get_inode(ino); // we may or may not have it.
1668 if (in) {
1669 dn->link_remote(dn->get_linkage(), in);
1670 dout(12) << "_fetched got remote link " << ino << " which we have " << *in << dendl;
1671 } else {
1672 dout(12) << "_fetched got remote link " << ino << " (dont' have it)" << dendl;
1673 }
1674 }
1675 }
1676 else if (type == 'I') {
1677 // inode
1678
1679 // Load inode data before looking up or constructing CInode
1680 InodeStore inode_data;
1681 inode_data.decode_bare(q);
1682
1683 if (stale) {
1684 if (!dn) {
1685 stale_items.insert(key);
1686 *force_dirty = true;
1687 }
1688 return dn;
1689 }
1690
1691 bool undef_inode = false;
1692 if (dn) {
1693 CInode *in = dn->get_linkage()->get_inode();
1694 if (in) {
1695 dout(12) << "_fetched had dentry " << *dn << dendl;
1696 if (in->state_test(CInode::STATE_REJOINUNDEF)) {
1697 undef_inodes->push_back(in);
1698 undef_inode = true;
1699 }
1700 } else
1701 dout(12) << "_fetched had NEG dentry " << *dn << dendl;
1702 }
1703
1704 if (!dn || undef_inode) {
1705 // add inode
1706 CInode *in = cache->get_inode(inode_data.inode.ino, last);
1707 if (!in || undef_inode) {
1708 if (undef_inode && in)
1709 in->first = first;
1710 else
1711 in = new CInode(cache, true, first, last);
1712
1713 in->inode = inode_data.inode;
1714 // symlink?
1715 if (in->is_symlink())
1716 in->symlink = inode_data.symlink;
1717
1718 in->dirfragtree.swap(inode_data.dirfragtree);
1719 in->xattrs.swap(inode_data.xattrs);
1720 in->old_inodes.swap(inode_data.old_inodes);
1721 if (!in->old_inodes.empty()) {
1722 snapid_t min_first = in->old_inodes.rbegin()->first + 1;
1723 if (min_first > in->first)
1724 in->first = min_first;
1725 }
1726
1727 in->oldest_snap = inode_data.oldest_snap;
1728 in->decode_snap_blob(inode_data.snap_blob);
1729 if (snaps && !in->snaprealm)
1730 in->purge_stale_snap_data(*snaps);
1731
1732 if (!undef_inode) {
1733 cache->add_inode(in); // add
1734 dn = add_primary_dentry(dname, in, first, last); // link
1735 }
1736 dout(12) << "_fetched got " << *dn << " " << *in << dendl;
1737
1738 if (in->inode.is_dirty_rstat())
1739 in->mark_dirty_rstat();
1740
1741 //in->hack_accessed = false;
1742 //in->hack_load_stamp = ceph_clock_now();
1743 //num_new_inodes_loaded++;
1744 } else {
1745 dout(0) << "_fetched badness: got (but i already had) " << *in
1746 << " mode " << in->inode.mode
1747 << " mtime " << in->inode.mtime << dendl;
1748 string dirpath, inopath;
1749 this->inode->make_path_string(dirpath);
1750 in->make_path_string(inopath);
1751 cache->mds->clog->error() << "loaded dup inode " << inode_data.inode.ino
1752 << " [" << first << "," << last << "] v" << inode_data.inode.version
1753 << " at " << dirpath << "/" << dname
1754 << ", but inode " << in->vino() << " v" << in->inode.version
1755 << " already exists at " << inopath;
1756 return dn;
1757 }
1758 }
1759 } else {
1760 std::ostringstream oss;
1761 oss << "Invalid tag char '" << type << "' pos " << pos;
1762 throw buffer::malformed_input(oss.str());
1763 }
1764
1765 return dn;
1766}
1767
1768void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
1769 bool complete, int r)
1770{
1771 LogChannelRef clog = cache->mds->clog;
1772 dout(10) << "_fetched header " << hdrbl.length() << " bytes "
1773 << omap.size() << " keys for " << *this << dendl;
1774
1775 assert(r == 0 || r == -ENOENT || r == -ENODATA);
1776 assert(is_auth());
1777 assert(!is_frozen());
1778
1779 if (hdrbl.length() == 0) {
1780 dout(0) << "_fetched missing object for " << *this << dendl;
1781
1782 clog->error() << "dir " << dirfrag() << " object missing on disk; some "
1783 "files may be lost (" << get_path() << ")";
1784
1785 go_bad(complete);
1786 return;
1787 }
1788
1789 fnode_t got_fnode;
1790 {
1791 bufferlist::iterator p = hdrbl.begin();
1792 try {
1793 ::decode(got_fnode, p);
1794 } catch (const buffer::error &err) {
1795 derr << "Corrupt fnode in dirfrag " << dirfrag()
1796 << ": " << err << dendl;
1797 clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
1798 << err << " (" << get_path() << ")";
1799 go_bad(complete);
1800 return;
1801 }
1802 if (!p.end()) {
1803 clog->warn() << "header buffer of dir " << dirfrag() << " has "
1804 << hdrbl.length() - p.get_off() << " extra bytes ("
1805 << get_path() << ")";
1806 go_bad(complete);
1807 return;
1808 }
1809 }
1810
1811 dout(10) << "_fetched version " << got_fnode.version << dendl;
1812
1813 // take the loaded fnode?
1814 // only if we are a fresh CDir* with no prior state.
1815 if (get_version() == 0) {
1816 assert(!is_projected());
1817 assert(!state_test(STATE_COMMITTING));
1818 fnode = got_fnode;
1819 projected_version = committing_version = committed_version = got_fnode.version;
1820
1821 if (state_test(STATE_REJOINUNDEF)) {
1822 assert(cache->mds->is_rejoin());
1823 state_clear(STATE_REJOINUNDEF);
1824 cache->opened_undef_dirfrag(this);
1825 }
1826 }
1827
1828 list<CInode*> undef_inodes;
1829
1830 // purge stale snaps?
1831 // only if we have past_parents open!
1832 bool force_dirty = false;
1833 const set<snapid_t> *snaps = NULL;
1834 SnapRealm *realm = inode->find_snaprealm();
1835 if (!realm->have_past_parents_open()) {
1836 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
1837 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
1838 snaps = &realm->get_snaps();
1839 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
1840 << " < " << realm->get_last_destroyed()
1841 << ", snap purge based on " << *snaps << dendl;
1842 if (get_num_snap_items() == 0) {
1843 fnode.snap_purged_thru = realm->get_last_destroyed();
1844 force_dirty = true;
1845 }
1846 }
1847
1848 unsigned pos = omap.size() - 1;
1849 for (map<string, bufferlist>::reverse_iterator p = omap.rbegin();
1850 p != omap.rend();
1851 ++p, --pos) {
1852 string dname;
1853 snapid_t last;
1854 dentry_key_t::decode_helper(p->first, dname, last);
1855
1856 CDentry *dn = NULL;
1857 try {
1858 dn = _load_dentry(
1859 p->first, dname, last, p->second, pos, snaps,
1860 &force_dirty, &undef_inodes);
1861 } catch (const buffer::error &err) {
1862 cache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
1863 "dir frag " << dirfrag() << ": "
1864 << err << "(" << get_path() << ")";
1865
1866 // Remember that this dentry is damaged. Subsequent operations
1867 // that try to act directly on it will get their EIOs, but this
1868 // dirfrag as a whole will continue to look okay (minus the
1869 // mysteriously-missing dentry)
1870 go_bad_dentry(last, dname);
1871
1872 // Anyone who was WAIT_DENTRY for this guy will get kicked
1873 // to RetryRequest, and hit the DamageTable-interrogating path.
1874 // Stats will now be bogus because we will think we're complete,
1875 // but have 1 or more missing dentries.
1876 continue;
1877 }
1878
1879 if (dn && (wanted_items.count(dname) > 0 || !complete)) {
1880 dout(10) << " touching wanted dn " << *dn << dendl;
1881 inode->mdcache->touch_dentry(dn);
1882 }
1883
1884 /** clean underwater item?
1885 * Underwater item is something that is dirty in our cache from
1886 * journal replay, but was previously flushed to disk before the
1887 * mds failed.
1888 *
1889 * We only do this is committed_version == 0. that implies either
1890 * - this is a fetch after from a clean/empty CDir is created
1891 * (and has no effect, since the dn won't exist); or
1892 * - this is a fetch after _recovery_, which is what we're worried
1893 * about. Items that are marked dirty from the journal should be
1894 * marked clean if they appear on disk.
1895 */
1896 if (committed_version == 0 &&
1897 dn &&
1898 dn->get_version() <= got_fnode.version &&
1899 dn->is_dirty()) {
1900 dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
1901 dn->mark_clean();
1902
1903 if (dn->get_linkage()->is_primary()) {
1904 assert(dn->get_linkage()->get_inode()->get_version() <= got_fnode.version);
1905 dout(10) << "_fetched had underwater inode " << *dn->get_linkage()->get_inode() << ", marking clean" << dendl;
1906 dn->get_linkage()->get_inode()->mark_clean();
1907 }
1908 }
1909 }
1910
1911 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
1912
1913 // mark complete, !fetching
1914 if (complete) {
1915 wanted_items.clear();
1916 mark_complete();
1917 state_clear(STATE_FETCHING);
1918
1919 if (scrub_infop && scrub_infop->need_scrub_local) {
1920 scrub_infop->need_scrub_local = false;
1921 scrub_local();
1922 }
1923 }
1924
1925 // open & force frags
1926 while (!undef_inodes.empty()) {
1927 CInode *in = undef_inodes.front();
1928 undef_inodes.pop_front();
1929 in->state_clear(CInode::STATE_REJOINUNDEF);
1930 cache->opened_undef_inode(in);
1931 }
1932
1933 // dirty myself to remove stale snap dentries
1934 if (force_dirty && !inode->mdcache->is_readonly())
1935 log_mark_dirty();
1936
1937 auth_unpin(this);
1938
1939 if (complete) {
1940 // kick waiters
1941 finish_waiting(WAIT_COMPLETE, 0);
1942 }
1943}
1944
1945void CDir::_go_bad()
1946{
1947 if (get_version() == 0)
1948 set_version(1);
1949 state_set(STATE_BADFRAG);
1950 // mark complete, !fetching
1951 mark_complete();
1952 state_clear(STATE_FETCHING);
1953 auth_unpin(this);
1954
1955 // kick waiters
1956 finish_waiting(WAIT_COMPLETE, -EIO);
1957}
1958
1959void CDir::go_bad_dentry(snapid_t last, const std::string &dname)
1960{
1961 dout(10) << "go_bad_dentry " << dname << dendl;
1962 const bool fatal = cache->mds->damage_table.notify_dentry(
1963 inode->ino(), frag, last, dname, get_path() + "/" + dname);
1964 if (fatal) {
1965 cache->mds->damaged();
1966 ceph_abort(); // unreachable, damaged() respawns us
1967 }
1968}
1969
1970void CDir::go_bad(bool complete)
1971{
1972 dout(10) << "go_bad " << frag << dendl;
1973 const bool fatal = cache->mds->damage_table.notify_dirfrag(
1974 inode->ino(), frag, get_path());
1975 if (fatal) {
1976 cache->mds->damaged();
1977 ceph_abort(); // unreachable, damaged() respawns us
1978 }
1979
1980 if (complete)
1981 _go_bad();
1982 else
1983 auth_unpin(this);
1984}
1985
1986// -----------------------
1987// COMMIT
1988
1989/**
1990 * commit
1991 *
1992 * @param want - min version i want committed
1993 * @param c - callback for completion
1994 */
1995void CDir::commit(version_t want, MDSInternalContextBase *c, bool ignore_authpinnability, int op_prio)
1996{
1997 dout(10) << "commit want " << want << " on " << *this << dendl;
1998 if (want == 0) want = get_version();
1999
2000 // preconditions
2001 assert(want <= get_version() || get_version() == 0); // can't commit the future
2002 assert(want > committed_version); // the caller is stupid
2003 assert(is_auth());
2004 assert(ignore_authpinnability || can_auth_pin());
2005
2006 if (inode->inode.nlink == 0 && !inode->snaprealm) {
2007 dout(7) << "commit dirfrag for unlinked directory, mark clean" << dendl;
2008 try_remove_dentries_for_stray();
2009 if (c)
2010 cache->mds->queue_waiter(c);
2011 return;
2012 }
2013
2014 // note: queue up a noop if necessary, so that we always
2015 // get an auth_pin.
2016 if (!c)
2017 c = new C_MDSInternalNoop;
2018
2019 // auth_pin on first waiter
2020 if (waiting_for_commit.empty())
2021 auth_pin(this);
2022 waiting_for_commit[want].push_back(c);
2023
2024 // ok.
2025 _commit(want, op_prio);
2026}
2027
2028class C_IO_Dir_Committed : public CDirIOContext {
2029 version_t version;
2030public:
2031 C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2032 void finish(int r) override {
2033 dir->_committed(r, version);
2034 }
2035};
2036
2037/**
2038 * Flush out the modified dentries in this dir. Keep the bufferlist
2039 * below max_write_size;
2040 */
2041void CDir::_omap_commit(int op_prio)
2042{
2043 dout(10) << "_omap_commit" << dendl;
2044
2045 unsigned max_write_size = cache->max_dir_commit_size;
2046 unsigned write_size = 0;
2047
2048 if (op_prio < 0)
2049 op_prio = CEPH_MSG_PRIO_DEFAULT;
2050
2051 // snap purge?
2052 const set<snapid_t> *snaps = NULL;
2053 SnapRealm *realm = inode->find_snaprealm();
2054 if (!realm->have_past_parents_open()) {
2055 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
2056 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
2057 snaps = &realm->get_snaps();
2058 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
2059 << " < " << realm->get_last_destroyed()
2060 << ", snap purge based on " << *snaps << dendl;
2061 // fnode.snap_purged_thru = realm->get_last_destroyed();
2062 }
2063
2064 set<string> to_remove;
2065 map<string, bufferlist> to_set;
2066
2067 C_GatherBuilder gather(g_ceph_context,
2068 new C_OnFinisher(new C_IO_Dir_Committed(this,
2069 get_version()),
2070 cache->mds->finisher));
2071
2072 SnapContext snapc;
2073 object_t oid = get_ondisk_object();
2074 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
2075
2076 if (!stale_items.empty()) {
2077 for (compact_set<string>::iterator p = stale_items.begin();
2078 p != stale_items.end();
2079 ++p) {
2080 to_remove.insert(*p);
2081 write_size += (*p).length();
2082 }
2083 stale_items.clear();
2084 }
2085
2086 for (map_t::iterator p = items.begin();
2087 p != items.end(); ) {
2088 CDentry *dn = p->second;
2089 ++p;
2090
2091 string key;
2092 dn->key().encode(key);
2093
2094 if (dn->last != CEPH_NOSNAP &&
2095 snaps && try_trim_snap_dentry(dn, *snaps)) {
2096 dout(10) << " rm " << key << dendl;
2097 write_size += key.length();
2098 to_remove.insert(key);
2099 continue;
2100 }
2101
2102 if (!dn->is_dirty() &&
2103 (!dn->state_test(CDentry::STATE_FRAGMENTING) || dn->get_linkage()->is_null()))
2104 continue; // skip clean dentries
2105
2106 if (dn->get_linkage()->is_null()) {
2107 dout(10) << " rm " << dn->name << " " << *dn << dendl;
2108 write_size += key.length();
2109 to_remove.insert(key);
2110 } else {
2111 dout(10) << " set " << dn->name << " " << *dn << dendl;
2112 bufferlist dnbl;
2113 _encode_dentry(dn, dnbl, snaps);
2114 write_size += key.length() + dnbl.length();
2115 to_set[key].swap(dnbl);
2116 }
2117
2118 if (write_size >= max_write_size) {
2119 ObjectOperation op;
2120 op.priority = op_prio;
2121
2122 // don't create new dirfrag blindly
2123 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2124 op.stat(NULL, (ceph::real_time*) NULL, NULL);
2125
2126 if (!to_set.empty())
2127 op.omap_set(to_set);
2128 if (!to_remove.empty())
2129 op.omap_rm_keys(to_remove);
2130
2131 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2132 ceph::real_clock::now(),
2133 0, gather.new_sub());
2134
2135 write_size = 0;
2136 to_set.clear();
2137 to_remove.clear();
2138 }
2139 }
2140
2141 ObjectOperation op;
2142 op.priority = op_prio;
2143
2144 // don't create new dirfrag blindly
2145 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2146 op.stat(NULL, (ceph::real_time*)NULL, NULL);
2147
2148 /*
2149 * save the header at the last moment.. If we were to send it off before other
2150 * updates, but die before sending them all, we'd think that the on-disk state
2151 * was fully committed even though it wasn't! However, since the messages are
2152 * strictly ordered between the MDS and the OSD, and since messages to a given
2153 * PG are strictly ordered, if we simply send the message containing the header
2154 * off last, we cannot get our header into an incorrect state.
2155 */
2156 bufferlist header;
2157 ::encode(fnode, header);
2158 op.omap_set_header(header);
2159
2160 if (!to_set.empty())
2161 op.omap_set(to_set);
2162 if (!to_remove.empty())
2163 op.omap_rm_keys(to_remove);
2164
2165 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2166 ceph::real_clock::now(),
2167 0, gather.new_sub());
2168
2169 gather.activate();
2170}
2171
2172void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
2173 const set<snapid_t> *snaps)
2174{
2175 // clear dentry NEW flag, if any. we can no longer silently drop it.
2176 dn->clear_new();
2177
2178 ::encode(dn->first, bl);
2179
2180 // primary or remote?
2181 if (dn->linkage.is_remote()) {
2182 inodeno_t ino = dn->linkage.get_remote_ino();
2183 unsigned char d_type = dn->linkage.get_remote_d_type();
2184 dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' remote ino " << ino << dendl;
2185
2186 // marker, name, ino
2187 bl.append('L'); // remote link
2188 ::encode(ino, bl);
2189 ::encode(d_type, bl);
2190 } else if (dn->linkage.is_primary()) {
2191 // primary link
2192 CInode *in = dn->linkage.get_inode();
2193 assert(in);
2194
2195 dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' inode " << *in << dendl;
2196
2197 // marker, name, inode, [symlink string]
2198 bl.append('I'); // inode
2199
2200 if (in->is_multiversion()) {
2201 if (!in->snaprealm) {
2202 if (snaps)
2203 in->purge_stale_snap_data(*snaps);
2204 } else if (in->snaprealm->have_past_parents_open()) {
2205 in->purge_stale_snap_data(in->snaprealm->get_snaps());
2206 }
2207 }
2208
2209 bufferlist snap_blob;
2210 in->encode_snap_blob(snap_blob);
2211 in->encode_bare(bl, cache->mds->mdsmap->get_up_features(), &snap_blob);
2212 } else {
2213 assert(!dn->linkage.is_null());
2214 }
2215}
2216
2217void CDir::_commit(version_t want, int op_prio)
2218{
2219 dout(10) << "_commit want " << want << " on " << *this << dendl;
2220
2221 // we can't commit things in the future.
2222 // (even the projected future.)
2223 assert(want <= get_version() || get_version() == 0);
2224
2225 // check pre+postconditions.
2226 assert(is_auth());
2227
2228 // already committed?
2229 if (committed_version >= want) {
2230 dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2231 return;
2232 }
2233 // already committing >= want?
2234 if (committing_version >= want) {
2235 dout(10) << "already committing " << committing_version << " >= " << want << dendl;
2236 assert(state_test(STATE_COMMITTING));
2237 return;
2238 }
2239
2240 // alrady committed an older version?
2241 if (committing_version > committed_version) {
2242 dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2243 return;
2244 }
2245
2246 // commit.
2247 committing_version = get_version();
2248
2249 // mark committing (if not already)
2250 if (!state_test(STATE_COMMITTING)) {
2251 dout(10) << "marking committing" << dendl;
2252 state_set(STATE_COMMITTING);
2253 }
2254
2255 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_commit);
2256
2257 _omap_commit(op_prio);
2258}
2259
2260
2261/**
2262 * _committed
2263 *
2264 * @param v version i just committed
2265 */
2266void CDir::_committed(int r, version_t v)
2267{
2268 if (r < 0) {
2269 // the directory could be partly purged during MDS failover
2270 if (r == -ENOENT && committed_version == 0 &&
2271 inode->inode.nlink == 0 && inode->snaprealm) {
2272 inode->state_set(CInode::STATE_MISSINGOBJS);
2273 r = 0;
2274 }
2275 if (r < 0) {
2276 dout(1) << "commit error " << r << " v " << v << dendl;
2277 cache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
2278 << " errno " << r;
2279 cache->mds->handle_write_error(r);
2280 return;
2281 }
2282 }
2283
2284 dout(10) << "_committed v " << v << " on " << *this << dendl;
2285 assert(is_auth());
2286
2287 bool stray = inode->is_stray();
2288
2289 // take note.
2290 assert(v > committed_version);
2291 assert(v <= committing_version);
2292 committed_version = v;
2293
2294 // _all_ commits done?
2295 if (committing_version == committed_version)
2296 state_clear(CDir::STATE_COMMITTING);
2297
2298 // _any_ commit, even if we've been redirtied, means we're no longer new.
2299 item_new.remove_myself();
2300
2301 // dir clean?
2302 if (committed_version == get_version())
2303 mark_clean();
2304
2305 // dentries clean?
2306 for (map_t::iterator it = items.begin();
2307 it != items.end(); ) {
2308 CDentry *dn = it->second;
2309 ++it;
2310
2311 // inode?
2312 if (dn->linkage.is_primary()) {
2313 CInode *in = dn->linkage.get_inode();
2314 assert(in);
2315 assert(in->is_auth());
2316
2317 if (committed_version >= in->get_version()) {
2318 if (in->is_dirty()) {
2319 dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
2320 in->mark_clean();
2321 }
2322 } else {
2323 dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
2324 assert(in->is_dirty() || in->last < CEPH_NOSNAP); // special case for cow snap items (not predirtied)
2325 }
2326 }
2327
2328 // dentry
2329 if (committed_version >= dn->get_version()) {
2330 if (dn->is_dirty()) {
2331 dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
2332 dn->mark_clean();
2333
2334 // drop clean null stray dentries immediately
2335 if (stray &&
2336 dn->get_num_ref() == 0 &&
2337 !dn->is_projected() &&
2338 dn->get_linkage()->is_null())
2339 remove_dentry(dn);
2340 }
2341 } else {
2342 dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
2343 }
2344 }
2345
2346 // finishers?
2347 bool were_waiters = !waiting_for_commit.empty();
2348
2349 compact_map<version_t, list<MDSInternalContextBase*> >::iterator p = waiting_for_commit.begin();
2350 while (p != waiting_for_commit.end()) {
2351 compact_map<version_t, list<MDSInternalContextBase*> >::iterator n = p;
2352 ++n;
2353 if (p->first > committed_version) {
2354 dout(10) << " there are waiters for " << p->first << ", committing again" << dendl;
2355 _commit(p->first, -1);
2356 break;
2357 }
2358 cache->mds->queue_waiters(p->second);
2359 waiting_for_commit.erase(p);
2360 p = n;
2361 }
2362
2363 // try drop dentries in this dirfrag if it's about to be purged
2364 if (inode->inode.nlink == 0 && inode->snaprealm)
2365 cache->maybe_eval_stray(inode, true);
2366
2367 // unpin if we kicked the last waiter.
2368 if (were_waiters &&
2369 waiting_for_commit.empty())
2370 auth_unpin(this);
2371}
2372
2373
2374
2375
2376// IMPORT/EXPORT
2377
2378void CDir::encode_export(bufferlist& bl)
2379{
2380 assert(!is_projected());
2381 ::encode(first, bl);
2382 ::encode(fnode, bl);
2383 ::encode(dirty_old_rstat, bl);
2384 ::encode(committed_version, bl);
2385
2386 ::encode(state, bl);
2387 ::encode(dir_rep, bl);
2388
2389 ::encode(pop_me, bl);
2390 ::encode(pop_auth_subtree, bl);
2391
2392 ::encode(dir_rep_by, bl);
2393 ::encode(replica_map, bl);
2394
2395 get(PIN_TEMPEXPORTING);
2396}
2397
2398void CDir::finish_export(utime_t now)
2399{
2400 state &= MASK_STATE_EXPORT_KEPT;
2401 pop_auth_subtree_nested.sub(now, cache->decayrate, pop_auth_subtree);
2402 pop_me.zero(now);
2403 pop_auth_subtree.zero(now);
2404 put(PIN_TEMPEXPORTING);
2405 dirty_old_rstat.clear();
2406}
2407
2408void CDir::decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls)
2409{
2410 ::decode(first, blp);
2411 ::decode(fnode, blp);
2412 ::decode(dirty_old_rstat, blp);
2413 projected_version = fnode.version;
2414 ::decode(committed_version, blp);
2415 committing_version = committed_version;
2416
2417 unsigned s;
2418 ::decode(s, blp);
2419 state &= MASK_STATE_IMPORT_KEPT;
2420 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
2421
2422 if (is_dirty()) {
2423 get(PIN_DIRTY);
2424 _mark_dirty(ls);
2425 }
2426
2427 ::decode(dir_rep, blp);
2428
2429 ::decode(pop_me, now, blp);
2430 ::decode(pop_auth_subtree, now, blp);
2431 pop_auth_subtree_nested.add(now, cache->decayrate, pop_auth_subtree);
2432
2433 ::decode(dir_rep_by, blp);
2434 ::decode(replica_map, blp);
2435 if (!replica_map.empty()) get(PIN_REPLICATED);
2436
2437 replica_nonce = 0; // no longer defined
2438
2439 // did we import some dirty scatterlock data?
2440 if (dirty_old_rstat.size() ||
2441 !(fnode.rstat == fnode.accounted_rstat)) {
2442 cache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
2443 ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
2444 }
2445 if (!(fnode.fragstat == fnode.accounted_fragstat)) {
2446 cache->mds->locker->mark_updated_scatterlock(&inode->filelock);
2447 ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
2448 }
2449 if (is_dirty_dft()) {
2450 if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
2451 inode->dirfragtreelock.is_stable()) {
2452 // clear stale dirtydft
2453 state_clear(STATE_DIRTYDFT);
2454 } else {
2455 cache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
2456 ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
2457 }
2458 }
2459}
2460
2461
2462
2463
2464/********************************
2465 * AUTHORITY
2466 */
2467
2468/*
2469 * if dir_auth.first == parent, auth is same as inode.
2470 * unless .second != unknown, in which case that sticks.
2471 */
2472mds_authority_t CDir::authority() const
2473{
2474 if (is_subtree_root())
2475 return dir_auth;
2476 else
2477 return inode->authority();
2478}
2479
2480/** is_subtree_root()
2481 * true if this is an auth delegation point.
2482 * that is, dir_auth != default (parent,unknown)
2483 *
2484 * some key observations:
2485 * if i am auth:
2486 * - any region bound will be an export, or frozen.
2487 *
2488 * note that this DOES heed dir_auth.pending
2489 */
2490/*
2491bool CDir::is_subtree_root()
2492{
2493 if (dir_auth == CDIR_AUTH_DEFAULT) {
2494 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2495 //<< " on " << ino() << dendl;
2496 return false;
2497 } else {
2498 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2499 //<< " on " << ino() << dendl;
2500 return true;
2501 }
2502}
2503*/
2504
2505/** contains(x)
2506 * true if we are x, or an ancestor of x
2507 */
2508bool CDir::contains(CDir *x)
2509{
2510 while (1) {
2511 if (x == this)
2512 return true;
2513 x = x->get_inode()->get_projected_parent_dir();
2514 if (x == 0)
2515 return false;
2516 }
2517}
2518
2519
2520
2521/** set_dir_auth
2522 */
2523void CDir::set_dir_auth(mds_authority_t a)
2524{
2525 dout(10) << "setting dir_auth=" << a
2526 << " from " << dir_auth
2527 << " on " << *this << dendl;
2528
2529 bool was_subtree = is_subtree_root();
2530 bool was_ambiguous = dir_auth.second >= 0;
2531
2532 // set it.
2533 dir_auth = a;
2534
2535 // new subtree root?
2536 if (!was_subtree && is_subtree_root()) {
2537 dout(10) << " new subtree root, adjusting auth_pins" << dendl;
2538
2539 // adjust nested auth pins
2540 if (get_cum_auth_pins())
2541 inode->adjust_nested_auth_pins(-1, NULL);
2542
2543 // unpin parent of frozen dir/tree?
2544 if (inode->is_auth() && (is_frozen_tree_root() || is_frozen_dir()))
2545 inode->auth_unpin(this);
2546 }
2547 if (was_subtree && !is_subtree_root()) {
2548 dout(10) << " old subtree root, adjusting auth_pins" << dendl;
2549
2550 // adjust nested auth pins
2551 if (get_cum_auth_pins())
2552 inode->adjust_nested_auth_pins(1, NULL);
2553
2554 // pin parent of frozen dir/tree?
2555 if (inode->is_auth() && (is_frozen_tree_root() || is_frozen_dir()))
2556 inode->auth_pin(this);
2557 }
2558
2559 // newly single auth?
2560 if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
2561 list<MDSInternalContextBase*> ls;
2562 take_waiting(WAIT_SINGLEAUTH, ls);
2563 cache->mds->queue_waiters(ls);
2564 }
2565}
2566
2567
2568/*****************************************
2569 * AUTH PINS and FREEZING
2570 *
2571 * the basic plan is that auth_pins only exist in auth regions, and they
2572 * prevent a freeze (and subsequent auth change).
2573 *
2574 * however, we also need to prevent a parent from freezing if a child is frozen.
2575 * for that reason, the parent inode of a frozen directory is auth_pinned.
2576 *
2577 * the oddity is when the frozen directory is a subtree root. if that's the case,
2578 * the parent inode isn't frozen. which means that when subtree authority is adjusted
2579 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
2580 * time.
2581 *
2582 */
2583
2584void CDir::auth_pin(void *by)
2585{
2586 if (auth_pins == 0)
2587 get(PIN_AUTHPIN);
2588 auth_pins++;
2589
2590#ifdef MDS_AUTHPIN_SET
2591 auth_pin_set.insert(by);
2592#endif
2593
2594 dout(10) << "auth_pin by " << by
2595 << " on " << *this
2596 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2597
2598 // nest pins?
2599 if (!is_subtree_root() &&
2600 get_cum_auth_pins() == 1)
2601 inode->adjust_nested_auth_pins(1, by);
2602}
2603
2604void CDir::auth_unpin(void *by)
2605{
2606 auth_pins--;
2607
2608#ifdef MDS_AUTHPIN_SET
2609 assert(auth_pin_set.count(by));
2610 auth_pin_set.erase(auth_pin_set.find(by));
2611#endif
2612 if (auth_pins == 0)
2613 put(PIN_AUTHPIN);
2614
2615 dout(10) << "auth_unpin by " << by
2616 << " on " << *this
2617 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2618 assert(auth_pins >= 0);
2619
2620 int newcum = get_cum_auth_pins();
2621
2622 maybe_finish_freeze(); // pending freeze?
2623
2624 // nest?
2625 if (!is_subtree_root() &&
2626 newcum == 0)
2627 inode->adjust_nested_auth_pins(-1, by);
2628}
2629
2630void CDir::adjust_nested_auth_pins(int inc, int dirinc, void *by)
2631{
2632 assert(inc);
2633 nested_auth_pins += inc;
2634 dir_auth_pins += dirinc;
2635
2636 dout(15) << "adjust_nested_auth_pins " << inc << "/" << dirinc << " on " << *this
2637 << " by " << by << " count now "
2638 << auth_pins << " + " << nested_auth_pins << dendl;
2639 assert(nested_auth_pins >= 0);
2640 assert(dir_auth_pins >= 0);
2641
2642 int newcum = get_cum_auth_pins();
2643
2644 maybe_finish_freeze(); // pending freeze?
2645
2646 // nest?
2647 if (!is_subtree_root()) {
2648 if (newcum == 0)
2649 inode->adjust_nested_auth_pins(-1, by);
2650 else if (newcum == inc)
2651 inode->adjust_nested_auth_pins(1, by);
2652 }
2653}
2654
#ifdef MDS_VERIFY_FRAGSTAT
/**
 * Debug check (only built with MDS_VERIFY_FRAGSTAT): recount the files
 * and subdirs among our dentries and abort if the totals disagree with
 * fnode.fragstat.  Stray directories are skipped.
 */
void CDir::verify_fragstat()
{
  assert(is_complete());
  if (inode->is_stray())
    return;

  frag_info_t counted;
  memset(&counted, 0, sizeof(counted));

  for (map_t::iterator p = items.begin(); p != items.end(); ++p) {
    CDentry *dn = p->second;
    if (dn->is_null())
      continue;

    dout(10) << " " << *dn << dendl;

    if (dn->is_primary()) {
      dout(10) << " " << *dn->inode << dendl;
      if (dn->inode->is_dir())
	++counted.nsubdirs;
      else
	++counted.nfiles;
    }
    if (dn->is_remote()) {
      if (dn->get_remote_d_type() == DT_DIR)
	++counted.nsubdirs;
      else
	++counted.nfiles;
    }
  }

  const bool matches =
    counted.nsubdirs == fnode.fragstat.nsubdirs &&
    counted.nfiles == fnode.fragstat.nfiles;
  if (matches) {
    dout(0) << "verify_fragstat ok " << fnode.fragstat << " on " << *this << dendl;
    return;
  }

  dout(0) << "verify_fragstat failed " << fnode.fragstat << " on " << *this << dendl;
  dout(0) << " i count " << counted << dendl;
  ceph_abort();
}
#endif
2700
2701/*****************************************************************************
2702 * FREEZING
2703 */
2704
2705// FREEZE TREE
2706
2707bool CDir::freeze_tree()
2708{
2709 assert(!is_frozen());
2710 assert(!is_freezing());
2711
2712 auth_pin(this);
2713 if (is_freezeable(true)) {
2714 _freeze_tree();
2715 auth_unpin(this);
2716 return true;
2717 } else {
2718 state_set(STATE_FREEZINGTREE);
2719 ++num_freezing_trees;
2720 dout(10) << "freeze_tree waiting " << *this << dendl;
2721 return false;
2722 }
2723}
2724
2725void CDir::_freeze_tree()
2726{
2727 dout(10) << "_freeze_tree " << *this << dendl;
2728 assert(is_freezeable(true));
2729
2730 // twiddle state
2731 if (state_test(STATE_FREEZINGTREE)) {
2732 state_clear(STATE_FREEZINGTREE); // actually, this may get set again by next context?
2733 --num_freezing_trees;
2734 }
2735 state_set(STATE_FROZENTREE);
2736 ++num_frozen_trees;
2737 get(PIN_FROZEN);
2738
2739 // auth_pin inode for duration of freeze, if we are not a subtree root.
2740 if (is_auth() && !is_subtree_root())
2741 inode->auth_pin(this);
2742}
2743
2744void CDir::unfreeze_tree()
2745{
2746 dout(10) << "unfreeze_tree " << *this << dendl;
2747
2748 if (state_test(STATE_FROZENTREE)) {
2749 // frozen. unfreeze.
2750 state_clear(STATE_FROZENTREE);
2751 --num_frozen_trees;
2752
2753 put(PIN_FROZEN);
2754
2755 // unpin (may => FREEZEABLE) FIXME: is this order good?
2756 if (is_auth() && !is_subtree_root())
2757 inode->auth_unpin(this);
2758
2759 // waiters?
2760 finish_waiting(WAIT_UNFREEZE);
2761 } else {
2762 finish_waiting(WAIT_FROZEN, -1);
2763
2764 // freezing. stop it.
2765 assert(state_test(STATE_FREEZINGTREE));
2766 state_clear(STATE_FREEZINGTREE);
2767 --num_freezing_trees;
2768 auth_unpin(this);
2769
2770 finish_waiting(WAIT_UNFREEZE);
2771 }
2772}
2773
2774bool CDir::is_freezing_tree() const
2775{
2776 if (num_freezing_trees == 0)
2777 return false;
2778 const CDir *dir = this;
2779 while (1) {
2780 if (dir->is_freezing_tree_root()) return true;
2781 if (dir->is_subtree_root()) return false;
2782 if (dir->inode->parent)
2783 dir = dir->inode->parent->dir;
2784 else
2785 return false; // root on replica
2786 }
2787}
2788
2789bool CDir::is_frozen_tree() const
2790{
2791 if (num_frozen_trees == 0)
2792 return false;
2793 const CDir *dir = this;
2794 while (1) {
2795 if (dir->is_frozen_tree_root()) return true;
2796 if (dir->is_subtree_root()) return false;
2797 if (dir->inode->parent)
2798 dir = dir->inode->parent->dir;
2799 else
2800 return false; // root on replica
2801 }
2802}
2803
2804CDir *CDir::get_frozen_tree_root()
2805{
2806 assert(is_frozen());
2807 CDir *dir = this;
2808 while (1) {
2809 if (dir->is_frozen_tree_root())
2810 return dir;
2811 if (dir->inode->parent)
2812 dir = dir->inode->parent->dir;
2813 else
2814 ceph_abort();
2815 }
2816}
2817
2818class C_Dir_AuthUnpin : public CDirContext {
2819 public:
2820 explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
2821 void finish(int r) override {
2822 dir->auth_unpin(dir->get_inode());
2823 }
2824};
2825
2826void CDir::maybe_finish_freeze()
2827{
2828 if (auth_pins != 1 || dir_auth_pins != 0)
2829 return;
2830
2831 // we can freeze the _dir_ even with nested pins...
2832 if (state_test(STATE_FREEZINGDIR)) {
2833 _freeze_dir();
2834 auth_unpin(this);
2835 finish_waiting(WAIT_FROZEN);
2836 }
2837
2838 if (nested_auth_pins != 0)
2839 return;
2840
2841 if (state_test(STATE_FREEZINGTREE)) {
2842 if (!is_subtree_root() && inode->is_frozen()) {
2843 dout(10) << "maybe_finish_freeze !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
2844 // retake an auth_pin...
2845 auth_pin(inode);
2846 // and release it when the parent inode unfreezes
2847 inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
2848 return;
2849 }
2850
2851 _freeze_tree();
2852 auth_unpin(this);
2853 finish_waiting(WAIT_FROZEN);
2854 }
2855}
2856
2857
2858
2859// FREEZE DIR
2860
2861bool CDir::freeze_dir()
2862{
2863 assert(!is_frozen());
2864 assert(!is_freezing());
2865
2866 auth_pin(this);
2867 if (is_freezeable_dir(true)) {
2868 _freeze_dir();
2869 auth_unpin(this);
2870 return true;
2871 } else {
2872 state_set(STATE_FREEZINGDIR);
2873 dout(10) << "freeze_dir + wait " << *this << dendl;
2874 return false;
2875 }
2876}
2877
2878void CDir::_freeze_dir()
2879{
2880 dout(10) << "_freeze_dir " << *this << dendl;
2881 //assert(is_freezeable_dir(true));
2882 // not always true during split because the original fragment may have frozen a while
2883 // ago and we're just now getting around to breaking it up.
2884
2885 state_clear(STATE_FREEZINGDIR);
2886 state_set(STATE_FROZENDIR);
2887 get(PIN_FROZEN);
2888
2889 if (is_auth() && !is_subtree_root())
2890 inode->auth_pin(this); // auth_pin for duration of freeze
2891}
2892
2893
2894void CDir::unfreeze_dir()
2895{
2896 dout(10) << "unfreeze_dir " << *this << dendl;
2897
2898 if (state_test(STATE_FROZENDIR)) {
2899 state_clear(STATE_FROZENDIR);
2900 put(PIN_FROZEN);
2901
2902 // unpin (may => FREEZEABLE) FIXME: is this order good?
2903 if (is_auth() && !is_subtree_root())
2904 inode->auth_unpin(this);
2905
2906 finish_waiting(WAIT_UNFREEZE);
2907 } else {
2908 finish_waiting(WAIT_FROZEN, -1);
2909
2910 // still freezing. stop.
2911 assert(state_test(STATE_FREEZINGDIR));
2912 state_clear(STATE_FREEZINGDIR);
2913 auth_unpin(this);
2914
2915 finish_waiting(WAIT_UNFREEZE);
2916 }
2917}
2918
2919/**
2920 * Slightly less complete than operator<<, because this is intended
2921 * for identifying a directory and its state rather than for dumping
2922 * debug output.
2923 */
2924void CDir::dump(Formatter *f) const
2925{
2926 assert(f != NULL);
2927
2928 f->dump_stream("path") << get_path();
2929
2930 f->dump_stream("dirfrag") << dirfrag();
2931 f->dump_int("snapid_first", first);
2932
2933 f->dump_stream("projected_version") << get_projected_version();
2934 f->dump_stream("version") << get_version();
2935 f->dump_stream("committing_version") << get_committing_version();
2936 f->dump_stream("committed_version") << get_committed_version();
2937
2938 f->dump_bool("is_rep", is_rep());
2939
2940 if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
2941 if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
2942 f->dump_stream("dir_auth") << get_dir_auth().first;
2943 } else {
2944 f->dump_stream("dir_auth") << get_dir_auth();
2945 }
2946 } else {
2947 f->dump_string("dir_auth", "");
2948 }
2949
2950 f->open_array_section("states");
2951 MDSCacheObject::dump_states(f);
2952 if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
2953 if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
2954 if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
2955 if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
2956 if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
2957 if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
2958 if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
2959 if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
2960 f->close_section();
2961
2962 MDSCacheObject::dump(f);
2963}
2964
2965/****** Scrub Stuff *******/
2966
2967void CDir::scrub_info_create() const
2968{
2969 assert(!scrub_infop);
2970
2971 // break out of const-land to set up implicit initial state
2972 CDir *me = const_cast<CDir*>(this);
2973 fnode_t *fn = me->get_projected_fnode();
2974
2975 std::unique_ptr<scrub_info_t> si(new scrub_info_t());
2976
2977 si->last_recursive.version = si->recursive_start.version =
2978 fn->recursive_scrub_version;
2979 si->last_recursive.time = si->recursive_start.time =
2980 fn->recursive_scrub_stamp;
2981
2982 si->last_local.version = fn->localized_scrub_version;
2983 si->last_local.time = fn->localized_scrub_stamp;
2984
2985 me->scrub_infop.swap(si);
2986}
2987
2988void CDir::scrub_initialize(const ScrubHeaderRefConst& header)
2989{
2990 dout(20) << __func__ << dendl;
2991 assert(is_complete());
2992 assert(header != nullptr);
2993
2994 // FIXME: weird implicit construction, is someone else meant
2995 // to be calling scrub_info_create first?
2996 scrub_info();
2997 assert(scrub_infop && !scrub_infop->directory_scrubbing);
2998
2999 scrub_infop->recursive_start.version = get_projected_version();
3000 scrub_infop->recursive_start.time = ceph_clock_now();
3001
3002 scrub_infop->directories_to_scrub.clear();
3003 scrub_infop->directories_scrubbing.clear();
3004 scrub_infop->directories_scrubbed.clear();
3005 scrub_infop->others_to_scrub.clear();
3006 scrub_infop->others_scrubbing.clear();
3007 scrub_infop->others_scrubbed.clear();
3008
3009 for (map_t::iterator i = items.begin();
3010 i != items.end();
3011 ++i) {
3012 // TODO: handle snapshot scrubbing
3013 if (i->first.snapid != CEPH_NOSNAP)
3014 continue;
3015
3016 CDentry::linkage_t *dnl = i->second->get_projected_linkage();
3017 if (dnl->is_primary()) {
3018 if (dnl->get_inode()->is_dir())
3019 scrub_infop->directories_to_scrub.insert(i->first);
3020 else
3021 scrub_infop->others_to_scrub.insert(i->first);
3022 } else if (dnl->is_remote()) {
3023 // TODO: check remote linkage
3024 }
3025 }
3026 scrub_infop->directory_scrubbing = true;
3027 scrub_infop->header = header;
3028}
3029
3030void CDir::scrub_finished()
3031{
3032 dout(20) << __func__ << dendl;
3033 assert(scrub_infop && scrub_infop->directory_scrubbing);
3034
3035 assert(scrub_infop->directories_to_scrub.empty());
3036 assert(scrub_infop->directories_scrubbing.empty());
3037 scrub_infop->directories_scrubbed.clear();
3038 assert(scrub_infop->others_to_scrub.empty());
3039 assert(scrub_infop->others_scrubbing.empty());
3040 scrub_infop->others_scrubbed.clear();
3041 scrub_infop->directory_scrubbing = false;
3042
3043 scrub_infop->last_recursive = scrub_infop->recursive_start;
3044 scrub_infop->last_scrub_dirty = true;
3045}
3046
3047int CDir::_next_dentry_on_set(set<dentry_key_t>& dns, bool missing_okay,
3048 MDSInternalContext *cb, CDentry **dnout)
3049{
3050 dentry_key_t dnkey;
3051 CDentry *dn;
3052
3053 while (!dns.empty()) {
3054 set<dentry_key_t>::iterator front = dns.begin();
3055 dnkey = *front;
3056 dn = lookup(dnkey.name);
3057 if (!dn) {
3058 if (!is_complete() &&
3059 (!has_bloom() || is_in_bloom(dnkey.name))) {
3060 // need to re-read this dirfrag
3061 fetch(cb);
3062 return EAGAIN;
3063 }
3064 // okay, we lost it
3065 if (missing_okay) {
3066 dout(15) << " we no longer have directory dentry "
3067 << dnkey.name << ", assuming it got renamed" << dendl;
3068 dns.erase(dnkey);
3069 continue;
3070 } else {
3071 dout(5) << " we lost dentry " << dnkey.name
3072 << ", bailing out because that's impossible!" << dendl;
3073 ceph_abort();
3074 }
3075 }
3076 // okay, we got a dentry
3077 dns.erase(dnkey);
3078
3079 if (dn->get_projected_version() < scrub_infop->last_recursive.version &&
3080 !(scrub_infop->header->get_force())) {
3081 dout(15) << " skip dentry " << dnkey.name
3082 << ", no change since last scrub" << dendl;
3083 continue;
3084 }
3085
3086 *dnout = dn;
3087 return 0;
3088 }
3089 *dnout = NULL;
3090 return ENOENT;
3091}
3092
3093int CDir::scrub_dentry_next(MDSInternalContext *cb, CDentry **dnout)
3094{
3095 dout(20) << __func__ << dendl;
3096 assert(scrub_infop && scrub_infop->directory_scrubbing);
3097
3098 dout(20) << "trying to scrub directories underneath us" << dendl;
3099 int rval = _next_dentry_on_set(scrub_infop->directories_to_scrub, true,
3100 cb, dnout);
3101 if (rval == 0) {
3102 dout(20) << __func__ << " inserted to directories scrubbing: "
3103 << *dnout << dendl;
3104 scrub_infop->directories_scrubbing.insert((*dnout)->key());
3105 } else if (rval == EAGAIN) {
3106 // we don't need to do anything else
3107 } else { // we emptied out the directory scrub set
3108 assert(rval == ENOENT);
3109 dout(20) << "no directories left, moving on to other kinds of dentries"
3110 << dendl;
3111
3112 rval = _next_dentry_on_set(scrub_infop->others_to_scrub, false, cb, dnout);
3113 if (rval == 0) {
3114 dout(20) << __func__ << " inserted to others scrubbing: "
3115 << *dnout << dendl;
3116 scrub_infop->others_scrubbing.insert((*dnout)->key());
3117 }
3118 }
3119 dout(20) << " returning " << rval << " with dn=" << *dnout << dendl;
3120 return rval;
3121}
3122
3123void CDir::scrub_dentries_scrubbing(list<CDentry*> *out_dentries)
3124{
3125 dout(20) << __func__ << dendl;
3126 assert(scrub_infop && scrub_infop->directory_scrubbing);
3127
3128 for (set<dentry_key_t>::iterator i =
3129 scrub_infop->directories_scrubbing.begin();
3130 i != scrub_infop->directories_scrubbing.end();
3131 ++i) {
3132 CDentry *d = lookup(i->name, i->snapid);
3133 assert(d);
3134 out_dentries->push_back(d);
3135 }
3136 for (set<dentry_key_t>::iterator i = scrub_infop->others_scrubbing.begin();
3137 i != scrub_infop->others_scrubbing.end();
3138 ++i) {
3139 CDentry *d = lookup(i->name, i->snapid);
3140 assert(d);
3141 out_dentries->push_back(d);
3142 }
3143}
3144
3145void CDir::scrub_dentry_finished(CDentry *dn)
3146{
3147 dout(20) << __func__ << " on dn " << *dn << dendl;
3148 assert(scrub_infop && scrub_infop->directory_scrubbing);
3149 dentry_key_t dn_key = dn->key();
3150 if (scrub_infop->directories_scrubbing.erase(dn_key)) {
3151 scrub_infop->directories_scrubbed.insert(dn_key);
3152 } else {
3153 assert(scrub_infop->others_scrubbing.count(dn_key));
3154 scrub_infop->others_scrubbing.erase(dn_key);
3155 scrub_infop->others_scrubbed.insert(dn_key);
3156 }
3157}
3158
3159void CDir::scrub_maybe_delete_info()
3160{
3161 if (scrub_infop &&
3162 !scrub_infop->directory_scrubbing &&
3163 !scrub_infop->need_scrub_local &&
3164 !scrub_infop->last_scrub_dirty &&
3165 !scrub_infop->pending_scrub_error &&
3166 scrub_infop->dirty_scrub_stamps.empty()) {
3167 scrub_infop.reset();
3168 }
3169}
3170
3171bool CDir::scrub_local()
3172{
3173 assert(is_complete());
3174 bool rval = check_rstats(true);
3175
3176 scrub_info();
3177 if (rval) {
3178 scrub_infop->last_local.time = ceph_clock_now();
3179 scrub_infop->last_local.version = get_projected_version();
3180 scrub_infop->pending_scrub_error = false;
3181 scrub_infop->last_scrub_dirty = true;
3182 } else {
3183 scrub_infop->pending_scrub_error = true;
3184 if (scrub_infop->header->get_repair())
3185 cache->repair_dirfrag_stats(this);
3186 }
3187 return rval;
3188}
3189
3190std::string CDir::get_path() const
3191{
3192 std::string path;
3193 get_inode()->make_path_string(path, true);
3194 return path;
3195}
3196
3197bool CDir::should_split_fast() const
3198{
3199 // Max size a fragment can be before trigger fast splitting
3200 int fast_limit = g_conf->mds_bal_split_size * g_conf->mds_bal_fragment_fast_factor;
3201
3202 // Fast path: the sum of accounted size and null dentries does not
3203 // exceed threshold: we definitely are not over it.
3204 if (get_frag_size() + get_num_head_null() <= fast_limit) {
3205 return false;
3206 }
3207
3208 // Fast path: the accounted size of the frag exceeds threshold: we
3209 // definitely are over it
3210 if (get_frag_size() > fast_limit) {
3211 return true;
3212 }
3213
3214 int64_t effective_size = 0;
3215
3216 for (const auto &p : items) {
3217 const CDentry *dn = p.second;
3218 if (!dn->get_projected_linkage()->is_null()) {
3219 effective_size++;
3220 }
3221 }
3222
3223 return effective_size > fast_limit;
3224}
3225