]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/CDir.cc
update sources to v12.2.3
[ceph.git] / ceph / src / mds / CDir.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "include/types.h"
17
18 #include "CDir.h"
19 #include "CDentry.h"
20 #include "CInode.h"
21 #include "Mutation.h"
22
23 #include "MDSMap.h"
24 #include "MDSRank.h"
25 #include "MDCache.h"
26 #include "Locker.h"
27 #include "MDLog.h"
28 #include "LogSegment.h"
29
30 #include "common/bloom_filter.hpp"
31 #include "include/Context.h"
32 #include "common/Clock.h"
33
34 #include "osdc/Objecter.h"
35
36 #include "common/config.h"
37 #include "include/assert.h"
38 #include "include/compat.h"
39
40 #define dout_context g_ceph_context
41 #define dout_subsys ceph_subsys_mds
42 #undef dout_prefix
43 #define dout_prefix *_dout << "mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
44
45 int CDir::num_frozen_trees = 0;
46 int CDir::num_freezing_trees = 0;
47
48 class CDirContext : public MDSInternalContextBase
49 {
50 protected:
51 CDir *dir;
52 MDSRank* get_mds() override {return dir->cache->mds;}
53
54 public:
55 explicit CDirContext(CDir *d) : dir(d) {
56 assert(dir != NULL);
57 }
58 };
59
60
61 class CDirIOContext : public MDSIOContextBase
62 {
63 protected:
64 CDir *dir;
65 MDSRank* get_mds() override {return dir->cache->mds;}
66
67 public:
68 explicit CDirIOContext(CDir *d) : dir(d) {
69 assert(dir != NULL);
70 }
71 };
72
73
74 // PINS
75 //int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
76
77
78 ostream& operator<<(ostream& out, const CDir& dir)
79 {
80 out << "[dir " << dir.dirfrag() << " " << dir.get_path() << "/"
81 << " [" << dir.first << ",head]";
82 if (dir.is_auth()) {
83 out << " auth";
84 if (dir.is_replicated())
85 out << dir.get_replicas();
86
87 if (dir.is_projected())
88 out << " pv=" << dir.get_projected_version();
89 out << " v=" << dir.get_version();
90 out << " cv=" << dir.get_committing_version();
91 out << "/" << dir.get_committed_version();
92 } else {
93 mds_authority_t a = dir.authority();
94 out << " rep@" << a.first;
95 if (a.second != CDIR_AUTH_UNKNOWN)
96 out << "," << a.second;
97 out << "." << dir.get_replica_nonce();
98 }
99
100 if (dir.is_rep()) out << " REP";
101
102 if (dir.get_dir_auth() != CDIR_AUTH_DEFAULT) {
103 if (dir.get_dir_auth().second == CDIR_AUTH_UNKNOWN)
104 out << " dir_auth=" << dir.get_dir_auth().first;
105 else
106 out << " dir_auth=" << dir.get_dir_auth();
107 }
108
109 if (dir.get_cum_auth_pins())
110 out << " ap=" << dir.get_auth_pins()
111 << "+" << dir.get_dir_auth_pins()
112 << "+" << dir.get_nested_auth_pins();
113
114 out << " state=" << dir.get_state();
115 if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
116 if (dir.state_test(CDir::STATE_FREEZINGTREE)) out << "|freezingtree";
117 if (dir.state_test(CDir::STATE_FROZENTREE)) out << "|frozentree";
118 if (dir.state_test(CDir::STATE_AUXSUBTREE)) out << "|auxsubtree";
119 //if (dir.state_test(CDir::STATE_FROZENTREELEAF)) out << "|frozentreeleaf";
120 if (dir.state_test(CDir::STATE_FROZENDIR)) out << "|frozendir";
121 if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
122 if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
123 if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
124 if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
125 if (dir.state_test(CDir::STATE_FRAGMENTING)) out << "|fragmenting";
126
127 // fragstat
128 out << " " << dir.fnode.fragstat;
129 if (!(dir.fnode.fragstat == dir.fnode.accounted_fragstat))
130 out << "/" << dir.fnode.accounted_fragstat;
131 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
132 const fnode_t *pf = dir.get_projected_fnode();
133 out << "->" << pf->fragstat;
134 if (!(pf->fragstat == pf->accounted_fragstat))
135 out << "/" << pf->accounted_fragstat;
136 }
137
138 // rstat
139 out << " " << dir.fnode.rstat;
140 if (!(dir.fnode.rstat == dir.fnode.accounted_rstat))
141 out << "/" << dir.fnode.accounted_rstat;
142 if (g_conf->mds_debug_scatterstat && dir.is_projected()) {
143 const fnode_t *pf = dir.get_projected_fnode();
144 out << "->" << pf->rstat;
145 if (!(pf->rstat == pf->accounted_rstat))
146 out << "/" << pf->accounted_rstat;
147 }
148
149 out << " hs=" << dir.get_num_head_items() << "+" << dir.get_num_head_null();
150 out << ",ss=" << dir.get_num_snap_items() << "+" << dir.get_num_snap_null();
151 if (dir.get_num_dirty())
152 out << " dirty=" << dir.get_num_dirty();
153
154 if (dir.get_num_ref()) {
155 out << " |";
156 dir.print_pin_set(out);
157 }
158
159 out << " " << &dir;
160 return out << "]";
161 }
162
163
164 void CDir::print(ostream& out)
165 {
166 out << *this;
167 }
168
169
170
171
172 ostream& CDir::print_db_line_prefix(ostream& out)
173 {
174 return out << ceph_clock_now() << " mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
175 }
176
177
178
179 // -------------------------------------------------------------------
180 // CDir
181
182 CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
183 cache(mdcache), inode(in), frag(fg),
184 first(2),
185 dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
186 projected_version(0),
187 dirty_dentries(member_offset(CDentry, item_dir_dirty)),
188 item_dirty(this), item_new(this),
189 num_head_items(0), num_head_null(0),
190 num_snap_items(0), num_snap_null(0),
191 num_dirty(0), committing_version(0), committed_version(0),
192 dir_auth_pins(0), request_pins(0),
193 dir_rep(REP_NONE),
194 pop_me(ceph_clock_now()),
195 pop_nested(ceph_clock_now()),
196 pop_auth_subtree(ceph_clock_now()),
197 pop_auth_subtree_nested(ceph_clock_now()),
198 num_dentries_nested(0), num_dentries_auth_subtree(0),
199 num_dentries_auth_subtree_nested(0),
200 dir_auth(CDIR_AUTH_DEFAULT)
201 {
202 state = STATE_INITIAL;
203
204 memset(&fnode, 0, sizeof(fnode));
205
206 // auth
207 assert(in->is_dir());
208 if (auth)
209 state |= STATE_AUTH;
210 }
211
212 /**
213 * Check the recursive statistics on size for consistency.
214 * If mds_debug_scatterstat is enabled, assert for correctness,
215 * otherwise just print out the mismatch and continue.
216 */
217 bool CDir::check_rstats(bool scrub)
218 {
219 if (!g_conf->mds_debug_scatterstat && !scrub)
220 return true;
221
222 dout(25) << "check_rstats on " << this << dendl;
223 if (!is_complete() || !is_auth() || is_frozen()) {
224 assert(!scrub);
225 dout(10) << "check_rstats bailing out -- incomplete or non-auth or frozen dir!" << dendl;
226 return true;
227 }
228
229 frag_info_t frag_info;
230 nest_info_t nest_info;
231 for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
232 if (i->second->last != CEPH_NOSNAP)
233 continue;
234 CDentry::linkage_t *dnl = i->second->get_linkage();
235 if (dnl->is_primary()) {
236 CInode *in = dnl->get_inode();
237 nest_info.add(in->inode.accounted_rstat);
238 if (in->is_dir())
239 frag_info.nsubdirs++;
240 else
241 frag_info.nfiles++;
242 } else if (dnl->is_remote())
243 frag_info.nfiles++;
244 }
245
246 bool good = true;
247 // fragstat
248 if(!frag_info.same_sums(fnode.fragstat)) {
249 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl;
250 dout(1) << "get_num_head_items() = " << get_num_head_items()
251 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
252 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
253 good = false;
254 } else {
255 dout(20) << "get_num_head_items() = " << get_num_head_items()
256 << "; fnode.fragstat.nfiles=" << fnode.fragstat.nfiles
257 << " fnode.fragstat.nsubdirs=" << fnode.fragstat.nsubdirs << dendl;
258 }
259
260 // rstat
261 if (!nest_info.same_sums(fnode.rstat)) {
262 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl;
263 dout(1) << "total of child dentrys: " << nest_info << dendl;
264 dout(1) << "my rstats: " << fnode.rstat << dendl;
265 good = false;
266 } else {
267 dout(20) << "total of child dentrys: " << nest_info << dendl;
268 dout(20) << "my rstats: " << fnode.rstat << dendl;
269 }
270
271 if (!good) {
272 if (!scrub) {
273 for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
274 CDentry *dn = i->second;
275 if (dn->get_linkage()->is_primary()) {
276 CInode *in = dn->get_linkage()->inode;
277 dout(1) << *dn << " rstat " << in->inode.accounted_rstat << dendl;
278 } else {
279 dout(1) << *dn << dendl;
280 }
281 }
282
283 assert(frag_info.nfiles == fnode.fragstat.nfiles);
284 assert(frag_info.nsubdirs == fnode.fragstat.nsubdirs);
285 assert(nest_info.rbytes == fnode.rstat.rbytes);
286 assert(nest_info.rfiles == fnode.rstat.rfiles);
287 assert(nest_info.rsubdirs == fnode.rstat.rsubdirs);
288 }
289 }
290 dout(10) << "check_rstats complete on " << this << dendl;
291 return good;
292 }
293
294 CDentry *CDir::lookup(const string& name, snapid_t snap)
295 {
296 dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
297 map_t::iterator iter = items.lower_bound(dentry_key_t(snap, name.c_str(),
298 inode->hash_dentry_name(name)));
299 if (iter == items.end())
300 return 0;
301 if (iter->second->name == name &&
302 iter->second->first <= snap &&
303 iter->second->last >= snap) {
304 dout(20) << " hit -> " << iter->first << dendl;
305 return iter->second;
306 }
307 dout(20) << " miss -> " << iter->first << dendl;
308 return 0;
309 }
310
311 CDentry *CDir::lookup_exact_snap(const string& name, snapid_t last) {
312 map_t::iterator p = items.find(dentry_key_t(last, name.c_str(),
313 inode->hash_dentry_name(name)));
314 if (p == items.end())
315 return NULL;
316 return p->second;
317 }
318
319 /***
320 * linking fun
321 */
322
323 CDentry* CDir::add_null_dentry(const string& dname,
324 snapid_t first, snapid_t last)
325 {
326 // foreign
327 assert(lookup_exact_snap(dname, last) == 0);
328
329 // create dentry
330 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
331 if (is_auth())
332 dn->state_set(CDentry::STATE_AUTH);
333
334 cache->bottom_lru.lru_insert_mid(dn);
335 dn->state_set(CDentry::STATE_BOTTOMLRU);
336
337 dn->dir = this;
338 dn->version = get_projected_version();
339
340 // add to dir
341 assert(items.count(dn->key()) == 0);
342 //assert(null_items.count(dn->name) == 0);
343
344 items[dn->key()] = dn;
345 if (last == CEPH_NOSNAP)
346 num_head_null++;
347 else
348 num_snap_null++;
349
350 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
351 dn->get(CDentry::PIN_FRAGMENTING);
352 dn->state_set(CDentry::STATE_FRAGMENTING);
353 }
354
355 dout(12) << "add_null_dentry " << *dn << dendl;
356
357 // pin?
358 if (get_num_any() == 1)
359 get(PIN_CHILD);
360
361 assert(get_num_any() == items.size());
362 return dn;
363 }
364
365
366 CDentry* CDir::add_primary_dentry(const string& dname, CInode *in,
367 snapid_t first, snapid_t last)
368 {
369 // primary
370 assert(lookup_exact_snap(dname, last) == 0);
371
372 // create dentry
373 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
374 if (is_auth())
375 dn->state_set(CDentry::STATE_AUTH);
376 if (is_auth() || !inode->is_stray()) {
377 cache->lru.lru_insert_mid(dn);
378 } else {
379 cache->bottom_lru.lru_insert_mid(dn);
380 dn->state_set(CDentry::STATE_BOTTOMLRU);
381 }
382
383 dn->dir = this;
384 dn->version = get_projected_version();
385
386 // add to dir
387 assert(items.count(dn->key()) == 0);
388 //assert(null_items.count(dn->name) == 0);
389
390 items[dn->key()] = dn;
391
392 dn->get_linkage()->inode = in;
393 in->set_primary_parent(dn);
394
395 link_inode_work(dn, in);
396
397 if (dn->last == CEPH_NOSNAP)
398 num_head_items++;
399 else
400 num_snap_items++;
401
402 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
403 dn->get(CDentry::PIN_FRAGMENTING);
404 dn->state_set(CDentry::STATE_FRAGMENTING);
405 }
406
407 dout(12) << "add_primary_dentry " << *dn << dendl;
408
409 // pin?
410 if (get_num_any() == 1)
411 get(PIN_CHILD);
412 assert(get_num_any() == items.size());
413 return dn;
414 }
415
416 CDentry* CDir::add_remote_dentry(const string& dname, inodeno_t ino, unsigned char d_type,
417 snapid_t first, snapid_t last)
418 {
419 // foreign
420 assert(lookup_exact_snap(dname, last) == 0);
421
422 // create dentry
423 CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), ino, d_type, first, last);
424 if (is_auth())
425 dn->state_set(CDentry::STATE_AUTH);
426 cache->lru.lru_insert_mid(dn);
427
428 dn->dir = this;
429 dn->version = get_projected_version();
430
431 // add to dir
432 assert(items.count(dn->key()) == 0);
433 //assert(null_items.count(dn->name) == 0);
434
435 items[dn->key()] = dn;
436 if (last == CEPH_NOSNAP)
437 num_head_items++;
438 else
439 num_snap_items++;
440
441 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
442 dn->get(CDentry::PIN_FRAGMENTING);
443 dn->state_set(CDentry::STATE_FRAGMENTING);
444 }
445
446 dout(12) << "add_remote_dentry " << *dn << dendl;
447
448 // pin?
449 if (get_num_any() == 1)
450 get(PIN_CHILD);
451
452 assert(get_num_any() == items.size());
453 return dn;
454 }
455
456
457
458 void CDir::remove_dentry(CDentry *dn)
459 {
460 dout(12) << "remove_dentry " << *dn << dendl;
461
462 // there should be no client leases at this point!
463 assert(dn->client_lease_map.empty());
464
465 if (state_test(CDir::STATE_DNPINNEDFRAG)) {
466 dn->put(CDentry::PIN_FRAGMENTING);
467 dn->state_clear(CDentry::STATE_FRAGMENTING);
468 }
469
470 if (dn->get_linkage()->is_null()) {
471 if (dn->last == CEPH_NOSNAP)
472 num_head_null--;
473 else
474 num_snap_null--;
475 } else {
476 if (dn->last == CEPH_NOSNAP)
477 num_head_items--;
478 else
479 num_snap_items--;
480 }
481
482 if (!dn->get_linkage()->is_null())
483 // detach inode and dentry
484 unlink_inode_work(dn);
485
486 // remove from list
487 assert(items.count(dn->key()) == 1);
488 items.erase(dn->key());
489
490 // clean?
491 if (dn->is_dirty())
492 dn->mark_clean();
493
494 if (dn->state_test(CDentry::STATE_BOTTOMLRU))
495 cache->bottom_lru.lru_remove(dn);
496 else
497 cache->lru.lru_remove(dn);
498 delete dn;
499
500 // unpin?
501 if (get_num_any() == 0)
502 put(PIN_CHILD);
503 assert(get_num_any() == items.size());
504 }
505
506 void CDir::link_remote_inode(CDentry *dn, CInode *in)
507 {
508 link_remote_inode(dn, in->ino(), IFTODT(in->get_projected_inode()->mode));
509 }
510
511 void CDir::link_remote_inode(CDentry *dn, inodeno_t ino, unsigned char d_type)
512 {
513 dout(12) << "link_remote_inode " << *dn << " remote " << ino << dendl;
514 assert(dn->get_linkage()->is_null());
515
516 dn->get_linkage()->set_remote(ino, d_type);
517
518 if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
519 cache->bottom_lru.lru_remove(dn);
520 cache->lru.lru_insert_mid(dn);
521 dn->state_clear(CDentry::STATE_BOTTOMLRU);
522 }
523
524 if (dn->last == CEPH_NOSNAP) {
525 num_head_items++;
526 num_head_null--;
527 } else {
528 num_snap_items++;
529 num_snap_null--;
530 }
531 assert(get_num_any() == items.size());
532 }
533
534 void CDir::link_primary_inode(CDentry *dn, CInode *in)
535 {
536 dout(12) << "link_primary_inode " << *dn << " " << *in << dendl;
537 assert(dn->get_linkage()->is_null());
538
539 dn->get_linkage()->inode = in;
540 in->set_primary_parent(dn);
541
542 link_inode_work(dn, in);
543
544 if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
545 (is_auth() || !inode->is_stray())) {
546 cache->bottom_lru.lru_remove(dn);
547 cache->lru.lru_insert_mid(dn);
548 dn->state_clear(CDentry::STATE_BOTTOMLRU);
549 }
550
551 if (dn->last == CEPH_NOSNAP) {
552 num_head_items++;
553 num_head_null--;
554 } else {
555 num_snap_items++;
556 num_snap_null--;
557 }
558
559 assert(get_num_any() == items.size());
560 }
561
562 void CDir::link_inode_work( CDentry *dn, CInode *in)
563 {
564 assert(dn->get_linkage()->get_inode() == in);
565 assert(in->get_parent_dn() == dn);
566
567 // set inode version
568 //in->inode.version = dn->get_version();
569
570 // pin dentry?
571 if (in->get_num_ref())
572 dn->get(CDentry::PIN_INODEPIN);
573
574 // adjust auth pin count
575 if (in->auth_pins + in->nested_auth_pins)
576 dn->adjust_nested_auth_pins(in->auth_pins + in->nested_auth_pins, in->auth_pins, NULL);
577
578 // verify open snaprealm parent
579 if (in->snaprealm)
580 in->snaprealm->adjust_parent();
581 else if (in->is_any_caps())
582 in->move_to_realm(inode->find_snaprealm());
583 }
584
585 void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
586 {
587 if (dn->get_linkage()->is_primary()) {
588 dout(12) << "unlink_inode " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
589 } else {
590 dout(12) << "unlink_inode " << *dn << dendl;
591 }
592
593 unlink_inode_work(dn);
594
595 if (adjust_lru && !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
596 cache->lru.lru_remove(dn);
597 cache->bottom_lru.lru_insert_mid(dn);
598 dn->state_set(CDentry::STATE_BOTTOMLRU);
599 }
600
601 if (dn->last == CEPH_NOSNAP) {
602 num_head_items--;
603 num_head_null++;
604 } else {
605 num_snap_items--;
606 num_snap_null++;
607 }
608 assert(get_num_any() == items.size());
609 }
610
611
612 void CDir::try_remove_unlinked_dn(CDentry *dn)
613 {
614 assert(dn->dir == this);
615 assert(dn->get_linkage()->is_null());
616
617 // no pins (besides dirty)?
618 if (dn->get_num_ref() != dn->is_dirty())
619 return;
620
621 // was the dn new?
622 if (dn->is_new()) {
623 dout(10) << "try_remove_unlinked_dn " << *dn << " in " << *this << dendl;
624 if (dn->is_dirty())
625 dn->mark_clean();
626 remove_dentry(dn);
627
628 // NOTE: we may not have any more dirty dentries, but the fnode
629 // still changed, so the directory must remain dirty.
630 }
631 }
632
633
634 void CDir::unlink_inode_work( CDentry *dn )
635 {
636 CInode *in = dn->get_linkage()->get_inode();
637
638 if (dn->get_linkage()->is_remote()) {
639 // remote
640 if (in)
641 dn->unlink_remote(dn->get_linkage());
642
643 dn->get_linkage()->set_remote(0, 0);
644 } else if (dn->get_linkage()->is_primary()) {
645 // primary
646 // unpin dentry?
647 if (in->get_num_ref())
648 dn->put(CDentry::PIN_INODEPIN);
649
650 // unlink auth_pin count
651 if (in->auth_pins + in->nested_auth_pins)
652 dn->adjust_nested_auth_pins(0 - (in->auth_pins + in->nested_auth_pins), 0 - in->auth_pins, NULL);
653
654 // detach inode
655 in->remove_primary_parent(dn);
656 dn->get_linkage()->inode = 0;
657 } else {
658 assert(!dn->get_linkage()->is_null());
659 }
660 }
661
662 void CDir::add_to_bloom(CDentry *dn)
663 {
664 assert(dn->last == CEPH_NOSNAP);
665 if (!bloom) {
666 /* not create bloom filter for incomplete dir that was added by log replay */
667 if (!is_complete())
668 return;
669
670 /* don't maintain bloom filters in standby replay (saves cycles, and also
671 * avoids need to implement clearing it in EExport for #16924) */
672 if (cache->mds->is_standby_replay()) {
673 return;
674 }
675
676 unsigned size = get_num_head_items() + get_num_snap_items();
677 if (size < 100) size = 100;
678 bloom.reset(new bloom_filter(size, 1.0 / size, 0));
679 }
680 /* This size and false positive probability is completely random.*/
681 bloom->insert(dn->name.c_str(), dn->name.size());
682 }
683
684 bool CDir::is_in_bloom(const string& name)
685 {
686 if (!bloom)
687 return false;
688 return bloom->contains(name.c_str(), name.size());
689 }
690
691 void CDir::remove_null_dentries() {
692 dout(12) << "remove_null_dentries " << *this << dendl;
693
694 CDir::map_t::iterator p = items.begin();
695 while (p != items.end()) {
696 CDentry *dn = p->second;
697 ++p;
698 if (dn->get_linkage()->is_null() && !dn->is_projected())
699 remove_dentry(dn);
700 }
701
702 assert(num_snap_null == 0);
703 assert(num_head_null == 0);
704 assert(get_num_any() == items.size());
705 }
706
707 /** remove dirty null dentries for deleted directory. the dirfrag will be
708 * deleted soon, so it's safe to not commit dirty dentries.
709 *
710 * This is called when a directory is being deleted, a prerequisite
711 * of which is that its children have been unlinked: we expect to only see
712 * null, unprojected dentries here.
713 */
714 void CDir::try_remove_dentries_for_stray()
715 {
716 dout(10) << __func__ << dendl;
717 assert(get_parent_dir()->inode->is_stray());
718
719 // clear dirty only when the directory was not snapshotted
720 bool clear_dirty = !inode->snaprealm;
721
722 CDir::map_t::iterator p = items.begin();
723 while (p != items.end()) {
724 CDentry *dn = p->second;
725 ++p;
726 if (dn->last == CEPH_NOSNAP) {
727 assert(!dn->is_projected());
728 assert(dn->get_linkage()->is_null());
729 if (clear_dirty && dn->is_dirty())
730 dn->mark_clean();
731 // It's OK to remove lease prematurely because we will never link
732 // the dentry to inode again.
733 if (dn->is_any_leases())
734 dn->remove_client_leases(cache->mds->locker);
735 if (dn->get_num_ref() == 0)
736 remove_dentry(dn);
737 } else {
738 assert(!dn->is_projected());
739 CDentry::linkage_t *dnl= dn->get_linkage();
740 CInode *in = NULL;
741 if (dnl->is_primary()) {
742 in = dnl->get_inode();
743 if (clear_dirty && in->is_dirty())
744 in->mark_clean();
745 }
746 if (clear_dirty && dn->is_dirty())
747 dn->mark_clean();
748 if (dn->get_num_ref() == 0) {
749 remove_dentry(dn);
750 if (in)
751 cache->remove_inode(in);
752 }
753 }
754 }
755
756 if (clear_dirty && is_dirty())
757 mark_clean();
758 }
759
760 bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
761 {
762 assert(dn->last != CEPH_NOSNAP);
763 set<snapid_t>::const_iterator p = snaps.lower_bound(dn->first);
764 CDentry::linkage_t *dnl= dn->get_linkage();
765 CInode *in = 0;
766 if (dnl->is_primary())
767 in = dnl->get_inode();
768 if ((p == snaps.end() || *p > dn->last) &&
769 (dn->get_num_ref() == dn->is_dirty()) &&
770 (!in || in->get_num_ref() == in->is_dirty())) {
771 dout(10) << " purging snapped " << *dn << dendl;
772 if (in && in->is_dirty())
773 in->mark_clean();
774 remove_dentry(dn);
775 if (in) {
776 dout(10) << " purging snapped " << *in << dendl;
777 cache->remove_inode(in);
778 }
779 return true;
780 }
781 return false;
782 }
783
784
785 void CDir::purge_stale_snap_data(const set<snapid_t>& snaps)
786 {
787 dout(10) << "purge_stale_snap_data " << snaps << dendl;
788
789 CDir::map_t::iterator p = items.begin();
790 while (p != items.end()) {
791 CDentry *dn = p->second;
792 ++p;
793
794 if (dn->last == CEPH_NOSNAP)
795 continue;
796
797 try_trim_snap_dentry(dn, snaps);
798 }
799 }
800
801
802 /**
803 * steal_dentry -- semi-violently move a dentry from one CDir to another
804 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
805 * on the old CDir corpse; must call finish_old_fragment() when finished.
806 */
807 void CDir::steal_dentry(CDentry *dn)
808 {
809 dout(15) << "steal_dentry " << *dn << dendl;
810
811 items[dn->key()] = dn;
812
813 dn->dir->items.erase(dn->key());
814 if (dn->dir->items.empty())
815 dn->dir->put(PIN_CHILD);
816
817 if (get_num_any() == 0)
818 get(PIN_CHILD);
819 if (dn->get_linkage()->is_null()) {
820 if (dn->last == CEPH_NOSNAP)
821 num_head_null++;
822 else
823 num_snap_null++;
824 } else if (dn->last == CEPH_NOSNAP) {
825 num_head_items++;
826
827 if (dn->get_linkage()->is_primary()) {
828 CInode *in = dn->get_linkage()->get_inode();
829 inode_t *pi = in->get_projected_inode();
830 if (dn->get_linkage()->get_inode()->is_dir())
831 fnode.fragstat.nsubdirs++;
832 else
833 fnode.fragstat.nfiles++;
834 fnode.rstat.rbytes += pi->accounted_rstat.rbytes;
835 fnode.rstat.rfiles += pi->accounted_rstat.rfiles;
836 fnode.rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
837 fnode.rstat.rsnaprealms += pi->accounted_rstat.rsnaprealms;
838 if (pi->accounted_rstat.rctime > fnode.rstat.rctime)
839 fnode.rstat.rctime = pi->accounted_rstat.rctime;
840
841 // move dirty inode rstat to new dirfrag
842 if (in->is_dirty_rstat())
843 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
844 } else if (dn->get_linkage()->is_remote()) {
845 if (dn->get_linkage()->get_remote_d_type() == DT_DIR)
846 fnode.fragstat.nsubdirs++;
847 else
848 fnode.fragstat.nfiles++;
849 }
850 } else {
851 num_snap_items++;
852 if (dn->get_linkage()->is_primary()) {
853 CInode *in = dn->get_linkage()->get_inode();
854 if (in->is_dirty_rstat())
855 dirty_rstat_inodes.push_back(&in->dirty_rstat_item);
856 }
857 }
858
859 if (dn->auth_pins || dn->nested_auth_pins) {
860 // use the helpers here to maintain the auth_pin invariants on the dir inode
861 int ap = dn->get_num_auth_pins() + dn->get_num_nested_auth_pins();
862 int dap = dn->get_num_dir_auth_pins();
863 assert(dap <= ap);
864 adjust_nested_auth_pins(ap, dap, NULL);
865 dn->dir->adjust_nested_auth_pins(-ap, -dap, NULL);
866 }
867
868 if (dn->is_dirty()) {
869 dirty_dentries.push_back(&dn->item_dir_dirty);
870 num_dirty++;
871 }
872
873 dn->dir = this;
874 }
875
876 void CDir::prepare_old_fragment(map<string_snap_t, std::list<MDSInternalContextBase*> >& dentry_waiters, bool replay)
877 {
878 // auth_pin old fragment for duration so that any auth_pinning
879 // during the dentry migration doesn't trigger side effects
880 if (!replay && is_auth())
881 auth_pin(this);
882
883 if (!waiting_on_dentry.empty()) {
884 for (auto p = waiting_on_dentry.begin(); p != waiting_on_dentry.end(); ++p)
885 dentry_waiters[p->first].swap(p->second);
886 waiting_on_dentry.clear();
887 put(PIN_DNWAITER);
888 }
889 }
890
891 void CDir::prepare_new_fragment(bool replay)
892 {
893 if (!replay && is_auth()) {
894 _freeze_dir();
895 mark_complete();
896 }
897 inode->add_dirfrag(this);
898 }
899
900 void CDir::finish_old_fragment(list<MDSInternalContextBase*>& waiters, bool replay)
901 {
902 // take waiters _before_ unfreeze...
903 if (!replay) {
904 take_waiting(WAIT_ANY_MASK, waiters);
905 if (is_auth()) {
906 auth_unpin(this); // pinned in prepare_old_fragment
907 assert(is_frozen_dir());
908 unfreeze_dir();
909 }
910 }
911
912 assert(nested_auth_pins == 0);
913 assert(dir_auth_pins == 0);
914 assert(auth_pins == 0);
915
916 num_head_items = num_head_null = 0;
917 num_snap_items = num_snap_null = 0;
918
919 // this mirrors init_fragment_pins()
920 if (is_auth())
921 clear_replica_map();
922 if (is_dirty())
923 mark_clean();
924 if (state_test(STATE_IMPORTBOUND))
925 put(PIN_IMPORTBOUND);
926 if (state_test(STATE_EXPORTBOUND))
927 put(PIN_EXPORTBOUND);
928 if (is_subtree_root())
929 put(PIN_SUBTREE);
930
931 if (auth_pins > 0)
932 put(PIN_AUTHPIN);
933
934 assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
935 }
936
937 void CDir::init_fragment_pins()
938 {
939 if (is_replicated())
940 get(PIN_REPLICATED);
941 if (state_test(STATE_DIRTY))
942 get(PIN_DIRTY);
943 if (state_test(STATE_EXPORTBOUND))
944 get(PIN_EXPORTBOUND);
945 if (state_test(STATE_IMPORTBOUND))
946 get(PIN_IMPORTBOUND);
947 if (is_subtree_root())
948 get(PIN_SUBTREE);
949 }
950
951 void CDir::split(int bits, list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
952 {
953 dout(10) << "split by " << bits << " bits on " << *this << dendl;
954
955 assert(replay || is_complete() || !is_auth());
956
957 list<frag_t> frags;
958 frag.split(bits, frags);
959
960 vector<CDir*> subfrags(1 << bits);
961
962 double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
963
964 version_t rstat_version = inode->get_projected_inode()->rstat.version;
965 version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
966
967 nest_info_t rstatdiff;
968 frag_info_t fragstatdiff;
969 if (fnode.accounted_rstat.version == rstat_version)
970 rstatdiff.add_delta(fnode.accounted_rstat, fnode.rstat);
971 if (fnode.accounted_fragstat.version == dirstat_version)
972 fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat);
973 dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
974
975 map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
976 prepare_old_fragment(dentry_waiters, replay);
977
978 // create subfrag dirs
979 int n = 0;
980 for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
981 CDir *f = new CDir(inode, *p, cache, is_auth());
982 f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
983 f->get_replicas() = get_replicas();
984 f->dir_auth = dir_auth;
985 f->init_fragment_pins();
986 f->set_version(get_version());
987
988 f->pop_me = pop_me;
989 f->pop_me.scale(fac);
990
991 // FIXME; this is an approximation
992 f->pop_nested = pop_nested;
993 f->pop_nested.scale(fac);
994 f->pop_auth_subtree = pop_auth_subtree;
995 f->pop_auth_subtree.scale(fac);
996 f->pop_auth_subtree_nested = pop_auth_subtree_nested;
997 f->pop_auth_subtree_nested.scale(fac);
998
999 dout(10) << " subfrag " << *p << " " << *f << dendl;
1000 subfrags[n++] = f;
1001 subs.push_back(f);
1002
1003 f->set_dir_auth(get_dir_auth());
1004 f->prepare_new_fragment(replay);
1005 }
1006
1007 // repartition dentries
1008 while (!items.empty()) {
1009 CDir::map_t::iterator p = items.begin();
1010
1011 CDentry *dn = p->second;
1012 frag_t subfrag = inode->pick_dirfrag(dn->name);
1013 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1014 dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
1015 CDir *f = subfrags[n];
1016 f->steal_dentry(dn);
1017 }
1018
1019 for (auto& p : dentry_waiters) {
1020 frag_t subfrag = inode->pick_dirfrag(p.first.name);
1021 int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
1022 CDir *f = subfrags[n];
1023
1024 if (f->waiting_on_dentry.empty())
1025 f->get(PIN_DNWAITER);
1026 f->waiting_on_dentry[p.first].swap(p.second);
1027 }
1028
1029 // FIXME: handle dirty old rstat
1030
1031 // fix up new frag fragstats
1032 for (int i=0; i<n; i++) {
1033 CDir *f = subfrags[i];
1034 f->fnode.rstat.version = rstat_version;
1035 f->fnode.accounted_rstat = f->fnode.rstat;
1036 f->fnode.fragstat.version = dirstat_version;
1037 f->fnode.accounted_fragstat = f->fnode.fragstat;
1038 dout(10) << " rstat " << f->fnode.rstat << " fragstat " << f->fnode.fragstat
1039 << " on " << *f << dendl;
1040 }
1041
1042 // give any outstanding frag stat differential to first frag
1043 dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
1044 << " to " << *subfrags[0] << dendl;
1045 subfrags[0]->fnode.accounted_rstat.add(rstatdiff);
1046 subfrags[0]->fnode.accounted_fragstat.add(fragstatdiff);
1047
1048 finish_old_fragment(waiters, replay);
1049 }
1050
/**
 * Fold the dirfrags listed in @p subs into this dirfrag.
 *
 * For every sub-fragment this steals its dentries, merges its replica map,
 * version and the state bits selected by MASK_STATE_FRAGMENT_KEPT, then
 * finishes and closes it on the inode.  Dentry waiters collected from the
 * sub-fragments are re-registered on this dirfrag.
 *
 * @param subs     sub-fragments to absorb
 * @param waiters  list handed to each sub-fragment's finish_old_fragment()
 * @param replay   forwarded to the fragment prepare/finish helpers; when
 *                 true the completeness assert is relaxed and the merged
 *                 dirfrag is not marked complete
 */
void CDir::merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
{
  dout(10) << "merge " << subs << dendl;

  // Pick up the (single) non-default authority among the sub-fragments; the
  // assert fires if two different non-default authorities are encountered.
  mds_authority_t new_auth = CDIR_AUTH_DEFAULT;
  for (auto dir : subs) {
    if (dir->get_dir_auth() != CDIR_AUTH_DEFAULT &&
        dir->get_dir_auth() != new_auth) {
      assert(new_auth == CDIR_AUTH_DEFAULT);
      new_auth = dir->get_dir_auth();
    }
  }

  set_dir_auth(new_auth);
  prepare_new_fragment(replay);

  // Accumulated (accounted -> current) stat deltas of the sub-fragments.
  nest_info_t rstatdiff;
  frag_info_t fragstatdiff;
  // Only passed to add_delta() below; never read in this function.
  bool touched_mtime, touched_chattr;
  version_t rstat_version = inode->get_projected_inode()->rstat.version;
  version_t dirstat_version = inode->get_projected_inode()->dirstat.version;

  // Filled by prepare_old_fragment() of each sub-fragment.
  map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;

  for (auto dir : subs) {
    dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
    assert(!dir->is_auth() || dir->is_complete() || replay);

    // Only take the delta when the sub-fragment's accounted stats are at the
    // same version as the inode's projected stats.
    if (dir->fnode.accounted_rstat.version == rstat_version)
      rstatdiff.add_delta(dir->fnode.accounted_rstat, dir->fnode.rstat);
    if (dir->fnode.accounted_fragstat.version == dirstat_version)
      fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat,
                             &touched_mtime, &touched_chattr);

    dir->prepare_old_fragment(dentry_waiters, replay);

    // steal dentries
    while (!dir->items.empty())
      steal_dentry(dir->items.begin()->second);

    // merge replica map (keep the larger value per rank)
    for (const auto &p : dir->get_replicas()) {
      unsigned cur = get_replicas()[p.first];
      if (p.second > cur)
        get_replicas()[p.first] = p.second;
    }

    // merge version (keep the highest)
    if (dir->get_version() > get_version())
      set_version(dir->get_version());

    // merge state
    state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);

    dir->finish_old_fragment(waiters, replay);
    inode->close_dirfrag(dir->get_frag());
  }

  // Re-home the collected dentry waiters here, taking the DNWAITER pin once.
  if (!dentry_waiters.empty()) {
    get(PIN_DNWAITER);
    for (auto& p : dentry_waiters) {
      waiting_on_dentry[p.first].swap(p.second);
    }
  }

  if (is_auth() && !replay)
    mark_complete();

  // FIXME: merge dirty old rstat
  // accounted_* = current stats plus the deltas gathered above.
  fnode.rstat.version = rstat_version;
  fnode.accounted_rstat = fnode.rstat;
  fnode.accounted_rstat.add(rstatdiff);

  fnode.fragstat.version = dirstat_version;
  fnode.accounted_fragstat = fnode.fragstat;
  fnode.accounted_fragstat.add(fragstatdiff);

  init_fragment_pins();
}
1130
1131
1132
1133
1134 void CDir::resync_accounted_fragstat()
1135 {
1136 fnode_t *pf = get_projected_fnode();
1137 inode_t *pi = inode->get_projected_inode();
1138
1139 if (pf->accounted_fragstat.version != pi->dirstat.version) {
1140 pf->fragstat.version = pi->dirstat.version;
1141 dout(10) << "resync_accounted_fragstat " << pf->accounted_fragstat << " -> " << pf->fragstat << dendl;
1142 pf->accounted_fragstat = pf->fragstat;
1143 }
1144 }
1145
1146 /*
1147 * resync rstat and accounted_rstat with inode
1148 */
1149 void CDir::resync_accounted_rstat()
1150 {
1151 fnode_t *pf = get_projected_fnode();
1152 inode_t *pi = inode->get_projected_inode();
1153
1154 if (pf->accounted_rstat.version != pi->rstat.version) {
1155 pf->rstat.version = pi->rstat.version;
1156 dout(10) << "resync_accounted_rstat " << pf->accounted_rstat << " -> " << pf->rstat << dendl;
1157 pf->accounted_rstat = pf->rstat;
1158 dirty_old_rstat.clear();
1159 }
1160 }
1161
1162 void CDir::assimilate_dirty_rstat_inodes()
1163 {
1164 dout(10) << "assimilate_dirty_rstat_inodes" << dendl;
1165 for (elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1166 !p.end(); ++p) {
1167 CInode *in = *p;
1168 assert(in->is_auth());
1169 if (in->is_frozen())
1170 continue;
1171
1172 inode_t *pi = in->project_inode();
1173 pi->version = in->pre_dirty();
1174
1175 inode->mdcache->project_rstat_inode_to_frag(in, this, 0, 0, NULL);
1176 }
1177 state_set(STATE_ASSIMRSTAT);
1178 dout(10) << "assimilate_dirty_rstat_inodes done" << dendl;
1179 }
1180
1181 void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef& mut, EMetaBlob *blob)
1182 {
1183 if (!state_test(STATE_ASSIMRSTAT))
1184 return;
1185 state_clear(STATE_ASSIMRSTAT);
1186 dout(10) << "assimilate_dirty_rstat_inodes_finish" << dendl;
1187 elist<CInode*>::iterator p = dirty_rstat_inodes.begin_use_current();
1188 while (!p.end()) {
1189 CInode *in = *p;
1190 ++p;
1191
1192 if (in->is_frozen())
1193 continue;
1194
1195 CDentry *dn = in->get_projected_parent_dn();
1196
1197 mut->auth_pin(in);
1198 mut->add_projected_inode(in);
1199
1200 in->clear_dirty_rstat();
1201 blob->add_primary_dentry(dn, in, true);
1202 }
1203
1204 if (!dirty_rstat_inodes.empty())
1205 inode->mdcache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
1206 }
1207
1208
1209
1210
1211 /****************************************
1212 * WAITING
1213 */
1214
1215 void CDir::add_dentry_waiter(const string& dname, snapid_t snapid, MDSInternalContextBase *c)
1216 {
1217 if (waiting_on_dentry.empty())
1218 get(PIN_DNWAITER);
1219 waiting_on_dentry[string_snap_t(dname, snapid)].push_back(c);
1220 dout(10) << "add_dentry_waiter dentry " << dname
1221 << " snap " << snapid
1222 << " " << c << " on " << *this << dendl;
1223 }
1224
1225 void CDir::take_dentry_waiting(const string& dname, snapid_t first, snapid_t last,
1226 list<MDSInternalContextBase*>& ls)
1227 {
1228 if (waiting_on_dentry.empty())
1229 return;
1230
1231 string_snap_t lb(dname, first);
1232 string_snap_t ub(dname, last);
1233 compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.lower_bound(lb);
1234 while (p != waiting_on_dentry.end() &&
1235 !(ub < p->first)) {
1236 dout(10) << "take_dentry_waiting dentry " << dname
1237 << " [" << first << "," << last << "] found waiter on snap "
1238 << p->first.snapid
1239 << " on " << *this << dendl;
1240 ls.splice(ls.end(), p->second);
1241 waiting_on_dentry.erase(p++);
1242 }
1243
1244 if (waiting_on_dentry.empty())
1245 put(PIN_DNWAITER);
1246 }
1247
1248 void CDir::take_sub_waiting(list<MDSInternalContextBase*>& ls)
1249 {
1250 dout(10) << "take_sub_waiting" << dendl;
1251 if (!waiting_on_dentry.empty()) {
1252 for (compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
1253 p != waiting_on_dentry.end();
1254 ++p)
1255 ls.splice(ls.end(), p->second);
1256 waiting_on_dentry.clear();
1257 put(PIN_DNWAITER);
1258 }
1259 }
1260
1261
1262
1263 void CDir::add_waiter(uint64_t tag, MDSInternalContextBase *c)
1264 {
1265 // hierarchical?
1266
1267 // at free root?
1268 if (tag & WAIT_ATFREEZEROOT) {
1269 if (!(is_freezing_tree_root() || is_frozen_tree_root() ||
1270 is_freezing_dir() || is_frozen_dir())) {
1271 // try parent
1272 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATFREEZEROOT, " << *this << " is not root, trying parent" << dendl;
1273 inode->parent->dir->add_waiter(tag, c);
1274 return;
1275 }
1276 }
1277
1278 // at subtree root?
1279 if (tag & WAIT_ATSUBTREEROOT) {
1280 if (!is_subtree_root()) {
1281 // try parent
1282 dout(10) << "add_waiter " << std::hex << tag << std::dec << " " << c << " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl;
1283 inode->parent->dir->add_waiter(tag, c);
1284 return;
1285 }
1286 }
1287
1288 assert(!(tag & WAIT_CREATED) || state_test(STATE_CREATING));
1289
1290 MDSCacheObject::add_waiter(tag, c);
1291 }
1292
1293
1294
1295 /* NOTE: this checks dentry waiters too */
1296 void CDir::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
1297 {
1298 if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
1299 // take all dentry waiters
1300 while (!waiting_on_dentry.empty()) {
1301 compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
1302 dout(10) << "take_waiting dentry " << p->first.name
1303 << " snap " << p->first.snapid << " on " << *this << dendl;
1304 ls.splice(ls.end(), p->second);
1305 waiting_on_dentry.erase(p);
1306 }
1307 put(PIN_DNWAITER);
1308 }
1309
1310 // waiting
1311 MDSCacheObject::take_waiting(mask, ls);
1312 }
1313
1314
1315 void CDir::finish_waiting(uint64_t mask, int result)
1316 {
1317 dout(11) << "finish_waiting mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
1318
1319 list<MDSInternalContextBase*> finished;
1320 take_waiting(mask, finished);
1321 if (result < 0)
1322 finish_contexts(g_ceph_context, finished, result);
1323 else
1324 cache->mds->queue_waiters(finished);
1325 }
1326
1327
1328
1329 // dirty/clean
1330
1331 fnode_t *CDir::project_fnode()
1332 {
1333 assert(get_version() != 0);
1334 fnode_t *p = new fnode_t;
1335 *p = *get_projected_fnode();
1336 projected_fnode.push_back(p);
1337
1338 if (scrub_infop && scrub_infop->last_scrub_dirty) {
1339 p->localized_scrub_stamp = scrub_infop->last_local.time;
1340 p->localized_scrub_version = scrub_infop->last_local.version;
1341 p->recursive_scrub_stamp = scrub_infop->last_recursive.time;
1342 p->recursive_scrub_version = scrub_infop->last_recursive.version;
1343 scrub_infop->last_scrub_dirty = false;
1344 scrub_maybe_delete_info();
1345 }
1346
1347 dout(10) << "project_fnode " << p << dendl;
1348 return p;
1349 }
1350
1351 void CDir::pop_and_dirty_projected_fnode(LogSegment *ls)
1352 {
1353 assert(!projected_fnode.empty());
1354 dout(15) << "pop_and_dirty_projected_fnode " << projected_fnode.front()
1355 << " v" << projected_fnode.front()->version << dendl;
1356 fnode = *projected_fnode.front();
1357 _mark_dirty(ls);
1358 delete projected_fnode.front();
1359 projected_fnode.pop_front();
1360 }
1361
1362
1363 version_t CDir::pre_dirty(version_t min)
1364 {
1365 if (min > projected_version)
1366 projected_version = min;
1367 ++projected_version;
1368 dout(10) << "pre_dirty " << projected_version << dendl;
1369 return projected_version;
1370 }
1371
1372 void CDir::mark_dirty(version_t pv, LogSegment *ls)
1373 {
1374 assert(get_version() < pv);
1375 assert(pv <= projected_version);
1376 fnode.version = pv;
1377 _mark_dirty(ls);
1378 }
1379
1380 void CDir::_mark_dirty(LogSegment *ls)
1381 {
1382 if (!state_test(STATE_DIRTY)) {
1383 dout(10) << "mark_dirty (was clean) " << *this << " version " << get_version() << dendl;
1384 _set_dirty_flag();
1385 assert(ls);
1386 } else {
1387 dout(10) << "mark_dirty (already dirty) " << *this << " version " << get_version() << dendl;
1388 }
1389 if (ls) {
1390 ls->dirty_dirfrags.push_back(&item_dirty);
1391
1392 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1393 if (committed_version == 0 && !item_new.is_on_list())
1394 ls->new_dirfrags.push_back(&item_new);
1395 }
1396 }
1397
1398 void CDir::mark_new(LogSegment *ls)
1399 {
1400 ls->new_dirfrags.push_back(&item_new);
1401 state_clear(STATE_CREATING);
1402
1403 list<MDSInternalContextBase*> waiters;
1404 take_waiting(CDir::WAIT_CREATED, waiters);
1405 cache->mds->queue_waiters(waiters);
1406 }
1407
1408 void CDir::mark_clean()
1409 {
1410 dout(10) << "mark_clean " << *this << " version " << get_version() << dendl;
1411 if (state_test(STATE_DIRTY)) {
1412 item_dirty.remove_myself();
1413 item_new.remove_myself();
1414
1415 state_clear(STATE_DIRTY);
1416 put(PIN_DIRTY);
1417 }
1418 }
1419
1420 // caller should hold auth pin of this
1421 void CDir::log_mark_dirty()
1422 {
1423 if (is_dirty() || projected_version > get_version())
1424 return; // noop if it is already dirty or will be dirty
1425
1426 version_t pv = pre_dirty();
1427 mark_dirty(pv, cache->mds->mdlog->get_current_segment());
1428 }
1429
1430 void CDir::mark_complete() {
1431 state_set(STATE_COMPLETE);
1432 bloom.reset();
1433 }
1434
1435 void CDir::first_get()
1436 {
1437 inode->get(CInode::PIN_DIRFRAG);
1438 }
1439
1440 void CDir::last_put()
1441 {
1442 inode->put(CInode::PIN_DIRFRAG);
1443 }
1444
1445
1446
1447 /******************************************************************************
1448 * FETCH and COMMIT
1449 */
1450
1451 // -----------------------
1452 // FETCH
1453 void CDir::fetch(MDSInternalContextBase *c, bool ignore_authpinnability)
1454 {
1455 string want;
1456 return fetch(c, want, ignore_authpinnability);
1457 }
1458
1459 void CDir::fetch(MDSInternalContextBase *c, const string& want_dn, bool ignore_authpinnability)
1460 {
1461 dout(10) << "fetch on " << *this << dendl;
1462
1463 assert(is_auth());
1464 assert(!is_complete());
1465
1466 if (!can_auth_pin() && !ignore_authpinnability) {
1467 if (c) {
1468 dout(7) << "fetch waiting for authpinnable" << dendl;
1469 add_waiter(WAIT_UNFREEZE, c);
1470 } else
1471 dout(7) << "fetch not authpinnable and no context" << dendl;
1472 return;
1473 }
1474
1475 // unlinked directory inode shouldn't have any entry
1476 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
1477 !inode->snaprealm) {
1478 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
1479 if (get_version() == 0) {
1480 assert(inode->is_auth());
1481 set_version(1);
1482
1483 if (state_test(STATE_REJOINUNDEF)) {
1484 assert(cache->mds->is_rejoin());
1485 state_clear(STATE_REJOINUNDEF);
1486 cache->opened_undef_dirfrag(this);
1487 }
1488 }
1489 mark_complete();
1490
1491 if (c)
1492 cache->mds->queue_waiter(c);
1493 return;
1494 }
1495
1496 if (c) add_waiter(WAIT_COMPLETE, c);
1497 if (!want_dn.empty()) wanted_items.insert(want_dn);
1498
1499 // already fetching?
1500 if (state_test(CDir::STATE_FETCHING)) {
1501 dout(7) << "already fetching; waiting" << dendl;
1502 return;
1503 }
1504
1505 auth_pin(this);
1506 state_set(CDir::STATE_FETCHING);
1507
1508 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1509
1510 std::set<dentry_key_t> empty;
1511 _omap_fetch(NULL, empty);
1512 }
1513
1514 void CDir::fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1515 {
1516 dout(10) << "fetch " << keys.size() << " keys on " << *this << dendl;
1517
1518 assert(is_auth());
1519 assert(!is_complete());
1520
1521 if (!can_auth_pin()) {
1522 dout(7) << "fetch keys waiting for authpinnable" << dendl;
1523 add_waiter(WAIT_UNFREEZE, c);
1524 return;
1525 }
1526 if (state_test(CDir::STATE_FETCHING)) {
1527 dout(7) << "fetch keys waiting for full fetch" << dendl;
1528 add_waiter(WAIT_COMPLETE, c);
1529 return;
1530 }
1531
1532 auth_pin(this);
1533 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_fetch);
1534
1535 _omap_fetch(c, keys);
1536 }
1537
1538 class C_IO_Dir_OMAP_FetchedMore : public CDirIOContext {
1539 MDSInternalContextBase *fin;
1540 public:
1541 bufferlist hdrbl;
1542 bool more = false;
1543 map<string, bufferlist> omap; ///< carry-over from before
1544 map<string, bufferlist> omap_more; ///< new batch
1545 int ret;
1546 C_IO_Dir_OMAP_FetchedMore(CDir *d, MDSInternalContextBase *f) :
1547 CDirIOContext(d), fin(f), ret(0) { }
1548 void finish(int r) {
1549 // merge results
1550 if (omap.empty()) {
1551 omap.swap(omap_more);
1552 } else {
1553 omap.insert(omap_more.begin(), omap_more.end());
1554 }
1555 if (more) {
1556 dir->_omap_fetch_more(hdrbl, omap, fin);
1557 } else {
1558 dir->_omap_fetched(hdrbl, omap, !fin, r);
1559 if (fin)
1560 fin->complete(r);
1561 }
1562 }
1563 };
1564
1565 class C_IO_Dir_OMAP_Fetched : public CDirIOContext {
1566 MDSInternalContextBase *fin;
1567 public:
1568 bufferlist hdrbl;
1569 bool more = false;
1570 map<string, bufferlist> omap;
1571 bufferlist btbl;
1572 int ret1, ret2, ret3;
1573
1574 C_IO_Dir_OMAP_Fetched(CDir *d, MDSInternalContextBase *f) :
1575 CDirIOContext(d), fin(f), ret1(0), ret2(0), ret3(0) { }
1576 void finish(int r) override {
1577 // check the correctness of backtrace
1578 if (r >= 0 && ret3 != -ECANCELED)
1579 dir->inode->verify_diri_backtrace(btbl, ret3);
1580 if (r >= 0) r = ret1;
1581 if (r >= 0) r = ret2;
1582 if (more) {
1583 dir->_omap_fetch_more(hdrbl, omap, fin);
1584 } else {
1585 dir->_omap_fetched(hdrbl, omap, !fin, r);
1586 if (fin)
1587 fin->complete(r);
1588 }
1589 }
1590 };
1591
1592 void CDir::_omap_fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>& keys)
1593 {
1594 C_IO_Dir_OMAP_Fetched *fin = new C_IO_Dir_OMAP_Fetched(this, c);
1595 object_t oid = get_ondisk_object();
1596 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1597 ObjectOperation rd;
1598 rd.omap_get_header(&fin->hdrbl, &fin->ret1);
1599 if (keys.empty()) {
1600 assert(!c);
1601 rd.omap_get_vals("", "", g_conf->mds_dir_keys_per_op,
1602 &fin->omap, &fin->more, &fin->ret2);
1603 } else {
1604 assert(c);
1605 std::set<std::string> str_keys;
1606 for (auto p = keys.begin(); p != keys.end(); ++p) {
1607 string str;
1608 p->encode(str);
1609 str_keys.insert(str);
1610 }
1611 rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
1612 }
1613 // check the correctness of backtrace
1614 if (g_conf->mds_verify_backtrace > 0 && frag == frag_t()) {
1615 rd.getxattr("parent", &fin->btbl, &fin->ret3);
1616 rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1617 } else {
1618 fin->ret3 = -ECANCELED;
1619 }
1620
1621 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1622 new C_OnFinisher(fin, cache->mds->finisher));
1623 }
1624
1625 void CDir::_omap_fetch_more(
1626 bufferlist& hdrbl,
1627 map<string, bufferlist>& omap,
1628 MDSInternalContextBase *c)
1629 {
1630 // we have more omap keys to fetch!
1631 object_t oid = get_ondisk_object();
1632 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
1633 C_IO_Dir_OMAP_FetchedMore *fin = new C_IO_Dir_OMAP_FetchedMore(this, c);
1634 fin->hdrbl.claim(hdrbl);
1635 fin->omap.swap(omap);
1636 ObjectOperation rd;
1637 rd.omap_get_vals(fin->omap.rbegin()->first,
1638 "", /* filter prefix */
1639 g_conf->mds_dir_keys_per_op,
1640 &fin->omap_more,
1641 &fin->more,
1642 &fin->ret);
1643 cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
1644 new C_OnFinisher(fin, cache->mds->finisher));
1645 }
1646
/**
 * Decode one fetched omap value and reconcile it with the cached dentries.
 *
 * The value starts with the dentry's `first` snapid and a one-character
 * type marker: 'L' for a remote link (ino + d_type follow) or 'I' for a
 * primary dentry (a bare-encoded InodeStore follows).
 *
 * @param key          omap key of the entry; added to stale_items when the
 *                     entry is stale and no matching dentry is cached
 * @param dname        dentry name
 * @param last         last snapid covered by the entry
 * @param bl           encoded value
 * @param pos          position of the entry (used in log / error text only)
 * @param snaps        if non-NULL, snap set used to detect stale entries
 * @param force_dirty  set to true when a stale entry is queued for removal
 * @param undef_inodes receives cached STATE_REJOINUNDEF inodes that get
 *                     filled in from the decoded data
 * @return the existing or newly created dentry; NULL for a stale entry (or
 *         a duplicate-inode error) with no cached dentry
 * @throws buffer::malformed_input on an unknown type marker; the decode
 *         calls presumably throw buffer errors on truncated input (the
 *         caller catches buffer::error)
 */
CDentry *CDir::_load_dentry(
    const std::string &key,
    const std::string &dname,
    const snapid_t last,
    bufferlist &bl,
    const int pos,
    const std::set<snapid_t> *snaps,
    bool *force_dirty,
    list<CInode*> *undef_inodes)
{
  bufferlist::iterator q = bl.begin();

  snapid_t first;
  ::decode(first, q);

  // marker
  char type;
  ::decode(type, q);

  dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
           << " [" << first << "," << last << "]"
           << dendl;

  // A snapshotted entry is stale if no snap in `snaps` falls in [first,last].
  bool stale = false;
  if (snaps && last != CEPH_NOSNAP) {
    set<snapid_t>::const_iterator p = snaps->lower_bound(first);
    if (p == snaps->end() || *p > last) {
      dout(10) << " skipping stale dentry on [" << first << "," << last << "]" << dendl;
      stale = true;
    }
  }

  /*
   * look for existing dentry for _last_ snap, because unlink +
   * create may leave a "hole" (epochs during which the dentry
   * doesn't exist) but for which no explicit negative dentry is in
   * the cache.
   */
  CDentry *dn;
  if (stale)
    dn = lookup_exact_snap(dname, last);
  else
    dn = lookup(dname, last);

  if (type == 'L') {
    // hard link
    inodeno_t ino;
    unsigned char d_type;
    ::decode(ino, q);
    ::decode(d_type, q);

    if (stale) {
      // not cached: schedule the on-disk key for removal and force a commit
      if (!dn) {
        stale_items.insert(key);
        *force_dirty = true;
      }
      return dn;
    }

    if (dn) {
      // the cached dentry wins; the on-disk value is only logged
      if (dn->get_linkage()->get_inode() == 0) {
        dout(12) << "_fetched  had NEG dentry " << *dn << dendl;
      } else {
        dout(12) << "_fetched  had dentry " << *dn << dendl;
      }
    } else {
      // (remote) link
      dn = add_remote_dentry(dname, ino, d_type, first, last);

      // link to inode?
      CInode *in = cache->get_inode(ino);   // we may or may not have it.
      if (in) {
        dn->link_remote(dn->get_linkage(), in);
        dout(12) << "_fetched  got remote link " << ino << " which we have " << *in << dendl;
      } else {
        dout(12) << "_fetched  got remote link " << ino << " (dont' have it)" << dendl;
      }
    }
  }
  else if (type == 'I') {
    // inode

    // Load inode data before looking up or constructing CInode
    InodeStore inode_data;
    inode_data.decode_bare(q);

    if (stale) {
      // not cached: schedule the on-disk key for removal and force a commit
      if (!dn) {
        stale_items.insert(key);
        *force_dirty = true;
      }
      return dn;
    }

    // true when the cached dentry links an inode still in STATE_REJOINUNDEF,
    // i.e. one whose contents are to be filled in from disk below
    bool undef_inode = false;
    if (dn) {
      CInode *in = dn->get_linkage()->get_inode();
      if (in) {
        dout(12) << "_fetched  had dentry " << *dn << dendl;
        if (in->state_test(CInode::STATE_REJOINUNDEF)) {
          undef_inodes->push_back(in);
          undef_inode = true;
        }
      } else
        dout(12) << "_fetched  had NEG dentry " << *dn << dendl;
    }

    if (!dn || undef_inode) {
      // add inode
      CInode *in = cache->get_inode(inode_data.inode.ino, last);
      if (!in || undef_inode) {
        // reuse the undefined inode if there is one, otherwise allocate
        if (undef_inode && in)
          in->first = first;
        else
          in = new CInode(cache, true, first, last);

        in->inode = inode_data.inode;
        // symlink?
        if (in->is_symlink())
          in->symlink = inode_data.symlink;

        in->dirfragtree.swap(inode_data.dirfragtree);
        in->xattrs.swap(inode_data.xattrs);
        in->old_inodes.swap(inode_data.old_inodes);
        // `first` must lie beyond the newest old_inodes key
        if (!in->old_inodes.empty()) {
          snapid_t min_first = in->old_inodes.rbegin()->first + 1;
          if (min_first > in->first)
            in->first = min_first;
        }

        in->oldest_snap = inode_data.oldest_snap;
        in->decode_snap_blob(inode_data.snap_blob);
        if (snaps && !in->snaprealm)
          in->purge_stale_snap_data(*snaps);

        // a reused undefined inode is already in the cache and linked
        if (!undef_inode) {
          cache->add_inode(in); // add
          dn = add_primary_dentry(dname, in, first, last); // link
        }
        dout(12) << "_fetched  got " << *dn << " " << *in << dendl;

        if (in->inode.is_dirty_rstat())
          in->mark_dirty_rstat();

        //in->hack_accessed = false;
        //in->hack_load_stamp = ceph_clock_now();
        //num_new_inodes_loaded++;
      } else {
        // the inode is already cached although no dentry here links it:
        // report the duplicate and leave the cache untouched
        dout(0) << "_fetched  badness: got (but i already had) " << *in
                << " mode " << in->inode.mode
                << " mtime " << in->inode.mtime << dendl;
        string dirpath, inopath;
        this->inode->make_path_string(dirpath);
        in->make_path_string(inopath);
        cache->mds->clog->error() << "loaded dup inode " << inode_data.inode.ino
          << " [" << first << "," << last << "] v" << inode_data.inode.version
          << " at " << dirpath << "/" << dname
          << ", but inode " << in->vino() << " v" << in->inode.version
          << " already exists at " << inopath;
        return dn;
      }
    }
  } else {
    // unknown marker: report as malformed input (handled by the caller)
    std::ostringstream oss;
    oss << "Invalid tag char '" << type << "' pos " << pos;
    throw buffer::malformed_input(oss.str());
  }

  return dn;
}
1817
/**
 * Process the result of an omap read of this dirfrag.
 *
 * Decodes the fnode header, loads every fetched dentry through
 * _load_dentry(), cleans "underwater" dentries, and finally drops the auth
 * pin taken by the fetch.
 *
 * @param hdrbl     omap header (encoded fnode_t); empty means the object
 *                  is missing
 * @param omap      fetched key/value pairs
 * @param complete  true for a full fetch: the dirfrag is marked complete,
 *                  STATE_FETCHING is cleared and WAIT_COMPLETE waiters are
 *                  kicked; false for a fetch of selected keys
 * @param r         read result; must be 0, -ENOENT or -ENODATA
 */
void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
                         bool complete, int r)
{
  LogChannelRef clog = cache->mds->clog;
  dout(10) << "_fetched header " << hdrbl.length() << " bytes "
           << omap.size() << " keys for " << *this << dendl;

  assert(r == 0 || r == -ENOENT || r == -ENODATA);
  assert(is_auth());
  assert(!is_frozen());

  // No header at all: treat the on-disk object as missing.
  if (hdrbl.length() == 0) {
    dout(0) << "_fetched missing object for " << *this << dendl;

    clog->error() << "dir " << dirfrag() << " object missing on disk; some "
                     "files may be lost (" << get_path() << ")";

    go_bad(complete);
    return;
  }

  fnode_t got_fnode;
  {
    bufferlist::iterator p = hdrbl.begin();
    try {
      ::decode(got_fnode, p);
    } catch (const buffer::error &err) {
      derr << "Corrupt fnode in dirfrag " << dirfrag()
        << ": " << err << dendl;
      clog->warn() << "Corrupt fnode header in " << dirfrag() << ": "
                  << err << " (" << get_path() << ")";
      go_bad(complete);
      return;
    }
    // trailing bytes after the fnode are treated as corruption as well
    if (!p.end()) {
      clog->warn() << "header buffer of dir " << dirfrag() << " has "
                  << hdrbl.length() - p.get_off() << " extra bytes ("
                  << get_path() << ")";
      go_bad(complete);
      return;
    }
  }

  dout(10) << "_fetched version " << got_fnode.version << dendl;

  // take the loaded fnode?
  // only if we are a fresh CDir* with no prior state.
  if (get_version() == 0) {
    assert(!is_projected());
    assert(!state_test(STATE_COMMITTING));
    fnode = got_fnode;
    projected_version = committing_version = committed_version = got_fnode.version;

    if (state_test(STATE_REJOINUNDEF)) {
      assert(cache->mds->is_rejoin());
      state_clear(STATE_REJOINUNDEF);
      cache->opened_undef_dirfrag(this);
    }
  }

  // inodes in STATE_REJOINUNDEF that _load_dentry() filled in
  list<CInode*> undef_inodes;

  // purge stale snaps?
  // only if we have past_parents open!
  bool force_dirty = false;
  const set<snapid_t> *snaps = NULL;
  SnapRealm *realm = inode->find_snaprealm();
  if (!realm->have_past_parents_open()) {
    dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
  } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
    snaps = &realm->get_snaps();
    dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
             << " < " << realm->get_last_destroyed()
             << ", snap purge based on " << *snaps << dendl;
    // no snapshotted items cached: snap_purged_thru can be advanced now
    if (get_num_snap_items() == 0) {
      fnode.snap_purged_thru = realm->get_last_destroyed();
      force_dirty = true;
    }
  }

  // Walk the entries in reverse key order; `pos` tracks the forward index
  // and is only used for logging / error text in _load_dentry().
  unsigned pos = omap.size() - 1;
  for (map<string, bufferlist>::reverse_iterator p = omap.rbegin();
       p != omap.rend();
       ++p, --pos) {
    string dname;
    snapid_t last;
    dentry_key_t::decode_helper(p->first, dname, last);

    CDentry *dn = NULL;
    try {
      dn = _load_dentry(
            p->first, dname, last, p->second, pos, snaps,
            &force_dirty, &undef_inodes);
    } catch (const buffer::error &err) {
      cache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
                                  "dir frag " << dirfrag() << ": "
                               << err << "(" << get_path() << ")";

      // Remember that this dentry is damaged.  Subsequent operations
      // that try to act directly on it will get their EIOs, but this
      // dirfrag as a whole will continue to look okay (minus the
      // mysteriously-missing dentry)
      go_bad_dentry(last, dname);

      // Anyone who was WAIT_DENTRY for this guy will get kicked
      // to RetryRequest, and hit the DamageTable-interrogating path.
      // Stats will now be bogus because we will think we're complete,
      // but have 1 or more missing dentries.
      continue;
    }

    // touch dentries that were explicitly wanted, or all of them when this
    // was a fetch of selected keys
    if (dn && (wanted_items.count(dname) > 0 || !complete)) {
      dout(10) << " touching wanted dn " << *dn << dendl;
      inode->mdcache->touch_dentry(dn);
    }

    /** clean underwater item?
     * Underwater item is something that is dirty in our cache from
     * journal replay, but was previously flushed to disk before the
     * mds failed.
     *
     * We only do this if committed_version == 0. That implies either
     * - this is a fetch after a clean/empty CDir is created
     *   (and has no effect, since the dn won't exist); or
     * - this is a fetch after _recovery_, which is what we're worried
     *   about.  Items that are marked dirty from the journal should be
     *   marked clean if they appear on disk.
     */
    if (committed_version == 0 &&
        dn &&
        dn->get_version() <= got_fnode.version &&
        dn->is_dirty()) {
      dout(10) << "_fetched  had underwater dentry " << *dn << ", marking clean" << dendl;
      dn->mark_clean();

      if (dn->get_linkage()->is_primary()) {
        assert(dn->get_linkage()->get_inode()->get_version() <= got_fnode.version);
        dout(10) << "_fetched  had underwater inode " << *dn->get_linkage()->get_inode() << ", marking clean" << dendl;
        dn->get_linkage()->get_inode()->mark_clean();
      }
    }
  }

  //cache->mds->logger->inc("newin", num_new_inodes_loaded);

  // mark complete, !fetching
  if (complete) {
    wanted_items.clear();
    mark_complete();
    state_clear(STATE_FETCHING);

    // run a local scrub that was deferred until the dirfrag was complete
    if (scrub_infop && scrub_infop->need_scrub_local) {
      scrub_infop->need_scrub_local = false;
      scrub_local();
    }
  }

  // open & force frags
  while (!undef_inodes.empty()) {
    CInode *in = undef_inodes.front();
    undef_inodes.pop_front();
    in->state_clear(CInode::STATE_REJOINUNDEF);
    cache->opened_undef_inode(in);
  }

  // dirty myself to remove stale snap dentries
  if (force_dirty && !inode->mdcache->is_readonly())
    log_mark_dirty();

  // release the auth pin taken when the fetch was issued
  auth_unpin(this);

  if (complete) {
    // kick waiters
    finish_waiting(WAIT_COMPLETE, 0);
  }
}
1994
1995 void CDir::_go_bad()
1996 {
1997 if (get_version() == 0)
1998 set_version(1);
1999 state_set(STATE_BADFRAG);
2000 // mark complete, !fetching
2001 mark_complete();
2002 state_clear(STATE_FETCHING);
2003 auth_unpin(this);
2004
2005 // kick waiters
2006 finish_waiting(WAIT_COMPLETE, -EIO);
2007 }
2008
2009 void CDir::go_bad_dentry(snapid_t last, const std::string &dname)
2010 {
2011 dout(10) << "go_bad_dentry " << dname << dendl;
2012 const bool fatal = cache->mds->damage_table.notify_dentry(
2013 inode->ino(), frag, last, dname, get_path() + "/" + dname);
2014 if (fatal) {
2015 cache->mds->damaged();
2016 ceph_abort(); // unreachable, damaged() respawns us
2017 }
2018 }
2019
2020 void CDir::go_bad(bool complete)
2021 {
2022 dout(10) << "go_bad " << frag << dendl;
2023 const bool fatal = cache->mds->damage_table.notify_dirfrag(
2024 inode->ino(), frag, get_path());
2025 if (fatal) {
2026 cache->mds->damaged();
2027 ceph_abort(); // unreachable, damaged() respawns us
2028 }
2029
2030 if (complete)
2031 _go_bad();
2032 else
2033 auth_unpin(this);
2034 }
2035
2036 // -----------------------
2037 // COMMIT
2038
2039 /**
2040 * commit
2041 *
2042 * @param want - min version i want committed
2043 * @param c - callback for completion
2044 */
2045 void CDir::commit(version_t want, MDSInternalContextBase *c, bool ignore_authpinnability, int op_prio)
2046 {
2047 dout(10) << "commit want " << want << " on " << *this << dendl;
2048 if (want == 0) want = get_version();
2049
2050 // preconditions
2051 assert(want <= get_version() || get_version() == 0); // can't commit the future
2052 assert(want > committed_version); // the caller is stupid
2053 assert(is_auth());
2054 assert(ignore_authpinnability || can_auth_pin());
2055
2056 // note: queue up a noop if necessary, so that we always
2057 // get an auth_pin.
2058 if (!c)
2059 c = new C_MDSInternalNoop;
2060
2061 // auth_pin on first waiter
2062 if (waiting_for_commit.empty())
2063 auth_pin(this);
2064 waiting_for_commit[want].push_back(c);
2065
2066 // ok.
2067 _commit(want, op_prio);
2068 }
2069
2070 class C_IO_Dir_Committed : public CDirIOContext {
2071 version_t version;
2072 public:
2073 C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
2074 void finish(int r) override {
2075 dir->_committed(r, version);
2076 }
2077 };
2078
2079 /**
2080 * Flush out the modified dentries in this dir. Keep the bufferlist
2081 * below max_write_size;
2082 */
2083 void CDir::_omap_commit(int op_prio)
2084 {
2085 dout(10) << "_omap_commit" << dendl;
2086
2087 unsigned max_write_size = cache->max_dir_commit_size;
2088 unsigned write_size = 0;
2089
2090 if (op_prio < 0)
2091 op_prio = CEPH_MSG_PRIO_DEFAULT;
2092
2093 // snap purge?
2094 const set<snapid_t> *snaps = NULL;
2095 SnapRealm *realm = inode->find_snaprealm();
2096 if (!realm->have_past_parents_open()) {
2097 dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
2098 } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
2099 snaps = &realm->get_snaps();
2100 dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
2101 << " < " << realm->get_last_destroyed()
2102 << ", snap purge based on " << *snaps << dendl;
2103 // fnode.snap_purged_thru = realm->get_last_destroyed();
2104 }
2105
2106 set<string> to_remove;
2107 map<string, bufferlist> to_set;
2108
2109 C_GatherBuilder gather(g_ceph_context,
2110 new C_OnFinisher(new C_IO_Dir_Committed(this,
2111 get_version()),
2112 cache->mds->finisher));
2113
2114 SnapContext snapc;
2115 object_t oid = get_ondisk_object();
2116 object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
2117
2118 if (!stale_items.empty()) {
2119 for (compact_set<string>::iterator p = stale_items.begin();
2120 p != stale_items.end();
2121 ++p) {
2122 to_remove.insert(*p);
2123 write_size += (*p).length();
2124 }
2125 stale_items.clear();
2126 }
2127
2128 auto write_one = [&](CDentry *dn) {
2129 string key;
2130 dn->key().encode(key);
2131
2132 if (dn->last != CEPH_NOSNAP &&
2133 snaps && try_trim_snap_dentry(dn, *snaps)) {
2134 dout(10) << " rm " << key << dendl;
2135 write_size += key.length();
2136 to_remove.insert(key);
2137 return;
2138 }
2139
2140 if (dn->get_linkage()->is_null()) {
2141 dout(10) << " rm " << dn->name << " " << *dn << dendl;
2142 write_size += key.length();
2143 to_remove.insert(key);
2144 } else {
2145 dout(10) << " set " << dn->name << " " << *dn << dendl;
2146 bufferlist dnbl;
2147 _encode_dentry(dn, dnbl, snaps);
2148 write_size += key.length() + dnbl.length();
2149 to_set[key].swap(dnbl);
2150 }
2151
2152 if (write_size >= max_write_size) {
2153 ObjectOperation op;
2154 op.priority = op_prio;
2155
2156 // don't create new dirfrag blindly
2157 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2158 op.stat(NULL, (ceph::real_time*) NULL, NULL);
2159
2160 if (!to_set.empty())
2161 op.omap_set(to_set);
2162 if (!to_remove.empty())
2163 op.omap_rm_keys(to_remove);
2164
2165 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2166 ceph::real_clock::now(),
2167 0, gather.new_sub());
2168
2169 write_size = 0;
2170 to_set.clear();
2171 to_remove.clear();
2172 }
2173 };
2174
2175 if (state_test(CDir::STATE_FRAGMENTING)) {
2176 for (auto p = items.begin(); p != items.end(); ) {
2177 CDentry *dn = p->second;
2178 ++p;
2179 if (!dn->is_dirty() && dn->get_linkage()->is_null())
2180 continue;
2181 write_one(dn);
2182 }
2183 } else {
2184 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2185 CDentry *dn = *p;
2186 ++p;
2187 write_one(dn);
2188 }
2189 }
2190
2191 ObjectOperation op;
2192 op.priority = op_prio;
2193
2194 // don't create new dirfrag blindly
2195 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
2196 op.stat(NULL, (ceph::real_time*)NULL, NULL);
2197
2198 /*
2199 * save the header at the last moment.. If we were to send it off before other
2200 * updates, but die before sending them all, we'd think that the on-disk state
2201 * was fully committed even though it wasn't! However, since the messages are
2202 * strictly ordered between the MDS and the OSD, and since messages to a given
2203 * PG are strictly ordered, if we simply send the message containing the header
2204 * off last, we cannot get our header into an incorrect state.
2205 */
2206 bufferlist header;
2207 ::encode(fnode, header);
2208 op.omap_set_header(header);
2209
2210 if (!to_set.empty())
2211 op.omap_set(to_set);
2212 if (!to_remove.empty())
2213 op.omap_rm_keys(to_remove);
2214
2215 cache->mds->objecter->mutate(oid, oloc, op, snapc,
2216 ceph::real_clock::now(),
2217 0, gather.new_sub());
2218
2219 gather.activate();
2220 }
2221
2222 void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
2223 const set<snapid_t> *snaps)
2224 {
2225 // clear dentry NEW flag, if any. we can no longer silently drop it.
2226 dn->clear_new();
2227
2228 ::encode(dn->first, bl);
2229
2230 // primary or remote?
2231 if (dn->linkage.is_remote()) {
2232 inodeno_t ino = dn->linkage.get_remote_ino();
2233 unsigned char d_type = dn->linkage.get_remote_d_type();
2234 dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' remote ino " << ino << dendl;
2235
2236 // marker, name, ino
2237 bl.append('L'); // remote link
2238 ::encode(ino, bl);
2239 ::encode(d_type, bl);
2240 } else if (dn->linkage.is_primary()) {
2241 // primary link
2242 CInode *in = dn->linkage.get_inode();
2243 assert(in);
2244
2245 dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' inode " << *in << dendl;
2246
2247 // marker, name, inode, [symlink string]
2248 bl.append('I'); // inode
2249
2250 if (in->is_multiversion()) {
2251 if (!in->snaprealm) {
2252 if (snaps)
2253 in->purge_stale_snap_data(*snaps);
2254 } else if (in->snaprealm->have_past_parents_open()) {
2255 in->purge_stale_snap_data(in->snaprealm->get_snaps());
2256 }
2257 }
2258
2259 bufferlist snap_blob;
2260 in->encode_snap_blob(snap_blob);
2261 in->encode_bare(bl, cache->mds->mdsmap->get_up_features(), &snap_blob);
2262 } else {
2263 assert(!dn->linkage.is_null());
2264 }
2265 }
2266
2267 void CDir::_commit(version_t want, int op_prio)
2268 {
2269 dout(10) << "_commit want " << want << " on " << *this << dendl;
2270
2271 // we can't commit things in the future.
2272 // (even the projected future.)
2273 assert(want <= get_version() || get_version() == 0);
2274
2275 // check pre+postconditions.
2276 assert(is_auth());
2277
2278 // already committed?
2279 if (committed_version >= want) {
2280 dout(10) << "already committed " << committed_version << " >= " << want << dendl;
2281 return;
2282 }
2283 // already committing >= want?
2284 if (committing_version >= want) {
2285 dout(10) << "already committing " << committing_version << " >= " << want << dendl;
2286 assert(state_test(STATE_COMMITTING));
2287 return;
2288 }
2289
2290 // alrady committed an older version?
2291 if (committing_version > committed_version) {
2292 dout(10) << "already committing older " << committing_version << ", waiting for that to finish" << dendl;
2293 return;
2294 }
2295
2296 // commit.
2297 committing_version = get_version();
2298
2299 // mark committing (if not already)
2300 if (!state_test(STATE_COMMITTING)) {
2301 dout(10) << "marking committing" << dendl;
2302 state_set(STATE_COMMITTING);
2303 }
2304
2305 if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_commit);
2306
2307 _omap_commit(op_prio);
2308 }
2309
2310
2311 /**
2312 * _committed
2313 *
2314 * @param v version i just committed
2315 */
2316 void CDir::_committed(int r, version_t v)
2317 {
2318 if (r < 0) {
2319 // the directory could be partly purged during MDS failover
2320 if (r == -ENOENT && committed_version == 0 &&
2321 !inode->is_base() && get_parent_dir()->inode->is_stray()) {
2322 r = 0;
2323 if (inode->snaprealm)
2324 inode->state_set(CInode::STATE_MISSINGOBJS);
2325 }
2326 if (r < 0) {
2327 dout(1) << "commit error " << r << " v " << v << dendl;
2328 cache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
2329 << " errno " << r;
2330 cache->mds->handle_write_error(r);
2331 return;
2332 }
2333 }
2334
2335 dout(10) << "_committed v " << v << " on " << *this << dendl;
2336 assert(is_auth());
2337
2338 bool stray = inode->is_stray();
2339
2340 // take note.
2341 assert(v > committed_version);
2342 assert(v <= committing_version);
2343 committed_version = v;
2344
2345 // _all_ commits done?
2346 if (committing_version == committed_version)
2347 state_clear(CDir::STATE_COMMITTING);
2348
2349 // _any_ commit, even if we've been redirtied, means we're no longer new.
2350 item_new.remove_myself();
2351
2352 // dir clean?
2353 if (committed_version == get_version())
2354 mark_clean();
2355
2356 // dentries clean?
2357 for (auto p = dirty_dentries.begin(); !p.end(); ) {
2358 CDentry *dn = *p;
2359 ++p;
2360
2361 // inode?
2362 if (dn->linkage.is_primary()) {
2363 CInode *in = dn->linkage.get_inode();
2364 assert(in);
2365 assert(in->is_auth());
2366
2367 if (committed_version >= in->get_version()) {
2368 if (in->is_dirty()) {
2369 dout(15) << " dir " << committed_version << " >= inode " << in->get_version() << " now clean " << *in << dendl;
2370 in->mark_clean();
2371 }
2372 } else {
2373 dout(15) << " dir " << committed_version << " < inode " << in->get_version() << " still dirty " << *in << dendl;
2374 assert(in->is_dirty() || in->last < CEPH_NOSNAP); // special case for cow snap items (not predirtied)
2375 }
2376 }
2377
2378 // dentry
2379 if (committed_version >= dn->get_version()) {
2380 dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
2381 dn->mark_clean();
2382
2383 // drop clean null stray dentries immediately
2384 if (stray &&
2385 dn->get_num_ref() == 0 &&
2386 !dn->is_projected() &&
2387 dn->get_linkage()->is_null())
2388 remove_dentry(dn);
2389 } else {
2390 dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
2391 assert(dn->is_dirty());
2392 }
2393 }
2394
2395 // finishers?
2396 bool were_waiters = !waiting_for_commit.empty();
2397
2398 compact_map<version_t, list<MDSInternalContextBase*> >::iterator p = waiting_for_commit.begin();
2399 while (p != waiting_for_commit.end()) {
2400 compact_map<version_t, list<MDSInternalContextBase*> >::iterator n = p;
2401 ++n;
2402 if (p->first > committed_version) {
2403 dout(10) << " there are waiters for " << p->first << ", committing again" << dendl;
2404 _commit(p->first, -1);
2405 break;
2406 }
2407 cache->mds->queue_waiters(p->second);
2408 waiting_for_commit.erase(p);
2409 p = n;
2410 }
2411
2412 // try drop dentries in this dirfrag if it's about to be purged
2413 if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
2414 inode->snaprealm)
2415 cache->maybe_eval_stray(inode, true);
2416
2417 // unpin if we kicked the last waiter.
2418 if (were_waiters &&
2419 waiting_for_commit.empty())
2420 auth_unpin(this);
2421 }
2422
2423
2424
2425
2426 // IMPORT/EXPORT
2427
2428 void CDir::encode_export(bufferlist& bl)
2429 {
2430 assert(!is_projected());
2431 ::encode(first, bl);
2432 ::encode(fnode, bl);
2433 ::encode(dirty_old_rstat, bl);
2434 ::encode(committed_version, bl);
2435
2436 ::encode(state, bl);
2437 ::encode(dir_rep, bl);
2438
2439 ::encode(pop_me, bl);
2440 ::encode(pop_auth_subtree, bl);
2441
2442 ::encode(dir_rep_by, bl);
2443 ::encode(get_replicas(), bl);
2444
2445 get(PIN_TEMPEXPORTING);
2446 }
2447
2448 void CDir::finish_export(utime_t now)
2449 {
2450 state &= MASK_STATE_EXPORT_KEPT;
2451 pop_auth_subtree_nested.sub(now, cache->decayrate, pop_auth_subtree);
2452 pop_me.zero(now);
2453 pop_auth_subtree.zero(now);
2454 put(PIN_TEMPEXPORTING);
2455 dirty_old_rstat.clear();
2456 }
2457
2458 void CDir::decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls)
2459 {
2460 ::decode(first, blp);
2461 ::decode(fnode, blp);
2462 ::decode(dirty_old_rstat, blp);
2463 projected_version = fnode.version;
2464 ::decode(committed_version, blp);
2465 committing_version = committed_version;
2466
2467 unsigned s;
2468 ::decode(s, blp);
2469 state &= MASK_STATE_IMPORT_KEPT;
2470 state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
2471
2472 if (is_dirty()) {
2473 get(PIN_DIRTY);
2474 _mark_dirty(ls);
2475 }
2476
2477 ::decode(dir_rep, blp);
2478
2479 ::decode(pop_me, now, blp);
2480 ::decode(pop_auth_subtree, now, blp);
2481 pop_auth_subtree_nested.add(now, cache->decayrate, pop_auth_subtree);
2482
2483 ::decode(dir_rep_by, blp);
2484 ::decode(get_replicas(), blp);
2485 if (is_replicated()) get(PIN_REPLICATED);
2486
2487 replica_nonce = 0; // no longer defined
2488
2489 // did we import some dirty scatterlock data?
2490 if (dirty_old_rstat.size() ||
2491 !(fnode.rstat == fnode.accounted_rstat)) {
2492 cache->mds->locker->mark_updated_scatterlock(&inode->nestlock);
2493 ls->dirty_dirfrag_nest.push_back(&inode->item_dirty_dirfrag_nest);
2494 }
2495 if (!(fnode.fragstat == fnode.accounted_fragstat)) {
2496 cache->mds->locker->mark_updated_scatterlock(&inode->filelock);
2497 ls->dirty_dirfrag_dir.push_back(&inode->item_dirty_dirfrag_dir);
2498 }
2499 if (is_dirty_dft()) {
2500 if (inode->dirfragtreelock.get_state() != LOCK_MIX &&
2501 inode->dirfragtreelock.is_stable()) {
2502 // clear stale dirtydft
2503 state_clear(STATE_DIRTYDFT);
2504 } else {
2505 cache->mds->locker->mark_updated_scatterlock(&inode->dirfragtreelock);
2506 ls->dirty_dirfrag_dirfragtree.push_back(&inode->item_dirty_dirfrag_dirfragtree);
2507 }
2508 }
2509 }
2510
2511
2512
2513
2514 /********************************
2515 * AUTHORITY
2516 */
2517
2518 /*
2519 * if dir_auth.first == parent, auth is same as inode.
2520 * unless .second != unknown, in which case that sticks.
2521 */
2522 mds_authority_t CDir::authority() const
2523 {
2524 if (is_subtree_root())
2525 return dir_auth;
2526 else
2527 return inode->authority();
2528 }
2529
2530 /** is_subtree_root()
2531 * true if this is an auth delegation point.
2532 * that is, dir_auth != default (parent,unknown)
2533 *
2534 * some key observations:
2535 * if i am auth:
2536 * - any region bound will be an export, or frozen.
2537 *
2538 * note that this DOES heed dir_auth.pending
2539 */
2540 /*
2541 bool CDir::is_subtree_root()
2542 {
2543 if (dir_auth == CDIR_AUTH_DEFAULT) {
2544 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2545 //<< " on " << ino() << dendl;
2546 return false;
2547 } else {
2548 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2549 //<< " on " << ino() << dendl;
2550 return true;
2551 }
2552 }
2553 */
2554
2555 /** contains(x)
2556 * true if we are x, or an ancestor of x
2557 */
2558 bool CDir::contains(CDir *x)
2559 {
2560 while (1) {
2561 if (x == this)
2562 return true;
2563 x = x->get_inode()->get_projected_parent_dir();
2564 if (x == 0)
2565 return false;
2566 }
2567 }
2568
2569
2570
2571 /** set_dir_auth
2572 */
2573 void CDir::set_dir_auth(mds_authority_t a)
2574 {
2575 dout(10) << "setting dir_auth=" << a
2576 << " from " << dir_auth
2577 << " on " << *this << dendl;
2578
2579 bool was_subtree = is_subtree_root();
2580 bool was_ambiguous = dir_auth.second >= 0;
2581
2582 // set it.
2583 dir_auth = a;
2584
2585 // new subtree root?
2586 if (!was_subtree && is_subtree_root()) {
2587 dout(10) << " new subtree root, adjusting auth_pins" << dendl;
2588
2589 // adjust nested auth pins
2590 if (get_cum_auth_pins())
2591 inode->adjust_nested_auth_pins(-1, NULL);
2592
2593 // unpin parent of frozen dir/tree?
2594 if (inode->is_auth()) {
2595 assert(!is_frozen_tree_root());
2596 if (is_frozen_dir())
2597 inode->auth_unpin(this);
2598 }
2599 }
2600 if (was_subtree && !is_subtree_root()) {
2601 dout(10) << " old subtree root, adjusting auth_pins" << dendl;
2602
2603 // adjust nested auth pins
2604 if (get_cum_auth_pins())
2605 inode->adjust_nested_auth_pins(1, NULL);
2606
2607 // pin parent of frozen dir/tree?
2608 if (inode->is_auth()) {
2609 assert(!is_frozen_tree_root());
2610 if (is_frozen_dir())
2611 inode->auth_pin(this);
2612 }
2613 }
2614
2615 // newly single auth?
2616 if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
2617 list<MDSInternalContextBase*> ls;
2618 take_waiting(WAIT_SINGLEAUTH, ls);
2619 cache->mds->queue_waiters(ls);
2620 }
2621 }
2622
2623
2624 /*****************************************
2625 * AUTH PINS and FREEZING
2626 *
2627 * the basic plan is that auth_pins only exist in auth regions, and they
2628 * prevent a freeze (and subsequent auth change).
2629 *
2630 * however, we also need to prevent a parent from freezing if a child is frozen.
2631 * for that reason, the parent inode of a frozen directory is auth_pinned.
2632 *
2633 * the oddity is when the frozen directory is a subtree root. if that's the case,
2634 * the parent inode isn't auth_pinned. which means that when subtree authority is adjusted
2635 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
2636 * time.
2637 *
2638 */
2639
2640 void CDir::auth_pin(void *by)
2641 {
2642 if (auth_pins == 0)
2643 get(PIN_AUTHPIN);
2644 auth_pins++;
2645
2646 #ifdef MDS_AUTHPIN_SET
2647 auth_pin_set.insert(by);
2648 #endif
2649
2650 dout(10) << "auth_pin by " << by
2651 << " on " << *this
2652 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2653
2654 // nest pins?
2655 if (!is_subtree_root() &&
2656 get_cum_auth_pins() == 1)
2657 inode->adjust_nested_auth_pins(1, by);
2658 }
2659
2660 void CDir::auth_unpin(void *by)
2661 {
2662 auth_pins--;
2663
2664 #ifdef MDS_AUTHPIN_SET
2665 assert(auth_pin_set.count(by));
2666 auth_pin_set.erase(auth_pin_set.find(by));
2667 #endif
2668 if (auth_pins == 0)
2669 put(PIN_AUTHPIN);
2670
2671 dout(10) << "auth_unpin by " << by
2672 << " on " << *this
2673 << " count now " << auth_pins << " + " << nested_auth_pins << dendl;
2674 assert(auth_pins >= 0);
2675
2676 int newcum = get_cum_auth_pins();
2677
2678 maybe_finish_freeze(); // pending freeze?
2679
2680 // nest?
2681 if (!is_subtree_root() &&
2682 newcum == 0)
2683 inode->adjust_nested_auth_pins(-1, by);
2684 }
2685
2686 void CDir::adjust_nested_auth_pins(int inc, int dirinc, void *by)
2687 {
2688 assert(inc);
2689 nested_auth_pins += inc;
2690 dir_auth_pins += dirinc;
2691
2692 dout(15) << "adjust_nested_auth_pins " << inc << "/" << dirinc << " on " << *this
2693 << " by " << by << " count now "
2694 << auth_pins << " + " << nested_auth_pins << dendl;
2695 assert(nested_auth_pins >= 0);
2696 assert(dir_auth_pins >= 0);
2697
2698 int newcum = get_cum_auth_pins();
2699
2700 maybe_finish_freeze(); // pending freeze?
2701
2702 // nest?
2703 if (!is_subtree_root()) {
2704 if (newcum == 0)
2705 inode->adjust_nested_auth_pins(-1, by);
2706 else if (newcum == inc)
2707 inode->adjust_nested_auth_pins(1, by);
2708 }
2709 }
2710
#ifdef MDS_VERIFY_FRAGSTAT
/**
 * Debug check (only built with MDS_VERIFY_FRAGSTAT): recount files and
 * subdirectories from the dentries of this complete dirfrag and abort if
 * the counts disagree with fnode.fragstat.  Stray directories are skipped.
 */
void CDir::verify_fragstat()
{
  assert(is_complete());
  if (inode->is_stray())
    return;

  frag_info_t c;
  memset(&c, 0, sizeof(c));

  for (auto &entry : items) {
    CDentry *dn = entry.second;
    if (dn->is_null())
      continue;

    dout(10) << " " << *dn << dendl;

    if (dn->is_primary()) {
      dout(10) << "     " << *dn->inode << dendl;
      if (dn->inode->is_dir())
        c.nsubdirs++;
      else
        c.nfiles++;
    }
    if (dn->is_remote()) {
      if (dn->get_remote_d_type() == DT_DIR)
        c.nsubdirs++;
      else
        c.nfiles++;
    }
  }

  const bool mismatch =
    c.nsubdirs != fnode.fragstat.nsubdirs ||
    c.nfiles != fnode.fragstat.nfiles;
  if (mismatch) {
    dout(0) << "verify_fragstat failed " << fnode.fragstat << " on " << *this << dendl;
    dout(0) << "               i count " << c << dendl;
    ceph_abort();
  } else {
    dout(0) << "verify_fragstat ok " << fnode.fragstat << " on " << *this << dendl;
  }
}
#endif
2756
2757 /*****************************************************************************
2758 * FREEZING
2759 */
2760
2761 // FREEZE TREE
2762
2763 bool CDir::freeze_tree()
2764 {
2765 assert(!is_frozen());
2766 assert(!is_freezing());
2767
2768 auth_pin(this);
2769 if (is_freezeable(true)) {
2770 _freeze_tree();
2771 auth_unpin(this);
2772 return true;
2773 } else {
2774 state_set(STATE_FREEZINGTREE);
2775 ++num_freezing_trees;
2776 dout(10) << "freeze_tree waiting " << *this << dendl;
2777 return false;
2778 }
2779 }
2780
2781 void CDir::_freeze_tree()
2782 {
2783 dout(10) << "_freeze_tree " << *this << dendl;
2784 assert(is_freezeable(true));
2785
2786 // twiddle state
2787 if (state_test(STATE_FREEZINGTREE)) {
2788 state_clear(STATE_FREEZINGTREE); // actually, this may get set again by next context?
2789 --num_freezing_trees;
2790 }
2791
2792 if (is_auth()) {
2793 mds_authority_t auth;
2794 bool was_subtree = is_subtree_root();
2795 if (was_subtree) {
2796 auth = get_dir_auth();
2797 } else {
2798 // temporarily prevent parent subtree from becoming frozen.
2799 inode->auth_pin(this);
2800 // create new subtree
2801 auth = authority();
2802 }
2803
2804 assert(auth.first >= 0);
2805 assert(auth.second == CDIR_AUTH_UNKNOWN);
2806 auth.second = auth.first;
2807 inode->mdcache->adjust_subtree_auth(this, auth);
2808 if (!was_subtree)
2809 inode->auth_unpin(this);
2810 }
2811
2812 state_set(STATE_FROZENTREE);
2813 ++num_frozen_trees;
2814 get(PIN_FROZEN);
2815 }
2816
2817 void CDir::unfreeze_tree()
2818 {
2819 dout(10) << "unfreeze_tree " << *this << dendl;
2820
2821 if (state_test(STATE_FROZENTREE)) {
2822 // frozen. unfreeze.
2823 state_clear(STATE_FROZENTREE);
2824 --num_frozen_trees;
2825
2826 put(PIN_FROZEN);
2827
2828 if (is_auth()) {
2829 // must be subtree
2830 assert(is_subtree_root());
2831 // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
2832 mds_authority_t auth = get_dir_auth();
2833 assert(auth.first >= 0);
2834 assert(auth.second == auth.first);
2835 auth.second = CDIR_AUTH_UNKNOWN;
2836 inode->mdcache->adjust_subtree_auth(this, auth);
2837 }
2838
2839 // waiters?
2840 finish_waiting(WAIT_UNFREEZE);
2841 } else {
2842 finish_waiting(WAIT_FROZEN, -1);
2843
2844 // freezing. stop it.
2845 assert(state_test(STATE_FREEZINGTREE));
2846 state_clear(STATE_FREEZINGTREE);
2847 --num_freezing_trees;
2848 auth_unpin(this);
2849
2850 finish_waiting(WAIT_UNFREEZE);
2851 }
2852 }
2853
2854 bool CDir::is_freezing_tree() const
2855 {
2856 if (num_freezing_trees == 0)
2857 return false;
2858 const CDir *dir = this;
2859 while (1) {
2860 if (dir->is_freezing_tree_root()) return true;
2861 if (dir->is_subtree_root()) return false;
2862 if (dir->inode->parent)
2863 dir = dir->inode->parent->dir;
2864 else
2865 return false; // root on replica
2866 }
2867 }
2868
2869 bool CDir::is_frozen_tree() const
2870 {
2871 if (num_frozen_trees == 0)
2872 return false;
2873 const CDir *dir = this;
2874 while (1) {
2875 if (dir->is_frozen_tree_root()) return true;
2876 if (dir->is_subtree_root()) return false;
2877 if (dir->inode->parent)
2878 dir = dir->inode->parent->dir;
2879 else
2880 return false; // root on replica
2881 }
2882 }
2883
2884 CDir *CDir::get_frozen_tree_root()
2885 {
2886 assert(is_frozen());
2887 CDir *dir = this;
2888 while (1) {
2889 if (dir->is_frozen_tree_root())
2890 return dir;
2891 if (dir->inode->parent)
2892 dir = dir->inode->parent->dir;
2893 else
2894 ceph_abort();
2895 }
2896 }
2897
2898 class C_Dir_AuthUnpin : public CDirContext {
2899 public:
2900 explicit C_Dir_AuthUnpin(CDir *d) : CDirContext(d) {}
2901 void finish(int r) override {
2902 dir->auth_unpin(dir->get_inode());
2903 }
2904 };
2905
2906 void CDir::maybe_finish_freeze()
2907 {
2908 if (auth_pins != 1 || dir_auth_pins != 0)
2909 return;
2910
2911 // we can freeze the _dir_ even with nested pins...
2912 if (state_test(STATE_FREEZINGDIR)) {
2913 _freeze_dir();
2914 auth_unpin(this);
2915 finish_waiting(WAIT_FROZEN);
2916 }
2917
2918 if (nested_auth_pins != 0)
2919 return;
2920
2921 if (state_test(STATE_FREEZINGTREE)) {
2922 if (!is_subtree_root() && inode->is_frozen()) {
2923 dout(10) << "maybe_finish_freeze !subtree root and frozen inode, waiting for unfreeze on " << inode << dendl;
2924 // retake an auth_pin...
2925 auth_pin(inode);
2926 // and release it when the parent inode unfreezes
2927 inode->add_waiter(WAIT_UNFREEZE, new C_Dir_AuthUnpin(this));
2928 return;
2929 }
2930
2931 _freeze_tree();
2932 auth_unpin(this);
2933 finish_waiting(WAIT_FROZEN);
2934 }
2935 }
2936
2937
2938
2939 // FREEZE DIR
2940
2941 bool CDir::freeze_dir()
2942 {
2943 assert(!is_frozen());
2944 assert(!is_freezing());
2945
2946 auth_pin(this);
2947 if (is_freezeable_dir(true)) {
2948 _freeze_dir();
2949 auth_unpin(this);
2950 return true;
2951 } else {
2952 state_set(STATE_FREEZINGDIR);
2953 dout(10) << "freeze_dir + wait " << *this << dendl;
2954 return false;
2955 }
2956 }
2957
2958 void CDir::_freeze_dir()
2959 {
2960 dout(10) << "_freeze_dir " << *this << dendl;
2961 //assert(is_freezeable_dir(true));
2962 // not always true during split because the original fragment may have frozen a while
2963 // ago and we're just now getting around to breaking it up.
2964
2965 state_clear(STATE_FREEZINGDIR);
2966 state_set(STATE_FROZENDIR);
2967 get(PIN_FROZEN);
2968
2969 if (is_auth() && !is_subtree_root())
2970 inode->auth_pin(this); // auth_pin for duration of freeze
2971 }
2972
2973
2974 void CDir::unfreeze_dir()
2975 {
2976 dout(10) << "unfreeze_dir " << *this << dendl;
2977
2978 if (state_test(STATE_FROZENDIR)) {
2979 state_clear(STATE_FROZENDIR);
2980 put(PIN_FROZEN);
2981
2982 // unpin (may => FREEZEABLE) FIXME: is this order good?
2983 if (is_auth() && !is_subtree_root())
2984 inode->auth_unpin(this);
2985
2986 finish_waiting(WAIT_UNFREEZE);
2987 } else {
2988 finish_waiting(WAIT_FROZEN, -1);
2989
2990 // still freezing. stop.
2991 assert(state_test(STATE_FREEZINGDIR));
2992 state_clear(STATE_FREEZINGDIR);
2993 auth_unpin(this);
2994
2995 finish_waiting(WAIT_UNFREEZE);
2996 }
2997 }
2998
2999 /**
3000 * Slightly less complete than operator<<, because this is intended
3001 * for identifying a directory and its state rather than for dumping
3002 * debug output.
3003 */
3004 void CDir::dump(Formatter *f) const
3005 {
3006 assert(f != NULL);
3007
3008 f->dump_stream("path") << get_path();
3009
3010 f->dump_stream("dirfrag") << dirfrag();
3011 f->dump_int("snapid_first", first);
3012
3013 f->dump_stream("projected_version") << get_projected_version();
3014 f->dump_stream("version") << get_version();
3015 f->dump_stream("committing_version") << get_committing_version();
3016 f->dump_stream("committed_version") << get_committed_version();
3017
3018 f->dump_bool("is_rep", is_rep());
3019
3020 if (get_dir_auth() != CDIR_AUTH_DEFAULT) {
3021 if (get_dir_auth().second == CDIR_AUTH_UNKNOWN) {
3022 f->dump_stream("dir_auth") << get_dir_auth().first;
3023 } else {
3024 f->dump_stream("dir_auth") << get_dir_auth();
3025 }
3026 } else {
3027 f->dump_string("dir_auth", "");
3028 }
3029
3030 f->open_array_section("states");
3031 MDSCacheObject::dump_states(f);
3032 if (state_test(CDir::STATE_COMPLETE)) f->dump_string("state", "complete");
3033 if (state_test(CDir::STATE_FREEZINGTREE)) f->dump_string("state", "freezingtree");
3034 if (state_test(CDir::STATE_FROZENTREE)) f->dump_string("state", "frozentree");
3035 if (state_test(CDir::STATE_FROZENDIR)) f->dump_string("state", "frozendir");
3036 if (state_test(CDir::STATE_FREEZINGDIR)) f->dump_string("state", "freezingdir");
3037 if (state_test(CDir::STATE_EXPORTBOUND)) f->dump_string("state", "exportbound");
3038 if (state_test(CDir::STATE_IMPORTBOUND)) f->dump_string("state", "importbound");
3039 if (state_test(CDir::STATE_BADFRAG)) f->dump_string("state", "badfrag");
3040 f->close_section();
3041
3042 MDSCacheObject::dump(f);
3043 }
3044
3045 /****** Scrub Stuff *******/
3046
3047 void CDir::scrub_info_create() const
3048 {
3049 assert(!scrub_infop);
3050
3051 // break out of const-land to set up implicit initial state
3052 CDir *me = const_cast<CDir*>(this);
3053 fnode_t *fn = me->get_projected_fnode();
3054
3055 std::unique_ptr<scrub_info_t> si(new scrub_info_t());
3056
3057 si->last_recursive.version = si->recursive_start.version =
3058 fn->recursive_scrub_version;
3059 si->last_recursive.time = si->recursive_start.time =
3060 fn->recursive_scrub_stamp;
3061
3062 si->last_local.version = fn->localized_scrub_version;
3063 si->last_local.time = fn->localized_scrub_stamp;
3064
3065 me->scrub_infop.swap(si);
3066 }
3067
3068 void CDir::scrub_initialize(const ScrubHeaderRefConst& header)
3069 {
3070 dout(20) << __func__ << dendl;
3071 assert(is_complete());
3072 assert(header != nullptr);
3073
3074 // FIXME: weird implicit construction, is someone else meant
3075 // to be calling scrub_info_create first?
3076 scrub_info();
3077 assert(scrub_infop && !scrub_infop->directory_scrubbing);
3078
3079 scrub_infop->recursive_start.version = get_projected_version();
3080 scrub_infop->recursive_start.time = ceph_clock_now();
3081
3082 scrub_infop->directories_to_scrub.clear();
3083 scrub_infop->directories_scrubbing.clear();
3084 scrub_infop->directories_scrubbed.clear();
3085 scrub_infop->others_to_scrub.clear();
3086 scrub_infop->others_scrubbing.clear();
3087 scrub_infop->others_scrubbed.clear();
3088
3089 for (map_t::iterator i = items.begin();
3090 i != items.end();
3091 ++i) {
3092 // TODO: handle snapshot scrubbing
3093 if (i->first.snapid != CEPH_NOSNAP)
3094 continue;
3095
3096 CDentry::linkage_t *dnl = i->second->get_projected_linkage();
3097 if (dnl->is_primary()) {
3098 if (dnl->get_inode()->is_dir())
3099 scrub_infop->directories_to_scrub.insert(i->first);
3100 else
3101 scrub_infop->others_to_scrub.insert(i->first);
3102 } else if (dnl->is_remote()) {
3103 // TODO: check remote linkage
3104 }
3105 }
3106 scrub_infop->directory_scrubbing = true;
3107 scrub_infop->header = header;
3108 }
3109
3110 void CDir::scrub_finished()
3111 {
3112 dout(20) << __func__ << dendl;
3113 assert(scrub_infop && scrub_infop->directory_scrubbing);
3114
3115 assert(scrub_infop->directories_to_scrub.empty());
3116 assert(scrub_infop->directories_scrubbing.empty());
3117 scrub_infop->directories_scrubbed.clear();
3118 assert(scrub_infop->others_to_scrub.empty());
3119 assert(scrub_infop->others_scrubbing.empty());
3120 scrub_infop->others_scrubbed.clear();
3121 scrub_infop->directory_scrubbing = false;
3122
3123 scrub_infop->last_recursive = scrub_infop->recursive_start;
3124 scrub_infop->last_scrub_dirty = true;
3125 }
3126
3127 int CDir::_next_dentry_on_set(set<dentry_key_t>& dns, bool missing_okay,
3128 MDSInternalContext *cb, CDentry **dnout)
3129 {
3130 dentry_key_t dnkey;
3131 CDentry *dn;
3132
3133 while (!dns.empty()) {
3134 set<dentry_key_t>::iterator front = dns.begin();
3135 dnkey = *front;
3136 dn = lookup(dnkey.name);
3137 if (!dn) {
3138 if (!is_complete() &&
3139 (!has_bloom() || is_in_bloom(dnkey.name))) {
3140 // need to re-read this dirfrag
3141 fetch(cb);
3142 return EAGAIN;
3143 }
3144 // okay, we lost it
3145 if (missing_okay) {
3146 dout(15) << " we no longer have directory dentry "
3147 << dnkey.name << ", assuming it got renamed" << dendl;
3148 dns.erase(dnkey);
3149 continue;
3150 } else {
3151 dout(5) << " we lost dentry " << dnkey.name
3152 << ", bailing out because that's impossible!" << dendl;
3153 ceph_abort();
3154 }
3155 }
3156 // okay, we got a dentry
3157 dns.erase(dnkey);
3158
3159 if (dn->get_projected_version() < scrub_infop->last_recursive.version &&
3160 !(scrub_infop->header->get_force())) {
3161 dout(15) << " skip dentry " << dnkey.name
3162 << ", no change since last scrub" << dendl;
3163 continue;
3164 }
3165
3166 *dnout = dn;
3167 return 0;
3168 }
3169 *dnout = NULL;
3170 return ENOENT;
3171 }
3172
3173 int CDir::scrub_dentry_next(MDSInternalContext *cb, CDentry **dnout)
3174 {
3175 dout(20) << __func__ << dendl;
3176 assert(scrub_infop && scrub_infop->directory_scrubbing);
3177
3178 dout(20) << "trying to scrub directories underneath us" << dendl;
3179 int rval = _next_dentry_on_set(scrub_infop->directories_to_scrub, true,
3180 cb, dnout);
3181 if (rval == 0) {
3182 dout(20) << __func__ << " inserted to directories scrubbing: "
3183 << *dnout << dendl;
3184 scrub_infop->directories_scrubbing.insert((*dnout)->key());
3185 } else if (rval == EAGAIN) {
3186 // we don't need to do anything else
3187 } else { // we emptied out the directory scrub set
3188 assert(rval == ENOENT);
3189 dout(20) << "no directories left, moving on to other kinds of dentries"
3190 << dendl;
3191
3192 rval = _next_dentry_on_set(scrub_infop->others_to_scrub, false, cb, dnout);
3193 if (rval == 0) {
3194 dout(20) << __func__ << " inserted to others scrubbing: "
3195 << *dnout << dendl;
3196 scrub_infop->others_scrubbing.insert((*dnout)->key());
3197 }
3198 }
3199 dout(20) << " returning " << rval << " with dn=" << *dnout << dendl;
3200 return rval;
3201 }
3202
3203 void CDir::scrub_dentries_scrubbing(list<CDentry*> *out_dentries)
3204 {
3205 dout(20) << __func__ << dendl;
3206 assert(scrub_infop && scrub_infop->directory_scrubbing);
3207
3208 for (set<dentry_key_t>::iterator i =
3209 scrub_infop->directories_scrubbing.begin();
3210 i != scrub_infop->directories_scrubbing.end();
3211 ++i) {
3212 CDentry *d = lookup(i->name, i->snapid);
3213 assert(d);
3214 out_dentries->push_back(d);
3215 }
3216 for (set<dentry_key_t>::iterator i = scrub_infop->others_scrubbing.begin();
3217 i != scrub_infop->others_scrubbing.end();
3218 ++i) {
3219 CDentry *d = lookup(i->name, i->snapid);
3220 assert(d);
3221 out_dentries->push_back(d);
3222 }
3223 }
3224
3225 void CDir::scrub_dentry_finished(CDentry *dn)
3226 {
3227 dout(20) << __func__ << " on dn " << *dn << dendl;
3228 assert(scrub_infop && scrub_infop->directory_scrubbing);
3229 dentry_key_t dn_key = dn->key();
3230 if (scrub_infop->directories_scrubbing.erase(dn_key)) {
3231 scrub_infop->directories_scrubbed.insert(dn_key);
3232 } else {
3233 assert(scrub_infop->others_scrubbing.count(dn_key));
3234 scrub_infop->others_scrubbing.erase(dn_key);
3235 scrub_infop->others_scrubbed.insert(dn_key);
3236 }
3237 }
3238
3239 void CDir::scrub_maybe_delete_info()
3240 {
3241 if (scrub_infop &&
3242 !scrub_infop->directory_scrubbing &&
3243 !scrub_infop->need_scrub_local &&
3244 !scrub_infop->last_scrub_dirty &&
3245 !scrub_infop->pending_scrub_error &&
3246 scrub_infop->dirty_scrub_stamps.empty()) {
3247 scrub_infop.reset();
3248 }
3249 }
3250
3251 bool CDir::scrub_local()
3252 {
3253 assert(is_complete());
3254 bool rval = check_rstats(true);
3255
3256 scrub_info();
3257 if (rval) {
3258 scrub_infop->last_local.time = ceph_clock_now();
3259 scrub_infop->last_local.version = get_projected_version();
3260 scrub_infop->pending_scrub_error = false;
3261 scrub_infop->last_scrub_dirty = true;
3262 } else {
3263 scrub_infop->pending_scrub_error = true;
3264 if (scrub_infop->header->get_repair())
3265 cache->repair_dirfrag_stats(this);
3266 }
3267 return rval;
3268 }
3269
3270 std::string CDir::get_path() const
3271 {
3272 std::string path;
3273 get_inode()->make_path_string(path, true);
3274 return path;
3275 }
3276
3277 bool CDir::should_split_fast() const
3278 {
3279 // Max size a fragment can be before trigger fast splitting
3280 int fast_limit = g_conf->mds_bal_split_size * g_conf->mds_bal_fragment_fast_factor;
3281
3282 // Fast path: the sum of accounted size and null dentries does not
3283 // exceed threshold: we definitely are not over it.
3284 if (get_frag_size() + get_num_head_null() <= fast_limit) {
3285 return false;
3286 }
3287
3288 // Fast path: the accounted size of the frag exceeds threshold: we
3289 // definitely are over it
3290 if (get_frag_size() > fast_limit) {
3291 return true;
3292 }
3293
3294 int64_t effective_size = 0;
3295
3296 for (const auto &p : items) {
3297 const CDentry *dn = p.second;
3298 if (!dn->get_projected_linkage()->is_null()) {
3299 effective_size++;
3300 }
3301 }
3302
3303 return effective_size > fast_limit;
3304 }
3305
// Instantiates the mempool object factory for CDir with the arguments
// (CDir, co_dir, mds_co).
// NOTE(review): presumably this accounts CDir allocations to the mds_co
// pool under the co_dir tag -- confirm against the macro's definition.
3306 MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);