]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/OpenFileTable.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / mds / OpenFileTable.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2018 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "acconfig.h"
16#include "mds/CInode.h"
17#include "mds/CDir.h"
18#include "mds/MDSRank.h"
19#include "mds/MDCache.h"
20#include "osdc/Objecter.h"
21#include "OpenFileTable.h"
22
23#include "common/config.h"
24#include "common/errno.h"
25
9f95a23c
TL
// Perf-counter keys for the "oft" counter set built in the OpenFileTable
// constructor.  l_oft_first and l_oft_last are the exclusive bounds handed
// to PerfCountersBuilder; the real counters live strictly between them.
enum {
  l_oft_first = 1000000,
  l_oft_omap_total_objs = l_oft_first + 1,
  l_oft_omap_total_kv_pairs = l_oft_first + 2,
  l_oft_omap_total_updates = l_oft_first + 3,
  l_oft_omap_total_removes = l_oft_first + 4,
  l_oft_last = l_oft_first + 5
};
34
11fdf7f2
TL
35#define dout_context g_ceph_context
36#define dout_subsys ceph_subsys_mds
37#undef dout_prefix
38#define dout_prefix _prefix(_dout, mds)
20effc67
TL
39
40using namespace std;
41
42static std::ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
11fdf7f2
TL
43 return *_dout << "mds." << mds->get_nodeid() << ".openfiles ";
44}
45
9f95a23c
TL
46OpenFileTable::OpenFileTable(MDSRank *m) : mds(m) {
47 PerfCountersBuilder b(mds->cct, "oft", l_oft_first, l_oft_last);
48
49 b.add_u64(l_oft_omap_total_objs, "omap_total_objs");
50 b.add_u64(l_oft_omap_total_kv_pairs, "omap_total_kv_pairs");
51 b.add_u64(l_oft_omap_total_updates, "omap_total_updates");
52 b.add_u64(l_oft_omap_total_removes, "omap_total_removes");
53 logger.reset(b.create_perf_counters());
54 mds->cct->get_perfcounters_collection()->add(logger.get());
55 logger->set(l_oft_omap_total_objs, 0);
56 logger->set(l_oft_omap_total_kv_pairs, 0);
57 logger->set(l_oft_omap_total_updates, 0);
58 logger->set(l_oft_omap_total_removes, 0);
59}
60
61OpenFileTable::~OpenFileTable() {
62 if (logger) {
63 mds->cct->get_perfcounters_collection()->remove(logger.get());
64 }
65}
66
f91f0fd5 67void OpenFileTable::get_ref(CInode *in, frag_t fg)
11fdf7f2
TL
68{
69 do {
70 auto p = anchor_map.find(in->ino());
f91f0fd5
TL
71 if (!in->is_dir()) {
72 ceph_assert(fg == -1U);
73 ceph_assert(p == anchor_map.end());
74 }
75
11fdf7f2
TL
76 if (p != anchor_map.end()) {
77 ceph_assert(in->state_test(CInode::STATE_TRACKEDBYOFT));
78 ceph_assert(p->second.nref > 0);
79 p->second.nref++;
f91f0fd5
TL
80
81 if (fg != -1U) {
82 auto ret = p->second.frags.insert(fg);
83 ceph_assert(ret.second);
84 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
85 }
11fdf7f2
TL
86 break;
87 }
88
89 CDentry *dn = in->get_parent_dn();
90 CInode *pin = dn ? dn->get_dir()->get_inode() : nullptr;
91
92 auto ret = anchor_map.emplace(std::piecewise_construct, std::forward_as_tuple(in->ino()),
93 std::forward_as_tuple(in->ino(), (pin ? pin->ino() : inodeno_t(0)),
94 (dn ? dn->get_name() : string()), in->d_type(), 1));
95 ceph_assert(ret.second == true);
96 in->state_set(CInode::STATE_TRACKEDBYOFT);
97
f91f0fd5
TL
98 if (fg != -1U)
99 ret.first->second.frags.insert(fg);
100
11fdf7f2
TL
101 auto ret1 = dirty_items.emplace(in->ino(), (int)DIRTY_NEW);
102 if (!ret1.second) {
103 int omap_idx = ret1.first->second;
104 ceph_assert(omap_idx >= 0);
105 ret.first->second.omap_idx = omap_idx;
106 }
107
108 in = pin;
f91f0fd5 109 fg = -1U;
11fdf7f2
TL
110 } while (in);
111}
112
f91f0fd5 113void OpenFileTable::put_ref(CInode *in, frag_t fg)
11fdf7f2
TL
114{
115 do {
116 ceph_assert(in->state_test(CInode::STATE_TRACKEDBYOFT));
117 auto p = anchor_map.find(in->ino());
118 ceph_assert(p != anchor_map.end());
119 ceph_assert(p->second.nref > 0);
120
f91f0fd5
TL
121 if (!in->is_dir()) {
122 ceph_assert(fg == -1U);
123 ceph_assert(p->second.nref == 1);
124 }
125
11fdf7f2
TL
126 if (p->second.nref > 1) {
127 p->second.nref--;
f91f0fd5
TL
128 if (fg != -1U) {
129 auto ret = p->second.frags.erase(fg);
130 ceph_assert(ret);
131 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
132 }
11fdf7f2
TL
133 break;
134 }
135
136 CDentry *dn = in->get_parent_dn();
137 CInode *pin = dn ? dn->get_dir()->get_inode() : nullptr;
138 if (dn) {
139 ceph_assert(p->second.dirino == pin->ino());
140 ceph_assert(p->second.d_name == dn->get_name());
141 } else {
142 ceph_assert(p->second.dirino == inodeno_t(0));
143 ceph_assert(p->second.d_name == "");
144 }
145
f91f0fd5
TL
146 if (fg != -1U) {
147 ceph_assert(p->second.frags.size() == 1);
148 ceph_assert(*p->second.frags.begin() == fg);
149 }
150
11fdf7f2
TL
151 int omap_idx = p->second.omap_idx;
152 anchor_map.erase(p);
153 in->state_clear(CInode::STATE_TRACKEDBYOFT);
154
155 auto ret = dirty_items.emplace(in->ino(), omap_idx);
156 if (!ret.second) {
157 if (ret.first->second == DIRTY_NEW) {
158 ceph_assert(omap_idx < 0);
159 dirty_items.erase(ret.first);
160 } else {
161 ceph_assert(omap_idx >= 0);
162 ret.first->second = omap_idx;
163 }
164 }
165
166 in = pin;
f91f0fd5 167 fg = -1U;
11fdf7f2
TL
168 } while (in);
169}
170
171void OpenFileTable::add_inode(CInode *in)
172{
173 dout(10) << __func__ << " " << *in << dendl;
11fdf7f2
TL
174 get_ref(in);
175}
176
177void OpenFileTable::remove_inode(CInode *in)
178{
179 dout(10) << __func__ << " " << *in << dendl;
11fdf7f2
TL
180 put_ref(in);
181}
182
183void OpenFileTable::add_dirfrag(CDir *dir)
184{
185 dout(10) << __func__ << " " << *dir << dendl;
186 ceph_assert(!dir->state_test(CDir::STATE_TRACKEDBYOFT));
187 dir->state_set(CDir::STATE_TRACKEDBYOFT);
f91f0fd5 188 get_ref(dir->get_inode(), dir->get_frag());
11fdf7f2
TL
189}
190
191void OpenFileTable::remove_dirfrag(CDir *dir)
192{
193 dout(10) << __func__ << " " << *dir << dendl;
194 ceph_assert(dir->state_test(CDir::STATE_TRACKEDBYOFT));
195 dir->state_clear(CDir::STATE_TRACKEDBYOFT);
f91f0fd5 196 put_ref(dir->get_inode(), dir->get_frag());
11fdf7f2
TL
197}
198
199void OpenFileTable::notify_link(CInode *in)
200{
201 dout(10) << __func__ << " " << *in << dendl;
202 auto p = anchor_map.find(in->ino());
203 ceph_assert(p != anchor_map.end());
204 ceph_assert(p->second.nref > 0);
205 ceph_assert(p->second.dirino == inodeno_t(0));
206 ceph_assert(p->second.d_name == "");
207
208 CDentry *dn = in->get_parent_dn();
209 CInode *pin = dn->get_dir()->get_inode();
210
211 p->second.dirino = pin->ino();
212 p->second.d_name = dn->get_name();
213 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
214
215 get_ref(pin);
216}
217
218void OpenFileTable::notify_unlink(CInode *in)
219{
220 dout(10) << __func__ << " " << *in << dendl;
221 auto p = anchor_map.find(in->ino());
222 ceph_assert(p != anchor_map.end());
223 ceph_assert(p->second.nref > 0);
224
225 CDentry *dn = in->get_parent_dn();
226 CInode *pin = dn->get_dir()->get_inode();
227 ceph_assert(p->second.dirino == pin->ino());
228 ceph_assert(p->second.d_name == dn->get_name());
229
230 p->second.dirino = inodeno_t(0);
231 p->second.d_name = "";
232 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
233
234 put_ref(pin);
235}
236
237object_t OpenFileTable::get_object_name(unsigned idx) const
238{
239 char s[30];
240 snprintf(s, sizeof(s), "mds%d_openfiles.%x", int(mds->get_nodeid()), idx);
241 return object_t(s);
242}
243
244void OpenFileTable::_encode_header(bufferlist &bl, int j_state)
245{
246 std::string_view magic = CEPH_FS_ONDISK_MAGIC;
247 encode(magic, bl);
248 ENCODE_START(1, 1, bl);
249 encode(omap_version, bl);
250 encode(omap_num_objs, bl);
251 encode((__u8)j_state, bl);
252 ENCODE_FINISH(bl);
253}
254
255class C_IO_OFT_Save : public MDSIOContextBase {
256protected:
257 OpenFileTable *oft;
258 uint64_t log_seq;
259 MDSContext *fin;
260 MDSRank *get_mds() override { return oft->mds; }
261public:
262 C_IO_OFT_Save(OpenFileTable *t, uint64_t s, MDSContext *c) :
263 oft(t), log_seq(s), fin(c) {}
264 void finish(int r) {
265 oft->_commit_finish(r, log_seq, fin);
266 }
267 void print(ostream& out) const override {
268 out << "openfiles_save";
269 }
270};
271
272void OpenFileTable::_commit_finish(int r, uint64_t log_seq, MDSContext *fin)
273{
274 dout(10) << __func__ << " log_seq " << log_seq << dendl;
275 if (r < 0) {
276 mds->handle_write_error(r);
277 return;
278 }
279
280 ceph_assert(log_seq <= committing_log_seq);
281 ceph_assert(log_seq >= committed_log_seq);
282 committed_log_seq = log_seq;
283 num_pending_commit--;
284
285 if (fin)
286 fin->complete(r);
287}
288
289class C_IO_OFT_Journal : public MDSIOContextBase {
290protected:
291 OpenFileTable *oft;
292 uint64_t log_seq;
293 MDSContext *fin;
294 std::map<unsigned, std::vector<ObjectOperation> > ops_map;
295 MDSRank *get_mds() override { return oft->mds; }
296public:
297 C_IO_OFT_Journal(OpenFileTable *t, uint64_t s, MDSContext *c,
298 std::map<unsigned, std::vector<ObjectOperation> >& ops) :
299 oft(t), log_seq(s), fin(c) {
300 ops_map.swap(ops);
301 }
302 void finish(int r) {
303 oft->_journal_finish(r, log_seq, fin, ops_map);
304 }
305 void print(ostream& out) const override {
306 out << "openfiles_journal";
307 }
308};
309
310void OpenFileTable::_journal_finish(int r, uint64_t log_seq, MDSContext *c,
311 std::map<unsigned, std::vector<ObjectOperation> >& ops_map)
312{
313 dout(10) << __func__ << " log_seq " << log_seq << dendl;
314 if (r < 0) {
315 mds->handle_write_error(r);
316 return;
317 }
318
319 C_GatherBuilder gather(g_ceph_context,
320 new C_OnFinisher(new C_IO_OFT_Save(this, log_seq, c),
321 mds->finisher));
322 SnapContext snapc;
b3b6e05e 323 object_locator_t oloc(mds->get_metadata_pool());
f67539c2
TL
324 for (auto& [idx, vops] : ops_map) {
325 object_t oid = get_object_name(idx);
326 for (auto& op : vops) {
11fdf7f2
TL
327 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
328 0, gather.new_sub());
329 }
330 }
331 gather.activate();
332
333 journal_state = JOURNAL_NONE;
334 return;
335}
336
337void OpenFileTable::commit(MDSContext *c, uint64_t log_seq, int op_prio)
338{
339 dout(10) << __func__ << " log_seq " << log_seq << dendl;
340
341 ceph_assert(num_pending_commit == 0);
342 num_pending_commit++;
343 ceph_assert(log_seq >= committing_log_seq);
344 committing_log_seq = log_seq;
345
346 omap_version++;
347
348 C_GatherBuilder gather(g_ceph_context);
349
350 SnapContext snapc;
b3b6e05e 351 object_locator_t oloc(mds->get_metadata_pool());
11fdf7f2
TL
352
353 const unsigned max_write_size = mds->mdcache->max_dir_commit_size;
354
355 struct omap_update_ctl {
356 unsigned write_size = 0;
357 unsigned journal_idx = 0;
358 bool clear = false;
359 std::map<string, bufferlist> to_update, journaled_update;
360 std::set<string> to_remove, journaled_remove;
361 };
362 std::vector<omap_update_ctl> omap_updates(omap_num_objs);
363
364 using ceph::encode;
365 auto journal_func = [&](unsigned idx) {
366 auto& ctl = omap_updates.at(idx);
367
368 ObjectOperation op;
369 op.priority = op_prio;
370
371 if (ctl.clear) {
372 ctl.clear = false;
373 op.omap_clear();
374 op.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
375 }
376
377 if (ctl.journal_idx == 0) {
378 if (journal_state == JOURNAL_NONE)
379 journal_state = JOURNAL_START;
380 else
381 ceph_assert(journal_state == JOURNAL_START);
382
383 bufferlist header;
384 _encode_header(header, journal_state);
385 op.omap_set_header(header);
386 }
387
388 bufferlist bl;
389 encode(omap_version, bl);
390 encode(ctl.to_update, bl);
391 encode(ctl.to_remove, bl);
392
393 char key[32];
394 snprintf(key, sizeof(key), "_journal.%x", ctl.journal_idx++);
395 std::map<string, bufferlist> tmp_map;
396 tmp_map[key].swap(bl);
397 op.omap_set(tmp_map);
398
399 object_t oid = get_object_name(idx);
400 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(), 0,
401 gather.new_sub());
402
403#ifdef HAVE_STDLIB_MAP_SPLICING
404 ctl.journaled_update.merge(ctl.to_update);
405 ctl.journaled_remove.merge(ctl.to_remove);
406#else
407 ctl.journaled_update.insert(make_move_iterator(begin(ctl.to_update)),
408 make_move_iterator(end(ctl.to_update)));
409 ctl.journaled_remove.insert(make_move_iterator(begin(ctl.to_remove)),
410 make_move_iterator(end(ctl.to_remove)));
411#endif
412 ctl.to_update.clear();
413 ctl.to_remove.clear();
414 };
415
416 std::map<unsigned, std::vector<ObjectOperation> > ops_map;
417
418 auto create_op_func = [&](unsigned idx, bool update_header) {
419 auto& ctl = omap_updates.at(idx);
420
421 auto& op_vec = ops_map[idx];
422 op_vec.resize(op_vec.size() + 1);
423 ObjectOperation& op = op_vec.back();
424 op.priority = op_prio;
425
426 if (ctl.clear) {
427 ctl.clear = false;
428 op.omap_clear();
429 op.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
430 }
431
432 if (update_header) {
433 bufferlist header;
434 _encode_header(header, journal_state);
435 op.omap_set_header(header);
436 }
437
438 if (!ctl.to_update.empty()) {
439 op.omap_set(ctl.to_update);
440 ctl.to_update.clear();
441 }
442 if (!ctl.to_remove.empty()) {
443 op.omap_rm_keys(ctl.to_remove);
444 ctl.to_remove.clear();
445 }
446 };
447
448 auto submit_ops_func = [&]() {
449 gather.set_finisher(new C_OnFinisher(new C_IO_OFT_Save(this, log_seq, c),
450 mds->finisher));
f67539c2
TL
451 for (auto& [idx, vops] : ops_map) {
452 object_t oid = get_object_name(idx);
453 for (auto& op : vops) {
11fdf7f2
TL
454 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
455 0, gather.new_sub());
456 }
457 }
458 gather.activate();
459 };
460
461 bool first_commit = !loaded_anchor_map.empty();
462
463 unsigned first_free_idx = 0;
464 unsigned old_num_objs = omap_num_objs;
465 if (omap_num_objs == 0) {
466 omap_num_objs = 1;
467 omap_num_items.resize(omap_num_objs);
468 omap_updates.resize(omap_num_objs);
469 omap_updates.back().clear = true;
470 }
471
f67539c2
TL
472 for (auto& [ino, state] : dirty_items) {
473 auto p = anchor_map.find(ino);
11fdf7f2
TL
474
475 if (first_commit) {
f67539c2 476 auto q = loaded_anchor_map.find(ino);
11fdf7f2
TL
477 if (q != loaded_anchor_map.end()) {
478 ceph_assert(p != anchor_map.end());
479 p->second.omap_idx = q->second.omap_idx;
f91f0fd5 480 bool same = (p->second == q->second);
11fdf7f2
TL
481 loaded_anchor_map.erase(q);
482 if (same)
483 continue;
484 }
485 }
486
487 char key[32];
f67539c2 488 int len = snprintf(key, sizeof(key), "%llx", (unsigned long long)ino.val);
11fdf7f2
TL
489
490 int omap_idx;
491 if (p != anchor_map.end()) {
492 omap_idx = p->second.omap_idx;
493 if (omap_idx < 0) {
f67539c2 494 ceph_assert(state == DIRTY_NEW);
11fdf7f2
TL
495 // find omap object to store the key
496 for (unsigned i = first_free_idx; i < omap_num_objs; i++) {
92f5a8d4 497 if (omap_num_items[i] < MAX_ITEMS_PER_OBJ) {
11fdf7f2 498 omap_idx = i;
92f5a8d4
TL
499 break;
500 }
11fdf7f2
TL
501 }
502 if (omap_idx < 0) {
503 ++omap_num_objs;
504 ceph_assert(omap_num_objs <= MAX_OBJECTS);
505 omap_num_items.resize(omap_num_objs);
506 omap_updates.resize(omap_num_objs);
507 omap_updates.back().clear = true;
508 omap_idx = omap_num_objs - 1;
509 }
510 first_free_idx = omap_idx;
511
512 p->second.omap_idx = omap_idx;
513 ++omap_num_items[omap_idx];
514 }
515 } else {
f67539c2 516 omap_idx = state;
11fdf7f2
TL
517 unsigned& count = omap_num_items.at(omap_idx);
518 ceph_assert(count > 0);
519 --count;
520 if ((unsigned)omap_idx < first_free_idx && count < MAX_ITEMS_PER_OBJ)
521 first_free_idx = omap_idx;
522 }
523 auto& ctl = omap_updates.at(omap_idx);
f67539c2
TL
524 if (ctl.write_size >= max_write_size) {
525 journal_func(omap_idx);
526 ctl.write_size = 0;
527 }
11fdf7f2
TL
528 if (p != anchor_map.end()) {
529 bufferlist bl;
530 encode(p->second, bl);
f91f0fd5 531 encode((__u32)0, bl); // frags set was encoded here
11fdf7f2
TL
532
533 ctl.write_size += bl.length() + len + 2 * sizeof(__u32);
534 ctl.to_update[key].swap(bl);
535 } else {
536 ctl.write_size += len + sizeof(__u32);
537 ctl.to_remove.emplace(key);
538 }
11fdf7f2
TL
539 }
540
541 dirty_items.clear();
542
543 if (first_commit) {
f67539c2 544 for (auto& [ino, anchor] : loaded_anchor_map) {
11fdf7f2 545 char key[32];
f67539c2 546 int len = snprintf(key, sizeof(key), "%llx", (unsigned long long)ino.val);
11fdf7f2 547
f67539c2 548 int omap_idx = anchor.omap_idx;
11fdf7f2
TL
549 unsigned& count = omap_num_items.at(omap_idx);
550 ceph_assert(count > 0);
551 --count;
552
553 auto& ctl = omap_updates.at(omap_idx);
11fdf7f2 554 if (ctl.write_size >= max_write_size) {
f67539c2
TL
555 journal_func(omap_idx);
556 ctl.write_size = 0;
11fdf7f2 557 }
f67539c2
TL
558 ctl.write_size += len + sizeof(__u32);
559 ctl.to_remove.emplace(key);
11fdf7f2
TL
560 }
561 loaded_anchor_map.clear();
11fdf7f2
TL
562 }
563
9f95a23c 564 size_t total_items = 0;
11fdf7f2 565 {
11fdf7f2 566 unsigned used_objs = 1;
9f95a23c 567 std::vector<unsigned> objs_to_write;
11fdf7f2
TL
568 bool journaled = false;
569 for (unsigned i = 0; i < omap_num_objs; i++) {
570 total_items += omap_num_items[i];
571 if (omap_updates[i].journal_idx)
572 journaled = true;
573 else if (omap_updates[i].write_size)
574 objs_to_write.push_back(i);
575
576 if (omap_num_items[i] > 0)
577 used_objs = i + 1;
578 }
579 ceph_assert(total_items == anchor_map.size());
580 // adjust omap object count
581 if (used_objs < omap_num_objs) {
582 omap_num_objs = used_objs;
583 omap_num_items.resize(omap_num_objs);
584 }
585 // skip journal if only one osd request is required and object count
586 // does not change.
587 if (!journaled && old_num_objs == omap_num_objs &&
588 objs_to_write.size() <= 1) {
589 ceph_assert(journal_state == JOURNAL_NONE);
590 ceph_assert(!gather.has_subs());
591
592 unsigned omap_idx = objs_to_write.empty() ? 0 : objs_to_write.front();
593 create_op_func(omap_idx, true);
594 submit_ops_func();
595 return;
596 }
597 }
598
599 for (unsigned omap_idx = 0; omap_idx < omap_updates.size(); omap_idx++) {
600 auto& ctl = omap_updates[omap_idx];
601 if (ctl.write_size > 0) {
602 journal_func(omap_idx);
603 ctl.write_size = 0;
604 }
605 }
606
607 if (journal_state == JOURNAL_START) {
608 ceph_assert(gather.has_subs());
609 journal_state = JOURNAL_FINISH;
610 } else {
611 // only object count changes
612 ceph_assert(journal_state == JOURNAL_NONE);
613 ceph_assert(!gather.has_subs());
614 }
615
9f95a23c
TL
616 uint64_t total_updates = 0;
617 uint64_t total_removes = 0;
618
11fdf7f2
TL
619 for (unsigned omap_idx = 0; omap_idx < omap_updates.size(); omap_idx++) {
620 auto& ctl = omap_updates[omap_idx];
621 ceph_assert(ctl.to_update.empty() && ctl.to_remove.empty());
622 if (ctl.journal_idx == 0)
623 ceph_assert(ctl.journaled_update.empty() && ctl.journaled_remove.empty());
624
625 bool first = true;
626 for (auto& it : ctl.journaled_update) {
11fdf7f2 627 if (ctl.write_size >= max_write_size) {
f67539c2
TL
628 create_op_func(omap_idx, first);
629 ctl.write_size = 0;
630 first = false;
11fdf7f2 631 }
f67539c2
TL
632 ctl.write_size += it.first.length() + it.second.length() + 2 * sizeof(__u32);
633 ctl.to_update[it.first].swap(it.second);
9f95a23c 634 total_updates++;
11fdf7f2
TL
635 }
636
637 for (auto& key : ctl.journaled_remove) {
11fdf7f2 638 if (ctl.write_size >= max_write_size) {
f67539c2
TL
639 create_op_func(omap_idx, first);
640 ctl.write_size = 0;
641 first = false;
11fdf7f2 642 }
f67539c2
TL
643
644 ctl.write_size += key.length() + sizeof(__u32);
645 ctl.to_remove.emplace(key);
9f95a23c 646 total_removes++;
11fdf7f2
TL
647 }
648
649 for (unsigned i = 0; i < ctl.journal_idx; ++i) {
650 char key[32];
651 snprintf(key, sizeof(key), "_journal.%x", i);
652 ctl.to_remove.emplace(key);
653 }
654
655 // update first object's omap header if object count changes
656 if (ctl.clear ||
657 ctl.journal_idx > 0 ||
658 (omap_idx == 0 && old_num_objs != omap_num_objs))
659 create_op_func(omap_idx, first);
660 }
661
662 ceph_assert(!ops_map.empty());
663 if (journal_state == JOURNAL_FINISH) {
664 gather.set_finisher(new C_OnFinisher(new C_IO_OFT_Journal(this, log_seq, c, ops_map),
665 mds->finisher));
666 gather.activate();
667 } else {
668 submit_ops_func();
669 }
9f95a23c
TL
670 logger->set(l_oft_omap_total_objs, omap_num_objs);
671 logger->set(l_oft_omap_total_kv_pairs, total_items);
672 logger->inc(l_oft_omap_total_updates, total_updates);
673 logger->inc(l_oft_omap_total_removes, total_removes);
11fdf7f2
TL
674}
675
676class C_IO_OFT_Load : public MDSIOContextBase {
677protected:
678 OpenFileTable *oft;
679 MDSRank *get_mds() override { return oft->mds; }
680
681public:
682 int header_r = 0; //< Return value from OMAP header read
683 int values_r = 0; //< Return value from OMAP value read
684 bufferlist header_bl;
685 std::map<std::string, bufferlist> values;
686 unsigned index;
687 bool first;
688 bool more = false;
689
690 C_IO_OFT_Load(OpenFileTable *t, unsigned i, bool f) :
691 oft(t), index(i), first(f) {}
692 void finish(int r) override {
693 oft->_load_finish(r, header_r, values_r, index, first, more, header_bl, values);
694 }
695 void print(ostream& out) const override {
696 out << "openfiles_load";
697 }
698};
699
700class C_IO_OFT_Recover : public MDSIOContextBase {
701protected:
702 OpenFileTable *oft;
703 MDSRank *get_mds() override { return oft->mds; }
704public:
705 C_IO_OFT_Recover(OpenFileTable *t) : oft(t) {}
706 void finish(int r) override {
707 oft->_recover_finish(r);
708 }
709 void print(ostream& out) const override {
710 out << "openfiles_recover";
711 }
712};
713
714void OpenFileTable::_recover_finish(int r)
715{
716 if (r < 0) {
717 derr << __func__ << " got " << cpp_strerror(r) << dendl;
718 _reset_states();
719 } else {
720 dout(10) << __func__ << ": load complete" << dendl;
721 }
722
723 journal_state = JOURNAL_NONE;
724 load_done = true;
725 finish_contexts(g_ceph_context, waiting_for_load);
726 waiting_for_load.clear();
727}
728
f67539c2
TL
729void OpenFileTable::_read_omap_values(const std::string& key, unsigned idx,
730 bool first)
731{
732 object_t oid = get_object_name(idx);
733 dout(10) << __func__ << ": load from '" << oid << ":" << key << "'" << dendl;
b3b6e05e 734 object_locator_t oloc(mds->get_metadata_pool());
f67539c2
TL
735 C_IO_OFT_Load *c = new C_IO_OFT_Load(this, idx, first);
736 ObjectOperation op;
737 if (first)
738 op.omap_get_header(&c->header_bl, &c->header_r);
739 op.omap_get_vals(key, "", uint64_t(-1),
740 &c->values, &c->more, &c->values_r);
741 mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, nullptr, 0,
742 new C_OnFinisher(c, mds->finisher));
743}
744
11fdf7f2
TL
745void OpenFileTable::_load_finish(int op_r, int header_r, int values_r,
746 unsigned idx, bool first, bool more,
747 bufferlist &header_bl,
748 std::map<std::string, bufferlist> &values)
749{
750 using ceph::decode;
f67539c2 751 int err = -CEPHFS_EINVAL;
11fdf7f2
TL
752
753 auto decode_func = [this](unsigned idx, inodeno_t ino, bufferlist &bl) {
754 auto p = bl.cbegin();
755
756 size_t count = loaded_anchor_map.size();
757 auto it = loaded_anchor_map.emplace_hint(loaded_anchor_map.end(),
758 std::piecewise_construct,
759 std::make_tuple(ino),
760 std::make_tuple());
761 RecoveredAnchor& anchor = it->second;
762 decode(anchor, p);
f91f0fd5
TL
763 frag_vec_t frags; // unused
764 decode(frags, p);
11fdf7f2
TL
765 ceph_assert(ino == anchor.ino);
766 anchor.omap_idx = idx;
767 anchor.auth = MDS_RANK_NONE;
768
11fdf7f2
TL
769
770 if (loaded_anchor_map.size() > count)
771 ++omap_num_items[idx];
772 };
773
774 if (op_r < 0) {
775 derr << __func__ << " got " << cpp_strerror(op_r) << dendl;
776 err = op_r;
777 goto out;
778 }
779
780 try {
781 if (first) {
782 auto p = header_bl.cbegin();
783
784 string magic;
785 version_t version;
786 unsigned num_objs;
787 __u8 jstate;
788
789 if (header_bl.length() == 13) {
790 // obsolete format.
791 decode(version, p);
792 decode(num_objs, p);
793 decode(jstate, p);
794 } else {
795 decode(magic, p);
796 if (magic != CEPH_FS_ONDISK_MAGIC) {
f67539c2
TL
797 CachedStackStringStream css;
798 *css << "invalid magic '" << magic << "'";
799 throw buffer::malformed_input(css->str());
11fdf7f2
TL
800 }
801
802 DECODE_START(1, p);
803 decode(version, p);
804 decode(num_objs, p);
805 decode(jstate, p);
806 DECODE_FINISH(p);
807 }
808
809 if (num_objs > MAX_OBJECTS) {
f67539c2
TL
810 CachedStackStringStream css;
811 *css << "invalid object count '" << num_objs << "'";
812 throw buffer::malformed_input(css->str());
11fdf7f2
TL
813 }
814 if (jstate > JOURNAL_FINISH) {
f67539c2
TL
815 CachedStackStringStream css;
816 *css << "invalid journal state '" << jstate << "'";
817 throw buffer::malformed_input(css->str());
11fdf7f2
TL
818 }
819
820 if (version > omap_version) {
821 omap_version = version;
822 omap_num_objs = num_objs;
823 omap_num_items.resize(omap_num_objs);
824 journal_state = jstate;
825 } else if (version == omap_version) {
826 ceph_assert(omap_num_objs == num_objs);
827 if (jstate > journal_state)
828 journal_state = jstate;
829 }
830 }
831
832 for (auto& it : values) {
833 if (it.first.compare(0, 9, "_journal.") == 0) {
834 if (idx >= loaded_journals.size())
835 loaded_journals.resize(idx + 1);
836
837 if (journal_state == JOURNAL_FINISH) {
838 loaded_journals[idx][it.first].swap(it.second);
839 } else { // incomplete journal
840 loaded_journals[idx][it.first].length();
841 }
842 continue;
843 }
844
845 inodeno_t ino;
846 sscanf(it.first.c_str(), "%llx", (unsigned long long*)&ino.val);
847 decode_func(idx, ino, it.second);
848 }
849 } catch (buffer::error &e) {
850 derr << __func__ << ": corrupted header/values: " << e.what() << dendl;
851 goto out;
852 }
853
854 if (more || idx + 1 < omap_num_objs) {
855 // Issue another read if we're not at the end of the omap
856 std::string last_key;
857 if (more)
858 last_key = values.rbegin()->first;
859 else
860 idx++;
f67539c2
TL
861
862 _read_omap_values(last_key, idx, !more);
11fdf7f2
TL
863 return;
864 }
865
866 // replay journal
867 if (loaded_journals.size() > 0) {
868 dout(10) << __func__ << ": recover journal" << dendl;
869
870 C_GatherBuilder gather(g_ceph_context,
871 new C_OnFinisher(new C_IO_OFT_Recover(this),
872 mds->finisher));
b3b6e05e 873 object_locator_t oloc(mds->get_metadata_pool());
11fdf7f2
TL
874 SnapContext snapc;
875
876 for (unsigned omap_idx = 0; omap_idx < loaded_journals.size(); omap_idx++) {
877 auto& loaded_journal = loaded_journals[omap_idx];
878
879 std::vector<ObjectOperation> op_vec;
880 try {
881 for (auto& it : loaded_journal) {
882 if (journal_state != JOURNAL_FINISH)
883 continue;
884 auto p = it.second.cbegin();
885 version_t version;
886 std::map<string, bufferlist> to_update;
887 std::set<string> to_remove;
888 decode(version, p);
889 if (version != omap_version)
890 continue;
891 decode(to_update, p);
892 decode(to_remove, p);
893 it.second.clear();
894
895 for (auto& q : to_update) {
896 inodeno_t ino;
897 sscanf(q.first.c_str(), "%llx", (unsigned long long*)&ino.val);
898 decode_func(omap_idx, ino, q.second);
899 }
900 for (auto& q : to_remove) {
901 inodeno_t ino;
902 sscanf(q.c_str(), "%llx",(unsigned long long*)&ino.val);
903 ceph_assert(ino.val > 0);
904 if (loaded_anchor_map.erase(ino)) {
905 unsigned& count = omap_num_items[omap_idx];
906 ceph_assert(count > 0);
907 --count;
908 }
11fdf7f2
TL
909 }
910
911 op_vec.resize(op_vec.size() + 1);
912 ObjectOperation& op = op_vec.back();
913 op.priority = CEPH_MSG_PRIO_HIGH;
914 if (!to_update.empty())
915 op.omap_set(to_update);
916 if (!to_remove.empty())
917 op.omap_rm_keys(to_remove);
918 }
919 } catch (buffer::error &e) {
920 derr << __func__ << ": corrupted journal: " << e.what() << dendl;
921 goto out;
922 }
923
924 op_vec.resize(op_vec.size() + 1);
925 ObjectOperation& op = op_vec.back();
926 {
927 bufferlist header;
928 if (journal_state == JOURNAL_FINISH)
929 _encode_header(header, JOURNAL_FINISH);
930 else
931 _encode_header(header, JOURNAL_NONE);
932 op.omap_set_header(header);
933 }
934 {
935 // remove journal
936 std::set<string> to_remove;
937 for (auto &it : loaded_journal)
938 to_remove.emplace(it.first);
939 op.omap_rm_keys(to_remove);
940 }
941 loaded_journal.clear();
942
943 object_t oid = get_object_name(omap_idx);
944 for (auto& op : op_vec) {
945 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
946 0, gather.new_sub());
947 }
948 }
949 gather.activate();
950 return;
951 }
952
953 journal_state = JOURNAL_NONE;
954 err = 0;
955 dout(10) << __func__ << ": load complete" << dendl;
956out:
957
958 if (err < 0)
959 _reset_states();
960
961 load_done = true;
962 finish_contexts(g_ceph_context, waiting_for_load);
963 waiting_for_load.clear();
964}
965
966void OpenFileTable::load(MDSContext *onload)
967{
968 dout(10) << __func__ << dendl;
969 ceph_assert(!load_done);
970 if (onload)
971 waiting_for_load.push_back(onload);
972
f67539c2 973 _read_omap_values("", 0, true);
11fdf7f2
TL
974}
975
f91f0fd5
TL
976void OpenFileTable::_get_ancestors(const Anchor& parent,
977 vector<inode_backpointer_t>& ancestors,
978 mds_rank_t& auth_hint)
11fdf7f2 979{
f91f0fd5
TL
980 inodeno_t dirino = parent.dirino;
981 std::string_view d_name = parent.d_name;
11fdf7f2
TL
982
983 bool first = true;
984 ancestors.clear();
985 while (true) {
f91f0fd5 986 ancestors.push_back(inode_backpointer_t(dirino, string{d_name}, 0));
11fdf7f2 987
f91f0fd5 988 auto p = loaded_anchor_map.find(dirino);
11fdf7f2
TL
989 if (p == loaded_anchor_map.end())
990 break;
991
992 if (first)
993 auth_hint = p->second.auth;
994
995 dirino = p->second.dirino;
f91f0fd5 996 d_name = p->second.d_name;
11fdf7f2
TL
997 if (dirino == inodeno_t(0))
998 break;
999
1000 first = false;
1001 }
11fdf7f2
TL
1002}
1003
1004class C_OFT_OpenInoFinish: public MDSContext {
1005 OpenFileTable *oft;
1006 inodeno_t ino;
1007 MDSRank *get_mds() override { return oft->mds; }
1008public:
1009 C_OFT_OpenInoFinish(OpenFileTable *t, inodeno_t i) : oft(t), ino(i) {}
1010 void finish(int r) override {
1011 oft->_open_ino_finish(ino, r);
1012 }
1013};
1014
1015void OpenFileTable::_open_ino_finish(inodeno_t ino, int r)
1016{
1017 if (prefetch_state == DIR_INODES && r >= 0 && ino != inodeno_t(0)) {
1018 auto p = loaded_anchor_map.find(ino);
1019 ceph_assert(p != loaded_anchor_map.end());
1020 p->second.auth = mds_rank_t(r);
1021 }
1022
1023 if (r != mds->get_nodeid())
1024 mds->mdcache->rejoin_prefetch_ino_finish(ino, r);
1025
1026 num_opening_inodes--;
1027 if (num_opening_inodes == 0) {
1028 if (prefetch_state == DIR_INODES) {
f91f0fd5
TL
1029 if (g_conf().get_val<bool>("mds_oft_prefetch_dirfrags")) {
1030 prefetch_state = DIRFRAGS;
1031 _prefetch_dirfrags();
1032 } else {
1033 prefetch_state = FILE_INODES;
1034 _prefetch_inodes();
1035 }
11fdf7f2
TL
1036 } else if (prefetch_state == FILE_INODES) {
1037 prefetch_state = DONE;
1038 logseg_destroyed_inos.clear();
1039 destroyed_inos_set.clear();
1040 finish_contexts(g_ceph_context, waiting_for_prefetch);
1041 waiting_for_prefetch.clear();
1042 } else {
1043 ceph_abort();
1044 }
1045 }
1046}
1047
1048void OpenFileTable::_prefetch_dirfrags()
1049{
1050 dout(10) << __func__ << dendl;
1051 ceph_assert(prefetch_state == DIRFRAGS);
1052
1053 MDCache *mdcache = mds->mdcache;
f67539c2 1054 std::vector<CDir*> fetch_queue;
11fdf7f2 1055
f67539c2
TL
1056 for (auto& [ino, anchor] : loaded_anchor_map) {
1057 if (anchor.frags.empty())
f91f0fd5 1058 continue;
f67539c2 1059 CInode *diri = mdcache->get_inode(ino);
f91f0fd5
TL
1060 if (!diri)
1061 continue;
11fdf7f2
TL
1062 if (diri->state_test(CInode::STATE_REJOINUNDEF))
1063 continue;
1064
f67539c2 1065 for (auto& fg: anchor.frags) {
f91f0fd5
TL
1066 CDir *dir = diri->get_dirfrag(fg);
1067 if (dir) {
1068 if (dir->is_auth() && !dir->is_complete())
f67539c2 1069 fetch_queue.push_back(dir);
f91f0fd5
TL
1070 } else {
1071 frag_vec_t leaves;
1072 diri->dirfragtree.get_leaves_under(fg, leaves);
f91f0fd5
TL
1073 for (auto& leaf : leaves) {
1074 if (diri->is_auth()) {
1075 dir = diri->get_or_open_dirfrag(mdcache, leaf);
1076 } else {
1077 dir = diri->get_dirfrag(leaf);
1078 }
1079 if (dir && dir->is_auth() && !dir->is_complete())
f67539c2 1080 fetch_queue.push_back(dir);
11fdf7f2 1081 }
11fdf7f2
TL
1082 }
1083 }
1084 }
1085
1086 MDSGatherBuilder gather(g_ceph_context);
1087 int num_opening_dirfrags = 0;
9f95a23c 1088 for (const auto& dir : fetch_queue) {
11fdf7f2
TL
1089 if (dir->state_test(CDir::STATE_REJOINUNDEF))
1090 ceph_assert(dir->get_inode()->dirfragtree.is_leaf(dir->get_frag()));
1091 dir->fetch(gather.new_sub());
1092
1093 if (!(++num_opening_dirfrags % 1000))
1094 mds->heartbeat_reset();
1095 }
1096
1097 auto finish_func = [this](int r) {
1098 prefetch_state = FILE_INODES;
1099 _prefetch_inodes();
1100 };
1101 if (gather.has_subs()) {
1102 gather.set_finisher(
1103 new MDSInternalContextWrapper(mds,
9f95a23c 1104 new LambdaContext(std::move(finish_func))));
11fdf7f2
TL
1105 gather.activate();
1106 } else {
1107 finish_func(0);
1108 }
1109}
1110
1111void OpenFileTable::_prefetch_inodes()
1112{
1113 dout(10) << __func__ << " state " << prefetch_state << dendl;
1114 ceph_assert(!num_opening_inodes);
1115 num_opening_inodes = 1;
1116
1117 int64_t pool;
1118 if (prefetch_state == DIR_INODES)
b3b6e05e 1119 pool = mds->get_metadata_pool();
11fdf7f2
TL
1120 else if (prefetch_state == FILE_INODES)
1121 pool = mds->mdsmap->get_first_data_pool();
1122 else
1123 ceph_abort();
1124
1125 MDCache *mdcache = mds->mdcache;
1126
1127 if (destroyed_inos_set.empty()) {
1128 for (auto& it : logseg_destroyed_inos)
1129 destroyed_inos_set.insert(it.second.begin(), it.second.end());
1130 }
1131
f67539c2
TL
1132 for (auto& [ino, anchor] : loaded_anchor_map) {
1133 if (destroyed_inos_set.count(ino))
11fdf7f2 1134 continue;
f67539c2 1135 if (anchor.d_type == DT_DIR) {
11fdf7f2
TL
1136 if (prefetch_state != DIR_INODES)
1137 continue;
f67539c2
TL
1138 if (MDS_INO_IS_MDSDIR(ino)) {
1139 anchor.auth = MDS_INO_MDSDIR_OWNER(ino);
11fdf7f2
TL
1140 continue;
1141 }
f67539c2
TL
1142 if (MDS_INO_IS_STRAY(ino)) {
1143 anchor.auth = MDS_INO_STRAY_OWNER(ino);
11fdf7f2
TL
1144 continue;
1145 }
1146 } else {
1147 if (prefetch_state != FILE_INODES)
1148 continue;
1149 // load all file inodes for MDCache::identify_files_to_recover()
1150 }
f67539c2 1151 CInode *in = mdcache->get_inode(ino);
11fdf7f2
TL
1152 if (in)
1153 continue;
1154
1155 num_opening_inodes++;
f91f0fd5 1156
f67539c2
TL
1157 auto fin = new C_OFT_OpenInoFinish(this, ino);
1158 if (anchor.dirino != inodeno_t(0)) {
f91f0fd5
TL
1159 vector<inode_backpointer_t> ancestors;
1160 mds_rank_t auth_hint = MDS_RANK_NONE;
f67539c2
TL
1161 _get_ancestors(anchor, ancestors, auth_hint);
1162 mdcache->open_ino(ino, pool, fin, false, false, &ancestors, auth_hint);
f91f0fd5 1163 } else {
f67539c2 1164 mdcache->open_ino(ino, pool, fin, false);
f91f0fd5 1165 }
11fdf7f2
TL
1166
1167 if (!(num_opening_inodes % 1000))
1168 mds->heartbeat_reset();
1169 }
1170
1171 _open_ino_finish(inodeno_t(0), 0);
1172}
1173
1174bool OpenFileTable::prefetch_inodes()
1175{
1176 dout(10) << __func__ << dendl;
1177 ceph_assert(!prefetch_state);
1178 prefetch_state = DIR_INODES;
1179
1180 if (!load_done) {
1181 wait_for_load(
1182 new MDSInternalContextWrapper(mds,
9f95a23c 1183 new LambdaContext([this](int r) {
11fdf7f2
TL
1184 _prefetch_inodes();
1185 })
1186 )
1187 );
1188 return true;
1189 }
1190
1191 _prefetch_inodes();
1192 return !is_prefetched();
1193}
1194
1195bool OpenFileTable::should_log_open(CInode *in)
1196{
1197 if (in->state_test(CInode::STATE_TRACKEDBYOFT)) {
1198 // inode just journaled
1199 if (in->last_journaled >= committing_log_seq)
1200 return false;
1201 // item not dirty. it means the item has already been saved
1202 auto p = dirty_items.find(in->ino());
1203 if (p == dirty_items.end())
1204 return false;
1205 }
1206 return true;
1207}
1208
1209void OpenFileTable::note_destroyed_inos(uint64_t seq, const vector<inodeno_t>& inos)
1210{
1211 auto& vec = logseg_destroyed_inos[seq];
1212 vec.insert(vec.end(), inos.begin(), inos.end());
1213}
1214
1215void OpenFileTable::trim_destroyed_inos(uint64_t seq)
1216{
1217 auto p = logseg_destroyed_inos.begin();
1218 while (p != logseg_destroyed_inos.end()) {
1219 if (p->first >= seq)
1220 break;
1221 logseg_destroyed_inos.erase(p++);
1222 }
1223}