// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <algorithm>
#include <cerrno>

#include "Objecter.h"
#include "osd/OSDMap.h"
#include "osd/error_code.h"
#include "Filer.h"

#include "mon/MonClient.h"
#include "mon/error_code.h"

#include "msg/Messenger.h"
#include "msg/Message.h"

#include "messages/MPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDMap.h"

#include "messages/MPoolOp.h"
#include "messages/MPoolOpReply.h"

#include "messages/MGetPoolStats.h"
#include "messages/MGetPoolStatsReply.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"

#include "messages/MMonCommand.h"

#include "messages/MCommand.h"
#include "messages/MCommandReply.h"

#include "messages/MWatchNotify.h"


#include "common/Cond.h"
#include "common/config.h"
#include "common/perf_counters.h"
#include "common/scrub_types.h"
#include "include/str_list.h"
#include "common/errno.h"
#include "common/EventTrace.h"
#include "common/async/waiter.h"
#include "error_code.h"

using std::list;
using std::make_pair;
using std::map;
using std::ostream;
using std::ostringstream;
using std::pair;
using std::set;
using std::string;
using std::stringstream;
using std::vector;

using ceph::decode;
using ceph::encode;
using ceph::Formatter;

using std::defer_lock;
using std::scoped_lock;
using std::shared_lock;
using std::unique_lock;

using ceph::real_time;
using ceph::real_clock;

using ceph::mono_clock;
using ceph::mono_time;

using ceph::timespan;

using ceph::shunique_lock;
using ceph::acquire_shared;
using ceph::acquire_unique;

namespace bc = boost::container;
namespace bs = boost::system;
namespace ca = ceph::async;
namespace cb = ceph::buffer;

#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
#define dout_prefix *_dout << messenger->get_myname() << ".objecter "

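// Perf counter indices; l_osdc_first and l_osdc_last bound the range that
// Objecter::init() registers with PerfCountersBuilder below.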
enum {
  l_osdc_first = 123200,
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,
  l_osdc_oplen_avg,

  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};

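// Map a negative errno-style return onto a boost::system::error_code in the
// OSD error category; non-negative values yield the default "success" code.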
namespace {
inline bs::error_code osdcode(int r) {
  return (r < 0) ? bs::error_code(-r, osd_category()) : bs::error_code();
}
}

// config obs ----------------------------

class Objecter::RequestStateHook : public AdminSocketHook {
  Objecter *m_objecter;
public:
  explicit RequestStateHook(Objecter *objecter);
  int call(std::string_view command, const cmdmap_t& cmdmap,
           const bufferlist&,
           Formatter *f,
           std::ostream& ss,
           cb::list& out) override;
};

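// Completion locks are sharded by a hash of the object name, so completions
// for distinct objects can proceed in parallel while two operations on the
// same object still serialize against each other.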
std::unique_lock<std::mutex> Objecter::OSDSession::get_lock(object_t& oid)
{
  if (oid.name.empty())
    return {};

  static constexpr uint32_t HASH_PRIME = 1021;
  uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
    % HASH_PRIME;

  return {completion_locks[h % num_locks], std::defer_lock};
}

const char** Objecter::get_tracked_conf_keys() const
{
  static const char *config_keys[] = {
    "crush_location",
    "rados_mon_op_timeout",
    "rados_osd_op_timeout",
    NULL
  };
  return config_keys;
}


void Objecter::handle_conf_change(const ConfigProxy& conf,
                                  const std::set <std::string> &changed)
{
  if (changed.count("crush_location")) {
    update_crush_location();
  }
  if (changed.count("rados_mon_op_timeout")) {
    mon_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
  }
  if (changed.count("rados_osd_op_timeout")) {
    osd_timeout = conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
  }
}

void Objecter::update_crush_location()
{
  unique_lock wl(rwlock);
  crush_location = cct->crush_location.get_location();
}

// messages ------------------------------

/*
 * initialize only internal data structures, don't initiate cluster interaction
 */
void Objecter::init()
{
  ceph_assert(!initialized);

  if (!logger) {
    PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);

    pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
                PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
    pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
    pcb.add_u64_avg(l_osdc_oplen_avg, "oplen_avg", "Average length of operation vector");

    pcb.add_u64_counter(l_osdc_op, "op", "Operations");
    pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
                        PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
                        PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
                        "rdwr", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");

    pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
    pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
                        "Create object operations");
    pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
    pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
    pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
                        "Write full object operations");
    pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
                        "Write same operations");
    pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
                        "Append operation");
    pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
                        "Set object to zero operations");
    pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
                        "Truncate object operations");
    pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
                        "Delete object operations");
    pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
                        "Map extent operations");
    pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
                        "Sparse read operations");
    pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
                        "Clone range operations");
    pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
                        "Get xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
                        "Set xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
                        "Xattr comparison operations");
    pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
                        "Remove xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
                        "Reset xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
                        "Call (execute) operations");
    pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
                        "Watch by object operations");
    pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
                        "Notify about object operations");
    pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
                        "Extended attribute comparison in multi operations");
    pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
    pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
    pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");

    pcb.add_u64(l_osdc_linger_active, "linger_active",
                "Active lingering operations");
    pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
                        "Sent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
                        "Resent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
                        "Sent pings to lingering operations");

    pcb.add_u64(l_osdc_poolop_active, "poolop_active",
                "Active pool operations");
    pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
                        "Sent pool operations");
    pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
                        "Resent pool operations");

    pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
                "Active get pool stat operations");
    pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
                        "Pool stat operations sent");
    pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
                        "Resent pool stats");

    pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
    pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
    pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
                        "Resent FS stats");

    pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
    pcb.add_u64_counter(l_osdc_command_send, "command_send",
                        "Sent commands");
    pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
                        "Resent commands");

    pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
    pcb.add_u64_counter(l_osdc_map_full, "map_full",
                        "Full OSD maps received");
    pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
                        "Incremental OSD maps received");

    pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
                "Open sessions");
    pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
                        "Sessions opened");
    pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
                        "Sessions closed");
    pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");

    pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
                        "OSD OMAP write operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
                        "OSD OMAP read operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
                        "OSD OMAP delete operations");

    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  m_request_state_hook = new RequestStateHook(this);
  auto admin_socket = cct->get_admin_socket();
  int ret = admin_socket->register_command("objecter_requests",
                                           m_request_state_hook,
                                           "show in-progress osd requests");

  /* Don't warn on EEXIST, happens if multiple ceph clients
   * are instantiated from one process */
  if (ret < 0 && ret != -EEXIST) {
    lderr(cct) << "error registering admin socket command: "
               << cpp_strerror(ret) << dendl;
  }

  update_crush_location();

  cct->_conf.add_observer(this);

  initialized = true;
}

/*
 * ok, cluster interaction can happen
 */
void Objecter::start(const OSDMap* o)
{
  shared_lock rl(rwlock);

  start_tick();
  if (o) {
    osdmap->deepish_copy_from(*o);
    prune_pg_mapping(osdmap->get_pools());
  } else if (osdmap->get_epoch() == 0) {
    _maybe_request_map();
  }
}

void Objecter::shutdown()
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  initialized = false;

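  // Drop rwlock across remove_observer(): it can block on an in-flight
  // config-observer callback, and those callbacks (see handle_conf_change
  // -> update_crush_location above) themselves take rwlock.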
  wl.unlock();
  cct->_conf.remove_observer(this);
  wl.lock();

  while (!osd_sessions.empty()) {
    auto p = osd_sessions.begin();
    close_session(p->second);
  }

  while(!check_latest_map_lingers.empty()) {
    auto i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while(!check_latest_map_ops.empty()) {
    auto i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while(!check_latest_map_commands.empty()) {
    auto i = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  while(!poolstat_ops.empty()) {
    auto i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while(!statfs_ops.empty()) {
    auto i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while(!pool_ops.empty()) {
    auto i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
  while(!homeless_session->linger_ops.empty()) {
    auto i = homeless_session->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    LingerOp *lop = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_linger_op_remove(homeless_session, lop);
    }
    linger_ops.erase(lop->linger_id);
    linger_ops_set.erase(lop);
    lop->put();
  }

  while(!homeless_session->ops.empty()) {
    auto i = homeless_session->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    auto op = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_op_remove(homeless_session, op);
    }
    op->put();
  }

  while(!homeless_session->command_ops.empty()) {
    auto i = homeless_session->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    auto cop = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_command_op_remove(homeless_session, cop);
    }
    cop->put();
  }

  if (tick_event) {
    if (timer.cancel_event(tick_event)) {
      ldout(cct, 10) << " successfully canceled tick" << dendl;
    }
    tick_event = 0;
  }

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }

  // Let go of Objecter write lock so timer thread can shutdown
  wl.unlock();

  // Outside of lock to avoid cycle WRT calls to RequestStateHook
  // This is safe because we guarantee no concurrent calls to
  // shutdown() with the ::initialized check at start.
  if (m_request_state_hook) {
    auto admin_socket = cct->get_admin_socket();
    admin_socket->unregister_commands(m_request_state_hook);
    delete m_request_state_hook;
    m_request_state_hook = NULL;
  }
}

void Objecter::_send_linger(LingerOp *info,
                            ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  fu2::unique_function<Op::OpSig> oncommit;
  osdc_opvec opv;
  std::shared_lock watchl(info->watch_lock);
  cb::list *poutbl = nullptr;
  if (info->registered && info->is_watch) {
    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
                   << dendl;
    opv.push_back(OSDOp());
    opv.back().op.op = CEPH_OSD_OP_WATCH;
    opv.back().op.watch.cookie = info->get_cookie();
    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
    opv.back().op.watch.gen = ++info->register_gen;
    oncommit = CB_Linger_Reconnect(this, info);
  } else {
    ldout(cct, 15) << "send_linger " << info->linger_id << " register"
                   << dendl;
    opv = info->ops;
    // TODO Augment ca::Completion with an equivalent of
    // target so we can handle these cases better.
    auto c = std::make_unique<CB_Linger_Commit>(this, info);
    if (!info->is_watch) {
      info->notify_id = 0;
      poutbl = &c->outbl;
    }
    oncommit = [c = std::move(c)](bs::error_code ec) mutable {
      std::move(*c)(ec);
    };
  }
  watchl.unlock();
  auto o = new Op(info->target.base_oid, info->target.base_oloc,
                  std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
                  std::move(oncommit), info->pobjver);
  o->outbl = poutbl;
  o->snapid = info->snap;
  o->snapc = info->snapc;
  o->mtime = info->mtime;

  o->target = info->target;
  o->tid = ++last_tid;

  // do not resend this; we will send a new op to reregister
  o->should_resend = false;
  o->ctx_budgeted = true;

  if (info->register_tid) {
    // repeat send.  cancel old registration op, if any.
    std::unique_lock sl(info->session->lock);
    if (info->session->ops.count(info->register_tid)) {
      auto o = info->session->ops[info->register_tid];
      _op_cancel_map_check(o);
      _cancel_linger_op(o);
    }
    sl.unlock();
  }

  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);

  logger->inc(l_osdc_linger_send);
}

void Objecter::_linger_commit(LingerOp *info, bs::error_code ec,
                              cb::list& outbl)
{
  std::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
  if (info->on_reg_commit) {
    info->on_reg_commit->defer(std::move(info->on_reg_commit),
                               ec, cb::list{});
    info->on_reg_commit.reset();
  }
  if (ec && info->on_notify_finish) {
    info->on_notify_finish->defer(std::move(info->on_notify_finish),
                                  ec, cb::list{});
    info->on_notify_finish.reset();
  }

  // only tell the user the first time we do this
  info->registered = true;
  info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    auto p = outbl.cbegin();
    try {
      decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
                     << dendl;
    }
    catch (cb::error& e) {
    }
  }
}

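// Deliver a watch error to the user's callback on the finish strand.
// _queued_async()/finished_async() bracket the delivery so linger_check()
// keeps reporting a conservative validity age while it is still pending.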
class CB_DoWatchError {
  Objecter *objecter;
  boost::intrusive_ptr<Objecter::LingerOp> info;
  bs::error_code ec;
public:
  CB_DoWatchError(Objecter *o, Objecter::LingerOp *i,
                  bs::error_code ec)
    : objecter(o), info(i), ec(ec) {
    info->_queued_async();
  }
  void operator()() {
    std::unique_lock wl(objecter->rwlock);
    bool canceled = info->canceled;
    wl.unlock();

    if (!canceled) {
      info->handle(ec, 0, info->get_cookie(), 0, {});
    }

    info->finished_async();
  }
};

bs::error_code Objecter::_normalize_watch_error(bs::error_code ec)
{
  // translate ENOENT -> ENOTCONN so that a delete->disconnection
  // notification and a failure to reconnect because we raced with
  // the delete appear the same to the user.
  if (ec == bs::errc::no_such_file_or_directory)
    ec = bs::error_code(ENOTCONN, osd_category());
  return ec;
}

void Objecter::_linger_reconnect(LingerOp *info, bs::error_code ec)
{
  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << ec
                 << " (last_error " << info->last_error << ")" << dendl;
  std::unique_lock wl(info->watch_lock);
  if (ec) {
    if (!info->last_error) {
      ec = _normalize_watch_error(ec);
      if (info->handle) {
        boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
      }
    }
  }

  info->last_error = ec;
}

void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  if (cct->_conf->objecter_inject_no_watch_ping) {
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
                   << dendl;
    return;
  }
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
                 << dendl;

  osdc_opvec opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen;

  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
                 std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
                 CB_Linger_Ping(this, info, now),
                 nullptr, nullptr);
  o->target = info->target;
  o->should_resend = false;
  _send_op_account(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o);
  info->ping_tid = o->tid;

  logger->inc(l_osdc_linger_ping);
}

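// Ping completion: only credit watch_valid_thru when the registration
// generation still matches; a reconnect bumps the generation, which
// invalidates any pings that were in flight across it.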
void Objecter::_linger_ping(LingerOp *info, bs::error_code ec, ceph::coarse_mono_time sent,
                            uint32_t register_gen)
{
  std::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id
                 << " sent " << sent << " gen " << register_gen << " = " << ec
                 << " (last_error " << info->last_error
                 << " register_gen " << info->register_gen << ")" << dendl;
  if (info->register_gen == register_gen) {
    if (!ec) {
      info->watch_valid_thru = sent;
    } else if (ec && !info->last_error) {
      ec = _normalize_watch_error(ec);
      info->last_error = ec;
      if (info->handle) {
        boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
      }
    }
  } else {
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}

tl::expected<ceph::timespan,
             bs::error_code> Objecter::linger_check(LingerOp *info)
{
  std::shared_lock l(info->watch_lock);

  ceph::coarse_mono_time stamp = info->watch_valid_thru;
  if (!info->watch_pending_async.empty())
    stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
  auto age = ceph::coarse_mono_clock::now() - stamp;

  ldout(cct, 10) << __func__ << " " << info->linger_id
                 << " err " << info->last_error
                 << " age " << age << dendl;
  if (info->last_error)
    return tl::unexpected(info->last_error);
  // return a safe upper bound (we are truncating to ms)
  return age;
}

void Objecter::linger_cancel(LingerOp *info)
{
  unique_lock wl(rwlock);
  _linger_cancel(info);
  info->put();
}

void Objecter::_linger_cancel(LingerOp *info)
{
  // rwlock is locked unique
  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
  if (!info->canceled) {
    OSDSession *s = info->session;
    std::unique_lock sl(s->lock);
    _session_linger_op_remove(s, info);
    sl.unlock();

    linger_ops.erase(info->linger_id);
    linger_ops_set.erase(info);
    ceph_assert(linger_ops.size() == linger_ops_set.size());

    info->canceled = true;
    info->put();

    logger->dec(l_osdc_linger_active);
  }
}


Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
                                              const object_locator_t& oloc,
                                              int flags)
{
  unique_lock l(rwlock);
  // Acquire linger ID
  auto info = new LingerOp(this, ++max_linger_id);
  info->target.base_oid = oid;
  info->target.base_oloc = oloc;
  if (info->target.base_oloc.key == oid)
    info->target.base_oloc.key.clear();
  info->target.flags = flags;
  info->watch_valid_thru = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " info " << info
                 << " linger_id " << info->linger_id
                 << " cookie " << info->get_cookie()
                 << dendl;
  linger_ops[info->linger_id] = info;
  linger_ops_set.insert(info);
  ceph_assert(linger_ops.size() == linger_ops_set.size());

  info->get(); // for the caller
  return info;
}

ceph_tid_t Objecter::linger_watch(LingerOp *info,
                                  ObjectOperation& op,
                                  const SnapContext& snapc,
                                  real_time mtime,
                                  cb::list& inbl,
                                  decltype(info->on_reg_commit)&& oncommit,
                                  version_t *objver)
{
  info->is_watch = true;
  info->snapc = snapc;
  info->mtime = mtime;
  info->target.flags |= CEPH_OSD_FLAG_WRITE;
  info->ops = op.ops;
  info->inbl = inbl;
  info->pobjver = objver;
  info->on_reg_commit = std::move(oncommit);

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  op.clear();
  return info->linger_id;
}

ceph_tid_t Objecter::linger_notify(LingerOp *info,
                                   ObjectOperation& op,
                                   snapid_t snap, cb::list& inbl,
                                   decltype(LingerOp::on_reg_commit)&& onfinish,
                                   version_t *objver)
{
  info->snap = snap;
  info->target.flags |= CEPH_OSD_FLAG_READ;
  info->ops = op.ops;
  info->inbl = inbl;
  info->pobjver = objver;
  info->on_reg_commit = std::move(onfinish);
  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  op.clear();
  return info->linger_id;
}

void Objecter::_linger_submit(LingerOp *info,
                              ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
  ceph_assert(info->linger_id);
  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!

  // Populate Op::target
  OSDSession *s = NULL;
  int r = _calc_target(&info->target, nullptr);
  switch (r) {
  case RECALC_OP_TARGET_POOL_EIO:
    _check_linger_pool_eio(info);
    return;
  }

  // Create LingerOp<->OSDSession relation
  r = _get_session(info->target.osd, &s, sul);
  ceph_assert(r == 0);
  unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  put_session(s);

  _send_linger(info, sul);
}

struct CB_DoWatchNotify {
  Objecter *objecter;
  boost::intrusive_ptr<Objecter::LingerOp> info;
  boost::intrusive_ptr<MWatchNotify> msg;
  CB_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->_queued_async();
  }
  void operator()() {
    objecter->_do_watch_notify(std::move(info), std::move(msg));
  }
};

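// The watch cookie the OSD echoes back is the LingerOp pointer itself;
// membership in linger_ops_set is checked before dereferencing it, since
// the op may already have been cancelled.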
void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  std::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    if (!info->last_error) {
      info->last_error = bs::error_code(ENOTCONN, osd_category());
      if (info->handle) {
        boost::asio::defer(finish_strand, CB_DoWatchError(this, info,
                                                          info->last_error));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
        info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
                     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->on_notify_finish->defer(
        std::move(info->on_notify_finish),
        osdcode(m->return_code), std::move(m->get_data()));

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
      info->on_notify_finish = nullptr;
    }
  } else {
    boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
  }
}

void Objecter::_do_watch_notify(boost::intrusive_ptr<LingerOp> info,
                                boost::intrusive_ptr<MWatchNotify> m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;

  shared_lock l(rwlock);
  ceph_assert(initialized);

  if (info->canceled) {
    l.unlock();
    goto out;
  }

  // notify completion?
  ceph_assert(info->is_watch);
  ceph_assert(info->handle);
  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);

  l.unlock();

  switch (m->opcode) {
  case CEPH_WATCH_EVENT_NOTIFY:
    info->handle({}, m->notify_id, m->cookie, m->notifier_gid, std::move(m->bl));
    break;
  }

 out:
  info->finished_async();
}

bool Objecter::ms_dispatch(Message *m)
{
  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
  switch (m->get_type()) {
    // these we exclusively handle
  case CEPH_MSG_OSD_OPREPLY:
    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
    return true;

  case CEPH_MSG_OSD_BACKOFF:
    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
    return true;

  case CEPH_MSG_WATCH_NOTIFY:
    handle_watch_notify(static_cast<MWatchNotify*>(m));
    m->put();
    return true;

  case MSG_COMMAND_REPLY:
    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    } else {
      return false;
    }

  case MSG_GETPOOLSTATSREPLY:
    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
    return true;

  case CEPH_MSG_POOLOP_REPLY:
    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
    return true;

  case CEPH_MSG_STATFS_REPLY:
    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
    return true;

    // these we give others a chance to inspect

    // MDS, OSD
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    return false;
  }
  return false;
}

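// After an osdmap change, walk every op, linger op, and command on a
// session and sort each into "needs resend", "pool is gone", or "no
// action" buckets for handle_osd_map() to act on.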
void Objecter::_scan_requests(
  OSDSession *s,
  bool skipped_map,
  bool cluster_full,
  map<int64_t, bool> *pool_full_map,
  map<ceph_tid_t, Op*>& need_resend,
  list<LingerOp*>& need_resend_linger,
  map<ceph_tid_t, CommandOp*>& need_resend_command,
  ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  list<LingerOp*> unregister_lingers;

  std::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  auto lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    auto op = lp->second;
    ceph_assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !force_resend_writes)
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
        ldout(cct, 10) << " need to unregister linger op "
                       << op->linger_id << dendl;
        op->get();
        unregister_lingers.push_back(op);
      }
      break;
    case RECALC_OP_TARGET_POOL_EIO:
      _check_linger_pool_eio(op);
      ldout(cct, 10) << " need to unregister linger op "
                     << op->linger_id << dendl;
      op->get();
      unregister_lingers.push_back(op);
      break;
    }
  }

  // check for changed request mappings
  auto p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p;   // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    _prune_snapc(osdmap->get_new_removed_snaps(), op);
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
                         op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !(force_resend_writes && op->target.respects_full()))
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      _session_op_remove(op->session, op);
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    case RECALC_OP_TARGET_POOL_EIO:
      _check_op_pool_eio(op, &sl);
      break;
    }
  }

  // commands
  auto cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    auto c = cp->second;
    ++cp;
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!skipped_map && !force_resend_writes)
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      _session_command_op_remove(c->session, c);
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  sl.unlock();

  for (auto iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put();
  }
}

void Objecter::handle_osd_map(MOSDMap *m)
{
  ceph::shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  ceph_assert(osdmap);

  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
                  << " != " << monc->get_fsid() << dendl;
    return;
  }

  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (auto it = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);


  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
                  << m->get_first() << "," << m->get_last()
                  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
                  << m->get_first() << "," << m->get_last()
                  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
           e <= m->get_last();
           e++) {

        if (osdmap->get_epoch() == e-1 &&
            m->incremental_maps.count(e)) {
          ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
                        << dendl;
          OSDMap::Incremental inc(m->incremental_maps[e]);
          osdmap->apply_incremental(inc);

          emit_blocklist_events(inc);

          logger->inc(l_osdc_map_inc);
        }
        else if (m->maps.count(e)) {
          ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
          auto new_osdmap = std::make_unique<OSDMap>();
          new_osdmap->decode(m->maps[e]);

          emit_blocklist_events(*osdmap, *new_osdmap);
          osdmap = std::move(new_osdmap);

          logger->inc(l_osdc_map_full);
        }
        else {
          if (e >= m->get_oldest()) {
            ldout(cct, 3) << "handle_osd_map requesting missing epoch "
                          << osdmap->get_epoch()+1 << dendl;
            _maybe_request_map();
            break;
          }
          ldout(cct, 3) << "handle_osd_map missing epoch "
                        << osdmap->get_epoch()+1
                        << ", jumping to " << m->get_oldest() << dendl;
          e = m->get_oldest() - 1;
          skipped_map = true;
          continue;
        }
        logger->set(l_osdc_map_epoch, osdmap->get_epoch());

        prune_pg_mapping(osdmap->get_pools());
        cluster_full = cluster_full || _osdmap_full_flag();
        update_pool_full_map(pool_full_map);

        // check all outstanding requests on every epoch
        for (auto& i : need_resend) {
          _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
        }
        _scan_requests(homeless_session, skipped_map, cluster_full,
                       &pool_full_map, need_resend,
                       need_resend_linger, need_resend_command, sul);
        for (auto p = osd_sessions.begin();
             p != osd_sessions.end(); ) {
          auto s = p->second;
          _scan_requests(s, skipped_map, cluster_full,
                         &pool_full_map, need_resend,
                         need_resend_linger, need_resend_command, sul);
          ++p;
          // osd down or addr change?
          if (!osdmap->is_up(s->osd) ||
              (s->con &&
               s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
            close_session(s);
          }
        }

        ceph_assert(e == osdmap->get_epoch());
      }

    } else {
      // first map.  we want the full thing.
      if (m->maps.count(m->get_last())) {
        for (auto p = osd_sessions.begin();
             p != osd_sessions.end(); ++p) {
          OSDSession *s = p->second;
          _scan_requests(s, false, false, NULL, need_resend,
                         need_resend_linger, need_resend_command, sul);
        }
        ldout(cct, 3) << "handle_osd_map decoding full epoch "
                      << m->get_last() << dendl;
        osdmap->decode(m->maps[m->get_last()]);
        prune_pg_mapping(osdmap->get_pools());

        _scan_requests(homeless_session, false, false, NULL,
                       need_resend, need_resend_linger,
                       need_resend_command, sul);
      } else {
        ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
                      << dendl;
        monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
        monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
        p = need_resend.erase(p);
        _check_op_pool_dne(op, nullptr);
      } else {
        ++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused?
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    _maybe_request_map();
  }

  // resend requests
  for (auto p = need_resend.begin();
       p != need_resend.end(); ++p) {
    auto op = p->second;
    auto s = op->session;
    bool mapped_session = false;
    if (!s) {
      int r = _map_session(&op->target, &s, sul);
      ceph_assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    std::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      if (!op->session->is_homeless() && !op->target.paused) {
        logger->inc(l_osdc_op_resend);
        _send_op(op);
      }
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s);
  }
  for (auto p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    ceph_assert(op->session);
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (auto p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    auto c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
        _send_command(c);
      }
    }
  }

  _dump_active();

  // finish any Contexts that were waiting on a map update
  auto p = waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
         p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (auto& [c, ec] : p->second) {
      ca::post(std::move(c), ec);
    }
    waiting_for_map.erase(p++);
  }

  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}

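// Blocklist events: once enabled, newly blocklisted addresses observed in
// incoming osdmaps are accumulated and can be drained with
// consume_blocklist_events().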
void Objecter::enable_blocklist_events()
{
  unique_lock wl(rwlock);

  blocklist_events_enabled = true;
}

void Objecter::consume_blocklist_events(std::set<entity_addr_t> *events)
{
  unique_lock wl(rwlock);

  if (events->empty()) {
    events->swap(blocklist_events);
  } else {
    for (const auto &i : blocklist_events) {
      events->insert(i);
    }
    blocklist_events.clear();
  }
}

void Objecter::emit_blocklist_events(const OSDMap::Incremental &inc)
{
  if (!blocklist_events_enabled) {
    return;
  }

  for (const auto &i : inc.new_blocklist) {
    blocklist_events.insert(i.first);
  }
}

void Objecter::emit_blocklist_events(const OSDMap &old_osd_map,
                                     const OSDMap &new_osd_map)
{
  if (!blocklist_events_enabled) {
    return;
  }

  std::set<entity_addr_t> old_set;
  std::set<entity_addr_t> new_set;
  std::set<entity_addr_t> old_range_set;
  std::set<entity_addr_t> new_range_set;

  old_osd_map.get_blocklist(&old_set, &old_range_set);
  new_osd_map.get_blocklist(&new_set, &new_range_set);

  std::set<entity_addr_t> delta_set;
  std::set_difference(
    new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  std::set_difference(
    new_range_set.begin(), new_range_set.end(),
    old_range_set.begin(), old_range_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  blocklist_events.insert(delta_set.begin(), delta_set.end());
}

// op pool check

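// When an op maps to a pool the current osdmap does not know, ask the
// monitor for the newest osdmap version; map_dne_bound records the epoch
// by which the pool must exist if it ever did.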
void Objecter::CB_Op_Map_Latest::operator()(bs::error_code e,
                                            version_t latest, version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled)
    return;

  lgeneric_subdout(objecter->cct, objecter, 10)
    << "op_map_latest r=" << e << " tid=" << tid
    << " latest " << latest << dendl;

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_ops.find(tid);
  if (iter == objecter->check_latest_map_ops.end()) {
    lgeneric_subdout(objecter->cct, objecter, 10)
      << "op_map_latest op "<< tid << " not found" << dendl;
    return;
  }

  Op *op = iter->second;
  objecter->check_latest_map_ops.erase(iter);

  lgeneric_subdout(objecter->cct, objecter, 20)
    << "op_map_latest op "<< op << dendl;

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  unique_lock sl(op->session->lock, defer_lock);
  objecter->_check_op_pool_dne(op, &sl);

  op->put();
}

int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
                                snapid_t *snap) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  for (auto p = pg_pool.snaps.begin();
       p != pg_pool.snaps.end();
       ++p) {
    if (p->second.name == snap_name) {
      *snap = p->first;
      return 0;
    }
  }
  return -ENOENT;
}

int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
                                 pool_snap_info_t *info) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  auto p = pg_pool.snaps.find(snap);
  if (p == pg_pool.snaps.end())
    return -ENOENT;
  *info = p->second;

  return 0;
}

int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
{
  shared_lock rl(rwlock);

  const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
  if (!pi)
    return -ENOENT;
  for (auto p = pi->snaps.begin();
       p != pi->snaps.end();
       ++p) {
    snaps->push_back(p->first);
  }
  return 0;
}

// sl may be unlocked.
void Objecter::_check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl)
{
  // rwlock is locked unique

  if (op->target.pool_ever_existed) {
    // the pool previously existed and now it does not, which means it
    // was deleted.
    op->map_dne_bound = osdmap->get_epoch();
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                   << " pool previously exists but now does not"
                   << dendl;
  } else {
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                   << " current " << osdmap->get_epoch()
                   << " map_dne_bound " << op->map_dne_bound
                   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // we had a new enough map
      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                     << " concluding pool " << op->target.base_pgid.pool()
                     << " dne" << dendl;
      if (op->has_completion()) {
        num_in_flight--;
        op->complete(osdc_errc::pool_dne, -ENOENT);
      }

      OSDSession *s = op->session;
      if (s) {
        ceph_assert(s != NULL);
        ceph_assert(sl->mutex() == &s->lock);
        bool session_locked = sl->owns_lock();
        if (!session_locked) {
          sl->lock();
        }
        _finish_op(op, 0);
        if (!session_locked) {
          sl->unlock();
        }
      } else {
        _finish_op(op, 0); // no session
      }
    }
  } else {
    _send_op_map_check(op);
  }
}

// sl may be unlocked.
void Objecter::_check_op_pool_eio(Op *op, std::unique_lock<std::shared_mutex> *sl)
{
  // rwlock is locked unique

  // we had a new enough map
  ldout(cct, 10) << "check_op_pool_eio tid " << op->tid
                 << " concluding pool " << op->target.base_pgid.pool()
                 << " has eio" << dendl;
  if (op->has_completion()) {
    num_in_flight--;
    op->complete(osdc_errc::pool_eio, -EIO);
  }

  OSDSession *s = op->session;
  if (s) {
    ceph_assert(s != NULL);
    ceph_assert(sl->mutex() == &s->lock);
    bool session_locked = sl->owns_lock();
    if (!session_locked) {
      sl->lock();
    }
    _finish_op(op, 0);
    if (!session_locked) {
      sl->unlock();
    }
  } else {
    _finish_op(op, 0); // no session
  }
}

void Objecter::_send_op_map_check(Op *op)
{
  // rwlock is locked unique
  // ask the monitor
  if (check_latest_map_ops.count(op->tid) == 0) {
    op->get();
    check_latest_map_ops[op->tid] = op;
    monc->get_version("osdmap", CB_Op_Map_Latest(this, op->tid));
  }
}

void Objecter::_op_cancel_map_check(Op *op)
{
  // rwlock is locked unique
  auto iter = check_latest_map_ops.find(op->tid);
  if (iter != check_latest_map_ops.end()) {
    Op *op = iter->second;
    op->put();
    check_latest_map_ops.erase(iter);
  }
}

// linger pool check

void Objecter::CB_Linger_Map_Latest::operator()(bs::error_code e,
                                                version_t latest,
                                                version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_lingers.find(linger_id);
  if (iter == objecter->check_latest_map_lingers.end()) {
    return;
  }

  auto op = iter->second;
  objecter->check_latest_map_lingers.erase(iter);

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  bool unregister;
  objecter->_check_linger_pool_dne(op, &unregister);

  if (unregister) {
    objecter->_linger_cancel(op);
  }

  op->put();
}

void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
  // rwlock is locked unique

  *need_unregister = false;

  if (op->register_gen > 0) {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                   << " pool previously existed but now does not"
                   << dendl;
    op->map_dne_bound = osdmap->get_epoch();
  } else {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                   << " current " << osdmap->get_epoch()
                   << " map_dne_bound " << op->map_dne_bound
                   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      std::unique_lock wl{op->watch_lock};
      if (op->on_reg_commit) {
        op->on_reg_commit->defer(std::move(op->on_reg_commit),
                                 osdc_errc::pool_dne, cb::list{});
        op->on_reg_commit = nullptr;
      }
      if (op->on_notify_finish) {
        op->on_notify_finish->defer(std::move(op->on_notify_finish),
                                    osdc_errc::pool_dne, cb::list{});
        op->on_notify_finish = nullptr;
      }
      *need_unregister = true;
    }
  } else {
    _send_linger_map_check(op);
  }
}

1714
20effc67
TL
1715void Objecter::_check_linger_pool_eio(LingerOp *op)
1716{
1717 // rwlock is locked unique
1718
1719 std::unique_lock wl{op->watch_lock};
1720 if (op->on_reg_commit) {
1721 op->on_reg_commit->defer(std::move(op->on_reg_commit),
1722 osdc_errc::pool_dne, cb::list{});
1723 op->on_reg_commit = nullptr;
1724 }
1725 if (op->on_notify_finish) {
1726 op->on_notify_finish->defer(std::move(op->on_notify_finish),
1727 osdc_errc::pool_dne, cb::list{});
1728 op->on_notify_finish = nullptr;
1729 }
1730}
1731
7c673cae
FG
1732void Objecter::_send_linger_map_check(LingerOp *op)
1733{
1734 // ask the monitor
1735 if (check_latest_map_lingers.count(op->linger_id) == 0) {
1736 op->get();
1737 check_latest_map_lingers[op->linger_id] = op;
f67539c2 1738 monc->get_version("osdmap", CB_Linger_Map_Latest(this, op->linger_id));
7c673cae
FG
1739 }
1740}
1741
void Objecter::_linger_cancel_map_check(LingerOp *op)
{
  // rwlock is locked unique

  auto iter = check_latest_map_lingers.find(op->linger_id);
  if (iter != check_latest_map_lingers.end()) {
    LingerOp *op = iter->second;
    op->put();
    check_latest_map_lingers.erase(iter);
  }
}

// command pool check

TL
1756void Objecter::CB_Command_Map_Latest::operator()(bs::error_code e,
1757 version_t latest, version_t)
7c673cae 1758{
f67539c2
TL
1759 if (e == bs::errc::resource_unavailable_try_again ||
1760 e == bs::errc::operation_canceled) {
7c673cae
FG
1761 // ignore callback; we will retry in resend_mon_ops()
1762 return;
1763 }
1764
1765 unique_lock wl(objecter->rwlock);
1766
f67539c2 1767 auto iter = objecter->check_latest_map_commands.find(tid);
7c673cae
FG
1768 if (iter == objecter->check_latest_map_commands.end()) {
1769 return;
1770 }
1771
f67539c2 1772 auto c = iter->second;
7c673cae
FG
1773 objecter->check_latest_map_commands.erase(iter);
1774
1775 if (c->map_dne_bound == 0)
1776 c->map_dne_bound = latest;
1777
f67539c2 1778 unique_lock sul(c->session->lock);
7c673cae 1779 objecter->_check_command_map_dne(c);
28e407b8 1780 sul.unlock();
7c673cae
FG
1781
1782 c->put();
1783}
1784
1785void Objecter::_check_command_map_dne(CommandOp *c)
1786{
1787 // rwlock is locked unique
28e407b8 1788 // session is locked unique
7c673cae
FG
1789
1790 ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
1791 << " current " << osdmap->get_epoch()
1792 << " map_dne_bound " << c->map_dne_bound
1793 << dendl;
1794 if (c->map_dne_bound > 0) {
1795 if (osdmap->get_epoch() >= c->map_dne_bound) {
f67539c2
TL
1796 _finish_command(c, osdcode(c->map_check_error),
1797 std::move(c->map_check_error_str), {});
7c673cae
FG
1798 }
1799 } else {
1800 _send_command_map_check(c);
1801 }
1802}
1803
1804void Objecter::_send_command_map_check(CommandOp *c)
1805{
1806 // rwlock is locked unique
28e407b8 1807 // session is locked unique
7c673cae
FG
1808
1809 // ask the monitor
1810 if (check_latest_map_commands.count(c->tid) == 0) {
1811 c->get();
1812 check_latest_map_commands[c->tid] = c;
f67539c2 1813 monc->get_version("osdmap", CB_Command_Map_Latest(this, c->tid));
7c673cae
FG
1814 }
1815}
1816
1817void Objecter::_command_cancel_map_check(CommandOp *c)
1818{
1819 // rwlock is locked uniqe
1820
f67539c2 1821 auto iter = check_latest_map_commands.find(c->tid);
7c673cae 1822 if (iter != check_latest_map_commands.end()) {
f67539c2 1823 auto c = iter->second;
7c673cae
FG
1824 c->put();
1825 check_latest_map_commands.erase(iter);
1826 }
1827}
1828
1829
1830/**
1831 * Look up OSDSession by OSD id.
1832 *
1833 * @returns 0 on success, or -EAGAIN if the lock context requires
1834 * promotion to write.
1835 */
f67539c2
TL
1836int Objecter::_get_session(int osd, OSDSession **session,
1837 shunique_lock<ceph::shared_mutex>& sul)
7c673cae 1838{
11fdf7f2 1839 ceph_assert(sul && sul.mutex() == &rwlock);
7c673cae
FG
1840
1841 if (osd < 0) {
1842 *session = homeless_session;
1843 ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
1844 << dendl;
1845 return 0;
1846 }
1847
f67539c2 1848 auto p = osd_sessions.find(osd);
7c673cae 1849 if (p != osd_sessions.end()) {
f67539c2 1850 auto s = p->second;
7c673cae
FG
1851 s->get();
1852 *session = s;
1853 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1854 << s->get_nref() << dendl;
1855 return 0;
1856 }
1857 if (!sul.owns_lock()) {
1858 return -EAGAIN;
1859 }
f67539c2 1860 auto s = new OSDSession(cct, osd);
7c673cae 1861 osd_sessions[osd] = s;
11fdf7f2
TL
1862 s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
1863 s->con->set_priv(RefCountedPtr{s});
7c673cae
FG
1864 logger->inc(l_osdc_osd_session_open);
1865 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1866 s->get();
1867 *session = s;
1868 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1869 << s->get_nref() << dendl;
1870 return 0;
1871}
1872
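// OSDSession lifetime is reference counted; the homeless session is a
// singleton owned by the Objecter, so get/put are no-ops for it.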
void Objecter::put_session(Objecter::OSDSession *s)
{
  if (s && !s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
		   << s->get_nref() << dendl;
    s->put();
  }
}

void Objecter::get_session(Objecter::OSDSession *s)
{
  ceph_assert(s != NULL);

  if (!s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
		   << s->get_nref() << dendl;
    s->get();
  }
}

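// Reconnect to an OSD whose address has changed.  Bumping
// s->incarnation lets us tell which ops were sent on the old
// connection and which on the new one.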
void Objecter::_reopen_session(OSDSession *s)
{
  // rwlock is locked unique
  // s->lock is locked

  auto addrs = osdmap->get_addrs(s->osd);
  ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
		 << addrs << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  s->con = messenger->connect_to_osd(addrs);
  s->con->set_priv(RefCountedPtr{s});
  s->incarnation++;
  logger->inc(l_osdc_osd_session_open);
}

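// Tear down a session and re-home everything still attached to it onto
// the homeless session, where the ops wait until a later osdmap gives
// them a live target again.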
void Objecter::close_session(OSDSession *s)
{
  // rwlock is locked unique

  ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  unique_lock sl(s->lock);

  std::list<LingerOp*> homeless_lingers;
  std::list<CommandOp*> homeless_commands;
  std::list<Op*> homeless_ops;

  while (!s->linger_ops.empty()) {
    auto i = s->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    homeless_lingers.push_back(i->second);
    _session_linger_op_remove(s, i->second);
  }

  while (!s->ops.empty()) {
    auto i = s->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    homeless_ops.push_back(i->second);
    _session_op_remove(s, i->second);
  }

  while (!s->command_ops.empty()) {
    auto i = s->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    homeless_commands.push_back(i->second);
    _session_command_op_remove(s, i->second);
  }

  osd_sessions.erase(s->osd);
  sl.unlock();
  put_session(s);

  // Assign any leftover ops to the homeless session
  {
    unique_lock hsl(homeless_session->lock);
    for (auto i = homeless_lingers.begin();
	 i != homeless_lingers.end(); ++i) {
      _session_linger_op_assign(homeless_session, *i);
    }
    for (auto i = homeless_ops.begin();
	 i != homeless_ops.end(); ++i) {
      _session_op_assign(homeless_session, *i);
    }
    for (auto i = homeless_commands.begin();
	 i != homeless_commands.end(); ++i) {
      _session_command_op_assign(homeless_session, *i);
    }
  }

  logger->set(l_osdc_osd_sessions, osd_sessions.size());
}

void Objecter::wait_for_osd_map(epoch_t e)
{
  unique_lock l(rwlock);
  if (osdmap->get_epoch() >= e) {
    l.unlock();
    return;
  }

  ca::waiter<bs::error_code> w;
  waiting_for_map[e].emplace_back(OpCompletion::create(
				    service.get_executor(),
				    w.ref()),
				  bs::error_code{});
  l.unlock();
  w.wait();
}

void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
				   std::unique_ptr<OpCompletion> fin,
				   std::unique_lock<ceph::shared_mutex>&& l)
{
  ceph_assert(fin);
  if (osdmap->get_epoch() >= newest) {
    ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
    l.unlock();
    ca::defer(std::move(fin), bs::error_code{});
  } else {
    ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
    _wait_for_new_map(std::move(fin), newest, bs::error_code{});
    l.unlock();
  }
}

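// Ask the monitor for a newer osdmap.  When the cluster is full or
// paused we subscribe continuously, since we need every subsequent map
// until the flag clears; otherwise a one-shot subscription for the
// next epoch is enough.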
void Objecter::maybe_request_map()
{
  shared_lock rl(rwlock);
  _maybe_request_map();
}

void Objecter::_maybe_request_map()
{
  // rwlock is locked
  int flag = 0;
  if (_osdmap_full_flag()
      || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
      || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
    ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
      "osd map (FULL flag is set)" << dendl;
  } else {
    ldout(cct, 10)
      << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
    flag = CEPH_SUBSCRIBE_ONETIME;
  }
  epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
  if (monc->sub_want("osdmap", epoch, flag)) {
    monc->renew_subs();
  }
}

void Objecter::_wait_for_new_map(std::unique_ptr<OpCompletion> c, epoch_t epoch,
				 bs::error_code ec)
{
  // rwlock is locked unique
  waiting_for_map[epoch].emplace_back(std::move(c), ec);
  _maybe_request_map();
}


/**
 * Use this together with wait_for_map: this is a pre-check to avoid
 * allocating a Context for wait_for_map if we can see that we
 * definitely already have the epoch.
 *
 * This does *not* replace the need to handle the return value of
 * wait_for_map: just because we don't have it in this pre-check
 * doesn't mean we won't have it when calling back into wait_for_map,
 * since the objecter lock is dropped in between.
 */
bool Objecter::have_map(const epoch_t epoch)
{
  shared_lock rl(rwlock);
  if (osdmap->get_epoch() >= epoch) {
    return true;
  } else {
    return false;
  }
}

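// Requeue everything attached to a session after a reset or map
// change: plain ops are resent immediately in tid order so the OSD
// sees them in submission order, while linger ops are collected into
// lresend and resent later via _linger_ops_resend().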
void Objecter::_kick_requests(OSDSession *session,
			      map<uint64_t, LingerOp *>& lresend)
{
  // rwlock is locked unique

  // clear backoffs
  session->backoffs.clear();
  session->backoffs_by_id.clear();

  // resend ops
  map<ceph_tid_t,Op*> resend;  // resend in tid order
  for (auto p = session->ops.begin(); p != session->ops.end();) {
    Op *op = p->second;
    ++p;
    if (op->should_resend) {
      if (!op->target.paused)
	resend[op->tid] = op;
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
  }

  logger->inc(l_osdc_op_resend, resend.size());
  while (!resend.empty()) {
    _send_op(resend.begin()->second);
    resend.erase(resend.begin());
  }

  // resend lingers
  logger->inc(l_osdc_linger_resend, session->linger_ops.size());
  for (auto j = session->linger_ops.begin();
       j != session->linger_ops.end(); ++j) {
    LingerOp *op = j->second;
    op->get();
    ceph_assert(lresend.count(j->first) == 0);
    lresend[j->first] = op;
  }

  // resend commands
  logger->inc(l_osdc_command_resend, session->command_ops.size());
  map<uint64_t,CommandOp*> cresend;  // resend in order
  for (auto k = session->command_ops.begin();
       k != session->command_ops.end(); ++k) {
    cresend[k->first] = k->second;
  }
  while (!cresend.empty()) {
    _send_command(cresend.begin()->second);
    cresend.erase(cresend.begin());
  }
}

void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
				  unique_lock<ceph::shared_mutex>& ul)
{
  ceph_assert(ul.owns_lock());
  shunique_lock sul(std::move(ul));
  while (!lresend.empty()) {
    LingerOp *op = lresend.begin()->second;
    if (!op->canceled) {
      _send_linger(op, sul);
    }
    op->put();
    lresend.erase(lresend.begin());
  }
  ul = sul.release_to_unique();
}

void Objecter::start_tick()
{
  ceph_assert(tick_event == 0);
  tick_event =
    timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
		    &Objecter::tick, this);
}

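// Periodic housekeeping: count ops older than objecter_timeout as
// laggy, ping registered watches, and send an MPing to any OSD with
// laggy work so a silently reset session is detected (the reply
// policy on OSD connections is lossy).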
void Objecter::tick()
{
  shared_lock rl(rwlock);

  ldout(cct, 10) << "tick" << dendl;

  // we are only called by C_Tick
  tick_event = 0;

  if (!initialized) {
    // we raced with shutdown
    ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
    return;
  }

  set<OSDSession*> toping;


  // look for laggy requests
  auto cutoff = ceph::coarse_mono_clock::now();
  cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout

  unsigned laggy_ops = 0;

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    auto s = siter->second;
    scoped_lock l(s->lock);
    bool found = false;
    for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
      auto op = p->second;
      ceph_assert(op->session);
      if (op->stamp < cutoff) {
	ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
		      << " is laggy" << dendl;
	found = true;
	++laggy_ops;
      }
    }
    for (auto p = s->linger_ops.begin();
	 p != s->linger_ops.end();
	 ++p) {
      auto op = p->second;
      std::unique_lock wl(op->watch_lock);
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
		     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
      if (op->is_watch && op->registered && !op->last_error)
	_send_linger_ping(op);
    }
    for (auto p = s->command_ops.begin();
	 p != s->command_ops.end();
	 ++p) {
      auto op = p->second;
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves command tid " << p->first
		     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
    }
    if (found)
      toping.insert(s);
  }
  if (num_homeless_ops || !toping.empty()) {
    _maybe_request_map();
  }

  logger->set(l_osdc_op_laggy, laggy_ops);
  logger->set(l_osdc_osd_laggy, toping.size());

  if (!toping.empty()) {
    // send a ping to these osds, to ensure we detect any session resets
    // (osd reply message policy is lossy)
    for (auto i = toping.begin(); i != toping.end(); ++i) {
      (*i)->con->send_message(new MPing);
    }
  }

  // Make sure we don't reschedule if we wake up after shutdown
  if (initialized) {
    tick_event = timer.reschedule_me(ceph::make_timespan(
				       cct->_conf->objecter_tick_interval));
  }
}

void Objecter::resend_mon_ops()
{
  unique_lock wl(rwlock);

  ldout(cct, 10) << "resend_mon_ops" << dendl;

  for (auto p = poolstat_ops.begin(); p != poolstat_ops.end(); ++p) {
    _poolstat_submit(p->second);
    logger->inc(l_osdc_poolstat_resend);
  }

  for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
    _fs_stats_submit(p->second);
    logger->inc(l_osdc_statfs_resend);
  }

  for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
    _pool_op_submit(p->second);
    logger->inc(l_osdc_poolop_resend);
  }

  for (auto p = check_latest_map_ops.begin();
       p != check_latest_map_ops.end();
       ++p) {
    monc->get_version("osdmap", CB_Op_Map_Latest(this, p->second->tid));
  }

  for (auto p = check_latest_map_lingers.begin();
       p != check_latest_map_lingers.end();
       ++p) {
    monc->get_version("osdmap", CB_Linger_Map_Latest(this, p->second->linger_id));
  }

  for (auto p = check_latest_map_commands.begin();
       p != check_latest_map_commands.end();
       ++p) {
    monc->get_version("osdmap", CB_Command_Map_Latest(this, p->second->tid));
  }
}

// read | write ---------------------------

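// Submission path: take throttle budget (which may block and drop the
// lock), map the op to its PG/OSD under the current osdmap, attach it
// to the matching session, and send it if the target is live and not
// paused.  A minimal caller sketch (the prepare_* helper shown is
// illustrative; see Objecter.h for the real prepare_*_op variants):
//
//   Op *o = objecter->prepare_read_op(...);  // hypothetical arguments
//   ceph_tid_t tid;
//   objecter->op_submit(o, &tid);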
void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
{
  shunique_lock rl(rwlock, ceph::acquire_shared);
  ceph_tid_t tid = 0;
  if (!ptid)
    ptid = &tid;
  op->trace.event("op submit");
  _op_submit_with_budget(op, rl, ptid, ctx_budget);
}

void Objecter::_op_submit_with_budget(Op *op,
				      shunique_lock<ceph::shared_mutex>& sul,
				      ceph_tid_t *ptid,
				      int *ctx_budget)
{
  ceph_assert(initialized);

  ceph_assert(op->ops.size() == op->out_bl.size());
  ceph_assert(op->ops.size() == op->out_rval.size());
  ceph_assert(op->ops.size() == op->out_handler.size());

  // throttle.  before we look at any state, because
  // _take_op_budget() may drop our lock while it blocks.
  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
    int op_budget = _take_op_budget(op, sul);
    // take and pass out the budget for the first OP
    // in the context session
    if (ctx_budget && (*ctx_budget == -1)) {
      *ctx_budget = op_budget;
    }
  }

  if (osd_timeout > timespan(0)) {
    if (op->tid == 0)
      op->tid = ++last_tid;
    auto tid = op->tid;
    op->ontimeout = timer.add_event(osd_timeout,
				    [this, tid]() {
				      op_cancel(tid, -ETIMEDOUT); });
  }

  _op_submit(op, sul, ptid);
}

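// Perf-counter accounting for a newly submitted op: in-flight gauges
// plus one counter per opcode (see the switch below).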
void Objecter::_send_op_account(Op *op)
{
  inflight_ops++;

  // add to gather set(s)
  if (op->has_completion()) {
    num_in_flight++;
  } else {
    ldout(cct, 20) << " note: not requesting reply" << dendl;
  }

  logger->inc(l_osdc_op_active);
  logger->inc(l_osdc_op);
  logger->inc(l_osdc_oplen_avg, op->ops.size());

  if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
      (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
    logger->inc(l_osdc_op_rmw);
  else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
    logger->inc(l_osdc_op_w);
  else if (op->target.flags & CEPH_OSD_FLAG_READ)
    logger->inc(l_osdc_op_r);

  if (op->target.flags & CEPH_OSD_FLAG_PGOP)
    logger->inc(l_osdc_op_pg);

  for (auto p = op->ops.begin(); p != op->ops.end(); ++p) {
    int code = l_osdc_osdop_other;
    switch (p->op.op) {
    case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
    case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
    case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
    case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
    case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
    case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
    case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
    case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
    case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
    case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
    case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
    case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
    case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
    case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
    case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
    case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
    case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;

    // OMAP read operations
    case CEPH_OSD_OP_OMAPGETVALS:
    case CEPH_OSD_OP_OMAPGETKEYS:
    case CEPH_OSD_OP_OMAPGETHEADER:
    case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;

    // OMAP write operations
    case CEPH_OSD_OP_OMAPSETVALS:
    case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;

    // OMAP del operations
    case CEPH_OSD_OP_OMAPCLEAR:
    case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;

    case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
    case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
    case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
    }
    if (code)
      logger->inc(code);
  }
}

void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid)
{
  // rwlock is locked

  ldout(cct, 10) << __func__ << " op " << op << dendl;

  // pick target
  ceph_assert(op->session == NULL);
  OSDSession *s = NULL;

  bool check_for_latest_map = false;
  int r = _calc_target(&op->target, nullptr);
  switch(r) {
  case RECALC_OP_TARGET_POOL_DNE:
    check_for_latest_map = true;
    break;
  case RECALC_OP_TARGET_POOL_EIO:
    if (op->has_completion()) {
      op->complete(osdc_errc::pool_eio, -EIO);
    }
    return;
  }

  // Try to get a session, including a retry if we need to take write lock
  r = _get_session(op->target.osd, &s, sul);
  if (r == -EAGAIN ||
      (check_for_latest_map && sul.owns_lock_shared()) ||
      cct->_conf->objecter_debug_inject_relock_delay) {
    epoch_t orig_epoch = osdmap->get_epoch();
    sul.unlock();
    if (cct->_conf->objecter_debug_inject_relock_delay) {
      sleep(1);
    }
    sul.lock();
    if (orig_epoch != osdmap->get_epoch()) {
      // map changed; recalculate mapping
      ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
		     << dendl;
      check_for_latest_map = _calc_target(&op->target, nullptr)
	== RECALC_OP_TARGET_POOL_DNE;
      if (s) {
	put_session(s);
	s = NULL;
	r = -EAGAIN;
      }
    }
  }
  if (r == -EAGAIN) {
    ceph_assert(s == NULL);
    r = _get_session(op->target.osd, &s, sul);
  }
  ceph_assert(r == 0);
  ceph_assert(s);  // may be homeless

  _send_op_account(op);

  // send?

  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

  bool need_send = false;
  if (op->target.paused) {
    ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
		   << dendl;
    _maybe_request_map();
  } else if (!s->is_homeless()) {
    need_send = true;
  } else {
    _maybe_request_map();
  }

  unique_lock sl(s->lock);
  if (op->tid == 0)
    op->tid = ++last_tid;

  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
		 << " '" << op->target.base_oloc << "' '"
		 << op->target.target_oloc << "' " << op->ops << " tid "
		 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
		 << dendl;

  _session_op_assign(s, op);

  if (need_send) {
    _send_op(op);
  }

  // Last chance to touch Op here, after giving up session lock it can
  // be freed at any time by response handler.
  ceph_tid_t tid = op->tid;
  if (check_for_latest_map) {
    _send_op_map_check(op);
  }
  if (ptid)
    *ptid = tid;
  op = NULL;

  sl.unlock();
  put_session(s);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}

int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  unique_lock sl(s->lock);

  auto p = s->ops.find(tid);
  if (p == s->ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
		   << s->osd << dendl;
    return -ENOENT;
  }

#if 0
  if (s->con) {
    ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
		   << " on " << s->con << dendl;
    s->con->revoke_rx_buffer(tid);
  }
#endif

  ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
		 << dendl;
  Op *op = p->second;
  if (op->has_completion()) {
    num_in_flight--;
    op->complete(osdcode(r), r);
  }
  _op_cancel_map_check(op);
  _finish_op(op, r);
  sl.unlock();

  return 0;
}

int Objecter::op_cancel(ceph_tid_t tid, int r)
{
  int ret = 0;

  unique_lock wl(rwlock);
  ret = _op_cancel(tid, r);

  return ret;
}

int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
{
  unique_lock wl(rwlock);
  ldout(cct,10) << __func__ << " " << tids << dendl;
  for (auto tid : tids) {
    _op_cancel(tid, r);
  }
  return 0;
}

int Objecter::_op_cancel(ceph_tid_t tid, int r)
{
  int ret = 0;

  ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
		<< dendl;

start:

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    shared_lock sl(s->lock);
    if (s->ops.find(tid) != s->ops.end()) {
      sl.unlock();
      ret = op_cancel(s, tid, r);
      if (ret == -ENOENT) {
	/* oh no! raced, maybe tid moved to another session, restarting */
	goto start;
      }
      return ret;
    }
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
		<< " not found in live sessions" << dendl;

  // Handle case where the op is in homeless session
  shared_lock sl(homeless_session->lock);
  if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
    sl.unlock();
    ret = op_cancel(homeless_session, tid, r);
    if (ret == -ENOENT) {
      /* oh no! raced, maybe tid moved to another session, restarting */
      goto start;
    } else {
      return ret;
    }
  } else {
    sl.unlock();
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
		<< " not found in homeless session" << dendl;

  return ret;
}

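// Cancel all outstanding writes, optionally restricted to a single
// pool, and return the osdmap epoch at which they were cancelled (or
// -1, as an epoch_t, if nothing was cancelled) so callers can use it
// as a barrier.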

epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
{
  unique_lock wl(rwlock);

  std::vector<ceph_tid_t> to_cancel;
  bool found = false;

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    shared_lock sl(s->lock);
    for (auto op_i = s->ops.begin();
	 op_i != s->ops.end(); ++op_i) {
      if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
	  && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
	to_cancel.push_back(op_i->first);
      }
    }
    sl.unlock();

    for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
      int cancel_result = op_cancel(s, *titer, r);
      // We hold rwlock across search and cancellation, so cancels
      // should always succeed
      ceph_assert(cancel_result == 0);
    }
    if (!found && to_cancel.size())
      found = true;
    to_cancel.clear();
  }

  const epoch_t epoch = osdmap->get_epoch();

  wl.unlock();

  if (found) {
    return epoch;
  } else {
    return -1;
  }
}

bool Objecter::is_pg_changed(
  int oldprimary,
  const vector<int>& oldacting,
  int newprimary,
  const vector<int>& newacting,
  bool any_change)
{
  if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
	oldprimary,
	oldacting,
	newprimary,
	newacting))
    return true;
  if (any_change && oldacting != newacting)
    return true;
  return false;      // same primary (tho replicas may have changed)
}

bool Objecter::target_should_be_paused(op_target_t *t)
{
  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
    (t->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi)));

  return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
    (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
    (osdmap->get_epoch() < epoch_barrier);
}

/**
 * Locking public accessor for _osdmap_full_flag
 */
bool Objecter::osdmap_full_flag() const
{
  shared_lock rl(rwlock);

  return _osdmap_full_flag();
}

bool Objecter::osdmap_pool_full(const int64_t pool_id) const
{
  shared_lock rl(rwlock);

  if (_osdmap_full_flag()) {
    return true;
  }

  return _osdmap_pool_full(pool_id);
}

bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
{
  const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
  if (pool == NULL) {
    ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
    return false;
  }

  return _osdmap_pool_full(*pool);
}

bool Objecter::_osdmap_has_pool_full() const
{
  for (auto it = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it) {
    if (_osdmap_pool_full(it->second))
      return true;
  }
  return false;
}

/**
 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
 */
bool Objecter::_osdmap_full_flag() const
{
  // Ignore the FULL flag if honor_pool_full is not set
  return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
}

void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
{
  for (map<int64_t, pg_pool_t>::const_iterator it
	 = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it) {
    if (pool_full_map.find(it->first) == pool_full_map.end()) {
      pool_full_map[it->first] = _osdmap_pool_full(it->second);
    } else {
      pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
	pool_full_map[it->first];
    }
  }
}

int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
					   const string& ns)
{
  shared_lock rl(rwlock);
  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -ENOENT;
  return p->hash_key(key, ns);
}

int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
					      const string& ns)
{
  shared_lock rl(rwlock);
  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -ENOENT;
  return p->raw_hash_to_pg(p->hash_key(key, ns));
}

void Objecter::_prune_snapc(
  const mempool::osdmap::map<int64_t,
			     snap_interval_set_t>& new_removed_snaps,
  Op *op)
{
  bool match = false;
  auto i = new_removed_snaps.find(op->target.base_pgid.pool());
  if (i != new_removed_snaps.end()) {
    for (auto s : op->snapc.snaps) {
      if (i->second.contains(s)) {
	match = true;
	break;
      }
    }
    if (match) {
      vector<snapid_t> new_snaps;
      for (auto s : op->snapc.snaps) {
	if (!i->second.contains(s)) {
	  new_snaps.push_back(s);
	}
      }
      op->snapc.snaps.swap(new_snaps);
      ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
		    << " (was " << new_snaps << ")" << dendl;
    }
  }
}

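// Map an op target to its PG and OSD under the current osdmap.
// Returns RECALC_OP_TARGET_NO_ACTION if the existing mapping still
// stands, NEED_RESEND if it changed, POOL_DNE if the pool is gone
// from the map, or POOL_EIO if the pool has the EIO flag set.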
int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
  // rwlock is locked
  bool is_read = t->flags & CEPH_OSD_FLAG_READ;
  bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
  t->epoch = osdmap->get_epoch();
  ldout(cct,20) << __func__ << " epoch " << t->epoch
		<< " base " << t->base_oid << " " << t->base_oloc
		<< " precalc_pgid " << (int)t->precalc_pgid
		<< " pgid " << t->base_pgid
		<< (is_read ? " is_read" : "")
		<< (is_write ? " is_write" : "")
		<< dendl;

  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  if (!pi) {
    t->osd = -1;
    return RECALC_OP_TARGET_POOL_DNE;
  }

  if (pi->has_flag(pg_pool_t::FLAG_EIO)) {
    return RECALC_OP_TARGET_POOL_EIO;
  }

  ldout(cct,30) << __func__ << " base pi " << pi
		<< " pg_num " << pi->get_pg_num() << dendl;

  bool force_resend = false;
  if (osdmap->get_epoch() == pi->last_force_op_resend) {
    if (t->last_force_resend < pi->last_force_op_resend) {
      t->last_force_resend = pi->last_force_op_resend;
      force_resend = true;
    } else if (t->last_force_resend == 0) {
      force_resend = true;
    }
  }

  // apply tiering
  t->target_oid = t->base_oid;
  t->target_oloc = t->base_oloc;
  if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
    if (is_read && pi->has_read_tier())
      t->target_oloc.pool = pi->read_tier;
    if (is_write && pi->has_write_tier())
      t->target_oloc.pool = pi->write_tier;
    pi = osdmap->get_pg_pool(t->target_oloc.pool);
    if (!pi) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }

  pg_t pgid;
  if (t->precalc_pgid) {
    ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
    ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
    ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
    pgid = t->base_pgid;
  } else {
    int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
					   pgid);
    if (ret == -ENOENT) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }
  ldout(cct,20) << __func__ << " target " << t->target_oid << " "
		<< t->target_oloc << " -> pgid " << pgid << dendl;
  ldout(cct,30) << __func__ << " target pi " << pi
		<< " pg_num " << pi->get_pg_num() << dendl;
  t->pool_ever_existed = true;

  int size = pi->size;
  int min_size = pi->min_size;
  unsigned pg_num = pi->get_pg_num();
  unsigned pg_num_mask = pi->get_pg_num_mask();
  unsigned pg_num_pending = pi->get_pg_num_pending();
  int up_primary, acting_primary;
  vector<int> up, acting;
  ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
  pg_t actual_pgid(actual_ps, pgid.pool());
  if (!lookup_pg_mapping(actual_pgid, osdmap->get_epoch(), &up, &up_primary,
			 &acting, &acting_primary)) {
    osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
				 &acting, &acting_primary);
    pg_mapping_t pg_mapping(osdmap->get_epoch(),
			    up, up_primary, acting, acting_primary);
    update_pg_mapping(actual_pgid, std::move(pg_mapping));
  }
  bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
  bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
  unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
  pg_t prev_pgid(prev_seed, pgid.pool());
  if (any_change && PastIntervals::is_new_interval(
	t->acting_primary,
	acting_primary,
	t->acting,
	acting,
	t->up_primary,
	up_primary,
	t->up,
	up,
	t->size,
	size,
	t->min_size,
	min_size,
	t->pg_num,
	pg_num,
	t->pg_num_pending,
	pg_num_pending,
	t->sort_bitwise,
	sort_bitwise,
	t->recovery_deletes,
	recovery_deletes,
	t->peering_crush_bucket_count,
	pi->peering_crush_bucket_count,
	t->peering_crush_bucket_target,
	pi->peering_crush_bucket_target,
	t->peering_crush_bucket_barrier,
	pi->peering_crush_bucket_barrier,
	t->peering_crush_mandatory_member,
	pi->peering_crush_mandatory_member,
	prev_pgid)) {
    force_resend = true;
  }

  bool unpaused = false;
  bool should_be_paused = target_should_be_paused(t);
  if (t->paused && !should_be_paused) {
    unpaused = true;
  }
  if (t->paused != should_be_paused) {
    ldout(cct, 10) << __func__ << " paused " << t->paused
		   << " -> " << should_be_paused << dendl;
    t->paused = should_be_paused;
  }

  bool legacy_change =
    t->pgid != pgid ||
    is_pg_changed(
      t->acting_primary, t->acting, acting_primary, acting,
      t->used_replica || any_change);
  bool split_or_merge = false;
  if (t->pg_num) {
    split_or_merge =
      prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_target(t->pg_num, pg_num);
  }

  if (legacy_change || split_or_merge || force_resend) {
    t->pgid = pgid;
    t->acting = std::move(acting);
    t->acting_primary = acting_primary;
    t->up_primary = up_primary;
    t->up = std::move(up);
    t->size = size;
    t->min_size = min_size;
    t->pg_num = pg_num;
    t->pg_num_mask = pg_num_mask;
    t->pg_num_pending = pg_num_pending;
    spg_t spgid(actual_pgid);
    if (pi->is_erasure()) {
      for (uint8_t i = 0; i < t->acting.size(); ++i) {
	if (t->acting[i] == acting_primary) {
	  spgid.reset_shard(shard_id_t(i));
	  break;
	}
      }
    }
    t->actual_pgid = spgid;
    t->sort_bitwise = sort_bitwise;
    t->recovery_deletes = recovery_deletes;
    t->peering_crush_bucket_count = pi->peering_crush_bucket_count;
    t->peering_crush_bucket_target = pi->peering_crush_bucket_target;
    t->peering_crush_bucket_barrier = pi->peering_crush_bucket_barrier;
    t->peering_crush_mandatory_member = pi->peering_crush_mandatory_member;
    ldout(cct, 10) << __func__ << " "
		   << " raw pgid " << pgid << " -> actual " << t->actual_pgid
		   << " acting " << t->acting
		   << " primary " << acting_primary << dendl;
    t->used_replica = false;
    if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
		     CEPH_OSD_FLAG_LOCALIZE_READS)) &&
	!is_write && pi->is_replicated() && t->acting.size() > 1) {
      int osd;
      ceph_assert(is_read && t->acting[0] == acting_primary);
      if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
	int p = rand() % t->acting.size();
	if (p)
	  t->used_replica = true;
	osd = t->acting[p];
	ldout(cct, 10) << " chose random osd." << osd << " of " << t->acting
		       << dendl;
      } else {
	// look for a local replica.  prefer the primary if the
	// distance is the same.
	int best = -1;
	int best_locality = 0;
	for (unsigned i = 0; i < t->acting.size(); ++i) {
	  int locality = osdmap->crush->get_common_ancestor_distance(
	    cct, t->acting[i], crush_location);
	  ldout(cct, 20) << __func__ << " localize: rank " << i
			 << " osd." << t->acting[i]
			 << " locality " << locality << dendl;
	  if (i == 0 ||
	      (locality >= 0 && best_locality >= 0 &&
	       locality < best_locality) ||
	      (best_locality < 0 && locality >= 0)) {
	    best = i;
	    best_locality = locality;
	    if (i)
	      t->used_replica = true;
	  }
	}
	ceph_assert(best >= 0);
	osd = t->acting[best];
      }
      t->osd = osd;
    } else {
      t->osd = acting_primary;
    }
  }
  if (legacy_change || unpaused || force_resend) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  if (split_or_merge &&
      (osdmap->require_osd_release >= ceph_release_t::luminous ||
       HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
		    RESEND_ON_SPLIT))) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return RECALC_OP_TARGET_NO_ACTION;
}

int Objecter::_map_session(op_target_t *target, OSDSession **s,
			   shunique_lock<ceph::shared_mutex>& sul)
{
  _calc_target(target, nullptr);
  return _get_session(target->osd, s, sul);
}

void Objecter::_session_op_assign(OSDSession *to, Op *op)
{
  // to->lock is locked
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  get_session(to);
  op->session = to;
  to->ops[op->tid] = op;

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}

void Objecter::_session_op_remove(OSDSession *from, Op *op)
{
  ceph_assert(op->session == from);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}

void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
{
  // to lock is locked unique
  ceph_assert(op->session == NULL);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->linger_ops[op->linger_id] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
		 << dendl;
}

void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
{
  ceph_assert(from == op->session);
  // from->lock is locked unique

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->linger_ops.erase(op->linger_id);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
		 << dendl;
}

void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
{
  ceph_assert(from == op->session);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->command_ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}

void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
{
  // to->lock is locked
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->command_ops[op->tid] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}

int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
				       shunique_lock<ceph::shared_mutex>& sul)
{
  // rwlock is locked unique

  int r = _calc_target(&linger_op->target, nullptr, true);
  if (r == RECALC_OP_TARGET_NEED_RESEND) {
    ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
		   << " pgid " << linger_op->target.pgid
		   << " acting " << linger_op->target.acting << dendl;

    OSDSession *s = NULL;
    r = _get_session(linger_op->target.osd, &s, sul);
    ceph_assert(r == 0);

    if (linger_op->session != s) {
      // NB locking two sessions (s and linger_op->session) at the
      // same time here is only safe because we are the only one that
      // takes two, and we are holding rwlock for write.  We use
      // std::shared_mutex in OSDSession because lockdep doesn't know
      // that.
      unique_lock sl(s->lock);
      _session_linger_op_remove(linger_op->session, linger_op);
      _session_linger_op_assign(s, linger_op);
    }

    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return r;
}

void Objecter::_cancel_linger_op(Op *op)
{
  ldout(cct, 15) << "cancel_op " << op->tid << dendl;

  ceph_assert(!op->should_resend);
  if (op->has_completion()) {
    op->onfinish = nullptr;
    num_in_flight--;
  }

  _finish_op(op, 0);
}

void Objecter::_finish_op(Op *op, int r)
{
  ldout(cct, 15) << __func__ << " " << op->tid << dendl;

  // op->session->lock is locked unique or op->session is null

  if (!op->ctx_budgeted && op->budget >= 0) {
    put_op_budget_bytes(op->budget);
    op->budget = -1;
  }

  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  if (op->session) {
    _session_op_remove(op->session, op);
  }

  logger->dec(l_osdc_op_active);

  ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());

  inflight_ops--;

  op->put();
}

Objecter::MOSDOp *Objecter::_prepare_osd_op(Op *op)
{
  // rwlock is locked

  int flags = op->target.flags;
  flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
  flags |= CEPH_OSD_FLAG_SUPPORTSPOOLEIO;

  // Nothing checks this any longer, but needed for compatibility with
  // pre-luminous osds
  flags |= CEPH_OSD_FLAG_ONDISK;

  if (!honor_pool_full)
    flags |= CEPH_OSD_FLAG_FULL_FORCE;

  op->target.paused = false;
  op->stamp = ceph::coarse_mono_clock::now();

  hobject_t hobj = op->target.get_hobj();
  auto m = new MOSDOp(client_inc, op->tid,
		      hobj, op->target.actual_pgid,
		      osdmap->get_epoch(),
		      flags, op->features);

  m->set_snapid(op->snapid);
  m->set_snap_seq(op->snapc.seq);
  m->set_snaps(op->snapc.snaps);

  m->ops = op->ops;
  m->set_mtime(op->mtime);
  m->set_retry_attempt(op->attempts++);

  if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
    op->trace.init("op", &trace_endpoint);
  }

  if (op->priority)
    m->set_priority(op->priority);
  else
    m->set_priority(cct->_conf->osd_client_op_priority);

  if (op->reqid != osd_reqid_t()) {
    m->set_reqid(op->reqid);
  }

  logger->inc(l_osdc_op_send);
  ssize_t sum = 0;
  for (unsigned i = 0; i < m->ops.size(); i++) {
    sum += m->ops[i].indata.length();
  }
  logger->inc(l_osdc_op_send_bytes, sum);

  return m;
}

void Objecter::_send_op(Op *op)
{
  // rwlock is locked
  // op->session->lock is locked

  // backoff?
  auto p = op->session->backoffs.find(op->target.actual_pgid);
  if (p != op->session->backoffs.end()) {
    hobject_t hoid = op->target.get_hobj();
    auto q = p->second.lower_bound(hoid);
    if (q != p->second.begin()) {
      --q;
      if (hoid >= q->second.end) {
	++q;
      }
    }
    if (q != p->second.end()) {
      ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
		     << "," << q->second.end << ")" << dendl;
      int r = cmp(hoid, q->second.begin);
      if (r == 0 || (r > 0 && hoid < q->second.end)) {
	ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
		       << " id " << q->second.id << " on " << hoid
		       << ", queuing " << op << " tid " << op->tid << dendl;
	return;
      }
    }
  }

  ceph_assert(op->tid > 0);
  MOSDOp *m = _prepare_osd_op(op);

  if (op->target.actual_pgid != m->get_spg()) {
    ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
		   << m->get_spg() << " to " << op->target.actual_pgid
		   << ", updating and reencoding" << dendl;
    m->set_spg(op->target.actual_pgid);
    m->clear_payload();  // reencode
  }

  ldout(cct, 15) << "_send_op " << op->tid << " to "
		 << op->target.actual_pgid << " on osd." << op->session->osd
		 << dendl;

  ConnectionRef con = op->session->con;
  ceph_assert(con);

#if 0
  // preallocated rx ceph::buffer?
  if (op->con) {
    ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
		   << op->con << dendl;
    op->con->revoke_rx_buffer(op->tid);
  }
  if (op->outbl &&
      op->ontimeout == 0 &&  // only post rx_buffer if no timeout; see #9582
      op->outbl->length()) {
    op->outbl->invalidate_crc();  // messenger writes through c_str()
    ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
		   << dendl;
    op->con = con;
    op->con->post_rx_buffer(op->tid, *op->outbl);
  }
#endif

  op->incarnation = op->session->incarnation;

  if (op->trace.valid()) {
    m->trace.init("op msg", nullptr, &op->trace);
  }
  op->session->con->send_message(m);
}

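// Throttle budget for an op: bytes to be written for write ops, plus
// bytes expected back for reads (the extent length, or the xattr
// name+value sizes for attr reads).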
int Objecter::calc_op_budget(const bc::small_vector_base<OSDOp>& ops)
{
  int op_budget = 0;
  for (auto i = ops.begin(); i != ops.end(); ++i) {
    if (i->op.op & CEPH_OSD_OP_MODE_WR) {
      op_budget += i->indata.length();
    } else if (ceph_osd_op_mode_read(i->op.op)) {
      if (ceph_osd_op_uses_extent(i->op.op)) {
	if ((int64_t)i->op.extent.length > 0)
	  op_budget += (int64_t)i->op.extent.length;
      } else if (ceph_osd_op_type_attr(i->op.op)) {
	op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
      }
    }
  }
  return op_budget;
}

void Objecter::_throttle_op(Op *op,
			    shunique_lock<ceph::shared_mutex>& sul,
			    int op_budget)
{
  ceph_assert(sul && sul.mutex() == &rwlock);
  bool locked_for_write = sul.owns_lock();

  if (!op_budget)
    op_budget = calc_op_budget(op->ops);
  if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
    sul.unlock();
    op_throttle_bytes.get(op_budget);
    if (locked_for_write)
      sul.lock();
    else
      sul.lock_shared();
  }
  if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
    sul.unlock();
    op_throttle_ops.get(1);
    if (locked_for_write)
      sul.lock();
    else
      sul.lock_shared();
  }
}

int Objecter::take_linger_budget(LingerOp *info)
{
  return 1;
}

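// Reply handling: look the op up by tid in the sender's session,
// discard replies from stale attempts, resubmit on redirect or
// -EAGAIN, demux per-op rval/data into the caller's out parameters,
// then complete and retire the op.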
3363/* This function DOES put the passed message before returning */
3364void Objecter::handle_osd_op_reply(MOSDOpReply *m)
3365{
3366 ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
3367
3368 // get pio
3369 ceph_tid_t tid = m->get_tid();
3370
3371 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3372 if (!initialized) {
7c673cae
FG
3373 m->put();
3374 return;
3375 }
3376
3377 ConnectionRef con = m->get_connection();
11fdf7f2
TL
3378 auto priv = con->get_priv();
3379 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
3380 if (!s || s->con != con) {
3381 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
7c673cae
FG
3382 m->put();
3383 return;
3384 }
3385
f67539c2 3386 unique_lock sl(s->lock);
7c673cae
FG
3387
3388 map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
3389 if (iter == s->ops.end()) {
3390 ldout(cct, 7) << "handle_osd_op_reply " << tid
3391 << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
3392 " onnvram" : " ack"))
3393 << " ... stray" << dendl;
3394 sl.unlock();
7c673cae
FG
3395 m->put();
3396 return;
3397 }
3398
3399 ldout(cct, 7) << "handle_osd_op_reply " << tid
3400 << (m->is_ondisk() ? " ondisk" :
3401 (m->is_onnvram() ? " onnvram" : " ack"))
3402 << " uv " << m->get_user_version()
3403 << " in " << m->get_pg()
3404 << " attempt " << m->get_retry_attempt()
3405 << dendl;
3406 Op *op = iter->second;
31f18b77 3407 op->trace.event("osd op reply");
7c673cae
FG
3408
3409 if (retry_writes_after_first_reply && op->attempts == 1 &&
3410 (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
3411 ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
f67539c2 3412 if (op->has_completion()) {
31f18b77 3413 num_in_flight--;
7c673cae
FG
3414 }
3415 _session_op_remove(s, op);
3416 sl.unlock();
7c673cae
FG
3417
3418 _op_submit(op, sul, NULL);
3419 m->put();
3420 return;
3421 }
3422
3423 if (m->get_retry_attempt() >= 0) {
3424 if (m->get_retry_attempt() != (op->attempts - 1)) {
3425 ldout(cct, 7) << " ignoring reply from attempt "
3426 << m->get_retry_attempt()
3427 << " from " << m->get_source_inst()
3428 << "; last attempt " << (op->attempts - 1) << " sent to "
3429 << op->session->con->get_peer_addr() << dendl;
3430 m->put();
3431 sl.unlock();
7c673cae
FG
3432 return;
3433 }
3434 } else {
3435 // we don't know the request attempt because the server is old, so
3436 // just accept this one. we may do ACK callbacks we shouldn't
3437 // have, but that is better than doing callbacks out of order.
3438 }
3439
f67539c2 3440 decltype(op->onfinish) onfinish;
7c673cae
FG
3441
3442 int rc = m->get_result();
3443
3444 if (m->is_redirect_reply()) {
3445 ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
f67539c2 3446 if (op->has_completion())
31f18b77 3447 num_in_flight--;
7c673cae
FG
3448 _session_op_remove(s, op);
3449 sl.unlock();
7c673cae
FG
3450
3451 // FIXME: two redirects could race and reorder
3452
3453 op->tid = 0;
3454 m->get_redirect().combine_with_locator(op->target.target_oloc,
3455 op->target.target_oid.name);
f64942e4
AA
3456 op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
3457 CEPH_OSD_FLAG_IGNORE_CACHE |
3458 CEPH_OSD_FLAG_IGNORE_OVERLAY);
7c673cae
FG
3459 _op_submit(op, sul, NULL);
3460 m->put();
3461 return;
3462 }
3463
3464 if (rc == -EAGAIN) {
3465 ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
f67539c2 3466 if (op->has_completion())
c07f9fc5
FG
3467 num_in_flight--;
3468 _session_op_remove(s, op);
7c673cae 3469 sl.unlock();
c07f9fc5
FG
3470
3471 op->tid = 0;
3472 op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3473 CEPH_OSD_FLAG_LOCALIZE_READS);
3474 op->target.pgid = pg_t();
3475 _op_submit(op, sul, NULL);
7c673cae
FG
3476 m->put();
3477 return;
3478 }
3479
3480 sul.unlock();
3481
3482 if (op->objver)
3483 *op->objver = m->get_user_version();
3484 if (op->reply_epoch)
3485 *op->reply_epoch = m->get_map_epoch();
3486 if (op->data_offset)
3487 *op->data_offset = m->get_header().data_off;
3488
3489 // got data?
3490 if (op->outbl) {
11fdf7f2 3491#if 0
7c673cae
FG
3492 if (op->con)
3493 op->con->revoke_rx_buffer(op->tid);
11fdf7f2
TL
3494#endif
3495 auto& bl = m->get_data();
3496 if (op->outbl->length() == bl.length() &&
3497 bl.get_num_buffers() <= 1) {
3498 // this is here to keep previous users to *relied* on getting data
3499 // read into existing buffers happy. Notably,
3500 // libradosstriper::RadosStriperImpl::aio_read().
3501 ldout(cct,10) << __func__ << " copying resulting " << bl.length()
9f95a23c 3502 << " into existing ceph::buffer of length " << op->outbl->length()
11fdf7f2 3503 << dendl;
f67539c2
TL
3504 cb::list t;
3505 t = std::move(*op->outbl);
11fdf7f2 3506 t.invalidate_crc(); // we're overwriting the raw buffers via c_str()
9f95a23c 3507 bl.begin().copy(bl.length(), t.c_str());
11fdf7f2
TL
3508 op->outbl->substr_of(t, 0, bl.length());
3509 } else {
3510 m->claim_data(*op->outbl);
3511 }
7c673cae
FG
3512 op->outbl = 0;
3513 }
3514
3515 // per-op result demuxing
3516 vector<OSDOp> out_ops;
3517 m->claim_ops(out_ops);
3518
3519 if (out_ops.size() != op->ops.size())
3520 ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
3521 << " != request ops " << op->ops
3522 << " from " << m->get_source_inst() << dendl;
3523
f67539c2
TL
3524 ceph_assert(op->ops.size() == op->out_bl.size());
3525 ceph_assert(op->ops.size() == op->out_rval.size());
3526 ceph_assert(op->ops.size() == op->out_ec.size());
3527 ceph_assert(op->ops.size() == op->out_handler.size());
3528 auto pb = op->out_bl.begin();
3529 auto pr = op->out_rval.begin();
3530 auto pe = op->out_ec.begin();
3531 auto ph = op->out_handler.begin();
11fdf7f2
TL
3532 ceph_assert(op->out_bl.size() == op->out_rval.size());
3533 ceph_assert(op->out_bl.size() == op->out_handler.size());
f67539c2 3534 auto p = out_ops.begin();
7c673cae
FG
3535 for (unsigned i = 0;
3536 p != out_ops.end() && pb != op->out_bl.end();
f67539c2 3537 ++i, ++p, ++pb, ++pr, ++pe, ++ph) {
7c673cae
FG
3538 ldout(cct, 10) << " op " << i << " rval " << p->rval
3539 << " len " << p->outdata.length() << dendl;
3540 if (*pb)
3541 **pb = p->outdata;
3542 // set rval before running handlers so that handlers
3543 // can change it if e.g. decoding fails
3544 if (*pr)
31f18b77 3545 **pr = ceph_to_hostos_errno(p->rval);
f67539c2
TL
3546 if (*pe)
3547 **pe = p->rval < 0 ? bs::error_code(-p->rval, osd_category()) :
3548 bs::error_code();
7c673cae 3549 if (*ph) {
f67539c2
TL
3550 std::move((*ph))(p->rval < 0 ?
3551 bs::error_code(-p->rval, osd_category()) :
3552 bs::error_code(),
3553 p->rval, p->outdata);
7c673cae
FG
3554 }
3555 }
3556
3557	  // NOTE: since we only ever request ONDISK, we assume we will only
3558	  // ever get back one (type of) ack.
3559
f67539c2 3560 if (op->has_completion()) {
31f18b77 3561 num_in_flight--;
f67539c2
TL
3562 onfinish = std::move(op->onfinish);
3563 op->onfinish = nullptr;
7c673cae
FG
3564 }
3565 logger->inc(l_osdc_op_reply);
3566
3567 /* get it before we call _finish_op() */
3568 auto completion_lock = s->get_lock(op->target.base_oid);
3569
3570 ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
3571 _finish_op(op, 0);
3572
31f18b77 3573 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
7c673cae
FG
3574
3575 // serialize completions
3576 if (completion_lock.mutex()) {
3577 completion_lock.lock();
3578 }
3579 sl.unlock();
3580
3581 // do callbacks
f67539c2
TL
3582 if (Op::has_completion(onfinish)) {
3583 Op::complete(std::move(onfinish), osdcode(rc), rc);
7c673cae
FG
3584 }
3585 if (completion_lock.mutex()) {
3586 completion_lock.unlock();
3587 }
3588
3589 m->put();
7c673cae
FG
3590}
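// Illustrative sketch (hypothetical, not part of the reply path): the
// per-op demux above keeps out_bl/out_rval/out_ec/out_handler in lockstep
// with op->ops, e.g. for a three-op compound read:
//
//   op->ops         : { STAT, READ, GETXATTR }   // illustrative sub-ops
//   op->out_bl      : 3 slots; each sub-op's outdata is copied into its slot
//   op->out_rval    : 3 slots of per-sub-op return codes
//   op->out_handler : 3 slots of per-sub-op completion handlers
//
// The single loop walks all of them together, so a handler registered for
// sub-op i only ever sees the rval/outdata demuxed into slot i.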
3591
3592void Objecter::handle_osd_backoff(MOSDBackoff *m)
3593{
3594 ldout(cct, 10) << __func__ << " " << *m << dendl;
3595 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3596 if (!initialized) {
7c673cae
FG
3597 m->put();
3598 return;
3599 }
3600
3601 ConnectionRef con = m->get_connection();
11fdf7f2
TL
3602 auto priv = con->get_priv();
3603 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
3604 if (!s || s->con != con) {
3605 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
7c673cae
FG
3606 m->put();
3607 return;
3608 }
3609
3610 get_session(s);
7c673cae 3611
f67539c2 3612 unique_lock sl(s->lock);
7c673cae
FG
3613
3614 switch (m->op) {
3615 case CEPH_OSD_BACKOFF_OP_BLOCK:
3616 {
3617 // register
3618 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3619 s->backoffs_by_id.insert(make_pair(m->id, &b));
3620 b.pgid = m->pgid;
3621 b.id = m->id;
3622 b.begin = m->begin;
3623 b.end = m->end;
3624
3625 // ack with original backoff's epoch so that the osd can discard this if
3626 // there was a pg split.
f67539c2
TL
3627 auto r = new MOSDBackoff(m->pgid, m->map_epoch,
3628 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3629 m->id, m->begin, m->end);
7c673cae
FG
3630 // this priority must match the MOSDOps from _prepare_osd_op
3631 r->set_priority(cct->_conf->osd_client_op_priority);
3632 con->send_message(r);
3633 }
3634 break;
3635
3636 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3637 {
3638 auto p = s->backoffs_by_id.find(m->id);
3639 if (p != s->backoffs_by_id.end()) {
3640 OSDBackoff *b = p->second;
3641	    if (b->begin != m->begin ||
3642	 b->end != m->end) {
3643 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3644 << " unblock on ["
3645 << m->begin << "," << m->end << ") but backoff is ["
3646 << b->begin << "," << b->end << ")" << dendl;
3647 // hrmpf, unblock it anyway.
3648 }
3649 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3650 << " id " << b->id
3651 << " [" << b->begin << "," << b->end
3652 << ")" << dendl;
3653 auto spgp = s->backoffs.find(b->pgid);
11fdf7f2 3654 ceph_assert(spgp != s->backoffs.end());
7c673cae
FG
3655 spgp->second.erase(b->begin);
3656 if (spgp->second.empty()) {
3657 s->backoffs.erase(spgp);
3658 }
3659 s->backoffs_by_id.erase(p);
3660
3661 // check for any ops to resend
3662 for (auto& q : s->ops) {
3663 if (q.second->target.actual_pgid == m->pgid) {
3664 int r = q.second->target.contained_by(m->begin, m->end);
3665 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3666 << q.second->target.get_hobj() << dendl;
3667 if (r) {
3668 _send_op(q.second);
3669 }
3670 }
3671 }
3672 } else {
3673 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3674 << " unblock on ["
3675 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3676 }
3677 }
3678 break;
3679
3680 default:
3681 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3682 }
3683
3684 sul.unlock();
3685 sl.unlock();
3686
3687 m->put();
3688 put_session(s);
3689}
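// Rough shape of the per-session backoff bookkeeping used above (a sketch;
// the authoritative member declarations live in Objecter.h):
//
//   std::map<spg_t, std::map<hobject_t, OSDBackoff>> backoffs;       // pg -> begin -> backoff
//   std::map<uint64_t, OSDBackoff*>                  backoffs_by_id; // id -> backoff
//
// BLOCK registers the range in both maps and acks it; UNBLOCK erases it
// from both and resends any session op whose target lies inside the
// released [begin, end) interval.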
3690
3691uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3692 uint32_t pos)
3693{
3694 shared_lock rl(rwlock);
3695 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3696 pos, list_context->pool_id, string());
11fdf7f2 3697 ldout(cct, 10) << __func__ << " " << list_context
7c673cae
FG
3698 << " pos " << pos << " -> " << list_context->pos << dendl;
3699 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3700 list_context->current_pg = actual.ps();
3701 list_context->at_end_of_pool = false;
3702 return pos;
3703}
3704
3705uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3706 const hobject_t& cursor)
3707{
3708 shared_lock rl(rwlock);
3709 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3710 list_context->pos = cursor;
3711 list_context->at_end_of_pool = false;
3712 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3713 list_context->current_pg = actual.ps();
3714 list_context->sort_bitwise = true;
3715 return list_context->current_pg;
3716}
3717
3718void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3719 hobject_t *cursor)
3720{
3721 shared_lock rl(rwlock);
3722 if (list_context->list.empty()) {
3723 *cursor = list_context->pos;
3724 } else {
3725 const librados::ListObjectImpl& entry = list_context->list.front();
3726 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3727 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3728 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3729 }
3730}
3731
3732void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3733{
3734 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3735 << " pool_snap_seq " << list_context->pool_snap_seq
3736 << " max_entries " << list_context->max_entries
3737 << " list_context " << list_context
3738 << " onfinish " << onfinish
3739 << " current_pg " << list_context->current_pg
3740 << " pos " << list_context->pos << dendl;
3741
3742 shared_lock rl(rwlock);
3743 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3744 if (!pool) { // pool is gone
3745 rl.unlock();
3746 put_nlist_context_budget(list_context);
3747 onfinish->complete(-ENOENT);
3748 return;
3749 }
3750 int pg_num = pool->get_pg_num();
3751 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3752
3753 if (list_context->pos.is_min()) {
3754 list_context->starting_pg_num = 0;
3755 list_context->sort_bitwise = sort_bitwise;
3756 list_context->starting_pg_num = pg_num;
3757 }
3758 if (list_context->sort_bitwise != sort_bitwise) {
3759 list_context->pos = hobject_t(
3760 object_t(), string(), CEPH_NOSNAP,
3761 list_context->current_pg, list_context->pool_id, string());
3762 list_context->sort_bitwise = sort_bitwise;
3763 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3764 << list_context->pos << dendl;
3765 }
3766 if (list_context->starting_pg_num != pg_num) {
3767 if (!sort_bitwise) {
3768 // start reading from the beginning; the pgs have changed
3769 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3770 list_context->pos = collection_list_handle_t();
3771 }
3772 list_context->starting_pg_num = pg_num;
3773 }
3774
3775 if (list_context->pos.is_max()) {
3776 ldout(cct, 20) << __func__ << " end of pool, list "
3777 << list_context->list << dendl;
3778 if (list_context->list.empty()) {
3779 list_context->at_end_of_pool = true;
3780 }
3781 // release the listing context's budget once all
3782 // OPs (in the session) are finished
3783 put_nlist_context_budget(list_context);
3784 onfinish->complete(0);
3785 return;
3786 }
3787
3788 ObjectOperation op;
3789 op.pg_nls(list_context->max_entries, list_context->filter,
3790 list_context->pos, osdmap->get_epoch());
3791 list_context->bl.clear();
f67539c2 3792 auto onack = new C_NList(list_context, onfinish, this);
7c673cae
FG
3793 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3794
3795 // note current_pg in case we don't have (or lose) SORTBITWISE
3796 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3797 rl.unlock();
3798
3799 pg_read(list_context->current_pg, oloc, op,
3800 &list_context->bl, 0, onack, &onack->epoch,
3801 &list_context->ctx_budget);
3802}
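// Hypothetical caller-side sketch of the listing loop (variable names
// illustrative; see NListContext in Objecter.h for the real fields):
//
//   auto lc = new NListContext;
//   lc->pool_id = pool_id;
//   lc->max_entries = 1024;
//   objecter->list_nobjects(lc, new LambdaContext([lc](int r) {
//     // on r == 0, consume lc->list; stop once lc->at_end_of_pool is
//     // set, otherwise clear lc->list and call list_nobjects() again
//     // to continue from lc->pos
//   }));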
3803
3804void Objecter::_nlist_reply(NListContext *list_context, int r,
3805 Context *final_finish, epoch_t reply_epoch)
3806{
3807 ldout(cct, 10) << __func__ << " " << list_context << dendl;
3808
11fdf7f2 3809 auto iter = list_context->bl.cbegin();
7c673cae 3810 pg_nls_response_t response;
11fdf7f2 3811 decode(response, iter);
7c673cae 3812 if (!iter.end()) {
9f95a23c 3813	    // decode legacy extra_info; it is unused, kept for compatibility.
f67539c2 3814 cb::list legacy_extra_info;
9f95a23c 3815 decode(legacy_extra_info, iter);
7c673cae
FG
3816 }
3817
3818	  // if the osd returns 1 (newer code), or the handle is MAX, it means
3819	  // we hit the end of the pg.
3820 if ((response.handle.is_max() || r == 1) &&
3821 !list_context->sort_bitwise) {
3822 // legacy OSD and !sortbitwise, figure out the next PG on our own
3823 ++list_context->current_pg;
3824 if (list_context->current_pg == list_context->starting_pg_num) {
3825 // end of pool
3826 list_context->pos = hobject_t::get_max();
3827 } else {
3828 // next pg
3829 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3830 list_context->current_pg,
3831 list_context->pool_id, string());
3832 }
3833 } else {
3834 list_context->pos = response.handle;
3835 }
3836
3837 int response_size = response.entries.size();
3838 ldout(cct, 20) << " response.entries.size " << response_size
3839 << ", response.entries " << response.entries
3840 << ", handle " << response.handle
3841 << ", tentative new pos " << list_context->pos << dendl;
7c673cae 3842 if (response_size) {
f67539c2
TL
3843 std::move(response.entries.begin(), response.entries.end(),
3844 std::back_inserter(list_context->list));
3845 response.entries.clear();
7c673cae
FG
3846 }
3847
3848 if (list_context->list.size() >= list_context->max_entries) {
3849 ldout(cct, 20) << " hit max, returning results so far, "
3850 << list_context->list << dendl;
3851 // release the listing context's budget once all
3852 // OPs (in the session) are finished
3853 put_nlist_context_budget(list_context);
3854 final_finish->complete(0);
3855 return;
3856 }
3857
3858 // continue!
3859 list_nobjects(list_context, final_finish);
3860}
3861
3862void Objecter::put_nlist_context_budget(NListContext *list_context)
3863{
3864 if (list_context->ctx_budget >= 0) {
3865 ldout(cct, 10) << " release listing context's budget " <<
3866 list_context->ctx_budget << dendl;
3867 put_op_budget_bytes(list_context->ctx_budget);
3868 list_context->ctx_budget = -1;
3869 }
3870}
3871
3872// snapshots
3873
f67539c2
TL
3874void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name,
3875 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3876{
3877 unique_lock wl(rwlock);
3878 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3879 << snap_name << dendl;
3880
3881 const pg_pool_t *p = osdmap->get_pg_pool(pool);
f67539c2
TL
3882 if (!p) {
3883 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3884 return;
3885 }
3886 if (p->snap_exists(snap_name)) {
3887 onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists,
3888 cb::list{});
3889 return;
3890 }
7c673cae 3891
f67539c2 3892 auto op = new PoolOp;
31f18b77 3893 op->tid = ++last_tid;
7c673cae
FG
3894 op->pool = pool;
3895 op->name = snap_name;
f67539c2 3896 op->onfinish = std::move(onfinish);
7c673cae
FG
3897 op->pool_op = POOL_OP_CREATE_SNAP;
3898 pool_ops[op->tid] = op;
3899
3900 pool_op_submit(op);
7c673cae
FG
3901}
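// Hypothetical usage sketch: PoolOp::onfinish is an asio-style completion,
// so a caller wraps a lambda via PoolOp::OpComp::create() (the same pattern
// allocate_selfmanaged_snap() uses below); 'ex' is any boost::asio executor:
//
//   objecter->create_pool_snap(
//     pool_id, "nightly",                        // illustrative arguments
//     Objecter::PoolOp::OpComp::create(
//       ex, [](bs::error_code ec, cb::list) {
//         // ec is osdc_errc::pool_dne / snapshot_exists on failure
//       }));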
3902
f67539c2
TL
3903struct CB_SelfmanagedSnap {
3904 std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> fin;
3905 CB_SelfmanagedSnap(decltype(fin)&& fin)
3906 : fin(std::move(fin)) {}
3907 void operator()(bs::error_code ec, const cb::list& bl) {
3908 snapid_t snapid = 0;
3909 if (!ec) {
11fdf7f2 3910 try {
f67539c2
TL
3911 auto p = bl.cbegin();
3912 decode(snapid, p);
3913 } catch (const cb::error& e) {
3914 ec = e.code();
11fdf7f2 3915 }
7c673cae 3916 }
f67539c2 3917 fin->defer(std::move(fin), ec, snapid);
7c673cae
FG
3918 }
3919};
3920
f67539c2
TL
3921void Objecter::allocate_selfmanaged_snap(
3922 int64_t pool,
3923 std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> onfinish)
7c673cae
FG
3924{
3925 unique_lock wl(rwlock);
3926 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
f67539c2 3927 auto op = new PoolOp;
31f18b77 3928 op->tid = ++last_tid;
7c673cae 3929 op->pool = pool;
f67539c2
TL
3930 op->onfinish = PoolOp::OpComp::create(
3931 service.get_executor(),
3932 CB_SelfmanagedSnap(std::move(onfinish)));
7c673cae
FG
3933 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3934 pool_ops[op->tid] = op;
3935
3936 pool_op_submit(op);
7c673cae
FG
3937}
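// Hypothetical caller sketch for the above (completion signature taken from
// the declaration; 'ex' is any boost::asio executor):
//
//   objecter->allocate_selfmanaged_snap(
//     pool_id,
//     ca::Completion<void(bs::error_code, snapid_t)>::create(
//       ex, [](bs::error_code ec, snapid_t snap) {
//         // on success 'snap' is the freshly allocated snapshot id,
//         // decoded from the mon reply by CB_SelfmanagedSnap above
//       }));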
3938
f67539c2
TL
3939void Objecter::delete_pool_snap(
3940 int64_t pool, std::string_view snap_name,
3941 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3942{
3943 unique_lock wl(rwlock);
3944 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3945 << snap_name << dendl;
3946
3947 const pg_pool_t *p = osdmap->get_pg_pool(pool);
f67539c2
TL
3948 if (!p) {
3949 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3950 return;
3951 }
7c673cae 3952
f67539c2
TL
3953 if (!p->snap_exists(snap_name)) {
3954 onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{});
3955 return;
3956 }
3957
3958 auto op = new PoolOp;
31f18b77 3959 op->tid = ++last_tid;
7c673cae
FG
3960 op->pool = pool;
3961 op->name = snap_name;
f67539c2 3962 op->onfinish = std::move(onfinish);
7c673cae
FG
3963 op->pool_op = POOL_OP_DELETE_SNAP;
3964 pool_ops[op->tid] = op;
3965
3966 pool_op_submit(op);
7c673cae
FG
3967}
3968
f67539c2
TL
3969void Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3970 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3971{
3972 unique_lock wl(rwlock);
3973 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3974 << snap << dendl;
f67539c2 3975 auto op = new PoolOp;
31f18b77 3976 op->tid = ++last_tid;
7c673cae 3977 op->pool = pool;
f67539c2 3978 op->onfinish = std::move(onfinish);
7c673cae
FG
3979 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3980 op->snapid = snap;
3981 pool_ops[op->tid] = op;
3982
3983 pool_op_submit(op);
7c673cae
FG
3984}
3985
f67539c2
TL
3986void Objecter::create_pool(std::string_view name,
3987 decltype(PoolOp::onfinish)&& onfinish,
3988 int crush_rule)
7c673cae
FG
3989{
3990 unique_lock wl(rwlock);
3991 ldout(cct, 10) << "create_pool name=" << name << dendl;
3992
f67539c2
TL
3993 if (osdmap->lookup_pg_pool_name(name) >= 0) {
3994 onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{});
3995 return;
3996 }
7c673cae 3997
f67539c2 3998 auto op = new PoolOp;
31f18b77 3999 op->tid = ++last_tid;
7c673cae
FG
4000 op->pool = 0;
4001 op->name = name;
f67539c2 4002 op->onfinish = std::move(onfinish);
7c673cae
FG
4003 op->pool_op = POOL_OP_CREATE;
4004 pool_ops[op->tid] = op;
7c673cae
FG
4005 op->crush_rule = crush_rule;
4006
4007 pool_op_submit(op);
7c673cae
FG
4008}
4009
f67539c2
TL
4010void Objecter::delete_pool(int64_t pool,
4011 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
4012{
4013 unique_lock wl(rwlock);
4014 ldout(cct, 10) << "delete_pool " << pool << dendl;
4015
4016 if (!osdmap->have_pg_pool(pool))
f67539c2
TL
4017 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
4018 else
4019 _do_delete_pool(pool, std::move(onfinish));
7c673cae
FG
4020}
4021
f67539c2
TL
4022void Objecter::delete_pool(std::string_view pool_name,
4023 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
4024{
4025 unique_lock wl(rwlock);
4026 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
4027
4028 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
4029 if (pool < 0)
f67539c2
TL
4030 // This only returns one error: -ENOENT.
4031 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
4032 else
4033 _do_delete_pool(pool, std::move(onfinish));
7c673cae
FG
4034}
4035
f67539c2
TL
4036void Objecter::_do_delete_pool(int64_t pool,
4037 decltype(PoolOp::onfinish)&& onfinish)
4038
7c673cae 4039{
f67539c2 4040 auto op = new PoolOp;
31f18b77 4041 op->tid = ++last_tid;
7c673cae
FG
4042 op->pool = pool;
4043 op->name = "delete";
f67539c2 4044 op->onfinish = std::move(onfinish);
7c673cae
FG
4045 op->pool_op = POOL_OP_DELETE;
4046 pool_ops[op->tid] = op;
4047 pool_op_submit(op);
4048}
4049
7c673cae
FG
4050void Objecter::pool_op_submit(PoolOp *op)
4051{
4052 // rwlock is locked
4053 if (mon_timeout > timespan(0)) {
4054 op->ontimeout = timer.add_event(mon_timeout,
4055 [this, op]() {
4056 pool_op_cancel(op->tid, -ETIMEDOUT); });
4057 }
4058 _pool_op_submit(op);
4059}
4060
4061void Objecter::_pool_op_submit(PoolOp *op)
4062{
4063 // rwlock is locked unique
4064
4065 ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
f67539c2
TL
4066 auto m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4067 op->name, op->pool_op,
4068 last_seen_osdmap_version);
7c673cae
FG
4069 if (op->snapid) m->snapid = op->snapid;
4070 if (op->crush_rule) m->crush_rule = op->crush_rule;
4071 monc->send_mon_message(m);
11fdf7f2 4072 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4073
4074 logger->inc(l_osdc_poolop_send);
4075}
4076
4077/**
4078 * Handle a reply to a PoolOp message. Check that we sent the message
f67539c2 4079 * and give the caller responsibility for the returned cb::list.
7c673cae
FG
4080	 * Then either call the finisher or stash the PoolOp, depending on
4081	 * whether we have a new enough map.
4082 * Lastly, clean up the message and PoolOp.
4083 */
4084void Objecter::handle_pool_op_reply(MPoolOpReply *m)
4085{
f67539c2
TL
4086 int rc = m->replyCode;
4087 auto ec = rc < 0 ? bs::error_code(-rc, mon_category()) : bs::error_code();
11fdf7f2 4088 FUNCTRACE(cct);
7c673cae 4089 shunique_lock sul(rwlock, acquire_shared);
31f18b77 4090 if (!initialized) {
7c673cae
FG
4091 sul.unlock();
4092 m->put();
4093 return;
4094 }
4095
4096 ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
4097 ceph_tid_t tid = m->get_tid();
f67539c2 4098 auto iter = pool_ops.find(tid);
7c673cae
FG
4099 if (iter != pool_ops.end()) {
4100 PoolOp *op = iter->second;
4101 ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
4102 << ceph_pool_op_name(op->pool_op) << dendl;
f67539c2 4103 cb::list bl{std::move(m->response_data)};
7c673cae
FG
4104 if (m->version > last_seen_osdmap_version)
4105 last_seen_osdmap_version = m->version;
4106 if (osdmap->get_epoch() < m->epoch) {
4107 sul.unlock();
4108 sul.lock();
4109 // recheck op existence since we have let go of rwlock
4110 // (for promotion) above.
4111 iter = pool_ops.find(tid);
4112 if (iter == pool_ops.end())
4113 goto done; // op is gone.
4114 if (osdmap->get_epoch() < m->epoch) {
4115 ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
4116 << " before calling back" << dendl;
f67539c2
TL
4117 _wait_for_new_map(OpCompletion::create(
4118 service.get_executor(),
4119 [o = std::move(op->onfinish),
4120 bl = std::move(bl)](
4121 bs::error_code ec) mutable {
4122 o->defer(std::move(o), ec, bl);
4123 }),
4124 m->epoch,
4125 ec);
7c673cae
FG
4126 } else {
4127 // map epoch changed, probably because a MOSDMap message
4128 // sneaked in. Do caller-specified callback now or else
4129 // we lose it forever.
11fdf7f2 4130 ceph_assert(op->onfinish);
f67539c2 4131 op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
7c673cae
FG
4132 }
4133 } else {
11fdf7f2 4134 ceph_assert(op->onfinish);
f67539c2 4135 op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
7c673cae 4136 }
f67539c2 4137 op->onfinish = nullptr;
7c673cae
FG
4138 if (!sul.owns_lock()) {
4139 sul.unlock();
4140 sul.lock();
4141 }
4142 iter = pool_ops.find(tid);
4143 if (iter != pool_ops.end()) {
4144 _finish_pool_op(op, 0);
4145 }
4146 } else {
4147 ldout(cct, 10) << "unknown request " << tid << dendl;
4148 }
4149
4150done:
4151 // Not strictly necessary, since we'll release it on return.
4152 sul.unlock();
4153
4154 ldout(cct, 10) << "done" << dendl;
4155 m->put();
4156}
4157
4158int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4159{
11fdf7f2 4160 ceph_assert(initialized);
7c673cae
FG
4161
4162 unique_lock wl(rwlock);
4163
f67539c2 4164 auto it = pool_ops.find(tid);
7c673cae
FG
4165 if (it == pool_ops.end()) {
4166 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4167 return -ENOENT;
4168 }
4169
4170 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4171
4172 PoolOp *op = it->second;
4173 if (op->onfinish)
f67539c2 4174 op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{});
7c673cae
FG
4175
4176 _finish_pool_op(op, r);
4177 return 0;
4178}
4179
4180void Objecter::_finish_pool_op(PoolOp *op, int r)
4181{
4182 // rwlock is locked unique
4183 pool_ops.erase(op->tid);
4184 logger->set(l_osdc_poolop_active, pool_ops.size());
4185
4186 if (op->ontimeout && r != -ETIMEDOUT) {
4187 timer.cancel_event(op->ontimeout);
4188 }
4189
4190 delete op;
4191}
4192
4193// pool stats
4194
f67539c2
TL
4195void Objecter::get_pool_stats(
4196 const std::vector<std::string>& pools,
4197 decltype(PoolStatOp::onfinish)&& onfinish)
7c673cae
FG
4198{
4199 ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4200
f67539c2 4201 auto op = new PoolStatOp;
31f18b77 4202 op->tid = ++last_tid;
7c673cae 4203 op->pools = pools;
f67539c2 4204 op->onfinish = std::move(onfinish);
7c673cae
FG
4205 if (mon_timeout > timespan(0)) {
4206 op->ontimeout = timer.add_event(mon_timeout,
4207 [this, op]() {
4208 pool_stat_op_cancel(op->tid,
4209 -ETIMEDOUT); });
4210 } else {
4211 op->ontimeout = 0;
4212 }
4213
4214 unique_lock wl(rwlock);
4215
4216 poolstat_ops[op->tid] = op;
4217
4218 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4219
4220 _poolstat_submit(op);
4221}
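// Hypothetical usage sketch (assumes PoolStatOp exposes an OpComp alias in
// Objecter.h analogous to PoolOp::OpComp; 'ex' is any boost::asio executor):
//
//   objecter->get_pool_stats(
//     {"rbd", "cephfs_data"},                    // illustrative pool names
//     Objecter::PoolStatOp::OpComp::create(
//       ex, [](bs::error_code ec,
//              bc::flat_map<std::string, pool_stat_t> stats, bool per_pool) {
//         // 'per_pool' mirrors MGetPoolStatsReply::per_pool
//       }));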
4222
4223void Objecter::_poolstat_submit(PoolStatOp *op)
4224{
4225 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4226 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4227 op->pools,
4228 last_seen_pgmap_version));
11fdf7f2 4229 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4230
4231 logger->inc(l_osdc_poolstat_send);
4232}
4233
4234void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4235{
4236 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4237 ceph_tid_t tid = m->get_tid();
4238
4239 unique_lock wl(rwlock);
31f18b77 4240 if (!initialized) {
7c673cae
FG
4241 m->put();
4242 return;
4243 }
4244
f67539c2 4245 auto iter = poolstat_ops.find(tid);
7c673cae
FG
4246 if (iter != poolstat_ops.end()) {
4247	    PoolStatOp *op = iter->second;
4248 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
7c673cae
FG
4249 if (m->version > last_seen_pgmap_version) {
4250 last_seen_pgmap_version = m->version;
4251 }
f67539c2
TL
4252 op->onfinish->defer(std::move(op->onfinish), bs::error_code{},
4253 std::move(m->pool_stats), m->per_pool);
7c673cae
FG
4254 _finish_pool_stat_op(op, 0);
4255 } else {
4256 ldout(cct, 10) << "unknown request " << tid << dendl;
4257 }
4258 ldout(cct, 10) << "done" << dendl;
4259 m->put();
4260}
4261
4262int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4263{
11fdf7f2 4264 ceph_assert(initialized);
7c673cae
FG
4265
4266 unique_lock wl(rwlock);
4267
f67539c2 4268 auto it = poolstat_ops.find(tid);
7c673cae
FG
4269 if (it == poolstat_ops.end()) {
4270 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4271 return -ENOENT;
4272 }
4273
4274 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4275
f67539c2 4276 auto op = it->second;
7c673cae 4277 if (op->onfinish)
f67539c2
TL
4278 op->onfinish->defer(std::move(op->onfinish), osdcode(r),
4279 bc::flat_map<std::string, pool_stat_t>{}, false);
7c673cae
FG
4280 _finish_pool_stat_op(op, r);
4281 return 0;
4282}
4283
4284void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4285{
4286 // rwlock is locked unique
4287
4288 poolstat_ops.erase(op->tid);
4289 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4290
4291 if (op->ontimeout && r != -ETIMEDOUT)
4292 timer.cancel_event(op->ontimeout);
4293
4294 delete op;
4295}
4296
20effc67 4297void Objecter::get_fs_stats(std::optional<int64_t> poolid,
f67539c2 4298 decltype(StatfsOp::onfinish)&& onfinish)
7c673cae
FG
4299{
4300 ldout(cct, 10) << "get_fs_stats" << dendl;
4301 unique_lock l(rwlock);
4302
f67539c2 4303 auto op = new StatfsOp;
31f18b77 4304 op->tid = ++last_tid;
f67539c2
TL
4305 op->data_pool = poolid;
4306 op->onfinish = std::move(onfinish);
7c673cae
FG
4307 if (mon_timeout > timespan(0)) {
4308 op->ontimeout = timer.add_event(mon_timeout,
4309 [this, op]() {
4310 statfs_op_cancel(op->tid,
4311 -ETIMEDOUT); });
4312 } else {
4313 op->ontimeout = 0;
4314 }
4315 statfs_ops[op->tid] = op;
4316
4317 logger->set(l_osdc_statfs_active, statfs_ops.size());
4318
4319 _fs_stats_submit(op);
4320}
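// Hypothetical usage sketch (assumes a StatfsOp::OpComp alias analogous to
// PoolOp::OpComp; the completion signature matches the defer() calls below):
//
//   objecter->get_fs_stats(
//     std::nullopt,                              // no pool filter: whole cluster
//     Objecter::StatfsOp::OpComp::create(
//       ex, [](bs::error_code ec, const ceph_statfs& st) {
//         // on success st carries kb / kb_used / kb_avail / num_objects
//       }));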
4321
4322void Objecter::_fs_stats_submit(StatfsOp *op)
4323{
4324 // rwlock is locked unique
4325
4326	  ldout(cct, 10) << "fs_stats_submit " << op->tid << dendl;
4327 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
d2e6a577 4328 op->data_pool,
7c673cae 4329 last_seen_pgmap_version));
11fdf7f2 4330 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4331
4332 logger->inc(l_osdc_statfs_send);
4333}
4334
4335void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4336{
4337 unique_lock wl(rwlock);
31f18b77 4338 if (!initialized) {
7c673cae
FG
4339 m->put();
4340 return;
4341 }
4342
4343 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4344 ceph_tid_t tid = m->get_tid();
4345
4346	  if (auto it = statfs_ops.find(tid); it != statfs_ops.end()) {
4347	    StatfsOp *op = it->second;
4348 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
7c673cae
FG
4349 if (m->h.version > last_seen_pgmap_version)
4350 last_seen_pgmap_version = m->h.version;
f67539c2 4351 op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st);
7c673cae
FG
4352 _finish_statfs_op(op, 0);
4353 } else {
4354 ldout(cct, 10) << "unknown request " << tid << dendl;
4355 }
4356 m->put();
4357 ldout(cct, 10) << "done" << dendl;
4358}
4359
4360int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4361{
11fdf7f2 4362 ceph_assert(initialized);
7c673cae
FG
4363
4364 unique_lock wl(rwlock);
4365
f67539c2 4366 auto it = statfs_ops.find(tid);
7c673cae
FG
4367 if (it == statfs_ops.end()) {
4368 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4369 return -ENOENT;
4370 }
4371
4372 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4373
f67539c2 4374 auto op = it->second;
7c673cae 4375 if (op->onfinish)
f67539c2 4376 op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{});
7c673cae
FG
4377 _finish_statfs_op(op, r);
4378 return 0;
4379}
4380
4381void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4382{
4383 // rwlock is locked unique
4384
4385 statfs_ops.erase(op->tid);
4386 logger->set(l_osdc_statfs_active, statfs_ops.size());
4387
4388 if (op->ontimeout && r != -ETIMEDOUT)
4389 timer.cancel_event(op->ontimeout);
4390
4391 delete op;
4392}
4393
4394// scatter/gather
4395
4396void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
f67539c2
TL
4397 vector<cb::list>& resultbl,
4398 cb::list *bl, Context *onfinish)
7c673cae
FG
4399{
4400 // all done
4401 ldout(cct, 15) << "_sg_read_finish" << dendl;
4402
4403 if (extents.size() > 1) {
4404 Striper::StripedReadResult r;
f67539c2
TL
4405 auto bit = resultbl.begin();
4406 for (auto eit = extents.begin();
7c673cae
FG
4407 eit != extents.end();
4408 ++eit, ++bit) {
4409 r.add_partial_result(cct, *bit, eit->buffer_extents);
4410 }
4411 bl->clear();
4412 r.assemble_result(cct, *bl, false);
4413 } else {
4414 ldout(cct, 15) << " only one frag" << dendl;
f67539c2 4415 *bl = std::move(resultbl[0]);
7c673cae
FG
4416 }
4417
4418 // done
4419 uint64_t bytes_read = bl->length();
4420 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4421
4422 if (onfinish) {
4423	    onfinish->complete(bytes_read);
4424 }
4425}
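// Worked example of the reassembly above (a sketch): a 12 KiB logical read
// striped across three objects comes back as three partial bufferlists, and
// each extent's buffer_extents records where its bytes land in the logical
// buffer:
//
//   extents[0]: object 0, buffer_extents {(0,    4096)}
//   extents[1]: object 1, buffer_extents {(4096, 4096)}
//   extents[2]: object 2, buffer_extents {(8192, 4096)}
//
// add_partial_result() files each fragment under its logical offset and
// assemble_result() emits them back in offset order as one contiguous bl.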
4426
4427
4428void Objecter::ms_handle_connect(Connection *con)
4429{
4430 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
31f18b77 4431 if (!initialized)
7c673cae
FG
4432 return;
4433
4434 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4435 resend_mon_ops();
4436}
4437
4438bool Objecter::ms_handle_reset(Connection *con)
4439{
31f18b77 4440 if (!initialized)
7c673cae
FG
4441 return false;
4442 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
91327a77 4443 unique_lock wl(rwlock);
11fdf7f2
TL
4444
4445 auto priv = con->get_priv();
4446 auto session = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4447 if (session) {
4448 ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
4449 << " osd." << session->osd << dendl;
a8e16298
TL
4450	      // the session may already have been closed if the osdmap we just
4451	      // handled marked the osd down
4452 if (!(initialized && osdmap->is_up(session->osd))) {
4453	        ldout(cct, 1) << "ms_handle_reset aborted, initialized=" << initialized << dendl;
7c673cae
FG
4454 wl.unlock();
4455 return false;
4456 }
4457 map<uint64_t, LingerOp *> lresend;
f67539c2 4458 unique_lock sl(session->lock);
7c673cae
FG
4459 _reopen_session(session);
4460 _kick_requests(session, lresend);
4461 sl.unlock();
4462 _linger_ops_resend(lresend, wl);
4463 wl.unlock();
4464 maybe_request_map();
7c673cae
FG
4465 }
4466 return true;
4467 }
4468 return false;
4469}
4470
4471void Objecter::ms_handle_remote_reset(Connection *con)
4472{
4473 /*
4474 * treat these the same.
4475 */
4476 ms_handle_reset(con);
4477}
4478
4479bool Objecter::ms_handle_refused(Connection *con)
4480{
4481 // just log for now
4482 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4483 int osd = osdmap->identify_osd(con->get_peer_addr());
4484 if (osd >= 0) {
4485 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4486 }
4487 }
4488 return false;
4489}
4490
7c673cae
FG
4491void Objecter::op_target_t::dump(Formatter *f) const
4492{
4493 f->dump_stream("pg") << pgid;
4494 f->dump_int("osd", osd);
4495 f->dump_stream("object_id") << base_oid;
4496 f->dump_stream("object_locator") << base_oloc;
4497 f->dump_stream("target_object_id") << target_oid;
4498 f->dump_stream("target_object_locator") << target_oloc;
4499 f->dump_int("paused", (int)paused);
4500 f->dump_int("used_replica", (int)used_replica);
4501 f->dump_int("precalc_pgid", (int)precalc_pgid);
4502}
4503
4504void Objecter::_dump_active(OSDSession *s)
4505{
f67539c2 4506 for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
7c673cae
FG
4507 Op *op = p->second;
4508 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4509 << "\tosd." << (op->session ? op->session->osd : -1)
4510 << "\t" << op->target.base_oid
4511 << "\t" << op->ops << dendl;
4512 }
4513}
4514
4515void Objecter::_dump_active()
4516{
31f18b77 4517 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
7c673cae 4518 << dendl;
f67539c2 4519 for (auto siter = osd_sessions.begin();
7c673cae 4520 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4521 auto s = siter->second;
4522 shared_lock sl(s->lock);
7c673cae
FG
4523 _dump_active(s);
4524 sl.unlock();
4525 }
4526 _dump_active(homeless_session);
4527}
4528
4529void Objecter::dump_active()
4530{
4531 shared_lock rl(rwlock);
4532 _dump_active();
4533 rl.unlock();
4534}
4535
4536void Objecter::dump_requests(Formatter *fmt)
4537{
4538 // Read-lock on Objecter held here
4539 fmt->open_object_section("requests");
4540 dump_ops(fmt);
4541 dump_linger_ops(fmt);
4542 dump_pool_ops(fmt);
4543 dump_pool_stat_ops(fmt);
4544 dump_statfs_ops(fmt);
4545 dump_command_ops(fmt);
4546 fmt->close_section(); // requests object
4547}
4548
4549void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4550{
f67539c2 4551 for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
7c673cae 4552 Op *op = p->second;
f67539c2 4553 auto age = std::chrono::duration<double>(ceph::coarse_mono_clock::now() - op->stamp);
7c673cae
FG
4554 fmt->open_object_section("op");
4555 fmt->dump_unsigned("tid", op->tid);
4556 op->target.dump(fmt);
4557 fmt->dump_stream("last_sent") << op->stamp;
92f5a8d4 4558 fmt->dump_float("age", age.count());
7c673cae
FG
4559 fmt->dump_int("attempts", op->attempts);
4560 fmt->dump_stream("snapid") << op->snapid;
4561 fmt->dump_stream("snap_context") << op->snapc;
4562 fmt->dump_stream("mtime") << op->mtime;
4563
4564 fmt->open_array_section("osd_ops");
f67539c2 4565 for (auto it = op->ops.begin(); it != op->ops.end(); ++it) {
7c673cae
FG
4566 fmt->dump_stream("osd_op") << *it;
4567 }
4568 fmt->close_section(); // osd_ops array
4569
4570 fmt->close_section(); // op object
4571 }
4572}
4573
4574void Objecter::dump_ops(Formatter *fmt)
4575{
4576 // Read-lock on Objecter held
4577 fmt->open_array_section("ops");
f67539c2 4578 for (auto siter = osd_sessions.begin();
7c673cae
FG
4579 siter != osd_sessions.end(); ++siter) {
4580 OSDSession *s = siter->second;
f67539c2 4581 shared_lock sl(s->lock);
7c673cae
FG
4582 _dump_ops(s, fmt);
4583 sl.unlock();
4584 }
4585 _dump_ops(homeless_session, fmt);
4586 fmt->close_section(); // ops array
4587}
4588
4589void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4590{
f67539c2
TL
4591 for (auto p = s->linger_ops.begin(); p != s->linger_ops.end(); ++p) {
4592 auto op = p->second;
7c673cae
FG
4593 fmt->open_object_section("linger_op");
4594 fmt->dump_unsigned("linger_id", op->linger_id);
4595 op->target.dump(fmt);
4596 fmt->dump_stream("snapid") << op->snap;
4597 fmt->dump_stream("registered") << op->registered;
4598 fmt->close_section(); // linger_op object
4599 }
4600}
4601
4602void Objecter::dump_linger_ops(Formatter *fmt)
4603{
4604 // We have a read-lock on the objecter
4605 fmt->open_array_section("linger_ops");
f67539c2 4606 for (auto siter = osd_sessions.begin();
7c673cae 4607 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4608 auto s = siter->second;
4609 shared_lock sl(s->lock);
7c673cae
FG
4610 _dump_linger_ops(s, fmt);
4611 sl.unlock();
4612 }
4613 _dump_linger_ops(homeless_session, fmt);
4614 fmt->close_section(); // linger_ops array
4615}
4616
4617void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4618{
f67539c2
TL
4619 for (auto p = s->command_ops.begin(); p != s->command_ops.end(); ++p) {
4620 auto op = p->second;
7c673cae
FG
4621 fmt->open_object_section("command_op");
4622 fmt->dump_unsigned("command_id", op->tid);
4623 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4624 fmt->open_array_section("command");
f67539c2 4625 for (auto q = op->cmd.begin(); q != op->cmd.end(); ++q)
7c673cae
FG
4626 fmt->dump_string("word", *q);
4627 fmt->close_section();
4628 if (op->target_osd >= 0)
4629 fmt->dump_int("target_osd", op->target_osd);
4630 else
4631 fmt->dump_stream("target_pg") << op->target_pg;
4632 fmt->close_section(); // command_op object
4633 }
4634}
4635
4636void Objecter::dump_command_ops(Formatter *fmt)
4637{
4638 // We have a read-lock on the Objecter here
4639 fmt->open_array_section("command_ops");
f67539c2 4640 for (auto siter = osd_sessions.begin();
7c673cae 4641 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4642 auto s = siter->second;
4643 shared_lock sl(s->lock);
7c673cae
FG
4644 _dump_command_ops(s, fmt);
4645 sl.unlock();
4646 }
4647 _dump_command_ops(homeless_session, fmt);
4648 fmt->close_section(); // command_ops array
4649}
4650
4651void Objecter::dump_pool_ops(Formatter *fmt) const
4652{
4653 fmt->open_array_section("pool_ops");
f67539c2
TL
4654 for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
4655 auto op = p->second;
7c673cae
FG
4656 fmt->open_object_section("pool_op");
4657 fmt->dump_unsigned("tid", op->tid);
4658 fmt->dump_int("pool", op->pool);
4659 fmt->dump_string("name", op->name);
4660 fmt->dump_int("operation_type", op->pool_op);
7c673cae
FG
4661 fmt->dump_unsigned("crush_rule", op->crush_rule);
4662 fmt->dump_stream("snapid") << op->snapid;
4663 fmt->dump_stream("last_sent") << op->last_submit;
4664 fmt->close_section(); // pool_op object
4665 }
4666 fmt->close_section(); // pool_ops array
4667}
4668
4669void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4670{
4671 fmt->open_array_section("pool_stat_ops");
f67539c2 4672 for (auto p = poolstat_ops.begin();
7c673cae
FG
4673 p != poolstat_ops.end();
4674 ++p) {
4675 PoolStatOp *op = p->second;
4676 fmt->open_object_section("pool_stat_op");
4677 fmt->dump_unsigned("tid", op->tid);
4678 fmt->dump_stream("last_sent") << op->last_submit;
4679
4680 fmt->open_array_section("pools");
f67539c2
TL
4681 for (const auto& it : op->pools) {
4682 fmt->dump_string("pool", it);
7c673cae
FG
4683 }
4684 fmt->close_section(); // pools array
4685
4686 fmt->close_section(); // pool_stat_op object
4687 }
4688 fmt->close_section(); // pool_stat_ops array
4689}
4690
4691void Objecter::dump_statfs_ops(Formatter *fmt) const
4692{
4693 fmt->open_array_section("statfs_ops");
f67539c2
TL
4694 for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
4695 auto op = p->second;
7c673cae
FG
4696 fmt->open_object_section("statfs_op");
4697 fmt->dump_unsigned("tid", op->tid);
4698 fmt->dump_stream("last_sent") << op->last_submit;
4699 fmt->close_section(); // statfs_op object
4700 }
4701 fmt->close_section(); // statfs_ops array
4702}
4703
4704Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4705 m_objecter(objecter)
4706{
4707}
4708
9f95a23c
TL
4709int Objecter::RequestStateHook::call(std::string_view command,
4710 const cmdmap_t& cmdmap,
39ae355f 4711 const bufferlist&,
9f95a23c
TL
4712 Formatter *f,
4713 std::ostream& ss,
f67539c2 4714 cb::list& out)
7c673cae 4715{
7c673cae
FG
4716 shared_lock rl(m_objecter->rwlock);
4717 m_objecter->dump_requests(f);
9f95a23c 4718 return 0;
7c673cae
FG
4719}
4720
f67539c2 4721void Objecter::blocklist_self(bool set)
7c673cae 4722{
f67539c2 4723 ldout(cct, 10) << "blocklist_self " << (set ? "add" : "rm") << dendl;
7c673cae
FG
4724
4725 vector<string> cmd;
f67539c2 4726 cmd.push_back("{\"prefix\":\"osd blocklist\", ");
7c673cae 4727 if (set)
f67539c2 4728 cmd.push_back("\"blocklistop\":\"add\",");
7c673cae 4729 else
f67539c2 4730 cmd.push_back("\"blocklistop\":\"rm\",");
7c673cae 4731 stringstream ss;
f67539c2 4732 // this is somewhat imprecise in that we are blocklisting our first addr only
11fdf7f2 4733 ss << messenger->get_myaddrs().front().get_legacy_str();
7c673cae
FG
4734 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4735
f67539c2 4736 auto m = new MMonCommand(monc->get_fsid());
7c673cae
FG
4737 m->cmd = cmd;
4738
f67539c2
TL
4739 // NOTE: no fallback to legacy blacklist command implemented here
4740 // since this is only used for test code.
4741
7c673cae
FG
4742 monc->send_mon_message(m);
4743}
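// For reference, the vector assembled above serializes to a single JSON mon
// command along these lines (address illustrative):
//
//   {"prefix":"osd blocklist", "blocklistop":"add", "addr":"10.1.2.3:0/1234"}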
4744
4745// commands
4746
4747void Objecter::handle_command_reply(MCommandReply *m)
4748{
4749 unique_lock wl(rwlock);
31f18b77 4750 if (!initialized) {
7c673cae
FG
4751 m->put();
4752 return;
4753 }
4754
4755 ConnectionRef con = m->get_connection();
11fdf7f2
TL
4756 auto priv = con->get_priv();
4757 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4758 if (!s || s->con != con) {
4759 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
4760 m->put();
7c673cae
FG
4761 return;
4762 }
4763
f67539c2
TL
4764 shared_lock sl(s->lock);
4765 auto p = s->command_ops.find(m->get_tid());
7c673cae
FG
4766 if (p == s->command_ops.end()) {
4767 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4768 << " not found" << dendl;
4769 m->put();
4770 sl.unlock();
7c673cae
FG
4771 return;
4772 }
4773
4774 CommandOp *c = p->second;
4775 if (!c->session ||
4776 m->get_connection() != c->session->con) {
4777 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4778 << " got reply from wrong connection "
4779 << m->get_connection() << " " << m->get_source_inst()
4780 << dendl;
4781 m->put();
4782 sl.unlock();
7c673cae
FG
4783 return;
4784 }
f67539c2 4785
9f95a23c
TL
4786 if (m->r == -EAGAIN) {
4787 ldout(cct,10) << __func__ << " tid " << m->get_tid()
4788 << " got EAGAIN, requesting map and resending" << dendl;
4789 // NOTE: This might resend twice... once now, and once again when
4790 // we get an updated osdmap and the PG is found to have moved.
4791 _maybe_request_map();
4792 _send_command(c);
4793 m->put();
4794 sl.unlock();
4795 return;
4796 }
4797
7c673cae
FG
4798 sl.unlock();
4799
f67539c2
TL
4800 unique_lock sul(s->lock);
4801 _finish_command(c, m->r < 0 ? bs::error_code(-m->r, osd_category()) :
4802 bs::error_code(), std::move(m->rs),
4803 std::move(m->get_data()));
28e407b8
AA
4804 sul.unlock();
4805
7c673cae 4806 m->put();
7c673cae
FG
4807}
4808
522d829b
TL
4809Objecter::LingerOp::LingerOp(Objecter *o, uint64_t linger_id)
4810 : objecter(o),
4811 linger_id(linger_id),
4812 watch_lock(ceph::make_shared_mutex(
4813 fmt::format("LingerOp::watch_lock #{}", linger_id)))
4814{}
4815
7c673cae
FG
4816void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
4817{
4818 shunique_lock sul(rwlock, ceph::acquire_unique);
4819
31f18b77 4820 ceph_tid_t tid = ++last_tid;
7c673cae
FG
4821 ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
4822 c->tid = tid;
4823
4824 {
f67539c2 4825 unique_lock hs_wl(homeless_session->lock);
7c673cae
FG
4826 _session_command_op_assign(homeless_session, c);
4827 }
4828
4829 _calc_command_target(c, sul);
4830 _assign_command_session(c, sul);
4831 if (osd_timeout > timespan(0)) {
4832 c->ontimeout = timer.add_event(osd_timeout,
4833 [this, c, tid]() {
f67539c2
TL
4834 command_op_cancel(
4835 c->session, tid,
4836 osdc_errc::timed_out); });
7c673cae
FG
4837 }
4838
4839 if (!c->session->is_homeless()) {
4840 _send_command(c);
4841 } else {
4842 _maybe_request_map();
4843 }
4844 if (c->map_check_error)
4845 _send_command_map_check(c);
f67539c2
TL
4846 if (ptid)
4847 *ptid = tid;
7c673cae
FG
4848
4849 logger->inc(l_osdc_command_active);
4850}
4851
f67539c2
TL
4852int Objecter::_calc_command_target(CommandOp *c,
4853 shunique_lock<ceph::shared_mutex>& sul)
7c673cae 4854{
11fdf7f2 4855 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4856
4857 c->map_check_error = 0;
4858
4859 // ignore overlays, just like we do with pg ops
4860 c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4861
4862 if (c->target_osd >= 0) {
4863 if (!osdmap->exists(c->target_osd)) {
4864 c->map_check_error = -ENOENT;
4865 c->map_check_error_str = "osd dne";
4866 c->target.osd = -1;
4867 return RECALC_OP_TARGET_OSD_DNE;
4868 }
4869 if (osdmap->is_down(c->target_osd)) {
4870 c->map_check_error = -ENXIO;
4871 c->map_check_error_str = "osd down";
4872 c->target.osd = -1;
4873 return RECALC_OP_TARGET_OSD_DOWN;
4874 }
4875 c->target.osd = c->target_osd;
4876 } else {
4877 int ret = _calc_target(&(c->target), nullptr, true);
4878 if (ret == RECALC_OP_TARGET_POOL_DNE) {
4879 c->map_check_error = -ENOENT;
4880 c->map_check_error_str = "pool dne";
4881 c->target.osd = -1;
4882 return ret;
4883 } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4884 c->map_check_error = -ENXIO;
4885 c->map_check_error_str = "osd down";
4886 c->target.osd = -1;
4887 return ret;
4888 }
4889 }
4890
4891 OSDSession *s;
4892 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4893 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4894
4895 if (c->session != s) {
4896 put_session(s);
4897 return RECALC_OP_TARGET_NEED_RESEND;
4898 }
4899
4900 put_session(s);
4901
4902	  ldout(cct, 20) << "_calc_command_target " << c->tid << " no change, "
4903 << c->session << dendl;
4904
4905 return RECALC_OP_TARGET_NO_ACTION;
4906}
4907
4908void Objecter::_assign_command_session(CommandOp *c,
f67539c2 4909 shunique_lock<ceph::shared_mutex>& sul)
7c673cae 4910{
11fdf7f2 4911 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4912
4913 OSDSession *s;
4914 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4915 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4916
4917 if (c->session != s) {
4918 if (c->session) {
4919 OSDSession *cs = c->session;
f67539c2 4920 unique_lock csl(cs->lock);
7c673cae
FG
4921 _session_command_op_remove(c->session, c);
4922 csl.unlock();
4923 }
f67539c2 4924 unique_lock sl(s->lock);
7c673cae
FG
4925 _session_command_op_assign(s, c);
4926 }
4927
4928 put_session(s);
4929}
4930
4931void Objecter::_send_command(CommandOp *c)
4932{
4933 ldout(cct, 10) << "_send_command " << c->tid << dendl;
11fdf7f2
TL
4934 ceph_assert(c->session);
4935 ceph_assert(c->session->con);
f67539c2 4936 auto m = new MCommand(monc->monmap.fsid);
7c673cae
FG
4937 m->cmd = c->cmd;
4938 m->set_data(c->inbl);
4939 m->set_tid(c->tid);
4940 c->session->con->send_message(m);
4941 logger->inc(l_osdc_command_send);
4942}
4943
f67539c2
TL
4944int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid,
4945 bs::error_code ec)
7c673cae 4946{
11fdf7f2 4947 ceph_assert(initialized);
7c673cae
FG
4948
4949 unique_lock wl(rwlock);
4950
f67539c2 4951 auto it = s->command_ops.find(tid);
7c673cae
FG
4952 if (it == s->command_ops.end()) {
4953 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4954 return -ENOENT;
4955 }
4956
4957 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4958
4959 CommandOp *op = it->second;
4960 _command_cancel_map_check(op);
f67539c2
TL
4961 unique_lock sl(op->session->lock);
4962 _finish_command(op, ec, {}, {});
28e407b8 4963 sl.unlock();
7c673cae
FG
4964 return 0;
4965}
4966
f67539c2
TL
4967void Objecter::_finish_command(CommandOp *c, bs::error_code ec,
4968 string&& rs, cb::list&& bl)
7c673cae
FG
4969{
4970 // rwlock is locked unique
28e407b8 4971 // session lock is locked
7c673cae 4972
f67539c2 4973 ldout(cct, 10) << "_finish_command " << c->tid << " = " << ec << " "
7c673cae 4974 << rs << dendl;
f67539c2 4975
7c673cae 4976 if (c->onfinish)
f67539c2 4977 c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl));
7c673cae 4978
f67539c2 4979 if (c->ontimeout && ec != bs::errc::timed_out)
7c673cae
FG
4980 timer.cancel_event(c->ontimeout);
4981
7c673cae 4982 _session_command_op_remove(c->session, c);
7c673cae
FG
4983
4984 c->put();
4985
4986 logger->dec(l_osdc_command_active);
4987}
4988
4989Objecter::OSDSession::~OSDSession()
4990{
4991 // Caller is responsible for re-assigning or
4992 // destroying any ops that were assigned to us
11fdf7f2
TL
4993 ceph_assert(ops.empty());
4994 ceph_assert(linger_ops.empty());
4995 ceph_assert(command_ops.empty());
7c673cae
FG
4996}
4997
f67539c2
TL
4998Objecter::Objecter(CephContext *cct,
4999 Messenger *m, MonClient *mc,
5000 boost::asio::io_context& service) :
5001 Dispatcher(cct), messenger(m), monc(mc), service(service)
f91f0fd5
TL
5002{
5003 mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
5004 osd_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
5005}
9f95a23c 5006
7c673cae
FG
5007Objecter::~Objecter()
5008{
11fdf7f2
TL
5009 ceph_assert(homeless_session->get_nref() == 1);
5010 ceph_assert(num_homeless_ops == 0);
7c673cae
FG
5011 homeless_session->put();
5012
11fdf7f2
TL
5013 ceph_assert(osd_sessions.empty());
5014 ceph_assert(poolstat_ops.empty());
5015 ceph_assert(statfs_ops.empty());
5016 ceph_assert(pool_ops.empty());
5017 ceph_assert(waiting_for_map.empty());
5018 ceph_assert(linger_ops.empty());
5019 ceph_assert(check_latest_map_lingers.empty());
5020 ceph_assert(check_latest_map_ops.empty());
5021 ceph_assert(check_latest_map_commands.empty());
7c673cae 5022
11fdf7f2
TL
5023 ceph_assert(!m_request_state_hook);
5024 ceph_assert(!logger);
7c673cae
FG
5025}
5026
5027/**
5028 * Wait until this OSD map epoch is received before
5029 * sending any more operations to OSDs. Use this
5030 * when it is known that the client can't trust
5031 * anything from before this epoch (e.g. due to
f67539c2 5032 * client blocklist at this epoch).
7c673cae
FG
5033 */
5034void Objecter::set_epoch_barrier(epoch_t epoch)
5035{
5036 unique_lock wl(rwlock);
5037
5038 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
5039 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
5040 << dendl;
5041 if (epoch > epoch_barrier) {
5042 epoch_barrier = epoch;
5043 _maybe_request_map();
5044 }
5045}
5046
5047
5048
5049hobject_t Objecter::enumerate_objects_begin()
5050{
5051 return hobject_t();
5052}
5053
5054hobject_t Objecter::enumerate_objects_end()
5055{
5056 return hobject_t::get_max();
5057}
5058
f67539c2
TL
5059template<typename T>
5060struct EnumerationContext {
5061 Objecter* objecter;
7c673cae 5062 const hobject_t end;
f67539c2
TL
5063 const cb::list filter;
5064 uint32_t max;
5065 const object_locator_t oloc;
5066 std::vector<T> ls;
5067private:
5068 fu2::unique_function<void(bs::error_code,
5069 std::vector<T>,
5070 hobject_t) &&> on_finish;
5071public:
5072 epoch_t epoch = 0;
5073 int budget = -1;
5074
5075 EnumerationContext(Objecter* objecter,
5076 hobject_t end, cb::list filter,
5077 uint32_t max, object_locator_t oloc,
5078 decltype(on_finish) on_finish)
5079 : objecter(objecter), end(std::move(end)), filter(std::move(filter)),
5080 max(max), oloc(std::move(oloc)), on_finish(std::move(on_finish)) {}
5081
5082 void operator()(bs::error_code ec,
5083 std::vector<T> v,
5084 hobject_t h) && {
5085 if (budget >= 0) {
5086 objecter->put_op_budget_bytes(budget);
5087 budget = -1;
5088 }
5089
5090 std::move(on_finish)(ec, std::move(v), std::move(h));
5091 }
5092};
7c673cae 5093
f67539c2
TL
5094template<typename T>
5095struct CB_EnumerateReply {
5096 cb::list bl;
7c673cae 5097
f67539c2
TL
5098 Objecter* objecter;
5099 std::unique_ptr<EnumerationContext<T>> ctx;
7c673cae 5100
f67539c2
TL
5101 CB_EnumerateReply(Objecter* objecter,
5102 std::unique_ptr<EnumerationContext<T>>&& ctx) :
5103 objecter(objecter), ctx(std::move(ctx)) {}
5104
5105 void operator()(bs::error_code ec) {
5106 objecter->_enumerate_reply(std::move(bl), ec, std::move(ctx));
7c673cae
FG
5107 }
5108};
5109
f67539c2 5110template<typename T>
7c673cae 5111void Objecter::enumerate_objects(
f67539c2
TL
5112 int64_t pool_id,
5113 std::string_view ns,
5114 hobject_t start,
5115 hobject_t end,
5116 const uint32_t max,
5117 const cb::list& filter_bl,
5118 fu2::unique_function<void(bs::error_code,
5119 std::vector<T>,
5120 hobject_t) &&> on_finish) {
7c673cae
FG
5121 if (!end.is_max() && start > end) {
5122 lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
f67539c2 5123 std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
7c673cae
FG
5124 return;
5125 }
5126
5127 if (max < 1) {
5128 lderr(cct) << __func__ << ": result size may not be zero" << dendl;
f67539c2 5129 std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
7c673cae
FG
5130 return;
5131 }
5132
5133 if (start.is_max()) {
f67539c2 5134 std::move(on_finish)({}, {}, {});
7c673cae
FG
5135 return;
5136 }
5137
5138 shared_lock rl(rwlock);
11fdf7f2 5139 ceph_assert(osdmap->get_epoch());
7c673cae
FG
5140 if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
5141 rl.unlock();
5142 lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
f67539c2 5143 std::move(on_finish)(osdc_errc::not_supported, {}, {});
7c673cae
FG
5144 return;
5145 }
f67539c2 5146 const pg_pool_t* p = osdmap->get_pg_pool(pool_id);
7c673cae
FG
5147 if (!p) {
5148 lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
5149 << osdmap->get_epoch() << dendl;
5150 rl.unlock();
f67539c2 5151 std::move(on_finish)(osdc_errc::pool_dne, {}, {});
7c673cae
FG
5152 return;
5153 } else {
5154 rl.unlock();
5155 }
5156
f67539c2
TL
5157 _issue_enumerate(start,
5158 std::make_unique<EnumerationContext<T>>(
5159 this, std::move(end), filter_bl,
5160 max, object_locator_t{pool_id, ns},
5161 std::move(on_finish)));
5162}
5163
5164template
5165void Objecter::enumerate_objects<librados::ListObjectImpl>(
5166 int64_t pool_id,
5167 std::string_view ns,
5168 hobject_t start,
5169 hobject_t end,
5170 const uint32_t max,
5171 const cb::list& filter_bl,
5172 fu2::unique_function<void(bs::error_code,
5173 std::vector<librados::ListObjectImpl>,
5174 hobject_t) &&> on_finish);
5175
5176template
5177void Objecter::enumerate_objects<neorados::Entry>(
5178 int64_t pool_id,
5179 std::string_view ns,
5180 hobject_t start,
5181 hobject_t end,
5182 const uint32_t max,
5183 const cb::list& filter_bl,
5184 fu2::unique_function<void(bs::error_code,
5185 std::vector<neorados::Entry>,
5186 hobject_t) &&> on_finish);
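// Hypothetical usage sketch of the enumeration machinery (variable names
// illustrative):
//
//   objecter->enumerate_objects<neorados::Entry>(
//     pool_id, "",                               // "" = default namespace
//     objecter->enumerate_objects_begin(),
//     objecter->enumerate_objects_end(),
//     1000, {},                                  // max entries, no filter
//     [](bs::error_code ec, std::vector<neorados::Entry> v, hobject_t next) {
//       // 'next' is the resume cursor; is_max() means the pool is exhausted
//     });
//
// _issue_enumerate() below re-arms itself until it has gathered 'max'
// entries or the cursor reaches 'end', so on_finish fires exactly once.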
5187
5188
5189
5190template<typename T>
5191void Objecter::_issue_enumerate(hobject_t start,
5192 std::unique_ptr<EnumerationContext<T>> ctx) {
7c673cae 5193 ObjectOperation op;
f67539c2
TL
5194 auto c = ctx.get();
5195 op.pg_nls(c->max, c->filter, start, osdmap->get_epoch());
5196 auto on_ack = std::make_unique<CB_EnumerateReply<T>>(this, std::move(ctx));
5197 // I hate having to do this. Try to find a cleaner way
5198 // later.
5199 auto epoch = &c->epoch;
5200 auto budget = &c->budget;
5201 auto pbl = &on_ack->bl;
7c673cae
FG
5202
5203 // Issue. See you later in _enumerate_reply
f67539c2
TL
5204 pg_read(start.get_hash(),
5205 c->oloc, op, pbl, 0,
5206 Op::OpComp::create(service.get_executor(),
5207 [c = std::move(on_ack)]
5208 (bs::error_code ec) mutable {
5209 (*c)(ec);
5210 }), epoch, budget);
5211}
5212
5213template
5214void Objecter::_issue_enumerate<librados::ListObjectImpl>(
5215 hobject_t start,
5216 std::unique_ptr<EnumerationContext<librados::ListObjectImpl>> ctx);
5217template
5218void Objecter::_issue_enumerate<neorados::Entry>(
5219 hobject_t start, std::unique_ptr<EnumerationContext<neorados::Entry>> ctx);
5220
5221template<typename T>
7c673cae 5222void Objecter::_enumerate_reply(
f67539c2
TL
5223 cb::list&& bl,
5224 bs::error_code ec,
5225 std::unique_ptr<EnumerationContext<T>>&& ctx)
5226{
5227 if (ec) {
5228 std::move(*ctx)(ec, {}, {});
7c673cae
FG
5229 return;
5230 }
5231
7c673cae 5232 // Decode the results
11fdf7f2 5233 auto iter = bl.cbegin();
f67539c2
TL
5234 pg_nls_response_template<T> response;
5235
5236 try {
5237 response.decode(iter);
5238 if (!iter.end()) {
5239 // extra_info isn't used anywhere. We do this solely to preserve
5240 // backward compatibility
5241 cb::list legacy_extra_info;
5242 decode(legacy_extra_info, iter);
5243 }
5244 } catch (const bs::system_error& e) {
5245 std::move(*ctx)(e.code(), {}, {});
5246 return;
5247 }
7c673cae 5248
f67539c2
TL
5249 shared_lock rl(rwlock);
5250 auto pool = osdmap->get_pg_pool(ctx->oloc.get_pool());
5251 rl.unlock();
5252 if (!pool) {
5253 // pool is gone, drop any results which are now meaningless.
5254 std::move(*ctx)(osdc_errc::pool_dne, {}, {});
5255 return;
7c673cae
FG
5256 }
5257
f67539c2
TL
5258 hobject_t next;
5259 if ((response.handle <= ctx->end)) {
5260 next = response.handle;
7c673cae 5261 } else {
f67539c2 5262 next = ctx->end;
7c673cae
FG
5263
5264 // drop anything after 'end'
7c673cae
FG
5265 while (!response.entries.empty()) {
5266 uint32_t hash = response.entries.back().locator.empty() ?
5267 pool->hash_key(response.entries.back().oid,
5268 response.entries.back().nspace) :
5269 pool->hash_key(response.entries.back().locator,
5270 response.entries.back().nspace);
5271 hobject_t last(response.entries.back().oid,
5272 response.entries.back().locator,
5273 CEPH_NOSNAP,
5274 hash,
f67539c2 5275 ctx->oloc.get_pool(),
7c673cae 5276 response.entries.back().nspace);
f67539c2 5277 if (last < ctx->end)
7c673cae 5278 break;
7c673cae
FG
5279 response.entries.pop_back();
5280 }
7c673cae 5281 }

  if (response.entries.size() <= ctx->max) {
    ctx->max -= response.entries.size();
    std::move(response.entries.begin(), response.entries.end(),
              std::back_inserter(ctx->ls));
  } else {
    auto i = response.entries.begin();
    while (ctx->max > 0) {
      ctx->ls.push_back(std::move(*i));
      --(ctx->max);
      ++i;
    }
    uint32_t hash =
      i->locator.empty() ?
      pool->hash_key(i->oid, i->nspace) :
      pool->hash_key(i->locator, i->nspace);

    next = hobject_t{i->oid, i->locator,
                     CEPH_NOSNAP,
                     hash,
                     ctx->oloc.get_pool(),
                     i->nspace};
  }

  if (next == ctx->end || ctx->max == 0) {
    std::move(*ctx)(ec, std::move(ctx->ls), std::move(next));
  } else {
    _issue_enumerate(next, std::move(ctx));
  }
}

template
void Objecter::_enumerate_reply<librados::ListObjectImpl>(
  cb::list&& bl,
  bs::error_code ec,
  std::unique_ptr<EnumerationContext<librados::ListObjectImpl>>&& ctx);

template
void Objecter::_enumerate_reply<neorados::Entry>(
  cb::list&& bl,
  bs::error_code ec,
  std::unique_ptr<EnumerationContext<neorados::Entry>>&& ctx);

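// Helpers for ObjectOperation::scrub_ls(), below, live in this
// anonymous namespace.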
namespace {
  using namespace librados;

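  // Decode one T from each bufferlist in bls and append it to items;
  // used to unpack the per-object payloads of a scrub_ls_result_t.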
  template <typename T>
  void do_decode(std::vector<T>& items, std::vector<cb::list>& bls)
  {
    for (const auto& bl : bls) {
      auto p = bl.cbegin();
      T t;
      decode(t, p);
      items.push_back(t);
    }
  }

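  // Completion for CEPH_OSD_OP_SCRUBLS: decodes the reply bufferlist
  // into a scrub_ls_result_t and unpacks it into whichever of
  // 'objects' or 'snapsets' the caller supplied. Note that -EAGAIN is
  // not treated as fatal here: the reply is still decoded and *rval is
  // set to 0.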
  struct C_ObjectOperation_scrub_ls : public Context {
    cb::list bl;
    uint32_t* interval;
    std::vector<inconsistent_obj_t>* objects = nullptr;
    std::vector<inconsistent_snapset_t>* snapsets = nullptr;
    int* rval;

    C_ObjectOperation_scrub_ls(uint32_t* interval,
                               std::vector<inconsistent_obj_t>* objects,
                               int* rval)
      : interval(interval), objects(objects), rval(rval) {}
    C_ObjectOperation_scrub_ls(uint32_t* interval,
                               std::vector<inconsistent_snapset_t>* snapsets,
                               int* rval)
      : interval(interval), snapsets(snapsets), rval(rval) {}
    void finish(int r) override {
      if (r < 0 && r != -EAGAIN) {
        if (rval)
          *rval = r;
        return;
      }

      if (rval)
        *rval = 0;

      try {
        decode();
      } catch (cb::error&) {
        if (rval)
          *rval = -EIO;
      }
    }
  private:
    void decode() {
      scrub_ls_result_t result;
      auto p = bl.cbegin();
      result.decode(p);
      *interval = result.interval;
      if (objects) {
        do_decode(*objects, result.vals);
      } else {
        do_decode(*snapsets, result.vals);
      }
    }
  };

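  // Append a CEPH_OSD_OP_SCRUBLS op to 'op': mark the operation as a
  // PG op, encode the argument block, and wire the new op's reply
  // bufferlist and return value into the decoding handler above.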
  template <typename T>
  void do_scrub_ls(::ObjectOperation* op,
                   const scrub_ls_arg_t& arg,
                   std::vector<T>* items,
                   uint32_t* interval,
                   int* rval)
  {
    OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
    op->flags |= CEPH_OSD_FLAG_PGOP;
    ceph_assert(interval);
    arg.encode(osd_op.indata);
    unsigned p = op->ops.size() - 1;
    auto h = new C_ObjectOperation_scrub_ls{interval, items, rval};
    op->set_handler(h);
    op->out_bl[p] = &h->bl;
    op->out_rval[p] = rval;
  }
}

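// Public entry points used to list inconsistent objects/snapsets found
// by scrub. Both encode a scrub_ls_arg_t (the second field selects
// objects vs. snapsets) and delegate to do_scrub_ls() above. Note that
// *interval is both an input (the interval from a previous attempt, or
// 0 on the first call) and an output (the interval the OSD actually
// reported). A rough usage sketch (hypothetical caller, for
// illustration only):
//
//   ::ObjectOperation op;
//   std::vector<librados::inconsistent_obj_t> objects;
//   uint32_t interval = 0;  // 0 on the first attempt
//   int rval = 0;
//   op.scrub_ls(start_after, max_to_get, &objects, &interval, &rval);
//   // ...submit 'op' as a PG operation and wait for completion...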
void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
                                 uint64_t max_to_get,
                                 std::vector<librados::inconsistent_obj_t>* objects,
                                 uint32_t* interval,
                                 int* rval)
{
  scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
  do_scrub_ls(this, arg, objects, interval, rval);
}

void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
                                 uint64_t max_to_get,
                                 std::vector<librados::inconsistent_snapset_t>* snapsets,
                                 uint32_t* interval,
                                 int* rval)
{
  scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
  do_scrub_ls(this, arg, snapsets, interval, rval);
}