1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2018 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "mds/CInode.h"
18 #include "mds/MDSRank.h"
19 #include "mds/MDCache.h"
20 #include "osdc/Objecter.h"
21 #include "OpenFileTable.h"
23 #include "common/config.h"
24 #include "common/errno.h"
27 l_oft_first
= 1000000,
28 l_oft_omap_total_objs
,
29 l_oft_omap_total_kv_pairs
,
30 l_oft_omap_total_updates
,
31 l_oft_omap_total_removes
,
35 #define dout_context g_ceph_context
36 #define dout_subsys ceph_subsys_mds
38 #define dout_prefix _prefix(_dout, mds)
42 static std::ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
43 return *_dout
<< "mds." << mds
->get_nodeid() << ".openfiles ";
46 OpenFileTable::OpenFileTable(MDSRank
*m
) : mds(m
) {
47 PerfCountersBuilder
b(mds
->cct
, "oft", l_oft_first
, l_oft_last
);
49 b
.add_u64(l_oft_omap_total_objs
, "omap_total_objs");
50 b
.add_u64(l_oft_omap_total_kv_pairs
, "omap_total_kv_pairs");
51 b
.add_u64(l_oft_omap_total_updates
, "omap_total_updates");
52 b
.add_u64(l_oft_omap_total_removes
, "omap_total_removes");
53 logger
.reset(b
.create_perf_counters());
54 mds
->cct
->get_perfcounters_collection()->add(logger
.get());
55 logger
->set(l_oft_omap_total_objs
, 0);
56 logger
->set(l_oft_omap_total_kv_pairs
, 0);
57 logger
->set(l_oft_omap_total_updates
, 0);
58 logger
->set(l_oft_omap_total_removes
, 0);
// Removes the perf counters held by `logger` from the CephContext's
// perf-counter collection.
// NOTE(review): original line 62 is missing from this listing, so any
// condition guarding the call below is not visible here.
61 OpenFileTable::~OpenFileTable() {
63 mds
->cct
->get_perfcounters_collection()->remove(logger
.get());
// Takes a reference on inode `in` (and dirfrag `fg`) in anchor_map.
//
// Visible behaviour:
//  - an existing entry must belong to an inode flagged STATE_TRACKEDBYOFT
//    with nref > 0; `fg` is inserted into its frags and the inode is queued
//    in dirty_items as DIRTY_UNDEF;
//  - otherwise a new anchor is emplaced from the inode's parent dentry
//    (inodeno_t(0) and an empty name when there is no parent), the inode is
//    flagged STATE_TRACKEDBYOFT and queued in dirty_items as DIRTY_NEW.
//
// NOTE(review): original lines 68-69, 71, 74-75, 79-81, 85-88, 97-98, 100,
// 102 and 106-112 are missing from this listing. The conditions around the
// asserts, any loop over ancestors and any nref update are therefore not
// visible — confirm against the full file.
67 void OpenFileTable::get_ref(CInode
*in
, frag_t fg
)
70 auto p
= anchor_map
.find(in
->ino());
72 ceph_assert(fg
== -1U);
73 ceph_assert(p
== anchor_map
.end());
// Entry already present: sanity-check it before touching it.
76 if (p
!= anchor_map
.end()) {
77 ceph_assert(in
->state_test(CInode::STATE_TRACKEDBYOFT
));
78 ceph_assert(p
->second
.nref
> 0);
82 auto ret
= p
->second
.frags
.insert(fg
);
83 ceph_assert(ret
.second
);
84 dirty_items
.emplace(in
->ino(), (int)DIRTY_UNDEF
);
// No entry yet: describe the inode by its parent dentry, if it has one.
89 CDentry
*dn
= in
->get_parent_dn();
90 CInode
*pin
= dn
? dn
->get_dir()->get_inode() : nullptr;
92 auto ret
= anchor_map
.emplace(std::piecewise_construct
, std::forward_as_tuple(in
->ino()),
93 std::forward_as_tuple(in
->ino(), (pin
? pin
->ino() : inodeno_t(0)),
94 (dn
? dn
->get_name() : string()), in
->d_type(), 1));
95 ceph_assert(ret
.second
== true);
96 in
->state_set(CInode::STATE_TRACKEDBYOFT
);
99 ret
.first
->second
.frags
.insert(fg
);
101 auto ret1
= dirty_items
.emplace(in
->ino(), (int)DIRTY_NEW
);
// Copies the value stored in dirty_items for this inode into the anchor's
// omap_idx (the guarding condition is not visible in this listing).
103 int omap_idx
= ret1
.first
->second
;
104 ceph_assert(omap_idx
>= 0);
105 ret
.first
->second
.omap_idx
= omap_idx
;
// Drops a reference on inode `in` (and dirfrag `fg`) from anchor_map.
//
// The inode must be flagged STATE_TRACKEDBYOFT and have an anchor with
// nref > 0. Visible behaviour:
//  - while nref > 1, `fg` is erased from the anchor's frags and the inode is
//    queued in dirty_items as DIRTY_UNDEF;
//  - otherwise the anchor's dirino/d_name are checked against the inode's
//    current parent dentry (or against inodeno_t(0) / ""), the flag
//    STATE_TRACKEDBYOFT is cleared, and dirty_items receives the anchor's
//    omap_idx — a pending DIRTY_NEW item is erased instead.
//
// NOTE(review): a large number of original lines (conditions, else branches,
// the nref decrement, anchor_map erasure and any loop over ancestors) are
// missing from this listing — confirm against the full file.
113 void OpenFileTable::put_ref(CInode
*in
, frag_t fg
)
116 ceph_assert(in
->state_test(CInode::STATE_TRACKEDBYOFT
));
117 auto p
= anchor_map
.find(in
->ino());
118 ceph_assert(p
!= anchor_map
.end());
119 ceph_assert(p
->second
.nref
> 0);
122 ceph_assert(fg
== -1U);
123 ceph_assert(p
->second
.nref
== 1);
// Other references remain: only the frag set changes.
126 if (p
->second
.nref
> 1) {
129 auto ret
= p
->second
.frags
.erase(fg
);
131 dirty_items
.emplace(in
->ino(), (int)DIRTY_UNDEF
);
// The recorded parent must still match the inode's current linkage.
136 CDentry
*dn
= in
->get_parent_dn();
137 CInode
*pin
= dn
? dn
->get_dir()->get_inode() : nullptr;
139 ceph_assert(p
->second
.dirino
== pin
->ino());
140 ceph_assert(p
->second
.d_name
== dn
->get_name());
142 ceph_assert(p
->second
.dirino
== inodeno_t(0));
143 ceph_assert(p
->second
.d_name
== "");
147 ceph_assert(p
->second
.frags
.size() == 1);
148 ceph_assert(*p
->second
.frags
.begin() == fg
);
151 int omap_idx
= p
->second
.omap_idx
;
153 in
->state_clear(CInode::STATE_TRACKEDBYOFT
);
155 auto ret
= dirty_items
.emplace(in
->ino(), omap_idx
);
// An item still marked DIRTY_NEW is simply forgotten; otherwise the dirty
// item is overwritten with the anchor's omap_idx.
157 if (ret
.first
->second
== DIRTY_NEW
) {
158 ceph_assert(omap_idx
< 0);
159 dirty_items
.erase(ret
.first
);
161 ceph_assert(omap_idx
>= 0);
162 ret
.first
->second
= omap_idx
;
// Logs the inode at debug level 10.
// NOTE(review): original line 174 (the rest of the body) is missing from
// this listing; presumably it registers the inode — confirm in the full file.
171 void OpenFileTable::add_inode(CInode
*in
)
173 dout(10) << __func__
<< " " << *in
<< dendl
;
// Logs the inode at debug level 10.
// NOTE(review): original line 180 (the rest of the body) is missing from
// this listing; presumably it releases the inode — confirm in the full file.
177 void OpenFileTable::remove_inode(CInode
*in
)
179 dout(10) << __func__
<< " " << *in
<< dendl
;
183 void OpenFileTable::add_dirfrag(CDir
*dir
)
185 dout(10) << __func__
<< " " << *dir
<< dendl
;
186 ceph_assert(!dir
->state_test(CDir::STATE_TRACKEDBYOFT
));
187 dir
->state_set(CDir::STATE_TRACKEDBYOFT
);
188 get_ref(dir
->get_inode(), dir
->get_frag());
191 void OpenFileTable::remove_dirfrag(CDir
*dir
)
193 dout(10) << __func__
<< " " << *dir
<< dendl
;
194 ceph_assert(dir
->state_test(CDir::STATE_TRACKEDBYOFT
));
195 dir
->state_clear(CDir::STATE_TRACKEDBYOFT
);
196 put_ref(dir
->get_inode(), dir
->get_frag());
// Records the new parent linkage of an inode that already has an anchor.
// The anchor must exist with nref > 0 and must currently have no parent
// (dirino == inodeno_t(0), empty d_name). Its dirino/d_name are set from the
// inode's parent dentry and the inode is queued in dirty_items as
// DIRTY_UNDEF.
// NOTE(review): original lines 214-216 are missing from this listing, so any
// follow-up work after the dirty_items update is not visible here.
199 void OpenFileTable::notify_link(CInode
*in
)
201 dout(10) << __func__
<< " " << *in
<< dendl
;
202 auto p
= anchor_map
.find(in
->ino());
203 ceph_assert(p
!= anchor_map
.end());
204 ceph_assert(p
->second
.nref
> 0);
205 ceph_assert(p
->second
.dirino
== inodeno_t(0));
206 ceph_assert(p
->second
.d_name
== "");
// dn is dereferenced without a null check: the inode must already be linked.
208 CDentry
*dn
= in
->get_parent_dn();
209 CInode
*pin
= dn
->get_dir()->get_inode();
211 p
->second
.dirino
= pin
->ino();
212 p
->second
.d_name
= dn
->get_name();
213 dirty_items
.emplace(in
->ino(), (int)DIRTY_UNDEF
);
// Clears the parent linkage recorded for an inode that has an anchor.
// The anchor must exist with nref > 0 and its dirino/d_name must match the
// inode's current parent dentry. Both fields are reset (inodeno_t(0), "") and
// the inode is queued in dirty_items as DIRTY_UNDEF.
// NOTE(review): original lines 233-235 are missing from this listing, so any
// follow-up work after the dirty_items update is not visible here.
218 void OpenFileTable::notify_unlink(CInode
*in
)
220 dout(10) << __func__
<< " " << *in
<< dendl
;
221 auto p
= anchor_map
.find(in
->ino());
222 ceph_assert(p
!= anchor_map
.end());
223 ceph_assert(p
->second
.nref
> 0);
// dn is dereferenced without a null check: the inode must still be linked.
225 CDentry
*dn
= in
->get_parent_dn();
226 CInode
*pin
= dn
->get_dir()->get_inode();
227 ceph_assert(p
->second
.dirino
== pin
->ino());
228 ceph_assert(p
->second
.d_name
== dn
->get_name());
230 p
->second
.dirino
= inodeno_t(0);
231 p
->second
.d_name
= "";
232 dirty_items
.emplace(in
->ino(), (int)DIRTY_UNDEF
);
// Builds the name of the omap object with index `idx`:
// "mds<nodeid>_openfiles.<idx in hex>".
// NOTE(review): the declaration of buffer `s` and the return statement
// (original lines 239 and 241) are missing from this listing.
237 object_t
OpenFileTable::get_object_name(unsigned idx
) const
240 snprintf(s
, sizeof(s
), "mds%d_openfiles.%x", int(mds
->get_nodeid()), idx
);
// Encodes the omap header into `bl`: inside a version-1 ENCODE_START
// envelope it writes omap_version, omap_num_objs and `j_state` narrowed to
// one byte.
// NOTE(review): original lines 247 and 252 are missing from this listing;
// presumably they encode `magic` ahead of the envelope and close it with
// ENCODE_FINISH — confirm in the full file.
244 void OpenFileTable::_encode_header(bufferlist
&bl
, int j_state
)
246 std::string_view magic
= CEPH_FS_ONDISK_MAGIC
;
248 ENCODE_START(1, 1, bl
);
249 encode(omap_version
, bl
);
250 encode(omap_num_objs
, bl
);
251 encode((__u8
)j_state
, bl
);
// I/O completion context for a table save: forwards the result, the log
// sequence and the caller's context to OpenFileTable::_commit_finish().
// print() identifies it as "openfiles_save".
// NOTE(review): the member declarations, access specifiers and the signature
// of the method that calls _commit_finish() are missing from this listing.
255 class C_IO_OFT_Save
: public MDSIOContextBase
{
260 MDSRank
*get_mds() override
{ return oft
->mds
; }
262 C_IO_OFT_Save(OpenFileTable
*t
, uint64_t s
, MDSContext
*c
) :
263 oft(t
), log_seq(s
), fin(c
) {}
265 oft
->_commit_finish(r
, log_seq
, fin
);
267 void print(ostream
& out
) const override
{
268 out
<< "openfiles_save";
// Completion handler for a save. Reports `r` through
// mds->handle_write_error(), checks that `log_seq` lies between
// committed_log_seq and committing_log_seq, records it as the committed
// sequence and decrements num_pending_commit.
// NOTE(review): the condition around handle_write_error() and everything
// after the decrement (original lines 275, 277-279, 284-287) are missing from
// this listing; `fin` is presumably completed there.
272 void OpenFileTable::_commit_finish(int r
, uint64_t log_seq
, MDSContext
*fin
)
274 dout(10) << __func__
<< " log_seq " << log_seq
<< dendl
;
276 mds
->handle_write_error(r
);
280 ceph_assert(log_seq
<= committing_log_seq
);
281 ceph_assert(log_seq
>= committed_log_seq
);
282 committed_log_seq
= log_seq
;
283 num_pending_commit
--;
// I/O completion context for the journal phase of a commit: carries the
// per-object operation lists (ops_map, keyed by omap object index) and hands
// them to OpenFileTable::_journal_finish() together with the result, the log
// sequence and the caller's context. print() identifies it as
// "openfiles_journal".
// NOTE(review): the remaining member declarations, the constructor body
// (which receives `ops`) and the signature of the method that calls
// _journal_finish() are missing from this listing.
289 class C_IO_OFT_Journal
: public MDSIOContextBase
{
294 std::map
<unsigned, std::vector
<ObjectOperation
> > ops_map
;
295 MDSRank
*get_mds() override
{ return oft
->mds
; }
297 C_IO_OFT_Journal(OpenFileTable
*t
, uint64_t s
, MDSContext
*c
,
298 std::map
<unsigned, std::vector
<ObjectOperation
> >& ops
) :
299 oft(t
), log_seq(s
), fin(c
) {
303 oft
->_journal_finish(r
, log_seq
, fin
, ops_map
);
305 void print(ostream
& out
) const override
{
306 out
<< "openfiles_journal";
// Completion handler for the journal phase. Reports `r` through
// mds->handle_write_error(), then submits every ObjectOperation in `ops_map`
// to its omap object (named by get_object_name()) in the metadata pool. All
// submissions share one gather whose finisher is a C_IO_OFT_Save for
// `log_seq` and `c`. journal_state is reset to JOURNAL_NONE afterwards.
// NOTE(review): the condition around handle_write_error(), the declaration
// of `snapc`, the finisher's second argument and the gather activation are
// missing from this listing.
310 void OpenFileTable::_journal_finish(int r
, uint64_t log_seq
, MDSContext
*c
,
311 std::map
<unsigned, std::vector
<ObjectOperation
> >& ops_map
)
313 dout(10) << __func__
<< " log_seq " << log_seq
<< dendl
;
315 mds
->handle_write_error(r
);
319 C_GatherBuilder
gather(g_ceph_context
,
320 new C_OnFinisher(new C_IO_OFT_Save(this, log_seq
, c
),
323 object_locator_t
oloc(mds
->get_metadata_pool());
// One mutate per queued operation; each one is a sub of the gather.
324 for (auto& [idx
, vops
] : ops_map
) {
325 object_t oid
= get_object_name(idx
);
326 for (auto& op
: vops
) {
327 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
, ceph::real_clock::now(),
328 0, gather
.new_sub());
333 journal_state
= JOURNAL_NONE
;
// Writes the pending changes recorded in dirty_items (plus whatever is left
// in loaded_anchor_map) to the omap objects named by get_object_name().
//
// Visible structure:
//  - journal_func(idx): stages the object's to_update/to_remove sets under a
//    "_journal.%x" omap key, sends that op through the objecter, and moves
//    the sets into journaled_update/journaled_remove;
//  - create_op_func(idx, update_header): appends an ObjectOperation for the
//    object to ops_map, consuming to_update/to_remove;
//  - submit_ops_func(): sends every op in ops_map with a C_IO_OFT_Save
//    finisher.
// Each batch is cut once its accumulated write_size reaches
// mds->mdcache->max_dir_commit_size. The perf counters are refreshed at the
// end.
//
// Only one commit may be pending at a time (num_pending_commit must be 0 on
// entry) and `log_seq` must not be lower than committing_log_seq.
//
// NOTE(review): a large number of original lines (conditions, else branches,
// declarations such as `op`, `bl`, `key`, `header`, `snapc`, `first`, and
// several statements) are missing from this listing. The comments below
// describe only what is visible — confirm against the full file.
337 void OpenFileTable::commit(MDSContext
*c
, uint64_t log_seq
, int op_prio
)
339 dout(10) << __func__
<< " log_seq " << log_seq
<< dendl
;
341 ceph_assert(num_pending_commit
== 0);
342 num_pending_commit
++;
343 ceph_assert(log_seq
>= committing_log_seq
);
344 committing_log_seq
= log_seq
;
348 C_GatherBuilder
gather(g_ceph_context
);
351 object_locator_t
oloc(mds
->get_metadata_pool());
353 const unsigned max_write_size
= mds
->mdcache
->max_dir_commit_size
;
// Per-omap-object bookkeeping for this commit.
355 struct omap_update_ctl
{
356 unsigned write_size
= 0;
357 unsigned journal_idx
= 0;
359 std::map
<string
, bufferlist
> to_update
, journaled_update
;
360 std::set
<string
> to_remove
, journaled_remove
;
362 std::vector
<omap_update_ctl
> omap_updates(omap_num_objs
);
// Stage the current batch of object `idx` as a journal entry.
365 auto journal_func
= [&](unsigned idx
) {
366 auto& ctl
= omap_updates
.at(idx
);
369 op
.priority
= op_prio
;
374 op
.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
// First journal entry for this object in this commit.
377 if (ctl
.journal_idx
== 0) {
378 if (journal_state
== JOURNAL_NONE
)
379 journal_state
= JOURNAL_START
;
381 ceph_assert(journal_state
== JOURNAL_START
);
384 _encode_header(header
, journal_state
);
385 op
.omap_set_header(header
);
389 encode(omap_version
, bl
);
390 encode(ctl
.to_update
, bl
);
391 encode(ctl
.to_remove
, bl
);
394 snprintf(key
, sizeof(key
), "_journal.%x", ctl
.journal_idx
++);
395 std::map
<string
, bufferlist
> tmp_map
;
396 tmp_map
[key
].swap(bl
);
397 op
.omap_set(tmp_map
);
399 object_t oid
= get_object_name(idx
);
400 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
, ceph::real_clock::now(), 0,
// Keep what was journaled so it can be applied for real later.
403 #ifdef HAVE_STDLIB_MAP_SPLICING
404 ctl
.journaled_update
.merge(ctl
.to_update
);
405 ctl
.journaled_remove
.merge(ctl
.to_remove
);
407 ctl
.journaled_update
.insert(make_move_iterator(begin(ctl
.to_update
)),
408 make_move_iterator(end(ctl
.to_update
)));
409 ctl
.journaled_remove
.insert(make_move_iterator(begin(ctl
.to_remove
)),
410 make_move_iterator(end(ctl
.to_remove
)));
412 ctl
.to_update
.clear();
413 ctl
.to_remove
.clear();
416 std::map
<unsigned, std::vector
<ObjectOperation
> > ops_map
;
// Append one operation for object `idx` to ops_map from its pending sets.
418 auto create_op_func
= [&](unsigned idx
, bool update_header
) {
419 auto& ctl
= omap_updates
.at(idx
);
421 auto& op_vec
= ops_map
[idx
];
422 op_vec
.resize(op_vec
.size() + 1);
423 ObjectOperation
& op
= op_vec
.back();
424 op
.priority
= op_prio
;
429 op
.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
434 _encode_header(header
, journal_state
);
435 op
.omap_set_header(header
);
438 if (!ctl
.to_update
.empty()) {
439 op
.omap_set(ctl
.to_update
);
440 ctl
.to_update
.clear();
442 if (!ctl
.to_remove
.empty()) {
443 op
.omap_rm_keys(ctl
.to_remove
);
444 ctl
.to_remove
.clear();
// Send everything queued in ops_map directly (no journal phase).
448 auto submit_ops_func
= [&]() {
449 gather
.set_finisher(new C_OnFinisher(new C_IO_OFT_Save(this, log_seq
, c
),
451 for (auto& [idx
, vops
] : ops_map
) {
452 object_t oid
= get_object_name(idx
);
453 for (auto& op
: vops
) {
454 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
, ceph::real_clock::now(),
455 0, gather
.new_sub());
461 bool first_commit
= !loaded_anchor_map
.empty();
463 unsigned first_free_idx
= 0;
464 unsigned old_num_objs
= omap_num_objs
;
465 if (omap_num_objs
== 0) {
467 omap_num_items
.resize(omap_num_objs
);
468 omap_updates
.resize(omap_num_objs
);
469 omap_updates
.back().clear
= true;
// Pass 1: turn every dirty item into an omap update or removal.
472 for (auto& [ino
, state
] : dirty_items
) {
473 auto p
= anchor_map
.find(ino
);
476 auto q
= loaded_anchor_map
.find(ino
);
477 if (q
!= loaded_anchor_map
.end()) {
478 ceph_assert(p
!= anchor_map
.end());
479 p
->second
.omap_idx
= q
->second
.omap_idx
;
480 bool same
= (p
->second
== q
->second
);
481 loaded_anchor_map
.erase(q
);
// The omap key is the inode number in hex.
488 int len
= snprintf(key
, sizeof(key
), "%llx", (unsigned long long)ino
.val
);
491 if (p
!= anchor_map
.end()) {
492 omap_idx
= p
->second
.omap_idx
;
494 ceph_assert(state
== DIRTY_NEW
);
495 // find omap object to store the key
496 for (unsigned i
= first_free_idx
; i
< omap_num_objs
; i
++) {
497 if (omap_num_items
[i
] < MAX_ITEMS_PER_OBJ
) {
504 ceph_assert(omap_num_objs
<= MAX_OBJECTS
);
505 omap_num_items
.resize(omap_num_objs
);
506 omap_updates
.resize(omap_num_objs
);
507 omap_updates
.back().clear
= true;
508 omap_idx
= omap_num_objs
- 1;
510 first_free_idx
= omap_idx
;
512 p
->second
.omap_idx
= omap_idx
;
513 ++omap_num_items
[omap_idx
];
517 unsigned& count
= omap_num_items
.at(omap_idx
);
518 ceph_assert(count
> 0);
520 if ((unsigned)omap_idx
< first_free_idx
&& count
< MAX_ITEMS_PER_OBJ
)
521 first_free_idx
= omap_idx
;
// Batch is full: journal it before adding more.
523 auto& ctl
= omap_updates
.at(omap_idx
);
524 if (ctl
.write_size
>= max_write_size
) {
525 journal_func(omap_idx
);
528 if (p
!= anchor_map
.end()) {
530 encode(p
->second
, bl
);
531 encode((__u32
)0, bl
); // frags set was encoded here
533 ctl
.write_size
+= bl
.length() + len
+ 2 * sizeof(__u32
);
534 ctl
.to_update
[key
].swap(bl
);
536 ctl
.write_size
+= len
+ sizeof(__u32
);
537 ctl
.to_remove
.emplace(key
);
// Pass 2: entries still left in loaded_anchor_map are queued for removal.
544 for (auto& [ino
, anchor
] : loaded_anchor_map
) {
546 int len
= snprintf(key
, sizeof(key
), "%llx", (unsigned long long)ino
.val
);
548 int omap_idx
= anchor
.omap_idx
;
549 unsigned& count
= omap_num_items
.at(omap_idx
);
550 ceph_assert(count
> 0);
553 auto& ctl
= omap_updates
.at(omap_idx
);
554 if (ctl
.write_size
>= max_write_size
) {
555 journal_func(omap_idx
);
558 ctl
.write_size
+= len
+ sizeof(__u32
);
559 ctl
.to_remove
.emplace(key
);
561 loaded_anchor_map
.clear();
// Tally items and work out which objects need writing.
564 size_t total_items
= 0;
566 unsigned used_objs
= 1;
567 std::vector
<unsigned> objs_to_write
;
568 bool journaled
= false;
569 for (unsigned i
= 0; i
< omap_num_objs
; i
++) {
570 total_items
+= omap_num_items
[i
];
571 if (omap_updates
[i
].journal_idx
)
573 else if (omap_updates
[i
].write_size
)
574 objs_to_write
.push_back(i
);
576 if (omap_num_items
[i
] > 0)
579 ceph_assert(total_items
== anchor_map
.size());
580 // adjust omap object count
581 if (used_objs
< omap_num_objs
) {
582 omap_num_objs
= used_objs
;
583 omap_num_items
.resize(omap_num_objs
);
585 // skip journal if only one osd request is required and object count
587 if (!journaled
&& old_num_objs
== omap_num_objs
&&
588 objs_to_write
.size() <= 1) {
589 ceph_assert(journal_state
== JOURNAL_NONE
);
590 ceph_assert(!gather
.has_subs());
592 unsigned omap_idx
= objs_to_write
.empty() ? 0 : objs_to_write
.front();
593 create_op_func(omap_idx
, true);
// Journal whatever is still pending for every object.
599 for (unsigned omap_idx
= 0; omap_idx
< omap_updates
.size(); omap_idx
++) {
600 auto& ctl
= omap_updates
[omap_idx
];
601 if (ctl
.write_size
> 0) {
602 journal_func(omap_idx
);
607 if (journal_state
== JOURNAL_START
) {
608 ceph_assert(gather
.has_subs());
609 journal_state
= JOURNAL_FINISH
;
611 // only object count changes
612 ceph_assert(journal_state
== JOURNAL_NONE
);
613 ceph_assert(!gather
.has_subs());
616 uint64_t total_updates
= 0;
617 uint64_t total_removes
= 0;
// Rebuild the real operations from what was journaled, again in batches.
619 for (unsigned omap_idx
= 0; omap_idx
< omap_updates
.size(); omap_idx
++) {
620 auto& ctl
= omap_updates
[omap_idx
];
621 ceph_assert(ctl
.to_update
.empty() && ctl
.to_remove
.empty());
622 if (ctl
.journal_idx
== 0)
623 ceph_assert(ctl
.journaled_update
.empty() && ctl
.journaled_remove
.empty());
626 for (auto& it
: ctl
.journaled_update
) {
627 if (ctl
.write_size
>= max_write_size
) {
628 create_op_func(omap_idx
, first
);
632 ctl
.write_size
+= it
.first
.length() + it
.second
.length() + 2 * sizeof(__u32
);
633 ctl
.to_update
[it
.first
].swap(it
.second
);
637 for (auto& key
: ctl
.journaled_remove
) {
638 if (ctl
.write_size
>= max_write_size
) {
639 create_op_func(omap_idx
, first
);
644 ctl
.write_size
+= key
.length() + sizeof(__u32
);
645 ctl
.to_remove
.emplace(key
);
// The journal keys themselves are removed along with the final batch.
649 for (unsigned i
= 0; i
< ctl
.journal_idx
; ++i
) {
651 snprintf(key
, sizeof(key
), "_journal.%x", i
);
652 ctl
.to_remove
.emplace(key
);
655 // update first object's omap header if object count changes
657 ctl
.journal_idx
> 0 ||
658 (omap_idx
== 0 && old_num_objs
!= omap_num_objs
))
659 create_op_func(omap_idx
, first
);
662 ceph_assert(!ops_map
.empty());
663 if (journal_state
== JOURNAL_FINISH
) {
664 gather
.set_finisher(new C_OnFinisher(new C_IO_OFT_Journal(this, log_seq
, c
, ops_map
),
// Publish the new totals.
670 logger
->set(l_oft_omap_total_objs
, omap_num_objs
);
671 logger
->set(l_oft_omap_total_kv_pairs
, total_items
);
672 logger
->inc(l_oft_omap_total_updates
, total_updates
);
673 logger
->inc(l_oft_omap_total_removes
, total_removes
);
// I/O completion context for one omap read. It owns the output buffers the
// read operation fills (header_bl/header_r, values/values_r) and passes them
// to OpenFileTable::_load_finish() with the object index and the
// `first`/`more` flags. print() identifies it as "openfiles_load".
// NOTE(review): the declarations of `oft`, `index`, `first` and `more` and
// the access specifiers are missing from this listing.
676 class C_IO_OFT_Load
: public MDSIOContextBase
{
679 MDSRank
*get_mds() override
{ return oft
->mds
; }
682 int header_r
= 0; //< Return value from OMAP header read
683 int values_r
= 0; //< Return value from OMAP value read
684 bufferlist header_bl
;
685 std::map
<std::string
, bufferlist
> values
;
690 C_IO_OFT_Load(OpenFileTable
*t
, unsigned i
, bool f
) :
691 oft(t
), index(i
), first(f
) {}
692 void finish(int r
) override
{
693 oft
->_load_finish(r
, header_r
, values_r
, index
, first
, more
, header_bl
, values
);
695 void print(ostream
& out
) const override
{
696 out
<< "openfiles_load";
// I/O completion context for journal recovery: forwards the result to
// OpenFileTable::_recover_finish(). print() identifies it as
// "openfiles_recover".
// NOTE(review): the declaration of `oft` and the access specifiers are
// missing from this listing.
700 class C_IO_OFT_Recover
: public MDSIOContextBase
{
703 MDSRank
*get_mds() override
{ return oft
->mds
; }
705 C_IO_OFT_Recover(OpenFileTable
*t
) : oft(t
) {}
706 void finish(int r
) override
{
707 oft
->_recover_finish(r
);
709 void print(ostream
& out
) const override
{
710 out
<< "openfiles_recover";
// Completion handler for journal recovery. Logs either the error or
// "load complete", resets journal_state to JOURNAL_NONE and completes and
// clears everything queued in waiting_for_load.
// NOTE(review): the branch conditions and several statements (original lines
// 716, 718-719, 721-722, 724) are missing from this listing, so the exact
// error handling is not visible here.
714 void OpenFileTable::_recover_finish(int r
)
717 derr
<< __func__
<< " got " << cpp_strerror(r
) << dendl
;
720 dout(10) << __func__
<< ": load complete" << dendl
;
723 journal_state
= JOURNAL_NONE
;
725 finish_contexts(g_ceph_context
, waiting_for_load
);
726 waiting_for_load
.clear();
// Issues one asynchronous read of omap object `idx` in the metadata pool.
// The read fetches the omap header and the values that follow `key`, with an
// empty filter prefix and no count limit (uint64_t(-1)). Results land in a
// freshly allocated C_IO_OFT_Load, which runs on mds->finisher.
// NOTE(review): the rest of the parameter list (a `first` flag is used below)
// and the declaration of `op` are missing from this listing.
729 void OpenFileTable::_read_omap_values(const std::string
& key
, unsigned idx
,
732 object_t oid
= get_object_name(idx
);
733 dout(10) << __func__
<< ": load from '" << oid
<< ":" << key
<< "'" << dendl
;
734 object_locator_t
oloc(mds
->get_metadata_pool());
735 C_IO_OFT_Load
*c
= new C_IO_OFT_Load(this, idx
, first
);
// The op writes straight into the context's members.
738 op
.omap_get_header(&c
->header_bl
, &c
->header_r
);
739 op
.omap_get_vals(key
, "", uint64_t(-1),
740 &c
->values
, &c
->more
, &c
->values_r
);
741 mds
->objecter
->read(oid
, oloc
, op
, CEPH_NOSNAP
, nullptr, 0,
742 new C_OnFinisher(c
, mds
->finisher
));
// Completion handler for one omap read issued by _read_omap_values().
//
// Visible behaviour:
//  - decodes and validates the omap header (magic, object count against
//    MAX_OBJECTS, journal state against JOURNAL_FINISH) and adopts it when
//    its version is newer than omap_version;
//  - files "_journal."-prefixed keys under loaded_journals[idx] and decodes
//    every other key (the inode number in hex) into loaded_anchor_map via
//    decode_func;
//  - issues the next read while `more` is set or further objects remain;
//  - replays loaded journals, when any exist, by decoding each entry,
//    applying it to loaded_anchor_map and re-submitting it as omap
//    operations under a C_IO_OFT_Recover finisher;
//  - otherwise marks the load complete and wakes waiting_for_load.
//
// NOTE(review): a large number of original lines (conditions, try blocks,
// declarations such as `magic`, `version`, `num_objs`, `jstate`, `ino`,
// `header`, `snapc`, and error paths) are missing from this listing. The
// comments below describe only what is visible — confirm against the full
// file.
745 void OpenFileTable::_load_finish(int op_r
, int header_r
, int values_r
,
746 unsigned idx
, bool first
, bool more
,
747 bufferlist
&header_bl
,
748 std::map
<std::string
, bufferlist
> &values
)
751 int err
= -CEPHFS_EINVAL
;
// Decode one anchor for `ino` into loaded_anchor_map and count it against
// object `idx` if it is a new entry.
753 auto decode_func
= [this](unsigned idx
, inodeno_t ino
, bufferlist
&bl
) {
754 auto p
= bl
.cbegin();
756 size_t count
= loaded_anchor_map
.size();
757 auto it
= loaded_anchor_map
.emplace_hint(loaded_anchor_map
.end(),
758 std::piecewise_construct
,
759 std::make_tuple(ino
),
761 RecoveredAnchor
& anchor
= it
->second
;
763 frag_vec_t frags
; // unused
765 ceph_assert(ino
== anchor
.ino
);
766 anchor
.omap_idx
= idx
;
767 anchor
.auth
= MDS_RANK_NONE
;
770 if (loaded_anchor_map
.size() > count
)
771 ++omap_num_items
[idx
];
775 derr
<< __func__
<< " got " << cpp_strerror(op_r
) << dendl
;
// Header validation; malformed input is reported by throwing.
782 auto p
= header_bl
.cbegin();
789 if (header_bl
.length() == 13) {
796 if (magic
!= CEPH_FS_ONDISK_MAGIC
) {
797 CachedStackStringStream css
;
798 *css
<< "invalid magic '" << magic
<< "'";
799 throw buffer::malformed_input(css
->str());
809 if (num_objs
> MAX_OBJECTS
) {
810 CachedStackStringStream css
;
811 *css
<< "invalid object count '" << num_objs
<< "'";
812 throw buffer::malformed_input(css
->str());
814 if (jstate
> JOURNAL_FINISH
) {
815 CachedStackStringStream css
;
816 *css
<< "invalid journal state '" << jstate
<< "'";
817 throw buffer::malformed_input(css
->str());
// A newer header replaces the cached view; an equal one may only raise the
// journal state.
820 if (version
> omap_version
) {
821 omap_version
= version
;
822 omap_num_objs
= num_objs
;
823 omap_num_items
.resize(omap_num_objs
);
824 journal_state
= jstate
;
825 } else if (version
== omap_version
) {
826 ceph_assert(omap_num_objs
== num_objs
);
827 if (jstate
> journal_state
)
828 journal_state
= jstate
;
// Split the returned keys into journal entries and anchors.
832 for (auto& it
: values
) {
833 if (it
.first
.compare(0, 9, "_journal.") == 0) {
834 if (idx
>= loaded_journals
.size())
835 loaded_journals
.resize(idx
+ 1);
837 if (journal_state
== JOURNAL_FINISH
) {
838 loaded_journals
[idx
][it
.first
].swap(it
.second
);
839 } else { // incomplete journal
840 loaded_journals
[idx
][it
.first
].length();
846 sscanf(it
.first
.c_str(), "%llx", (unsigned long long*)&ino
.val
);
847 decode_func(idx
, ino
, it
.second
);
849 } catch (buffer::error
&e
) {
850 derr
<< __func__
<< ": corrupted header/values: " << e
.what() << dendl
;
854 if (more
|| idx
+ 1 < omap_num_objs
) {
855 // Issue another read if we're not at the end of the omap
856 std::string last_key
;
858 last_key
= values
.rbegin()->first
;
862 _read_omap_values(last_key
, idx
, !more
);
// All objects read: replay any journal entries that were found.
867 if (loaded_journals
.size() > 0) {
868 dout(10) << __func__
<< ": recover journal" << dendl
;
870 C_GatherBuilder
gather(g_ceph_context
,
871 new C_OnFinisher(new C_IO_OFT_Recover(this),
873 object_locator_t
oloc(mds
->get_metadata_pool());
876 for (unsigned omap_idx
= 0; omap_idx
< loaded_journals
.size(); omap_idx
++) {
877 auto& loaded_journal
= loaded_journals
[omap_idx
];
879 std::vector
<ObjectOperation
> op_vec
;
881 for (auto& it
: loaded_journal
) {
882 if (journal_state
!= JOURNAL_FINISH
)
884 auto p
= it
.second
.cbegin();
886 std::map
<string
, bufferlist
> to_update
;
887 std::set
<string
> to_remove
;
889 if (version
!= omap_version
)
891 decode(to_update
, p
);
892 decode(to_remove
, p
);
// Apply the journaled changes to the in-memory view first.
895 for (auto& q
: to_update
) {
897 sscanf(q
.first
.c_str(), "%llx", (unsigned long long*)&ino
.val
);
898 decode_func(omap_idx
, ino
, q
.second
);
900 for (auto& q
: to_remove
) {
902 sscanf(q
.c_str(), "%llx",(unsigned long long*)&ino
.val
);
903 ceph_assert(ino
.val
> 0);
904 if (loaded_anchor_map
.erase(ino
)) {
905 unsigned& count
= omap_num_items
[omap_idx
];
906 ceph_assert(count
> 0);
// Then queue the same changes as a high-priority omap operation.
911 op_vec
.resize(op_vec
.size() + 1);
912 ObjectOperation
& op
= op_vec
.back();
913 op
.priority
= CEPH_MSG_PRIO_HIGH
;
914 if (!to_update
.empty())
915 op
.omap_set(to_update
);
916 if (!to_remove
.empty())
917 op
.omap_rm_keys(to_remove
);
919 } catch (buffer::error
&e
) {
920 derr
<< __func__
<< ": corrupted journal: " << e
.what() << dendl
;
// Final op for this object: rewrite the header and drop the journal keys.
924 op_vec
.resize(op_vec
.size() + 1);
925 ObjectOperation
& op
= op_vec
.back();
928 if (journal_state
== JOURNAL_FINISH
)
929 _encode_header(header
, JOURNAL_FINISH
);
931 _encode_header(header
, JOURNAL_NONE
);
932 op
.omap_set_header(header
);
936 std::set
<string
> to_remove
;
937 for (auto &it
: loaded_journal
)
938 to_remove
.emplace(it
.first
);
939 op
.omap_rm_keys(to_remove
);
941 loaded_journal
.clear();
943 object_t oid
= get_object_name(omap_idx
);
944 for (auto& op
: op_vec
) {
945 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
, ceph::real_clock::now(),
946 0, gather
.new_sub());
953 journal_state
= JOURNAL_NONE
;
955 dout(10) << __func__
<< ": load complete" << dendl
;
962 finish_contexts(g_ceph_context
, waiting_for_load
);
963 waiting_for_load
.clear();
// Starts loading the table from omap: must not be called once load_done is
// set. `onload` is queued on waiting_for_load and the first read is issued
// for object 0 from the empty key.
// NOTE(review): original line 970 is missing from this listing, so any
// condition guarding the push_back below is not visible here.
966 void OpenFileTable::load(MDSContext
*onload
)
968 dout(10) << __func__
<< dendl
;
969 ceph_assert(!load_done
);
971 waiting_for_load
.push_back(onload
);
973 _read_omap_values("", 0, true);
// Builds the ancestor chain for `parent` from loaded_anchor_map. Starting
// with parent's own dirino/d_name, each step appends a backpointer
// (dirino, d_name, 0) to `ancestors`, then looks the dirino up in
// loaded_anchor_map to continue upwards. `auth_hint` is overwritten with the
// auth of each ancestor anchor that is found.
// NOTE(review): the loop construct and the statements taken when the lookup
// fails or dirino reaches inodeno_t(0) are missing from this listing —
// presumably both end the walk; confirm in the full file.
976 void OpenFileTable::_get_ancestors(const Anchor
& parent
,
977 vector
<inode_backpointer_t
>& ancestors
,
978 mds_rank_t
& auth_hint
)
980 inodeno_t dirino
= parent
.dirino
;
981 std::string_view d_name
= parent
.d_name
;
986 ancestors
.push_back(inode_backpointer_t(dirino
, string
{d_name
}, 0));
988 auto p
= loaded_anchor_map
.find(dirino
);
989 if (p
== loaded_anchor_map
.end())
993 auth_hint
= p
->second
.auth
;
995 dirino
= p
->second
.dirino
;
996 d_name
= p
->second
.d_name
;
997 if (dirino
== inodeno_t(0))
// Completion context for one MDCache::open_ino() request issued during
// prefetch: forwards the inode number and result to
// OpenFileTable::_open_ino_finish().
// NOTE(review): the declarations of `oft` and `ino` and the access specifier
// are missing from this listing.
1004 class C_OFT_OpenInoFinish
: public MDSContext
{
1007 MDSRank
*get_mds() override
{ return oft
->mds
; }
1009 C_OFT_OpenInoFinish(OpenFileTable
*t
, inodeno_t i
) : oft(t
), ino(i
) {}
1010 void finish(int r
) override
{
1011 oft
->_open_ino_finish(ino
, r
);
// Called when one inode open completes (also called once with inodeno_t(0)
// by _prefetch_inodes() to balance its initial count).
//
// During the DIR_INODES phase a non-negative `r` for a real inode is stored
// as the anchor's auth rank. When `r` differs from this MDS's rank the
// result is passed to MDCache::rejoin_prefetch_ino_finish(). When the last
// outstanding open finishes the prefetch state machine advances:
// DIR_INODES -> DIRFRAGS (if mds_oft_prefetch_dirfrags is set) or
// FILE_INODES, and FILE_INODES -> DONE, which clears the destroyed-inode
// bookkeeping and wakes waiting_for_prefetch.
// NOTE(review): else branches and the calls that start the next phase
// (original lines 1032, 1034-1035) are missing from this listing.
1015 void OpenFileTable::_open_ino_finish(inodeno_t ino
, int r
)
1017 if (prefetch_state
== DIR_INODES
&& r
>= 0 && ino
!= inodeno_t(0)) {
1018 auto p
= loaded_anchor_map
.find(ino
);
1019 ceph_assert(p
!= loaded_anchor_map
.end());
1020 p
->second
.auth
= mds_rank_t(r
);
1023 if (r
!= mds
->get_nodeid())
1024 mds
->mdcache
->rejoin_prefetch_ino_finish(ino
, r
);
1026 num_opening_inodes
--;
// Last outstanding open for this phase: move to the next one.
1027 if (num_opening_inodes
== 0) {
1028 if (prefetch_state
== DIR_INODES
) {
1029 if (g_conf().get_val
<bool>("mds_oft_prefetch_dirfrags")) {
1030 prefetch_state
= DIRFRAGS
;
1031 _prefetch_dirfrags();
1033 prefetch_state
= FILE_INODES
;
1036 } else if (prefetch_state
== FILE_INODES
) {
1037 prefetch_state
= DONE
;
1038 logseg_destroyed_inos
.clear();
1039 destroyed_inos_set
.clear();
1040 finish_contexts(g_ceph_context
, waiting_for_prefetch
);
1041 waiting_for_prefetch
.clear();
// DIRFRAGS phase of prefetch. For every loaded anchor that recorded frags,
// it collects the auth, incomplete dirfrags of the cached directory inode
// into fetch_queue, looking at the recorded frag itself or at the leaves
// under it in the inode's dirfragtree. Each queued dirfrag is then fetched
// under one gather; the gather's finisher switches prefetch_state to
// FILE_INODES. mds->heartbeat_reset() is called periodically while queuing
// fetches.
// NOTE(review): skip conditions (`continue`s), the declaration of `leaves`,
// else branches, the rest of finish_func and the no-subs path are missing
// from this listing — confirm against the full file.
1048 void OpenFileTable::_prefetch_dirfrags()
1050 dout(10) << __func__
<< dendl
;
1051 ceph_assert(prefetch_state
== DIRFRAGS
);
1053 MDCache
*mdcache
= mds
->mdcache
;
1054 std::vector
<CDir
*> fetch_queue
;
1056 for (auto& [ino
, anchor
] : loaded_anchor_map
) {
1057 if (anchor
.frags
.empty())
1059 CInode
*diri
= mdcache
->get_inode(ino
);
1062 if (diri
->state_test(CInode::STATE_REJOINUNDEF
))
1065 for (auto& fg
: anchor
.frags
) {
1066 CDir
*dir
= diri
->get_dirfrag(fg
);
1068 if (dir
->is_auth() && !dir
->is_complete())
1069 fetch_queue
.push_back(dir
);
// Recorded frag not in cache as-is: look at the current leaves under it.
1072 diri
->dirfragtree
.get_leaves_under(fg
, leaves
);
1073 for (auto& leaf
: leaves
) {
1074 if (diri
->is_auth()) {
1075 dir
= diri
->get_or_open_dirfrag(mdcache
, leaf
);
1077 dir
= diri
->get_dirfrag(leaf
);
1079 if (dir
&& dir
->is_auth() && !dir
->is_complete())
1080 fetch_queue
.push_back(dir
);
1086 MDSGatherBuilder
gather(g_ceph_context
);
1087 int num_opening_dirfrags
= 0;
1088 for (const auto& dir
: fetch_queue
) {
1089 if (dir
->state_test(CDir::STATE_REJOINUNDEF
))
1090 ceph_assert(dir
->get_inode()->dirfragtree
.is_leaf(dir
->get_frag()));
1091 dir
->fetch(gather
.new_sub());
// Keep the heartbeat alive while queuing a long run of fetches.
1093 if (!(++num_opening_dirfrags
% mds
->heartbeat_reset_grace()))
1094 mds
->heartbeat_reset();
1097 auto finish_func
= [this](int r
) {
1098 prefetch_state
= FILE_INODES
;
1101 if (gather
.has_subs()) {
1102 gather
.set_finisher(
1103 new MDSInternalContextWrapper(mds
,
1104 new LambdaContext(std::move(finish_func
))));
// Opens the inodes of the current prefetch phase. The pool is the metadata
// pool for DIR_INODES and the first data pool for FILE_INODES.
//
// num_opening_inodes starts at 1 so the phase cannot complete while requests
// are still being issued; the final _open_ino_finish(inodeno_t(0), 0) call
// drops that extra count. Inodes listed in destroyed_inos_set (built lazily
// from logseg_destroyed_inos) are skipped. Directory anchors are handled in
// DIR_INODES and all others in FILE_INODES. Each open goes through
// MDCache::open_ino(), with ancestor hints from _get_ancestors() when the
// anchor has a parent. mds->heartbeat_reset() is called periodically.
// NOTE(review): the declaration of `pool`, the `continue` statements, the
// handling of inodes already in cache and several else branches are missing
// from this listing — confirm against the full file.
1111 void OpenFileTable::_prefetch_inodes()
1113 dout(10) << __func__
<< " state " << prefetch_state
<< dendl
;
1114 ceph_assert(!num_opening_inodes
);
1115 num_opening_inodes
= 1;
1118 if (prefetch_state
== DIR_INODES
)
1119 pool
= mds
->get_metadata_pool();
1120 else if (prefetch_state
== FILE_INODES
)
1121 pool
= mds
->mdsmap
->get_first_data_pool();
1125 MDCache
*mdcache
= mds
->mdcache
;
// Flatten the per-log-segment lists once for quick lookups.
1127 if (destroyed_inos_set
.empty()) {
1128 for (auto& it
: logseg_destroyed_inos
)
1129 destroyed_inos_set
.insert(it
.second
.begin(), it
.second
.end());
1132 for (auto& [ino
, anchor
] : loaded_anchor_map
) {
1133 if (destroyed_inos_set
.count(ino
))
1135 if (anchor
.d_type
== DT_DIR
) {
1136 if (prefetch_state
!= DIR_INODES
)
// mdsdir and stray inodes encode their owning rank in the inode number.
1138 if (MDS_INO_IS_MDSDIR(ino
)) {
1139 anchor
.auth
= MDS_INO_MDSDIR_OWNER(ino
);
1142 if (MDS_INO_IS_STRAY(ino
)) {
1143 anchor
.auth
= MDS_INO_STRAY_OWNER(ino
);
1147 if (prefetch_state
!= FILE_INODES
)
1149 // load all file inodes for MDCache::identify_files_to_recover()
1151 CInode
*in
= mdcache
->get_inode(ino
);
1155 num_opening_inodes
++;
1157 auto fin
= new C_OFT_OpenInoFinish(this, ino
);
1158 if (anchor
.dirino
!= inodeno_t(0)) {
1159 vector
<inode_backpointer_t
> ancestors
;
1160 mds_rank_t auth_hint
= MDS_RANK_NONE
;
1161 _get_ancestors(anchor
, ancestors
, auth_hint
);
1162 mdcache
->open_ino(ino
, pool
, fin
, false, false, &ancestors
, auth_hint
);
1164 mdcache
->open_ino(ino
, pool
, fin
, false);
1167 if (!(num_opening_inodes
% mds
->heartbeat_reset_grace()))
1168 mds
->heartbeat_reset();
// Drop the guard count taken at the top.
1171 _open_ino_finish(inodeno_t(0), 0);
// Entry point for prefetch: must be called before any prefetch has started
// (prefetch_state must be zero). Sets the state to DIR_INODES and returns
// true while prefetching is still in progress (!is_prefetched()).
// NOTE(review): original lines 1179-1181 and 1184-1191 are missing from this
// listing. The visible context wrapper suggests the work is deferred when the
// table is not loaded yet, but that logic cannot be confirmed from here.
1174 bool OpenFileTable::prefetch_inodes()
1176 dout(10) << __func__
<< dendl
;
1177 ceph_assert(!prefetch_state
);
1178 prefetch_state
= DIR_INODES
;
1182 new MDSInternalContextWrapper(mds
,
1183 new LambdaContext([this](int r
) {
1192 return !is_prefetched();
// Decides whether an open of `in` needs to be journaled. For an inode
// flagged STATE_TRACKEDBYOFT it compares in->last_journaled with
// committing_log_seq and checks whether the inode is in dirty_items.
// NOTE(review): every return statement (original lines 1200, 1204-1207) is
// missing from this listing, so the result of each check cannot be stated
// from here — confirm against the full file.
1195 bool OpenFileTable::should_log_open(CInode
*in
)
1197 if (in
->state_test(CInode::STATE_TRACKEDBYOFT
)) {
1198 // inode just journaled
1199 if (in
->last_journaled
>= committing_log_seq
)
1201 // item not dirty. it means the item has already been saved
1202 auto p
= dirty_items
.find(in
->ino());
1203 if (p
== dirty_items
.end())
1209 void OpenFileTable::note_destroyed_inos(uint64_t seq
, const vector
<inodeno_t
>& inos
)
1211 auto& vec
= logseg_destroyed_inos
[seq
];
1212 vec
.insert(vec
.end(), inos
.begin(), inos
.end());
1215 void OpenFileTable::trim_destroyed_inos(uint64_t seq
)
1217 auto p
= logseg_destroyed_inos
.begin();
1218 while (p
!= logseg_destroyed_inos
.end()) {
1219 if (p
->first
>= seq
)
1221 logseg_destroyed_inos
.erase(p
++);