1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2018 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "mds/CInode.h"
18 #include "mds/MDSRank.h"
19 #include "mds/MDCache.h"
20 #include "osdc/Objecter.h"
21 #include "OpenFileTable.h"
23 #include "common/config.h"
24 #include "common/errno.h"
// Perf counter indices for the open file table. The OpenFileTable
// constructor registers the four omap_* entries with its "oft"
// PerfCountersBuilder, using l_oft_first as the lower bound of the range.
27 l_oft_first
= 1000000,
28 l_oft_omap_total_objs
,
29 l_oft_omap_total_kv_pairs
,
30 l_oft_omap_total_updates
,
31 l_oft_omap_total_removes
,
// dout logging configuration for this file: context, subsystem, and the
// per-line prefix, which is produced by _prefix(_dout, mds).
35 #define dout_context g_ceph_context
36 #define dout_subsys ceph_subsys_mds
38 #define dout_prefix _prefix(_dout, mds)
// Writes the log prefix "mds.<nodeid>.openfiles " to *_dout and returns the
// stream. The node id comes from mds->get_nodeid().
39 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
40 return *_dout
<< "mds." << mds
->get_nodeid() << ".openfiles ";
// Constructs the table for MDS rank `m`: builds the "oft" perf counters
// (range l_oft_first..l_oft_last), registers them with the context's
// perf-counter collection, and sets the four omap counters to 0.
43 OpenFileTable::OpenFileTable(MDSRank
*m
) : mds(m
) {
44 PerfCountersBuilder
b(mds
->cct
, "oft", l_oft_first
, l_oft_last
);
// One u64 counter per omap statistic.
46 b
.add_u64(l_oft_omap_total_objs
, "omap_total_objs");
47 b
.add_u64(l_oft_omap_total_kv_pairs
, "omap_total_kv_pairs");
48 b
.add_u64(l_oft_omap_total_updates
, "omap_total_updates");
49 b
.add_u64(l_oft_omap_total_removes
, "omap_total_removes");
// logger takes ownership of the created counters, then they are published.
50 logger
.reset(b
.create_perf_counters());
51 mds
->cct
->get_perfcounters_collection()->add(logger
.get());
52 logger
->set(l_oft_omap_total_objs
, 0);
53 logger
->set(l_oft_omap_total_kv_pairs
, 0);
54 logger
->set(l_oft_omap_total_updates
, 0);
55 logger
->set(l_oft_omap_total_removes
, 0);
// Destructor: removes this table's perf counters (logger) from the
// context's perf-counter collection.
58 OpenFileTable::~OpenFileTable() {
60 mds
->cct
->get_perfcounters_collection()->remove(logger
.get());
// Takes a reference on the anchor for inode `in`.
// Visible behaviour: if anchor_map already has an entry for in->ino(), the
// inode must be flagged STATE_TRACKEDBYOFT and the entry's nref must be
// positive. Otherwise a new anchor is emplaced from the inode's parent
// dentry, the inode is flagged STATE_TRACKEDBYOFT, and a dirty_items entry
// is requested with DIRTY_NEW.
// NOTE(review): the statements that bump nref and any loop over ancestors
// are not visible here; confirm against the full definition.
64 void OpenFileTable::get_ref(CInode
*in
)
67 auto p
= anchor_map
.find(in
->ino());
68 if (p
!= anchor_map
.end()) {
69 ceph_assert(in
->state_test(CInode::STATE_TRACKEDBYOFT
));
70 ceph_assert(p
->second
.nref
> 0);
// Parent dentry and the inode of its directory; pin stays null when the
// inode has no parent dentry.
75 CDentry
*dn
= in
->get_parent_dn();
76 CInode
*pin
= dn
? dn
->get_dir()->get_inode() : nullptr;
// New anchor built from (ino, parent dir ino or 0, dentry name or "",
// d_type, 1); the trailing 1 is presumably the initial nref -- confirm
// against the Anchor constructor.
78 auto ret
= anchor_map
.emplace(std::piecewise_construct
, std::forward_as_tuple(in
->ino()),
79 std::forward_as_tuple(in
->ino(), (pin
? pin
->ino() : inodeno_t(0)),
80 (dn
? dn
->get_name() : string()), in
->d_type(), 1));
// The find() above missed, so the emplace must have inserted.
81 ceph_assert(ret
.second
== true);
82 in
->state_set(CInode::STATE_TRACKEDBYOFT
);
// Assuming dirty_items is a std::map-like container, emplace() keeps an
// already-present entry; ret1.first->second is the value now stored for
// this ino, and it is copied into the new anchor's omap_idx.
84 auto ret1
= dirty_items
.emplace(in
->ino(), (int)DIRTY_NEW
);
86 int omap_idx
= ret1
.first
->second
;
87 ceph_assert(omap_idx
>= 0);
88 ret
.first
->second
.omap_idx
= omap_idx
;
// Drops a reference on the anchor for inode `in`.
// Visible behaviour: the inode must be tracked and its anchor must exist
// with nref > 0. The anchor's recorded dirino/d_name are asserted against
// the inode's current parent dentry (or against 0 / "" -- presumably when
// there is no parent dentry). STATE_TRACKEDBYOFT is then cleared and the
// ino's dirty_items entry is reconciled with the anchor's omap_idx.
// NOTE(review): the nref decrement, the anchor erase and any ancestor walk
// are not visible here.
95 void OpenFileTable::put_ref(CInode
*in
)
98 ceph_assert(in
->state_test(CInode::STATE_TRACKEDBYOFT
));
99 auto p
= anchor_map
.find(in
->ino());
100 ceph_assert(p
!= anchor_map
.end());
101 ceph_assert(p
->second
.nref
> 0);
// More than one reference outstanding.
103 if (p
->second
.nref
> 1) {
108 CDentry
*dn
= in
->get_parent_dn();
109 CInode
*pin
= dn
? dn
->get_dir()->get_inode() : nullptr;
// The anchor must still describe the current parent link ...
111 ceph_assert(p
->second
.dirino
== pin
->ino());
112 ceph_assert(p
->second
.d_name
== dn
->get_name());
// ... or record "no parent" as dirino 0 and an empty name.
114 ceph_assert(p
->second
.dirino
== inodeno_t(0));
115 ceph_assert(p
->second
.d_name
== "");
118 int omap_idx
= p
->second
.omap_idx
;
120 in
->state_clear(CInode::STATE_TRACKEDBYOFT
);
// Request a dirty entry carrying the anchor's omap index.
122 auto ret
= dirty_items
.emplace(in
->ino(), omap_idx
);
// An entry still marked DIRTY_NEW is dropped again; in that case the
// anchor's omap_idx is asserted to be negative.
124 if (ret
.first
->second
== DIRTY_NEW
) {
125 ceph_assert(omap_idx
< 0);
126 dirty_items
.erase(ret
.first
);
// Otherwise the dirty entry is set to the (non-negative) omap index.
128 ceph_assert(omap_idx
>= 0);
129 ret
.first
->second
= omap_idx
;
// Starts tracking inode `in`. Visible behaviour: logs the inode at level 10
// and asserts that anchor_map has no entry for in->ino().
// NOTE(review): the statement that actually takes the reference is not
// visible here.
137 void OpenFileTable::add_inode(CInode
*in
)
139 dout(10) << __func__
<< " " << *in
<< dendl
;
141 auto p
= anchor_map
.find(in
->ino());
142 ceph_assert(p
== anchor_map
.end());
// Stops tracking inode `in`. Visible behaviour: logs the inode at level 10
// and asserts that its anchor exists with exactly one reference.
// NOTE(review): the statement that releases the reference is not visible
// here.
147 void OpenFileTable::remove_inode(CInode
*in
)
149 dout(10) << __func__
<< " " << *in
<< dendl
;
151 auto p
= anchor_map
.find(in
->ino());
152 ceph_assert(p
!= anchor_map
.end());
153 ceph_assert(p
->second
.nref
== 1);
// Starts tracking dirfrag `dir`: it must not be tracked yet. The dirfrag is
// flagged STATE_TRACKEDBYOFT and inserted into `dirfrags` (the insert must
// succeed), a reference is taken on the directory's inode, and a
// dirty_items entry with DIRTY_UNDEF is requested for the directory ino.
158 void OpenFileTable::add_dirfrag(CDir
*dir
)
160 dout(10) << __func__
<< " " << *dir
<< dendl
;
161 ceph_assert(!dir
->state_test(CDir::STATE_TRACKEDBYOFT
));
162 dir
->state_set(CDir::STATE_TRACKEDBYOFT
);
163 auto ret
= dirfrags
.insert(dir
->dirfrag());
164 ceph_assert(ret
.second
);
165 get_ref(dir
->get_inode());
166 dirty_items
.emplace(dir
->ino(), (int)DIRTY_UNDEF
);
// Stops tracking dirfrag `dir`: it must currently be tracked. The
// STATE_TRACKEDBYOFT flag is cleared, the dirfrag is asserted to be present
// in `dirfrags`, a dirty_items entry with DIRTY_UNDEF is requested for the
// directory ino, and the reference on the directory's inode is released.
// NOTE(review): the erase of `p` from dirfrags is not visible here.
169 void OpenFileTable::remove_dirfrag(CDir
*dir
)
171 dout(10) << __func__
<< " " << *dir
<< dendl
;
172 ceph_assert(dir
->state_test(CDir::STATE_TRACKEDBYOFT
));
173 dir
->state_clear(CDir::STATE_TRACKEDBYOFT
);
174 auto p
= dirfrags
.find(dir
->dirfrag());
175 ceph_assert(p
!= dirfrags
.end());
177 dirty_items
.emplace(dir
->ino(), (int)DIRTY_UNDEF
);
178 put_ref(dir
->get_inode());
// Records that tracked inode `in` has been linked under a parent dentry.
// The anchor must exist with nref > 0 and must currently record no parent
// (dirino 0, empty d_name). It is then updated from in->get_parent_dn() and
// a dirty_items entry with DIRTY_UNDEF is requested for the ino.
181 void OpenFileTable::notify_link(CInode
*in
)
183 dout(10) << __func__
<< " " << *in
<< dendl
;
184 auto p
= anchor_map
.find(in
->ino());
185 ceph_assert(p
!= anchor_map
.end());
186 ceph_assert(p
->second
.nref
> 0);
187 ceph_assert(p
->second
.dirino
== inodeno_t(0));
188 ceph_assert(p
->second
.d_name
== "");
// dn is dereferenced without a null check, so the inode is expected to
// have a parent dentry at this point.
190 CDentry
*dn
= in
->get_parent_dn();
191 CInode
*pin
= dn
->get_dir()->get_inode();
193 p
->second
.dirino
= pin
->ino();
194 p
->second
.d_name
= dn
->get_name();
195 dirty_items
.emplace(in
->ino(), (int)DIRTY_UNDEF
);
// Records that tracked inode `in` is being unlinked from its parent dentry.
// The anchor must exist with nref > 0 and must match the inode's current
// parent (directory ino and dentry name). The recorded parent is then reset
// to dirino 0 / empty name and a dirty_items entry with DIRTY_UNDEF is
// requested for the ino.
200 void OpenFileTable::notify_unlink(CInode
*in
)
202 dout(10) << __func__
<< " " << *in
<< dendl
;
203 auto p
= anchor_map
.find(in
->ino());
204 ceph_assert(p
!= anchor_map
.end());
205 ceph_assert(p
->second
.nref
> 0);
// dn is dereferenced without a null check, so the parent dentry is
// expected to still be attached when this is called.
207 CDentry
*dn
= in
->get_parent_dn();
208 CInode
*pin
= dn
->get_dir()->get_inode();
209 ceph_assert(p
->second
.dirino
== pin
->ino());
210 ceph_assert(p
->second
.d_name
== dn
->get_name());
212 p
->second
.dirino
= inodeno_t(0);
213 p
->second
.d_name
= "";
214 dirty_items
.emplace(in
->ino(), (int)DIRTY_UNDEF
);
// Builds the name of omap object number `idx` for this rank by formatting
// "mds<nodeid>_openfiles.<idx in hex>" into the buffer `s`.
// NOTE(review): the declaration of `s` and the return statement are not
// visible here.
219 object_t
OpenFileTable::get_object_name(unsigned idx
) const
222 snprintf(s
, sizeof(s
), "mds%d_openfiles.%x", int(mds
->get_nodeid()), idx
);
// Encodes the omap header into `bl`. Inside an ENCODE_START(1, 1, bl)
// section the visible fields are omap_version, omap_num_objs and j_state
// (narrowed to __u8).
// NOTE(review): `magic` is initialised from CEPH_FS_ONDISK_MAGIC, but the
// statement that encodes it and the closing ENCODE_FINISH are not visible.
226 void OpenFileTable::_encode_header(bufferlist
&bl
, int j_state
)
228 std::string_view magic
= CEPH_FS_ONDISK_MAGIC
;
230 ENCODE_START(1, 1, bl
);
231 encode(omap_version
, bl
);
232 encode(omap_num_objs
, bl
);
233 encode((__u8
)j_state
, bl
);
// I/O completion context for a table save: stores the table, the log
// sequence being committed and the caller's context, and forwards the
// result to OpenFileTable::_commit_finish(). print() identifies it as
// "openfiles_save".
237 class C_IO_OFT_Save
: public MDSIOContextBase
{
242 MDSRank
*get_mds() override
{ return oft
->mds
; }
244 C_IO_OFT_Save(OpenFileTable
*t
, uint64_t s
, MDSContext
*c
) :
245 oft(t
), log_seq(s
), fin(c
) {}
247 oft
->_commit_finish(r
, log_seq
, fin
);
249 void print(ostream
& out
) const override
{
250 out
<< "openfiles_save";
// Completion of a commit for `log_seq`. Visible behaviour: logs the
// sequence, passes `r` to mds->handle_write_error() (presumably only on
// failure -- the guarding condition is not visible), asserts
// committed_log_seq <= log_seq <= committing_log_seq, records log_seq as
// committed and decrements num_pending_commit.
// NOTE(review): what happens to `fin` is not visible here.
254 void OpenFileTable::_commit_finish(int r
, uint64_t log_seq
, MDSContext
*fin
)
256 dout(10) << __func__
<< " log_seq " << log_seq
<< dendl
;
258 mds
->handle_write_error(r
);
262 ceph_assert(log_seq
<= committing_log_seq
);
263 ceph_assert(log_seq
>= committed_log_seq
);
264 committed_log_seq
= log_seq
;
265 num_pending_commit
--;
// I/O completion context for the journal phase of a commit: stores the
// table, log sequence, caller context and a map of omap object index ->
// pending ObjectOperations, and forwards them to
// OpenFileTable::_journal_finish(). print() identifies it as
// "openfiles_journal".
// NOTE(review): how the constructor's `ops` argument is transferred into
// ops_map is not visible here.
271 class C_IO_OFT_Journal
: public MDSIOContextBase
{
276 std::map
<unsigned, std::vector
<ObjectOperation
> > ops_map
;
277 MDSRank
*get_mds() override
{ return oft
->mds
; }
279 C_IO_OFT_Journal(OpenFileTable
*t
, uint64_t s
, MDSContext
*c
,
280 std::map
<unsigned, std::vector
<ObjectOperation
> >& ops
) :
281 oft(t
), log_seq(s
), fin(c
) {
285 oft
->_journal_finish(r
, log_seq
, fin
, ops_map
);
287 void print(ostream
& out
) const override
{
288 out
<< "openfiles_journal";
// Second phase of a journaled commit. Visible behaviour: logs the sequence,
// passes `r` to mds->handle_write_error() (guarding condition not visible),
// then submits every operation in ops_map to its omap object through a
// gather whose finisher is a C_IO_OFT_Save for (log_seq, c). Finally
// journal_state is reset to JOURNAL_NONE.
292 void OpenFileTable::_journal_finish(int r
, uint64_t log_seq
, MDSContext
*c
,
293 std::map
<unsigned, std::vector
<ObjectOperation
> >& ops_map
)
295 dout(10) << __func__
<< " log_seq " << log_seq
<< dendl
;
297 mds
->handle_write_error(r
);
301 C_GatherBuilder
gather(g_ceph_context
,
302 new C_OnFinisher(new C_IO_OFT_Save(this, log_seq
, c
),
305 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
// it.first is the omap object index, it.second the ops queued for it.
306 for (auto& it
: ops_map
) {
307 object_t oid
= get_object_name(it
.first
);
308 for (auto& op
: it
.second
) {
309 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
, ceph::real_clock::now(),
310 0, gather
.new_sub());
315 journal_state
= JOURNAL_NONE
;
// Persists the dirty open-file-table entries to the omap objects in the
// metadata pool.
// Visible flow:
//   1. bump num_pending_commit and committing_log_seq;
//   2. turn each dirty_items entry into a per-object to_update or
//      to_remove change, assigning new entries to an omap object;
//   3. queue a key removal for everything still left in loaded_anchor_map;
//   4. recount items and adjust omap_num_objs;
//   5. either build the ops directly with create_op_func, or first write
//      "_journal.<n>" keys with journal_func and hand the real updates to
//      C_IO_OFT_Journal;
//   6. publish the perf counters.
// `c` is passed on to the completion contexts; op_prio is used as the
// priority of each ObjectOperation.
// NOTE(review): several statements of this function are not visible here;
// the comments below describe only the lines shown.
319 void OpenFileTable::commit(MDSContext
*c
, uint64_t log_seq
, int op_prio
)
321 dout(10) << __func__
<< " log_seq " << log_seq
<< dendl
;
// Only one commit may be in flight, and log_seq may not go backwards.
323 ceph_assert(num_pending_commit
== 0);
324 num_pending_commit
++;
325 ceph_assert(log_seq
>= committing_log_seq
);
326 committing_log_seq
= log_seq
;
330 C_GatherBuilder
gather(g_ceph_context
);
333 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
// Size threshold at which staged changes for one object are flushed.
335 const unsigned max_write_size
= mds
->mdcache
->max_dir_commit_size
;
// Per-omap-object staging area for this commit.
337 struct omap_update_ctl
{
338 unsigned write_size
= 0;
339 unsigned journal_idx
= 0;
341 std::map
<string
, bufferlist
> to_update
, journaled_update
;
342 std::set
<string
> to_remove
, journaled_remove
;
344 std::vector
<omap_update_ctl
> omap_updates(omap_num_objs
);
// journal_func: writes the staged changes of object `idx` as a single
// "_journal.<n>" omap key, then moves them into the journaled_* containers.
347 auto journal_func
= [&](unsigned idx
) {
348 auto& ctl
= omap_updates
.at(idx
);
351 op
.priority
= op_prio
;
356 op
.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
// First journal entry for this object.
359 if (ctl
.journal_idx
== 0) {
360 if (journal_state
== JOURNAL_NONE
)
361 journal_state
= JOURNAL_START
;
363 ceph_assert(journal_state
== JOURNAL_START
);
366 _encode_header(header
, journal_state
);
367 op
.omap_set_header(header
);
// Journal payload: omap_version, then the pending updates and removals.
371 encode(omap_version
, bl
);
372 encode(ctl
.to_update
, bl
);
373 encode(ctl
.to_remove
, bl
);
376 snprintf(key
, sizeof(key
), "_journal.%x", ctl
.journal_idx
++);
377 std::map
<string
, bufferlist
> tmp_map
;
378 tmp_map
[key
].swap(bl
);
379 op
.omap_set(tmp_map
);
381 object_t oid
= get_object_name(idx
);
382 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
, ceph::real_clock::now(), 0,
// Two ways of moving the staged entries into the journaled_* containers,
// selected by HAVE_STDLIB_MAP_SPLICING.
385 #ifdef HAVE_STDLIB_MAP_SPLICING
386 ctl
.journaled_update
.merge(ctl
.to_update
);
387 ctl
.journaled_remove
.merge(ctl
.to_remove
);
389 ctl
.journaled_update
.insert(make_move_iterator(begin(ctl
.to_update
)),
390 make_move_iterator(end(ctl
.to_update
)));
391 ctl
.journaled_remove
.insert(make_move_iterator(begin(ctl
.to_remove
)),
392 make_move_iterator(end(ctl
.to_remove
)));
394 ctl
.to_update
.clear();
395 ctl
.to_remove
.clear();
// Ops that carry the real omap changes, keyed by omap object index.
398 std::map
<unsigned, std::vector
<ObjectOperation
> > ops_map
;
// create_op_func: appends a new ObjectOperation for object `idx` to ops_map
// and moves the staged to_update / to_remove sets into it.
400 auto create_op_func
= [&](unsigned idx
, bool update_header
) {
401 auto& ctl
= omap_updates
.at(idx
);
403 auto& op_vec
= ops_map
[idx
];
404 op_vec
.resize(op_vec
.size() + 1);
405 ObjectOperation
& op
= op_vec
.back();
406 op
.priority
= op_prio
;
411 op
.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
416 _encode_header(header
, journal_state
);
417 op
.omap_set_header(header
);
420 if (!ctl
.to_update
.empty()) {
421 op
.omap_set(ctl
.to_update
);
422 ctl
.to_update
.clear();
424 if (!ctl
.to_remove
.empty()) {
425 op
.omap_rm_keys(ctl
.to_remove
);
426 ctl
.to_remove
.clear();
// submit_ops_func: sends every op in ops_map; a C_IO_OFT_Save is installed
// as the gather's finisher.
430 auto submit_ops_func
= [&]() {
431 gather
.set_finisher(new C_OnFinisher(new C_IO_OFT_Save(this, log_seq
, c
),
433 for (auto& it
: ops_map
) {
434 object_t oid
= get_object_name(it
.first
);
435 for (auto& op
: it
.second
) {
436 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
, ceph::real_clock::now(),
437 0, gather
.new_sub());
// True when loaded_anchor_map still holds entries read from disk.
443 bool first_commit
= !loaded_anchor_map
.empty();
445 unsigned first_free_idx
= 0;
446 unsigned old_num_objs
= omap_num_objs
;
// No omap object exists yet.
447 if (omap_num_objs
== 0) {
449 omap_num_items
.resize(omap_num_objs
);
450 omap_updates
.resize(omap_num_objs
);
451 omap_updates
.back().clear
= true;
// Walk the dirty entries; it.first is the ino, it.second the dirty state or
// omap index recorded for it.
454 for (auto& it
: dirty_items
) {
456 auto p
= anchor_map
.find(it
.first
);
457 if (p
!= anchor_map
.end()) {
// Collect the tracked frags of this ino.
458 for (auto q
= dirfrags
.lower_bound(dirfrag_t(it
.first
, 0));
459 q
!= dirfrags
.end() && q
->ino
== it
.first
;
461 frags
.push_back(q
->frag
);
// Compare against the copy that was loaded from disk, if there is one.
465 auto q
= loaded_anchor_map
.find(it
.first
);
466 if (q
!= loaded_anchor_map
.end()) {
467 ceph_assert(p
!= anchor_map
.end());
468 p
->second
.omap_idx
= q
->second
.omap_idx
;
469 bool same
= p
->second
== q
->second
;
471 auto r
= loaded_dirfrags
.lower_bound(dirfrag_t(it
.first
, 0));
472 for (const auto& fg
: frags
) {
473 if (r
== loaded_dirfrags
.end() || !(*r
== dirfrag_t(it
.first
, fg
))) {
479 if (same
&& r
!= loaded_dirfrags
.end() && r
->ino
== it
.first
)
482 loaded_anchor_map
.erase(q
);
// The omap key is the ino in hex.
489 int len
= snprintf(key
, sizeof(key
), "%llx", (unsigned long long)it
.first
.val
);
492 if (p
!= anchor_map
.end()) {
493 omap_idx
= p
->second
.omap_idx
;
495 ceph_assert(it
.second
== DIRTY_NEW
);
496 // find omap object to store the key
497 for (unsigned i
= first_free_idx
; i
< omap_num_objs
; i
++) {
498 if (omap_num_items
[i
] < MAX_ITEMS_PER_OBJ
) {
// Grow by one omap object; the new object is flagged with clear = true.
505 ceph_assert(omap_num_objs
<= MAX_OBJECTS
);
506 omap_num_items
.resize(omap_num_objs
);
507 omap_updates
.resize(omap_num_objs
);
508 omap_updates
.back().clear
= true;
509 omap_idx
= omap_num_objs
- 1;
511 first_free_idx
= omap_idx
;
513 p
->second
.omap_idx
= omap_idx
;
514 ++omap_num_items
[omap_idx
];
// Here the dirty value itself is used as the omap index.
517 omap_idx
= it
.second
;
518 unsigned& count
= omap_num_items
.at(omap_idx
);
519 ceph_assert(count
> 0);
521 if ((unsigned)omap_idx
< first_free_idx
&& count
< MAX_ITEMS_PER_OBJ
)
522 first_free_idx
= omap_idx
;
524 auto& ctl
= omap_updates
.at(omap_idx
);
// Anchor still exists: stage an update of its encoded value.
526 if (p
!= anchor_map
.end()) {
528 encode(p
->second
, bl
);
531 ctl
.write_size
+= bl
.length() + len
+ 2 * sizeof(__u32
);
532 ctl
.to_update
[key
].swap(bl
);
// Otherwise stage a removal of the key.
534 ctl
.write_size
+= len
+ sizeof(__u32
);
535 ctl
.to_remove
.emplace(key
);
// Staged data reached the size threshold: journal it now.
538 if (ctl
.write_size
>= max_write_size
) {
539 journal_func(omap_idx
);
// Everything still left in loaded_anchor_map gets its key removed.
547 for (auto& it
: loaded_anchor_map
) {
549 int len
= snprintf(key
, sizeof(key
), "%llx", (unsigned long long)it
.first
.val
);
551 int omap_idx
= it
.second
.omap_idx
;
552 unsigned& count
= omap_num_items
.at(omap_idx
);
553 ceph_assert(count
> 0);
556 auto& ctl
= omap_updates
.at(omap_idx
);
557 ctl
.write_size
+= len
+ sizeof(__u32
);
558 ctl
.to_remove
.emplace(key
);
560 if (ctl
.write_size
>= max_write_size
) {
561 journal_func(omap_idx
);
565 loaded_anchor_map
.clear();
566 loaded_dirfrags
.clear();
// Recount items per object and classify each object as journaled or as
// needing a plain write.
569 size_t total_items
= 0;
571 unsigned used_objs
= 1;
572 std::vector
<unsigned> objs_to_write
;
573 bool journaled
= false;
574 for (unsigned i
= 0; i
< omap_num_objs
; i
++) {
575 total_items
+= omap_num_items
[i
];
576 if (omap_updates
[i
].journal_idx
)
578 else if (omap_updates
[i
].write_size
)
579 objs_to_write
.push_back(i
);
581 if (omap_num_items
[i
] > 0)
// Every anchor must be accounted for in exactly one omap object.
584 ceph_assert(total_items
== anchor_map
.size());
585 // adjust omap object count
586 if (used_objs
< omap_num_objs
) {
587 omap_num_objs
= used_objs
;
588 omap_num_items
.resize(omap_num_objs
);
590 // skip journal if only one osd request is required and object count does not change
592 if (!journaled
&& old_num_objs
== omap_num_objs
&&
593 objs_to_write
.size() <= 1) {
594 ceph_assert(journal_state
== JOURNAL_NONE
);
595 ceph_assert(!gather
.has_subs());
597 unsigned omap_idx
= objs_to_write
.empty() ? 0 : objs_to_write
.front();
598 create_op_func(omap_idx
, true);
// Journal whatever is still staged for each object.
604 for (unsigned omap_idx
= 0; omap_idx
< omap_updates
.size(); omap_idx
++) {
605 auto& ctl
= omap_updates
[omap_idx
];
606 if (ctl
.write_size
> 0) {
607 journal_func(omap_idx
);
612 if (journal_state
== JOURNAL_START
) {
613 ceph_assert(gather
.has_subs());
614 journal_state
= JOURNAL_FINISH
;
616 // only object count changes
617 ceph_assert(journal_state
== JOURNAL_NONE
);
618 ceph_assert(!gather
.has_subs());
621 uint64_t total_updates
= 0;
622 uint64_t total_removes
= 0;
// Convert the journaled changes back into real ops, split by
// max_write_size.
624 for (unsigned omap_idx
= 0; omap_idx
< omap_updates
.size(); omap_idx
++) {
625 auto& ctl
= omap_updates
[omap_idx
];
626 ceph_assert(ctl
.to_update
.empty() && ctl
.to_remove
.empty());
627 if (ctl
.journal_idx
== 0)
628 ceph_assert(ctl
.journaled_update
.empty() && ctl
.journaled_remove
.empty());
631 for (auto& it
: ctl
.journaled_update
) {
632 ctl
.write_size
+= it
.first
.length() + it
.second
.length() + 2 * sizeof(__u32
);
633 ctl
.to_update
[it
.first
].swap(it
.second
);
634 if (ctl
.write_size
>= max_write_size
) {
635 create_op_func(omap_idx
, first
);
642 for (auto& key
: ctl
.journaled_remove
) {
643 ctl
.write_size
+= key
.length() + sizeof(__u32
);
644 ctl
.to_remove
.emplace(key
);
645 if (ctl
.write_size
>= max_write_size
) {
646 create_op_func(omap_idx
, first
);
// Also remove the "_journal.<n>" keys written for this object.
653 for (unsigned i
= 0; i
< ctl
.journal_idx
; ++i
) {
655 snprintf(key
, sizeof(key
), "_journal.%x", i
);
656 ctl
.to_remove
.emplace(key
);
659 // update first object's omap header if object count changes
661 ctl
.journal_idx
> 0 ||
662 (omap_idx
== 0 && old_num_objs
!= omap_num_objs
))
663 create_op_func(omap_idx
, first
);
666 ceph_assert(!ops_map
.empty());
// Journal writes are in flight: the real ops are submitted later by
// C_IO_OFT_Journal.
667 if (journal_state
== JOURNAL_FINISH
) {
668 gather
.set_finisher(new C_OnFinisher(new C_IO_OFT_Journal(this, log_seq
, c
, ops_map
),
// Publish statistics: objs and kv_pairs are set, updates and removes are
// incremented.
674 logger
->set(l_oft_omap_total_objs
, omap_num_objs
);
675 logger
->set(l_oft_omap_total_kv_pairs
, total_items
);
676 logger
->inc(l_oft_omap_total_updates
, total_updates
);
677 logger
->inc(l_oft_omap_total_removes
, total_removes
);
// I/O completion context for one omap read during load. It owns the buffers
// the read fills (header_bl, values) and the per-op result codes, and
// forwards everything to OpenFileTable::_load_finish(). `index` is the omap
// object index given to the constructor; `first` is the constructor's bool
// flag. print() identifies it as "openfiles_load".
680 class C_IO_OFT_Load
: public MDSIOContextBase
{
683 MDSRank
*get_mds() override
{ return oft
->mds
; }
686 int header_r
= 0; //< Return value from OMAP header read
687 int values_r
= 0; //< Return value from OMAP value read
688 bufferlist header_bl
;
689 std::map
<std::string
, bufferlist
> values
;
694 C_IO_OFT_Load(OpenFileTable
*t
, unsigned i
, bool f
) :
695 oft(t
), index(i
), first(f
) {}
696 void finish(int r
) override
{
697 oft
->_load_finish(r
, header_r
, values_r
, index
, first
, more
, header_bl
, values
);
699 void print(ostream
& out
) const override
{
700 out
<< "openfiles_load";
// I/O completion context for journal recovery: forwards the result to
// OpenFileTable::_recover_finish(). print() identifies it as
// "openfiles_recover".
704 class C_IO_OFT_Recover
: public MDSIOContextBase
{
707 MDSRank
*get_mds() override
{ return oft
->mds
; }
709 C_IO_OFT_Recover(OpenFileTable
*t
) : oft(t
) {}
710 void finish(int r
) override
{
711 oft
->_recover_finish(r
);
713 void print(ostream
& out
) const override
{
714 out
<< "openfiles_recover";
// Completion of journal recovery. Visible behaviour: an error message with
// cpp_strerror(r) or a "load complete" message is logged (the branch
// condition is not visible), journal_state is reset to JOURNAL_NONE, and
// all contexts in waiting_for_load are finished and the list cleared.
718 void OpenFileTable::_recover_finish(int r
)
721 derr
<< __func__
<< " got " << cpp_strerror(r
) << dendl
;
724 dout(10) << __func__
<< ": load complete" << dendl
;
727 journal_state
= JOURNAL_NONE
;
729 finish_contexts(g_ceph_context
, waiting_for_load
);
730 waiting_for_load
.clear();
// Completion for one omap read of object `idx`.
// Visible flow:
//   - decode the header and reject a wrong magic, an object count above
//     MAX_OBJECTS or a journal state above JOURNAL_FINISH with
//     buffer::malformed_input;
//   - keep "_journal.*" keys in loaded_journals and decode every other key
//     as an anchor through decode_func;
//   - issue another read while `more` is set or further objects remain;
//   - otherwise replay loaded_journals (if any) into ops and finish the
//     contexts in waiting_for_load.
// op_r, header_r and values_r are the result codes delivered by
// C_IO_OFT_Load; header_bl and values are the buffers that read filled.
// NOTE(review): several statements of this function are not visible here;
// the comments below describe only the lines shown.
733 void OpenFileTable::_load_finish(int op_r
, int header_r
, int values_r
,
734 unsigned idx
, bool first
, bool more
,
735 bufferlist
&header_bl
,
736 std::map
<std::string
, bufferlist
> &values
)
// decode_func: decodes one anchor value into loaded_anchor_map and
// loaded_dirfrags, and bumps omap_num_items[idx] when the map grew.
741 auto decode_func
= [this](unsigned idx
, inodeno_t ino
, bufferlist
&bl
) {
742 auto p
= bl
.cbegin();
744 size_t count
= loaded_anchor_map
.size();
745 auto it
= loaded_anchor_map
.emplace_hint(loaded_anchor_map
.end(),
746 std::piecewise_construct
,
747 std::make_tuple(ino
),
749 RecoveredAnchor
& anchor
= it
->second
;
// The decoded anchor must carry the same ino as its omap key.
751 ceph_assert(ino
== anchor
.ino
);
752 anchor
.omap_idx
= idx
;
753 anchor
.auth
= MDS_RANK_NONE
;
757 for (const auto& fg
: frags
)
758 loaded_dirfrags
.insert(loaded_dirfrags
.end(), dirfrag_t(anchor
.ino
, fg
));
// Count the item only if a new map entry was actually added.
760 if (loaded_anchor_map
.size() > count
)
761 ++omap_num_items
[idx
];
765 derr
<< __func__
<< " got " << cpp_strerror(op_r
) << dendl
;
772 auto p
= header_bl
.cbegin();
// A header of exactly 13 bytes is handled as a special case; the handling
// itself is not visible here.
779 if (header_bl
.length() == 13) {
786 if (magic
!= CEPH_FS_ONDISK_MAGIC
) {
787 std::ostringstream oss
;
788 oss
<< "invalid magic '" << magic
<< "'";
789 throw buffer::malformed_input(oss
.str());
799 if (num_objs
> MAX_OBJECTS
) {
800 std::ostringstream oss
;
801 oss
<< "invalid object count '" << num_objs
<< "'";
802 throw buffer::malformed_input(oss
.str());
804 if (jstate
> JOURNAL_FINISH
) {
805 std::ostringstream oss
;
806 oss
<< "invalid journal state '" << jstate
<< "'";
807 throw buffer::malformed_input(oss
.str());
// A newer on-disk version replaces the in-memory header state.
810 if (version
> omap_version
) {
811 omap_version
= version
;
812 omap_num_objs
= num_objs
;
813 omap_num_items
.resize(omap_num_objs
);
814 journal_state
= jstate
;
// Same version: the object counts must agree; keep the larger journal state.
815 } else if (version
== omap_version
) {
816 ceph_assert(omap_num_objs
== num_objs
);
817 if (jstate
> journal_state
)
818 journal_state
= jstate
;
822 for (auto& it
: values
) {
// Keys beginning with "_journal." are journal entries, not anchors.
823 if (it
.first
.compare(0, 9, "_journal.") == 0) {
824 if (idx
>= loaded_journals
.size())
825 loaded_journals
.resize(idx
+ 1);
827 if (journal_state
== JOURNAL_FINISH
) {
828 loaded_journals
[idx
][it
.first
].swap(it
.second
);
829 } else { // incomplete journal
// operator[] creates an empty entry under the key; the payload is not kept.
830 loaded_journals
[idx
][it
.first
].length();
// Any other key is an ino in hex.
836 sscanf(it
.first
.c_str(), "%llx", (unsigned long long*)&ino
.val
);
837 decode_func(idx
, ino
, it
.second
);
839 } catch (buffer::error
&e
) {
840 derr
<< __func__
<< ": corrupted header/values: " << e
.what() << dendl
;
844 if (more
|| idx
+ 1 < omap_num_objs
) {
845 // Issue another read if we're not at the end of the omap
846 std::string last_key
;
848 last_key
= values
.rbegin()->first
;
851 dout(10) << __func__
<< ": continue to load from '" << last_key
<< "'" << dendl
;
852 object_t oid
= get_object_name(idx
);
853 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
854 C_IO_OFT_Load
*c
= new C_IO_OFT_Load(this, idx
, !more
);
857 op
.omap_get_header(&c
->header_bl
, &c
->header_r
);
858 op
.omap_get_vals(last_key
, "", uint64_t(-1),
859 &c
->values
, &c
->more
, &c
->values_r
);
860 mds
->objecter
->read(oid
, oloc
, op
, CEPH_NOSNAP
, nullptr, 0,
861 new C_OnFinisher(c
, mds
->finisher
));
// All objects have been read: replay any journal entries found on the way.
866 if (loaded_journals
.size() > 0) {
867 dout(10) << __func__
<< ": recover journal" << dendl
;
869 C_GatherBuilder
gather(g_ceph_context
,
870 new C_OnFinisher(new C_IO_OFT_Recover(this),
872 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
875 for (unsigned omap_idx
= 0; omap_idx
< loaded_journals
.size(); omap_idx
++) {
876 auto& loaded_journal
= loaded_journals
[omap_idx
];
878 std::vector
<ObjectOperation
> op_vec
;
880 for (auto& it
: loaded_journal
) {
881 if (journal_state
!= JOURNAL_FINISH
)
883 auto p
= it
.second
.cbegin();
885 std::map
<string
, bufferlist
> to_update
;
886 std::set
<string
> to_remove
;
888 if (version
!= omap_version
)
890 decode(to_update
, p
);
891 decode(to_remove
, p
);
// Apply journaled updates to the in-memory loaded maps.
894 for (auto& q
: to_update
) {
896 sscanf(q
.first
.c_str(), "%llx", (unsigned long long*)&ino
.val
);
897 decode_func(omap_idx
, ino
, q
.second
);
// Apply journaled removals, including the ino's loaded dirfrags.
899 for (auto& q
: to_remove
) {
901 sscanf(q
.c_str(), "%llx",(unsigned long long*)&ino
.val
);
902 ceph_assert(ino
.val
> 0);
903 if (loaded_anchor_map
.erase(ino
)) {
904 unsigned& count
= omap_num_items
[omap_idx
];
905 ceph_assert(count
> 0);
908 auto r
= loaded_dirfrags
.lower_bound(dirfrag_t(ino
, 0));
909 while (r
!= loaded_dirfrags
.end() && r
->ino
== ino
)
910 loaded_dirfrags
.erase(r
++);
// Queue an op that applies the same changes to the omap object.
913 op_vec
.resize(op_vec
.size() + 1);
914 ObjectOperation
& op
= op_vec
.back();
915 op
.priority
= CEPH_MSG_PRIO_HIGH
;
916 if (!to_update
.empty())
917 op
.omap_set(to_update
);
918 if (!to_remove
.empty())
919 op
.omap_rm_keys(to_remove
);
921 } catch (buffer::error
&e
) {
922 derr
<< __func__
<< ": corrupted journal: " << e
.what() << dendl
;
// Final op for this object: rewrite the header and delete the journal keys.
926 op_vec
.resize(op_vec
.size() + 1);
927 ObjectOperation
& op
= op_vec
.back();
930 if (journal_state
== JOURNAL_FINISH
)
931 _encode_header(header
, JOURNAL_FINISH
);
933 _encode_header(header
, JOURNAL_NONE
);
934 op
.omap_set_header(header
);
938 std::set
<string
> to_remove
;
939 for (auto &it
: loaded_journal
)
940 to_remove
.emplace(it
.first
);
941 op
.omap_rm_keys(to_remove
);
943 loaded_journal
.clear();
945 object_t oid
= get_object_name(omap_idx
);
946 for (auto& op
: op_vec
) {
947 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
, ceph::real_clock::now(),
948 0, gather
.new_sub());
955 journal_state
= JOURNAL_NONE
;
957 dout(10) << __func__
<< ": load complete" << dendl
;
// Finish every context queued in waiting_for_load.
964 finish_contexts(g_ceph_context
, waiting_for_load
);
965 waiting_for_load
.clear();
// Starts loading the table from omap object 0. Loading must not already be
// done. `onload` is queued on waiting_for_load, and one read is issued for
// the object's omap header and values; it completes in a C_IO_OFT_Load
// run via mds->finisher.
968 void OpenFileTable::load(MDSContext
*onload
)
970 dout(10) << __func__
<< dendl
;
971 ceph_assert(!load_done
);
973 waiting_for_load
.push_back(onload
);
// Context for object index 0, constructed with its bool flag set to true.
975 C_IO_OFT_Load
*c
= new C_IO_OFT_Load(this, 0, true);
976 object_t oid
= get_object_name(0);
977 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
// The read op fills the context's own buffers and result codes.
980 op
.omap_get_header(&c
->header_bl
, &c
->header_r
);
981 op
.omap_get_vals("", "", uint64_t(-1),
982 &c
->values
, &c
->more
, &c
->values_r
);
984 mds
->objecter
->read(oid
, oloc
, op
, CEPH_NOSNAP
, nullptr, 0,
985 new C_OnFinisher(c
, mds
->finisher
));
// Collects backpointers for `ino` from loaded_anchor_map. Visible
// behaviour: starting at ino's entry, an inode_backpointer_t(dirino,
// d_name, 0) is appended to `ancestors` and the walk continues with the
// entry for dirino. auth_hint is assigned from a visited ancestor's
// recorded auth.
// NOTE(review): the loop construct, the stop conditions' actions and the
// returned values are not visible here.
988 bool OpenFileTable::get_ancestors(inodeno_t ino
, vector
<inode_backpointer_t
>& ancestors
,
989 mds_rank_t
& auth_hint
)
991 auto p
= loaded_anchor_map
.find(ino
);
992 if (p
== loaded_anchor_map
.end())
995 inodeno_t dirino
= p
->second
.dirino
;
996 if (dirino
== inodeno_t(0))
1002 ancestors
.push_back(inode_backpointer_t(dirino
, p
->second
.d_name
, 0));
// Step up to the parent directory's own entry.
1004 p
= loaded_anchor_map
.find(dirino
);
1005 if (p
== loaded_anchor_map
.end())
1009 auth_hint
= p
->second
.auth
;
1011 dirino
= p
->second
.dirino
;
1012 if (dirino
== inodeno_t(0))
// Completion context for one open_ino request issued during prefetch:
// forwards the ino and the result to OpenFileTable::_open_ino_finish().
1020 class C_OFT_OpenInoFinish
: public MDSContext
{
1023 MDSRank
*get_mds() override
{ return oft
->mds
; }
1025 C_OFT_OpenInoFinish(OpenFileTable
*t
, inodeno_t i
) : oft(t
), ino(i
) {}
1026 void finish(int r
) override
{
1027 oft
->_open_ino_finish(ino
, r
);
// Called when one prefetch open_ino completes, and once more with
// (inodeno_t(0), 0) from _prefetch_inodes(). In the DIR_INODES phase, a
// non-negative `r` for a real ino is stored as that anchor's auth rank.
// num_opening_inodes is decremented; when it reaches zero the prefetch
// moves on: DIR_INODES -> DIRFRAGS (calling _prefetch_dirfrags()), or
// FILE_INODES -> DONE (clearing the destroyed-ino bookkeeping and finishing
// waiting_for_prefetch).
1031 void OpenFileTable::_open_ino_finish(inodeno_t ino
, int r
)
1033 if (prefetch_state
== DIR_INODES
&& r
>= 0 && ino
!= inodeno_t(0)) {
1034 auto p
= loaded_anchor_map
.find(ino
);
1035 ceph_assert(p
!= loaded_anchor_map
.end());
1036 p
->second
.auth
= mds_rank_t(r
);
// r is compared with this rank's node id before the MDCache is told the
// prefetch of this ino finished.
1039 if (r
!= mds
->get_nodeid())
1040 mds
->mdcache
->rejoin_prefetch_ino_finish(ino
, r
);
1042 num_opening_inodes
--;
1043 if (num_opening_inodes
== 0) {
1044 if (prefetch_state
== DIR_INODES
) {
1045 prefetch_state
= DIRFRAGS
;
1046 _prefetch_dirfrags();
1047 } else if (prefetch_state
== FILE_INODES
) {
1048 prefetch_state
= DONE
;
1049 logseg_destroyed_inos
.clear();
1050 destroyed_inos_set
.clear();
1051 finish_contexts(g_ceph_context
, waiting_for_prefetch
);
1052 waiting_for_prefetch
.clear();
// DIRFRAGS phase of the prefetch. Visible behaviour: for each entry of
// loaded_dirfrags the directory inode is looked up in the MDCache, and
// dirfrags that are auth and not complete are queued in fetch_queue. Each
// queued dirfrag is then fetched through a gather sub-context. finish_func
// sets prefetch_state to FILE_INODES and is installed as the gather's
// finisher when the gather has subs.
// NOTE(review): the rest of finish_func and the path taken when the gather
// has no subs are not visible here.
1059 void OpenFileTable::_prefetch_dirfrags()
1061 dout(10) << __func__
<< dendl
;
1062 ceph_assert(prefetch_state
== DIRFRAGS
);
1064 MDCache
*mdcache
= mds
->mdcache
;
1065 std::vector
<CDir
*> fetch_queue
;
// last_in is compared with df.ino so that consecutive frags of the same
// directory can be recognised.
1067 CInode
*last_in
= nullptr;
1068 for (auto df
: loaded_dirfrags
) {
1070 if (last_in
&& last_in
->ino() == df
.ino
) {
1073 diri
= mdcache
->get_inode(df
.ino
);
1078 if (diri
->state_test(CInode::STATE_REJOINUNDEF
))
1081 CDir
*dir
= diri
->get_dirfrag(df
.frag
);
1083 if (dir
->is_auth() && !dir
->is_complete())
1084 fetch_queue
.push_back(dir
);
// Look at the leaves of the fragtree under df.frag.
1087 diri
->dirfragtree
.get_leaves_under(df
.frag
, leaves
);
1088 for (const auto& leaf
: leaves
) {
1089 if (diri
->is_auth()) {
1090 dir
= diri
->get_or_open_dirfrag(mdcache
, leaf
);
1092 dir
= diri
->get_dirfrag(leaf
);
1094 if (dir
&& dir
->is_auth() && !dir
->is_complete())
1095 fetch_queue
.push_back(dir
);
1100 MDSGatherBuilder
gather(g_ceph_context
);
1101 int num_opening_dirfrags
= 0;
1102 for (const auto& dir
: fetch_queue
) {
1103 if (dir
->state_test(CDir::STATE_REJOINUNDEF
))
1104 ceph_assert(dir
->get_inode()->dirfragtree
.is_leaf(dir
->get_frag()));
1105 dir
->fetch(gather
.new_sub());
// Reset the MDS heartbeat every 1000 fetches.
1107 if (!(++num_opening_dirfrags
% 1000))
1108 mds
->heartbeat_reset();
1111 auto finish_func
= [this](int r
) {
1112 prefetch_state
= FILE_INODES
;
1115 if (gather
.has_subs()) {
1116 gather
.set_finisher(
1117 new MDSInternalContextWrapper(mds
,
1118 new LambdaContext(std::move(finish_func
))));
// Issues open_ino requests for entries of loaded_anchor_map according to
// the current prefetch_state. The metadata pool is used in the DIR_INODES
// phase and the first data pool in the FILE_INODES phase. Entries are
// tested against destroyed_inos_set, which is filled from
// logseg_destroyed_inos when empty.
// num_opening_inodes starts at 1 and the trailing
// _open_ino_finish(inodeno_t(0), 0) call drops that extra count again.
// NOTE(review): the statements that follow several of the `if` tests below
// are not visible here.
1125 void OpenFileTable::_prefetch_inodes()
1127 dout(10) << __func__
<< " state " << prefetch_state
<< dendl
;
1128 ceph_assert(!num_opening_inodes
);
1129 num_opening_inodes
= 1;
1132 if (prefetch_state
== DIR_INODES
)
1133 pool
= mds
->mdsmap
->get_metadata_pool();
1134 else if (prefetch_state
== FILE_INODES
)
1135 pool
= mds
->mdsmap
->get_first_data_pool();
1139 MDCache
*mdcache
= mds
->mdcache
;
// Flatten the per-log-segment destroyed inos into one set.
1141 if (destroyed_inos_set
.empty()) {
1142 for (auto& it
: logseg_destroyed_inos
)
1143 destroyed_inos_set
.insert(it
.second
.begin(), it
.second
.end());
1146 for (auto& it
: loaded_anchor_map
) {
1147 if (destroyed_inos_set
.count(it
.first
))
// Directory entries.
1149 if (it
.second
.d_type
== DT_DIR
) {
1150 if (prefetch_state
!= DIR_INODES
)
// For mdsdir and stray inos the auth rank is derived from the ino itself.
1152 if (MDS_INO_IS_MDSDIR(it
.first
)) {
1153 it
.second
.auth
= MDS_INO_MDSDIR_OWNER(it
.first
);
1156 if (MDS_INO_IS_STRAY(it
.first
)) {
1157 it
.second
.auth
= MDS_INO_STRAY_OWNER(it
.first
);
1161 if (prefetch_state
!= FILE_INODES
)
1163 // load all file inodes for MDCache::identify_files_to_recover()
1165 CInode
*in
= mdcache
->get_inode(it
.first
);
1169 num_opening_inodes
++;
1170 mdcache
->open_ino(it
.first
, pool
, new C_OFT_OpenInoFinish(this, it
.first
), false);
// Reset the MDS heartbeat whenever the running count is a multiple of 1000.
1172 if (!(num_opening_inodes
% 1000))
1173 mds
->heartbeat_reset();
1176 _open_ino_finish(inodeno_t(0), 0);
// Entry point of the prefetch: prefetch_state must still be unset and is
// moved to DIR_INODES. Returns true while the table is not yet prefetched
// (!is_prefetched()).
// NOTE(review): the purpose of the MDSInternalContextWrapper / LambdaContext
// created here, and the lambda's body, are not visible.
1179 bool OpenFileTable::prefetch_inodes()
1181 dout(10) << __func__
<< dendl
;
1182 ceph_assert(!prefetch_state
);
1183 prefetch_state
= DIR_INODES
;
1187 new MDSInternalContextWrapper(mds
,
1188 new LambdaContext([this](int r
) {
1197 return !is_prefetched();
// Decides whether an open of inode `in` should be logged. For an inode
// flagged STATE_TRACKEDBYOFT the visible checks are: whether
// in->last_journaled is at or past committing_log_seq, and whether the ino
// has no dirty_items entry.
// NOTE(review): the values returned for each case are not visible here.
1200 bool OpenFileTable::should_log_open(CInode
*in
)
1202 if (in
->state_test(CInode::STATE_TRACKEDBYOFT
)) {
1203 // inode just journaled
1204 if (in
->last_journaled
>= committing_log_seq
)
1206 // item not dirty. it means the item has already been saved
1207 auto p
= dirty_items
.find(in
->ino());
1208 if (p
== dirty_items
.end())
// Appends `inos` to the list of destroyed inodes recorded for log segment
// sequence `seq`. operator[] creates the list if it does not exist yet.
1214 void OpenFileTable::note_destroyed_inos(uint64_t seq
, const vector
<inodeno_t
>& inos
)
1216 auto& vec
= logseg_destroyed_inos
[seq
];
1217 vec
.insert(vec
.end(), inos
.begin(), inos
.end());
1220 void OpenFileTable::trim_destroyed_inos(uint64_t seq
)
1222 auto p
= logseg_destroyed_inos
.begin();
1223 while (p
!= logseg_destroyed_inos
.end()) {
1224 if (p
->first
>= seq
)
1226 logseg_destroyed_inos
.erase(p
++);