]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/OpenFileTable.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / mds / OpenFileTable.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2018 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "acconfig.h"
16#include "mds/CInode.h"
17#include "mds/CDir.h"
18#include "mds/MDSRank.h"
19#include "mds/MDCache.h"
20#include "osdc/Objecter.h"
21#include "OpenFileTable.h"
22
23#include "common/config.h"
24#include "common/errno.h"
25
9f95a23c
TL
// Perf counter indices for the "oft" counter set. l_oft_first and l_oft_last
// are the bounds handed to PerfCountersBuilder; the real counters lie strictly
// between them.
enum {
  l_oft_first = 1000000,
  l_oft_omap_total_objs = l_oft_first + 1,     // set to omap_num_objs by commit()
  l_oft_omap_total_kv_pairs = l_oft_first + 2, // set to the stored anchor count
  l_oft_omap_total_updates = l_oft_first + 3,  // incremented by commit()
  l_oft_omap_total_removes = l_oft_first + 4,  // incremented by commit()
  l_oft_last = l_oft_first + 5,
};
34
11fdf7f2
TL
35#define dout_context g_ceph_context
36#define dout_subsys ceph_subsys_mds
37#undef dout_prefix
38#define dout_prefix _prefix(_dout, mds)
39static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
40 return *_dout << "mds." << mds->get_nodeid() << ".openfiles ";
41}
42
9f95a23c
TL
43OpenFileTable::OpenFileTable(MDSRank *m) : mds(m) {
44 PerfCountersBuilder b(mds->cct, "oft", l_oft_first, l_oft_last);
45
46 b.add_u64(l_oft_omap_total_objs, "omap_total_objs");
47 b.add_u64(l_oft_omap_total_kv_pairs, "omap_total_kv_pairs");
48 b.add_u64(l_oft_omap_total_updates, "omap_total_updates");
49 b.add_u64(l_oft_omap_total_removes, "omap_total_removes");
50 logger.reset(b.create_perf_counters());
51 mds->cct->get_perfcounters_collection()->add(logger.get());
52 logger->set(l_oft_omap_total_objs, 0);
53 logger->set(l_oft_omap_total_kv_pairs, 0);
54 logger->set(l_oft_omap_total_updates, 0);
55 logger->set(l_oft_omap_total_removes, 0);
56}
57
58OpenFileTable::~OpenFileTable() {
59 if (logger) {
60 mds->cct->get_perfcounters_collection()->remove(logger.get());
61 }
62}
63
f91f0fd5 64void OpenFileTable::get_ref(CInode *in, frag_t fg)
11fdf7f2
TL
65{
66 do {
67 auto p = anchor_map.find(in->ino());
f91f0fd5
TL
68 if (!in->is_dir()) {
69 ceph_assert(fg == -1U);
70 ceph_assert(p == anchor_map.end());
71 }
72
11fdf7f2
TL
73 if (p != anchor_map.end()) {
74 ceph_assert(in->state_test(CInode::STATE_TRACKEDBYOFT));
75 ceph_assert(p->second.nref > 0);
76 p->second.nref++;
f91f0fd5
TL
77
78 if (fg != -1U) {
79 auto ret = p->second.frags.insert(fg);
80 ceph_assert(ret.second);
81 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
82 }
11fdf7f2
TL
83 break;
84 }
85
86 CDentry *dn = in->get_parent_dn();
87 CInode *pin = dn ? dn->get_dir()->get_inode() : nullptr;
88
89 auto ret = anchor_map.emplace(std::piecewise_construct, std::forward_as_tuple(in->ino()),
90 std::forward_as_tuple(in->ino(), (pin ? pin->ino() : inodeno_t(0)),
91 (dn ? dn->get_name() : string()), in->d_type(), 1));
92 ceph_assert(ret.second == true);
93 in->state_set(CInode::STATE_TRACKEDBYOFT);
94
f91f0fd5
TL
95 if (fg != -1U)
96 ret.first->second.frags.insert(fg);
97
11fdf7f2
TL
98 auto ret1 = dirty_items.emplace(in->ino(), (int)DIRTY_NEW);
99 if (!ret1.second) {
100 int omap_idx = ret1.first->second;
101 ceph_assert(omap_idx >= 0);
102 ret.first->second.omap_idx = omap_idx;
103 }
104
105 in = pin;
f91f0fd5 106 fg = -1U;
11fdf7f2
TL
107 } while (in);
108}
109
f91f0fd5 110void OpenFileTable::put_ref(CInode *in, frag_t fg)
11fdf7f2
TL
111{
112 do {
113 ceph_assert(in->state_test(CInode::STATE_TRACKEDBYOFT));
114 auto p = anchor_map.find(in->ino());
115 ceph_assert(p != anchor_map.end());
116 ceph_assert(p->second.nref > 0);
117
f91f0fd5
TL
118 if (!in->is_dir()) {
119 ceph_assert(fg == -1U);
120 ceph_assert(p->second.nref == 1);
121 }
122
11fdf7f2
TL
123 if (p->second.nref > 1) {
124 p->second.nref--;
f91f0fd5
TL
125 if (fg != -1U) {
126 auto ret = p->second.frags.erase(fg);
127 ceph_assert(ret);
128 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
129 }
11fdf7f2
TL
130 break;
131 }
132
133 CDentry *dn = in->get_parent_dn();
134 CInode *pin = dn ? dn->get_dir()->get_inode() : nullptr;
135 if (dn) {
136 ceph_assert(p->second.dirino == pin->ino());
137 ceph_assert(p->second.d_name == dn->get_name());
138 } else {
139 ceph_assert(p->second.dirino == inodeno_t(0));
140 ceph_assert(p->second.d_name == "");
141 }
142
f91f0fd5
TL
143 if (fg != -1U) {
144 ceph_assert(p->second.frags.size() == 1);
145 ceph_assert(*p->second.frags.begin() == fg);
146 }
147
11fdf7f2
TL
148 int omap_idx = p->second.omap_idx;
149 anchor_map.erase(p);
150 in->state_clear(CInode::STATE_TRACKEDBYOFT);
151
152 auto ret = dirty_items.emplace(in->ino(), omap_idx);
153 if (!ret.second) {
154 if (ret.first->second == DIRTY_NEW) {
155 ceph_assert(omap_idx < 0);
156 dirty_items.erase(ret.first);
157 } else {
158 ceph_assert(omap_idx >= 0);
159 ret.first->second = omap_idx;
160 }
161 }
162
163 in = pin;
f91f0fd5 164 fg = -1U;
11fdf7f2
TL
165 } while (in);
166}
167
168void OpenFileTable::add_inode(CInode *in)
169{
170 dout(10) << __func__ << " " << *in << dendl;
11fdf7f2
TL
171 get_ref(in);
172}
173
174void OpenFileTable::remove_inode(CInode *in)
175{
176 dout(10) << __func__ << " " << *in << dendl;
11fdf7f2
TL
177 put_ref(in);
178}
179
180void OpenFileTable::add_dirfrag(CDir *dir)
181{
182 dout(10) << __func__ << " " << *dir << dendl;
183 ceph_assert(!dir->state_test(CDir::STATE_TRACKEDBYOFT));
184 dir->state_set(CDir::STATE_TRACKEDBYOFT);
f91f0fd5 185 get_ref(dir->get_inode(), dir->get_frag());
11fdf7f2
TL
186}
187
188void OpenFileTable::remove_dirfrag(CDir *dir)
189{
190 dout(10) << __func__ << " " << *dir << dendl;
191 ceph_assert(dir->state_test(CDir::STATE_TRACKEDBYOFT));
192 dir->state_clear(CDir::STATE_TRACKEDBYOFT);
f91f0fd5 193 put_ref(dir->get_inode(), dir->get_frag());
11fdf7f2
TL
194}
195
196void OpenFileTable::notify_link(CInode *in)
197{
198 dout(10) << __func__ << " " << *in << dendl;
199 auto p = anchor_map.find(in->ino());
200 ceph_assert(p != anchor_map.end());
201 ceph_assert(p->second.nref > 0);
202 ceph_assert(p->second.dirino == inodeno_t(0));
203 ceph_assert(p->second.d_name == "");
204
205 CDentry *dn = in->get_parent_dn();
206 CInode *pin = dn->get_dir()->get_inode();
207
208 p->second.dirino = pin->ino();
209 p->second.d_name = dn->get_name();
210 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
211
212 get_ref(pin);
213}
214
215void OpenFileTable::notify_unlink(CInode *in)
216{
217 dout(10) << __func__ << " " << *in << dendl;
218 auto p = anchor_map.find(in->ino());
219 ceph_assert(p != anchor_map.end());
220 ceph_assert(p->second.nref > 0);
221
222 CDentry *dn = in->get_parent_dn();
223 CInode *pin = dn->get_dir()->get_inode();
224 ceph_assert(p->second.dirino == pin->ino());
225 ceph_assert(p->second.d_name == dn->get_name());
226
227 p->second.dirino = inodeno_t(0);
228 p->second.d_name = "";
229 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
230
231 put_ref(pin);
232}
233
234object_t OpenFileTable::get_object_name(unsigned idx) const
235{
236 char s[30];
237 snprintf(s, sizeof(s), "mds%d_openfiles.%x", int(mds->get_nodeid()), idx);
238 return object_t(s);
239}
240
241void OpenFileTable::_encode_header(bufferlist &bl, int j_state)
242{
243 std::string_view magic = CEPH_FS_ONDISK_MAGIC;
244 encode(magic, bl);
245 ENCODE_START(1, 1, bl);
246 encode(omap_version, bl);
247 encode(omap_num_objs, bl);
248 encode((__u8)j_state, bl);
249 ENCODE_FINISH(bl);
250}
251
252class C_IO_OFT_Save : public MDSIOContextBase {
253protected:
254 OpenFileTable *oft;
255 uint64_t log_seq;
256 MDSContext *fin;
257 MDSRank *get_mds() override { return oft->mds; }
258public:
259 C_IO_OFT_Save(OpenFileTable *t, uint64_t s, MDSContext *c) :
260 oft(t), log_seq(s), fin(c) {}
261 void finish(int r) {
262 oft->_commit_finish(r, log_seq, fin);
263 }
264 void print(ostream& out) const override {
265 out << "openfiles_save";
266 }
267};
268
269void OpenFileTable::_commit_finish(int r, uint64_t log_seq, MDSContext *fin)
270{
271 dout(10) << __func__ << " log_seq " << log_seq << dendl;
272 if (r < 0) {
273 mds->handle_write_error(r);
274 return;
275 }
276
277 ceph_assert(log_seq <= committing_log_seq);
278 ceph_assert(log_seq >= committed_log_seq);
279 committed_log_seq = log_seq;
280 num_pending_commit--;
281
282 if (fin)
283 fin->complete(r);
284}
285
286class C_IO_OFT_Journal : public MDSIOContextBase {
287protected:
288 OpenFileTable *oft;
289 uint64_t log_seq;
290 MDSContext *fin;
291 std::map<unsigned, std::vector<ObjectOperation> > ops_map;
292 MDSRank *get_mds() override { return oft->mds; }
293public:
294 C_IO_OFT_Journal(OpenFileTable *t, uint64_t s, MDSContext *c,
295 std::map<unsigned, std::vector<ObjectOperation> >& ops) :
296 oft(t), log_seq(s), fin(c) {
297 ops_map.swap(ops);
298 }
299 void finish(int r) {
300 oft->_journal_finish(r, log_seq, fin, ops_map);
301 }
302 void print(ostream& out) const override {
303 out << "openfiles_journal";
304 }
305};
306
307void OpenFileTable::_journal_finish(int r, uint64_t log_seq, MDSContext *c,
308 std::map<unsigned, std::vector<ObjectOperation> >& ops_map)
309{
310 dout(10) << __func__ << " log_seq " << log_seq << dendl;
311 if (r < 0) {
312 mds->handle_write_error(r);
313 return;
314 }
315
316 C_GatherBuilder gather(g_ceph_context,
317 new C_OnFinisher(new C_IO_OFT_Save(this, log_seq, c),
318 mds->finisher));
319 SnapContext snapc;
b3b6e05e 320 object_locator_t oloc(mds->get_metadata_pool());
f67539c2
TL
321 for (auto& [idx, vops] : ops_map) {
322 object_t oid = get_object_name(idx);
323 for (auto& op : vops) {
11fdf7f2
TL
324 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
325 0, gather.new_sub());
326 }
327 }
328 gather.activate();
329
330 journal_state = JOURNAL_NONE;
331 return;
332}
333
334void OpenFileTable::commit(MDSContext *c, uint64_t log_seq, int op_prio)
335{
336 dout(10) << __func__ << " log_seq " << log_seq << dendl;
337
338 ceph_assert(num_pending_commit == 0);
339 num_pending_commit++;
340 ceph_assert(log_seq >= committing_log_seq);
341 committing_log_seq = log_seq;
342
343 omap_version++;
344
345 C_GatherBuilder gather(g_ceph_context);
346
347 SnapContext snapc;
b3b6e05e 348 object_locator_t oloc(mds->get_metadata_pool());
11fdf7f2
TL
349
350 const unsigned max_write_size = mds->mdcache->max_dir_commit_size;
351
352 struct omap_update_ctl {
353 unsigned write_size = 0;
354 unsigned journal_idx = 0;
355 bool clear = false;
356 std::map<string, bufferlist> to_update, journaled_update;
357 std::set<string> to_remove, journaled_remove;
358 };
359 std::vector<omap_update_ctl> omap_updates(omap_num_objs);
360
361 using ceph::encode;
362 auto journal_func = [&](unsigned idx) {
363 auto& ctl = omap_updates.at(idx);
364
365 ObjectOperation op;
366 op.priority = op_prio;
367
368 if (ctl.clear) {
369 ctl.clear = false;
370 op.omap_clear();
371 op.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
372 }
373
374 if (ctl.journal_idx == 0) {
375 if (journal_state == JOURNAL_NONE)
376 journal_state = JOURNAL_START;
377 else
378 ceph_assert(journal_state == JOURNAL_START);
379
380 bufferlist header;
381 _encode_header(header, journal_state);
382 op.omap_set_header(header);
383 }
384
385 bufferlist bl;
386 encode(omap_version, bl);
387 encode(ctl.to_update, bl);
388 encode(ctl.to_remove, bl);
389
390 char key[32];
391 snprintf(key, sizeof(key), "_journal.%x", ctl.journal_idx++);
392 std::map<string, bufferlist> tmp_map;
393 tmp_map[key].swap(bl);
394 op.omap_set(tmp_map);
395
396 object_t oid = get_object_name(idx);
397 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(), 0,
398 gather.new_sub());
399
400#ifdef HAVE_STDLIB_MAP_SPLICING
401 ctl.journaled_update.merge(ctl.to_update);
402 ctl.journaled_remove.merge(ctl.to_remove);
403#else
404 ctl.journaled_update.insert(make_move_iterator(begin(ctl.to_update)),
405 make_move_iterator(end(ctl.to_update)));
406 ctl.journaled_remove.insert(make_move_iterator(begin(ctl.to_remove)),
407 make_move_iterator(end(ctl.to_remove)));
408#endif
409 ctl.to_update.clear();
410 ctl.to_remove.clear();
411 };
412
413 std::map<unsigned, std::vector<ObjectOperation> > ops_map;
414
415 auto create_op_func = [&](unsigned idx, bool update_header) {
416 auto& ctl = omap_updates.at(idx);
417
418 auto& op_vec = ops_map[idx];
419 op_vec.resize(op_vec.size() + 1);
420 ObjectOperation& op = op_vec.back();
421 op.priority = op_prio;
422
423 if (ctl.clear) {
424 ctl.clear = false;
425 op.omap_clear();
426 op.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
427 }
428
429 if (update_header) {
430 bufferlist header;
431 _encode_header(header, journal_state);
432 op.omap_set_header(header);
433 }
434
435 if (!ctl.to_update.empty()) {
436 op.omap_set(ctl.to_update);
437 ctl.to_update.clear();
438 }
439 if (!ctl.to_remove.empty()) {
440 op.omap_rm_keys(ctl.to_remove);
441 ctl.to_remove.clear();
442 }
443 };
444
445 auto submit_ops_func = [&]() {
446 gather.set_finisher(new C_OnFinisher(new C_IO_OFT_Save(this, log_seq, c),
447 mds->finisher));
f67539c2
TL
448 for (auto& [idx, vops] : ops_map) {
449 object_t oid = get_object_name(idx);
450 for (auto& op : vops) {
11fdf7f2
TL
451 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
452 0, gather.new_sub());
453 }
454 }
455 gather.activate();
456 };
457
458 bool first_commit = !loaded_anchor_map.empty();
459
460 unsigned first_free_idx = 0;
461 unsigned old_num_objs = omap_num_objs;
462 if (omap_num_objs == 0) {
463 omap_num_objs = 1;
464 omap_num_items.resize(omap_num_objs);
465 omap_updates.resize(omap_num_objs);
466 omap_updates.back().clear = true;
467 }
468
f67539c2
TL
469 for (auto& [ino, state] : dirty_items) {
470 auto p = anchor_map.find(ino);
11fdf7f2
TL
471
472 if (first_commit) {
f67539c2 473 auto q = loaded_anchor_map.find(ino);
11fdf7f2
TL
474 if (q != loaded_anchor_map.end()) {
475 ceph_assert(p != anchor_map.end());
476 p->second.omap_idx = q->second.omap_idx;
f91f0fd5 477 bool same = (p->second == q->second);
11fdf7f2
TL
478 loaded_anchor_map.erase(q);
479 if (same)
480 continue;
481 }
482 }
483
484 char key[32];
f67539c2 485 int len = snprintf(key, sizeof(key), "%llx", (unsigned long long)ino.val);
11fdf7f2
TL
486
487 int omap_idx;
488 if (p != anchor_map.end()) {
489 omap_idx = p->second.omap_idx;
490 if (omap_idx < 0) {
f67539c2 491 ceph_assert(state == DIRTY_NEW);
11fdf7f2
TL
492 // find omap object to store the key
493 for (unsigned i = first_free_idx; i < omap_num_objs; i++) {
92f5a8d4 494 if (omap_num_items[i] < MAX_ITEMS_PER_OBJ) {
11fdf7f2 495 omap_idx = i;
92f5a8d4
TL
496 break;
497 }
11fdf7f2
TL
498 }
499 if (omap_idx < 0) {
500 ++omap_num_objs;
501 ceph_assert(omap_num_objs <= MAX_OBJECTS);
502 omap_num_items.resize(omap_num_objs);
503 omap_updates.resize(omap_num_objs);
504 omap_updates.back().clear = true;
505 omap_idx = omap_num_objs - 1;
506 }
507 first_free_idx = omap_idx;
508
509 p->second.omap_idx = omap_idx;
510 ++omap_num_items[omap_idx];
511 }
512 } else {
f67539c2 513 omap_idx = state;
11fdf7f2
TL
514 unsigned& count = omap_num_items.at(omap_idx);
515 ceph_assert(count > 0);
516 --count;
517 if ((unsigned)omap_idx < first_free_idx && count < MAX_ITEMS_PER_OBJ)
518 first_free_idx = omap_idx;
519 }
520 auto& ctl = omap_updates.at(omap_idx);
f67539c2
TL
521 if (ctl.write_size >= max_write_size) {
522 journal_func(omap_idx);
523 ctl.write_size = 0;
524 }
11fdf7f2
TL
525 if (p != anchor_map.end()) {
526 bufferlist bl;
527 encode(p->second, bl);
f91f0fd5 528 encode((__u32)0, bl); // frags set was encoded here
11fdf7f2
TL
529
530 ctl.write_size += bl.length() + len + 2 * sizeof(__u32);
531 ctl.to_update[key].swap(bl);
532 } else {
533 ctl.write_size += len + sizeof(__u32);
534 ctl.to_remove.emplace(key);
535 }
11fdf7f2
TL
536 }
537
538 dirty_items.clear();
539
540 if (first_commit) {
f67539c2 541 for (auto& [ino, anchor] : loaded_anchor_map) {
11fdf7f2 542 char key[32];
f67539c2 543 int len = snprintf(key, sizeof(key), "%llx", (unsigned long long)ino.val);
11fdf7f2 544
f67539c2 545 int omap_idx = anchor.omap_idx;
11fdf7f2
TL
546 unsigned& count = omap_num_items.at(omap_idx);
547 ceph_assert(count > 0);
548 --count;
549
550 auto& ctl = omap_updates.at(omap_idx);
11fdf7f2 551 if (ctl.write_size >= max_write_size) {
f67539c2
TL
552 journal_func(omap_idx);
553 ctl.write_size = 0;
11fdf7f2 554 }
f67539c2
TL
555 ctl.write_size += len + sizeof(__u32);
556 ctl.to_remove.emplace(key);
11fdf7f2
TL
557 }
558 loaded_anchor_map.clear();
11fdf7f2
TL
559 }
560
9f95a23c 561 size_t total_items = 0;
11fdf7f2 562 {
11fdf7f2 563 unsigned used_objs = 1;
9f95a23c 564 std::vector<unsigned> objs_to_write;
11fdf7f2
TL
565 bool journaled = false;
566 for (unsigned i = 0; i < omap_num_objs; i++) {
567 total_items += omap_num_items[i];
568 if (omap_updates[i].journal_idx)
569 journaled = true;
570 else if (omap_updates[i].write_size)
571 objs_to_write.push_back(i);
572
573 if (omap_num_items[i] > 0)
574 used_objs = i + 1;
575 }
576 ceph_assert(total_items == anchor_map.size());
577 // adjust omap object count
578 if (used_objs < omap_num_objs) {
579 omap_num_objs = used_objs;
580 omap_num_items.resize(omap_num_objs);
581 }
582 // skip journal if only one osd request is required and object count
583 // does not change.
584 if (!journaled && old_num_objs == omap_num_objs &&
585 objs_to_write.size() <= 1) {
586 ceph_assert(journal_state == JOURNAL_NONE);
587 ceph_assert(!gather.has_subs());
588
589 unsigned omap_idx = objs_to_write.empty() ? 0 : objs_to_write.front();
590 create_op_func(omap_idx, true);
591 submit_ops_func();
592 return;
593 }
594 }
595
596 for (unsigned omap_idx = 0; omap_idx < omap_updates.size(); omap_idx++) {
597 auto& ctl = omap_updates[omap_idx];
598 if (ctl.write_size > 0) {
599 journal_func(omap_idx);
600 ctl.write_size = 0;
601 }
602 }
603
604 if (journal_state == JOURNAL_START) {
605 ceph_assert(gather.has_subs());
606 journal_state = JOURNAL_FINISH;
607 } else {
608 // only object count changes
609 ceph_assert(journal_state == JOURNAL_NONE);
610 ceph_assert(!gather.has_subs());
611 }
612
9f95a23c
TL
613 uint64_t total_updates = 0;
614 uint64_t total_removes = 0;
615
11fdf7f2
TL
616 for (unsigned omap_idx = 0; omap_idx < omap_updates.size(); omap_idx++) {
617 auto& ctl = omap_updates[omap_idx];
618 ceph_assert(ctl.to_update.empty() && ctl.to_remove.empty());
619 if (ctl.journal_idx == 0)
620 ceph_assert(ctl.journaled_update.empty() && ctl.journaled_remove.empty());
621
622 bool first = true;
623 for (auto& it : ctl.journaled_update) {
11fdf7f2 624 if (ctl.write_size >= max_write_size) {
f67539c2
TL
625 create_op_func(omap_idx, first);
626 ctl.write_size = 0;
627 first = false;
11fdf7f2 628 }
f67539c2
TL
629 ctl.write_size += it.first.length() + it.second.length() + 2 * sizeof(__u32);
630 ctl.to_update[it.first].swap(it.second);
9f95a23c 631 total_updates++;
11fdf7f2
TL
632 }
633
634 for (auto& key : ctl.journaled_remove) {
11fdf7f2 635 if (ctl.write_size >= max_write_size) {
f67539c2
TL
636 create_op_func(omap_idx, first);
637 ctl.write_size = 0;
638 first = false;
11fdf7f2 639 }
f67539c2
TL
640
641 ctl.write_size += key.length() + sizeof(__u32);
642 ctl.to_remove.emplace(key);
9f95a23c 643 total_removes++;
11fdf7f2
TL
644 }
645
646 for (unsigned i = 0; i < ctl.journal_idx; ++i) {
647 char key[32];
648 snprintf(key, sizeof(key), "_journal.%x", i);
649 ctl.to_remove.emplace(key);
650 }
651
652 // update first object's omap header if object count changes
653 if (ctl.clear ||
654 ctl.journal_idx > 0 ||
655 (omap_idx == 0 && old_num_objs != omap_num_objs))
656 create_op_func(omap_idx, first);
657 }
658
659 ceph_assert(!ops_map.empty());
660 if (journal_state == JOURNAL_FINISH) {
661 gather.set_finisher(new C_OnFinisher(new C_IO_OFT_Journal(this, log_seq, c, ops_map),
662 mds->finisher));
663 gather.activate();
664 } else {
665 submit_ops_func();
666 }
9f95a23c
TL
667 logger->set(l_oft_omap_total_objs, omap_num_objs);
668 logger->set(l_oft_omap_total_kv_pairs, total_items);
669 logger->inc(l_oft_omap_total_updates, total_updates);
670 logger->inc(l_oft_omap_total_removes, total_removes);
11fdf7f2
TL
671}
672
673class C_IO_OFT_Load : public MDSIOContextBase {
674protected:
675 OpenFileTable *oft;
676 MDSRank *get_mds() override { return oft->mds; }
677
678public:
679 int header_r = 0; //< Return value from OMAP header read
680 int values_r = 0; //< Return value from OMAP value read
681 bufferlist header_bl;
682 std::map<std::string, bufferlist> values;
683 unsigned index;
684 bool first;
685 bool more = false;
686
687 C_IO_OFT_Load(OpenFileTable *t, unsigned i, bool f) :
688 oft(t), index(i), first(f) {}
689 void finish(int r) override {
690 oft->_load_finish(r, header_r, values_r, index, first, more, header_bl, values);
691 }
692 void print(ostream& out) const override {
693 out << "openfiles_load";
694 }
695};
696
697class C_IO_OFT_Recover : public MDSIOContextBase {
698protected:
699 OpenFileTable *oft;
700 MDSRank *get_mds() override { return oft->mds; }
701public:
702 C_IO_OFT_Recover(OpenFileTable *t) : oft(t) {}
703 void finish(int r) override {
704 oft->_recover_finish(r);
705 }
706 void print(ostream& out) const override {
707 out << "openfiles_recover";
708 }
709};
710
711void OpenFileTable::_recover_finish(int r)
712{
713 if (r < 0) {
714 derr << __func__ << " got " << cpp_strerror(r) << dendl;
715 _reset_states();
716 } else {
717 dout(10) << __func__ << ": load complete" << dendl;
718 }
719
720 journal_state = JOURNAL_NONE;
721 load_done = true;
722 finish_contexts(g_ceph_context, waiting_for_load);
723 waiting_for_load.clear();
724}
725
f67539c2
TL
726void OpenFileTable::_read_omap_values(const std::string& key, unsigned idx,
727 bool first)
728{
729 object_t oid = get_object_name(idx);
730 dout(10) << __func__ << ": load from '" << oid << ":" << key << "'" << dendl;
b3b6e05e 731 object_locator_t oloc(mds->get_metadata_pool());
f67539c2
TL
732 C_IO_OFT_Load *c = new C_IO_OFT_Load(this, idx, first);
733 ObjectOperation op;
734 if (first)
735 op.omap_get_header(&c->header_bl, &c->header_r);
736 op.omap_get_vals(key, "", uint64_t(-1),
737 &c->values, &c->more, &c->values_r);
738 mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, nullptr, 0,
739 new C_OnFinisher(c, mds->finisher));
740}
741
11fdf7f2
TL
742void OpenFileTable::_load_finish(int op_r, int header_r, int values_r,
743 unsigned idx, bool first, bool more,
744 bufferlist &header_bl,
745 std::map<std::string, bufferlist> &values)
746{
747 using ceph::decode;
f67539c2 748 int err = -CEPHFS_EINVAL;
11fdf7f2
TL
749
750 auto decode_func = [this](unsigned idx, inodeno_t ino, bufferlist &bl) {
751 auto p = bl.cbegin();
752
753 size_t count = loaded_anchor_map.size();
754 auto it = loaded_anchor_map.emplace_hint(loaded_anchor_map.end(),
755 std::piecewise_construct,
756 std::make_tuple(ino),
757 std::make_tuple());
758 RecoveredAnchor& anchor = it->second;
759 decode(anchor, p);
f91f0fd5
TL
760 frag_vec_t frags; // unused
761 decode(frags, p);
11fdf7f2
TL
762 ceph_assert(ino == anchor.ino);
763 anchor.omap_idx = idx;
764 anchor.auth = MDS_RANK_NONE;
765
11fdf7f2
TL
766
767 if (loaded_anchor_map.size() > count)
768 ++omap_num_items[idx];
769 };
770
771 if (op_r < 0) {
772 derr << __func__ << " got " << cpp_strerror(op_r) << dendl;
773 err = op_r;
774 goto out;
775 }
776
777 try {
778 if (first) {
779 auto p = header_bl.cbegin();
780
781 string magic;
782 version_t version;
783 unsigned num_objs;
784 __u8 jstate;
785
786 if (header_bl.length() == 13) {
787 // obsolete format.
788 decode(version, p);
789 decode(num_objs, p);
790 decode(jstate, p);
791 } else {
792 decode(magic, p);
793 if (magic != CEPH_FS_ONDISK_MAGIC) {
f67539c2
TL
794 CachedStackStringStream css;
795 *css << "invalid magic '" << magic << "'";
796 throw buffer::malformed_input(css->str());
11fdf7f2
TL
797 }
798
799 DECODE_START(1, p);
800 decode(version, p);
801 decode(num_objs, p);
802 decode(jstate, p);
803 DECODE_FINISH(p);
804 }
805
806 if (num_objs > MAX_OBJECTS) {
f67539c2
TL
807 CachedStackStringStream css;
808 *css << "invalid object count '" << num_objs << "'";
809 throw buffer::malformed_input(css->str());
11fdf7f2
TL
810 }
811 if (jstate > JOURNAL_FINISH) {
f67539c2
TL
812 CachedStackStringStream css;
813 *css << "invalid journal state '" << jstate << "'";
814 throw buffer::malformed_input(css->str());
11fdf7f2
TL
815 }
816
817 if (version > omap_version) {
818 omap_version = version;
819 omap_num_objs = num_objs;
820 omap_num_items.resize(omap_num_objs);
821 journal_state = jstate;
822 } else if (version == omap_version) {
823 ceph_assert(omap_num_objs == num_objs);
824 if (jstate > journal_state)
825 journal_state = jstate;
826 }
827 }
828
829 for (auto& it : values) {
830 if (it.first.compare(0, 9, "_journal.") == 0) {
831 if (idx >= loaded_journals.size())
832 loaded_journals.resize(idx + 1);
833
834 if (journal_state == JOURNAL_FINISH) {
835 loaded_journals[idx][it.first].swap(it.second);
836 } else { // incomplete journal
837 loaded_journals[idx][it.first].length();
838 }
839 continue;
840 }
841
842 inodeno_t ino;
843 sscanf(it.first.c_str(), "%llx", (unsigned long long*)&ino.val);
844 decode_func(idx, ino, it.second);
845 }
846 } catch (buffer::error &e) {
847 derr << __func__ << ": corrupted header/values: " << e.what() << dendl;
848 goto out;
849 }
850
851 if (more || idx + 1 < omap_num_objs) {
852 // Issue another read if we're not at the end of the omap
853 std::string last_key;
854 if (more)
855 last_key = values.rbegin()->first;
856 else
857 idx++;
f67539c2
TL
858
859 _read_omap_values(last_key, idx, !more);
11fdf7f2
TL
860 return;
861 }
862
863 // replay journal
864 if (loaded_journals.size() > 0) {
865 dout(10) << __func__ << ": recover journal" << dendl;
866
867 C_GatherBuilder gather(g_ceph_context,
868 new C_OnFinisher(new C_IO_OFT_Recover(this),
869 mds->finisher));
b3b6e05e 870 object_locator_t oloc(mds->get_metadata_pool());
11fdf7f2
TL
871 SnapContext snapc;
872
873 for (unsigned omap_idx = 0; omap_idx < loaded_journals.size(); omap_idx++) {
874 auto& loaded_journal = loaded_journals[omap_idx];
875
876 std::vector<ObjectOperation> op_vec;
877 try {
878 for (auto& it : loaded_journal) {
879 if (journal_state != JOURNAL_FINISH)
880 continue;
881 auto p = it.second.cbegin();
882 version_t version;
883 std::map<string, bufferlist> to_update;
884 std::set<string> to_remove;
885 decode(version, p);
886 if (version != omap_version)
887 continue;
888 decode(to_update, p);
889 decode(to_remove, p);
890 it.second.clear();
891
892 for (auto& q : to_update) {
893 inodeno_t ino;
894 sscanf(q.first.c_str(), "%llx", (unsigned long long*)&ino.val);
895 decode_func(omap_idx, ino, q.second);
896 }
897 for (auto& q : to_remove) {
898 inodeno_t ino;
899 sscanf(q.c_str(), "%llx",(unsigned long long*)&ino.val);
900 ceph_assert(ino.val > 0);
901 if (loaded_anchor_map.erase(ino)) {
902 unsigned& count = omap_num_items[omap_idx];
903 ceph_assert(count > 0);
904 --count;
905 }
11fdf7f2
TL
906 }
907
908 op_vec.resize(op_vec.size() + 1);
909 ObjectOperation& op = op_vec.back();
910 op.priority = CEPH_MSG_PRIO_HIGH;
911 if (!to_update.empty())
912 op.omap_set(to_update);
913 if (!to_remove.empty())
914 op.omap_rm_keys(to_remove);
915 }
916 } catch (buffer::error &e) {
917 derr << __func__ << ": corrupted journal: " << e.what() << dendl;
918 goto out;
919 }
920
921 op_vec.resize(op_vec.size() + 1);
922 ObjectOperation& op = op_vec.back();
923 {
924 bufferlist header;
925 if (journal_state == JOURNAL_FINISH)
926 _encode_header(header, JOURNAL_FINISH);
927 else
928 _encode_header(header, JOURNAL_NONE);
929 op.omap_set_header(header);
930 }
931 {
932 // remove journal
933 std::set<string> to_remove;
934 for (auto &it : loaded_journal)
935 to_remove.emplace(it.first);
936 op.omap_rm_keys(to_remove);
937 }
938 loaded_journal.clear();
939
940 object_t oid = get_object_name(omap_idx);
941 for (auto& op : op_vec) {
942 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
943 0, gather.new_sub());
944 }
945 }
946 gather.activate();
947 return;
948 }
949
950 journal_state = JOURNAL_NONE;
951 err = 0;
952 dout(10) << __func__ << ": load complete" << dendl;
953out:
954
955 if (err < 0)
956 _reset_states();
957
958 load_done = true;
959 finish_contexts(g_ceph_context, waiting_for_load);
960 waiting_for_load.clear();
961}
962
963void OpenFileTable::load(MDSContext *onload)
964{
965 dout(10) << __func__ << dendl;
966 ceph_assert(!load_done);
967 if (onload)
968 waiting_for_load.push_back(onload);
969
f67539c2 970 _read_omap_values("", 0, true);
11fdf7f2
TL
971}
972
f91f0fd5
TL
973void OpenFileTable::_get_ancestors(const Anchor& parent,
974 vector<inode_backpointer_t>& ancestors,
975 mds_rank_t& auth_hint)
11fdf7f2 976{
f91f0fd5
TL
977 inodeno_t dirino = parent.dirino;
978 std::string_view d_name = parent.d_name;
11fdf7f2
TL
979
980 bool first = true;
981 ancestors.clear();
982 while (true) {
f91f0fd5 983 ancestors.push_back(inode_backpointer_t(dirino, string{d_name}, 0));
11fdf7f2 984
f91f0fd5 985 auto p = loaded_anchor_map.find(dirino);
11fdf7f2
TL
986 if (p == loaded_anchor_map.end())
987 break;
988
989 if (first)
990 auth_hint = p->second.auth;
991
992 dirino = p->second.dirino;
f91f0fd5 993 d_name = p->second.d_name;
11fdf7f2
TL
994 if (dirino == inodeno_t(0))
995 break;
996
997 first = false;
998 }
11fdf7f2
TL
999}
1000
1001class C_OFT_OpenInoFinish: public MDSContext {
1002 OpenFileTable *oft;
1003 inodeno_t ino;
1004 MDSRank *get_mds() override { return oft->mds; }
1005public:
1006 C_OFT_OpenInoFinish(OpenFileTable *t, inodeno_t i) : oft(t), ino(i) {}
1007 void finish(int r) override {
1008 oft->_open_ino_finish(ino, r);
1009 }
1010};
1011
1012void OpenFileTable::_open_ino_finish(inodeno_t ino, int r)
1013{
1014 if (prefetch_state == DIR_INODES && r >= 0 && ino != inodeno_t(0)) {
1015 auto p = loaded_anchor_map.find(ino);
1016 ceph_assert(p != loaded_anchor_map.end());
1017 p->second.auth = mds_rank_t(r);
1018 }
1019
1020 if (r != mds->get_nodeid())
1021 mds->mdcache->rejoin_prefetch_ino_finish(ino, r);
1022
1023 num_opening_inodes--;
1024 if (num_opening_inodes == 0) {
1025 if (prefetch_state == DIR_INODES) {
f91f0fd5
TL
1026 if (g_conf().get_val<bool>("mds_oft_prefetch_dirfrags")) {
1027 prefetch_state = DIRFRAGS;
1028 _prefetch_dirfrags();
1029 } else {
1030 prefetch_state = FILE_INODES;
1031 _prefetch_inodes();
1032 }
11fdf7f2
TL
1033 } else if (prefetch_state == FILE_INODES) {
1034 prefetch_state = DONE;
1035 logseg_destroyed_inos.clear();
1036 destroyed_inos_set.clear();
1037 finish_contexts(g_ceph_context, waiting_for_prefetch);
1038 waiting_for_prefetch.clear();
1039 } else {
1040 ceph_abort();
1041 }
1042 }
1043}
1044
1045void OpenFileTable::_prefetch_dirfrags()
1046{
1047 dout(10) << __func__ << dendl;
1048 ceph_assert(prefetch_state == DIRFRAGS);
1049
1050 MDCache *mdcache = mds->mdcache;
f67539c2 1051 std::vector<CDir*> fetch_queue;
11fdf7f2 1052
f67539c2
TL
1053 for (auto& [ino, anchor] : loaded_anchor_map) {
1054 if (anchor.frags.empty())
f91f0fd5 1055 continue;
f67539c2 1056 CInode *diri = mdcache->get_inode(ino);
f91f0fd5
TL
1057 if (!diri)
1058 continue;
11fdf7f2
TL
1059 if (diri->state_test(CInode::STATE_REJOINUNDEF))
1060 continue;
1061
f67539c2 1062 for (auto& fg: anchor.frags) {
f91f0fd5
TL
1063 CDir *dir = diri->get_dirfrag(fg);
1064 if (dir) {
1065 if (dir->is_auth() && !dir->is_complete())
f67539c2 1066 fetch_queue.push_back(dir);
f91f0fd5
TL
1067 } else {
1068 frag_vec_t leaves;
1069 diri->dirfragtree.get_leaves_under(fg, leaves);
f91f0fd5
TL
1070 for (auto& leaf : leaves) {
1071 if (diri->is_auth()) {
1072 dir = diri->get_or_open_dirfrag(mdcache, leaf);
1073 } else {
1074 dir = diri->get_dirfrag(leaf);
1075 }
1076 if (dir && dir->is_auth() && !dir->is_complete())
f67539c2 1077 fetch_queue.push_back(dir);
11fdf7f2 1078 }
11fdf7f2
TL
1079 }
1080 }
1081 }
1082
1083 MDSGatherBuilder gather(g_ceph_context);
1084 int num_opening_dirfrags = 0;
9f95a23c 1085 for (const auto& dir : fetch_queue) {
11fdf7f2
TL
1086 if (dir->state_test(CDir::STATE_REJOINUNDEF))
1087 ceph_assert(dir->get_inode()->dirfragtree.is_leaf(dir->get_frag()));
1088 dir->fetch(gather.new_sub());
1089
1090 if (!(++num_opening_dirfrags % 1000))
1091 mds->heartbeat_reset();
1092 }
1093
1094 auto finish_func = [this](int r) {
1095 prefetch_state = FILE_INODES;
1096 _prefetch_inodes();
1097 };
1098 if (gather.has_subs()) {
1099 gather.set_finisher(
1100 new MDSInternalContextWrapper(mds,
9f95a23c 1101 new LambdaContext(std::move(finish_func))));
11fdf7f2
TL
1102 gather.activate();
1103 } else {
1104 finish_func(0);
1105 }
1106}
1107
1108void OpenFileTable::_prefetch_inodes()
1109{
1110 dout(10) << __func__ << " state " << prefetch_state << dendl;
1111 ceph_assert(!num_opening_inodes);
1112 num_opening_inodes = 1;
1113
1114 int64_t pool;
1115 if (prefetch_state == DIR_INODES)
b3b6e05e 1116 pool = mds->get_metadata_pool();
11fdf7f2
TL
1117 else if (prefetch_state == FILE_INODES)
1118 pool = mds->mdsmap->get_first_data_pool();
1119 else
1120 ceph_abort();
1121
1122 MDCache *mdcache = mds->mdcache;
1123
1124 if (destroyed_inos_set.empty()) {
1125 for (auto& it : logseg_destroyed_inos)
1126 destroyed_inos_set.insert(it.second.begin(), it.second.end());
1127 }
1128
f67539c2
TL
1129 for (auto& [ino, anchor] : loaded_anchor_map) {
1130 if (destroyed_inos_set.count(ino))
11fdf7f2 1131 continue;
f67539c2 1132 if (anchor.d_type == DT_DIR) {
11fdf7f2
TL
1133 if (prefetch_state != DIR_INODES)
1134 continue;
f67539c2
TL
1135 if (MDS_INO_IS_MDSDIR(ino)) {
1136 anchor.auth = MDS_INO_MDSDIR_OWNER(ino);
11fdf7f2
TL
1137 continue;
1138 }
f67539c2
TL
1139 if (MDS_INO_IS_STRAY(ino)) {
1140 anchor.auth = MDS_INO_STRAY_OWNER(ino);
11fdf7f2
TL
1141 continue;
1142 }
1143 } else {
1144 if (prefetch_state != FILE_INODES)
1145 continue;
1146 // load all file inodes for MDCache::identify_files_to_recover()
1147 }
f67539c2 1148 CInode *in = mdcache->get_inode(ino);
11fdf7f2
TL
1149 if (in)
1150 continue;
1151
1152 num_opening_inodes++;
f91f0fd5 1153
f67539c2
TL
1154 auto fin = new C_OFT_OpenInoFinish(this, ino);
1155 if (anchor.dirino != inodeno_t(0)) {
f91f0fd5
TL
1156 vector<inode_backpointer_t> ancestors;
1157 mds_rank_t auth_hint = MDS_RANK_NONE;
f67539c2
TL
1158 _get_ancestors(anchor, ancestors, auth_hint);
1159 mdcache->open_ino(ino, pool, fin, false, false, &ancestors, auth_hint);
f91f0fd5 1160 } else {
f67539c2 1161 mdcache->open_ino(ino, pool, fin, false);
f91f0fd5 1162 }
11fdf7f2
TL
1163
1164 if (!(num_opening_inodes % 1000))
1165 mds->heartbeat_reset();
1166 }
1167
1168 _open_ino_finish(inodeno_t(0), 0);
1169}
1170
1171bool OpenFileTable::prefetch_inodes()
1172{
1173 dout(10) << __func__ << dendl;
1174 ceph_assert(!prefetch_state);
1175 prefetch_state = DIR_INODES;
1176
1177 if (!load_done) {
1178 wait_for_load(
1179 new MDSInternalContextWrapper(mds,
9f95a23c 1180 new LambdaContext([this](int r) {
11fdf7f2
TL
1181 _prefetch_inodes();
1182 })
1183 )
1184 );
1185 return true;
1186 }
1187
1188 _prefetch_inodes();
1189 return !is_prefetched();
1190}
1191
1192bool OpenFileTable::should_log_open(CInode *in)
1193{
1194 if (in->state_test(CInode::STATE_TRACKEDBYOFT)) {
1195 // inode just journaled
1196 if (in->last_journaled >= committing_log_seq)
1197 return false;
1198 // item not dirty. it means the item has already been saved
1199 auto p = dirty_items.find(in->ino());
1200 if (p == dirty_items.end())
1201 return false;
1202 }
1203 return true;
1204}
1205
1206void OpenFileTable::note_destroyed_inos(uint64_t seq, const vector<inodeno_t>& inos)
1207{
1208 auto& vec = logseg_destroyed_inos[seq];
1209 vec.insert(vec.end(), inos.begin(), inos.end());
1210}
1211
1212void OpenFileTable::trim_destroyed_inos(uint64_t seq)
1213{
1214 auto p = logseg_destroyed_inos.begin();
1215 while (p != logseg_destroyed_inos.end()) {
1216 if (p->first >= seq)
1217 break;
1218 logseg_destroyed_inos.erase(p++);
1219 }
1220}