]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/OpenFileTable.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / mds / OpenFileTable.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2018 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "acconfig.h"
16 #include "mds/CInode.h"
17 #include "mds/CDir.h"
18 #include "mds/MDSRank.h"
19 #include "mds/MDCache.h"
20 #include "osdc/Objecter.h"
21 #include "OpenFileTable.h"
22
23 #include "common/config.h"
24 #include "common/errno.h"
25
// Indices of the "oft" perf counters registered by the OpenFileTable
// constructor. l_oft_first / l_oft_last are the exclusive bounds handed to
// PerfCountersBuilder; the values in between are the actual counters.
26 enum {
27 l_oft_first = 1000000,
// set to omap_num_objs at the end of commit()
28 l_oft_omap_total_objs,
// set to the total item count summed over omap_num_items in commit()
29 l_oft_omap_total_kv_pairs,
// incremented in commit() by the number of journaled key updates
30 l_oft_omap_total_updates,
// incremented in commit() by the number of journaled key removals
31 l_oft_omap_total_removes,
32 l_oft_last
33 };
34
35 #define dout_context g_ceph_context
36 #define dout_subsys ceph_subsys_mds
37 #undef dout_prefix
38 #define dout_prefix _prefix(_dout, mds)
39 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
40 return *_dout << "mds." << mds->get_nodeid() << ".openfiles ";
41 }
42
43 OpenFileTable::OpenFileTable(MDSRank *m) : mds(m) {
44 PerfCountersBuilder b(mds->cct, "oft", l_oft_first, l_oft_last);
45
46 b.add_u64(l_oft_omap_total_objs, "omap_total_objs");
47 b.add_u64(l_oft_omap_total_kv_pairs, "omap_total_kv_pairs");
48 b.add_u64(l_oft_omap_total_updates, "omap_total_updates");
49 b.add_u64(l_oft_omap_total_removes, "omap_total_removes");
50 logger.reset(b.create_perf_counters());
51 mds->cct->get_perfcounters_collection()->add(logger.get());
52 logger->set(l_oft_omap_total_objs, 0);
53 logger->set(l_oft_omap_total_kv_pairs, 0);
54 logger->set(l_oft_omap_total_updates, 0);
55 logger->set(l_oft_omap_total_removes, 0);
56 }
57
58 OpenFileTable::~OpenFileTable() {
59 if (logger) {
60 mds->cct->get_perfcounters_collection()->remove(logger.get());
61 }
62 }
63
64 void OpenFileTable::get_ref(CInode *in)
65 {
66 do {
67 auto p = anchor_map.find(in->ino());
68 if (p != anchor_map.end()) {
69 ceph_assert(in->state_test(CInode::STATE_TRACKEDBYOFT));
70 ceph_assert(p->second.nref > 0);
71 p->second.nref++;
72 break;
73 }
74
75 CDentry *dn = in->get_parent_dn();
76 CInode *pin = dn ? dn->get_dir()->get_inode() : nullptr;
77
78 auto ret = anchor_map.emplace(std::piecewise_construct, std::forward_as_tuple(in->ino()),
79 std::forward_as_tuple(in->ino(), (pin ? pin->ino() : inodeno_t(0)),
80 (dn ? dn->get_name() : string()), in->d_type(), 1));
81 ceph_assert(ret.second == true);
82 in->state_set(CInode::STATE_TRACKEDBYOFT);
83
84 auto ret1 = dirty_items.emplace(in->ino(), (int)DIRTY_NEW);
85 if (!ret1.second) {
86 int omap_idx = ret1.first->second;
87 ceph_assert(omap_idx >= 0);
88 ret.first->second.omap_idx = omap_idx;
89 }
90
91 in = pin;
92 } while (in);
93 }
94
95 void OpenFileTable::put_ref(CInode *in)
96 {
97 do {
98 ceph_assert(in->state_test(CInode::STATE_TRACKEDBYOFT));
99 auto p = anchor_map.find(in->ino());
100 ceph_assert(p != anchor_map.end());
101 ceph_assert(p->second.nref > 0);
102
103 if (p->second.nref > 1) {
104 p->second.nref--;
105 break;
106 }
107
108 CDentry *dn = in->get_parent_dn();
109 CInode *pin = dn ? dn->get_dir()->get_inode() : nullptr;
110 if (dn) {
111 ceph_assert(p->second.dirino == pin->ino());
112 ceph_assert(p->second.d_name == dn->get_name());
113 } else {
114 ceph_assert(p->second.dirino == inodeno_t(0));
115 ceph_assert(p->second.d_name == "");
116 }
117
118 int omap_idx = p->second.omap_idx;
119 anchor_map.erase(p);
120 in->state_clear(CInode::STATE_TRACKEDBYOFT);
121
122 auto ret = dirty_items.emplace(in->ino(), omap_idx);
123 if (!ret.second) {
124 if (ret.first->second == DIRTY_NEW) {
125 ceph_assert(omap_idx < 0);
126 dirty_items.erase(ret.first);
127 } else {
128 ceph_assert(omap_idx >= 0);
129 ret.first->second = omap_idx;
130 }
131 }
132
133 in = pin;
134 } while (in);
135 }
136
137 void OpenFileTable::add_inode(CInode *in)
138 {
139 dout(10) << __func__ << " " << *in << dendl;
140 if (!in->is_dir()) {
141 auto p = anchor_map.find(in->ino());
142 ceph_assert(p == anchor_map.end());
143 }
144 get_ref(in);
145 }
146
147 void OpenFileTable::remove_inode(CInode *in)
148 {
149 dout(10) << __func__ << " " << *in << dendl;
150 if (!in->is_dir()) {
151 auto p = anchor_map.find(in->ino());
152 ceph_assert(p != anchor_map.end());
153 ceph_assert(p->second.nref == 1);
154 }
155 put_ref(in);
156 }
157
158 void OpenFileTable::add_dirfrag(CDir *dir)
159 {
160 dout(10) << __func__ << " " << *dir << dendl;
161 ceph_assert(!dir->state_test(CDir::STATE_TRACKEDBYOFT));
162 dir->state_set(CDir::STATE_TRACKEDBYOFT);
163 auto ret = dirfrags.insert(dir->dirfrag());
164 ceph_assert(ret.second);
165 get_ref(dir->get_inode());
166 dirty_items.emplace(dir->ino(), (int)DIRTY_UNDEF);
167 }
168
169 void OpenFileTable::remove_dirfrag(CDir *dir)
170 {
171 dout(10) << __func__ << " " << *dir << dendl;
172 ceph_assert(dir->state_test(CDir::STATE_TRACKEDBYOFT));
173 dir->state_clear(CDir::STATE_TRACKEDBYOFT);
174 auto p = dirfrags.find(dir->dirfrag());
175 ceph_assert(p != dirfrags.end());
176 dirfrags.erase(p);
177 dirty_items.emplace(dir->ino(), (int)DIRTY_UNDEF);
178 put_ref(dir->get_inode());
179 }
180
181 void OpenFileTable::notify_link(CInode *in)
182 {
183 dout(10) << __func__ << " " << *in << dendl;
184 auto p = anchor_map.find(in->ino());
185 ceph_assert(p != anchor_map.end());
186 ceph_assert(p->second.nref > 0);
187 ceph_assert(p->second.dirino == inodeno_t(0));
188 ceph_assert(p->second.d_name == "");
189
190 CDentry *dn = in->get_parent_dn();
191 CInode *pin = dn->get_dir()->get_inode();
192
193 p->second.dirino = pin->ino();
194 p->second.d_name = dn->get_name();
195 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
196
197 get_ref(pin);
198 }
199
200 void OpenFileTable::notify_unlink(CInode *in)
201 {
202 dout(10) << __func__ << " " << *in << dendl;
203 auto p = anchor_map.find(in->ino());
204 ceph_assert(p != anchor_map.end());
205 ceph_assert(p->second.nref > 0);
206
207 CDentry *dn = in->get_parent_dn();
208 CInode *pin = dn->get_dir()->get_inode();
209 ceph_assert(p->second.dirino == pin->ino());
210 ceph_assert(p->second.d_name == dn->get_name());
211
212 p->second.dirino = inodeno_t(0);
213 p->second.d_name = "";
214 dirty_items.emplace(in->ino(), (int)DIRTY_UNDEF);
215
216 put_ref(pin);
217 }
218
219 object_t OpenFileTable::get_object_name(unsigned idx) const
220 {
221 char s[30];
222 snprintf(s, sizeof(s), "mds%d_openfiles.%x", int(mds->get_nodeid()), idx);
223 return object_t(s);
224 }
225
226 void OpenFileTable::_encode_header(bufferlist &bl, int j_state)
227 {
228 std::string_view magic = CEPH_FS_ONDISK_MAGIC;
229 encode(magic, bl);
230 ENCODE_START(1, 1, bl);
231 encode(omap_version, bl);
232 encode(omap_num_objs, bl);
233 encode((__u8)j_state, bl);
234 ENCODE_FINISH(bl);
235 }
236
237 class C_IO_OFT_Save : public MDSIOContextBase {
238 protected:
239 OpenFileTable *oft;
240 uint64_t log_seq;
241 MDSContext *fin;
242 MDSRank *get_mds() override { return oft->mds; }
243 public:
244 C_IO_OFT_Save(OpenFileTable *t, uint64_t s, MDSContext *c) :
245 oft(t), log_seq(s), fin(c) {}
246 void finish(int r) {
247 oft->_commit_finish(r, log_seq, fin);
248 }
249 void print(ostream& out) const override {
250 out << "openfiles_save";
251 }
252 };
253
254 void OpenFileTable::_commit_finish(int r, uint64_t log_seq, MDSContext *fin)
255 {
256 dout(10) << __func__ << " log_seq " << log_seq << dendl;
257 if (r < 0) {
258 mds->handle_write_error(r);
259 return;
260 }
261
262 ceph_assert(log_seq <= committing_log_seq);
263 ceph_assert(log_seq >= committed_log_seq);
264 committed_log_seq = log_seq;
265 num_pending_commit--;
266
267 if (fin)
268 fin->complete(r);
269 }
270
271 class C_IO_OFT_Journal : public MDSIOContextBase {
272 protected:
273 OpenFileTable *oft;
274 uint64_t log_seq;
275 MDSContext *fin;
276 std::map<unsigned, std::vector<ObjectOperation> > ops_map;
277 MDSRank *get_mds() override { return oft->mds; }
278 public:
279 C_IO_OFT_Journal(OpenFileTable *t, uint64_t s, MDSContext *c,
280 std::map<unsigned, std::vector<ObjectOperation> >& ops) :
281 oft(t), log_seq(s), fin(c) {
282 ops_map.swap(ops);
283 }
284 void finish(int r) {
285 oft->_journal_finish(r, log_seq, fin, ops_map);
286 }
287 void print(ostream& out) const override {
288 out << "openfiles_journal";
289 }
290 };
291
292 void OpenFileTable::_journal_finish(int r, uint64_t log_seq, MDSContext *c,
293 std::map<unsigned, std::vector<ObjectOperation> >& ops_map)
294 {
295 dout(10) << __func__ << " log_seq " << log_seq << dendl;
296 if (r < 0) {
297 mds->handle_write_error(r);
298 return;
299 }
300
301 C_GatherBuilder gather(g_ceph_context,
302 new C_OnFinisher(new C_IO_OFT_Save(this, log_seq, c),
303 mds->finisher));
304 SnapContext snapc;
305 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
306 for (auto& it : ops_map) {
307 object_t oid = get_object_name(it.first);
308 for (auto& op : it.second) {
309 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
310 0, gather.new_sub());
311 }
312 }
313 gather.activate();
314
315 journal_state = JOURNAL_NONE;
316 return;
317 }
318
// Writes all dirty open-file-table items to the omap objects of this rank.
//
//   c        completion passed through C_IO_OFT_Save to _commit_finish();
//            may be null (see _commit_finish)
//   log_seq  log sequence this commit corresponds to; must not be lower
//            than the previous committing_log_seq
//   op_prio  priority assigned to every ObjectOperation built here
//
// Only one commit may be in flight (asserted via num_pending_commit).
// Two write strategies are used:
//   * fast path: when at most one object needs writing, nothing had to be
//     journaled and the object count is unchanged, a single operation is
//     sent directly;
//   * journaled path: pending changes are first written as "_journal.<n>"
//     keys (journal_func); once those writes complete, C_IO_OFT_Journal /
//     _journal_finish() submits the real updates collected in ops_map,
//     which also remove the journal keys.
319 void OpenFileTable::commit(MDSContext *c, uint64_t log_seq, int op_prio)
320 {
321 dout(10) << __func__ << " log_seq " << log_seq << dendl;
322
323 ceph_assert(num_pending_commit == 0);
324 num_pending_commit++;
325 ceph_assert(log_seq >= committing_log_seq);
326 committing_log_seq = log_seq;
327
328 omap_version++;
329
// Gathers the journal writes issued by journal_func, or the direct writes
// issued by submit_ops_func; its finisher is set later depending on the path.
330 C_GatherBuilder gather(g_ceph_context);
331
332 SnapContext snapc;
333 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
334
// Pending changes of one object are flushed to a journal key once their
// estimated encoded size reaches this limit.
335 const unsigned max_write_size = mds->mdcache->max_dir_commit_size;
336
// Per-omap-object bookkeeping for this commit:
//   write_size   estimated size of the changes currently pending
//   journal_idx  number of "_journal.<n>" keys written so far
//   clear        object must be omap_clear()ed by the first op that touches
//                it (set for objects newly added by this commit)
//   to_*         changes not yet put into an operation
//   journaled_*  changes already written to journal keys
337 struct omap_update_ctl {
338 unsigned write_size = 0;
339 unsigned journal_idx = 0;
340 bool clear = false;
341 std::map<string, bufferlist> to_update, journaled_update;
342 std::set<string> to_remove, journaled_remove;
343 };
344 std::vector<omap_update_ctl> omap_updates(omap_num_objs);
345
346 using ceph::encode;
// Writes the pending changes of object `idx` into a new "_journal.<n>" key
// (sent immediately, tracked by `gather`) and moves them to journaled_*.
// The first journal write of an object also rewrites its header with the
// JOURNAL_START state.
347 auto journal_func = [&](unsigned idx) {
348 auto& ctl = omap_updates.at(idx);
349
350 ObjectOperation op;
351 op.priority = op_prio;
352
353 if (ctl.clear) {
354 ctl.clear = false;
355 op.omap_clear();
356 op.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
357 }
358
359 if (ctl.journal_idx == 0) {
360 if (journal_state == JOURNAL_NONE)
361 journal_state = JOURNAL_START;
362 else
363 ceph_assert(journal_state == JOURNAL_START);
364
365 bufferlist header;
366 _encode_header(header, journal_state);
367 op.omap_set_header(header);
368 }
369
// journal entry payload: omap_version, updates, removals
370 bufferlist bl;
371 encode(omap_version, bl);
372 encode(ctl.to_update, bl);
373 encode(ctl.to_remove, bl);
374
375 char key[32];
376 snprintf(key, sizeof(key), "_journal.%x", ctl.journal_idx++);
377 std::map<string, bufferlist> tmp_map;
378 tmp_map[key].swap(bl);
379 op.omap_set(tmp_map);
380
381 object_t oid = get_object_name(idx);
382 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(), 0,
383 gather.new_sub());
384
// remember what was journaled so the second stage can apply it for real
385 #ifdef HAVE_STDLIB_MAP_SPLICING
386 ctl.journaled_update.merge(ctl.to_update);
387 ctl.journaled_remove.merge(ctl.to_remove);
388 #else
389 ctl.journaled_update.insert(make_move_iterator(begin(ctl.to_update)),
390 make_move_iterator(end(ctl.to_update)));
391 ctl.journaled_remove.insert(make_move_iterator(begin(ctl.to_remove)),
392 make_move_iterator(end(ctl.to_remove)));
393 #endif
394 ctl.to_update.clear();
395 ctl.to_remove.clear();
396 };
397
// omap object index -> operations to be submitted for that object
398 std::map<unsigned, std::vector<ObjectOperation> > ops_map;
399
// Appends one operation for object `idx` to ops_map, carrying the pending
// to_update/to_remove sets and, if requested, a freshly encoded header.
// Nothing is sent here.
400 auto create_op_func = [&](unsigned idx, bool update_header) {
401 auto& ctl = omap_updates.at(idx);
402
403 auto& op_vec = ops_map[idx];
404 op_vec.resize(op_vec.size() + 1);
405 ObjectOperation& op = op_vec.back();
406 op.priority = op_prio;
407
408 if (ctl.clear) {
409 ctl.clear = false;
410 op.omap_clear();
411 op.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
412 }
413
414 if (update_header) {
415 bufferlist header;
416 _encode_header(header, journal_state);
417 op.omap_set_header(header);
418 }
419
420 if (!ctl.to_update.empty()) {
421 op.omap_set(ctl.to_update);
422 ctl.to_update.clear();
423 }
424 if (!ctl.to_remove.empty()) {
425 op.omap_rm_keys(ctl.to_remove);
426 ctl.to_remove.clear();
427 }
428 };
429
// Sends everything in ops_map right away; completion goes through
// C_IO_OFT_Save to _commit_finish().
430 auto submit_ops_func = [&]() {
431 gather.set_finisher(new C_OnFinisher(new C_IO_OFT_Save(this, log_seq, c),
432 mds->finisher));
433 for (auto& it : ops_map) {
434 object_t oid = get_object_name(it.first);
435 for (auto& op : it.second) {
436 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
437 0, gather.new_sub());
438 }
439 }
440 gather.activate();
441 };
442
// Anchors loaded from disk are still around: reconcile them with the live
// anchors below and drop whatever is left over afterwards.
443 bool first_commit = !loaded_anchor_map.empty();
444
445 unsigned first_free_idx = 0;
446 unsigned old_num_objs = omap_num_objs;
447 if (omap_num_objs == 0) {
448 omap_num_objs = 1;
449 omap_num_items.resize(omap_num_objs);
450 omap_updates.resize(omap_num_objs);
451 omap_updates.back().clear = true;
452 }
453
// Turn every dirty item into either an update (anchor still exists) or a
// removal (anchor is gone) of its key in the owning omap object.
454 for (auto& it : dirty_items) {
455 frag_vec_t frags;
456 auto p = anchor_map.find(it.first);
457 if (p != anchor_map.end()) {
// collect the tracked dirfrags of this inode; they are stored with the anchor
458 for (auto q = dirfrags.lower_bound(dirfrag_t(it.first, 0));
459 q != dirfrags.end() && q->ino == it.first;
460 ++q)
461 frags.push_back(q->frag);
462 }
463
464 if (first_commit) {
465 auto q = loaded_anchor_map.find(it.first);
466 if (q != loaded_anchor_map.end()) {
467 ceph_assert(p != anchor_map.end());
// the item already lives on disk: inherit its object index and skip the
// write entirely if the anchor and its dirfrag list are unchanged
468 p->second.omap_idx = q->second.omap_idx;
469 bool same = p->second == q->second;
470 if (same) {
471 auto r = loaded_dirfrags.lower_bound(dirfrag_t(it.first, 0));
472 for (const auto& fg : frags) {
473 if (r == loaded_dirfrags.end() || !(*r == dirfrag_t(it.first, fg))) {
474 same = false;
475 break;
476 }
477 ++r;
478 }
// loaded set has extra dirfrags for this inode
479 if (same && r != loaded_dirfrags.end() && r->ino == it.first)
480 same = false;
481 }
482 loaded_anchor_map.erase(q);
483 if (same)
484 continue;
485 }
486 }
487
// omap key of an item is its inode number in hex
488 char key[32];
489 int len = snprintf(key, sizeof(key), "%llx", (unsigned long long)it.first.val);
490
491 int omap_idx;
492 if (p != anchor_map.end()) {
493 omap_idx = p->second.omap_idx;
494 if (omap_idx < 0) {
495 ceph_assert(it.second == DIRTY_NEW);
496 // find omap object to store the key
497 for (unsigned i = first_free_idx; i < omap_num_objs; i++) {
498 if (omap_num_items[i] < MAX_ITEMS_PER_OBJ) {
499 omap_idx = i;
500 break;
501 }
502 }
// every existing object is full: add a new one (cleared on first write)
503 if (omap_idx < 0) {
504 ++omap_num_objs;
505 ceph_assert(omap_num_objs <= MAX_OBJECTS);
506 omap_num_items.resize(omap_num_objs);
507 omap_updates.resize(omap_num_objs);
508 omap_updates.back().clear = true;
509 omap_idx = omap_num_objs - 1;
510 }
511 first_free_idx = omap_idx;
512
513 p->second.omap_idx = omap_idx;
514 ++omap_num_items[omap_idx];
515 }
516 } else {
// anchor is gone: the dirty value is the object index recorded by put_ref()
517 omap_idx = it.second;
518 unsigned& count = omap_num_items.at(omap_idx);
519 ceph_assert(count > 0);
520 --count;
521 if ((unsigned)omap_idx < first_free_idx && count < MAX_ITEMS_PER_OBJ)
522 first_free_idx = omap_idx;
523 }
524 auto& ctl = omap_updates.at(omap_idx);
525
526 if (p != anchor_map.end()) {
527 bufferlist bl;
528 encode(p->second, bl);
529 encode(frags, bl);
530
531 ctl.write_size += bl.length() + len + 2 * sizeof(__u32);
532 ctl.to_update[key].swap(bl);
533 } else {
534 ctl.write_size += len + sizeof(__u32);
535 ctl.to_remove.emplace(key);
536 }
537
// too much pending for a single write: spill into a journal entry
538 if (ctl.write_size >= max_write_size) {
539 journal_func(omap_idx);
540 ctl.write_size = 0;
541 }
542 }
543
544 dirty_items.clear();
545
// Loaded anchors that were not matched by any dirty item above are removed
// from disk.
546 if (first_commit) {
547 for (auto& it : loaded_anchor_map) {
548 char key[32];
549 int len = snprintf(key, sizeof(key), "%llx", (unsigned long long)it.first.val);
550
551 int omap_idx = it.second.omap_idx;
552 unsigned& count = omap_num_items.at(omap_idx);
553 ceph_assert(count > 0);
554 --count;
555
556 auto& ctl = omap_updates.at(omap_idx);
557 ctl.write_size += len + sizeof(__u32);
558 ctl.to_remove.emplace(key);
559
560 if (ctl.write_size >= max_write_size) {
561 journal_func(omap_idx);
562 ctl.write_size = 0;
563 }
564 }
565 loaded_anchor_map.clear();
566 loaded_dirfrags.clear();
567 }
568
569 size_t total_items = 0;
570 {
// used_objs: index of the last non-empty object + 1 (at least 1)
571 unsigned used_objs = 1;
572 std::vector<unsigned> objs_to_write;
573 bool journaled = false;
574 for (unsigned i = 0; i < omap_num_objs; i++) {
575 total_items += omap_num_items[i];
576 if (omap_updates[i].journal_idx)
577 journaled = true;
578 else if (omap_updates[i].write_size)
579 objs_to_write.push_back(i);
580
581 if (omap_num_items[i] > 0)
582 used_objs = i + 1;
583 }
584 ceph_assert(total_items == anchor_map.size());
585 // adjust omap object count
586 if (used_objs < omap_num_objs) {
587 omap_num_objs = used_objs;
588 omap_num_items.resize(omap_num_objs);
589 }
590 // skip journal if only one osd request is required and object count
591 // does not change.
592 if (!journaled && old_num_objs == omap_num_objs &&
593 objs_to_write.size() <= 1) {
594 ceph_assert(journal_state == JOURNAL_NONE);
595 ceph_assert(!gather.has_subs());
596
597 unsigned omap_idx = objs_to_write.empty() ? 0 : objs_to_write.front();
598 create_op_func(omap_idx, true);
599 submit_ops_func();
// NOTE: this early return skips the perf counter updates at the end of
// the function
600 return;
601 }
602 }
603
// Journaled path: flush whatever is still pending into journal entries.
604 for (unsigned omap_idx = 0; omap_idx < omap_updates.size(); omap_idx++) {
605 auto& ctl = omap_updates[omap_idx];
606 if (ctl.write_size > 0) {
607 journal_func(omap_idx);
608 ctl.write_size = 0;
609 }
610 }
611
612 if (journal_state == JOURNAL_START) {
613 ceph_assert(gather.has_subs());
614 journal_state = JOURNAL_FINISH;
615 } else {
616 // only object count changes
617 ceph_assert(journal_state == JOURNAL_NONE);
618 ceph_assert(!gather.has_subs());
619 }
620
621 uint64_t total_updates = 0;
622 uint64_t total_removes = 0;
623
// Build the second-stage operations: re-apply the journaled changes as real
// key updates/removals, split into chunks of at most ~max_write_size, and
// remove the journal keys. Only the first op of each object carries the
// header.
624 for (unsigned omap_idx = 0; omap_idx < omap_updates.size(); omap_idx++) {
625 auto& ctl = omap_updates[omap_idx];
626 ceph_assert(ctl.to_update.empty() && ctl.to_remove.empty());
627 if (ctl.journal_idx == 0)
628 ceph_assert(ctl.journaled_update.empty() && ctl.journaled_remove.empty());
629
630 bool first = true;
631 for (auto& it : ctl.journaled_update) {
632 ctl.write_size += it.first.length() + it.second.length() + 2 * sizeof(__u32);
633 ctl.to_update[it.first].swap(it.second);
634 if (ctl.write_size >= max_write_size) {
635 create_op_func(omap_idx, first);
636 ctl.write_size = 0;
637 first = false;
638 }
639 total_updates++;
640 }
641
642 for (auto& key : ctl.journaled_remove) {
643 ctl.write_size += key.length() + sizeof(__u32);
644 ctl.to_remove.emplace(key);
645 if (ctl.write_size >= max_write_size) {
646 create_op_func(omap_idx, first);
647 ctl.write_size = 0;
648 first = false;
649 }
650 total_removes++;
651 }
652
// the journal keys themselves go away together with the last chunk
653 for (unsigned i = 0; i < ctl.journal_idx; ++i) {
654 char key[32];
655 snprintf(key, sizeof(key), "_journal.%x", i);
656 ctl.to_remove.emplace(key);
657 }
658
659 // update first object's omap header if object count changes
660 if (ctl.clear ||
661 ctl.journal_idx > 0 ||
662 (omap_idx == 0 && old_num_objs != omap_num_objs))
663 create_op_func(omap_idx, first);
664 }
665
666 ceph_assert(!ops_map.empty());
667 if (journal_state == JOURNAL_FINISH) {
// journal writes are in flight: apply ops_map only after they complete
// (C_IO_OFT_Journal swaps ops_map's contents into itself)
668 gather.set_finisher(new C_OnFinisher(new C_IO_OFT_Journal(this, log_seq, c, ops_map),
669 mds->finisher));
670 gather.activate();
671 } else {
672 submit_ops_func();
673 }
674 logger->set(l_oft_omap_total_objs, omap_num_objs);
675 logger->set(l_oft_omap_total_kv_pairs, total_items);
676 logger->inc(l_oft_omap_total_updates, total_updates);
677 logger->inc(l_oft_omap_total_removes, total_removes);
678 }
679
680 class C_IO_OFT_Load : public MDSIOContextBase {
681 protected:
682 OpenFileTable *oft;
683 MDSRank *get_mds() override { return oft->mds; }
684
685 public:
686 int header_r = 0; //< Return value from OMAP header read
687 int values_r = 0; //< Return value from OMAP value read
688 bufferlist header_bl;
689 std::map<std::string, bufferlist> values;
690 unsigned index;
691 bool first;
692 bool more = false;
693
694 C_IO_OFT_Load(OpenFileTable *t, unsigned i, bool f) :
695 oft(t), index(i), first(f) {}
696 void finish(int r) override {
697 oft->_load_finish(r, header_r, values_r, index, first, more, header_bl, values);
698 }
699 void print(ostream& out) const override {
700 out << "openfiles_load";
701 }
702 };
703
704 class C_IO_OFT_Recover : public MDSIOContextBase {
705 protected:
706 OpenFileTable *oft;
707 MDSRank *get_mds() override { return oft->mds; }
708 public:
709 C_IO_OFT_Recover(OpenFileTable *t) : oft(t) {}
710 void finish(int r) override {
711 oft->_recover_finish(r);
712 }
713 void print(ostream& out) const override {
714 out << "openfiles_recover";
715 }
716 };
717
718 void OpenFileTable::_recover_finish(int r)
719 {
720 if (r < 0) {
721 derr << __func__ << " got " << cpp_strerror(r) << dendl;
722 _reset_states();
723 } else {
724 dout(10) << __func__ << ": load complete" << dendl;
725 }
726
727 journal_state = JOURNAL_NONE;
728 load_done = true;
729 finish_contexts(g_ceph_context, waiting_for_load);
730 waiting_for_load.clear();
731 }
732
733 void OpenFileTable::_load_finish(int op_r, int header_r, int values_r,
734 unsigned idx, bool first, bool more,
735 bufferlist &header_bl,
736 std::map<std::string, bufferlist> &values)
737 {
738 using ceph::decode;
739 int err = -EINVAL;
740
741 auto decode_func = [this](unsigned idx, inodeno_t ino, bufferlist &bl) {
742 auto p = bl.cbegin();
743
744 size_t count = loaded_anchor_map.size();
745 auto it = loaded_anchor_map.emplace_hint(loaded_anchor_map.end(),
746 std::piecewise_construct,
747 std::make_tuple(ino),
748 std::make_tuple());
749 RecoveredAnchor& anchor = it->second;
750 decode(anchor, p);
751 ceph_assert(ino == anchor.ino);
752 anchor.omap_idx = idx;
753 anchor.auth = MDS_RANK_NONE;
754
755 frag_vec_t frags;
756 decode(frags, p);
757 for (const auto& fg : frags)
758 loaded_dirfrags.insert(loaded_dirfrags.end(), dirfrag_t(anchor.ino, fg));
759
760 if (loaded_anchor_map.size() > count)
761 ++omap_num_items[idx];
762 };
763
764 if (op_r < 0) {
765 derr << __func__ << " got " << cpp_strerror(op_r) << dendl;
766 err = op_r;
767 goto out;
768 }
769
770 try {
771 if (first) {
772 auto p = header_bl.cbegin();
773
774 string magic;
775 version_t version;
776 unsigned num_objs;
777 __u8 jstate;
778
779 if (header_bl.length() == 13) {
780 // obsolete format.
781 decode(version, p);
782 decode(num_objs, p);
783 decode(jstate, p);
784 } else {
785 decode(magic, p);
786 if (magic != CEPH_FS_ONDISK_MAGIC) {
787 std::ostringstream oss;
788 oss << "invalid magic '" << magic << "'";
789 throw buffer::malformed_input(oss.str());
790 }
791
792 DECODE_START(1, p);
793 decode(version, p);
794 decode(num_objs, p);
795 decode(jstate, p);
796 DECODE_FINISH(p);
797 }
798
799 if (num_objs > MAX_OBJECTS) {
800 std::ostringstream oss;
801 oss << "invalid object count '" << num_objs << "'";
802 throw buffer::malformed_input(oss.str());
803 }
804 if (jstate > JOURNAL_FINISH) {
805 std::ostringstream oss;
806 oss << "invalid journal state '" << jstate << "'";
807 throw buffer::malformed_input(oss.str());
808 }
809
810 if (version > omap_version) {
811 omap_version = version;
812 omap_num_objs = num_objs;
813 omap_num_items.resize(omap_num_objs);
814 journal_state = jstate;
815 } else if (version == omap_version) {
816 ceph_assert(omap_num_objs == num_objs);
817 if (jstate > journal_state)
818 journal_state = jstate;
819 }
820 }
821
822 for (auto& it : values) {
823 if (it.first.compare(0, 9, "_journal.") == 0) {
824 if (idx >= loaded_journals.size())
825 loaded_journals.resize(idx + 1);
826
827 if (journal_state == JOURNAL_FINISH) {
828 loaded_journals[idx][it.first].swap(it.second);
829 } else { // incomplete journal
830 loaded_journals[idx][it.first].length();
831 }
832 continue;
833 }
834
835 inodeno_t ino;
836 sscanf(it.first.c_str(), "%llx", (unsigned long long*)&ino.val);
837 decode_func(idx, ino, it.second);
838 }
839 } catch (buffer::error &e) {
840 derr << __func__ << ": corrupted header/values: " << e.what() << dendl;
841 goto out;
842 }
843
844 if (more || idx + 1 < omap_num_objs) {
845 // Issue another read if we're not at the end of the omap
846 std::string last_key;
847 if (more)
848 last_key = values.rbegin()->first;
849 else
850 idx++;
851 dout(10) << __func__ << ": continue to load from '" << last_key << "'" << dendl;
852 object_t oid = get_object_name(idx);
853 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
854 C_IO_OFT_Load *c = new C_IO_OFT_Load(this, idx, !more);
855 ObjectOperation op;
856 if (!more)
857 op.omap_get_header(&c->header_bl, &c->header_r);
858 op.omap_get_vals(last_key, "", uint64_t(-1),
859 &c->values, &c->more, &c->values_r);
860 mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, nullptr, 0,
861 new C_OnFinisher(c, mds->finisher));
862 return;
863 }
864
865 // replay journal
866 if (loaded_journals.size() > 0) {
867 dout(10) << __func__ << ": recover journal" << dendl;
868
869 C_GatherBuilder gather(g_ceph_context,
870 new C_OnFinisher(new C_IO_OFT_Recover(this),
871 mds->finisher));
872 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
873 SnapContext snapc;
874
875 for (unsigned omap_idx = 0; omap_idx < loaded_journals.size(); omap_idx++) {
876 auto& loaded_journal = loaded_journals[omap_idx];
877
878 std::vector<ObjectOperation> op_vec;
879 try {
880 for (auto& it : loaded_journal) {
881 if (journal_state != JOURNAL_FINISH)
882 continue;
883 auto p = it.second.cbegin();
884 version_t version;
885 std::map<string, bufferlist> to_update;
886 std::set<string> to_remove;
887 decode(version, p);
888 if (version != omap_version)
889 continue;
890 decode(to_update, p);
891 decode(to_remove, p);
892 it.second.clear();
893
894 for (auto& q : to_update) {
895 inodeno_t ino;
896 sscanf(q.first.c_str(), "%llx", (unsigned long long*)&ino.val);
897 decode_func(omap_idx, ino, q.second);
898 }
899 for (auto& q : to_remove) {
900 inodeno_t ino;
901 sscanf(q.c_str(), "%llx",(unsigned long long*)&ino.val);
902 ceph_assert(ino.val > 0);
903 if (loaded_anchor_map.erase(ino)) {
904 unsigned& count = omap_num_items[omap_idx];
905 ceph_assert(count > 0);
906 --count;
907 }
908 auto r = loaded_dirfrags.lower_bound(dirfrag_t(ino, 0));
909 while (r != loaded_dirfrags.end() && r->ino == ino)
910 loaded_dirfrags.erase(r++);
911 }
912
913 op_vec.resize(op_vec.size() + 1);
914 ObjectOperation& op = op_vec.back();
915 op.priority = CEPH_MSG_PRIO_HIGH;
916 if (!to_update.empty())
917 op.omap_set(to_update);
918 if (!to_remove.empty())
919 op.omap_rm_keys(to_remove);
920 }
921 } catch (buffer::error &e) {
922 derr << __func__ << ": corrupted journal: " << e.what() << dendl;
923 goto out;
924 }
925
926 op_vec.resize(op_vec.size() + 1);
927 ObjectOperation& op = op_vec.back();
928 {
929 bufferlist header;
930 if (journal_state == JOURNAL_FINISH)
931 _encode_header(header, JOURNAL_FINISH);
932 else
933 _encode_header(header, JOURNAL_NONE);
934 op.omap_set_header(header);
935 }
936 {
937 // remove journal
938 std::set<string> to_remove;
939 for (auto &it : loaded_journal)
940 to_remove.emplace(it.first);
941 op.omap_rm_keys(to_remove);
942 }
943 loaded_journal.clear();
944
945 object_t oid = get_object_name(omap_idx);
946 for (auto& op : op_vec) {
947 mds->objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(),
948 0, gather.new_sub());
949 }
950 }
951 gather.activate();
952 return;
953 }
954
955 journal_state = JOURNAL_NONE;
956 err = 0;
957 dout(10) << __func__ << ": load complete" << dendl;
958 out:
959
960 if (err < 0)
961 _reset_states();
962
963 load_done = true;
964 finish_contexts(g_ceph_context, waiting_for_load);
965 waiting_for_load.clear();
966 }
967
968 void OpenFileTable::load(MDSContext *onload)
969 {
970 dout(10) << __func__ << dendl;
971 ceph_assert(!load_done);
972 if (onload)
973 waiting_for_load.push_back(onload);
974
975 C_IO_OFT_Load *c = new C_IO_OFT_Load(this, 0, true);
976 object_t oid = get_object_name(0);
977 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
978
979 ObjectOperation op;
980 op.omap_get_header(&c->header_bl, &c->header_r);
981 op.omap_get_vals("", "", uint64_t(-1),
982 &c->values, &c->more, &c->values_r);
983
984 mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, nullptr, 0,
985 new C_OnFinisher(c, mds->finisher));
986 }
987
988 bool OpenFileTable::get_ancestors(inodeno_t ino, vector<inode_backpointer_t>& ancestors,
989 mds_rank_t& auth_hint)
990 {
991 auto p = loaded_anchor_map.find(ino);
992 if (p == loaded_anchor_map.end())
993 return false;
994
995 inodeno_t dirino = p->second.dirino;
996 if (dirino == inodeno_t(0))
997 return false;
998
999 bool first = true;
1000 ancestors.clear();
1001 while (true) {
1002 ancestors.push_back(inode_backpointer_t(dirino, p->second.d_name, 0));
1003
1004 p = loaded_anchor_map.find(dirino);
1005 if (p == loaded_anchor_map.end())
1006 break;
1007
1008 if (first)
1009 auth_hint = p->second.auth;
1010
1011 dirino = p->second.dirino;
1012 if (dirino == inodeno_t(0))
1013 break;
1014
1015 first = false;
1016 }
1017 return true;
1018 }
1019
1020 class C_OFT_OpenInoFinish: public MDSContext {
1021 OpenFileTable *oft;
1022 inodeno_t ino;
1023 MDSRank *get_mds() override { return oft->mds; }
1024 public:
1025 C_OFT_OpenInoFinish(OpenFileTable *t, inodeno_t i) : oft(t), ino(i) {}
1026 void finish(int r) override {
1027 oft->_open_ino_finish(ino, r);
1028 }
1029 };
1030
1031 void OpenFileTable::_open_ino_finish(inodeno_t ino, int r)
1032 {
1033 if (prefetch_state == DIR_INODES && r >= 0 && ino != inodeno_t(0)) {
1034 auto p = loaded_anchor_map.find(ino);
1035 ceph_assert(p != loaded_anchor_map.end());
1036 p->second.auth = mds_rank_t(r);
1037 }
1038
1039 if (r != mds->get_nodeid())
1040 mds->mdcache->rejoin_prefetch_ino_finish(ino, r);
1041
1042 num_opening_inodes--;
1043 if (num_opening_inodes == 0) {
1044 if (prefetch_state == DIR_INODES) {
1045 prefetch_state = DIRFRAGS;
1046 _prefetch_dirfrags();
1047 } else if (prefetch_state == FILE_INODES) {
1048 prefetch_state = DONE;
1049 logseg_destroyed_inos.clear();
1050 destroyed_inos_set.clear();
1051 finish_contexts(g_ceph_context, waiting_for_prefetch);
1052 waiting_for_prefetch.clear();
1053 } else {
1054 ceph_abort();
1055 }
1056 }
1057 }
1058
1059 void OpenFileTable::_prefetch_dirfrags()
1060 {
1061 dout(10) << __func__ << dendl;
1062 ceph_assert(prefetch_state == DIRFRAGS);
1063
1064 MDCache *mdcache = mds->mdcache;
1065 std::vector<CDir*> fetch_queue;
1066
1067 CInode *last_in = nullptr;
1068 for (auto df : loaded_dirfrags) {
1069 CInode *diri;
1070 if (last_in && last_in->ino() == df.ino) {
1071 diri = last_in;
1072 } else {
1073 diri = mdcache->get_inode(df.ino);
1074 if (!diri)
1075 continue;
1076 last_in = diri;
1077 }
1078 if (diri->state_test(CInode::STATE_REJOINUNDEF))
1079 continue;
1080
1081 CDir *dir = diri->get_dirfrag(df.frag);
1082 if (dir) {
1083 if (dir->is_auth() && !dir->is_complete())
1084 fetch_queue.push_back(dir);
1085 } else {
1086 frag_vec_t leaves;
1087 diri->dirfragtree.get_leaves_under(df.frag, leaves);
1088 for (const auto& leaf : leaves) {
1089 if (diri->is_auth()) {
1090 dir = diri->get_or_open_dirfrag(mdcache, leaf);
1091 } else {
1092 dir = diri->get_dirfrag(leaf);
1093 }
1094 if (dir && dir->is_auth() && !dir->is_complete())
1095 fetch_queue.push_back(dir);
1096 }
1097 }
1098 }
1099
1100 MDSGatherBuilder gather(g_ceph_context);
1101 int num_opening_dirfrags = 0;
1102 for (const auto& dir : fetch_queue) {
1103 if (dir->state_test(CDir::STATE_REJOINUNDEF))
1104 ceph_assert(dir->get_inode()->dirfragtree.is_leaf(dir->get_frag()));
1105 dir->fetch(gather.new_sub());
1106
1107 if (!(++num_opening_dirfrags % 1000))
1108 mds->heartbeat_reset();
1109 }
1110
1111 auto finish_func = [this](int r) {
1112 prefetch_state = FILE_INODES;
1113 _prefetch_inodes();
1114 };
1115 if (gather.has_subs()) {
1116 gather.set_finisher(
1117 new MDSInternalContextWrapper(mds,
1118 new LambdaContext(std::move(finish_func))));
1119 gather.activate();
1120 } else {
1121 finish_func(0);
1122 }
1123 }
1124
1125 void OpenFileTable::_prefetch_inodes()
1126 {
1127 dout(10) << __func__ << " state " << prefetch_state << dendl;
1128 ceph_assert(!num_opening_inodes);
1129 num_opening_inodes = 1;
1130
1131 int64_t pool;
1132 if (prefetch_state == DIR_INODES)
1133 pool = mds->mdsmap->get_metadata_pool();
1134 else if (prefetch_state == FILE_INODES)
1135 pool = mds->mdsmap->get_first_data_pool();
1136 else
1137 ceph_abort();
1138
1139 MDCache *mdcache = mds->mdcache;
1140
1141 if (destroyed_inos_set.empty()) {
1142 for (auto& it : logseg_destroyed_inos)
1143 destroyed_inos_set.insert(it.second.begin(), it.second.end());
1144 }
1145
1146 for (auto& it : loaded_anchor_map) {
1147 if (destroyed_inos_set.count(it.first))
1148 continue;
1149 if (it.second.d_type == DT_DIR) {
1150 if (prefetch_state != DIR_INODES)
1151 continue;
1152 if (MDS_INO_IS_MDSDIR(it.first)) {
1153 it.second.auth = MDS_INO_MDSDIR_OWNER(it.first);
1154 continue;
1155 }
1156 if (MDS_INO_IS_STRAY(it.first)) {
1157 it.second.auth = MDS_INO_STRAY_OWNER(it.first);
1158 continue;
1159 }
1160 } else {
1161 if (prefetch_state != FILE_INODES)
1162 continue;
1163 // load all file inodes for MDCache::identify_files_to_recover()
1164 }
1165 CInode *in = mdcache->get_inode(it.first);
1166 if (in)
1167 continue;
1168
1169 num_opening_inodes++;
1170 mdcache->open_ino(it.first, pool, new C_OFT_OpenInoFinish(this, it.first), false);
1171
1172 if (!(num_opening_inodes % 1000))
1173 mds->heartbeat_reset();
1174 }
1175
1176 _open_ino_finish(inodeno_t(0), 0);
1177 }
1178
1179 bool OpenFileTable::prefetch_inodes()
1180 {
1181 dout(10) << __func__ << dendl;
1182 ceph_assert(!prefetch_state);
1183 prefetch_state = DIR_INODES;
1184
1185 if (!load_done) {
1186 wait_for_load(
1187 new MDSInternalContextWrapper(mds,
1188 new LambdaContext([this](int r) {
1189 _prefetch_inodes();
1190 })
1191 )
1192 );
1193 return true;
1194 }
1195
1196 _prefetch_inodes();
1197 return !is_prefetched();
1198 }
1199
1200 bool OpenFileTable::should_log_open(CInode *in)
1201 {
1202 if (in->state_test(CInode::STATE_TRACKEDBYOFT)) {
1203 // inode just journaled
1204 if (in->last_journaled >= committing_log_seq)
1205 return false;
1206 // item not dirty. it means the item has already been saved
1207 auto p = dirty_items.find(in->ino());
1208 if (p == dirty_items.end())
1209 return false;
1210 }
1211 return true;
1212 }
1213
1214 void OpenFileTable::note_destroyed_inos(uint64_t seq, const vector<inodeno_t>& inos)
1215 {
1216 auto& vec = logseg_destroyed_inos[seq];
1217 vec.insert(vec.end(), inos.begin(), inos.end());
1218 }
1219
1220 void OpenFileTable::trim_destroyed_inos(uint64_t seq)
1221 {
1222 auto p = logseg_destroyed_inos.begin();
1223 while (p != logseg_destroyed_inos.end()) {
1224 if (p->first >= seq)
1225 break;
1226 logseg_destroyed_inos.erase(p++);
1227 }
1228 }