// Source: ceph/src/osdc/Objecter.cc
// (as viewed at commit 708c12cc9e6ec631ed4e26fdbe7cda06ad33e538)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <algorithm>
16 #include <cerrno>
17
18 #include "Objecter.h"
19 #include "osd/OSDMap.h"
20 #include "osd/error_code.h"
21 #include "Filer.h"
22
23 #include "mon/MonClient.h"
24 #include "mon/error_code.h"
25
26 #include "msg/Messenger.h"
27 #include "msg/Message.h"
28
29 #include "messages/MPing.h"
30 #include "messages/MOSDOp.h"
31 #include "messages/MOSDOpReply.h"
32 #include "messages/MOSDBackoff.h"
33 #include "messages/MOSDMap.h"
34
35 #include "messages/MPoolOp.h"
36 #include "messages/MPoolOpReply.h"
37
38 #include "messages/MGetPoolStats.h"
39 #include "messages/MGetPoolStatsReply.h"
40 #include "messages/MStatfs.h"
41 #include "messages/MStatfsReply.h"
42
43 #include "messages/MMonCommand.h"
44
45 #include "messages/MCommand.h"
46 #include "messages/MCommandReply.h"
47
48 #include "messages/MWatchNotify.h"
49
50
51 #include "common/Cond.h"
52 #include "common/config.h"
53 #include "common/perf_counters.h"
54 #include "common/scrub_types.h"
55 #include "include/str_list.h"
56 #include "common/errno.h"
57 #include "common/EventTrace.h"
58 #include "common/async/waiter.h"
59 #include "error_code.h"
60
61
62 using std::list;
63 using std::make_pair;
64 using std::map;
65 using std::ostream;
66 using std::ostringstream;
67 using std::pair;
68 using std::set;
69 using std::string;
70 using std::stringstream;
71 using std::vector;
72
73 using ceph::decode;
74 using ceph::encode;
75 using ceph::Formatter;
76
77 using std::defer_lock;
78 using std::scoped_lock;
79 using std::shared_lock;
80 using std::unique_lock;
81
82 using ceph::real_time;
83 using ceph::real_clock;
84
85 using ceph::mono_clock;
86 using ceph::mono_time;
87
88 using ceph::timespan;
89
90 using ceph::shunique_lock;
91 using ceph::acquire_shared;
92 using ceph::acquire_unique;
93
94 namespace bc = boost::container;
95 namespace bs = boost::system;
96 namespace ca = ceph::async;
97 namespace cb = ceph::buffer;
98
99 #define dout_subsys ceph_subsys_objecter
100 #undef dout_prefix
101 #define dout_prefix *_dout << messenger->get_myname() << ".objecter "
102
103
// Perf-counter indices for the "objecter" PerfCounters instance.
// l_osdc_first / l_osdc_last bound the range handed to
// PerfCountersBuilder in Objecter::init(); everything in between is a
// live counter. Keep new entries inside the bounds.
enum {
  l_osdc_first = 123200,
  // in-flight op accounting
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,
  l_osdc_oplen_avg,

  // op counts by access type
  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  // per-osd-op (sub-operation) counts
  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  // linger (watch/notify) ops
  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  // pool ops
  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  // pool stat ops
  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  // statfs ops
  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  // osd commands
  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  // osdmap tracking
  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  // osd session accounting
  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  // omap sub-op accounting
  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};
182
183 namespace {
184 inline bs::error_code osdcode(int r) {
185 return (r < 0) ? bs::error_code(-r, osd_category()) : bs::error_code();
186 }
187 }
188
189 // config obs ----------------------------
190
// Admin-socket hook backing the "objecter_requests" command
// (registered in Objecter::init(), torn down in shutdown()); dumps
// in-progress osd requests via the Formatter.
class Objecter::RequestStateHook : public AdminSocketHook {
  Objecter *m_objecter;  // back-pointer; not owned
public:
  explicit RequestStateHook(Objecter *objecter);
  int call(std::string_view command, const cmdmap_t& cmdmap,
	   Formatter *f,
	   std::ostream& ss,
	   cb::list& out) override;
};
200
201 std::unique_lock<std::mutex> Objecter::OSDSession::get_lock(object_t& oid)
202 {
203 if (oid.name.empty())
204 return {};
205
206 static constexpr uint32_t HASH_PRIME = 1021;
207 uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
208 % HASH_PRIME;
209
210 return {completion_locks[h % num_locks], std::defer_lock};
211 }
212
213 const char** Objecter::get_tracked_conf_keys() const
214 {
215 static const char *config_keys[] = {
216 "crush_location",
217 "rados_mon_op_timeout",
218 "rados_osd_op_timeout",
219 NULL
220 };
221 return config_keys;
222 }
223
224
225 void Objecter::handle_conf_change(const ConfigProxy& conf,
226 const std::set <std::string> &changed)
227 {
228 if (changed.count("crush_location")) {
229 update_crush_location();
230 }
231 if (changed.count("rados_mon_op_timeout")) {
232 mon_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
233 }
234 if (changed.count("rados_osd_op_timeout")) {
235 osd_timeout = conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
236 }
237 }
238
// Re-read our CRUSH location from the config, under the Objecter
// write lock.
void Objecter::update_crush_location()
{
  unique_lock wl(rwlock);
  crush_location = cct->crush_location.get_location();
}
244
245 // messages ------------------------------
246
247 /*
248 * initialize only internal data structures, don't initiate cluster interaction
249 */
void Objecter::init()
{
  ceph_assert(!initialized);

  // Register the "objecter" perf counters once (indices from the
  // l_osdc_* enum above).
  if (!logger) {
    PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);

    pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
		PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
    pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
    pcb.add_u64_avg(l_osdc_oplen_avg, "oplen_avg", "Average length of operation vector");

    pcb.add_u64_counter(l_osdc_op, "op", "Operations");
    pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
			"rdwr", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");

    // per-sub-op counters
    pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
    pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
			"Create object operations");
    pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
    pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
    pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
			"Write full object operations");
    pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
			"Write same operations");
    pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
			"Append operation");
    pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
			"Set object to zero operations");
    pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
			"Truncate object operations");
    pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
			"Delete object operations");
    pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
			"Map extent operations");
    pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
			"Sparse read operations");
    pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
			"Clone range operations");
    pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
			"Get xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
			"Set xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
			"Xattr comparison operations");
    pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
			"Remove xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
			"Reset xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
			"Call (execute) operations");
    pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
			"Watch by object operations");
    pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
			"Notify about object operations");
    pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
			"Extended attribute comparison in multi operations");
    pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
    pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
    pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");

    // linger (watch/notify)
    pcb.add_u64(l_osdc_linger_active, "linger_active",
		"Active lingering operations");
    pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
			"Sent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
			"Resent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
			"Sent pings to lingering operations");

    pcb.add_u64(l_osdc_poolop_active, "poolop_active",
		"Active pool operations");
    pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
			"Sent pool operations");
    pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
			"Resent pool operations");

    pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
		"Active get pool stat operations");
    pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
			"Pool stat operations sent");
    pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
			"Resent pool stats");

    pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
    pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
    pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
			"Resent FS stats");

    pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
    pcb.add_u64_counter(l_osdc_command_send, "command_send",
			"Sent commands");
    pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
			"Resent commands");

    pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
    pcb.add_u64_counter(l_osdc_map_full, "map_full",
			"Full OSD maps received");
    pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
			"Incremental OSD maps received");

    pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
		"Open sessions");  // open sessions
    pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
			"Sessions opened");
    pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
			"Sessions closed");
    pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");

    pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
			"OSD OMAP write operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
			"OSD OMAP read operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
			"OSD OMAP delete operations");

    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  // Hook up the "objecter_requests" admin-socket command.
  m_request_state_hook = new RequestStateHook(this);
  auto admin_socket = cct->get_admin_socket();
  int ret = admin_socket->register_command("objecter_requests",
					   m_request_state_hook,
					   "show in-progress osd requests");

  /* Don't warn on EEXIST, happens if multiple ceph clients
   * are instantiated from one process */
  if (ret < 0 && ret != -EEXIST) {
    lderr(cct) << "error registering admin socket command: "
	       << cpp_strerror(ret) << dendl;
  }

  update_crush_location();

  // Start tracking the config keys from get_tracked_conf_keys().
  cct->_conf.add_observer(this);

  initialized = true;
}
398
399 /*
400 * ok, cluster interaction can happen
401 */
void Objecter::start(const OSDMap* o)
{
  shared_lock rl(rwlock);

  start_tick();
  if (o) {
    // Adopt the caller-supplied map wholesale and rebuild pg mappings.
    // NOTE(review): osdmap is mutated while holding only the shared
    // lock; presumably safe because start() runs before concurrent
    // users exist -- confirm.
    osdmap->deepish_copy_from(*o);
    prune_pg_mapping(osdmap->get_pools());
  } else if (osdmap->get_epoch() == 0) {
    // No map at all yet; ask the monitors for one.
    _maybe_request_map();
  }
}
414
// Tear down all in-flight state: sessions, map-check waiters, pool/
// statfs/pool-stat ops, everything parked on the homeless session,
// the tick timer, perf counters, and the admin-socket hook.
// Clearing `initialized` first makes message dispatch a no-op.
void Objecter::shutdown()
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  initialized = false;

  // Observer removal is done outside our lock.
  wl.unlock();
  cct->_conf.remove_observer(this);
  wl.lock();

  while (!osd_sessions.empty()) {
    auto p = osd_sessions.begin();
    close_session(p->second);
  }

  while(!check_latest_map_lingers.empty()) {
    auto i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while(!check_latest_map_ops.empty()) {
    auto i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while(!check_latest_map_commands.empty()) {
    auto i = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  while(!poolstat_ops.empty()) {
    auto i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while(!statfs_ops.empty()) {
    auto i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while(!pool_ops.empty()) {
    auto i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  // Ops with no live OSD session are parked on homeless_session;
  // each kind is unlinked under the session lock, then dropped.
  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
  while(!homeless_session->linger_ops.empty()) {
    auto i = homeless_session->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    LingerOp *lop = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_linger_op_remove(homeless_session, lop);
    }
    linger_ops.erase(lop->linger_id);
    linger_ops_set.erase(lop);
    lop->put();
  }

  while(!homeless_session->ops.empty()) {
    auto i = homeless_session->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    auto op = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_op_remove(homeless_session, op);
    }
    op->put();
  }

  while(!homeless_session->command_ops.empty()) {
    auto i = homeless_session->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    auto cop = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_command_op_remove(homeless_session, cop);
    }
    cop->put();
  }

  if (tick_event) {
    if (timer.cancel_event(tick_event)) {
      ldout(cct, 10) << " successfully canceled tick" << dendl;
    }
    tick_event = 0;
  }

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }

  // Let go of Objecter write lock so timer thread can shutdown
  wl.unlock();

  // Outside of lock to avoid cycle WRT calls to RequestStateHook
  // This is safe because we guarantee no concurrent calls to
  // shutdown() with the ::initialized check at start.
  if (m_request_state_hook) {
    auto admin_socket = cct->get_admin_socket();
    admin_socket->unregister_commands(m_request_state_hook);
    delete m_request_state_hook;
    m_request_state_hook = NULL;
  }
}
530
// Send (or resend) the registration/reconnect op for a linger
// (watch/notify) operation. Builds a one-shot Op that is never
// auto-resent: on failure a fresh op is generated instead, so the
// register_gen bookkeeping stays consistent.
void Objecter::_send_linger(LingerOp *info,
			    ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  fu2::unique_function<Op::OpSig> oncommit;
  osdc_opvec opv;
  std::shared_lock watchl(info->watch_lock);
  cb::list *poutbl = nullptr;
  if (info->registered && info->is_watch) {
    // Already-registered watch: issue a RECONNECT with a bumped gen.
    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
		   << dendl;
    opv.push_back(OSDOp());
    opv.back().op.op = CEPH_OSD_OP_WATCH;
    opv.back().op.watch.cookie = info->get_cookie();
    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
    opv.back().op.watch.gen = ++info->register_gen;
    oncommit = CB_Linger_Reconnect(this, info);
  } else {
    // First registration (watch) or a notify: replay the original ops.
    ldout(cct, 15) << "send_linger " << info->linger_id << " register"
		   << dendl;
    opv = info->ops;
    // TODO Augment ca::Completion with an equivalent of
    // target so we can handle these cases better.
    auto c = std::make_unique<CB_Linger_Commit>(this, info);
    if (!info->is_watch) {
      // Notify: capture the reply payload so _linger_commit() can
      // decode the notify_id from it.
      info->notify_id = 0;
      poutbl = &c->outbl;
    }
    oncommit = [c = std::move(c)](bs::error_code ec) mutable {
      std::move(*c)(ec);
    };
  }
  watchl.unlock();
  auto o = new Op(info->target.base_oid, info->target.base_oloc,
		  std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
		  std::move(oncommit), info->pobjver);
  o->outbl = poutbl;
  o->snapid = info->snap;
  o->snapc = info->snapc;
  o->mtime = info->mtime;

  o->target = info->target;
  o->tid = ++last_tid;

  // do not resend this; we will send a new op to reregister
  o->should_resend = false;
  o->ctx_budgeted = true;

  if (info->register_tid) {
    // repeat send. cancel old registration op, if any.
    std::unique_lock sl(info->session->lock);
    if (info->session->ops.count(info->register_tid)) {
      // NB: this inner `o` shadows the new Op built above; it refers
      // to the *previous* registration op being canceled.
      auto o = info->session->ops[info->register_tid];
      _op_cancel_map_check(o);
      _cancel_linger_op(o);
    }
    sl.unlock();
  }

  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);

  logger->inc(l_osdc_linger_send);
}
595
// Completion path for a linger registration op: fire the one-shot
// registration (and, on error, notify-finish) callbacks, mark the op
// registered, and for a notify decode the notify_id from the reply.
void Objecter::_linger_commit(LingerOp *info, bs::error_code ec,
			      cb::list& outbl)
{
  std::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
  if (info->on_reg_commit) {
    // Completion is consumed by defer(); reset so it fires only once.
    info->on_reg_commit->defer(std::move(info->on_reg_commit),
			       ec, cb::list{});
    info->on_reg_commit.reset();
  }
  if (ec && info->on_notify_finish) {
    // Registration failed, so the notify can never complete: fail it
    // now with the same error.
    info->on_notify_finish->defer(std::move(info->on_notify_finish),
				  ec, cb::list{});
    info->on_notify_finish.reset();
  }

  // only tell the user the first time we do this
  info->registered = true;
  info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    auto p = outbl.cbegin();
    try {
      decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
		     << dendl;
    }
    catch (cb::error& e) {
      // Best-effort: a malformed/short reply just leaves notify_id 0.
    }
  }
}
628
// Deferred delivery of a watch error to the user's handler, run on
// the Objecter's finish strand. The constructor marks the LingerOp
// as having queued async work (_queued_async); operator() balances
// that with finished_async() whether or not the handler runs.
class CB_DoWatchError {
  Objecter *objecter;
  boost::intrusive_ptr<Objecter::LingerOp> info;  // keeps the op alive
  bs::error_code ec;
public:
  CB_DoWatchError(Objecter *o, Objecter::LingerOp *i,
		  bs::error_code ec)
    : objecter(o), info(i), ec(ec) {
    info->_queued_async();
  }
  void operator()() {
    // Snapshot `canceled` under the Objecter lock, then call the
    // user handler without holding it.
    std::unique_lock wl(objecter->rwlock);
    bool canceled = info->canceled;
    wl.unlock();

    if (!canceled) {
      info->handle(ec, 0, info->get_cookie(), 0, {});
    }

    info->finished_async();
  }
};
651
652 bs::error_code Objecter::_normalize_watch_error(bs::error_code ec)
653 {
654 // translate ENOENT -> ENOTCONN so that a delete->disconnection
655 // notification and a failure to reconnect because we raced with
656 // the delete appear the same to the user.
657 if (ec == bs::errc::no_such_file_or_directory)
658 ec = bs::error_code(ENOTCONN, osd_category());
659 return ec;
660 }
661
// Completion path for a watch RECONNECT op. On the first error only,
// normalize it and queue the user's error handler; the error is then
// sticky in last_error.
void Objecter::_linger_reconnect(LingerOp *info, bs::error_code ec)
{
  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << ec
		 << " (last_error " << info->last_error << ")" << dendl;
  std::unique_lock wl(info->watch_lock);
  if (ec) {
    if (!info->last_error) {
      ec = _normalize_watch_error(ec);
      if (info->handle) {
	boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
      }
    }
  }

  info->last_error = ec;
}
678
// Send a WATCH_OP_PING for a registered watch so we can later judge
// its liveness (see _linger_ping / linger_check). Skipped when ping
// injection is disabled via config or reads are paused cluster-wide.
void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  if (cct->_conf->objecter_inject_no_watch_ping) {
    // Test hook: pretend to ping without actually sending.
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
		   << dendl;
    return;
  }
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  // Timestamp taken before the send; on success it becomes the new
  // watch_valid_thru lower bound.
  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
		 << dendl;

  osdc_opvec opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen;

  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
		 std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
		 CB_Linger_Ping(this, info, now),
		 nullptr, nullptr);
  o->target = info->target;
  // Pings are fire-and-forget; never resent on map changes.
  o->should_resend = false;
  _send_op_account(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o);
  info->ping_tid = o->tid;

  logger->inc(l_osdc_linger_ping);
}
718
// Completion path for a watch ping. Replies for a stale register_gen
// are ignored; a successful ping advances watch_valid_thru, and the
// first error is normalized, stored, and delivered to the handler.
void Objecter::_linger_ping(LingerOp *info, bs::error_code ec, ceph::coarse_mono_time sent,
			    uint32_t register_gen)
{
  std::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " sent " << sent << " gen " << register_gen << " = " << ec
		 << " (last_error " << info->last_error
		 << " register_gen " << info->register_gen << ")" << dendl;
  if (info->register_gen == register_gen) {
    if (!ec) {
      info->watch_valid_thru = sent;
    } else if (ec && !info->last_error) {
      ec = _normalize_watch_error(ec);
      info->last_error = ec;
      if (info->handle) {
	boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
      }
    }
  } else {
    // Reply belongs to an older registration of this watch.
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}
741
742 tl::expected<ceph::timespan,
743 bs::error_code> Objecter::linger_check(LingerOp *info)
744 {
745 std::shared_lock l(info->watch_lock);
746
747 ceph::coarse_mono_time stamp = info->watch_valid_thru;
748 if (!info->watch_pending_async.empty())
749 stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
750 auto age = ceph::coarse_mono_clock::now() - stamp;
751
752 ldout(cct, 10) << __func__ << " " << info->linger_id
753 << " err " << info->last_error
754 << " age " << age << dendl;
755 if (info->last_error)
756 return tl::unexpected(info->last_error);
757 // return a safe upper bound (we are truncating to ms)
758 return age;
759 }
760
// Public entry: cancel a linger op and drop the caller's reference
// (the one handed out by linger_register()).
void Objecter::linger_cancel(LingerOp *info)
{
  unique_lock wl(rwlock);
  _linger_cancel(info);
  info->put();
}
767
// Unlink a linger op from its session and the global tables, exactly
// once (idempotent via the `canceled` flag). Drops the registration
// reference held by the tables.
void Objecter::_linger_cancel(LingerOp *info)
{
  // rwlock is locked unique
  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
  if (!info->canceled) {
    OSDSession *s = info->session;
    std::unique_lock sl(s->lock);
    _session_linger_op_remove(s, info);
    sl.unlock();

    linger_ops.erase(info->linger_id);
    linger_ops_set.erase(info);
    // The map and set must stay in lock-step.
    ceph_assert(linger_ops.size() == linger_ops_set.size());

    info->canceled = true;
    info->put();

    logger->dec(l_osdc_linger_active);
  }
}
788
789
790
// Allocate and index a new LingerOp for (oid, oloc). The returned
// pointer carries an extra reference for the caller (released by
// linger_cancel()); the tables hold their own reference.
Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
					      const object_locator_t& oloc,
					      int flags)
{
  unique_lock l(rwlock);
  // Acquire linger ID
  auto info = new LingerOp(this, ++max_linger_id);
  info->target.base_oid = oid;
  info->target.base_oloc = oloc;
  // A locator key equal to the oid is redundant; drop it.
  if (info->target.base_oloc.key == oid)
    info->target.base_oloc.key.clear();
  info->target.flags = flags;
  info->watch_valid_thru = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " info " << info
		 << " linger_id " << info->linger_id
		 << " cookie " << info->get_cookie()
		 << dendl;
  linger_ops[info->linger_id] = info;
  linger_ops_set.insert(info);
  ceph_assert(linger_ops.size() == linger_ops_set.size());

  info->get(); // for the caller
  return info;
}
815
// Turn a registered LingerOp into a WATCH: record the write-side
// parameters, take the linger budget, and submit. `oncommit` fires
// when the registration commits; returns the linger id. Consumes
// op.ops (op is cleared).
ceph_tid_t Objecter::linger_watch(LingerOp *info,
				  ObjectOperation& op,
				  const SnapContext& snapc,
				  real_time mtime,
				  cb::list& inbl,
				  decltype(info->on_reg_commit)&& oncommit,
				  version_t *objver)
{
  info->is_watch = true;
  info->snapc = snapc;
  info->mtime = mtime;
  info->target.flags |= CEPH_OSD_FLAG_WRITE;
  info->ops = op.ops;
  info->inbl = inbl;
  info->pobjver = objver;
  info->on_reg_commit = std::move(oncommit);

  // Budget must be taken before _linger_submit() (asserted there).
  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  op.clear();
  return info->linger_id;
}
842
// Turn a registered LingerOp into a NOTIFY (read-side counterpart of
// linger_watch): record parameters, take the budget, and submit.
// `onfinish` fires when the notify completes; returns the linger id.
// Consumes op.ops (op is cleared).
ceph_tid_t Objecter::linger_notify(LingerOp *info,
				   ObjectOperation& op,
				   snapid_t snap, cb::list& inbl,
				   decltype(LingerOp::on_reg_commit)&& onfinish,
				   version_t *objver)
{
  info->snap = snap;
  info->target.flags |= CEPH_OSD_FLAG_READ;
  info->ops = op.ops;
  info->inbl = inbl;
  info->pobjver = objver;
  info->on_reg_commit = std::move(onfinish);
  // Budget must be taken before _linger_submit() (asserted there).
  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  op.clear();
  return info->linger_id;
}
864
// Resolve the linger op's target, bind it to the matching OSD
// session, and send the registration. A pool marked EIO fails the op
// immediately via _check_linger_pool_eio().
void Objecter::_linger_submit(LingerOp *info,
			      ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
  ceph_assert(info->linger_id);
  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!

  // Populate Op::target
  OSDSession *s = NULL;
  int r = _calc_target(&info->target, nullptr);
  switch (r) {
  case RECALC_OP_TARGET_POOL_EIO:
    _check_linger_pool_eio(info);
    return;
  }

  // Create LingerOp<->OSDSession relation
  r = _get_session(info->target.osd, &s, sul);
  ceph_assert(r == 0);
  unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  // Drop the ref from _get_session(); the session<->op link holds on.
  put_session(s);

  _send_linger(info, sul);
}
891
// Deferred delivery of a watch NOTIFY message to _do_watch_notify(),
// run on the finish strand. Construction marks async work queued
// (_queued_async); _do_watch_notify() balances it with
// finished_async(). Intrusive ptrs keep both the op and the message
// alive across the deferral.
struct CB_DoWatchNotify {
  Objecter *objecter;
  boost::intrusive_ptr<Objecter::LingerOp> info;
  boost::intrusive_ptr<MWatchNotify> msg;
  CB_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->_queued_async();
  }
  void operator()() {
    objecter->_do_watch_notify(std::move(info), std::move(msg));
  }
};
904
// Fast-dispatch handler for MWatchNotify. The message cookie is the
// LingerOp pointer; it is only trusted after membership in
// linger_ops_set confirms the op is still registered. DISCONNECT and
// errors are deferred to the finish strand; NOTIFY_COMPLETE for a
// notify op is finished inline.
void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  // Cookie -> LingerOp*; validated against the live set below before
  // any dereference beyond the set lookup.
  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  std::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    // First disconnect becomes the sticky error and is delivered to
    // the user's handler on the finish strand.
    if (!info->last_error) {
      info->last_error = bs::error_code(ENOTCONN, osd_category());
      if (info->handle) {
	boost::asio::defer(finish_strand, CB_DoWatchError(this, info,
							  info->last_error));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
	info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
		     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->on_notify_finish->defer(
	std::move(info->on_notify_finish),
	osdcode(m->return_code), std::move(m->get_data()));

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
      info->on_notify_finish = nullptr;
    }
  } else {
    // Watch event: hand off to the finish strand.
    boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
  }
}
947
// Finish-strand continuation queued by CB_DoWatchNotify: invoke the
// user's watch handler for a NOTIFY event, unless the op was
// canceled in the meantime. Always balances the _queued_async() done
// at queue time with finished_async().
void Objecter::_do_watch_notify(boost::intrusive_ptr<LingerOp> info,
				boost::intrusive_ptr<MWatchNotify> m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;

  shared_lock l(rwlock);
  ceph_assert(initialized);

  if (info->canceled) {
    l.unlock();
    goto out;  // still must run finished_async()
  }

  // notify completion?
  ceph_assert(info->is_watch);
  ceph_assert(info->handle);
  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);

  // Call the user handler without holding the Objecter lock.
  l.unlock();

  switch (m->opcode) {
  case CEPH_WATCH_EVENT_NOTIFY:
    info->handle({}, m->notify_id, m->cookie, m->notifier_gid, std::move(m->bl));
    break;
  }

 out:
  info->finished_async();
}
977
// Messenger dispatch entry point. Returns true when the message was
// consumed here; returning false lets other dispatchers (e.g. MDS
// client code for osdmaps) see it too.
bool Objecter::ms_dispatch(Message *m)
{
  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
  switch (m->get_type()) {
    // these we exclusively handle
  case CEPH_MSG_OSD_OPREPLY:
    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
    return true;

  case CEPH_MSG_OSD_BACKOFF:
    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
    return true;

  case CEPH_MSG_WATCH_NOTIFY:
    handle_watch_notify(static_cast<MWatchNotify*>(m));
    m->put();
    return true;

  case MSG_COMMAND_REPLY:
    // Only OSD command replies are ours; mon/mgr replies go elsewhere.
    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    } else {
      return false;
    }

  case MSG_GETPOOLSTATSREPLY:
    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
    return true;

  case CEPH_MSG_POOLOP_REPLY:
    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
    return true;

  case CEPH_MSG_STATFS_REPLY:
    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
    return true;

    // these we give others a chance to inspect

    // MDS, OSD
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    return false;
  }
  return false;
}
1025
// Walk every linger op, regular op, and command op pinned to session s
// and decide, against the just-applied osdmap, whether each one can
// stay put, must be resent (collected into the need_resend* output
// params), or must be failed because its pool is gone (DNE) or is
// flagged EIO.
//
// Locking: caller holds rwlock unique (via sul); we take s->lock for
// the scan and drop it before canceling any lingers, since
// _linger_cancel() acquires locks of its own.
void Objecter::_scan_requests(
  OSDSession *s,
  bool skipped_map,                  // true if we jumped over unseen epochs: resend everything
  bool cluster_full,                 // cluster-wide full flag
  map<int64_t, bool> *pool_full_map, // per-pool full flags (may be null on first map)
  map<ceph_tid_t, Op*>& need_resend,
  list<LingerOp*>& need_resend_linger,
  map<ceph_tid_t, CommandOp*>& need_resend_command,
  ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  // lingers to unregister, processed after s->lock is dropped
  list<LingerOp*> unregister_lingers;

  std::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  auto lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    auto op = lp->second;
    ceph_assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // mapping unchanged: leave it alone unless a skipped map or a
      // full-flag transition forces a resend anyway
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
	ldout(cct, 10) << " need to unregister linger op "
		       << op->linger_id << dendl;
	op->get(); // ref held until _linger_cancel()/put() below
	unregister_lingers.push_back(op);
      }
      break;
    case RECALC_OP_TARGET_POOL_EIO:
      // pool flagged EIO: fail and unregister unconditionally
      _check_linger_pool_eio(op);
      ldout(cct, 10) << " need to unregister linger op "
		     << op->linger_id << dendl;
      op->get();
      unregister_lingers.push_back(op);
      break;
    }
  }

  // check for changed request mappings
  auto p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    _prune_snapc(osdmap->get_new_removed_snaps(), op);
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
			 op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // only force a resend for ops that honor the full flag
      if (!skipped_map && !(force_resend_writes && op->target.respects_full()))
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      // detach from this session; handle_osd_map() re-homes and resends
      _session_op_remove(op->session, op);
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    case RECALC_OP_TARGET_POOL_EIO:
      _check_op_pool_eio(op, &sl);
      break;
    }
  }

  // commands
  auto cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    auto c = cp->second;
    ++cp; // same iterator-invalidation caution as above
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      _session_command_op_remove(c->session, c);
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  // drop s->lock before _linger_cancel(), which takes other locks
  sl.unlock();

  for (auto iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put(); // drop the ref taken above
  }
}
1155
// Process an incoming MOSDMap: apply the incremental/full maps it
// carries to bring our osdmap up to date (one epoch at a time),
// rescan all outstanding requests against each new epoch, resend
// whatever needs resending, and wake any waiters whose target epoch
// has been reached.
void Objecter::handle_osd_map(MOSDMap *m)
{
  ceph::shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  ceph_assert(osdmap);

  // ignore maps from a different cluster
  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
		  << " != " << monc->get_fsid() << dendl;
    return;
  }

  // snapshot pause/full state before applying the new epochs so we can
  // detect transitions afterwards
  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (auto it = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);


  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    // nothing newer than what we already have
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      // skipped_map becomes true if we jump over epochs we never saw;
      // in that case every request must be treated as needing a resend
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
	   e <= m->get_last();
	   e++) {

	if (osdmap->get_epoch() == e-1 &&
	    m->incremental_maps.count(e)) {
	  // next epoch present as an incremental; apply it in place
	  ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
			<< dendl;
	  OSDMap::Incremental inc(m->incremental_maps[e]);
	  osdmap->apply_incremental(inc);

	  emit_blocklist_events(inc);

	  logger->inc(l_osdc_map_inc);
	}
	else if (m->maps.count(e)) {
	  // a full map for this epoch; decode it and swap it in
	  ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
	  auto new_osdmap = std::make_unique<OSDMap>();
	  new_osdmap->decode(m->maps[e]);

	  emit_blocklist_events(*osdmap, *new_osdmap);
	  osdmap = std::move(new_osdmap);

	  logger->inc(l_osdc_map_full);
	}
	else {
	  // epoch e is not in the message at all
	  if (e >= m->get_oldest()) {
	    // the monitor still has it; ask for it and stop here
	    ldout(cct, 3) << "handle_osd_map requesting missing epoch "
			  << osdmap->get_epoch()+1 << dendl;
	    _maybe_request_map();
	    break;
	  }
	  // it was trimmed; jump ahead to the oldest available epoch
	  ldout(cct, 3) << "handle_osd_map missing epoch "
			<< osdmap->get_epoch()+1
			<< ", jumping to " << m->get_oldest() << dendl;
	  e = m->get_oldest() - 1;
	  skipped_map = true;
	  continue;
	}
	logger->set(l_osdc_map_epoch, osdmap->get_epoch());

	prune_pg_mapping(osdmap->get_pools());
	cluster_full = cluster_full || _osdmap_full_flag();
	update_pool_full_map(pool_full_map);

	// check all outstanding requests on every epoch
	for (auto& i : need_resend) {
	  _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
	}
	_scan_requests(homeless_session, skipped_map, cluster_full,
		       &pool_full_map, need_resend,
		       need_resend_linger, need_resend_command, sul);
	for (auto p = osd_sessions.begin();
	     p != osd_sessions.end(); ) {
	  auto s = p->second;
	  _scan_requests(s, skipped_map, cluster_full,
			 &pool_full_map, need_resend,
			 need_resend_linger, need_resend_command, sul);
	  ++p; // advance before close_session() erases s from osd_sessions
	  // osd down or addr change?
	  if (!osdmap->is_up(s->osd) ||
	      (s->con &&
	       s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
	    close_session(s);
	  }
	}

	ceph_assert(e == osdmap->get_epoch());
      }

    } else {
      // first map. we want the full thing.
      if (m->maps.count(m->get_last())) {
	for (auto p = osd_sessions.begin();
	     p != osd_sessions.end(); ++p) {
	  OSDSession *s = p->second;
	  _scan_requests(s, false, false, NULL, need_resend,
			 need_resend_linger, need_resend_command, sul);
	}
	ldout(cct, 3) << "handle_osd_map decoding full epoch "
		      << m->get_last() << dendl;
	osdmap->decode(m->maps[m->get_last()]);
	prune_pg_mapping(osdmap->get_pools());

	_scan_requests(homeless_session, false, false, NULL,
		       need_resend, need_resend_linger,
		       need_resend_command, sul);
      } else {
	ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
		      << dendl;
	monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
	monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
	// pool vanished while we were catching up; fail instead of resend
	p = need_resend.erase(p);
	_check_op_pool_dne(op, nullptr);
      } else {
	++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused?
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    // stay subscribed so we notice when the pause/full state clears
    _maybe_request_map();
  }

  // resend requests
  for (auto p = need_resend.begin();
       p != need_resend.end(); ++p) {
    auto op = p->second;
    auto s = op->session;
    bool mapped_session = false;
    if (!s) {
      // op was detached by _scan_requests(); map it to a session now
      int r = _map_session(&op->target, &s, sul);
      ceph_assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    std::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      if (!op->session->is_homeless() && !op->target.paused) {
	logger->inc(l_osdc_op_resend);
	_send_op(op);
      }
    } else {
      // op was canceled in the meantime
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s); // balance _map_session()/get_session() above
  }
  for (auto p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    ceph_assert(op->session);
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (auto p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    auto c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
	_send_command(c);
      }
    }
  }

  _dump_active();

  // finish any Contexts that were waiting on a map update
  auto p = waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
	 p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (auto& [c, ec] : p->second) {
      ca::post(std::move(c), ec);
    }
    waiting_for_map.erase(p++);
  }

  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}
1387
1388 void Objecter::enable_blocklist_events()
1389 {
1390 unique_lock wl(rwlock);
1391
1392 blocklist_events_enabled = true;
1393 }
1394
1395 void Objecter::consume_blocklist_events(std::set<entity_addr_t> *events)
1396 {
1397 unique_lock wl(rwlock);
1398
1399 if (events->empty()) {
1400 events->swap(blocklist_events);
1401 } else {
1402 for (const auto &i : blocklist_events) {
1403 events->insert(i);
1404 }
1405 blocklist_events.clear();
1406 }
1407 }
1408
1409 void Objecter::emit_blocklist_events(const OSDMap::Incremental &inc)
1410 {
1411 if (!blocklist_events_enabled) {
1412 return;
1413 }
1414
1415 for (const auto &i : inc.new_blocklist) {
1416 blocklist_events.insert(i.first);
1417 }
1418 }
1419
1420 void Objecter::emit_blocklist_events(const OSDMap &old_osd_map,
1421 const OSDMap &new_osd_map)
1422 {
1423 if (!blocklist_events_enabled) {
1424 return;
1425 }
1426
1427 std::set<entity_addr_t> old_set;
1428 std::set<entity_addr_t> new_set;
1429 std::set<entity_addr_t> old_range_set;
1430 std::set<entity_addr_t> new_range_set;
1431
1432 old_osd_map.get_blocklist(&old_set, &old_range_set);
1433 new_osd_map.get_blocklist(&new_set, &new_range_set);
1434
1435 std::set<entity_addr_t> delta_set;
1436 std::set_difference(
1437 new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
1438 std::inserter(delta_set, delta_set.begin()));
1439 std::set_difference(
1440 new_range_set.begin(), new_range_set.end(),
1441 old_range_set.begin(), old_range_set.end(),
1442 std::inserter(delta_set, delta_set.begin()));
1443 blocklist_events.insert(delta_set.begin(), delta_set.end());
1444 }
1445
1446 // op pool check
1447
// Completion for the monc->get_version("osdmap") probe issued by
// _send_op_map_check(): record the newest osdmap epoch as the op's
// map_dne_bound and re-run the pool-DNE check against it.
void Objecter::CB_Op_Map_Latest::operator()(bs::error_code e,
					    version_t latest, version_t)
{
  // transient failures are ignored; resend_mon_ops() retries the probe
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled)
    return;

  lgeneric_subdout(objecter->cct, objecter, 10)
    << "op_map_latest r=" << e << " tid=" << tid
    << " latest " << latest << dendl;

  unique_lock wl(objecter->rwlock);

  // the op may have completed or been canceled while we were waiting
  auto iter = objecter->check_latest_map_ops.find(tid);
  if (iter == objecter->check_latest_map_ops.end()) {
    lgeneric_subdout(objecter->cct, objecter, 10)
      << "op_map_latest op "<< tid << " not found" << dendl;
    return;
  }

  Op *op = iter->second;
  objecter->check_latest_map_ops.erase(iter);

  lgeneric_subdout(objecter->cct, objecter, 20)
    << "op_map_latest op "<< op << dendl;

  // only set the bound if it wasn't already established
  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  // deferred: _check_op_pool_dne() locks the session only if needed
  unique_lock sl(op->session->lock, defer_lock);
  objecter->_check_op_pool_dne(op, &sl);

  op->put(); // drop the ref held by check_latest_map_ops
}
1482
1483 int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
1484 snapid_t *snap) const
1485 {
1486 shared_lock rl(rwlock);
1487
1488 auto& pools = osdmap->get_pools();
1489 auto iter = pools.find(poolid);
1490 if (iter == pools.end()) {
1491 return -ENOENT;
1492 }
1493 const pg_pool_t& pg_pool = iter->second;
1494 for (auto p = pg_pool.snaps.begin();
1495 p != pg_pool.snaps.end();
1496 ++p) {
1497 if (p->second.name == snap_name) {
1498 *snap = p->first;
1499 return 0;
1500 }
1501 }
1502 return -ENOENT;
1503 }
1504
1505 int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
1506 pool_snap_info_t *info) const
1507 {
1508 shared_lock rl(rwlock);
1509
1510 auto& pools = osdmap->get_pools();
1511 auto iter = pools.find(poolid);
1512 if (iter == pools.end()) {
1513 return -ENOENT;
1514 }
1515 const pg_pool_t& pg_pool = iter->second;
1516 auto p = pg_pool.snaps.find(snap);
1517 if (p == pg_pool.snaps.end())
1518 return -ENOENT;
1519 *info = p->second;
1520
1521 return 0;
1522 }
1523
1524 int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
1525 {
1526 shared_lock rl(rwlock);
1527
1528 const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
1529 if (!pi)
1530 return -ENOENT;
1531 for (auto p = pi->snaps.begin();
1532 p != pi->snaps.end();
1533 ++p) {
1534 snaps->push_back(p->first);
1535 }
1536 return 0;
1537 }
1538
// sl may be unlocked.
// Decide whether op's pool definitely does not exist.  map_dne_bound
// is the first epoch at which the pool is known absent: once our map
// reaches it the op fails with pool_dne; if the bound is unknown we
// probe the monitor via _send_op_map_check().
// Caller holds rwlock unique.  *sl refers to op->session->lock and may
// be locked or not; it is taken only around _finish_op() if needed.
void Objecter::_check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl)
{
  // rwlock is locked unique

  if (op->target.pool_ever_existed) {
    // the pool previously existed and now it does not, which means it
    // was deleted.
    op->map_dne_bound = osdmap->get_epoch();
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " pool previously exists but now does not"
		   << dendl;
  } else {
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // we had a new enough map
      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		     << " concluding pool " << op->target.base_pgid.pool()
		     << " dne" << dendl;
      if (op->has_completion()) {
	num_in_flight--;
	op->complete(osdc_errc::pool_dne, -ENOENT);
      }

      OSDSession *s = op->session;
      if (s) {
	ceph_assert(s != NULL);
	ceph_assert(sl->mutex() == &s->lock);
	// take the session lock only if the caller didn't already
	bool session_locked = sl->owns_lock();
	if (!session_locked) {
	  sl->lock();
	}
	_finish_op(op, 0);
	if (!session_locked) {
	  sl->unlock();
	}
      } else {
	_finish_op(op, 0); // no session
      }
    }
  } else {
    // bound not known yet; ask the monitor for the latest epoch
    _send_op_map_check(op);
  }
}
1588
// sl may be unlocked.
// Fail op with pool_eio: its pool is flagged EIO in the current map.
// Unlike _check_op_pool_dne() no monitor probe is needed — the flag in
// the map we already have is authoritative.
// Caller holds rwlock unique; *sl refers to op->session->lock and is
// taken only around _finish_op() if not already held.
void Objecter::_check_op_pool_eio(Op *op, std::unique_lock<std::shared_mutex> *sl)
{
  // rwlock is locked unique

  // we had a new enough map
  ldout(cct, 10) << "check_op_pool_eio tid " << op->tid
		 << " concluding pool " << op->target.base_pgid.pool()
		 << " has eio" << dendl;
  if (op->has_completion()) {
    num_in_flight--;
    op->complete(osdc_errc::pool_eio, -EIO);
  }

  OSDSession *s = op->session;
  if (s) {
    ceph_assert(s != NULL);
    ceph_assert(sl->mutex() == &s->lock);
    // take the session lock only if the caller didn't already
    bool session_locked = sl->owns_lock();
    if (!session_locked) {
      sl->lock();
    }
    _finish_op(op, 0);
    if (!session_locked) {
      sl->unlock();
    }
  } else {
    _finish_op(op, 0); // no session
  }
}
1619
1620 void Objecter::_send_op_map_check(Op *op)
1621 {
1622 // rwlock is locked unique
1623 // ask the monitor
1624 if (check_latest_map_ops.count(op->tid) == 0) {
1625 op->get();
1626 check_latest_map_ops[op->tid] = op;
1627 monc->get_version("osdmap", CB_Op_Map_Latest(this, op->tid));
1628 }
1629 }
1630
1631 void Objecter::_op_cancel_map_check(Op *op)
1632 {
1633 // rwlock is locked unique
1634 auto iter = check_latest_map_ops.find(op->tid);
1635 if (iter != check_latest_map_ops.end()) {
1636 Op *op = iter->second;
1637 op->put();
1638 check_latest_map_ops.erase(iter);
1639 }
1640 }
1641
1642 // linger pool check
1643
// Completion for the monc->get_version("osdmap") probe issued by
// _send_linger_map_check(): record the newest epoch as the linger op's
// map_dne_bound, re-run the pool-DNE check, and cancel the linger if
// the pool is conclusively gone.
void Objecter::CB_Linger_Map_Latest::operator()(bs::error_code e,
						version_t latest,
						version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  // the linger op may have been canceled while the probe was in flight
  auto iter = objecter->check_latest_map_lingers.find(linger_id);
  if (iter == objecter->check_latest_map_lingers.end()) {
    return;
  }

  auto op = iter->second;
  objecter->check_latest_map_lingers.erase(iter);

  // only set the bound if it wasn't already established
  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  bool unregister;
  objecter->_check_linger_pool_dne(op, &unregister);

  if (unregister) {
    objecter->_linger_cancel(op);
  }

  op->put(); // drop the ref held by check_latest_map_lingers
}
1676
// Decide whether a linger op's pool definitely does not exist.  Sets
// *need_unregister when the caller must unregister the linger op.
// Mirrors _check_op_pool_dne(): map_dne_bound is the first epoch at
// which the pool is known absent; if unknown, probe the monitor.
// rwlock is locked unique.
void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
  // rwlock is locked unique

  *need_unregister = false;

  if (op->register_gen > 0) {
    // the op registered at least once, so the pool existed then; its
    // absence now means it was deleted
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " pool previously existed but now does not"
		   << dendl;
    op->map_dne_bound = osdmap->get_epoch();
  } else {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // conclusively gone: fail both pending completions with pool_dne
      std::unique_lock wl{op->watch_lock};
      if (op->on_reg_commit) {
	op->on_reg_commit->defer(std::move(op->on_reg_commit),
				 osdc_errc::pool_dne, cb::list{});
	op->on_reg_commit = nullptr;
      }
      if (op->on_notify_finish) {
	op->on_notify_finish->defer(std::move(op->on_notify_finish),
				    osdc_errc::pool_dne, cb::list{});
	op->on_notify_finish = nullptr;
      }
      *need_unregister = true;
    }
  } else {
    // bound not known yet; ask the monitor for the latest epoch
    _send_linger_map_check(op);
  }
}
1713
1714 void Objecter::_check_linger_pool_eio(LingerOp *op)
1715 {
1716 // rwlock is locked unique
1717
1718 std::unique_lock wl{op->watch_lock};
1719 if (op->on_reg_commit) {
1720 op->on_reg_commit->defer(std::move(op->on_reg_commit),
1721 osdc_errc::pool_dne, cb::list{});
1722 op->on_reg_commit = nullptr;
1723 }
1724 if (op->on_notify_finish) {
1725 op->on_notify_finish->defer(std::move(op->on_notify_finish),
1726 osdc_errc::pool_dne, cb::list{});
1727 op->on_notify_finish = nullptr;
1728 }
1729 }
1730
1731 void Objecter::_send_linger_map_check(LingerOp *op)
1732 {
1733 // ask the monitor
1734 if (check_latest_map_lingers.count(op->linger_id) == 0) {
1735 op->get();
1736 check_latest_map_lingers[op->linger_id] = op;
1737 monc->get_version("osdmap", CB_Linger_Map_Latest(this, op->linger_id));
1738 }
1739 }
1740
1741 void Objecter::_linger_cancel_map_check(LingerOp *op)
1742 {
1743 // rwlock is locked unique
1744
1745 auto iter = check_latest_map_lingers.find(op->linger_id);
1746 if (iter != check_latest_map_lingers.end()) {
1747 LingerOp *op = iter->second;
1748 op->put();
1749 check_latest_map_lingers.erase(iter);
1750 }
1751 }
1752
1753 // command pool check
1754
// Completion for the monc->get_version("osdmap") probe issued by
// _send_command_map_check(): record the newest epoch as the command's
// map_dne_bound and re-run the map-DNE check under the session lock.
void Objecter::CB_Command_Map_Latest::operator()(bs::error_code e,
						 version_t latest, version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  // the command may have finished or been canceled in the meantime
  auto iter = objecter->check_latest_map_commands.find(tid);
  if (iter == objecter->check_latest_map_commands.end()) {
    return;
  }

  auto c = iter->second;
  objecter->check_latest_map_commands.erase(iter);

  // only set the bound if it wasn't already established
  if (c->map_dne_bound == 0)
    c->map_dne_bound = latest;

  // _check_command_map_dne() requires the session lock
  unique_lock sul(c->session->lock);
  objecter->_check_command_map_dne(c);
  sul.unlock();

  c->put(); // drop the ref held by check_latest_map_commands
}
1783
1784 void Objecter::_check_command_map_dne(CommandOp *c)
1785 {
1786 // rwlock is locked unique
1787 // session is locked unique
1788
1789 ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
1790 << " current " << osdmap->get_epoch()
1791 << " map_dne_bound " << c->map_dne_bound
1792 << dendl;
1793 if (c->map_dne_bound > 0) {
1794 if (osdmap->get_epoch() >= c->map_dne_bound) {
1795 _finish_command(c, osdcode(c->map_check_error),
1796 std::move(c->map_check_error_str), {});
1797 }
1798 } else {
1799 _send_command_map_check(c);
1800 }
1801 }
1802
1803 void Objecter::_send_command_map_check(CommandOp *c)
1804 {
1805 // rwlock is locked unique
1806 // session is locked unique
1807
1808 // ask the monitor
1809 if (check_latest_map_commands.count(c->tid) == 0) {
1810 c->get();
1811 check_latest_map_commands[c->tid] = c;
1812 monc->get_version("osdmap", CB_Command_Map_Latest(this, c->tid));
1813 }
1814 }
1815
1816 void Objecter::_command_cancel_map_check(CommandOp *c)
1817 {
1818 // rwlock is locked uniqe
1819
1820 auto iter = check_latest_map_commands.find(c->tid);
1821 if (iter != check_latest_map_commands.end()) {
1822 auto c = iter->second;
1823 c->put();
1824 check_latest_map_commands.erase(iter);
1825 }
1826 }
1827
1828
1829 /**
1830 * Look up OSDSession by OSD id.
1831 *
1832 * @returns 0 on success, or -EAGAIN if the lock context requires
1833 * promotion to write.
1834 */
int Objecter::_get_session(int osd, OSDSession **session,
			   shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul && sul.mutex() == &rwlock);

  // negative osd ids map to the singleton homeless session, which is
  // not refcounted here (see put_session/get_session)
  if (osd < 0) {
    *session = homeless_session;
    ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
		   << dendl;
    return 0;
  }

  // existing session: hand out a new ref
  auto p = osd_sessions.find(osd);
  if (p != osd_sessions.end()) {
    auto s = p->second;
    s->get();
    *session = s;
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
		   << s->get_nref() << dendl;
    return 0;
  }
  // creating a new session mutates osd_sessions, which requires the
  // write lock; tell the caller to promote and retry
  if (!sul.owns_lock()) {
    return -EAGAIN;
  }
  auto s = new OSDSession(cct, osd);
  osd_sessions[osd] = s;
  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
  s->con->set_priv(RefCountedPtr{s});
  logger->inc(l_osdc_osd_session_open);
  logger->set(l_osdc_osd_sessions, osd_sessions.size());
  s->get(); // caller's ref, in addition to the map's
  *session = s;
  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
		 << s->get_nref() << dendl;
  return 0;
}
1871
1872 void Objecter::put_session(Objecter::OSDSession *s)
1873 {
1874 if (s && !s->is_homeless()) {
1875 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1876 << s->get_nref() << dendl;
1877 s->put();
1878 }
1879 }
1880
1881 void Objecter::get_session(Objecter::OSDSession *s)
1882 {
1883 ceph_assert(s != NULL);
1884
1885 if (!s->is_homeless()) {
1886 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1887 << s->get_nref() << dendl;
1888 s->get();
1889 }
1890 }
1891
// Replace the session's connection with a fresh one to the OSD's
// current address, bumping the incarnation counter so replies arriving
// on the old connection can be recognized as stale.
void Objecter::_reopen_session(OSDSession *s)
{
  // rwlock is locked unique
  // s->lock is locked

  auto addrs = osdmap->get_addrs(s->osd);
  ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
		 << addrs << dendl;
  if (s->con) {
    // detach and tear down the old connection first
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  s->con = messenger->connect_to_osd(addrs);
  s->con->set_priv(RefCountedPtr{s});
  s->incarnation++;
  logger->inc(l_osdc_osd_session_open);
}
1910
// Tear down the session for one OSD: drop its connection, detach every
// op/linger/command from it, remove it from osd_sessions, and re-home
// the detached work onto homeless_session so a later map change can
// reassign it.
void Objecter::close_session(OSDSession *s)
{
  // rwlock is locked unique

  ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  unique_lock sl(s->lock);

  // detach under s->lock; reattach below under homeless_session->lock
  // (the two session locks are never held at the same time)
  std::list<LingerOp*> homeless_lingers;
  std::list<CommandOp*> homeless_commands;
  std::list<Op*> homeless_ops;

  while (!s->linger_ops.empty()) {
    auto i = s->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    homeless_lingers.push_back(i->second);
    _session_linger_op_remove(s, i->second);
  }

  while (!s->ops.empty()) {
    auto i = s->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    homeless_ops.push_back(i->second);
    _session_op_remove(s, i->second);
  }

  while (!s->command_ops.empty()) {
    auto i = s->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    homeless_commands.push_back(i->second);
    _session_command_op_remove(s, i->second);
  }

  osd_sessions.erase(s->osd);
  sl.unlock();
  put_session(s); // drop the map's ref

  // Assign any leftover ops to the homeless session
  {
    unique_lock hsl(homeless_session->lock);
    for (auto i = homeless_lingers.begin();
	 i != homeless_lingers.end(); ++i) {
      _session_linger_op_assign(homeless_session, *i);
    }
    for (auto i = homeless_ops.begin();
	 i != homeless_ops.end(); ++i) {
      _session_op_assign(homeless_session, *i);
    }
    for (auto i = homeless_commands.begin();
	 i != homeless_commands.end(); ++i) {
      _session_command_op_assign(homeless_session, *i);
    }
  }

  logger->set(l_osdc_osd_sessions, osd_sessions.size());
}
1971
// Block the calling thread until our osdmap epoch is at least e.
// Returns immediately if we already have a new enough map.
void Objecter::wait_for_osd_map(epoch_t e)
{
  unique_lock l(rwlock);
  if (osdmap->get_epoch() >= e) {
    l.unlock();
    return;
  }

  // register a waiter keyed by the target epoch; handle_osd_map()
  // posts to it once the epoch is reached
  ca::waiter<bs::error_code> w;
  waiting_for_map[e].emplace_back(OpCompletion::create(
				    service.get_executor(),
				    w.ref()),
				  bs::error_code{});
  l.unlock();
  w.wait();
}
1988
1989 void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
1990 std::unique_ptr<OpCompletion> fin,
1991 std::unique_lock<ceph::shared_mutex>&& l)
1992 {
1993 ceph_assert(fin);
1994 if (osdmap->get_epoch() >= newest) {
1995 ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
1996 l.unlock();
1997 ca::defer(std::move(fin), bs::error_code{});
1998 } else {
1999 ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
2000 _wait_for_new_map(std::move(fin), newest, bs::error_code{});
2001 l.unlock();
2002 }
2003 }
2004
2005 void Objecter::maybe_request_map()
2006 {
2007 shared_lock rl(rwlock);
2008 _maybe_request_map();
2009 }
2010
2011 void Objecter::_maybe_request_map()
2012 {
2013 // rwlock is locked
2014 int flag = 0;
2015 if (_osdmap_full_flag()
2016 || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
2017 || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
2018 ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
2019 "osd map (FULL flag is set)" << dendl;
2020 } else {
2021 ldout(cct, 10)
2022 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
2023 flag = CEPH_SUBSCRIBE_ONETIME;
2024 }
2025 epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
2026 if (monc->sub_want("osdmap", epoch, flag)) {
2027 monc->renew_subs();
2028 }
2029 }
2030
2031 void Objecter::_wait_for_new_map(std::unique_ptr<OpCompletion> c, epoch_t epoch,
2032 bs::error_code ec)
2033 {
2034 // rwlock is locked unique
2035 waiting_for_map[epoch].emplace_back(std::move(c), ec);
2036 _maybe_request_map();
2037 }
2038
2039
2040 /**
2041 * Use this together with wait_for_map: this is a pre-check to avoid
2042 * allocating a Context for wait_for_map if we can see that we
2043 * definitely already have the epoch.
2044 *
2045 * This does *not* replace the need to handle the return value of
2046 * wait_for_map: just because we don't have it in this pre-check
2047 * doesn't mean we won't have it when calling back into wait_for_map,
2048 * since the objecter lock is dropped in between.
2049 */
2050 bool Objecter::have_map(const epoch_t epoch)
2051 {
2052 shared_lock rl(rwlock);
2053 if (osdmap->get_epoch() >= epoch) {
2054 return true;
2055 } else {
2056 return false;
2057 }
2058 }
2059
// Resend everything pinned to a session after its connection has been
// re-established.  Regular ops and commands are resent here in tid
// order; linger ops are only collected into lresend (with a ref taken
// on each) so the caller can resend them via _linger_ops_resend()
// after dropping the session lock.
void Objecter::_kick_requests(OSDSession *session,
			      map<uint64_t, LingerOp *>& lresend)
{
  // rwlock is locked unique

  // clear backoffs
  session->backoffs.clear();
  session->backoffs_by_id.clear();

  // resend ops
  map<ceph_tid_t,Op*> resend;  // resend in tid order
  for (auto p = session->ops.begin(); p != session->ops.end();) {
    Op *op = p->second;
    ++p; // _cancel_linger_op() below may remove the current entry
    if (op->should_resend) {
      if (!op->target.paused)
	resend[op->tid] = op;
    } else {
      // op was canceled; drop any pending map check as well
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
  }

  logger->inc(l_osdc_op_resend, resend.size());
  while (!resend.empty()) {
    _send_op(resend.begin()->second);
    resend.erase(resend.begin());
  }

  // resend lingers; just collect them (with refs) for the caller
  logger->inc(l_osdc_linger_resend, session->linger_ops.size());
  for (auto j = session->linger_ops.begin();
       j != session->linger_ops.end(); ++j) {
    LingerOp *op = j->second;
    op->get(); // released in _linger_ops_resend()
    ceph_assert(lresend.count(j->first) == 0);
    lresend[j->first] = op;
  }

  // resend commands
  logger->inc(l_osdc_command_resend, session->command_ops.size());
  map<uint64_t,CommandOp*> cresend;  // resend in order
  for (auto k = session->command_ops.begin();
       k != session->command_ops.end(); ++k) {
    cresend[k->first] = k->second;
  }
  while (!cresend.empty()) {
    _send_command(cresend.begin()->second);
    cresend.erase(cresend.begin());
  }
}
2111
// Resend every linger op collected by _kick_requests(), dropping the
// per-op ref taken there.  The caller's unique lock on rwlock is
// temporarily converted into a shunique_lock (as _send_linger()
// requires) and handed back on return.
void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
				  unique_lock<ceph::shared_mutex>& ul)
{
  ceph_assert(ul.owns_lock());
  shunique_lock sul(std::move(ul));
  while (!lresend.empty()) {
    LingerOp *op = lresend.begin()->second;
    if (!op->canceled) {
      _send_linger(op, sul);
    }
    op->put(); // ref taken in _kick_requests()
    lresend.erase(lresend.begin());
  }
  ul = sul.release_to_unique();
}
2127
2128 void Objecter::start_tick()
2129 {
2130 ceph_assert(tick_event == 0);
2131 tick_event =
2132 timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2133 &Objecter::tick, this);
2134 }
2135
// Periodic timer callback: log laggy ops, ping linger watches, ping
// every OSD that serves outstanding work (reply policy is lossy, so
// this is how session resets get detected), update laggy counters,
// and reschedule itself.
void Objecter::tick()
{
  shared_lock rl(rwlock);

  ldout(cct, 10) << "tick" << dendl;

  // we are only called by C_Tick
  tick_event = 0;

  if (!initialized) {
    // we raced with shutdown
    ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
    return;
  }

  set<OSDSession*> toping;


  // look for laggy requests
  auto cutoff = ceph::coarse_mono_clock::now();
  cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout

  unsigned laggy_ops = 0;

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    auto s = siter->second;
    scoped_lock l(s->lock);
    bool found = false;
    // ops whose stamp predates the cutoff are laggy
    for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
      auto op = p->second;
      ceph_assert(op->session);
      if (op->stamp < cutoff) {
	ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
		      << " is laggy" << dendl;
	found = true;
	++laggy_ops;
      }
    }
    // registered watches get a periodic ping (skipped after an error)
    for (auto p = s->linger_ops.begin();
	 p != s->linger_ops.end();
	 ++p) {
      auto op = p->second;
      std::unique_lock wl(op->watch_lock);
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
		     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
      if (op->is_watch && op->registered && !op->last_error)
	_send_linger_ping(op);
    }
    // any outstanding command also keeps its OSD on the ping list
    for (auto p = s->command_ops.begin();
	 p != s->command_ops.end();
	 ++p) {
      auto op = p->second;
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves command tid " << p->first
		     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
    }
    if (found)
      toping.insert(s);
  }
  if (num_homeless_ops || !toping.empty()) {
    _maybe_request_map();
  }

  logger->set(l_osdc_op_laggy, laggy_ops);
  logger->set(l_osdc_osd_laggy, toping.size());

  if (!toping.empty()) {
    // send a ping to these osds, to ensure we detect any session resets
    // (osd reply message policy is lossy)
    for (auto i = toping.begin(); i != toping.end(); ++i) {
      (*i)->con->send_message(new MPing);
    }
  }

  // Make sure we don't reschedule if we wake up after shutdown
  if (initialized) {
    tick_event = timer.reschedule_me(ceph::make_timespan(
				       cct->_conf->objecter_tick_interval));
  }
}
2220
2221 void Objecter::resend_mon_ops()
2222 {
2223 unique_lock wl(rwlock);
2224
2225 ldout(cct, 10) << "resend_mon_ops" << dendl;
2226
2227 for (auto p = poolstat_ops.begin(); p != poolstat_ops.end(); ++p) {
2228 _poolstat_submit(p->second);
2229 logger->inc(l_osdc_poolstat_resend);
2230 }
2231
2232 for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
2233 _fs_stats_submit(p->second);
2234 logger->inc(l_osdc_statfs_resend);
2235 }
2236
2237 for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
2238 _pool_op_submit(p->second);
2239 logger->inc(l_osdc_poolop_resend);
2240 }
2241
2242 for (auto p = check_latest_map_ops.begin();
2243 p != check_latest_map_ops.end();
2244 ++p) {
2245 monc->get_version("osdmap", CB_Op_Map_Latest(this, p->second->tid));
2246 }
2247
2248 for (auto p = check_latest_map_lingers.begin();
2249 p != check_latest_map_lingers.end();
2250 ++p) {
2251 monc->get_version("osdmap", CB_Linger_Map_Latest(this, p->second->linger_id));
2252 }
2253
2254 for (auto p = check_latest_map_commands.begin();
2255 p != check_latest_map_commands.end();
2256 ++p) {
2257 monc->get_version("osdmap", CB_Command_Map_Latest(this, p->second->tid));
2258 }
2259 }
2260
2261 // read | write ---------------------------
2262
2263 void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2264 {
2265 shunique_lock rl(rwlock, ceph::acquire_shared);
2266 ceph_tid_t tid = 0;
2267 if (!ptid)
2268 ptid = &tid;
2269 op->trace.event("op submit");
2270 _op_submit_with_budget(op, rl, ptid, ctx_budget);
2271 }
2272
// Take throttle budget for 'op' (possibly blocking, which can drop and
// re-take rwlock), arm the per-op timeout if configured, and submit.
// ctx_budget, when non-null and -1, receives the budget taken for the
// first op of a context session so later ops can reuse it.
void Objecter::_op_submit_with_budget(Op *op,
				      shunique_lock<ceph::shared_mutex>& sul,
				      ceph_tid_t *ptid,
				      int *ctx_budget)
{
  ceph_assert(initialized);

  ceph_assert(op->ops.size() == op->out_bl.size());
  ceph_assert(op->ops.size() == op->out_rval.size());
  ceph_assert(op->ops.size() == op->out_handler.size());

  // throttle. before we look at any state, because
  // _take_op_budget() may drop our lock while it blocks.
  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
    int op_budget = _take_op_budget(op, sul);
    // take and pass out the budget for the first OP
    // in the context session
    if (ctx_budget && (*ctx_budget == -1)) {
      *ctx_budget = op_budget;
    }
  }

  if (osd_timeout > timespan(0)) {
    // assign the tid now so the timeout lambda can cancel by tid without
    // holding a pointer to the (possibly freed) op
    if (op->tid == 0)
      op->tid = ++last_tid;
    auto tid = op->tid;
    op->ontimeout = timer.add_event(osd_timeout,
				    [this, tid]() {
				      op_cancel(tid, -ETIMEDOUT); });
  }

  _op_submit(op, sul, ptid);
}
2306
// Update in-flight bookkeeping and perf counters for an op that is about
// to be sent: global/in-flight counts, read/write/rmw/pg classification,
// and a per-opcode counter for each sub-op.
void Objecter::_send_op_account(Op *op)
{
  inflight_ops++;

  // add to gather set(s)
  if (op->has_completion()) {
    num_in_flight++;
  } else {
    ldout(cct, 20) << " note: not requesting reply" << dendl;
  }

  logger->inc(l_osdc_op_active);
  logger->inc(l_osdc_op);
  logger->inc(l_osdc_oplen_avg, op->ops.size());

  if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
      (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
    logger->inc(l_osdc_op_rmw);
  else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
    logger->inc(l_osdc_op_w);
  else if (op->target.flags & CEPH_OSD_FLAG_READ)
    logger->inc(l_osdc_op_r);

  if (op->target.flags & CEPH_OSD_FLAG_PGOP)
    logger->inc(l_osdc_op_pg);

  // map each sub-op opcode to its dedicated perf counter; anything not
  // listed below is accounted as l_osdc_osdop_other
  for (auto p = op->ops.begin(); p != op->ops.end(); ++p) {
    int code = l_osdc_osdop_other;
    switch (p->op.op) {
    case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
    case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
    case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
    case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
    case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
    case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
    case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
    case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
    case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
    case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
    case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
    case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
    case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
    case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
    case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
    case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
    case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;

    // OMAP read operations
    case CEPH_OSD_OP_OMAPGETVALS:
    case CEPH_OSD_OP_OMAPGETKEYS:
    case CEPH_OSD_OP_OMAPGETHEADER:
    case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;

    // OMAP write operations
    case CEPH_OSD_OP_OMAPSETVALS:
    case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;

    // OMAP del operations
    case CEPH_OSD_OP_OMAPCLEAR:
    case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;

    case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
    case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
    case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
    }
    // NOTE(review): 'code' is always set (it defaults to
    // l_osdc_osdop_other), so this guard looks vestigial — confirm the
    // counter enum has no zero-valued member before removing it.
    if (code)
      logger->inc(code);
  }
}
2377
// Core submission path: compute the op's target PG/OSD, acquire the
// matching session (handling the shared->exclusive relock race), account
// the op, and send it if the target is usable. On pool-DNE we also kick
// off a latest-osdmap check. Writes the assigned tid to *ptid.
void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid)
{
  // rwlock is locked

  ldout(cct, 10) << __func__ << " op " << op << dendl;

  // pick target
  ceph_assert(op->session == NULL);
  OSDSession *s = NULL;

  bool check_for_latest_map = false;
  int r = _calc_target(&op->target, nullptr);
  switch(r) {
  case RECALC_OP_TARGET_POOL_DNE:
    // pool may exist in a newer map; verify with the monitors below
    check_for_latest_map = true;
    break;
  case RECALC_OP_TARGET_POOL_EIO:
    // pool is flagged EIO: fail the op immediately
    if (op->has_completion()) {
      op->complete(osdc_errc::pool_eio, -EIO);
    }
    return;
  }

  // Try to get a session, including a retry if we need to take write lock
  r = _get_session(op->target.osd, &s, sul);
  if (r == -EAGAIN ||
      (check_for_latest_map && sul.owns_lock_shared()) ||
      cct->_conf->objecter_debug_inject_relock_delay) {
    // we must drop and re-take the lock (to upgrade to exclusive); the
    // osdmap may change while it is released, so detect that and redo
    // the target calculation
    epoch_t orig_epoch = osdmap->get_epoch();
    sul.unlock();
    if (cct->_conf->objecter_debug_inject_relock_delay) {
      sleep(1);
    }
    sul.lock();
    if (orig_epoch != osdmap->get_epoch()) {
      // map changed; recalculate mapping
      ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
		     << dendl;
      check_for_latest_map = _calc_target(&op->target, nullptr)
	== RECALC_OP_TARGET_POOL_DNE;
      if (s) {
	// session may now be wrong for the new target; drop and retry
	put_session(s);
	s = NULL;
	r = -EAGAIN;
      }
    }
  }
  if (r == -EAGAIN) {
    ceph_assert(s == NULL);
    r = _get_session(op->target.osd, &s, sul);
  }
  ceph_assert(r == 0);
  ceph_assert(s);  // may be homeless

  _send_op_account(op);

  // send?

  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

  bool need_send = false;
  if (op->target.paused) {
    ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
		   << dendl;
    _maybe_request_map();
  } else if (!s->is_homeless()) {
    need_send = true;
  } else {
    _maybe_request_map();
  }

  unique_lock sl(s->lock);
  if (op->tid == 0)
    op->tid = ++last_tid;

  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
		 << " '" << op->target.base_oloc << "' '"
		 << op->target.target_oloc << "' " << op->ops << " tid "
		 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
		 << dendl;

  _session_op_assign(s, op);

  if (need_send) {
    _send_op(op);
  }

  // Last chance to touch Op here, after giving up session lock it can
  // be freed at any time by response handler.
  ceph_tid_t tid = op->tid;
  if (check_for_latest_map) {
    _send_op_map_check(op);
  }
  if (ptid)
    *ptid = tid;
  op = NULL;

  sl.unlock();
  put_session(s);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}
2480
2481 int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2482 {
2483 ceph_assert(initialized);
2484
2485 unique_lock sl(s->lock);
2486
2487 auto p = s->ops.find(tid);
2488 if (p == s->ops.end()) {
2489 ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
2490 << s->osd << dendl;
2491 return -ENOENT;
2492 }
2493
2494 #if 0
2495 if (s->con) {
2496 ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
2497 << " on " << s->con << dendl;
2498 s->con->revoke_rx_buffer(tid);
2499 }
2500 #endif
2501
2502 ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
2503 << dendl;
2504 Op *op = p->second;
2505 if (op->has_completion()) {
2506 num_in_flight--;
2507 op->complete(osdcode(r), r);
2508 }
2509 _op_cancel_map_check(op);
2510 _finish_op(op, r);
2511 sl.unlock();
2512
2513 return 0;
2514 }
2515
2516 int Objecter::op_cancel(ceph_tid_t tid, int r)
2517 {
2518 int ret = 0;
2519
2520 unique_lock wl(rwlock);
2521 ret = _op_cancel(tid, r);
2522
2523 return ret;
2524 }
2525
2526 int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
2527 {
2528 unique_lock wl(rwlock);
2529 ldout(cct,10) << __func__ << " " << tids << dendl;
2530 for (auto tid : tids) {
2531 _op_cancel(tid, r);
2532 }
2533 return 0;
2534 }
2535
// Find and cancel op 'tid' in whichever session currently owns it,
// including the homeless session. Because the per-session lock is
// released between the lookup and op_cancel(), the op may migrate to
// another session in between; an -ENOENT from op_cancel() means exactly
// that race, so we restart the scan from the top.
int Objecter::_op_cancel(ceph_tid_t tid, int r)
{
  int ret = 0;

  ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
		<< dendl;

 start:

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    shared_lock sl(s->lock);
    if (s->ops.find(tid) != s->ops.end()) {
      // must drop the shared lock; op_cancel takes it exclusively
      sl.unlock();
      ret = op_cancel(s, tid, r);
      if (ret == -ENOENT) {
	/* oh no! raced, maybe tid moved to another session, restarting */
	goto start;
      }
      return ret;
    }
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
		<< " not found in live sessions" << dendl;

  // Handle case where the op is in homeless session
  shared_lock sl(homeless_session->lock);
  if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
    sl.unlock();
    ret = op_cancel(homeless_session, tid, r);
    if (ret == -ENOENT) {
      /* oh no! raced, maybe tid moved to another session, restarting */
      goto start;
    } else {
      return ret;
    }
  } else {
    sl.unlock();
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
		<< " not found in homeless session" << dendl;

  return ret;
}
2583
2584
// Cancel every in-flight write op (optionally restricted to 'pool') with
// error 'r'. Returns the osdmap epoch at which the cancellations took
// effect, or -1 if nothing was cancelled.
// NOTE(review): epoch_t is unsigned, so the -1 wraps to the max value;
// callers evidently compare against that sentinel — confirm before use.
epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
{
  unique_lock wl(rwlock);

  std::vector<ceph_tid_t> to_cancel;
  bool found = false;

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    shared_lock sl(s->lock);
    // collect matching tids under the shared session lock...
    for (auto op_i = s->ops.begin();
	 op_i != s->ops.end(); ++op_i) {
      if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
	  && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
	to_cancel.push_back(op_i->first);
      }
    }
    sl.unlock();

    // ...then cancel with the lock released, since op_cancel() takes the
    // session lock exclusively
    for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
      int cancel_result = op_cancel(s, *titer, r);
      // We hold rwlock across search and cancellation, so cancels
      // should always succeed
      ceph_assert(cancel_result == 0);
    }
    if (!found && to_cancel.size())
      found = true;
    to_cancel.clear();
  }

  const epoch_t epoch = osdmap->get_epoch();

  wl.unlock();

  if (found) {
    return epoch;
  } else {
    return -1;
  }
}
2626
2627 bool Objecter::is_pg_changed(
2628 int oldprimary,
2629 const vector<int>& oldacting,
2630 int newprimary,
2631 const vector<int>& newacting,
2632 bool any_change)
2633 {
2634 if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
2635 oldprimary,
2636 oldacting,
2637 newprimary,
2638 newacting))
2639 return true;
2640 if (any_change && oldacting != newacting)
2641 return true;
2642 return false; // same primary (tho replicas may have changed)
2643 }
2644
// Decide whether ops aimed at *t must be held back: cluster-wide PAUSERD
// / PAUSEWR flags, full-cluster or full-pool conditions (for ops that
// respect fullness), or an epoch barrier we have not yet reached.
// NOTE(review): pi is dereferenced without a null check; callers appear
// to have already verified that t->base_oloc.pool exists (e.g.
// _calc_target returns POOL_DNE before getting here) — confirm before
// calling from any new path.
bool Objecter::target_should_be_paused(op_target_t *t)
{
  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
    (t->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi)));

  return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
    (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
    (osdmap->get_epoch() < epoch_barrier);
}
2656
2657 /**
2658 * Locking public accessor for _osdmap_full_flag
2659 */
2660 bool Objecter::osdmap_full_flag() const
2661 {
2662 shared_lock rl(rwlock);
2663
2664 return _osdmap_full_flag();
2665 }
2666
2667 bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2668 {
2669 shared_lock rl(rwlock);
2670
2671 if (_osdmap_full_flag()) {
2672 return true;
2673 }
2674
2675 return _osdmap_pool_full(pool_id);
2676 }
2677
2678 bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2679 {
2680 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2681 if (pool == NULL) {
2682 ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2683 return false;
2684 }
2685
2686 return _osdmap_pool_full(*pool);
2687 }
2688
2689 bool Objecter::_osdmap_has_pool_full() const
2690 {
2691 for (auto it = osdmap->get_pools().begin();
2692 it != osdmap->get_pools().end(); ++it) {
2693 if (_osdmap_pool_full(it->second))
2694 return true;
2695 }
2696 return false;
2697 }
2698
2699 /**
2700 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2701 */
2702 bool Objecter::_osdmap_full_flag() const
2703 {
2704 // Ignore the FULL flag if the caller does not have honor_osdmap_full
2705 return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
2706 }
2707
2708 void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2709 {
2710 for (map<int64_t, pg_pool_t>::const_iterator it
2711 = osdmap->get_pools().begin();
2712 it != osdmap->get_pools().end(); ++it) {
2713 if (pool_full_map.find(it->first) == pool_full_map.end()) {
2714 pool_full_map[it->first] = _osdmap_pool_full(it->second);
2715 } else {
2716 pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2717 pool_full_map[it->first];
2718 }
2719 }
2720 }
2721
2722 int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2723 const string& ns)
2724 {
2725 shared_lock rl(rwlock);
2726 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2727 if (!p)
2728 return -ENOENT;
2729 return p->hash_key(key, ns);
2730 }
2731
2732 int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2733 const string& ns)
2734 {
2735 shared_lock rl(rwlock);
2736 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2737 if (!p)
2738 return -ENOENT;
2739 return p->raw_hash_to_pg(p->hash_key(key, ns));
2740 }
2741
// Drop from op->snapc any snap ids that the new osdmap reports as removed
// for the op's pool, so a resent op never references deleted snaps.
void Objecter::_prune_snapc(
  const mempool::osdmap::map<int64_t,
			     snap_interval_set_t>& new_removed_snaps,
  Op *op)
{
  bool match = false;
  auto i = new_removed_snaps.find(op->target.base_pgid.pool());
  if (i != new_removed_snaps.end()) {
    // first pass: check whether anything needs pruning at all
    for (auto s : op->snapc.snaps) {
      if (i->second.contains(s)) {
	match = true;
	break;
      }
    }
    if (match) {
      // second pass: build the list of surviving snaps
      vector<snapid_t> new_snaps;
      for (auto s : op->snapc.snaps) {
	if (!i->second.contains(s)) {
	  new_snaps.push_back(s);
	}
      }
      op->snapc.snaps.swap(new_snaps);
      // after the swap, new_snaps holds the ORIGINAL list, so the
      // "(was ...)" below genuinely logs the pre-prune snaps
      ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
		    << " (was " << new_snaps << ")" << dendl;
    }
  }
}
2769
// Recompute where op_target_t 't' should be sent under the current
// osdmap: apply cache tiering, map object -> pg -> up/acting set, detect
// interval changes / splits / merges that force a resend, handle pause
// state, and pick the destination OSD (primary, or a replica for
// balanced/localized reads). Returns one of the RECALC_OP_TARGET_*
// codes. rwlock must be held.
int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
  // rwlock is locked
  bool is_read = t->flags & CEPH_OSD_FLAG_READ;
  bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
  t->epoch = osdmap->get_epoch();
  ldout(cct,20) << __func__ << " epoch " << t->epoch
		<< " base " << t->base_oid << " " << t->base_oloc
		<< " precalc_pgid " << (int)t->precalc_pgid
		<< " pgid " << t->base_pgid
		<< (is_read ? " is_read" : "")
		<< (is_write ? " is_write" : "")
		<< dendl;

  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  if (!pi) {
    t->osd = -1;
    return RECALC_OP_TARGET_POOL_DNE;
  }

  if (pi->has_flag(pg_pool_t::FLAG_EIO)) {
    return RECALC_OP_TARGET_POOL_EIO;
  }

  ldout(cct,30) << __func__ << " base pi " << pi
		<< " pg_num " << pi->get_pg_num() << dendl;

  // has the pool requested that all outstanding ops be resent as of a
  // particular epoch?
  bool force_resend = false;
  if (osdmap->get_epoch() == pi->last_force_op_resend) {
    if (t->last_force_resend < pi->last_force_op_resend) {
      t->last_force_resend = pi->last_force_op_resend;
      force_resend = true;
    } else if (t->last_force_resend == 0) {
      force_resend = true;
    }
  }

  // apply tiering
  t->target_oid = t->base_oid;
  t->target_oloc = t->base_oloc;
  if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
    if (is_read && pi->has_read_tier())
      t->target_oloc.pool = pi->read_tier;
    if (is_write && pi->has_write_tier())
      t->target_oloc.pool = pi->write_tier;
    pi = osdmap->get_pg_pool(t->target_oloc.pool);
    if (!pi) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }

  pg_t pgid;
  if (t->precalc_pgid) {
    // PG ops carry their pgid directly instead of an object name
    ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
    ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
    ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
    pgid = t->base_pgid;
  } else {
    int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
					   pgid);
    if (ret == -ENOENT) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }
  ldout(cct,20) << __func__ << " target " << t->target_oid << " "
		<< t->target_oloc << " -> pgid " << pgid << dendl;
  ldout(cct,30) << __func__ << " target pi " << pi
		<< " pg_num " << pi->get_pg_num() << dendl;
  t->pool_ever_existed = true;

  int size = pi->size;
  int min_size = pi->min_size;
  unsigned pg_num = pi->get_pg_num();
  unsigned pg_num_mask = pi->get_pg_num_mask();
  unsigned pg_num_pending = pi->get_pg_num_pending();
  int up_primary, acting_primary;
  vector<int> up, acting;
  ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
  pg_t actual_pgid(actual_ps, pgid.pool());
  // consult (and populate) the pg mapping cache for this epoch
  if (!lookup_pg_mapping(actual_pgid, osdmap->get_epoch(), &up, &up_primary,
			 &acting, &acting_primary)) {
    osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
				 &acting, &acting_primary);
    pg_mapping_t pg_mapping(osdmap->get_epoch(),
			    up, up_primary, acting, acting_primary);
    update_pg_mapping(actual_pgid, std::move(pg_mapping));
  }
  bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
  bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
  unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
  pg_t prev_pgid(prev_seed, pgid.pool());
  // a new peering interval for this pg forces a resend
  if (any_change && PastIntervals::is_new_interval(
	t->acting_primary,
	acting_primary,
	t->acting,
	acting,
	t->up_primary,
	up_primary,
	t->up,
	up,
	t->size,
	size,
	t->min_size,
	min_size,
	t->pg_num,
	pg_num,
	t->pg_num_pending,
	pg_num_pending,
	t->sort_bitwise,
	sort_bitwise,
	t->recovery_deletes,
	recovery_deletes,
	t->peering_crush_bucket_count,
	pi->peering_crush_bucket_count,
	t->peering_crush_bucket_target,
	pi->peering_crush_bucket_target,
	t->peering_crush_bucket_barrier,
	pi->peering_crush_bucket_barrier,
	t->peering_crush_mandatory_member,
	pi->peering_crush_mandatory_member,
	prev_pgid)) {
    force_resend = true;
  }

  // pause-state transitions: note when we become unpaused, since those
  // ops need to be (re)sent
  bool unpaused = false;
  bool should_be_paused = target_should_be_paused(t);
  if (t->paused && !should_be_paused) {
    unpaused = true;
  }
  if (t->paused != should_be_paused) {
    ldout(cct, 10) << __func__ << " paused " << t->paused
		   << " -> " << should_be_paused << dendl;
    t->paused = should_be_paused;
  }

  bool legacy_change =
    t->pgid != pgid ||
    is_pg_changed(
      t->acting_primary, t->acting, acting_primary, acting,
      t->used_replica || any_change);
  bool split_or_merge = false;
  if (t->pg_num) {
    split_or_merge =
      prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_target(t->pg_num, pg_num);
  }

  if (legacy_change || split_or_merge || force_resend) {
    // the mapping changed: record the new target state
    t->pgid = pgid;
    t->acting = std::move(acting);
    t->acting_primary = acting_primary;
    t->up_primary = up_primary;
    t->up = std::move(up);
    t->size = size;
    t->min_size = min_size;
    t->pg_num = pg_num;
    t->pg_num_mask = pg_num_mask;
    t->pg_num_pending = pg_num_pending;
    spg_t spgid(actual_pgid);
    if (pi->is_erasure()) {
      // for EC pools the shard of the primary must be encoded in the spg
      for (uint8_t i = 0; i < t->acting.size(); ++i) {
	if (t->acting[i] == acting_primary) {
	  spgid.reset_shard(shard_id_t(i));
	  break;
	}
      }
    }
    t->actual_pgid = spgid;
    t->sort_bitwise = sort_bitwise;
    t->recovery_deletes = recovery_deletes;
    t->peering_crush_bucket_count = pi->peering_crush_bucket_count;
    t->peering_crush_bucket_target = pi->peering_crush_bucket_target;
    t->peering_crush_bucket_barrier = pi->peering_crush_bucket_barrier;
    t->peering_crush_mandatory_member = pi->peering_crush_mandatory_member;
    ldout(cct, 10) << __func__ << " "
		   << " raw pgid " << pgid << " -> actual " << t->actual_pgid
		   << " acting " << t->acting
		   << " primary " << acting_primary << dendl;
    t->used_replica = false;
    // reads may go to a replica when BALANCE/LOCALIZE is requested on a
    // replicated pool with more than one acting member
    if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
		     CEPH_OSD_FLAG_LOCALIZE_READS)) &&
	!is_write && pi->is_replicated() && t->acting.size() > 1) {
      int osd;
      ceph_assert(is_read && t->acting[0] == acting_primary);
      if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
	int p = rand() % t->acting.size();
	if (p)
	  t->used_replica = true;
	osd = t->acting[p];
	ldout(cct, 10) << " chose random osd." << osd << " of " << t->acting
		       << dendl;
      } else {
	// look for a local replica. prefer the primary if the
	// distance is the same.
	int best = -1;
	int best_locality = 0;
	for (unsigned i = 0; i < t->acting.size(); ++i) {
	  int locality = osdmap->crush->get_common_ancestor_distance(
	    cct, t->acting[i], crush_location);
	  ldout(cct, 20) << __func__ << " localize: rank " << i
			 << " osd." << t->acting[i]
			 << " locality " << locality << dendl;
	  if (i == 0 ||
	      (locality >= 0 && best_locality >= 0 &&
	       locality < best_locality) ||
	      (best_locality < 0 && locality >= 0)) {
	    best = i;
	    best_locality = locality;
	    if (i)
	      t->used_replica = true;
	  }
	}
	ceph_assert(best >= 0);
	osd = t->acting[best];
      }
      t->osd = osd;
    } else {
      t->osd = acting_primary;
    }
  }
  if (legacy_change || unpaused || force_resend) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  // splits/merges only need a resend when the cluster supports it
  if (split_or_merge &&
      (osdmap->require_osd_release >= ceph_release_t::luminous ||
       HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
		    RESEND_ON_SPLIT))) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return RECALC_OP_TARGET_NO_ACTION;
}
3004
3005 int Objecter::_map_session(op_target_t *target, OSDSession **s,
3006 shunique_lock<ceph::shared_mutex>& sul)
3007 {
3008 _calc_target(target, nullptr);
3009 return _get_session(target->osd, s, sul);
3010 }
3011
3012 void Objecter::_session_op_assign(OSDSession *to, Op *op)
3013 {
3014 // to->lock is locked
3015 ceph_assert(op->session == NULL);
3016 ceph_assert(op->tid);
3017
3018 get_session(to);
3019 op->session = to;
3020 to->ops[op->tid] = op;
3021
3022 if (to->is_homeless()) {
3023 num_homeless_ops++;
3024 }
3025
3026 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3027 }
3028
3029 void Objecter::_session_op_remove(OSDSession *from, Op *op)
3030 {
3031 ceph_assert(op->session == from);
3032 // from->lock is locked
3033
3034 if (from->is_homeless()) {
3035 num_homeless_ops--;
3036 }
3037
3038 from->ops.erase(op->tid);
3039 put_session(from);
3040 op->session = NULL;
3041
3042 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3043 }
3044
3045 void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
3046 {
3047 // to lock is locked unique
3048 ceph_assert(op->session == NULL);
3049
3050 if (to->is_homeless()) {
3051 num_homeless_ops++;
3052 }
3053
3054 get_session(to);
3055 op->session = to;
3056 to->linger_ops[op->linger_id] = op;
3057
3058 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
3059 << dendl;
3060 }
3061
3062 void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
3063 {
3064 ceph_assert(from == op->session);
3065 // from->lock is locked unique
3066
3067 if (from->is_homeless()) {
3068 num_homeless_ops--;
3069 }
3070
3071 from->linger_ops.erase(op->linger_id);
3072 put_session(from);
3073 op->session = NULL;
3074
3075 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
3076 << dendl;
3077 }
3078
3079 void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
3080 {
3081 ceph_assert(from == op->session);
3082 // from->lock is locked
3083
3084 if (from->is_homeless()) {
3085 num_homeless_ops--;
3086 }
3087
3088 from->command_ops.erase(op->tid);
3089 put_session(from);
3090 op->session = NULL;
3091
3092 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3093 }
3094
3095 void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
3096 {
3097 // to->lock is locked
3098 ceph_assert(op->session == NULL);
3099 ceph_assert(op->tid);
3100
3101 if (to->is_homeless()) {
3102 num_homeless_ops++;
3103 }
3104
3105 get_session(to);
3106 op->session = to;
3107 to->command_ops[op->tid] = op;
3108
3109 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3110 }
3111
// Recompute the target of a linger op after a map change. If the mapping
// changed enough to require a resend, migrate the op to the session of
// its new OSD and report NEED_RESEND; otherwise pass through the
// _calc_target result. rwlock must be held exclusively.
int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
				       shunique_lock<ceph::shared_mutex>& sul)
{
  // rwlock is locked unique

  int r = _calc_target(&linger_op->target, nullptr, true);
  if (r == RECALC_OP_TARGET_NEED_RESEND) {
    ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
		   << " pgid " << linger_op->target.pgid
		   << " acting " << linger_op->target.acting << dendl;

    OSDSession *s = NULL;
    r = _get_session(linger_op->target.osd, &s, sul);
    ceph_assert(r == 0);

    if (linger_op->session != s) {
      // NB locking two sessions (s and linger_op->session) at the
      // same time here is only safe because we are the only one that
      // takes two, and we are holding rwlock for write. We use
      // std::shared_mutex in OSDSession because lockdep doesn't know
      // that.
      unique_lock sl(s->lock);
      _session_linger_op_remove(linger_op->session, linger_op);
      _session_linger_op_assign(s, linger_op);
    }

    // drop the ref taken by _get_session; the assign above holds its own
    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return r;
}
3143
3144 void Objecter::_cancel_linger_op(Op *op)
3145 {
3146 ldout(cct, 15) << "cancel_op " << op->tid << dendl;
3147
3148 ceph_assert(!op->should_resend);
3149 if (op->has_completion()) {
3150 op->onfinish = nullptr;
3151 num_in_flight--;
3152 }
3153
3154 _finish_op(op, 0);
3155 }
3156
// Final teardown of an op: return its throttle budget, cancel its
// timeout (unless we are finishing *because of* the timeout), unlink it
// from its session, and drop the op's reference.
void Objecter::_finish_op(Op *op, int r)
{
  ldout(cct, 15) << __func__ << " " << op->tid << dendl;

  // op->session->lock is locked unique or op->session is null

  if (!op->ctx_budgeted && op->budget >= 0) {
    put_op_budget_bytes(op->budget);
    op->budget = -1;
  }

  // if r == -ETIMEDOUT we are being called *from* the timeout event, so
  // there is nothing to cancel
  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  if (op->session) {
    _session_op_remove(op->session, op);
  }

  logger->dec(l_osdc_op_active);

  // any pending map check must have been cancelled before we get here
  ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());

  inflight_ops--;

  op->put();
}
3183
3184 Objecter::MOSDOp *Objecter::_prepare_osd_op(Op *op)
3185 {
3186 // rwlock is locked
3187
3188 int flags = op->target.flags;
3189 flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
3190 flags |= CEPH_OSD_FLAG_SUPPORTSPOOLEIO;
3191
3192 // Nothing checks this any longer, but needed for compatibility with
3193 // pre-luminous osds
3194 flags |= CEPH_OSD_FLAG_ONDISK;
3195
3196 if (!honor_pool_full)
3197 flags |= CEPH_OSD_FLAG_FULL_FORCE;
3198
3199 op->target.paused = false;
3200 op->stamp = ceph::coarse_mono_clock::now();
3201
3202 hobject_t hobj = op->target.get_hobj();
3203 auto m = new MOSDOp(client_inc, op->tid,
3204 hobj, op->target.actual_pgid,
3205 osdmap->get_epoch(),
3206 flags, op->features);
3207
3208 m->set_snapid(op->snapid);
3209 m->set_snap_seq(op->snapc.seq);
3210 m->set_snaps(op->snapc.snaps);
3211
3212 m->ops = op->ops;
3213 m->set_mtime(op->mtime);
3214 m->set_retry_attempt(op->attempts++);
3215
3216 if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
3217 op->trace.init("op", &trace_endpoint);
3218 }
3219
3220 if (op->priority)
3221 m->set_priority(op->priority);
3222 else
3223 m->set_priority(cct->_conf->osd_client_op_priority);
3224
3225 if (op->reqid != osd_reqid_t()) {
3226 m->set_reqid(op->reqid);
3227 }
3228
3229 logger->inc(l_osdc_op_send);
3230 ssize_t sum = 0;
3231 for (unsigned i = 0; i < m->ops.size(); i++) {
3232 sum += m->ops[i].indata.length();
3233 }
3234 logger->inc(l_osdc_op_send_bytes, sum);
3235
3236 return m;
3237 }
3238
void Objecter::_send_op(Op *op)
{
  // rwlock is locked
  // op->session->lock is locked

  // backoff?  If the OSD told us to back off on an interval containing
  // this op's object, leave the op queued on the session; it will be
  // resent when the matching UNBLOCK arrives (see handle_osd_backoff).
  auto p = op->session->backoffs.find(op->target.actual_pgid);
  if (p != op->session->backoffs.end()) {
    hobject_t hoid = op->target.get_hobj();
    // Locate the interval that could contain hoid: take lower_bound,
    // then step back once if the previous interval still extends past
    // hoid.
    auto q = p->second.lower_bound(hoid);
    if (q != p->second.begin()) {
      --q;
      if (hoid >= q->second.end) {
        ++q;
      }
    }
    if (q != p->second.end()) {
      ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
                     << "," << q->second.end << ")" << dendl;
      int r = cmp(hoid, q->second.begin);
      if (r == 0 || (r > 0 && hoid < q->second.end)) {
        // hoid falls inside [begin, end): do not send now.
        ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
                       << " id " << q->second.id << " on " << hoid
                       << ", queuing " << op << " tid " << op->tid << dendl;
        return;
      }
    }
  }

  ceph_assert(op->tid > 0);
  MOSDOp *m = _prepare_osd_op(op);

  // The target may have been recalculated after the message was built;
  // patch the spg and drop the encoded payload so it is re-encoded with
  // the correct placement.
  if (op->target.actual_pgid != m->get_spg()) {
    ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
                   << m->get_spg() << " to " << op->target.actual_pgid
                   << ", updating and reencoding" << dendl;
    m->set_spg(op->target.actual_pgid);
    m->clear_payload(); // reencode
  }

  ldout(cct, 15) << "_send_op " << op->tid << " to "
                 << op->target.actual_pgid << " on osd." << op->session->osd
                 << dendl;

  ConnectionRef con = op->session->con;
  ceph_assert(con);

#if 0
  // preallocated rx ceph::buffer?
  if (op->con) {
    ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
                   << op->con << dendl;
    op->con->revoke_rx_buffer(op->tid);
  }
  if (op->outbl &&
      op->ontimeout == 0 &&  // only post rx_buffer if no timeout; see #9582
      op->outbl->length()) {
    op->outbl->invalidate_crc();  // messenger writes through c_str()
    ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
                   << dendl;
    op->con = con;
    op->con->post_rx_buffer(op->tid, *op->outbl);
  }
#endif

  // Remember the session incarnation this send belongs to so replies to
  // stale sends can be distinguished from the latest attempt.
  op->incarnation = op->session->incarnation;

  if (op->trace.valid()) {
    m->trace.init("op msg", nullptr, &op->trace);
  }
  op->session->con->send_message(m);
}
3311
3312 int Objecter::calc_op_budget(const bc::small_vector_base<OSDOp>& ops)
3313 {
3314 int op_budget = 0;
3315 for (auto i = ops.begin(); i != ops.end(); ++i) {
3316 if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3317 op_budget += i->indata.length();
3318 } else if (ceph_osd_op_mode_read(i->op.op)) {
3319 if (ceph_osd_op_uses_extent(i->op.op)) {
3320 if ((int64_t)i->op.extent.length > 0)
3321 op_budget += (int64_t)i->op.extent.length;
3322 } else if (ceph_osd_op_type_attr(i->op.op)) {
3323 op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
3324 }
3325 }
3326 }
3327 return op_budget;
3328 }
3329
3330 void Objecter::_throttle_op(Op *op,
3331 shunique_lock<ceph::shared_mutex>& sul,
3332 int op_budget)
3333 {
3334 ceph_assert(sul && sul.mutex() == &rwlock);
3335 bool locked_for_write = sul.owns_lock();
3336
3337 if (!op_budget)
3338 op_budget = calc_op_budget(op->ops);
3339 if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3340 sul.unlock();
3341 op_throttle_bytes.get(op_budget);
3342 if (locked_for_write)
3343 sul.lock();
3344 else
3345 sul.lock_shared();
3346 }
3347 if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3348 sul.unlock();
3349 op_throttle_ops.get(1);
3350 if (locked_for_write)
3351 sul.lock();
3352 else
3353 sul.lock_shared();
3354 }
3355 }
3356
// A linger op is budgeted as a single op regardless of payload size.
int Objecter::take_linger_budget(LingerOp *info)
{
  return 1;
}
3361
/* This function DOES put the passed message before returning */
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
{
  ldout(cct, 10) << "in handle_osd_op_reply" << dendl;

  // get pio
  ceph_tid_t tid = m->get_tid();

  shunique_lock sul(rwlock, ceph::acquire_shared);
  if (!initialized) {
    m->put();
    return;
  }

  // Map the connection back to its OSDSession; drop replies from
  // connections we no longer consider current for that session.
  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  unique_lock sl(s->lock);

  map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
  if (iter == s->ops.end()) {
    // tid not tracked on this session: already completed or moved.
    ldout(cct, 7) << "handle_osd_op_reply " << tid
                  << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
                                                    " onnvram" : " ack"))
                  << " ... stray" << dendl;
    sl.unlock();
    m->put();
    return;
  }

  ldout(cct, 7) << "handle_osd_op_reply " << tid
                << (m->is_ondisk() ? " ondisk" :
                    (m->is_onnvram() ? " onnvram" : " ack"))
                << " uv " << m->get_user_version()
                << " in " << m->get_pg()
                << " attempt " << m->get_retry_attempt()
                << dendl;
  Op *op = iter->second;
  op->trace.event("osd op reply");

  // Testing knob: force one resubmit of writes after their first reply.
  if (retry_writes_after_first_reply && op->attempts == 1 &&
      (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
    ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
    if (op->has_completion()) {
      num_in_flight--;
    }
    _session_op_remove(s, op);
    sl.unlock();

    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  // Only honor replies that match the most recent (re)send attempt.
  if (m->get_retry_attempt() >= 0) {
    if (m->get_retry_attempt() != (op->attempts - 1)) {
      ldout(cct, 7) << " ignoring reply from attempt "
                    << m->get_retry_attempt()
                    << " from " << m->get_source_inst()
                    << "; last attempt " << (op->attempts - 1) << " sent to "
                    << op->session->con->get_peer_addr() << dendl;
      m->put();
      sl.unlock();
      return;
    }
  } else {
    // we don't know the request attempt because the server is old, so
    // just accept this one. we may do ACK callbacks we shouldn't
    // have, but that is better than doing callbacks out of order.
  }

  // Completion to run after locks are released (moved out of op below).
  decltype(op->onfinish) onfinish;

  int rc = m->get_result();

  if (m->is_redirect_reply()) {
    // The OSD asked us to retarget; fold the redirect into the target
    // locator and resubmit from scratch.
    ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
    if (op->has_completion())
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();

    // FIXME: two redirects could race and reorder

    op->tid = 0;
    m->get_redirect().combine_with_locator(op->target.target_oloc,
                                           op->target.target_oid.name);
    op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
                         CEPH_OSD_FLAG_IGNORE_CACHE |
                         CEPH_OSD_FLAG_IGNORE_OVERLAY);
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  if (rc == -EAGAIN) {
    // OSD wants a retry; drop read-placement hints and the cached pgid
    // so the target is recomputed, then resubmit.
    ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
    if (op->has_completion())
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();

    op->tid = 0;
    op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
                          CEPH_OSD_FLAG_LOCALIZE_READS);
    op->target.pgid = pg_t();
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  sul.unlock();

  // Fill the out-params the submitter asked for.
  if (op->objver)
    *op->objver = m->get_user_version();
  if (op->reply_epoch)
    *op->reply_epoch = m->get_map_epoch();
  if (op->data_offset)
    *op->data_offset = m->get_header().data_off;

  // got data?
  if (op->outbl) {
#if 0
    if (op->con)
      op->con->revoke_rx_buffer(op->tid);
#endif
    auto& bl = m->get_data();
    if (op->outbl->length() == bl.length() &&
        bl.get_num_buffers() <= 1) {
      // this is here to keep previous users to *relied* on getting data
      // read into existing buffers happy. Notably,
      // libradosstriper::RadosStriperImpl::aio_read().
      ldout(cct,10) << __func__ << " copying resulting " << bl.length()
                    << " into existing ceph::buffer of length " << op->outbl->length()
                    << dendl;
      cb::list t;
      t = std::move(*op->outbl);
      t.invalidate_crc();  // we're overwriting the raw buffers via c_str()
      bl.begin().copy(bl.length(), t.c_str());
      op->outbl->substr_of(t, 0, bl.length());
    } else {
      // Normal path: take ownership of the reply's data buffers.
      m->claim_data(*op->outbl);
    }
    op->outbl = 0;
  }

  // per-op result demuxing
  vector<OSDOp> out_ops;
  m->claim_ops(out_ops);

  if (out_ops.size() != op->ops.size())
    ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
                  << " != request ops " << op->ops
                  << " from " << m->get_source_inst() << dendl;

  // Fan each per-op result out to the positionally-registered sinks:
  // out bufferlist, int rval, error_code, and completion handler.
  ceph_assert(op->ops.size() == op->out_bl.size());
  ceph_assert(op->ops.size() == op->out_rval.size());
  ceph_assert(op->ops.size() == op->out_ec.size());
  ceph_assert(op->ops.size() == op->out_handler.size());
  auto pb = op->out_bl.begin();
  auto pr = op->out_rval.begin();
  auto pe = op->out_ec.begin();
  auto ph = op->out_handler.begin();
  ceph_assert(op->out_bl.size() == op->out_rval.size());
  ceph_assert(op->out_bl.size() == op->out_handler.size());
  auto p = out_ops.begin();
  for (unsigned i = 0;
       p != out_ops.end() && pb != op->out_bl.end();
       ++i, ++p, ++pb, ++pr, ++pe, ++ph) {
    ldout(cct, 10) << " op " << i << " rval " << p->rval
                   << " len " << p->outdata.length() << dendl;
    if (*pb)
      **pb = p->outdata;
    // set rval before running handlers so that handlers
    // can change it if e.g. decoding fails
    if (*pr)
      **pr = ceph_to_hostos_errno(p->rval);
    if (*pe)
      **pe = p->rval < 0 ? bs::error_code(-p->rval, osd_category()) :
        bs::error_code();
    if (*ph) {
      std::move((*ph))(p->rval < 0 ?
                       bs::error_code(-p->rval, osd_category()) :
                       bs::error_code(),
                       p->rval, p->outdata);
    }
  }

  // NOTE: we assume that since we only request ONDISK ever we will
  // only ever get back one (type of) ack ever.

  if (op->has_completion()) {
    num_in_flight--;
    onfinish = std::move(op->onfinish);
    op->onfinish = nullptr;
  }
  logger->inc(l_osdc_op_reply);

  /* get it before we call _finish_op() */
  auto completion_lock = s->get_lock(op->target.base_oid);

  ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
  _finish_op(op, 0);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;

  // serialize completions
  if (completion_lock.mutex()) {
    completion_lock.lock();
  }
  sl.unlock();

  // do callbacks
  if (Op::has_completion(onfinish)) {
    Op::complete(std::move(onfinish), osdcode(rc), rc);
  }
  if (completion_lock.mutex()) {
    completion_lock.unlock();
  }

  m->put();
}
3590
// Handle an MOSDBackoff from an OSD: register a BLOCK interval (and ack
// it), or tear down an UNBLOCK'd interval and resend any ops that were
// held back by it.  Puts the message before returning.
void Objecter::handle_osd_backoff(MOSDBackoff *m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;
  shunique_lock sul(rwlock, ceph::acquire_shared);
  if (!initialized) {
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  // Hold an extra session ref until we are done touching s below.
  get_session(s);

  unique_lock sl(s->lock);

  switch (m->op) {
  case CEPH_OSD_BACKOFF_OP_BLOCK:
    {
      // register
      OSDBackoff& b = s->backoffs[m->pgid][m->begin];
      s->backoffs_by_id.insert(make_pair(m->id, &b));
      b.pgid = m->pgid;
      b.id = m->id;
      b.begin = m->begin;
      b.end = m->end;

      // ack with original backoff's epoch so that the osd can discard this if
      // there was a pg split.
      auto r = new MOSDBackoff(m->pgid, m->map_epoch,
                               CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
                               m->id, m->begin, m->end);
      // this priority must match the MOSDOps from _prepare_osd_op
      r->set_priority(cct->_conf->osd_client_op_priority);
      con->send_message(r);
    }
    break;

  case CEPH_OSD_BACKOFF_OP_UNBLOCK:
    {
      auto p = s->backoffs_by_id.find(m->id);
      if (p != s->backoffs_by_id.end()) {
        OSDBackoff *b = p->second;
        // NOTE(review): this warns only when BOTH bounds differ (&&);
        // presumably a mismatch on either bound deserves the warning
        // (||) — confirm intent; affects logging only.
        if (b->begin != m->begin &&
            b->end != m->end) {
          lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
                     << " unblock on ["
                     << m->begin << "," << m->end << ") but backoff is ["
                     << b->begin << "," << b->end << ")" << dendl;
          // hrmpf, unblock it anyway.
        }
        ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
                       << " id " << b->id
                       << " [" << b->begin << "," << b->end
                       << ")" << dendl;
        // Drop the interval from both indexes (by range and by id).
        auto spgp = s->backoffs.find(b->pgid);
        ceph_assert(spgp != s->backoffs.end());
        spgp->second.erase(b->begin);
        if (spgp->second.empty()) {
          s->backoffs.erase(spgp);
        }
        s->backoffs_by_id.erase(p);

        // check for any ops to resend
        for (auto& q : s->ops) {
          if (q.second->target.actual_pgid == m->pgid) {
            int r = q.second->target.contained_by(m->begin, m->end);
            ldout(cct, 20) << __func__ << " contained_by " << r << " on "
                           << q.second->target.get_hobj() << dendl;
            if (r) {
              _send_op(q.second);
            }
          }
        }
      } else {
        lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
                   << " unblock on ["
                   << m->begin << "," << m->end << ") but backoff dne" << dendl;
      }
    }
    break;

  default:
    ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
  }

  sul.unlock();
  sl.unlock();

  m->put();
  put_session(s);
}
3689
3690 uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3691 uint32_t pos)
3692 {
3693 shared_lock rl(rwlock);
3694 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3695 pos, list_context->pool_id, string());
3696 ldout(cct, 10) << __func__ << " " << list_context
3697 << " pos " << pos << " -> " << list_context->pos << dendl;
3698 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3699 list_context->current_pg = actual.ps();
3700 list_context->at_end_of_pool = false;
3701 return pos;
3702 }
3703
3704 uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3705 const hobject_t& cursor)
3706 {
3707 shared_lock rl(rwlock);
3708 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3709 list_context->pos = cursor;
3710 list_context->at_end_of_pool = false;
3711 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3712 list_context->current_pg = actual.ps();
3713 list_context->sort_bitwise = true;
3714 return list_context->current_pg;
3715 }
3716
3717 void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3718 hobject_t *cursor)
3719 {
3720 shared_lock rl(rwlock);
3721 if (list_context->list.empty()) {
3722 *cursor = list_context->pos;
3723 } else {
3724 const librados::ListObjectImpl& entry = list_context->list.front();
3725 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3726 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3727 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3728 }
3729 }
3730
3731 void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3732 {
3733 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3734 << " pool_snap_seq " << list_context->pool_snap_seq
3735 << " max_entries " << list_context->max_entries
3736 << " list_context " << list_context
3737 << " onfinish " << onfinish
3738 << " current_pg " << list_context->current_pg
3739 << " pos " << list_context->pos << dendl;
3740
3741 shared_lock rl(rwlock);
3742 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3743 if (!pool) { // pool is gone
3744 rl.unlock();
3745 put_nlist_context_budget(list_context);
3746 onfinish->complete(-ENOENT);
3747 return;
3748 }
3749 int pg_num = pool->get_pg_num();
3750 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3751
3752 if (list_context->pos.is_min()) {
3753 list_context->starting_pg_num = 0;
3754 list_context->sort_bitwise = sort_bitwise;
3755 list_context->starting_pg_num = pg_num;
3756 }
3757 if (list_context->sort_bitwise != sort_bitwise) {
3758 list_context->pos = hobject_t(
3759 object_t(), string(), CEPH_NOSNAP,
3760 list_context->current_pg, list_context->pool_id, string());
3761 list_context->sort_bitwise = sort_bitwise;
3762 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3763 << list_context->pos << dendl;
3764 }
3765 if (list_context->starting_pg_num != pg_num) {
3766 if (!sort_bitwise) {
3767 // start reading from the beginning; the pgs have changed
3768 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3769 list_context->pos = collection_list_handle_t();
3770 }
3771 list_context->starting_pg_num = pg_num;
3772 }
3773
3774 if (list_context->pos.is_max()) {
3775 ldout(cct, 20) << __func__ << " end of pool, list "
3776 << list_context->list << dendl;
3777 if (list_context->list.empty()) {
3778 list_context->at_end_of_pool = true;
3779 }
3780 // release the listing context's budget once all
3781 // OPs (in the session) are finished
3782 put_nlist_context_budget(list_context);
3783 onfinish->complete(0);
3784 return;
3785 }
3786
3787 ObjectOperation op;
3788 op.pg_nls(list_context->max_entries, list_context->filter,
3789 list_context->pos, osdmap->get_epoch());
3790 list_context->bl.clear();
3791 auto onack = new C_NList(list_context, onfinish, this);
3792 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3793
3794 // note current_pg in case we don't have (or lose) SORTBITWISE
3795 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3796 rl.unlock();
3797
3798 pg_read(list_context->current_pg, oloc, op,
3799 &list_context->bl, 0, onack, &onack->epoch,
3800 &list_context->ctx_budget);
3801 }
3802
// Consume one pg_nls reply batch: advance the listing cursor, append
// the returned entries, and either complete the caller (max reached)
// or kick off the next round via list_nobjects().
void Objecter::_nlist_reply(NListContext *list_context, int r,
                            Context *final_finish, epoch_t reply_epoch)
{
  ldout(cct, 10) << __func__ << " " << list_context << dendl;

  auto iter = list_context->bl.cbegin();
  pg_nls_response_t response;
  decode(response, iter);
  if (!iter.end()) {
    // we do this as legacy.
    cb::list legacy_extra_info;
    decode(legacy_extra_info, iter);
  }

  // if the osd returns 1 (newer code), or handle MAX, it means we
  // hit the end of the pg.
  if ((response.handle.is_max() || r == 1) &&
      !list_context->sort_bitwise) {
    // legacy OSD and !sortbitwise, figure out the next PG on our own
    ++list_context->current_pg;
    if (list_context->current_pg == list_context->starting_pg_num) {
      // end of pool
      list_context->pos = hobject_t::get_max();
    } else {
      // next pg
      list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
                                    list_context->current_pg,
                                    list_context->pool_id, string());
    }
  } else {
    // Continue from the cursor the OSD handed back.
    list_context->pos = response.handle;
  }

  int response_size = response.entries.size();
  ldout(cct, 20) << " response.entries.size " << response_size
                 << ", response.entries " << response.entries
                 << ", handle " << response.handle
                 << ", tentative new pos " << list_context->pos << dendl;
  if (response_size) {
    // Move this batch onto the caller-visible result list.
    std::move(response.entries.begin(), response.entries.end(),
              std::back_inserter(list_context->list));
    response.entries.clear();
  }

  if (list_context->list.size() >= list_context->max_entries) {
    ldout(cct, 20) << " hit max, returning results so far, "
                   << list_context->list << dendl;
    // release the listing context's budget once all
    // OPs (in the session) are finished
    put_nlist_context_budget(list_context);
    final_finish->complete(0);
    return;
  }

  // continue!
  list_nobjects(list_context, final_finish);
}
3860
3861 void Objecter::put_nlist_context_budget(NListContext *list_context)
3862 {
3863 if (list_context->ctx_budget >= 0) {
3864 ldout(cct, 10) << " release listing context's budget " <<
3865 list_context->ctx_budget << dendl;
3866 put_op_budget_bytes(list_context->ctx_budget);
3867 list_context->ctx_budget = -1;
3868 }
3869 }
3870
3871 // snapshots
3872
3873 void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name,
3874 decltype(PoolOp::onfinish)&& onfinish)
3875 {
3876 unique_lock wl(rwlock);
3877 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3878 << snap_name << dendl;
3879
3880 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3881 if (!p) {
3882 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3883 return;
3884 }
3885 if (p->snap_exists(snap_name)) {
3886 onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists,
3887 cb::list{});
3888 return;
3889 }
3890
3891 auto op = new PoolOp;
3892 op->tid = ++last_tid;
3893 op->pool = pool;
3894 op->name = snap_name;
3895 op->onfinish = std::move(onfinish);
3896 op->pool_op = POOL_OP_CREATE_SNAP;
3897 pool_ops[op->tid] = op;
3898
3899 pool_op_submit(op);
3900 }
3901
// Adapter callback: decodes the allocated snapid_t out of the pool-op
// reply payload and forwards it (plus any error) to the caller's
// completion.  A decode failure becomes the completion's error code.
struct CB_SelfmanagedSnap {
  std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> fin;
  CB_SelfmanagedSnap(decltype(fin)&& fin)
    : fin(std::move(fin)) {}
  void operator()(bs::error_code ec, const cb::list& bl) {
    snapid_t snapid = 0;
    if (!ec) {
      try {
        auto p = bl.cbegin();
        decode(snapid, p);
      } catch (const cb::error& e) {
        ec = e.code();
      }
    }
    fin->defer(std::move(fin), ec, snapid);
  }
};
3919
3920 void Objecter::allocate_selfmanaged_snap(
3921 int64_t pool,
3922 std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> onfinish)
3923 {
3924 unique_lock wl(rwlock);
3925 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
3926 auto op = new PoolOp;
3927 op->tid = ++last_tid;
3928 op->pool = pool;
3929 op->onfinish = PoolOp::OpComp::create(
3930 service.get_executor(),
3931 CB_SelfmanagedSnap(std::move(onfinish)));
3932 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3933 pool_ops[op->tid] = op;
3934
3935 pool_op_submit(op);
3936 }
3937
3938 void Objecter::delete_pool_snap(
3939 int64_t pool, std::string_view snap_name,
3940 decltype(PoolOp::onfinish)&& onfinish)
3941 {
3942 unique_lock wl(rwlock);
3943 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3944 << snap_name << dendl;
3945
3946 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3947 if (!p) {
3948 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3949 return;
3950 }
3951
3952 if (!p->snap_exists(snap_name)) {
3953 onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{});
3954 return;
3955 }
3956
3957 auto op = new PoolOp;
3958 op->tid = ++last_tid;
3959 op->pool = pool;
3960 op->name = snap_name;
3961 op->onfinish = std::move(onfinish);
3962 op->pool_op = POOL_OP_DELETE_SNAP;
3963 pool_ops[op->tid] = op;
3964
3965 pool_op_submit(op);
3966 }
3967
3968 void Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3969 decltype(PoolOp::onfinish)&& onfinish)
3970 {
3971 unique_lock wl(rwlock);
3972 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3973 << snap << dendl;
3974 auto op = new PoolOp;
3975 op->tid = ++last_tid;
3976 op->pool = pool;
3977 op->onfinish = std::move(onfinish);
3978 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3979 op->snapid = snap;
3980 pool_ops[op->tid] = op;
3981
3982 pool_op_submit(op);
3983 }
3984
3985 void Objecter::create_pool(std::string_view name,
3986 decltype(PoolOp::onfinish)&& onfinish,
3987 int crush_rule)
3988 {
3989 unique_lock wl(rwlock);
3990 ldout(cct, 10) << "create_pool name=" << name << dendl;
3991
3992 if (osdmap->lookup_pg_pool_name(name) >= 0) {
3993 onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{});
3994 return;
3995 }
3996
3997 auto op = new PoolOp;
3998 op->tid = ++last_tid;
3999 op->pool = 0;
4000 op->name = name;
4001 op->onfinish = std::move(onfinish);
4002 op->pool_op = POOL_OP_CREATE;
4003 pool_ops[op->tid] = op;
4004 op->crush_rule = crush_rule;
4005
4006 pool_op_submit(op);
4007 }
4008
4009 void Objecter::delete_pool(int64_t pool,
4010 decltype(PoolOp::onfinish)&& onfinish)
4011 {
4012 unique_lock wl(rwlock);
4013 ldout(cct, 10) << "delete_pool " << pool << dendl;
4014
4015 if (!osdmap->have_pg_pool(pool))
4016 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
4017 else
4018 _do_delete_pool(pool, std::move(onfinish));
4019 }
4020
4021 void Objecter::delete_pool(std::string_view pool_name,
4022 decltype(PoolOp::onfinish)&& onfinish)
4023 {
4024 unique_lock wl(rwlock);
4025 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
4026
4027 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
4028 if (pool < 0)
4029 // This only returns one error: -ENOENT.
4030 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
4031 else
4032 _do_delete_pool(pool, std::move(onfinish));
4033 }
4034
4035 void Objecter::_do_delete_pool(int64_t pool,
4036 decltype(PoolOp::onfinish)&& onfinish)
4037
4038 {
4039 auto op = new PoolOp;
4040 op->tid = ++last_tid;
4041 op->pool = pool;
4042 op->name = "delete";
4043 op->onfinish = std::move(onfinish);
4044 op->pool_op = POOL_OP_DELETE;
4045 pool_ops[op->tid] = op;
4046 pool_op_submit(op);
4047 }
4048
4049 void Objecter::pool_op_submit(PoolOp *op)
4050 {
4051 // rwlock is locked
4052 if (mon_timeout > timespan(0)) {
4053 op->ontimeout = timer.add_event(mon_timeout,
4054 [this, op]() {
4055 pool_op_cancel(op->tid, -ETIMEDOUT); });
4056 }
4057 _pool_op_submit(op);
4058 }
4059
4060 void Objecter::_pool_op_submit(PoolOp *op)
4061 {
4062 // rwlock is locked unique
4063
4064 ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
4065 auto m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4066 op->name, op->pool_op,
4067 last_seen_osdmap_version);
4068 if (op->snapid) m->snapid = op->snapid;
4069 if (op->crush_rule) m->crush_rule = op->crush_rule;
4070 monc->send_mon_message(m);
4071 op->last_submit = ceph::coarse_mono_clock::now();
4072
4073 logger->inc(l_osdc_poolop_send);
4074 }
4075
4076 /**
4077 * Handle a reply to a PoolOp message. Check that we sent the message
4078 * and give the caller responsibility for the returned cb::list.
4079 * Then either call the finisher or stash the PoolOp, depending on if we
4080 * have a new enough map.
4081 * Lastly, clean up the message and PoolOp.
4082 */
void Objecter::handle_pool_op_reply(MPoolOpReply *m)
{
  int rc = m->replyCode;
  auto ec = rc < 0 ? bs::error_code(-rc, mon_category()) : bs::error_code();
  FUNCTRACE(cct);
  shunique_lock sul(rwlock, acquire_shared);
  if (!initialized) {
    sul.unlock();
    m->put();
    return;
  }

  ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
  ceph_tid_t tid = m->get_tid();
  auto iter = pool_ops.find(tid);
  if (iter != pool_ops.end()) {
    PoolOp *op = iter->second;
    ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
                   << ceph_pool_op_name(op->pool_op) << dendl;
    // Take ownership of the reply payload for the caller.
    cb::list bl{std::move(m->response_data)};
    if (m->version > last_seen_osdmap_version)
      last_seen_osdmap_version = m->version;
    if (osdmap->get_epoch() < m->epoch) {
      // Promote shared -> unique so we can wait for a new map.
      sul.unlock();
      sul.lock();
      // recheck op existence since we have let go of rwlock
      // (for promotion) above.
      iter = pool_ops.find(tid);
      if (iter == pool_ops.end())
        goto done; // op is gone.
      if (osdmap->get_epoch() < m->epoch) {
        // Defer the callback until our osdmap catches up with the epoch
        // in which the pool op committed.
        ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
                       << " before calling back" << dendl;
        _wait_for_new_map(OpCompletion::create(
                            service.get_executor(),
                            [o = std::move(op->onfinish),
                             bl = std::move(bl)](
                              bs::error_code ec) mutable {
                              o->defer(std::move(o), ec, bl);
                            }),
                          m->epoch,
                          ec);
      } else {
        // map epoch changed, probably because a MOSDMap message
        // sneaked in. Do caller-specified callback now or else
        // we lose it forever.
        ceph_assert(op->onfinish);
        op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
      }
    } else {
      // Map already new enough: deliver the callback immediately.
      ceph_assert(op->onfinish);
      op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
    }
    op->onfinish = nullptr;
    if (!sul.owns_lock()) {
      // _finish_pool_op needs the unique lock.
      sul.unlock();
      sul.lock();
    }
    // Recheck once more: the op may have been finished while unlocked.
    iter = pool_ops.find(tid);
    if (iter != pool_ops.end()) {
      _finish_pool_op(op, 0);
    }
  } else {
    ldout(cct, 10) << "unknown request " << tid << dendl;
  }

 done:
  // Not strictly necessary, since we'll release it on return.
  sul.unlock();

  ldout(cct, 10) << "done" << dendl;
  m->put();
}
4156
4157 int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4158 {
4159 ceph_assert(initialized);
4160
4161 unique_lock wl(rwlock);
4162
4163 auto it = pool_ops.find(tid);
4164 if (it == pool_ops.end()) {
4165 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4166 return -ENOENT;
4167 }
4168
4169 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4170
4171 PoolOp *op = it->second;
4172 if (op->onfinish)
4173 op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{});
4174
4175 _finish_pool_op(op, r);
4176 return 0;
4177 }
4178
4179 void Objecter::_finish_pool_op(PoolOp *op, int r)
4180 {
4181 // rwlock is locked unique
4182 pool_ops.erase(op->tid);
4183 logger->set(l_osdc_poolop_active, pool_ops.size());
4184
4185 if (op->ontimeout && r != -ETIMEDOUT) {
4186 timer.cancel_event(op->ontimeout);
4187 }
4188
4189 delete op;
4190 }
4191
4192 // pool stats
4193
4194 void Objecter::get_pool_stats(
4195 const std::vector<std::string>& pools,
4196 decltype(PoolStatOp::onfinish)&& onfinish)
4197 {
4198 ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4199
4200 auto op = new PoolStatOp;
4201 op->tid = ++last_tid;
4202 op->pools = pools;
4203 op->onfinish = std::move(onfinish);
4204 if (mon_timeout > timespan(0)) {
4205 op->ontimeout = timer.add_event(mon_timeout,
4206 [this, op]() {
4207 pool_stat_op_cancel(op->tid,
4208 -ETIMEDOUT); });
4209 } else {
4210 op->ontimeout = 0;
4211 }
4212
4213 unique_lock wl(rwlock);
4214
4215 poolstat_ops[op->tid] = op;
4216
4217 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4218
4219 _poolstat_submit(op);
4220 }
4221
4222 void Objecter::_poolstat_submit(PoolStatOp *op)
4223 {
4224 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4225 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4226 op->pools,
4227 last_seen_pgmap_version));
4228 op->last_submit = ceph::coarse_mono_clock::now();
4229
4230 logger->inc(l_osdc_poolstat_send);
4231 }
4232
4233 void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4234 {
4235 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4236 ceph_tid_t tid = m->get_tid();
4237
4238 unique_lock wl(rwlock);
4239 if (!initialized) {
4240 m->put();
4241 return;
4242 }
4243
4244 auto iter = poolstat_ops.find(tid);
4245 if (iter != poolstat_ops.end()) {
4246 PoolStatOp *op = poolstat_ops[tid];
4247 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4248 if (m->version > last_seen_pgmap_version) {
4249 last_seen_pgmap_version = m->version;
4250 }
4251 op->onfinish->defer(std::move(op->onfinish), bs::error_code{},
4252 std::move(m->pool_stats), m->per_pool);
4253 _finish_pool_stat_op(op, 0);
4254 } else {
4255 ldout(cct, 10) << "unknown request " << tid << dendl;
4256 }
4257 ldout(cct, 10) << "done" << dendl;
4258 m->put();
4259 }
4260
4261 int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4262 {
4263 ceph_assert(initialized);
4264
4265 unique_lock wl(rwlock);
4266
4267 auto it = poolstat_ops.find(tid);
4268 if (it == poolstat_ops.end()) {
4269 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4270 return -ENOENT;
4271 }
4272
4273 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4274
4275 auto op = it->second;
4276 if (op->onfinish)
4277 op->onfinish->defer(std::move(op->onfinish), osdcode(r),
4278 bc::flat_map<std::string, pool_stat_t>{}, false);
4279 _finish_pool_stat_op(op, r);
4280 return 0;
4281 }
4282
4283 void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4284 {
4285 // rwlock is locked unique
4286
4287 poolstat_ops.erase(op->tid);
4288 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4289
4290 if (op->ontimeout && r != -ETIMEDOUT)
4291 timer.cancel_event(op->ontimeout);
4292
4293 delete op;
4294 }
4295
4296 void Objecter::get_fs_stats(std::optional<int64_t> poolid,
4297 decltype(StatfsOp::onfinish)&& onfinish)
4298 {
4299 ldout(cct, 10) << "get_fs_stats" << dendl;
4300 unique_lock l(rwlock);
4301
4302 auto op = new StatfsOp;
4303 op->tid = ++last_tid;
4304 op->data_pool = poolid;
4305 op->onfinish = std::move(onfinish);
4306 if (mon_timeout > timespan(0)) {
4307 op->ontimeout = timer.add_event(mon_timeout,
4308 [this, op]() {
4309 statfs_op_cancel(op->tid,
4310 -ETIMEDOUT); });
4311 } else {
4312 op->ontimeout = 0;
4313 }
4314 statfs_ops[op->tid] = op;
4315
4316 logger->set(l_osdc_statfs_active, statfs_ops.size());
4317
4318 _fs_stats_submit(op);
4319 }
4320
4321 void Objecter::_fs_stats_submit(StatfsOp *op)
4322 {
4323 // rwlock is locked unique
4324
4325 ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
4326 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
4327 op->data_pool,
4328 last_seen_pgmap_version));
4329 op->last_submit = ceph::coarse_mono_clock::now();
4330
4331 logger->inc(l_osdc_statfs_send);
4332 }
4333
4334 void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4335 {
4336 unique_lock wl(rwlock);
4337 if (!initialized) {
4338 m->put();
4339 return;
4340 }
4341
4342 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4343 ceph_tid_t tid = m->get_tid();
4344
4345 if (statfs_ops.count(tid)) {
4346 StatfsOp *op = statfs_ops[tid];
4347 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4348 if (m->h.version > last_seen_pgmap_version)
4349 last_seen_pgmap_version = m->h.version;
4350 op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st);
4351 _finish_statfs_op(op, 0);
4352 } else {
4353 ldout(cct, 10) << "unknown request " << tid << dendl;
4354 }
4355 m->put();
4356 ldout(cct, 10) << "done" << dendl;
4357 }
4358
4359 int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4360 {
4361 ceph_assert(initialized);
4362
4363 unique_lock wl(rwlock);
4364
4365 auto it = statfs_ops.find(tid);
4366 if (it == statfs_ops.end()) {
4367 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4368 return -ENOENT;
4369 }
4370
4371 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4372
4373 auto op = it->second;
4374 if (op->onfinish)
4375 op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{});
4376 _finish_statfs_op(op, r);
4377 return 0;
4378 }
4379
4380 void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4381 {
4382 // rwlock is locked unique
4383
4384 statfs_ops.erase(op->tid);
4385 logger->set(l_osdc_statfs_active, statfs_ops.size());
4386
4387 if (op->ontimeout && r != -ETIMEDOUT)
4388 timer.cancel_event(op->ontimeout);
4389
4390 delete op;
4391 }
4392
4393 // scatter/gather
4394
4395 void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
4396 vector<cb::list>& resultbl,
4397 cb::list *bl, Context *onfinish)
4398 {
4399 // all done
4400 ldout(cct, 15) << "_sg_read_finish" << dendl;
4401
4402 if (extents.size() > 1) {
4403 Striper::StripedReadResult r;
4404 auto bit = resultbl.begin();
4405 for (auto eit = extents.begin();
4406 eit != extents.end();
4407 ++eit, ++bit) {
4408 r.add_partial_result(cct, *bit, eit->buffer_extents);
4409 }
4410 bl->clear();
4411 r.assemble_result(cct, *bl, false);
4412 } else {
4413 ldout(cct, 15) << " only one frag" << dendl;
4414 *bl = std::move(resultbl[0]);
4415 }
4416
4417 // done
4418 uint64_t bytes_read = bl->length();
4419 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4420
4421 if (onfinish) {
4422 onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result());
4423 }
4424 }
4425
4426
4427 void Objecter::ms_handle_connect(Connection *con)
4428 {
4429 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
4430 if (!initialized)
4431 return;
4432
4433 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4434 resend_mon_ops();
4435 }
4436
bool Objecter::ms_handle_reset(Connection *con)
{
  // Messenger callback: the connection was reset. For an OSD session,
  // re-open the session and queue its outstanding ops (including
  // linger ops) for resend. Returns true if the reset was handled.
  if (!initialized)
    return false;
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    unique_lock wl(rwlock);

    auto priv = con->get_priv();
    auto session = static_cast<OSDSession*>(priv.get());
    if (session) {
      ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
                    << " osd." << session->osd << dendl;
      // the session maybe had been closed if new osdmap just handled
      // says the osd down
      if (!(initialized && osdmap->is_up(session->osd))) {
        ldout(cct, 1) << "ms_handle_reset aborted,initialized=" << initialized << dendl;
        wl.unlock();
        return false;
      }
      // Linger ops collected by _kick_requests(); resent after the
      // session lock is dropped (linger resend takes the rwlock).
      map<uint64_t, LingerOp *> lresend;
      unique_lock sl(session->lock);
      _reopen_session(session);
      _kick_requests(session, lresend);
      sl.unlock();
      _linger_ops_resend(lresend, wl);
      wl.unlock();
      maybe_request_map();
    }
    return true;
  }
  return false;
}
4469
void Objecter::ms_handle_remote_reset(Connection *con)
{
  /*
   * treat these the same: a remote reset gets the full
   * ms_handle_reset() session-reopen/resend treatment.
   */
  ms_handle_reset(con);
}
4477
4478 bool Objecter::ms_handle_refused(Connection *con)
4479 {
4480 // just log for now
4481 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4482 int osd = osdmap->identify_osd(con->get_peer_addr());
4483 if (osd >= 0) {
4484 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4485 }
4486 }
4487 return false;
4488 }
4489
void Objecter::op_target_t::dump(Formatter *f) const
{
  // Dump the resolved target of an op: placement group, chosen osd,
  // base and (post-redirect) target object/locator, and flags.
  f->dump_stream("pg") << pgid;
  f->dump_int("osd", osd);
  f->dump_stream("object_id") << base_oid;
  f->dump_stream("object_locator") << base_oloc;
  f->dump_stream("target_object_id") << target_oid;
  f->dump_stream("target_object_locator") << target_oloc;
  f->dump_int("paused", (int)paused);
  f->dump_int("used_replica", (int)used_replica);
  f->dump_int("precalc_pgid", (int)precalc_pgid);
}
4502
4503 void Objecter::_dump_active(OSDSession *s)
4504 {
4505 for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
4506 Op *op = p->second;
4507 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4508 << "\tosd." << (op->session ? op->session->osd : -1)
4509 << "\t" << op->target.base_oid
4510 << "\t" << op->ops << dendl;
4511 }
4512 }
4513
4514 void Objecter::_dump_active()
4515 {
4516 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
4517 << dendl;
4518 for (auto siter = osd_sessions.begin();
4519 siter != osd_sessions.end(); ++siter) {
4520 auto s = siter->second;
4521 shared_lock sl(s->lock);
4522 _dump_active(s);
4523 sl.unlock();
4524 }
4525 _dump_active(homeless_session);
4526 }
4527
4528 void Objecter::dump_active()
4529 {
4530 shared_lock rl(rwlock);
4531 _dump_active();
4532 rl.unlock();
4533 }
4534
void Objecter::dump_requests(Formatter *fmt)
{
  // Read-lock on Objecter held here
  // Emit every category of in-flight request under one "requests"
  // object (used by the admin-socket request-state hook).
  fmt->open_object_section("requests");
  dump_ops(fmt);
  dump_linger_ops(fmt);
  dump_pool_ops(fmt);
  dump_pool_stat_ops(fmt);
  dump_statfs_ops(fmt);
  dump_command_ops(fmt);
  fmt->close_section(); // requests object
}
4547
4548 void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4549 {
4550 for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
4551 Op *op = p->second;
4552 auto age = std::chrono::duration<double>(ceph::coarse_mono_clock::now() - op->stamp);
4553 fmt->open_object_section("op");
4554 fmt->dump_unsigned("tid", op->tid);
4555 op->target.dump(fmt);
4556 fmt->dump_stream("last_sent") << op->stamp;
4557 fmt->dump_float("age", age.count());
4558 fmt->dump_int("attempts", op->attempts);
4559 fmt->dump_stream("snapid") << op->snapid;
4560 fmt->dump_stream("snap_context") << op->snapc;
4561 fmt->dump_stream("mtime") << op->mtime;
4562
4563 fmt->open_array_section("osd_ops");
4564 for (auto it = op->ops.begin(); it != op->ops.end(); ++it) {
4565 fmt->dump_stream("osd_op") << *it;
4566 }
4567 fmt->close_section(); // osd_ops array
4568
4569 fmt->close_section(); // op object
4570 }
4571 }
4572
4573 void Objecter::dump_ops(Formatter *fmt)
4574 {
4575 // Read-lock on Objecter held
4576 fmt->open_array_section("ops");
4577 for (auto siter = osd_sessions.begin();
4578 siter != osd_sessions.end(); ++siter) {
4579 OSDSession *s = siter->second;
4580 shared_lock sl(s->lock);
4581 _dump_ops(s, fmt);
4582 sl.unlock();
4583 }
4584 _dump_ops(homeless_session, fmt);
4585 fmt->close_section(); // ops array
4586 }
4587
4588 void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4589 {
4590 for (auto p = s->linger_ops.begin(); p != s->linger_ops.end(); ++p) {
4591 auto op = p->second;
4592 fmt->open_object_section("linger_op");
4593 fmt->dump_unsigned("linger_id", op->linger_id);
4594 op->target.dump(fmt);
4595 fmt->dump_stream("snapid") << op->snap;
4596 fmt->dump_stream("registered") << op->registered;
4597 fmt->close_section(); // linger_op object
4598 }
4599 }
4600
4601 void Objecter::dump_linger_ops(Formatter *fmt)
4602 {
4603 // We have a read-lock on the objecter
4604 fmt->open_array_section("linger_ops");
4605 for (auto siter = osd_sessions.begin();
4606 siter != osd_sessions.end(); ++siter) {
4607 auto s = siter->second;
4608 shared_lock sl(s->lock);
4609 _dump_linger_ops(s, fmt);
4610 sl.unlock();
4611 }
4612 _dump_linger_ops(homeless_session, fmt);
4613 fmt->close_section(); // linger_ops array
4614 }
4615
4616 void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4617 {
4618 for (auto p = s->command_ops.begin(); p != s->command_ops.end(); ++p) {
4619 auto op = p->second;
4620 fmt->open_object_section("command_op");
4621 fmt->dump_unsigned("command_id", op->tid);
4622 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4623 fmt->open_array_section("command");
4624 for (auto q = op->cmd.begin(); q != op->cmd.end(); ++q)
4625 fmt->dump_string("word", *q);
4626 fmt->close_section();
4627 if (op->target_osd >= 0)
4628 fmt->dump_int("target_osd", op->target_osd);
4629 else
4630 fmt->dump_stream("target_pg") << op->target_pg;
4631 fmt->close_section(); // command_op object
4632 }
4633 }
4634
4635 void Objecter::dump_command_ops(Formatter *fmt)
4636 {
4637 // We have a read-lock on the Objecter here
4638 fmt->open_array_section("command_ops");
4639 for (auto siter = osd_sessions.begin();
4640 siter != osd_sessions.end(); ++siter) {
4641 auto s = siter->second;
4642 shared_lock sl(s->lock);
4643 _dump_command_ops(s, fmt);
4644 sl.unlock();
4645 }
4646 _dump_command_ops(homeless_session, fmt);
4647 fmt->close_section(); // command_ops array
4648 }
4649
4650 void Objecter::dump_pool_ops(Formatter *fmt) const
4651 {
4652 fmt->open_array_section("pool_ops");
4653 for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
4654 auto op = p->second;
4655 fmt->open_object_section("pool_op");
4656 fmt->dump_unsigned("tid", op->tid);
4657 fmt->dump_int("pool", op->pool);
4658 fmt->dump_string("name", op->name);
4659 fmt->dump_int("operation_type", op->pool_op);
4660 fmt->dump_unsigned("crush_rule", op->crush_rule);
4661 fmt->dump_stream("snapid") << op->snapid;
4662 fmt->dump_stream("last_sent") << op->last_submit;
4663 fmt->close_section(); // pool_op object
4664 }
4665 fmt->close_section(); // pool_ops array
4666 }
4667
4668 void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4669 {
4670 fmt->open_array_section("pool_stat_ops");
4671 for (auto p = poolstat_ops.begin();
4672 p != poolstat_ops.end();
4673 ++p) {
4674 PoolStatOp *op = p->second;
4675 fmt->open_object_section("pool_stat_op");
4676 fmt->dump_unsigned("tid", op->tid);
4677 fmt->dump_stream("last_sent") << op->last_submit;
4678
4679 fmt->open_array_section("pools");
4680 for (const auto& it : op->pools) {
4681 fmt->dump_string("pool", it);
4682 }
4683 fmt->close_section(); // pools array
4684
4685 fmt->close_section(); // pool_stat_op object
4686 }
4687 fmt->close_section(); // pool_stat_ops array
4688 }
4689
4690 void Objecter::dump_statfs_ops(Formatter *fmt) const
4691 {
4692 fmt->open_array_section("statfs_ops");
4693 for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
4694 auto op = p->second;
4695 fmt->open_object_section("statfs_op");
4696 fmt->dump_unsigned("tid", op->tid);
4697 fmt->dump_stream("last_sent") << op->last_submit;
4698 fmt->close_section(); // statfs_op object
4699 }
4700 fmt->close_section(); // statfs_ops array
4701 }
4702
// Admin-socket hook that dumps the Objecter's in-flight requests.
Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
  m_objecter(objecter)
{
}
4707
int Objecter::RequestStateHook::call(std::string_view command,
                                     const cmdmap_t& cmdmap,
                                     Formatter *f,
                                     std::ostream& ss,
                                     cb::list& out)
{
  // 'command' and 'cmdmap' are not inspected: this hook serves a
  // single command, so every invocation dumps all requests.
  shared_lock rl(m_objecter->rwlock);
  m_objecter->dump_requests(f);
  return 0;
}
4718
4719 void Objecter::blocklist_self(bool set)
4720 {
4721 ldout(cct, 10) << "blocklist_self " << (set ? "add" : "rm") << dendl;
4722
4723 vector<string> cmd;
4724 cmd.push_back("{\"prefix\":\"osd blocklist\", ");
4725 if (set)
4726 cmd.push_back("\"blocklistop\":\"add\",");
4727 else
4728 cmd.push_back("\"blocklistop\":\"rm\",");
4729 stringstream ss;
4730 // this is somewhat imprecise in that we are blocklisting our first addr only
4731 ss << messenger->get_myaddrs().front().get_legacy_str();
4732 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4733
4734 auto m = new MMonCommand(monc->get_fsid());
4735 m->cmd = cmd;
4736
4737 // NOTE: no fallback to legacy blacklist command implemented here
4738 // since this is only used for test code.
4739
4740 monc->send_mon_message(m);
4741 }
4742
4743 // commands
4744
void Objecter::handle_command_reply(MCommandReply *m)
{
  // Dispatch a reply for an OSD command: match it to the session and
  // CommandOp by tid, handle EAGAIN by resending, otherwise complete
  // the op with the reply's status and payload.
  unique_lock wl(rwlock);
  if (!initialized) {
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  shared_lock sl(s->lock);
  auto p = s->command_ops.find(m->get_tid());
  if (p == s->command_ops.end()) {
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
                   << " not found" << dendl;
    m->put();
    sl.unlock();
    return;
  }

  CommandOp *c = p->second;
  // Reject stale replies that arrived on a connection the op no
  // longer belongs to (e.g. after a session reopen).
  if (!c->session ||
      m->get_connection() != c->session->con) {
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
                   << " got reply from wrong connection "
                   << m->get_connection() << " " << m->get_source_inst()
                   << dendl;
    m->put();
    sl.unlock();
    return;
  }

  if (m->r == -EAGAIN) {
    ldout(cct,10) << __func__ << " tid " << m->get_tid()
                  << " got EAGAIN, requesting map and resending" << dendl;
    // NOTE: This might resend twice... once now, and once again when
    // we get an updated osdmap and the PG is found to have moved.
    _maybe_request_map();
    _send_command(c);
    m->put();
    sl.unlock();
    return;
  }

  // _finish_command() needs the session lock held exclusively, so
  // trade the shared lock for a unique one.
  sl.unlock();

  unique_lock sul(s->lock);
  _finish_command(c, m->r < 0 ? bs::error_code(-m->r, osd_category()) :
                  bs::error_code(), std::move(m->rs),
                  std::move(m->get_data()));
  sul.unlock();

  m->put();
}
4806
Objecter::LingerOp::LingerOp(Objecter *o, uint64_t linger_id)
  : objecter(o),
    linger_id(linger_id),
    // name the lock after the linger id so it is identifiable in
    // lock-debugging output
    watch_lock(ceph::make_shared_mutex(
                 fmt::format("LingerOp::watch_lock #{}", linger_id)))
{}
4813
void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
{
  // Assign a tid, compute a target OSD for the command, and send it.
  // If no target can be resolved yet the op stays on the homeless
  // session until a newer osdmap arrives. The tid is optionally
  // returned through 'ptid' so the caller can cancel later.
  shunique_lock sul(rwlock, ceph::acquire_unique);

  ceph_tid_t tid = ++last_tid;
  ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
  c->tid = tid;

  {
    // Park the op on the homeless session first; _assign_command_session()
    // moves it once a real target is known.
    unique_lock hs_wl(homeless_session->lock);
    _session_command_op_assign(homeless_session, c);
  }

  _calc_command_target(c, sul);
  _assign_command_session(c, sul);
  if (osd_timeout > timespan(0)) {
    c->ontimeout = timer.add_event(osd_timeout,
                                   [this, c, tid]() {
                                     command_op_cancel(
                                       c->session, tid,
                                       osdc_errc::timed_out); });
  }

  if (!c->session->is_homeless()) {
    _send_command(c);
  } else {
    // No usable target yet; ask for a newer osdmap.
    _maybe_request_map();
  }
  if (c->map_check_error)
    _send_command_map_check(c);
  if (ptid)
    *ptid = tid;

  logger->inc(l_osdc_command_active);
}
4849
int Objecter::_calc_command_target(CommandOp *c,
                                   shunique_lock<ceph::shared_mutex>& sul)
{
  // Resolve which OSD the command should go to: either the explicit
  // c->target_osd (validated against the osdmap) or whatever
  // _calc_target() picks for the pg target. Sets c->map_check_error*
  // on failure and returns a RECALC_OP_TARGET_* code.
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  c->map_check_error = 0;

  // ignore overlays, just like we do with pg ops
  c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;

  if (c->target_osd >= 0) {
    if (!osdmap->exists(c->target_osd)) {
      c->map_check_error = -ENOENT;
      c->map_check_error_str = "osd dne";
      c->target.osd = -1;
      return RECALC_OP_TARGET_OSD_DNE;
    }
    if (osdmap->is_down(c->target_osd)) {
      c->map_check_error = -ENXIO;
      c->map_check_error_str = "osd down";
      c->target.osd = -1;
      return RECALC_OP_TARGET_OSD_DOWN;
    }
    c->target.osd = c->target_osd;
  } else {
    int ret = _calc_target(&(c->target), nullptr, true);
    if (ret == RECALC_OP_TARGET_POOL_DNE) {
      c->map_check_error = -ENOENT;
      c->map_check_error_str = "pool dne";
      c->target.osd = -1;
      return ret;
    } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
      c->map_check_error = -ENXIO;
      c->map_check_error_str = "osd down";
      c->target.osd = -1;
      return ret;
    }
  }

  OSDSession *s;
  int r = _get_session(c->target.osd, &s, sul);
  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */

  // If the session for the chosen osd differs from the op's current
  // session, the caller must re-assign and resend.
  if (c->session != s) {
    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }

  put_session(s);

  // NOTE(review): log tag "_recalc_command_target" looks like a
  // historical name for this function.
  ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, "
                 << c->session << dendl;

  return RECALC_OP_TARGET_NO_ACTION;
}
4905
void Objecter::_assign_command_session(CommandOp *c,
                                       shunique_lock<ceph::shared_mutex>& sul)
{
  // Move the command op onto the session for its (re)calculated target
  // osd, detaching it from its previous session first. Each session's
  // lock is taken separately; the two locks are never held together.
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  OSDSession *s;
  int r = _get_session(c->target.osd, &s, sul);
  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */

  if (c->session != s) {
    if (c->session) {
      OSDSession *cs = c->session;
      unique_lock csl(cs->lock);
      _session_command_op_remove(c->session, c);
      csl.unlock();
    }
    unique_lock sl(s->lock);
    _session_command_op_assign(s, c);
  }

  // drop the ref _get_session() took
  put_session(s);
}
4928
4929 void Objecter::_send_command(CommandOp *c)
4930 {
4931 ldout(cct, 10) << "_send_command " << c->tid << dendl;
4932 ceph_assert(c->session);
4933 ceph_assert(c->session->con);
4934 auto m = new MCommand(monc->monmap.fsid);
4935 m->cmd = c->cmd;
4936 m->set_data(c->inbl);
4937 m->set_tid(c->tid);
4938 c->session->con->send_message(m);
4939 logger->inc(l_osdc_command_send);
4940 }
4941
4942 int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid,
4943 bs::error_code ec)
4944 {
4945 ceph_assert(initialized);
4946
4947 unique_lock wl(rwlock);
4948
4949 auto it = s->command_ops.find(tid);
4950 if (it == s->command_ops.end()) {
4951 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4952 return -ENOENT;
4953 }
4954
4955 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4956
4957 CommandOp *op = it->second;
4958 _command_cancel_map_check(op);
4959 unique_lock sl(op->session->lock);
4960 _finish_command(op, ec, {}, {});
4961 sl.unlock();
4962 return 0;
4963 }
4964
void Objecter::_finish_command(CommandOp *c, bs::error_code ec,
                               string&& rs, cb::list&& bl)
{
  // rwlock is locked unique
  // session lock is locked

  // Complete a command op: fire its completion with the status string
  // and payload, cancel its timeout (unless the timeout is what's
  // completing it), detach it from its session, and drop our ref.
  ldout(cct, 10) << "_finish_command " << c->tid << " = " << ec << " "
                 << rs << dendl;

  if (c->onfinish)
    c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl));

  if (c->ontimeout && ec != bs::errc::timed_out)
    timer.cancel_event(c->ontimeout);

  _session_command_op_remove(c->session, c);

  c->put();

  logger->dec(l_osdc_command_active);
}
4986
Objecter::OSDSession::~OSDSession()
{
  // Caller is responsible for re-assigning or
  // destroying any ops that were assigned to us
  ceph_assert(ops.empty());
  ceph_assert(linger_ops.empty());
  ceph_assert(command_ops.empty());
}
4995
Objecter::Objecter(CephContext *cct,
                   Messenger *m, MonClient *mc,
                   boost::asio::io_context& service) :
  Dispatcher(cct), messenger(m), monc(mc), service(service)
{
  // Pull the mon/osd op timeouts from config; a value of 0 means
  // "no timeout" (see the > timespan(0) checks at submit sites).
  mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
  osd_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
}
5004
Objecter::~Objecter()
{
  // By destruction time shutdown must have completed: no sessions,
  // no pending ops of any kind, and the admin-socket hook and perf
  // counters already unregistered.
  ceph_assert(homeless_session->get_nref() == 1);
  ceph_assert(num_homeless_ops == 0);
  homeless_session->put();

  ceph_assert(osd_sessions.empty());
  ceph_assert(poolstat_ops.empty());
  ceph_assert(statfs_ops.empty());
  ceph_assert(pool_ops.empty());
  ceph_assert(waiting_for_map.empty());
  ceph_assert(linger_ops.empty());
  ceph_assert(check_latest_map_lingers.empty());
  ceph_assert(check_latest_map_ops.empty());
  ceph_assert(check_latest_map_commands.empty());

  ceph_assert(!m_request_state_hook);
  ceph_assert(!logger);
}
5024
5025 /**
5026 * Wait until this OSD map epoch is received before
5027 * sending any more operations to OSDs. Use this
5028 * when it is known that the client can't trust
5029 * anything from before this epoch (e.g. due to
5030 * client blocklist at this epoch).
5031 */
5032 void Objecter::set_epoch_barrier(epoch_t epoch)
5033 {
5034 unique_lock wl(rwlock);
5035
5036 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
5037 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
5038 << dendl;
5039 if (epoch > epoch_barrier) {
5040 epoch_barrier = epoch;
5041 _maybe_request_map();
5042 }
5043 }
5044
5045
5046
hobject_t Objecter::enumerate_objects_begin()
{
  // Enumeration starts at the default-constructed (smallest) hobject_t.
  return hobject_t();
}
5051
hobject_t Objecter::enumerate_objects_end()
{
  // Enumeration ends at the max-hobject_t sentinel.
  return hobject_t::get_max();
}
5056
template<typename T>
struct EnumerationContext {
  // State for one enumerate_objects() traversal, threaded through the
  // repeated pg_nls round trips issued by _issue_enumerate().
  Objecter* objecter;
  const hobject_t end;          // stop at (not beyond) this object
  const cb::list filter;        // opaque server-side filter; may be empty
  uint32_t max;                 // entries still wanted by the caller
  const object_locator_t oloc;
  std::vector<T> ls;            // results accumulated across round trips
private:
  // final completion: (error, entries, cursor)
  fu2::unique_function<void(bs::error_code,
                            std::vector<T>,
                            hobject_t) &&> on_finish;
public:
  epoch_t epoch = 0;            // filled in by pg_read via out-pointer
  // Op budget (bytes) held while a listing op is in flight; -1 = none.
  // Released in operator() via put_op_budget_bytes().
  int budget = -1;

  EnumerationContext(Objecter* objecter,
                     hobject_t end, cb::list filter,
                     uint32_t max, object_locator_t oloc,
                     decltype(on_finish) on_finish)
    : objecter(objecter), end(std::move(end)), filter(std::move(filter)),
      max(max), oloc(std::move(oloc)), on_finish(std::move(on_finish)) {}

  // Terminal call: release any held budget and hand the accumulated
  // results to the caller's completion. Consumes *this.
  void operator()(bs::error_code ec,
                  std::vector<T> v,
                  hobject_t h) && {
    if (budget >= 0) {
      objecter->put_op_budget_bytes(budget);
      budget = -1;
    }

    std::move(on_finish)(ec, std::move(v), std::move(h));
  }
};
5091
template<typename T>
struct CB_EnumerateReply {
  // Completion adapter for one pg_nls round trip: owns the reply
  // buffer and forwards it to Objecter::_enumerate_reply().
  cb::list bl;   // filled with the pg_nls reply payload by the op

  Objecter* objecter;
  std::unique_ptr<EnumerationContext<T>> ctx;

  CB_EnumerateReply(Objecter* objecter,
                    std::unique_ptr<EnumerationContext<T>>&& ctx) :
    objecter(objecter), ctx(std::move(ctx)) {}

  void operator()(bs::error_code ec) {
    objecter->_enumerate_reply(std::move(bl), ec, std::move(ctx));
  }
};
5107
template<typename T>
void Objecter::enumerate_objects(
  int64_t pool_id,
  std::string_view ns,
  hobject_t start,
  hobject_t end,
  const uint32_t max,
  const cb::list& filter_bl,
  fu2::unique_function<void(bs::error_code,
                            std::vector<T>,
                            hobject_t) &&> on_finish) {
  // List up to 'max' objects in [start, end) of pool/namespace,
  // delivering (error, entries, next-cursor) to on_finish. Validates
  // arguments and cluster prerequisites up front, then kicks off the
  // first pg_nls round trip via _issue_enumerate().
  if (!end.is_max() && start > end) {
    lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
    std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
    return;
  }

  if (max < 1) {
    lderr(cct) << __func__ << ": result size may not be zero" << dendl;
    std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
    return;
  }

  // start == max sentinel means the previous pass already finished.
  if (start.is_max()) {
    std::move(on_finish)({}, {}, {});
    return;
  }

  shared_lock rl(rwlock);
  ceph_assert(osdmap->get_epoch());
  // pg_nls cursors only make sense with bitwise object sort order.
  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
    rl.unlock();
    lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
    std::move(on_finish)(osdc_errc::not_supported, {}, {});
    return;
  }
  const pg_pool_t* p = osdmap->get_pg_pool(pool_id);
  if (!p) {
    lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
               << osdmap->get_epoch() << dendl;
    rl.unlock();
    std::move(on_finish)(osdc_errc::pool_dne, {}, {});
    return;
  } else {
    rl.unlock();
  }

  _issue_enumerate(start,
                   std::make_unique<EnumerationContext<T>>(
                     this, std::move(end), filter_bl,
                     max, object_locator_t{pool_id, ns},
                     std::move(on_finish)));
}
5161
// Explicit instantiations for the two entry types used by librados
// and neorados respectively.
template
void Objecter::enumerate_objects<librados::ListObjectImpl>(
  int64_t pool_id,
  std::string_view ns,
  hobject_t start,
  hobject_t end,
  const uint32_t max,
  const cb::list& filter_bl,
  fu2::unique_function<void(bs::error_code,
                            std::vector<librados::ListObjectImpl>,
                            hobject_t) &&> on_finish);

template
void Objecter::enumerate_objects<neorados::Entry>(
  int64_t pool_id,
  std::string_view ns,
  hobject_t start,
  hobject_t end,
  const uint32_t max,
  const cb::list& filter_bl,
  fu2::unique_function<void(bs::error_code,
                            std::vector<neorados::Entry>,
                            hobject_t) &&> on_finish);
5185
5186
5187
template<typename T>
void Objecter::_issue_enumerate(hobject_t start,
                                std::unique_ptr<EnumerationContext<T>> ctx) {
  // Issue one pg_nls listing op starting at 'start'. The reply lands
  // in _enumerate_reply(), which may call back here to continue.
  ObjectOperation op;
  auto c = ctx.get();
  op.pg_nls(c->max, c->filter, start, osdmap->get_epoch());
  auto on_ack = std::make_unique<CB_EnumerateReply<T>>(this, std::move(ctx));
  // I hate having to do this. Try to find a cleaner way
  // later.
  // Raw pointers into the context and reply buffer; they remain valid
  // because on_ack (which owns the context) is moved into the
  // completion below and outlives the op.
  auto epoch = &c->epoch;
  auto budget = &c->budget;
  auto pbl = &on_ack->bl;

  // Issue. See you later in _enumerate_reply
  pg_read(start.get_hash(),
          c->oloc, op, pbl, 0,
          Op::OpComp::create(service.get_executor(),
                             [c = std::move(on_ack)]
                             (bs::error_code ec) mutable {
                               (*c)(ec);
                             }), epoch, budget);
}
5210
// Explicit instantiations matching those of enumerate_objects().
template
void Objecter::_issue_enumerate<librados::ListObjectImpl>(
  hobject_t start,
  std::unique_ptr<EnumerationContext<librados::ListObjectImpl>> ctx);
template
void Objecter::_issue_enumerate<neorados::Entry>(
  hobject_t start, std::unique_ptr<EnumerationContext<neorados::Entry>> ctx);
5218
template<typename T>
void Objecter::_enumerate_reply(
  cb::list&& bl,
  bs::error_code ec,
  std::unique_ptr<EnumerationContext<T>>&& ctx)
{
  // Handle one pg_nls reply: decode it, trim entries past ctx->end,
  // enforce the caller's 'max' budget, and either finish the
  // enumeration or issue the next round trip.
  if (ec) {
    std::move(*ctx)(ec, {}, {});
    return;
  }

  // Decode the results
  auto iter = bl.cbegin();
  pg_nls_response_template<T> response;

  try {
    response.decode(iter);
    if (!iter.end()) {
      // extra_info isn't used anywhere. We do this solely to preserve
      // backward compatibility
      cb::list legacy_extra_info;
      decode(legacy_extra_info, iter);
    }
  } catch (const bs::system_error& e) {
    std::move(*ctx)(e.code(), {}, {});
    return;
  }

  shared_lock rl(rwlock);
  auto pool = osdmap->get_pg_pool(ctx->oloc.get_pool());
  rl.unlock();
  if (!pool) {
    // pool is gone, drop any results which are now meaningless.
    std::move(*ctx)(osdc_errc::pool_dne, {}, {});
    return;
  }

  // Compute the cursor for the next call: either the server's handle,
  // or ctx->end if the listing overshot the requested range.
  hobject_t next;
  if ((response.handle <= ctx->end)) {
    next = response.handle;
  } else {
    next = ctx->end;

    // drop anything after 'end'
    while (!response.entries.empty()) {
      // Recompute each entry's hash the same way the OSD placed it
      // (locator key when present, else the oid).
      uint32_t hash = response.entries.back().locator.empty() ?
        pool->hash_key(response.entries.back().oid,
                       response.entries.back().nspace) :
        pool->hash_key(response.entries.back().locator,
                       response.entries.back().nspace);
      hobject_t last(response.entries.back().oid,
                     response.entries.back().locator,
                     CEPH_NOSNAP,
                     hash,
                     ctx->oloc.get_pool(),
                     response.entries.back().nspace);
      if (last < ctx->end)
        break;
      response.entries.pop_back();
    }
  }

  if (response.entries.size() <= ctx->max) {
    // Everything fits in the remaining budget.
    ctx->max -= response.entries.size();
    std::move(response.entries.begin(), response.entries.end(),
              std::back_inserter(ctx->ls));
  } else {
    // Budget exhausted mid-reply: take exactly 'max' entries and point
    // the cursor at the first entry we did not consume.
    auto i = response.entries.begin();
    while (ctx->max > 0) {
      ctx->ls.push_back(std::move(*i));
      --(ctx->max);
      ++i;
    }
    uint32_t hash =
      i->locator.empty() ?
      pool->hash_key(i->oid, i->nspace) :
      pool->hash_key(i->locator, i->nspace);

    next = hobject_t{i->oid, i->locator,
                     CEPH_NOSNAP,
                     hash,
                     ctx->oloc.get_pool(),
                     i->nspace};
  }

  if (next == ctx->end || ctx->max == 0) {
    // Done: deliver everything accumulated so far.
    std::move(*ctx)(ec, std::move(ctx->ls), std::move(next));
  } else {
    // More to fetch; continue from the new cursor.
    _issue_enumerate(next, std::move(ctx));
  }
}
5310
// Explicit instantiations matching those of enumerate_objects().
template
void Objecter::_enumerate_reply<librados::ListObjectImpl>(
  cb::list&& bl,
  bs::error_code ec,
  std::unique_ptr<EnumerationContext<librados::ListObjectImpl>>&& ctx);

template
void Objecter::_enumerate_reply<neorados::Entry>(
  cb::list&& bl,
  bs::error_code ec,
  std::unique_ptr<EnumerationContext<neorados::Entry>>&& ctx);
5322
5323 namespace {
5324 using namespace librados;
5325
5326 template <typename T>
5327 void do_decode(std::vector<T>& items, std::vector<cb::list>& bls)
5328 {
5329 for (auto bl : bls) {
5330 auto p = bl.cbegin();
5331 T t;
5332 decode(t, p);
5333 items.push_back(t);
5334 }
5335 }
5336
5337 struct C_ObjectOperation_scrub_ls : public Context {
5338 cb::list bl;
5339 uint32_t* interval;
5340 std::vector<inconsistent_obj_t> *objects = nullptr;
5341 std::vector<inconsistent_snapset_t> *snapsets = nullptr;
5342 int* rval;
5343
5344 C_ObjectOperation_scrub_ls(uint32_t* interval,
5345 std::vector<inconsistent_obj_t>* objects,
5346 int* rval)
5347 : interval(interval), objects(objects), rval(rval) {}
5348 C_ObjectOperation_scrub_ls(uint32_t* interval,
5349 std::vector<inconsistent_snapset_t>* snapsets,
5350 int* rval)
5351 : interval(interval), snapsets(snapsets), rval(rval) {}
5352 void finish(int r) override {
5353 if (r < 0 && r != -EAGAIN) {
5354 if (rval)
5355 *rval = r;
5356 return;
5357 }
5358
5359 if (rval)
5360 *rval = 0;
5361
5362 try {
5363 decode();
5364 } catch (cb::error&) {
5365 if (rval)
5366 *rval = -EIO;
5367 }
5368 }
5369 private:
5370 void decode() {
5371 scrub_ls_result_t result;
5372 auto p = bl.cbegin();
5373 result.decode(p);
5374 *interval = result.interval;
5375 if (objects) {
5376 do_decode(*objects, result.vals);
5377 } else {
5378 do_decode(*snapsets, result.vals);
5379 }
5380 }
5381 };
5382
5383 template <typename T>
5384 void do_scrub_ls(::ObjectOperation* op,
5385 const scrub_ls_arg_t& arg,
5386 std::vector<T> *items,
5387 uint32_t* interval,
5388 int* rval)
5389 {
5390 OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
5391 op->flags |= CEPH_OSD_FLAG_PGOP;
5392 ceph_assert(interval);
5393 arg.encode(osd_op.indata);
5394 unsigned p = op->ops.size() - 1;
5395 auto h = new C_ObjectOperation_scrub_ls{interval, items, rval};
5396 op->set_handler(h);
5397 op->out_bl[p] = &h->bl;
5398 op->out_rval[p] = rval;
5399 }
5400 }
5401
void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
                                 uint64_t max_to_get,
                                 std::vector<librados::inconsistent_obj_t>* objects,
                                 uint32_t* interval,
                                 int* rval)
{
  // List inconsistent objects found by scrub. The second arg field (0)
  // selects the object listing; *interval is both input (expected
  // interval) and output (interval decoded from the reply).
  scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
  do_scrub_ls(this, arg, objects, interval, rval);
}
5411
void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
                                 uint64_t max_to_get,
                                 std::vector<librados::inconsistent_snapset_t> *snapsets,
                                 uint32_t *interval,
                                 int *rval)
{
  // List inconsistent snapsets found by scrub. The second arg field (1)
  // selects the snapset listing; *interval is both input and output.
  scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
  do_scrub_ls(this, arg, snapsets, interval, rval);
}