]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/OpenFileTable.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / mds / OpenFileTable.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2018 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "acconfig.h"
16#include "mds/CInode.h"
17#include "mds/CDir.h"
18#include "mds/MDSRank.h"
19#include "mds/MDCache.h"
20#include "osdc/Objecter.h"
21#include "OpenFileTable.h"
22
23#include "common/config.h"
24#include "common/errno.h"
25
9f95a23c
TL
// Perf counter indices for the "oft" counter set that the OpenFileTable
// constructor registers; l_oft_first and l_oft_last are the bounds passed to
// PerfCountersBuilder.
enum {
  l_oft_first = 1000000,
  l_oft_omap_total_objs = l_oft_first + 1,
  l_oft_omap_total_kv_pairs = l_oft_first + 2,
  l_oft_omap_total_updates = l_oft_first + 3,
  l_oft_omap_total_removes = l_oft_first + 4,
  l_oft_last = l_oft_first + 5,
};
34
11fdf7f2
TL
35#define dout_context g_ceph_context
36#define dout_subsys ceph_subsys_mds
37#undef dout_prefix
38#define dout_prefix _prefix(_dout, mds)
20effc67
TL
39
40using namespace std;
41
42static std::ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
11fdf7f2
TL
43 return *_dout << "mds." << mds->get_nodeid() << ".openfiles ";
44}
45
9f95a23c
TL
46OpenFileTable::OpenFileTable(MDSRank *m) : mds(m) {
47 PerfCountersBuilder b(mds->cct, "oft", l_oft_first, l_oft_last);
48
49 b.add_u64(l_oft_omap_total_objs, "omap_total_objs");
50 b.add_u64(l_oft_omap_total_kv_pairs, "omap_total_kv_pairs");
51 b.add_u64(l_oft_omap_total_updates, "omap_total_updates");
52 b.add_u64(l_oft_omap_total_removes, "omap_total_removes");
53 logger.reset(b.create_perf_counters());
54 mds->cct->get_perfcounters_collection()->add(logger.get());
55 logger->set(l_oft_omap_total_objs, 0);
56 logger->set(l_oft_omap_total_kv_pairs, 0);
57 logger->set(l_oft_omap_total_updates, 0);
58 logger->set(l_oft_omap_total_removes, 0);
59}
60
61OpenFileTable::~OpenFileTable() {
62 if (logger) {
63 mds->cct->get_perfcounters_collection()->remove(logger.get());
64 }
65}
66
f91f0fd5 67void OpenFileTable::get_ref(CInode *in, frag_t fg)
11fdf7f2
TL
68{
69 do {
70 auto p = anchor_map.find(in->ino());
f91f0fd5
TL
71 if (!in->is_dir()) {
72 ceph_assert(fg == -1U);
73 ceph_assert(p == anchor_map.end());
74 }
75
11fdf7f2
TL
76 if (p != anchor_map.end()) {
77 ceph_assert(in->state_test(CInode::STATE_TRACKEDBYOFT));
78 ceph_assert(p->second.nref > 0);
79 p->second.nref++;
f91f0fd5
TL
80
81 if (fg != -1U) {
82 auto ret = p->second.frags.insert(fg);
83 ceph_assert(ret.second);
84 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
85 }
11fdf7f2
TL
86 break;
87 }
88
89 CDentry *dn = in->get_parent_dn();
90 CInode *pin = dn ? dn->get_dir()->get_inode() : nullptr;
91
92 auto ret = anchor_map.emplace(std::piecewise_construct, std::forward_as_tuple(in->ino()),
93 std::forward_as_tuple(in->ino(), (pin ? pin->ino() : inodeno_t(0)),
94 (dn ? dn->get_name() : string()), in->d_type(), 1));
95 ceph_assert(ret.second == true);
96 in->state_set(CInode::STATE_TRACKEDBYOFT);
97
f91f0fd5
TL
98 if (fg != -1U)
99 ret.first->second.frags.insert(fg);
100
11fdf7f2
TL
101 auto ret1 = dirty_items.emplace(in->ino(), (int)DIRTY_NEW);
102 if (!ret1.second) {
103 int omap_idx = ret1.first->second;
104 ceph_assert(omap_idx >= 0);
105 ret.first->second.omap_idx = omap_idx;
106 }
107
108 in = pin;
f91f0fd5 109 fg = -1U;
11fdf7f2
TL
110 } while (in);
111}
112
f91f0fd5 113void OpenFileTable::put_ref(CInode *in, frag_t fg)
11fdf7f2
TL
114{
115 do {
116 ceph_assert(in->state_test(CInode::STATE_TRACKEDBYOFT));
117 auto p = anchor_map.find(in->ino());
118 ceph_assert(p != anchor_map.end());
119 ceph_assert(p->second.nref > 0);
120
f91f0fd5
TL
121 if (!in->is_dir()) {
122 ceph_assert(fg == -1U);
123 ceph_assert(p->second.nref == 1);
124 }
125
11fdf7f2
TL
126 if (p->second.nref > 1) {
127 p->second.nref--;
f91f0fd5
TL
128 if (fg != -1U) {
129 auto ret = p->second.frags.erase(fg);
130 ceph_assert(ret);
131 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
132 }
11fdf7f2
TL
133 break;
134 }
135
136 CDentry *dn = in->get_parent_dn();
137 CInode *pin = dn ? dn->get_dir()->get_inode() : nullptr;
138 if (dn) {
139 ceph_assert(p->second.dirino == pin->ino());
140 ceph_assert(p->second.d_name == dn->get_name());
141 } else {
142 ceph_assert(p->second.dirino == inodeno_t(0));
143 ceph_assert(p->second.d_name == "");
144 }
145
f91f0fd5
TL
146 if (fg != -1U) {
147 ceph_assert(p->second.frags.size() == 1);
148 ceph_assert(*p->second.frags.begin() == fg);
149 }
150
11fdf7f2
TL
151 int omap_idx = p->second.omap_idx;
152 anchor_map.erase(p);
153 in->state_clear(CInode::STATE_TRACKEDBYOFT);
154
155 auto ret = dirty_items.emplace(in->ino(), omap_idx);
156 if (!ret.second) {
157 if (ret.first->second == DIRTY_NEW) {
158 ceph_assert(omap_idx < 0);
159 dirty_items.erase(ret.first);
160 } else {
161 ceph_assert(omap_idx >= 0);
162 ret.first->second = omap_idx;
163 }
164 }
165
166 in = pin;
f91f0fd5 167 fg = -1U;
11fdf7f2
TL
168 } while (in);
169}
170
171void OpenFileTable::add_inode(CInode *in)
172{
173 dout(10) << __func__ << " " << *in << dendl;
11fdf7f2
TL
174 get_ref(in);
175}
176
177void OpenFileTable::remove_inode(CInode *in)
178{
179 dout(10) << __func__ << " " << *in << dendl;
11fdf7f2
TL
180 put_ref(in);
181}
182
183void OpenFileTable::add_dirfrag(CDir *dir)
184{
185 dout(10) << __func__ << " " << *dir << dendl;
186 ceph_assert(!dir->state_test(CDir::STATE_TRACKEDBYOFT));
187 dir->state_set(CDir::STATE_TRACKEDBYOFT);
f91f0fd5 188 get_ref(dir->get_inode(), dir->get_frag());
11fdf7f2
TL
189}
190
191void OpenFileTable::remove_dirfrag(CDir *dir)
192{
193 dout(10) << __func__ << " " << *dir << dendl;
194 ceph_assert(dir->state_test(CDir::STATE_TRACKEDBYOFT));
195 dir->state_clear(CDir::STATE_TRACKEDBYOFT);
f91f0fd5 196 put_ref(dir->get_inode(), dir->get_frag());
11fdf7f2
TL
197}
198
199void OpenFileTable::notify_link(CInode *in)
200{
201 dout(10) << __func__ << " " << *in << dendl;
202 auto p = anchor_map.find(in->ino());
203 ceph_assert(p != anchor_map.end());
204 ceph_assert(p->second.nref > 0);
205 ceph_assert(p->second.dirino == inodeno_t(0));
206 ceph_assert(p->second.d_name == "");
207
208 CDentry *dn = in->get_parent_dn();
209 CInode *pin = dn->get_dir()->get_inode();
210
211 p->second.dirino = pin->ino();
212 p->second.d_name = dn->get_name();
213 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
214
215 get_ref(pin);
216}
217
218void OpenFileTable::notify_unlink(CInode *in)
219{
220 dout(10) << __func__ << " " << *in << dendl;
221 auto p = anchor_map.find(in->ino());
222 ceph_assert(p != anchor_map.end());
223 ceph_assert(p->second.nref > 0);
224
225 CDentry *dn = in->get_parent_dn();
226 CInode *pin = dn->get_dir()->get_inode();
227 ceph_assert(p->second.dirino == pin->ino());
228 ceph_assert(p->second.d_name == dn->get_name());
229
230 p->second.dirino = inodeno_t(0);
231 p->second.d_name = "";
232 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
233
234 put_ref(pin);
235}
236
237object_t OpenFileTable::get_object_name(unsigned idx) const
238{
239 char s[30];
240 snprintf(s, sizeof(s), "mds%d_openfiles.%x", int(mds->get_nodeid()), idx);
241 return object_t(s);
242}
243
244void OpenFileTable::_encode_header(bufferlist &bl, int j_state)
245{
246 std::string_view magic = CEPH_FS_ONDISK_MAGIC;
247 encode(magic, bl);
248 ENCODE_START(1, 1, bl);
249 encode(omap_version, bl);
250 encode(omap_num_objs, bl);
251 encode((__u8)j_state, bl);
252 ENCODE_FINISH(bl);
253}
254
255class C_IO_OFT_Save : public MDSIOContextBase {
256protected:
257 OpenFileTable *oft;
258 uint64_t log_seq;
259 MDSContext *fin;
260 MDSRank *get_mds() override { return oft->mds; }
261public:
262 C_IO_OFT_Save(OpenFileTable *t, uint64_t s, MDSContext *c) :
263 oft(t), log_seq(s), fin(c) {}
264 void finish(int r) {
265 oft->_commit_finish(r, log_seq, fin);
266 }
267 void print(ostream& out) const override {
268 out << "openfiles_save";
269 }
270};
271
272void OpenFileTable::_commit_finish(int r, uint64_t log_seq, MDSContext *fin)
273{
2a845540
TL
274 dout(10) << __func__ << " log_seq " << log_seq << " committed_log_seq " << committed_log_seq
275 << " committing_log_seq " << committing_log_seq << dendl;
11fdf7f2
TL
276 if (r < 0) {
277 mds->handle_write_error(r);
278 return;
279 }
280
2a845540 281 ceph_assert(log_seq == committing_log_seq);
11fdf7f2
TL
282 ceph_assert(log_seq >= committed_log_seq);
283 committed_log_seq = log_seq;
284 num_pending_commit--;
285
286 if (fin)
287 fin->complete(r);
288}
289
290class C_IO_OFT_Journal : public MDSIOContextBase {
291protected:
292 OpenFileTable *oft;
293 uint64_t log_seq;
294 MDSContext *fin;
295 std::map<unsigned, std::vector<ObjectOperation> > ops_map;
296 MDSRank *get_mds() override { return oft->mds; }
297public:
298 C_IO_OFT_Journal(OpenFileTable *t, uint64_t s, MDSContext *c,
299 std::map<unsigned, std::vector<ObjectOperation> >& ops) :
300 oft(t), log_seq(s), fin(c) {
301 ops_map.swap(ops);
302 }
303 void finish(int r) {
304 oft->_journal_finish(r, log_seq, fin, ops_map);
305 }
306 void print(ostream& out) const override {
307 out << "openfiles_journal";
308 }
309};
310
311void OpenFileTable::_journal_finish(int r, uint64_t log_seq, MDSContext *c,
312 std::map<unsigned, std::vector<ObjectOperation> >& ops_map)
313{
314 dout(10) << __func__ << " log_seq " << log_seq << dendl;
315 if (r < 0) {
316 mds->handle_write_error(r);
317 return;
318 }
319
320 C_GatherBuilder gather(g_ceph_context,
321 new C_OnFinisher(new C_IO_OFT_Save(this, log_seq, c),
322 mds->finisher));
323 SnapContext snapc;
b3b6e05e 324 object_locator_t oloc(mds->get_metadata_pool());
f67539c2
TL
325 for (auto& [idx, vops] : ops_map) {
326 object_t oid = get_object_name(idx);
327 for (auto& op : vops) {
11fdf7f2
TL
328 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
329 0, gather.new_sub());
330 }
331 }
332 gather.activate();
333
334 journal_state = JOURNAL_NONE;
335 return;
336}
337
338void OpenFileTable::commit(MDSContext *c, uint64_t log_seq, int op_prio)
339{
2a845540
TL
340 dout(10) << __func__ << " log_seq " << log_seq << " committing_log_seq:"
341 << committing_log_seq << dendl;
11fdf7f2
TL
342
343 ceph_assert(num_pending_commit == 0);
344 num_pending_commit++;
345 ceph_assert(log_seq >= committing_log_seq);
346 committing_log_seq = log_seq;
347
348 omap_version++;
349
350 C_GatherBuilder gather(g_ceph_context);
351
352 SnapContext snapc;
b3b6e05e 353 object_locator_t oloc(mds->get_metadata_pool());
11fdf7f2
TL
354
355 const unsigned max_write_size = mds->mdcache->max_dir_commit_size;
356
357 struct omap_update_ctl {
358 unsigned write_size = 0;
359 unsigned journal_idx = 0;
360 bool clear = false;
361 std::map<string, bufferlist> to_update, journaled_update;
362 std::set<string> to_remove, journaled_remove;
363 };
364 std::vector<omap_update_ctl> omap_updates(omap_num_objs);
365
366 using ceph::encode;
367 auto journal_func = [&](unsigned idx) {
368 auto& ctl = omap_updates.at(idx);
369
370 ObjectOperation op;
371 op.priority = op_prio;
372
373 if (ctl.clear) {
374 ctl.clear = false;
375 op.omap_clear();
376 op.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
377 }
378
379 if (ctl.journal_idx == 0) {
380 if (journal_state == JOURNAL_NONE)
381 journal_state = JOURNAL_START;
382 else
383 ceph_assert(journal_state == JOURNAL_START);
384
385 bufferlist header;
386 _encode_header(header, journal_state);
387 op.omap_set_header(header);
388 }
389
390 bufferlist bl;
391 encode(omap_version, bl);
392 encode(ctl.to_update, bl);
393 encode(ctl.to_remove, bl);
394
395 char key[32];
396 snprintf(key, sizeof(key), "_journal.%x", ctl.journal_idx++);
397 std::map<string, bufferlist> tmp_map;
398 tmp_map[key].swap(bl);
399 op.omap_set(tmp_map);
400
401 object_t oid = get_object_name(idx);
402 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(), 0,
403 gather.new_sub());
404
405#ifdef HAVE_STDLIB_MAP_SPLICING
406 ctl.journaled_update.merge(ctl.to_update);
407 ctl.journaled_remove.merge(ctl.to_remove);
408#else
409 ctl.journaled_update.insert(make_move_iterator(begin(ctl.to_update)),
410 make_move_iterator(end(ctl.to_update)));
411 ctl.journaled_remove.insert(make_move_iterator(begin(ctl.to_remove)),
412 make_move_iterator(end(ctl.to_remove)));
413#endif
414 ctl.to_update.clear();
415 ctl.to_remove.clear();
416 };
417
418 std::map<unsigned, std::vector<ObjectOperation> > ops_map;
419
420 auto create_op_func = [&](unsigned idx, bool update_header) {
421 auto& ctl = omap_updates.at(idx);
422
423 auto& op_vec = ops_map[idx];
424 op_vec.resize(op_vec.size() + 1);
425 ObjectOperation& op = op_vec.back();
426 op.priority = op_prio;
427
428 if (ctl.clear) {
429 ctl.clear = false;
430 op.omap_clear();
431 op.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
432 }
433
434 if (update_header) {
435 bufferlist header;
436 _encode_header(header, journal_state);
437 op.omap_set_header(header);
438 }
439
440 if (!ctl.to_update.empty()) {
441 op.omap_set(ctl.to_update);
442 ctl.to_update.clear();
443 }
444 if (!ctl.to_remove.empty()) {
445 op.omap_rm_keys(ctl.to_remove);
446 ctl.to_remove.clear();
447 }
448 };
449
450 auto submit_ops_func = [&]() {
451 gather.set_finisher(new C_OnFinisher(new C_IO_OFT_Save(this, log_seq, c),
452 mds->finisher));
f67539c2
TL
453 for (auto& [idx, vops] : ops_map) {
454 object_t oid = get_object_name(idx);
455 for (auto& op : vops) {
11fdf7f2
TL
456 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
457 0, gather.new_sub());
458 }
459 }
460 gather.activate();
461 };
462
463 bool first_commit = !loaded_anchor_map.empty();
464
465 unsigned first_free_idx = 0;
466 unsigned old_num_objs = omap_num_objs;
467 if (omap_num_objs == 0) {
468 omap_num_objs = 1;
469 omap_num_items.resize(omap_num_objs);
470 omap_updates.resize(omap_num_objs);
471 omap_updates.back().clear = true;
472 }
473
f67539c2
TL
474 for (auto& [ino, state] : dirty_items) {
475 auto p = anchor_map.find(ino);
11fdf7f2
TL
476
477 if (first_commit) {
f67539c2 478 auto q = loaded_anchor_map.find(ino);
11fdf7f2
TL
479 if (q != loaded_anchor_map.end()) {
480 ceph_assert(p != anchor_map.end());
481 p->second.omap_idx = q->second.omap_idx;
f91f0fd5 482 bool same = (p->second == q->second);
11fdf7f2
TL
483 loaded_anchor_map.erase(q);
484 if (same)
485 continue;
486 }
487 }
488
489 char key[32];
f67539c2 490 int len = snprintf(key, sizeof(key), "%llx", (unsigned long long)ino.val);
11fdf7f2
TL
491
492 int omap_idx;
493 if (p != anchor_map.end()) {
494 omap_idx = p->second.omap_idx;
495 if (omap_idx < 0) {
f67539c2 496 ceph_assert(state == DIRTY_NEW);
11fdf7f2
TL
497 // find omap object to store the key
498 for (unsigned i = first_free_idx; i < omap_num_objs; i++) {
92f5a8d4 499 if (omap_num_items[i] < MAX_ITEMS_PER_OBJ) {
11fdf7f2 500 omap_idx = i;
92f5a8d4
TL
501 break;
502 }
11fdf7f2
TL
503 }
504 if (omap_idx < 0) {
505 ++omap_num_objs;
506 ceph_assert(omap_num_objs <= MAX_OBJECTS);
507 omap_num_items.resize(omap_num_objs);
508 omap_updates.resize(omap_num_objs);
509 omap_updates.back().clear = true;
510 omap_idx = omap_num_objs - 1;
511 }
512 first_free_idx = omap_idx;
513
514 p->second.omap_idx = omap_idx;
515 ++omap_num_items[omap_idx];
516 }
517 } else {
f67539c2 518 omap_idx = state;
11fdf7f2
TL
519 unsigned& count = omap_num_items.at(omap_idx);
520 ceph_assert(count > 0);
521 --count;
522 if ((unsigned)omap_idx < first_free_idx && count < MAX_ITEMS_PER_OBJ)
523 first_free_idx = omap_idx;
524 }
525 auto& ctl = omap_updates.at(omap_idx);
f67539c2
TL
526 if (ctl.write_size >= max_write_size) {
527 journal_func(omap_idx);
528 ctl.write_size = 0;
529 }
11fdf7f2
TL
530 if (p != anchor_map.end()) {
531 bufferlist bl;
532 encode(p->second, bl);
f91f0fd5 533 encode((__u32)0, bl); // frags set was encoded here
11fdf7f2
TL
534
535 ctl.write_size += bl.length() + len + 2 * sizeof(__u32);
536 ctl.to_update[key].swap(bl);
537 } else {
538 ctl.write_size += len + sizeof(__u32);
539 ctl.to_remove.emplace(key);
540 }
11fdf7f2
TL
541 }
542
543 dirty_items.clear();
544
545 if (first_commit) {
f67539c2 546 for (auto& [ino, anchor] : loaded_anchor_map) {
11fdf7f2 547 char key[32];
f67539c2 548 int len = snprintf(key, sizeof(key), "%llx", (unsigned long long)ino.val);
11fdf7f2 549
f67539c2 550 int omap_idx = anchor.omap_idx;
11fdf7f2
TL
551 unsigned& count = omap_num_items.at(omap_idx);
552 ceph_assert(count > 0);
553 --count;
554
555 auto& ctl = omap_updates.at(omap_idx);
11fdf7f2 556 if (ctl.write_size >= max_write_size) {
f67539c2
TL
557 journal_func(omap_idx);
558 ctl.write_size = 0;
11fdf7f2 559 }
f67539c2
TL
560 ctl.write_size += len + sizeof(__u32);
561 ctl.to_remove.emplace(key);
11fdf7f2
TL
562 }
563 loaded_anchor_map.clear();
11fdf7f2
TL
564 }
565
9f95a23c 566 size_t total_items = 0;
11fdf7f2 567 {
11fdf7f2 568 unsigned used_objs = 1;
9f95a23c 569 std::vector<unsigned> objs_to_write;
11fdf7f2
TL
570 bool journaled = false;
571 for (unsigned i = 0; i < omap_num_objs; i++) {
572 total_items += omap_num_items[i];
573 if (omap_updates[i].journal_idx)
574 journaled = true;
575 else if (omap_updates[i].write_size)
576 objs_to_write.push_back(i);
577
578 if (omap_num_items[i] > 0)
579 used_objs = i + 1;
580 }
581 ceph_assert(total_items == anchor_map.size());
582 // adjust omap object count
583 if (used_objs < omap_num_objs) {
584 omap_num_objs = used_objs;
585 omap_num_items.resize(omap_num_objs);
586 }
587 // skip journal if only one osd request is required and object count
588 // does not change.
589 if (!journaled && old_num_objs == omap_num_objs &&
590 objs_to_write.size() <= 1) {
591 ceph_assert(journal_state == JOURNAL_NONE);
592 ceph_assert(!gather.has_subs());
593
594 unsigned omap_idx = objs_to_write.empty() ? 0 : objs_to_write.front();
595 create_op_func(omap_idx, true);
596 submit_ops_func();
597 return;
598 }
599 }
600
601 for (unsigned omap_idx = 0; omap_idx < omap_updates.size(); omap_idx++) {
602 auto& ctl = omap_updates[omap_idx];
603 if (ctl.write_size > 0) {
604 journal_func(omap_idx);
605 ctl.write_size = 0;
606 }
607 }
608
609 if (journal_state == JOURNAL_START) {
610 ceph_assert(gather.has_subs());
611 journal_state = JOURNAL_FINISH;
612 } else {
613 // only object count changes
614 ceph_assert(journal_state == JOURNAL_NONE);
615 ceph_assert(!gather.has_subs());
616 }
617
9f95a23c
TL
618 uint64_t total_updates = 0;
619 uint64_t total_removes = 0;
620
11fdf7f2
TL
621 for (unsigned omap_idx = 0; omap_idx < omap_updates.size(); omap_idx++) {
622 auto& ctl = omap_updates[omap_idx];
623 ceph_assert(ctl.to_update.empty() && ctl.to_remove.empty());
624 if (ctl.journal_idx == 0)
625 ceph_assert(ctl.journaled_update.empty() && ctl.journaled_remove.empty());
626
627 bool first = true;
628 for (auto& it : ctl.journaled_update) {
11fdf7f2 629 if (ctl.write_size >= max_write_size) {
f67539c2
TL
630 create_op_func(omap_idx, first);
631 ctl.write_size = 0;
632 first = false;
11fdf7f2 633 }
f67539c2
TL
634 ctl.write_size += it.first.length() + it.second.length() + 2 * sizeof(__u32);
635 ctl.to_update[it.first].swap(it.second);
9f95a23c 636 total_updates++;
11fdf7f2
TL
637 }
638
639 for (auto& key : ctl.journaled_remove) {
11fdf7f2 640 if (ctl.write_size >= max_write_size) {
f67539c2
TL
641 create_op_func(omap_idx, first);
642 ctl.write_size = 0;
643 first = false;
11fdf7f2 644 }
f67539c2
TL
645
646 ctl.write_size += key.length() + sizeof(__u32);
647 ctl.to_remove.emplace(key);
9f95a23c 648 total_removes++;
11fdf7f2
TL
649 }
650
651 for (unsigned i = 0; i < ctl.journal_idx; ++i) {
652 char key[32];
653 snprintf(key, sizeof(key), "_journal.%x", i);
654 ctl.to_remove.emplace(key);
655 }
656
657 // update first object's omap header if object count changes
658 if (ctl.clear ||
659 ctl.journal_idx > 0 ||
660 (omap_idx == 0 && old_num_objs != omap_num_objs))
661 create_op_func(omap_idx, first);
662 }
663
664 ceph_assert(!ops_map.empty());
665 if (journal_state == JOURNAL_FINISH) {
666 gather.set_finisher(new C_OnFinisher(new C_IO_OFT_Journal(this, log_seq, c, ops_map),
667 mds->finisher));
668 gather.activate();
669 } else {
670 submit_ops_func();
671 }
9f95a23c
TL
672 logger->set(l_oft_omap_total_objs, omap_num_objs);
673 logger->set(l_oft_omap_total_kv_pairs, total_items);
674 logger->inc(l_oft_omap_total_updates, total_updates);
675 logger->inc(l_oft_omap_total_removes, total_removes);
11fdf7f2
TL
676}
677
678class C_IO_OFT_Load : public MDSIOContextBase {
679protected:
680 OpenFileTable *oft;
681 MDSRank *get_mds() override { return oft->mds; }
682
683public:
684 int header_r = 0; //< Return value from OMAP header read
685 int values_r = 0; //< Return value from OMAP value read
686 bufferlist header_bl;
687 std::map<std::string, bufferlist> values;
688 unsigned index;
689 bool first;
690 bool more = false;
691
692 C_IO_OFT_Load(OpenFileTable *t, unsigned i, bool f) :
693 oft(t), index(i), first(f) {}
694 void finish(int r) override {
695 oft->_load_finish(r, header_r, values_r, index, first, more, header_bl, values);
696 }
697 void print(ostream& out) const override {
698 out << "openfiles_load";
699 }
700};
701
702class C_IO_OFT_Recover : public MDSIOContextBase {
703protected:
704 OpenFileTable *oft;
705 MDSRank *get_mds() override { return oft->mds; }
706public:
707 C_IO_OFT_Recover(OpenFileTable *t) : oft(t) {}
708 void finish(int r) override {
709 oft->_recover_finish(r);
710 }
711 void print(ostream& out) const override {
712 out << "openfiles_recover";
713 }
714};
715
716void OpenFileTable::_recover_finish(int r)
717{
718 if (r < 0) {
719 derr << __func__ << " got " << cpp_strerror(r) << dendl;
720 _reset_states();
721 } else {
722 dout(10) << __func__ << ": load complete" << dendl;
723 }
724
725 journal_state = JOURNAL_NONE;
726 load_done = true;
727 finish_contexts(g_ceph_context, waiting_for_load);
728 waiting_for_load.clear();
729}
730
f67539c2
TL
731void OpenFileTable::_read_omap_values(const std::string& key, unsigned idx,
732 bool first)
733{
734 object_t oid = get_object_name(idx);
735 dout(10) << __func__ << ": load from '" << oid << ":" << key << "'" << dendl;
b3b6e05e 736 object_locator_t oloc(mds->get_metadata_pool());
f67539c2
TL
737 C_IO_OFT_Load *c = new C_IO_OFT_Load(this, idx, first);
738 ObjectOperation op;
739 if (first)
740 op.omap_get_header(&c->header_bl, &c->header_r);
741 op.omap_get_vals(key, "", uint64_t(-1),
742 &c->values, &c->more, &c->values_r);
743 mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, nullptr, 0,
744 new C_OnFinisher(c, mds->finisher));
745}
746
11fdf7f2
TL
747void OpenFileTable::_load_finish(int op_r, int header_r, int values_r,
748 unsigned idx, bool first, bool more,
749 bufferlist &header_bl,
750 std::map<std::string, bufferlist> &values)
751{
752 using ceph::decode;
f67539c2 753 int err = -CEPHFS_EINVAL;
11fdf7f2
TL
754
755 auto decode_func = [this](unsigned idx, inodeno_t ino, bufferlist &bl) {
756 auto p = bl.cbegin();
757
758 size_t count = loaded_anchor_map.size();
759 auto it = loaded_anchor_map.emplace_hint(loaded_anchor_map.end(),
760 std::piecewise_construct,
761 std::make_tuple(ino),
762 std::make_tuple());
763 RecoveredAnchor& anchor = it->second;
764 decode(anchor, p);
f91f0fd5
TL
765 frag_vec_t frags; // unused
766 decode(frags, p);
11fdf7f2
TL
767 ceph_assert(ino == anchor.ino);
768 anchor.omap_idx = idx;
769 anchor.auth = MDS_RANK_NONE;
770
11fdf7f2
TL
771
772 if (loaded_anchor_map.size() > count)
773 ++omap_num_items[idx];
774 };
775
776 if (op_r < 0) {
777 derr << __func__ << " got " << cpp_strerror(op_r) << dendl;
778 err = op_r;
779 goto out;
780 }
781
782 try {
783 if (first) {
784 auto p = header_bl.cbegin();
785
786 string magic;
787 version_t version;
788 unsigned num_objs;
789 __u8 jstate;
790
791 if (header_bl.length() == 13) {
792 // obsolete format.
793 decode(version, p);
794 decode(num_objs, p);
795 decode(jstate, p);
796 } else {
797 decode(magic, p);
798 if (magic != CEPH_FS_ONDISK_MAGIC) {
f67539c2
TL
799 CachedStackStringStream css;
800 *css << "invalid magic '" << magic << "'";
801 throw buffer::malformed_input(css->str());
11fdf7f2
TL
802 }
803
804 DECODE_START(1, p);
805 decode(version, p);
806 decode(num_objs, p);
807 decode(jstate, p);
808 DECODE_FINISH(p);
809 }
810
811 if (num_objs > MAX_OBJECTS) {
f67539c2
TL
812 CachedStackStringStream css;
813 *css << "invalid object count '" << num_objs << "'";
814 throw buffer::malformed_input(css->str());
11fdf7f2
TL
815 }
816 if (jstate > JOURNAL_FINISH) {
f67539c2
TL
817 CachedStackStringStream css;
818 *css << "invalid journal state '" << jstate << "'";
819 throw buffer::malformed_input(css->str());
11fdf7f2
TL
820 }
821
822 if (version > omap_version) {
823 omap_version = version;
824 omap_num_objs = num_objs;
825 omap_num_items.resize(omap_num_objs);
826 journal_state = jstate;
827 } else if (version == omap_version) {
828 ceph_assert(omap_num_objs == num_objs);
829 if (jstate > journal_state)
830 journal_state = jstate;
831 }
832 }
833
834 for (auto& it : values) {
835 if (it.first.compare(0, 9, "_journal.") == 0) {
836 if (idx >= loaded_journals.size())
837 loaded_journals.resize(idx + 1);
838
839 if (journal_state == JOURNAL_FINISH) {
840 loaded_journals[idx][it.first].swap(it.second);
841 } else { // incomplete journal
842 loaded_journals[idx][it.first].length();
843 }
844 continue;
845 }
846
847 inodeno_t ino;
848 sscanf(it.first.c_str(), "%llx", (unsigned long long*)&ino.val);
849 decode_func(idx, ino, it.second);
850 }
851 } catch (buffer::error &e) {
852 derr << __func__ << ": corrupted header/values: " << e.what() << dendl;
853 goto out;
854 }
855
856 if (more || idx + 1 < omap_num_objs) {
857 // Issue another read if we're not at the end of the omap
858 std::string last_key;
859 if (more)
860 last_key = values.rbegin()->first;
861 else
862 idx++;
f67539c2
TL
863
864 _read_omap_values(last_key, idx, !more);
11fdf7f2
TL
865 return;
866 }
867
868 // replay journal
869 if (loaded_journals.size() > 0) {
870 dout(10) << __func__ << ": recover journal" << dendl;
871
872 C_GatherBuilder gather(g_ceph_context,
873 new C_OnFinisher(new C_IO_OFT_Recover(this),
874 mds->finisher));
b3b6e05e 875 object_locator_t oloc(mds->get_metadata_pool());
11fdf7f2
TL
876 SnapContext snapc;
877
878 for (unsigned omap_idx = 0; omap_idx < loaded_journals.size(); omap_idx++) {
879 auto& loaded_journal = loaded_journals[omap_idx];
880
881 std::vector<ObjectOperation> op_vec;
882 try {
883 for (auto& it : loaded_journal) {
884 if (journal_state != JOURNAL_FINISH)
885 continue;
886 auto p = it.second.cbegin();
887 version_t version;
888 std::map<string, bufferlist> to_update;
889 std::set<string> to_remove;
890 decode(version, p);
891 if (version != omap_version)
892 continue;
893 decode(to_update, p);
894 decode(to_remove, p);
895 it.second.clear();
896
897 for (auto& q : to_update) {
898 inodeno_t ino;
899 sscanf(q.first.c_str(), "%llx", (unsigned long long*)&ino.val);
900 decode_func(omap_idx, ino, q.second);
901 }
902 for (auto& q : to_remove) {
903 inodeno_t ino;
904 sscanf(q.c_str(), "%llx",(unsigned long long*)&ino.val);
905 ceph_assert(ino.val > 0);
906 if (loaded_anchor_map.erase(ino)) {
907 unsigned& count = omap_num_items[omap_idx];
908 ceph_assert(count > 0);
909 --count;
910 }
11fdf7f2
TL
911 }
912
913 op_vec.resize(op_vec.size() + 1);
914 ObjectOperation& op = op_vec.back();
915 op.priority = CEPH_MSG_PRIO_HIGH;
916 if (!to_update.empty())
917 op.omap_set(to_update);
918 if (!to_remove.empty())
919 op.omap_rm_keys(to_remove);
920 }
921 } catch (buffer::error &e) {
922 derr << __func__ << ": corrupted journal: " << e.what() << dendl;
923 goto out;
924 }
925
926 op_vec.resize(op_vec.size() + 1);
927 ObjectOperation& op = op_vec.back();
928 {
929 bufferlist header;
930 if (journal_state == JOURNAL_FINISH)
931 _encode_header(header, JOURNAL_FINISH);
932 else
933 _encode_header(header, JOURNAL_NONE);
934 op.omap_set_header(header);
935 }
936 {
937 // remove journal
938 std::set<string> to_remove;
939 for (auto &it : loaded_journal)
940 to_remove.emplace(it.first);
941 op.omap_rm_keys(to_remove);
942 }
943 loaded_journal.clear();
944
945 object_t oid = get_object_name(omap_idx);
946 for (auto& op : op_vec) {
947 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
948 0, gather.new_sub());
949 }
950 }
951 gather.activate();
952 return;
953 }
954
955 journal_state = JOURNAL_NONE;
956 err = 0;
957 dout(10) << __func__ << ": load complete" << dendl;
958out:
959
960 if (err < 0)
961 _reset_states();
962
963 load_done = true;
964 finish_contexts(g_ceph_context, waiting_for_load);
965 waiting_for_load.clear();
966}
967
968void OpenFileTable::load(MDSContext *onload)
969{
970 dout(10) << __func__ << dendl;
971 ceph_assert(!load_done);
972 if (onload)
973 waiting_for_load.push_back(onload);
974
f67539c2 975 _read_omap_values("", 0, true);
11fdf7f2
TL
976}
977
f91f0fd5
TL
978void OpenFileTable::_get_ancestors(const Anchor& parent,
979 vector<inode_backpointer_t>& ancestors,
980 mds_rank_t& auth_hint)
11fdf7f2 981{
f91f0fd5
TL
982 inodeno_t dirino = parent.dirino;
983 std::string_view d_name = parent.d_name;
11fdf7f2
TL
984
985 bool first = true;
986 ancestors.clear();
987 while (true) {
f91f0fd5 988 ancestors.push_back(inode_backpointer_t(dirino, string{d_name}, 0));
11fdf7f2 989
f91f0fd5 990 auto p = loaded_anchor_map.find(dirino);
11fdf7f2
TL
991 if (p == loaded_anchor_map.end())
992 break;
993
994 if (first)
995 auth_hint = p->second.auth;
996
997 dirino = p->second.dirino;
f91f0fd5 998 d_name = p->second.d_name;
11fdf7f2
TL
999 if (dirino == inodeno_t(0))
1000 break;
1001
1002 first = false;
1003 }
11fdf7f2
TL
1004}
1005
1006class C_OFT_OpenInoFinish: public MDSContext {
1007 OpenFileTable *oft;
1008 inodeno_t ino;
1009 MDSRank *get_mds() override { return oft->mds; }
1010public:
1011 C_OFT_OpenInoFinish(OpenFileTable *t, inodeno_t i) : oft(t), ino(i) {}
1012 void finish(int r) override {
1013 oft->_open_ino_finish(ino, r);
1014 }
1015};
1016
1017void OpenFileTable::_open_ino_finish(inodeno_t ino, int r)
1018{
1019 if (prefetch_state == DIR_INODES && r >= 0 && ino != inodeno_t(0)) {
1020 auto p = loaded_anchor_map.find(ino);
1021 ceph_assert(p != loaded_anchor_map.end());
1022 p->second.auth = mds_rank_t(r);
1023 }
1024
1025 if (r != mds->get_nodeid())
1026 mds->mdcache->rejoin_prefetch_ino_finish(ino, r);
1027
1028 num_opening_inodes--;
1029 if (num_opening_inodes == 0) {
1030 if (prefetch_state == DIR_INODES) {
f91f0fd5
TL
1031 if (g_conf().get_val<bool>("mds_oft_prefetch_dirfrags")) {
1032 prefetch_state = DIRFRAGS;
1033 _prefetch_dirfrags();
1034 } else {
1035 prefetch_state = FILE_INODES;
1036 _prefetch_inodes();
1037 }
11fdf7f2
TL
1038 } else if (prefetch_state == FILE_INODES) {
1039 prefetch_state = DONE;
1040 logseg_destroyed_inos.clear();
1041 destroyed_inos_set.clear();
1042 finish_contexts(g_ceph_context, waiting_for_prefetch);
1043 waiting_for_prefetch.clear();
1044 } else {
1045 ceph_abort();
1046 }
1047 }
1048}
1049
1050void OpenFileTable::_prefetch_dirfrags()
1051{
1052 dout(10) << __func__ << dendl;
1053 ceph_assert(prefetch_state == DIRFRAGS);
1054
1055 MDCache *mdcache = mds->mdcache;
f67539c2 1056 std::vector<CDir*> fetch_queue;
11fdf7f2 1057
f67539c2
TL
1058 for (auto& [ino, anchor] : loaded_anchor_map) {
1059 if (anchor.frags.empty())
f91f0fd5 1060 continue;
f67539c2 1061 CInode *diri = mdcache->get_inode(ino);
f91f0fd5
TL
1062 if (!diri)
1063 continue;
2a845540
TL
1064
1065 if (!diri->is_dir()) {
1066 dout(10) << " " << *diri << " is not dir" << dendl;
1067 continue;
1068 }
1069
11fdf7f2
TL
1070 if (diri->state_test(CInode::STATE_REJOINUNDEF))
1071 continue;
1072
f67539c2 1073 for (auto& fg: anchor.frags) {
f91f0fd5
TL
1074 CDir *dir = diri->get_dirfrag(fg);
1075 if (dir) {
1076 if (dir->is_auth() && !dir->is_complete())
f67539c2 1077 fetch_queue.push_back(dir);
f91f0fd5
TL
1078 } else {
1079 frag_vec_t leaves;
1080 diri->dirfragtree.get_leaves_under(fg, leaves);
f91f0fd5
TL
1081 for (auto& leaf : leaves) {
1082 if (diri->is_auth()) {
1083 dir = diri->get_or_open_dirfrag(mdcache, leaf);
1084 } else {
1085 dir = diri->get_dirfrag(leaf);
1086 }
1087 if (dir && dir->is_auth() && !dir->is_complete())
f67539c2 1088 fetch_queue.push_back(dir);
11fdf7f2 1089 }
11fdf7f2
TL
1090 }
1091 }
1092 }
1093
1094 MDSGatherBuilder gather(g_ceph_context);
1095 int num_opening_dirfrags = 0;
9f95a23c 1096 for (const auto& dir : fetch_queue) {
11fdf7f2
TL
1097 if (dir->state_test(CDir::STATE_REJOINUNDEF))
1098 ceph_assert(dir->get_inode()->dirfragtree.is_leaf(dir->get_frag()));
1099 dir->fetch(gather.new_sub());
1100
33c7a0ef 1101 if (!(++num_opening_dirfrags % mds->heartbeat_reset_grace()))
11fdf7f2
TL
1102 mds->heartbeat_reset();
1103 }
1104
1105 auto finish_func = [this](int r) {
1106 prefetch_state = FILE_INODES;
1107 _prefetch_inodes();
1108 };
1109 if (gather.has_subs()) {
1110 gather.set_finisher(
1111 new MDSInternalContextWrapper(mds,
9f95a23c 1112 new LambdaContext(std::move(finish_func))));
11fdf7f2
TL
1113 gather.activate();
1114 } else {
1115 finish_func(0);
1116 }
1117}
1118
1119void OpenFileTable::_prefetch_inodes()
1120{
1121 dout(10) << __func__ << " state " << prefetch_state << dendl;
1122 ceph_assert(!num_opening_inodes);
1123 num_opening_inodes = 1;
1124
1125 int64_t pool;
1126 if (prefetch_state == DIR_INODES)
b3b6e05e 1127 pool = mds->get_metadata_pool();
11fdf7f2
TL
1128 else if (prefetch_state == FILE_INODES)
1129 pool = mds->mdsmap->get_first_data_pool();
1130 else
1131 ceph_abort();
1132
1133 MDCache *mdcache = mds->mdcache;
1134
1135 if (destroyed_inos_set.empty()) {
1136 for (auto& it : logseg_destroyed_inos)
1137 destroyed_inos_set.insert(it.second.begin(), it.second.end());
1138 }
1139
f67539c2
TL
1140 for (auto& [ino, anchor] : loaded_anchor_map) {
1141 if (destroyed_inos_set.count(ino))
11fdf7f2 1142 continue;
f67539c2 1143 if (anchor.d_type == DT_DIR) {
11fdf7f2
TL
1144 if (prefetch_state != DIR_INODES)
1145 continue;
f67539c2
TL
1146 if (MDS_INO_IS_MDSDIR(ino)) {
1147 anchor.auth = MDS_INO_MDSDIR_OWNER(ino);
11fdf7f2
TL
1148 continue;
1149 }
f67539c2
TL
1150 if (MDS_INO_IS_STRAY(ino)) {
1151 anchor.auth = MDS_INO_STRAY_OWNER(ino);
11fdf7f2
TL
1152 continue;
1153 }
1154 } else {
1155 if (prefetch_state != FILE_INODES)
1156 continue;
1157 // load all file inodes for MDCache::identify_files_to_recover()
1158 }
f67539c2 1159 CInode *in = mdcache->get_inode(ino);
11fdf7f2
TL
1160 if (in)
1161 continue;
1162
1163 num_opening_inodes++;
f91f0fd5 1164
f67539c2
TL
1165 auto fin = new C_OFT_OpenInoFinish(this, ino);
1166 if (anchor.dirino != inodeno_t(0)) {
f91f0fd5
TL
1167 vector<inode_backpointer_t> ancestors;
1168 mds_rank_t auth_hint = MDS_RANK_NONE;
f67539c2
TL
1169 _get_ancestors(anchor, ancestors, auth_hint);
1170 mdcache->open_ino(ino, pool, fin, false, false, &ancestors, auth_hint);
f91f0fd5 1171 } else {
f67539c2 1172 mdcache->open_ino(ino, pool, fin, false);
f91f0fd5 1173 }
11fdf7f2 1174
33c7a0ef 1175 if (!(num_opening_inodes % mds->heartbeat_reset_grace()))
11fdf7f2
TL
1176 mds->heartbeat_reset();
1177 }
1178
1179 _open_ino_finish(inodeno_t(0), 0);
1180}
1181
1182bool OpenFileTable::prefetch_inodes()
1183{
1184 dout(10) << __func__ << dendl;
1185 ceph_assert(!prefetch_state);
1186 prefetch_state = DIR_INODES;
1187
1188 if (!load_done) {
1189 wait_for_load(
1190 new MDSInternalContextWrapper(mds,
9f95a23c 1191 new LambdaContext([this](int r) {
11fdf7f2
TL
1192 _prefetch_inodes();
1193 })
1194 )
1195 );
1196 return true;
1197 }
1198
1199 _prefetch_inodes();
1200 return !is_prefetched();
1201}
1202
1203bool OpenFileTable::should_log_open(CInode *in)
1204{
1205 if (in->state_test(CInode::STATE_TRACKEDBYOFT)) {
1206 // inode just journaled
1207 if (in->last_journaled >= committing_log_seq)
1208 return false;
1209 // item not dirty. it means the item has already been saved
1210 auto p = dirty_items.find(in->ino());
1211 if (p == dirty_items.end())
1212 return false;
1213 }
1214 return true;
1215}
1216
1217void OpenFileTable::note_destroyed_inos(uint64_t seq, const vector<inodeno_t>& inos)
1218{
1219 auto& vec = logseg_destroyed_inos[seq];
1220 vec.insert(vec.end(), inos.begin(), inos.end());
1221}
1222
1223void OpenFileTable::trim_destroyed_inos(uint64_t seq)
1224{
1225 auto p = logseg_destroyed_inos.begin();
1226 while (p != logseg_destroyed_inos.end()) {
1227 if (p->first >= seq)
1228 break;
1229 logseg_destroyed_inos.erase(p++);
1230 }
1231}