]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "MDSRank.h" | |
16 | #include "MDCache.h" | |
17 | #include "Mutation.h" | |
18 | #include "SessionMap.h" | |
19 | #include "osdc/Filer.h" | |
20 | #include "common/Finisher.h" | |
21 | ||
22 | #include "common/config.h" | |
23 | #include "common/errno.h" | |
24 | #include "include/assert.h" | |
25 | #include "include/stringify.h" | |
26 | ||
27 | #define dout_context g_ceph_context | |
28 | #define dout_subsys ceph_subsys_mds | |
29 | #undef dout_prefix | |
30 | #define dout_prefix *_dout << "mds." << rank << ".sessionmap " | |
31 | ||
32 | namespace { | |
33 | class SessionMapIOContext : public MDSIOContextBase | |
34 | { | |
35 | protected: | |
36 | SessionMap *sessionmap; | |
37 | MDSRank *get_mds() override {return sessionmap->mds;} | |
38 | public: | |
39 | explicit SessionMapIOContext(SessionMap *sessionmap_) : sessionmap(sessionmap_) { | |
40 | assert(sessionmap != NULL); | |
41 | } | |
42 | }; | |
43 | }; | |
44 | ||
45 | void SessionMap::register_perfcounters() | |
46 | { | |
47 | PerfCountersBuilder plb(g_ceph_context, "mds_sessions", | |
48 | l_mdssm_first, l_mdssm_last); | |
49 | plb.add_u64(l_mdssm_session_count, "session_count", | |
50 | "Session count"); | |
51 | plb.add_u64_counter(l_mdssm_session_add, "session_add", | |
52 | "Sessions added"); | |
53 | plb.add_u64_counter(l_mdssm_session_remove, "session_remove", | |
54 | "Sessions removed"); | |
55 | logger = plb.create_perf_counters(); | |
56 | g_ceph_context->get_perfcounters_collection()->add(logger); | |
57 | } | |
58 | ||
59 | void SessionMap::dump() | |
60 | { | |
61 | dout(10) << "dump" << dendl; | |
62 | for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin(); | |
63 | p != session_map.end(); | |
64 | ++p) | |
65 | dout(10) << p->first << " " << p->second | |
66 | << " state " << p->second->get_state_name() | |
67 | << " completed " << p->second->info.completed_requests | |
68 | << " prealloc_inos " << p->second->info.prealloc_inos | |
69 | << " used_inos " << p->second->info.used_inos | |
70 | << dendl; | |
71 | } | |
72 | ||
73 | ||
74 | // ---------------- | |
75 | // LOAD | |
76 | ||
77 | ||
78 | object_t SessionMap::get_object_name() const | |
79 | { | |
80 | char s[30]; | |
81 | snprintf(s, sizeof(s), "mds%d_sessionmap", int(mds->get_nodeid())); | |
82 | return object_t(s); | |
83 | } | |
84 | ||
85 | namespace { | |
86 | class C_IO_SM_Load : public SessionMapIOContext { | |
87 | public: | |
88 | const bool first; //< Am I the initial (header) load? | |
89 | int header_r; //< Return value from OMAP header read | |
90 | int values_r; //< Return value from OMAP value read | |
91 | bufferlist header_bl; | |
92 | std::map<std::string, bufferlist> session_vals; | |
93 | bool more_session_vals = false; | |
94 | ||
95 | C_IO_SM_Load(SessionMap *cm, const bool f) | |
96 | : SessionMapIOContext(cm), first(f), header_r(0), values_r(0) {} | |
97 | ||
98 | void finish(int r) override { | |
99 | sessionmap->_load_finish(r, header_r, values_r, first, header_bl, session_vals, | |
100 | more_session_vals); | |
101 | } | |
102 | }; | |
103 | } | |
104 | ||
105 | ||
106 | /** | |
107 | * Decode OMAP header. Call this once when loading. | |
108 | */ | |
109 | void SessionMapStore::decode_header( | |
110 | bufferlist &header_bl) | |
111 | { | |
112 | bufferlist::iterator q = header_bl.begin(); | |
113 | DECODE_START(1, q) | |
114 | ::decode(version, q); | |
115 | DECODE_FINISH(q); | |
116 | } | |
117 | ||
118 | void SessionMapStore::encode_header( | |
119 | bufferlist *header_bl) | |
120 | { | |
121 | ENCODE_START(1, 1, *header_bl); | |
122 | ::encode(version, *header_bl); | |
123 | ENCODE_FINISH(*header_bl); | |
124 | } | |
125 | ||
126 | /** | |
127 | * Decode and insert some serialized OMAP values. Call this | |
128 | * repeatedly to insert batched loads. | |
129 | */ | |
130 | void SessionMapStore::decode_values(std::map<std::string, bufferlist> &session_vals) | |
131 | { | |
132 | for (std::map<std::string, bufferlist>::iterator i = session_vals.begin(); | |
133 | i != session_vals.end(); ++i) { | |
134 | ||
135 | entity_inst_t inst; | |
136 | ||
137 | bool parsed = inst.name.parse(i->first); | |
138 | if (!parsed) { | |
139 | derr << "Corrupt entity name '" << i->first << "' in sessionmap" << dendl; | |
140 | throw buffer::malformed_input("Corrupt entity name in sessionmap"); | |
141 | } | |
142 | ||
143 | Session *s = get_or_add_session(inst); | |
144 | if (s->is_closed()) | |
145 | s->set_state(Session::STATE_OPEN); | |
146 | bufferlist::iterator q = i->second.begin(); | |
147 | s->decode(q); | |
148 | } | |
149 | } | |
150 | ||
151 | /** | |
152 | * An OMAP read finished. | |
153 | */ | |
154 | void SessionMap::_load_finish( | |
155 | int operation_r, | |
156 | int header_r, | |
157 | int values_r, | |
158 | bool first, | |
159 | bufferlist &header_bl, | |
160 | std::map<std::string, bufferlist> &session_vals, | |
161 | bool more_session_vals) | |
162 | { | |
163 | if (operation_r < 0) { | |
164 | derr << "_load_finish got " << cpp_strerror(operation_r) << dendl; | |
165 | mds->clog->error() << "error reading sessionmap '" << get_object_name() | |
166 | << "' " << operation_r << " (" | |
167 | << cpp_strerror(operation_r) << ")"; | |
168 | mds->damaged(); | |
169 | ceph_abort(); // Should be unreachable because damaged() calls respawn() | |
170 | } | |
171 | ||
172 | // Decode header | |
173 | if (first) { | |
174 | if (header_r != 0) { | |
175 | derr << __func__ << ": header error: " << cpp_strerror(header_r) << dendl; | |
176 | mds->clog->error() << "error reading sessionmap header " | |
177 | << header_r << " (" << cpp_strerror(header_r) << ")"; | |
178 | mds->damaged(); | |
179 | ceph_abort(); // Should be unreachable because damaged() calls respawn() | |
180 | } | |
181 | ||
182 | if(header_bl.length() == 0) { | |
183 | dout(4) << __func__ << ": header missing, loading legacy..." << dendl; | |
184 | load_legacy(); | |
185 | return; | |
186 | } | |
187 | ||
188 | try { | |
189 | decode_header(header_bl); | |
190 | } catch (buffer::error &e) { | |
191 | mds->clog->error() << "corrupt sessionmap header: " << e.what(); | |
192 | mds->damaged(); | |
193 | ceph_abort(); // Should be unreachable because damaged() calls respawn() | |
194 | } | |
195 | dout(10) << __func__ << " loaded version " << version << dendl; | |
196 | } | |
197 | ||
198 | if (values_r != 0) { | |
199 | derr << __func__ << ": error reading values: " | |
200 | << cpp_strerror(values_r) << dendl; | |
201 | mds->clog->error() << "error reading sessionmap values: " | |
202 | << values_r << " (" << cpp_strerror(values_r) << ")"; | |
203 | mds->damaged(); | |
204 | ceph_abort(); // Should be unreachable because damaged() calls respawn() | |
205 | } | |
206 | ||
207 | // Decode session_vals | |
208 | try { | |
209 | decode_values(session_vals); | |
210 | } catch (buffer::error &e) { | |
211 | mds->clog->error() << "corrupt sessionmap values: " << e.what(); | |
212 | mds->damaged(); | |
213 | ceph_abort(); // Should be unreachable because damaged() calls respawn() | |
214 | } | |
215 | ||
216 | if (more_session_vals) { | |
217 | // Issue another read if we're not at the end of the omap | |
218 | const std::string last_key = session_vals.rbegin()->first; | |
219 | dout(10) << __func__ << ": continue omap load from '" | |
220 | << last_key << "'" << dendl; | |
221 | object_t oid = get_object_name(); | |
222 | object_locator_t oloc(mds->mdsmap->get_metadata_pool()); | |
223 | C_IO_SM_Load *c = new C_IO_SM_Load(this, false); | |
224 | ObjectOperation op; | |
225 | op.omap_get_vals(last_key, "", g_conf->mds_sessionmap_keys_per_op, | |
226 | &c->session_vals, &c->more_session_vals, &c->values_r); | |
227 | mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, | |
228 | new C_OnFinisher(c, mds->finisher)); | |
229 | } else { | |
230 | // I/O is complete. Update `by_state` | |
231 | dout(10) << __func__ << ": omap load complete" << dendl; | |
232 | for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin(); | |
233 | i != session_map.end(); ++i) { | |
234 | Session *s = i->second; | |
235 | auto by_state_entry = by_state.find(s->get_state()); | |
236 | if (by_state_entry == by_state.end()) | |
237 | by_state_entry = by_state.emplace(s->get_state(), | |
238 | new xlist<Session*>).first; | |
239 | by_state_entry->second->push_back(&s->item_session_list); | |
240 | } | |
241 | ||
242 | // Population is complete. Trigger load waiters. | |
243 | dout(10) << __func__ << ": v " << version | |
244 | << ", " << session_map.size() << " sessions" << dendl; | |
245 | projected = committing = committed = version; | |
246 | dump(); | |
247 | finish_contexts(g_ceph_context, waiting_for_load); | |
248 | } | |
249 | } | |
250 | ||
251 | /** | |
252 | * Populate session state from OMAP records in this | |
253 | * rank's sessionmap object. | |
254 | */ | |
255 | void SessionMap::load(MDSInternalContextBase *onload) | |
256 | { | |
257 | dout(10) << "load" << dendl; | |
258 | ||
259 | if (onload) | |
260 | waiting_for_load.push_back(onload); | |
261 | ||
262 | C_IO_SM_Load *c = new C_IO_SM_Load(this, true); | |
263 | object_t oid = get_object_name(); | |
264 | object_locator_t oloc(mds->mdsmap->get_metadata_pool()); | |
265 | ||
266 | ObjectOperation op; | |
267 | op.omap_get_header(&c->header_bl, &c->header_r); | |
268 | op.omap_get_vals("", "", g_conf->mds_sessionmap_keys_per_op, | |
269 | &c->session_vals, &c->more_session_vals, &c->values_r); | |
270 | ||
271 | mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, new C_OnFinisher(c, mds->finisher)); | |
272 | } | |
273 | ||
274 | namespace { | |
275 | class C_IO_SM_LoadLegacy : public SessionMapIOContext { | |
276 | public: | |
277 | bufferlist bl; | |
278 | explicit C_IO_SM_LoadLegacy(SessionMap *cm) : SessionMapIOContext(cm) {} | |
279 | void finish(int r) override { | |
280 | sessionmap->_load_legacy_finish(r, bl); | |
281 | } | |
282 | }; | |
283 | } | |
284 | ||
285 | ||
286 | /** | |
287 | * Load legacy (object data blob) SessionMap format, assuming | |
288 | * that waiting_for_load has already been populated with | |
289 | * the relevant completion. This is the fallback if we do not | |
290 | * find an OMAP header when attempting to load normally. | |
291 | */ | |
292 | void SessionMap::load_legacy() | |
293 | { | |
294 | dout(10) << __func__ << dendl; | |
295 | ||
296 | C_IO_SM_LoadLegacy *c = new C_IO_SM_LoadLegacy(this); | |
297 | object_t oid = get_object_name(); | |
298 | object_locator_t oloc(mds->mdsmap->get_metadata_pool()); | |
299 | ||
300 | mds->objecter->read_full(oid, oloc, CEPH_NOSNAP, &c->bl, 0, | |
301 | new C_OnFinisher(c, mds->finisher)); | |
302 | } | |
303 | ||
304 | void SessionMap::_load_legacy_finish(int r, bufferlist &bl) | |
305 | { | |
306 | bufferlist::iterator blp = bl.begin(); | |
307 | if (r < 0) { | |
308 | derr << "_load_finish got " << cpp_strerror(r) << dendl; | |
309 | assert(0 == "failed to load sessionmap"); | |
310 | } | |
311 | dump(); | |
312 | decode_legacy(blp); // note: this sets last_cap_renew = now() | |
313 | dout(10) << "_load_finish v " << version | |
314 | << ", " << session_map.size() << " sessions, " | |
315 | << bl.length() << " bytes" | |
316 | << dendl; | |
317 | projected = committing = committed = version; | |
318 | dump(); | |
319 | ||
320 | // Mark all sessions dirty, so that on next save() we will write | |
321 | // a complete OMAP version of the data loaded from the legacy format | |
322 | for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin(); | |
323 | i != session_map.end(); ++i) { | |
324 | // Don't use mark_dirty because on this occasion we want to ignore the | |
325 | // keys_per_op limit and do one big write (upgrade must be atomic) | |
326 | dirty_sessions.insert(i->first); | |
327 | } | |
328 | loaded_legacy = true; | |
329 | ||
330 | finish_contexts(g_ceph_context, waiting_for_load); | |
331 | } | |
332 | ||
333 | ||
334 | // ---------------- | |
335 | // SAVE | |
336 | ||
337 | namespace { | |
338 | class C_IO_SM_Save : public SessionMapIOContext { | |
339 | version_t version; | |
340 | public: | |
341 | C_IO_SM_Save(SessionMap *cm, version_t v) : SessionMapIOContext(cm), version(v) {} | |
342 | void finish(int r) override { | |
343 | if (r != 0) { | |
344 | get_mds()->handle_write_error(r); | |
345 | } else { | |
346 | sessionmap->_save_finish(version); | |
347 | } | |
348 | } | |
349 | }; | |
350 | } | |
351 | ||
352 | void SessionMap::save(MDSInternalContextBase *onsave, version_t needv) | |
353 | { | |
354 | dout(10) << __func__ << ": needv " << needv << ", v " << version << dendl; | |
355 | ||
356 | if (needv && committing >= needv) { | |
357 | assert(committing > committed); | |
358 | commit_waiters[committing].push_back(onsave); | |
359 | return; | |
360 | } | |
361 | ||
362 | commit_waiters[version].push_back(onsave); | |
363 | ||
364 | committing = version; | |
365 | SnapContext snapc; | |
366 | object_t oid = get_object_name(); | |
367 | object_locator_t oloc(mds->mdsmap->get_metadata_pool()); | |
368 | ||
369 | ObjectOperation op; | |
370 | ||
371 | /* Compose OSD OMAP transaction for full write */ | |
372 | bufferlist header_bl; | |
373 | encode_header(&header_bl); | |
374 | op.omap_set_header(header_bl); | |
375 | ||
376 | /* If we loaded a legacy sessionmap, then erase the old data. If | |
377 | * an old-versioned MDS tries to read it, it'll fail out safely | |
378 | * with an end_of_buffer exception */ | |
379 | if (loaded_legacy) { | |
380 | dout(4) << __func__ << " erasing legacy sessionmap" << dendl; | |
381 | op.truncate(0); | |
382 | loaded_legacy = false; // only need to truncate once. | |
383 | } | |
384 | ||
385 | dout(20) << " updating keys:" << dendl; | |
386 | map<string, bufferlist> to_set; | |
387 | for(std::set<entity_name_t>::iterator i = dirty_sessions.begin(); | |
388 | i != dirty_sessions.end(); ++i) { | |
389 | const entity_name_t name = *i; | |
390 | Session *session = session_map[name]; | |
391 | ||
392 | if (session->is_open() || | |
393 | session->is_closing() || | |
394 | session->is_stale() || | |
395 | session->is_killing()) { | |
396 | dout(20) << " " << name << dendl; | |
397 | // Serialize K | |
398 | std::ostringstream k; | |
399 | k << name; | |
400 | ||
401 | // Serialize V | |
402 | bufferlist bl; | |
403 | session->info.encode(bl, mds->mdsmap->get_up_features()); | |
404 | ||
405 | // Add to RADOS op | |
406 | to_set[k.str()] = bl; | |
407 | ||
408 | session->clear_dirty_completed_requests(); | |
409 | } else { | |
410 | dout(20) << " " << name << " (ignoring)" << dendl; | |
411 | } | |
412 | } | |
413 | if (!to_set.empty()) { | |
414 | op.omap_set(to_set); | |
415 | } | |
416 | ||
417 | dout(20) << " removing keys:" << dendl; | |
418 | set<string> to_remove; | |
419 | for(std::set<entity_name_t>::const_iterator i = null_sessions.begin(); | |
420 | i != null_sessions.end(); ++i) { | |
421 | dout(20) << " " << *i << dendl; | |
422 | std::ostringstream k; | |
423 | k << *i; | |
424 | to_remove.insert(k.str()); | |
425 | } | |
426 | if (!to_remove.empty()) { | |
427 | op.omap_rm_keys(to_remove); | |
428 | } | |
429 | ||
430 | dirty_sessions.clear(); | |
431 | null_sessions.clear(); | |
432 | ||
433 | mds->objecter->mutate(oid, oloc, op, snapc, | |
434 | ceph::real_clock::now(), | |
435 | 0, | |
436 | new C_OnFinisher(new C_IO_SM_Save(this, version), | |
437 | mds->finisher)); | |
438 | } | |
439 | ||
440 | void SessionMap::_save_finish(version_t v) | |
441 | { | |
442 | dout(10) << "_save_finish v" << v << dendl; | |
443 | committed = v; | |
444 | ||
445 | finish_contexts(g_ceph_context, commit_waiters[v]); | |
446 | commit_waiters.erase(v); | |
447 | } | |
448 | ||
449 | ||
450 | /** | |
451 | * Deserialize sessions, and update by_state index | |
452 | */ | |
453 | void SessionMap::decode_legacy(bufferlist::iterator &p) | |
454 | { | |
455 | // Populate `sessions` | |
456 | SessionMapStore::decode_legacy(p); | |
457 | ||
458 | // Update `by_state` | |
459 | for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin(); | |
460 | i != session_map.end(); ++i) { | |
461 | Session *s = i->second; | |
462 | auto by_state_entry = by_state.find(s->get_state()); | |
463 | if (by_state_entry == by_state.end()) | |
464 | by_state_entry = by_state.emplace(s->get_state(), | |
465 | new xlist<Session*>).first; | |
466 | by_state_entry->second->push_back(&s->item_session_list); | |
467 | } | |
468 | } | |
469 | ||
470 | uint64_t SessionMap::set_state(Session *session, int s) { | |
471 | if (session->state != s) { | |
472 | session->set_state(s); | |
473 | auto by_state_entry = by_state.find(s); | |
474 | if (by_state_entry == by_state.end()) | |
475 | by_state_entry = by_state.emplace(s, new xlist<Session*>).first; | |
476 | by_state_entry->second->push_back(&session->item_session_list); | |
477 | } | |
478 | return session->get_state_seq(); | |
479 | } | |
480 | ||
481 | void SessionMapStore::decode_legacy(bufferlist::iterator& p) | |
482 | { | |
483 | utime_t now = ceph_clock_now(); | |
484 | uint64_t pre; | |
485 | ::decode(pre, p); | |
486 | if (pre == (uint64_t)-1) { | |
487 | DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, p); | |
488 | assert(struct_v >= 2); | |
489 | ||
490 | ::decode(version, p); | |
491 | ||
492 | while (!p.end()) { | |
493 | entity_inst_t inst; | |
494 | ::decode(inst.name, p); | |
495 | Session *s = get_or_add_session(inst); | |
496 | if (s->is_closed()) | |
497 | s->set_state(Session::STATE_OPEN); | |
498 | s->decode(p); | |
499 | } | |
500 | ||
501 | DECODE_FINISH(p); | |
502 | } else { | |
503 | // --- old format ---- | |
504 | version = pre; | |
505 | ||
506 | // this is a meaningless upper bound. can be ignored. | |
507 | __u32 n; | |
508 | ::decode(n, p); | |
509 | ||
510 | while (n-- && !p.end()) { | |
511 | bufferlist::iterator p2 = p; | |
512 | Session *s = new Session; | |
513 | s->info.decode(p); | |
514 | if (session_map.count(s->info.inst.name)) { | |
515 | // eager client connected too fast! aie. | |
516 | dout(10) << " already had session for " << s->info.inst.name << ", recovering" << dendl; | |
517 | entity_name_t n = s->info.inst.name; | |
518 | delete s; | |
519 | s = session_map[n]; | |
520 | p = p2; | |
521 | s->info.decode(p); | |
522 | } else { | |
523 | session_map[s->info.inst.name] = s; | |
524 | } | |
525 | s->set_state(Session::STATE_OPEN); | |
526 | s->last_cap_renew = now; | |
527 | } | |
528 | } | |
529 | } | |
530 | ||
531 | void SessionMapStore::dump(Formatter *f) const | |
532 | { | |
533 | f->open_array_section("Sessions"); | |
534 | for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin(); | |
535 | p != session_map.end(); | |
536 | ++p) { | |
537 | f->open_object_section("Session"); | |
538 | f->open_object_section("entity name"); | |
539 | p->first.dump(f); | |
540 | f->close_section(); // entity name | |
541 | f->dump_string("state", p->second->get_state_name()); | |
542 | f->open_object_section("Session info"); | |
543 | p->second->info.dump(f); | |
544 | f->close_section(); // Session info | |
545 | f->close_section(); // Session | |
546 | } | |
547 | f->close_section(); // Sessions | |
548 | } | |
549 | ||
550 | void SessionMapStore::generate_test_instances(list<SessionMapStore*>& ls) | |
551 | { | |
552 | // pretty boring for now | |
553 | ls.push_back(new SessionMapStore()); | |
554 | } | |
555 | ||
556 | void SessionMap::wipe() | |
557 | { | |
558 | dout(1) << "wipe start" << dendl; | |
559 | dump(); | |
560 | while (!session_map.empty()) { | |
561 | Session *s = session_map.begin()->second; | |
562 | remove_session(s); | |
563 | } | |
564 | version = ++projected; | |
565 | dout(1) << "wipe result" << dendl; | |
566 | dump(); | |
567 | dout(1) << "wipe done" << dendl; | |
568 | } | |
569 | ||
570 | void SessionMap::wipe_ino_prealloc() | |
571 | { | |
572 | for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin(); | |
573 | p != session_map.end(); | |
574 | ++p) { | |
575 | p->second->pending_prealloc_inos.clear(); | |
576 | p->second->info.prealloc_inos.clear(); | |
577 | p->second->info.used_inos.clear(); | |
578 | } | |
579 | projected = ++version; | |
580 | } | |
581 | ||
582 | void SessionMap::add_session(Session *s) | |
583 | { | |
584 | dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl; | |
585 | ||
586 | assert(session_map.count(s->info.inst.name) == 0); | |
587 | session_map[s->info.inst.name] = s; | |
588 | auto by_state_entry = by_state.find(s->state); | |
589 | if (by_state_entry == by_state.end()) | |
590 | by_state_entry = by_state.emplace(s->state, new xlist<Session*>).first; | |
591 | by_state_entry->second->push_back(&s->item_session_list); | |
592 | s->get(); | |
593 | ||
594 | logger->set(l_mdssm_session_count, session_map.size()); | |
595 | logger->inc(l_mdssm_session_add); | |
596 | } | |
597 | ||
598 | void SessionMap::remove_session(Session *s) | |
599 | { | |
600 | dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl; | |
601 | ||
602 | s->trim_completed_requests(0); | |
603 | s->item_session_list.remove_myself(); | |
604 | session_map.erase(s->info.inst.name); | |
605 | dirty_sessions.erase(s->info.inst.name); | |
606 | null_sessions.insert(s->info.inst.name); | |
607 | s->put(); | |
608 | ||
609 | logger->set(l_mdssm_session_count, session_map.size()); | |
610 | logger->inc(l_mdssm_session_remove); | |
611 | } | |
612 | ||
613 | void SessionMap::touch_session(Session *session) | |
614 | { | |
615 | dout(10) << __func__ << " s=" << session << " name=" << session->info.inst.name << dendl; | |
616 | ||
617 | // Move to the back of the session list for this state (should | |
618 | // already be on a list courtesy of add_session and set_state) | |
619 | assert(session->item_session_list.is_on_list()); | |
620 | auto by_state_entry = by_state.find(session->state); | |
621 | if (by_state_entry == by_state.end()) | |
622 | by_state_entry = by_state.emplace(session->state, | |
623 | new xlist<Session*>).first; | |
624 | by_state_entry->second->push_back(&session->item_session_list); | |
625 | ||
626 | session->last_cap_renew = ceph_clock_now(); | |
627 | } | |
628 | ||
629 | void SessionMap::_mark_dirty(Session *s) | |
630 | { | |
631 | if (dirty_sessions.size() >= g_conf->mds_sessionmap_keys_per_op) { | |
632 | // Pre-empt the usual save() call from journal segment trim, in | |
633 | // order to avoid building up an oversized OMAP update operation | |
634 | // from too many sessions modified at once | |
635 | save(new C_MDSInternalNoop, version); | |
636 | } | |
637 | ||
638 | dirty_sessions.insert(s->info.inst.name); | |
639 | } | |
640 | ||
641 | void SessionMap::mark_dirty(Session *s) | |
642 | { | |
643 | dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name | |
644 | << " v=" << version << dendl; | |
645 | ||
646 | _mark_dirty(s); | |
647 | version++; | |
648 | s->pop_pv(version); | |
649 | } | |
650 | ||
651 | void SessionMap::replay_dirty_session(Session *s) | |
652 | { | |
653 | dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name | |
654 | << " v=" << version << dendl; | |
655 | ||
656 | _mark_dirty(s); | |
657 | ||
658 | replay_advance_version(); | |
659 | } | |
660 | ||
661 | void SessionMap::replay_advance_version() | |
662 | { | |
663 | version++; | |
664 | projected = version; | |
665 | } | |
666 | ||
667 | version_t SessionMap::mark_projected(Session *s) | |
668 | { | |
669 | dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name | |
670 | << " pv=" << projected << " -> " << projected + 1 << dendl; | |
671 | ++projected; | |
672 | s->push_pv(projected); | |
673 | return projected; | |
674 | } | |
675 | ||
676 | namespace { | |
677 | class C_IO_SM_Save_One : public SessionMapIOContext { | |
678 | MDSInternalContextBase *on_safe; | |
679 | public: | |
680 | C_IO_SM_Save_One(SessionMap *cm, MDSInternalContextBase *on_safe_) | |
681 | : SessionMapIOContext(cm), on_safe(on_safe_) {} | |
682 | void finish(int r) override { | |
683 | if (r != 0) { | |
684 | get_mds()->handle_write_error(r); | |
685 | } else { | |
686 | on_safe->complete(r); | |
687 | } | |
688 | } | |
689 | }; | |
690 | } | |
691 | ||
692 | ||
693 | void SessionMap::save_if_dirty(const std::set<entity_name_t> &tgt_sessions, | |
694 | MDSGatherBuilder *gather_bld) | |
695 | { | |
696 | assert(gather_bld != NULL); | |
697 | ||
698 | std::vector<entity_name_t> write_sessions; | |
699 | ||
700 | // Decide which sessions require a write | |
701 | for (std::set<entity_name_t>::iterator i = tgt_sessions.begin(); | |
702 | i != tgt_sessions.end(); ++i) { | |
703 | const entity_name_t &session_id = *i; | |
704 | ||
705 | if (session_map.count(session_id) == 0) { | |
706 | // Session isn't around any more, never mind. | |
707 | continue; | |
708 | } | |
709 | ||
710 | Session *session = session_map[session_id]; | |
711 | if (!session->has_dirty_completed_requests()) { | |
712 | // Session hasn't had completed_requests | |
713 | // modified since last write, no need to | |
714 | // write it now. | |
715 | continue; | |
716 | } | |
717 | ||
718 | if (dirty_sessions.count(session_id) > 0) { | |
719 | // Session is already dirtied, will be written, no | |
720 | // need to pre-empt that. | |
721 | continue; | |
722 | } | |
723 | // Okay, passed all our checks, now we write | |
724 | // this session out. The version we write | |
725 | // into the OMAP may now be higher-versioned | |
726 | // than the version in the header, but that's | |
727 | // okay because it's never a problem to have | |
728 | // an overly-fresh copy of a session. | |
729 | write_sessions.push_back(*i); | |
730 | } | |
731 | ||
732 | dout(4) << __func__ << ": writing " << write_sessions.size() << dendl; | |
733 | ||
734 | // Batch writes into mds_sessionmap_keys_per_op | |
735 | const uint32_t kpo = g_conf->mds_sessionmap_keys_per_op; | |
736 | map<string, bufferlist> to_set; | |
737 | for (uint32_t i = 0; i < write_sessions.size(); ++i) { | |
738 | // Start a new write transaction? | |
739 | if (i % g_conf->mds_sessionmap_keys_per_op == 0) { | |
740 | to_set.clear(); | |
741 | } | |
742 | ||
743 | const entity_name_t &session_id = write_sessions[i]; | |
744 | Session *session = session_map[session_id]; | |
745 | session->clear_dirty_completed_requests(); | |
746 | ||
747 | // Serialize K | |
748 | std::ostringstream k; | |
749 | k << session_id; | |
750 | ||
751 | // Serialize V | |
752 | bufferlist bl; | |
753 | session->info.encode(bl, mds->mdsmap->get_up_features()); | |
754 | ||
755 | // Add to RADOS op | |
756 | to_set[k.str()] = bl; | |
757 | ||
758 | // Complete this write transaction? | |
759 | if (i == write_sessions.size() - 1 | |
760 | || i % kpo == kpo - 1) { | |
761 | ObjectOperation op; | |
762 | op.omap_set(to_set); | |
763 | ||
764 | SnapContext snapc; | |
765 | object_t oid = get_object_name(); | |
766 | object_locator_t oloc(mds->mdsmap->get_metadata_pool()); | |
767 | MDSInternalContextBase *on_safe = gather_bld->new_sub(); | |
768 | mds->objecter->mutate(oid, oloc, op, snapc, | |
769 | ceph::real_clock::now(), | |
770 | 0, new C_OnFinisher( | |
771 | new C_IO_SM_Save_One(this, on_safe), | |
772 | mds->finisher)); | |
773 | } | |
774 | } | |
775 | } | |
776 | ||
777 | // ================= | |
778 | // Session | |
779 | ||
780 | #undef dout_prefix | |
781 | #define dout_prefix *_dout << "Session " | |
782 | ||
783 | /** | |
784 | * Calculate the length of the `requests` member list, | |
785 | * because elist does not have a size() method. | |
786 | * | |
787 | * O(N) runtime. This would be const, but elist doesn't | |
788 | * have const iterators. | |
789 | */ | |
790 | size_t Session::get_request_count() | |
791 | { | |
792 | size_t result = 0; | |
793 | ||
794 | elist<MDRequestImpl*>::iterator p = requests.begin( | |
795 | member_offset(MDRequestImpl, item_session_request)); | |
796 | while (!p.end()) { | |
797 | ++result; | |
798 | ++p; | |
799 | } | |
800 | ||
801 | return result; | |
802 | } | |
803 | ||
804 | /** | |
805 | * Capped in response to a CEPH_MSG_CLIENT_CAPRELEASE message, | |
806 | * with n_caps equal to the number of caps that were released | |
807 | * in the message. Used to update state about how many caps a | |
808 | * client has released since it was last instructed to RECALL_STATE. | |
809 | */ | |
810 | void Session::notify_cap_release(size_t n_caps) | |
811 | { | |
812 | if (!recalled_at.is_zero()) { | |
813 | recall_release_count += n_caps; | |
814 | if (recall_release_count >= recall_count) | |
815 | clear_recalled_at(); | |
816 | } | |
817 | } | |
818 | ||
819 | /** | |
820 | * Called when a CEPH_MSG_CLIENT_SESSION->CEPH_SESSION_RECALL_STATE | |
821 | * message is sent to the client. Update our recall-related state | |
822 | * in order to generate health metrics if the session doesn't see | |
823 | * a commensurate number of calls to ::notify_cap_release | |
824 | */ | |
825 | void Session::notify_recall_sent(int const new_limit) | |
826 | { | |
827 | if (recalled_at.is_zero()) { | |
828 | // Entering recall phase, set up counters so we can later | |
829 | // judge whether the client has respected the recall request | |
830 | recalled_at = last_recall_sent = ceph_clock_now(); | |
831 | assert (new_limit < caps.size()); // Behaviour of Server::recall_client_state | |
832 | recall_count = caps.size() - new_limit; | |
833 | recall_release_count = 0; | |
834 | } else { | |
835 | last_recall_sent = ceph_clock_now(); | |
836 | } | |
837 | } | |
838 | ||
839 | void Session::clear_recalled_at() | |
840 | { | |
841 | recalled_at = last_recall_sent = utime_t(); | |
842 | recall_count = 0; | |
843 | recall_release_count = 0; | |
844 | } | |
845 | ||
846 | void Session::set_client_metadata(map<string, string> const &meta) | |
847 | { | |
848 | info.client_metadata = meta; | |
849 | ||
850 | _update_human_name(); | |
851 | } | |
852 | ||
853 | /** | |
854 | * Use client metadata to generate a somewhat-friendlier | |
855 | * name for the client than its session ID. | |
856 | * | |
857 | * This is *not* guaranteed to be unique, and any machine | |
858 | * consumers of session-related output should always use | |
859 | * the session ID as a primary capacity and use this only | |
860 | * as a presentation hint. | |
861 | */ | |
862 | void Session::_update_human_name() | |
863 | { | |
864 | auto info_client_metadata_entry = info.client_metadata.find("hostname"); | |
865 | if (info_client_metadata_entry != info.client_metadata.end()) { | |
866 | // Happy path, refer to clients by hostname | |
867 | human_name = info_client_metadata_entry->second; | |
868 | if (!info.auth_name.has_default_id()) { | |
869 | // When a non-default entity ID is set by the user, assume they | |
870 | // would like to see it in references to the client, if it's | |
871 | // reasonable short. Limit the length because we don't want | |
872 | // to put e.g. uuid-generated names into a "human readable" | |
873 | // rendering. | |
874 | const int arbitrarily_short = 16; | |
875 | if (info.auth_name.get_id().size() < arbitrarily_short) { | |
876 | human_name += std::string(":") + info.auth_name.get_id(); | |
877 | } | |
878 | } | |
879 | } else { | |
880 | // Fallback, refer to clients by ID e.g. client.4567 | |
881 | human_name = stringify(info.inst.name.num()); | |
882 | } | |
883 | } | |
884 | ||
885 | void Session::decode(bufferlist::iterator &p) | |
886 | { | |
887 | info.decode(p); | |
888 | ||
889 | _update_human_name(); | |
890 | } | |
891 | ||
892 | int Session::check_access(CInode *in, unsigned mask, | |
893 | int caller_uid, int caller_gid, | |
894 | const vector<uint64_t> *caller_gid_list, | |
895 | int new_uid, int new_gid) | |
896 | { | |
897 | string path; | |
898 | CInode *diri = NULL; | |
899 | if (!in->is_base()) | |
900 | diri = in->get_projected_parent_dn()->get_dir()->get_inode(); | |
901 | if (diri && diri->is_stray()){ | |
902 | path = in->get_projected_inode()->stray_prior_path; | |
903 | dout(20) << __func__ << " stray_prior_path " << path << dendl; | |
904 | } else { | |
905 | in->make_path_string(path, true); | |
906 | dout(20) << __func__ << " path " << path << dendl; | |
907 | } | |
908 | if (path.length()) | |
909 | path = path.substr(1); // drop leading / | |
910 | ||
911 | if (in->inode.is_dir() && | |
912 | in->inode.has_layout() && | |
913 | in->inode.layout.pool_ns.length() && | |
914 | !connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) { | |
915 | dout(10) << __func__ << " client doesn't support FS_FILE_LAYOUT_V2" << dendl; | |
916 | return -EIO; | |
917 | } | |
918 | ||
919 | if (!auth_caps.is_capable(path, in->inode.uid, in->inode.gid, in->inode.mode, | |
920 | caller_uid, caller_gid, caller_gid_list, mask, | |
921 | new_uid, new_gid)) { | |
922 | return -EACCES; | |
923 | } | |
924 | return 0; | |
925 | } | |
926 | ||
927 | int SessionFilter::parse( | |
928 | const std::vector<std::string> &args, | |
929 | std::stringstream *ss) | |
930 | { | |
931 | assert(ss != NULL); | |
932 | ||
933 | for (const auto &s : args) { | |
934 | dout(20) << __func__ << " parsing filter '" << s << "'" << dendl; | |
935 | ||
936 | auto eq = s.find("="); | |
937 | if (eq == std::string::npos || eq == s.size()) { | |
938 | *ss << "Invalid filter '" << s << "'"; | |
939 | return -EINVAL; | |
940 | } | |
941 | ||
942 | // Keys that start with this are to be taken as referring | |
943 | // to freeform client metadata fields. | |
944 | const std::string metadata_prefix("client_metadata."); | |
945 | ||
946 | auto k = s.substr(0, eq); | |
947 | auto v = s.substr(eq + 1); | |
948 | ||
949 | dout(20) << __func__ << " parsed k='" << k << "', v='" << v << "'" << dendl; | |
950 | ||
951 | if (k.compare(0, metadata_prefix.size(), metadata_prefix) == 0 | |
952 | && k.size() > metadata_prefix.size()) { | |
953 | // Filter on arbitrary metadata key (no fixed schema for this, | |
954 | // so anything after the dot is a valid field to filter on) | |
955 | auto metadata_key = k.substr(metadata_prefix.size()); | |
956 | metadata.insert(std::make_pair(metadata_key, v)); | |
957 | } else if (k == "auth_name") { | |
958 | // Filter on client entity name | |
959 | auth_name = v; | |
960 | } else if (k == "state") { | |
961 | state = v; | |
962 | } else if (k == "id") { | |
963 | std::string err; | |
964 | id = strict_strtoll(v.c_str(), 10, &err); | |
965 | if (!err.empty()) { | |
966 | *ss << err; | |
967 | return -EINVAL; | |
968 | } | |
969 | } else if (k == "reconnecting") { | |
970 | ||
971 | /** | |
972 | * Strict boolean parser. Allow true/false/0/1. | |
973 | * Anything else is -EINVAL. | |
974 | */ | |
975 | auto is_true = [](const std::string &bstr, bool *out) -> bool | |
976 | { | |
977 | assert(out != nullptr); | |
978 | ||
979 | if (bstr == "true" || bstr == "1") { | |
980 | *out = true; | |
981 | return 0; | |
982 | } else if (bstr == "false" || bstr == "0") { | |
983 | *out = false; | |
984 | return 0; | |
985 | } else { | |
986 | return -EINVAL; | |
987 | } | |
988 | }; | |
989 | ||
990 | bool bval; | |
991 | int r = is_true(v, &bval); | |
992 | if (r == 0) { | |
993 | set_reconnecting(bval); | |
994 | } else { | |
995 | *ss << "Invalid boolean value '" << v << "'"; | |
996 | return -EINVAL; | |
997 | } | |
998 | } else { | |
999 | *ss << "Invalid filter key '" << k << "'"; | |
1000 | return -EINVAL; | |
1001 | } | |
1002 | } | |
1003 | ||
1004 | return 0; | |
1005 | } | |
1006 | ||
1007 | bool SessionFilter::match( | |
1008 | const Session &session, | |
1009 | std::function<bool(client_t)> is_reconnecting) const | |
1010 | { | |
1011 | for (const auto &m : metadata) { | |
1012 | const auto &k = m.first; | |
1013 | const auto &v = m.second; | |
1014 | if (session.info.client_metadata.count(k) == 0) { | |
1015 | return false; | |
1016 | } | |
1017 | if (session.info.client_metadata.at(k) != v) { | |
1018 | return false; | |
1019 | } | |
1020 | } | |
1021 | ||
1022 | if (!auth_name.empty() && auth_name != session.info.auth_name.get_id()) { | |
1023 | return false; | |
1024 | } | |
1025 | ||
1026 | if (!state.empty() && state != session.get_state_name()) { | |
1027 | return false; | |
1028 | } | |
1029 | ||
1030 | if (id != 0 && id != session.info.inst.name.num()) { | |
1031 | return false; | |
1032 | } | |
1033 | ||
1034 | if (reconnecting.first) { | |
1035 | const bool am_reconnecting = is_reconnecting(session.info.inst.name.num()); | |
1036 | if (reconnecting.second != am_reconnecting) { | |
1037 | return false; | |
1038 | } | |
1039 | } | |
1040 | ||
1041 | return true; | |
1042 | } | |
1043 | ||
1044 | std::ostream& operator<<(std::ostream &out, const Session &s) | |
1045 | { | |
1046 | if (s.get_human_name() == stringify(s.info.inst.name.num())) { | |
1047 | out << s.get_human_name(); | |
1048 | } else { | |
1049 | out << s.get_human_name() << " (" << std::dec << s.info.inst.name.num() << ")"; | |
1050 | } | |
1051 | return out; | |
1052 | } | |
1053 |