// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <algorithm>
#include <cerrno>

#include "Objecter.h"
#include "osd/OSDMap.h"
#include "osd/error_code.h"
#include "Filer.h"

#include "mon/MonClient.h"
#include "mon/error_code.h"

#include "msg/Messenger.h"
#include "msg/Message.h"

#include "messages/MPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDMap.h"

#include "messages/MPoolOp.h"
#include "messages/MPoolOpReply.h"

#include "messages/MGetPoolStats.h"
#include "messages/MGetPoolStatsReply.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"

#include "messages/MMonCommand.h"

#include "messages/MCommand.h"
#include "messages/MCommandReply.h"

#include "messages/MWatchNotify.h"


#include "common/Cond.h"
#include "common/config.h"
#include "common/perf_counters.h"
#include "common/scrub_types.h"
#include "include/str_list.h"
#include "common/errno.h"
#include "common/EventTrace.h"
#include "common/async/waiter.h"
#include "error_code.h"

using std::list;
using std::make_pair;
using std::map;
using std::ostream;
using std::ostringstream;
using std::pair;
using std::set;
using std::string;
using std::stringstream;
using std::vector;

using ceph::decode;
using ceph::encode;
using ceph::Formatter;

using std::defer_lock;
using std::scoped_lock;
using std::shared_lock;
using std::unique_lock;

using ceph::real_time;
using ceph::real_clock;

using ceph::mono_clock;
using ceph::mono_time;

using ceph::timespan;

using ceph::shunique_lock;
using ceph::acquire_shared;
using ceph::acquire_unique;

namespace bc = boost::container;
namespace bs = boost::system;
namespace ca = ceph::async;
namespace cb = ceph::buffer;

#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
#define dout_prefix *_dout << messenger->get_myname() << ".objecter "

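// Performance counter indices for this module; l_osdc_first and
// l_osdc_last bracket the range handed to PerfCountersBuilder in
// Objecter::init() below.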
enum {
  l_osdc_first = 123200,
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,
  l_osdc_oplen_avg,

  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};

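// Convert an errno-style return (0 or negative) into a boost::system
// error_code in the OSD error category.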
namespace {
inline bs::error_code osdcode(int r) {
  return (r < 0) ? bs::error_code(-r, osd_category()) : bs::error_code();
}
}

// config obs ----------------------------

class Objecter::RequestStateHook : public AdminSocketHook {
  Objecter *m_objecter;
public:
  explicit RequestStateHook(Objecter *objecter);
  int call(std::string_view command, const cmdmap_t& cmdmap,
	   Formatter *f,
	   std::ostream& ss,
	   cb::list& out) override;
};

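// Pick a completion lock for an object by hashing its name: completions
// for the same object always map to the same mutex in a small striped
// pool, while unrelated objects rarely contend.  An empty name gets no
// lock at all.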
std::unique_lock<std::mutex> Objecter::OSDSession::get_lock(object_t& oid)
{
  if (oid.name.empty())
    return {};

  static constexpr uint32_t HASH_PRIME = 1021;
  uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
    % HASH_PRIME;

  return {completion_locks[h % num_locks], std::defer_lock};
}

const char** Objecter::get_tracked_conf_keys() const
{
  static const char *config_keys[] = {
    "crush_location",
    "rados_mon_op_timeout",
    "rados_osd_op_timeout",
    NULL
  };
  return config_keys;
}


void Objecter::handle_conf_change(const ConfigProxy& conf,
				  const std::set<std::string>& changed)
{
  if (changed.count("crush_location")) {
    update_crush_location();
  }
  if (changed.count("rados_mon_op_timeout")) {
    mon_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
  }
  if (changed.count("rados_osd_op_timeout")) {
    osd_timeout = conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
  }
}

void Objecter::update_crush_location()
{
  unique_lock wl(rwlock);
  crush_location = cct->crush_location.get_location();
}

// messages ------------------------------

/*
 * initialize only internal data structures, don't initiate cluster interaction
 */
void Objecter::init()
{
  ceph_assert(!initialized);

  if (!logger) {
    PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);

    pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
		PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
    pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
    pcb.add_u64_avg(l_osdc_oplen_avg, "oplen_avg", "Average length of operation vector");

    pcb.add_u64_counter(l_osdc_op, "op", "Operations");
    pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
			"rdwr", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");

    pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
    pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
			"Create object operations");
    pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
    pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
    pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
			"Write full object operations");
    pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
			"Write same operations");
    pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
			"Append operation");
    pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
			"Set object to zero operations");
    pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
			"Truncate object operations");
    pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
			"Delete object operations");
    pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
			"Map extent operations");
    pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
			"Sparse read operations");
    pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
			"Clone range operations");
    pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
			"Get xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
			"Set xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
			"Xattr comparison operations");
    pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
			"Remove xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
			"Reset xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
			"Call (execute) operations");
    pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
			"Watch by object operations");
    pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
			"Notify about object operations");
    pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
			"Extended attribute comparison in multi operations");
    pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
    pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
    pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");

    pcb.add_u64(l_osdc_linger_active, "linger_active",
		"Active lingering operations");
    pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
			"Sent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
			"Resent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
			"Sent pings to lingering operations");

    pcb.add_u64(l_osdc_poolop_active, "poolop_active",
		"Active pool operations");
    pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
			"Sent pool operations");
    pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
			"Resent pool operations");

    pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
		"Active get pool stat operations");
    pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
			"Pool stat operations sent");
    pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
			"Resent pool stats");

    pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
    pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
    pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
			"Resent FS stats");

    pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
    pcb.add_u64_counter(l_osdc_command_send, "command_send",
			"Sent commands");
    pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
			"Resent commands");

    pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
    pcb.add_u64_counter(l_osdc_map_full, "map_full",
			"Full OSD maps received");
    pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
			"Incremental OSD maps received");

    pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
		"Open sessions");
    pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
			"Sessions opened");
    pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
			"Sessions closed");
    pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");

    pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
			"OSD OMAP write operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
			"OSD OMAP read operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
			"OSD OMAP delete operations");

    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  m_request_state_hook = new RequestStateHook(this);
  auto admin_socket = cct->get_admin_socket();
  int ret = admin_socket->register_command("objecter_requests",
					   m_request_state_hook,
					   "show in-progress osd requests");

  /* Don't warn on EEXIST, happens if multiple ceph clients
   * are instantiated from one process */
  if (ret < 0 && ret != -EEXIST) {
    lderr(cct) << "error registering admin socket command: "
	       << cpp_strerror(ret) << dendl;
  }

  update_crush_location();

  cct->_conf.add_observer(this);

  initialized = true;
}

/*
 * ok, cluster interaction can happen
 */
void Objecter::start(const OSDMap* o)
{
  shared_lock rl(rwlock);

  start_tick();
  if (o) {
    osdmap->deepish_copy_from(*o);
    prune_pg_mapping(osdmap->get_pools());
  } else if (osdmap->get_epoch() == 0) {
    _maybe_request_map();
  }
}

void Objecter::shutdown()
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  initialized = false;

  wl.unlock();
  cct->_conf.remove_observer(this);
  wl.lock();

  while (!osd_sessions.empty()) {
    auto p = osd_sessions.begin();
    close_session(p->second);
  }

  while (!check_latest_map_lingers.empty()) {
    auto i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while (!check_latest_map_ops.empty()) {
    auto i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while (!check_latest_map_commands.empty()) {
    auto i = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  while (!poolstat_ops.empty()) {
    auto i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while (!statfs_ops.empty()) {
    auto i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while (!pool_ops.empty()) {
    auto i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
  while (!homeless_session->linger_ops.empty()) {
    auto i = homeless_session->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    LingerOp *lop = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_linger_op_remove(homeless_session, lop);
    }
    linger_ops.erase(lop->linger_id);
    linger_ops_set.erase(lop);
    lop->put();
  }

  while (!homeless_session->ops.empty()) {
    auto i = homeless_session->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    auto op = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_op_remove(homeless_session, op);
    }
    op->put();
  }

  while (!homeless_session->command_ops.empty()) {
    auto i = homeless_session->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    auto cop = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_command_op_remove(homeless_session, cop);
    }
    cop->put();
  }

  if (tick_event) {
    if (timer.cancel_event(tick_event)) {
      ldout(cct, 10) << " successfully canceled tick" << dendl;
    }
    tick_event = 0;
  }

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }

  // Let go of Objecter write lock so timer thread can shutdown
  wl.unlock();

  // Outside of lock to avoid cycle WRT calls to RequestStateHook
  // This is safe because we guarantee no concurrent calls to
  // shutdown() with the ::initialized check at start.
  if (m_request_state_hook) {
    auto admin_socket = cct->get_admin_socket();
    admin_socket->unregister_commands(m_request_state_hook);
    delete m_request_state_hook;
    m_request_state_hook = NULL;
  }
}

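// Send (or resend) the registration op for a linger (watch/notify)
// request.  A watch that has already registered gets a lightweight
// WATCH_RECONNECT op; anything else resends the original op vector and
// routes the result through CB_Linger_Commit.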
void Objecter::_send_linger(LingerOp *info,
			    ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  fu2::unique_function<Op::OpSig> oncommit;
  osdc_opvec opv;
  std::shared_lock watchl(info->watch_lock);
  cb::list *poutbl = nullptr;
  if (info->registered && info->is_watch) {
    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
		   << dendl;
    opv.push_back(OSDOp());
    opv.back().op.op = CEPH_OSD_OP_WATCH;
    opv.back().op.watch.cookie = info->get_cookie();
    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
    opv.back().op.watch.gen = ++info->register_gen;
    oncommit = CB_Linger_Reconnect(this, info);
  } else {
    ldout(cct, 15) << "send_linger " << info->linger_id << " register"
		   << dendl;
    opv = info->ops;
    // TODO Augment ca::Completion with an equivalent of
    // target so we can handle these cases better.
    auto c = std::make_unique<CB_Linger_Commit>(this, info);
    if (!info->is_watch) {
      info->notify_id = 0;
      poutbl = &c->outbl;
    }
    oncommit = [c = std::move(c)](bs::error_code ec) mutable {
      std::move(*c)(ec);
    };
  }
  watchl.unlock();
  auto o = new Op(info->target.base_oid, info->target.base_oloc,
		  std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
		  std::move(oncommit), info->pobjver);
  o->outbl = poutbl;
  o->snapid = info->snap;
  o->snapc = info->snapc;
  o->mtime = info->mtime;

  o->target = info->target;
  o->tid = ++last_tid;

  // do not resend this; we will send a new op to reregister
  o->should_resend = false;
  o->ctx_budgeted = true;

  if (info->register_tid) {
    // repeat send.  cancel old registration op, if any.
    std::unique_lock sl(info->session->lock);
    if (info->session->ops.count(info->register_tid)) {
      auto o = info->session->ops[info->register_tid];
      _op_cancel_map_check(o);
      _cancel_linger_op(o);
    }
    sl.unlock();
  }

  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);

  logger->inc(l_osdc_linger_send);
}

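// Completion of the registration op: hand the result to the caller
// exactly once, then record the notify_id a notify linger got back so
// replies can be matched in handle_watch_notify().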
void Objecter::_linger_commit(LingerOp *info, bs::error_code ec,
			      cb::list& outbl)
{
  std::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
  if (info->on_reg_commit) {
    info->on_reg_commit->defer(std::move(info->on_reg_commit),
			       ec, cb::list{});
    info->on_reg_commit.reset();
  }
  if (ec && info->on_notify_finish) {
    info->on_notify_finish->defer(std::move(info->on_notify_finish),
				  ec, cb::list{});
    info->on_notify_finish.reset();
  }

  // only tell the user the first time we do this
  info->registered = true;
  info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    auto p = outbl.cbegin();
    try {
      decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
		     << dendl;
    }
    catch (cb::error& e) {
    }
  }
}

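// Deferred delivery of a watch error to the user's handler.  The
// _queued_async()/finished_async() pair keeps the pending callback
// accounted for in watch_pending_async so linger_check() stays
// conservative while it is queued.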
class CB_DoWatchError {
  Objecter *objecter;
  boost::intrusive_ptr<Objecter::LingerOp> info;
  bs::error_code ec;
public:
  CB_DoWatchError(Objecter *o, Objecter::LingerOp *i,
		  bs::error_code ec)
    : objecter(o), info(i), ec(ec) {
    info->_queued_async();
  }
  void operator()() {
    std::unique_lock wl(objecter->rwlock);
    bool canceled = info->canceled;
    wl.unlock();

    if (!canceled) {
      info->handle(ec, 0, info->get_cookie(), 0, {});
    }

    info->finished_async();
  }
};

bs::error_code Objecter::_normalize_watch_error(bs::error_code ec)
{
  // translate ENOENT -> ENOTCONN so that a delete->disconnection
  // notification and a failure to reconnect because we raced with
  // the delete appear the same to the user.
  if (ec == bs::errc::no_such_file_or_directory)
    ec = bs::error_code(ENOTCONN, osd_category());
  return ec;
}

void Objecter::_linger_reconnect(LingerOp *info, bs::error_code ec)
{
  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << ec
		 << " (last_error " << info->last_error << ")" << dendl;
  std::unique_lock wl(info->watch_lock);
  if (ec) {
    if (!info->last_error) {
      ec = _normalize_watch_error(ec);
      if (info->handle) {
	boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
      }
    }
  }

  info->last_error = ec;
}

void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  if (cct->_conf->objecter_inject_no_watch_ping) {
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
		   << dendl;
    return;
  }
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
		 << dendl;

  osdc_opvec opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen;

  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
		 std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
		 CB_Linger_Ping(this, info, now),
		 nullptr, nullptr);
  o->target = info->target;
  o->should_resend = false;
  _send_op_account(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o);
  info->ping_tid = o->tid;

  logger->inc(l_osdc_linger_ping);
}

void Objecter::_linger_ping(LingerOp *info, bs::error_code ec,
			    ceph::coarse_mono_time sent,
			    uint32_t register_gen)
{
  std::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " sent " << sent << " gen " << register_gen << " = " << ec
		 << " (last_error " << info->last_error
		 << " register_gen " << info->register_gen << ")" << dendl;
  if (info->register_gen == register_gen) {
    if (!ec) {
      info->watch_valid_thru = sent;
    } else if (ec && !info->last_error) {
      ec = _normalize_watch_error(ec);
      info->last_error = ec;
      if (info->handle) {
	boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
      }
    }
  } else {
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}

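// Report how stale a watch is: the time since we last had positive
// confirmation from the OSD (or since the oldest still-queued async
// event), or the first error seen on the watch.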
tl::expected<ceph::timespan,
	     bs::error_code> Objecter::linger_check(LingerOp *info)
{
  std::shared_lock l(info->watch_lock);

  ceph::coarse_mono_time stamp = info->watch_valid_thru;
  if (!info->watch_pending_async.empty())
    stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
  auto age = ceph::coarse_mono_clock::now() - stamp;

  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " err " << info->last_error
		 << " age " << age << dendl;
  if (info->last_error)
    return tl::unexpected(info->last_error);
  // return a safe upper bound (we are truncating to ms)
  return age;
}

void Objecter::linger_cancel(LingerOp *info)
{
  unique_lock wl(rwlock);
  _linger_cancel(info);
  info->put();
}

void Objecter::_linger_cancel(LingerOp *info)
{
  // rwlock is locked unique
  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
  if (!info->canceled) {
    OSDSession *s = info->session;
    std::unique_lock sl(s->lock);
    _session_linger_op_remove(s, info);
    sl.unlock();

    linger_ops.erase(info->linger_id);
    linger_ops_set.erase(info);
    ceph_assert(linger_ops.size() == linger_ops_set.size());

    info->canceled = true;
    info->put();

    logger->dec(l_osdc_linger_active);
  }
}


Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
					      const object_locator_t& oloc,
					      int flags)
{
  unique_lock l(rwlock);
  // Acquire linger ID
  auto info = new LingerOp(this, ++max_linger_id);
  info->target.base_oid = oid;
  info->target.base_oloc = oloc;
  if (info->target.base_oloc.key == oid)
    info->target.base_oloc.key.clear();
  info->target.flags = flags;
  info->watch_valid_thru = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " info " << info
		 << " linger_id " << info->linger_id
		 << " cookie " << info->get_cookie()
		 << dendl;
  linger_ops[info->linger_id] = info;
  linger_ops_set.insert(info);
  ceph_assert(linger_ops.size() == linger_ops_set.size());

  info->get(); // for the caller
  return info;
}

ceph_tid_t Objecter::linger_watch(LingerOp *info,
				  ObjectOperation& op,
				  const SnapContext& snapc,
				  real_time mtime,
				  cb::list& inbl,
				  decltype(info->on_reg_commit)&& oncommit,
				  version_t *objver)
{
  info->is_watch = true;
  info->snapc = snapc;
  info->mtime = mtime;
  info->target.flags |= CEPH_OSD_FLAG_WRITE;
  info->ops = op.ops;
  info->inbl = inbl;
  info->pobjver = objver;
  info->on_reg_commit = std::move(oncommit);

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  op.clear();
  return info->linger_id;
}

ceph_tid_t Objecter::linger_notify(LingerOp *info,
				   ObjectOperation& op,
				   snapid_t snap, cb::list& inbl,
				   decltype(LingerOp::on_reg_commit)&& onfinish,
				   version_t *objver)
{
  info->snap = snap;
  info->target.flags |= CEPH_OSD_FLAG_READ;
  info->ops = op.ops;
  info->inbl = inbl;
  info->pobjver = objver;
  info->on_reg_commit = std::move(onfinish);
  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  op.clear();
  return info->linger_id;
}

void Objecter::_linger_submit(LingerOp *info,
			      ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
  ceph_assert(info->linger_id);
  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!

  // Populate Op::target
  OSDSession *s = NULL;
  int r = _calc_target(&info->target, nullptr);
  switch (r) {
  case RECALC_OP_TARGET_POOL_EIO:
    _check_linger_pool_eio(info);
    return;
  }

  // Create LingerOp<->OSDSession relation
  r = _get_session(info->target.osd, &s, sul);
  ceph_assert(r == 0);
  unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  put_session(s);

  _send_linger(info, sul);
}

struct CB_DoWatchNotify {
  Objecter *objecter;
  boost::intrusive_ptr<Objecter::LingerOp> info;
  boost::intrusive_ptr<MWatchNotify> msg;
  CB_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->_queued_async();
  }
  void operator()() {
    objecter->_do_watch_notify(std::move(info), std::move(msg));
  }
};

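// Fast-dispatch entry for watch/notify events.  Disconnect errors and
// notify completions are handled inline; notify deliveries to a watch
// are deferred to the finish strand via CB_DoWatchNotify.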
void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  std::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    if (!info->last_error) {
      info->last_error = bs::error_code(ENOTCONN, osd_category());
      if (info->handle) {
	boost::asio::defer(finish_strand, CB_DoWatchError(this, info,
							  info->last_error));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
	info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
		     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->on_notify_finish->defer(
	std::move(info->on_notify_finish),
	osdcode(m->return_code), std::move(m->get_data()));

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
      info->on_notify_finish = nullptr;
    }
  } else {
    boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
  }
}

void Objecter::_do_watch_notify(boost::intrusive_ptr<LingerOp> info,
				boost::intrusive_ptr<MWatchNotify> m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;

  shared_lock l(rwlock);
  ceph_assert(initialized);

  if (info->canceled) {
    l.unlock();
    goto out;
  }

  // notify completion?
  ceph_assert(info->is_watch);
  ceph_assert(info->handle);
  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);

  l.unlock();

  switch (m->opcode) {
  case CEPH_WATCH_EVENT_NOTIFY:
    info->handle({}, m->notify_id, m->cookie, m->notifier_gid, std::move(m->bl));
    break;
  }

 out:
  info->finished_async();
}

bool Objecter::ms_dispatch(Message *m)
{
  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
  switch (m->get_type()) {
    // these we exclusively handle
  case CEPH_MSG_OSD_OPREPLY:
    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
    return true;

  case CEPH_MSG_OSD_BACKOFF:
    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
    return true;

  case CEPH_MSG_WATCH_NOTIFY:
    handle_watch_notify(static_cast<MWatchNotify*>(m));
    m->put();
    return true;

  case MSG_COMMAND_REPLY:
    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    } else {
      return false;
    }

  case MSG_GETPOOLSTATSREPLY:
    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
    return true;

  case CEPH_MSG_POOLOP_REPLY:
    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
    return true;

  case CEPH_MSG_STATFS_REPLY:
    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
    return true;

    // these we give others a chance to inspect

    // MDS, OSD
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    return false;
  }
  return false;
}

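// Walk one session's linger ops, regular ops, and command ops after a
// map change, recalculating targets and collecting whatever must be
// resent; ops whose pool vanished or went EIO are failed here instead.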
void Objecter::_scan_requests(
  OSDSession *s,
  bool skipped_map,
  bool cluster_full,
  map<int64_t, bool> *pool_full_map,
  map<ceph_tid_t, Op*>& need_resend,
  list<LingerOp*>& need_resend_linger,
  map<ceph_tid_t, CommandOp*>& need_resend_command,
  ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  list<LingerOp*> unregister_lingers;

  std::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  auto lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    auto op = lp->second;
    ceph_assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
	ldout(cct, 10) << " need to unregister linger op "
		       << op->linger_id << dendl;
	op->get();
	unregister_lingers.push_back(op);
      }
      break;
    case RECALC_OP_TARGET_POOL_EIO:
      _check_linger_pool_eio(op);
      ldout(cct, 10) << " need to unregister linger op "
		     << op->linger_id << dendl;
      op->get();
      unregister_lingers.push_back(op);
      break;
    }
  }

  // check for changed request mappings
  auto p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p;  // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    _prune_snapc(osdmap->get_new_removed_snaps(), op);
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
			 op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !(force_resend_writes && op->target.respects_full()))
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      _session_op_remove(op->session, op);
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    case RECALC_OP_TARGET_POOL_EIO:
      _check_op_pool_eio(op, &sl);
      break;
    }
  }

  // commands
  auto cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    auto c = cp->second;
    ++cp;
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      _session_command_op_remove(c->session, c);
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  sl.unlock();

  for (auto iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put();
  }
}

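// Apply incoming maps in epoch order (incrementals where possible, full
// maps otherwise), rescan every session for ops whose mapping changed,
// then resend whatever the rescan collected and wake any waiters on a
// map epoch.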
void Objecter::handle_osd_map(MOSDMap *m)
{
  ceph::shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  ceph_assert(osdmap);

  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
		  << " != " << monc->get_fsid() << dendl;
    return;
  }

  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (auto it = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);


  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
	   e <= m->get_last();
	   e++) {

	if (osdmap->get_epoch() == e-1 &&
	    m->incremental_maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
			<< dendl;
	  OSDMap::Incremental inc(m->incremental_maps[e]);
	  osdmap->apply_incremental(inc);

	  emit_blocklist_events(inc);

	  logger->inc(l_osdc_map_inc);
	}
	else if (m->maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
	  auto new_osdmap = std::make_unique<OSDMap>();
	  new_osdmap->decode(m->maps[e]);

	  emit_blocklist_events(*osdmap, *new_osdmap);
	  osdmap = std::move(new_osdmap);

	  logger->inc(l_osdc_map_full);
	}
	else {
	  if (e >= m->get_oldest()) {
	    ldout(cct, 3) << "handle_osd_map requesting missing epoch "
			  << osdmap->get_epoch()+1 << dendl;
	    _maybe_request_map();
	    break;
	  }
	  ldout(cct, 3) << "handle_osd_map missing epoch "
			<< osdmap->get_epoch()+1
			<< ", jumping to " << m->get_oldest() << dendl;
	  e = m->get_oldest() - 1;
	  skipped_map = true;
	  continue;
	}
	logger->set(l_osdc_map_epoch, osdmap->get_epoch());

	prune_pg_mapping(osdmap->get_pools());
	cluster_full = cluster_full || _osdmap_full_flag();
	update_pool_full_map(pool_full_map);

	// check all outstanding requests on every epoch
	for (auto& i : need_resend) {
	  _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
	}
	_scan_requests(homeless_session, skipped_map, cluster_full,
		       &pool_full_map, need_resend,
		       need_resend_linger, need_resend_command, sul);
	for (auto p = osd_sessions.begin();
	     p != osd_sessions.end(); ) {
	  auto s = p->second;
	  _scan_requests(s, skipped_map, cluster_full,
			 &pool_full_map, need_resend,
			 need_resend_linger, need_resend_command, sul);
	  ++p;
	  // osd down or addr change?
	  if (!osdmap->is_up(s->osd) ||
	      (s->con &&
	       s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
	    close_session(s);
	  }
	}

	ceph_assert(e == osdmap->get_epoch());
      }

    } else {
      // first map.  we want the full thing.
      if (m->maps.count(m->get_last())) {
	for (auto p = osd_sessions.begin();
	     p != osd_sessions.end(); ++p) {
	  OSDSession *s = p->second;
	  _scan_requests(s, false, false, NULL, need_resend,
			 need_resend_linger, need_resend_command, sul);
	}
	ldout(cct, 3) << "handle_osd_map decoding full epoch "
		      << m->get_last() << dendl;
	osdmap->decode(m->maps[m->get_last()]);
	prune_pg_mapping(osdmap->get_pools());

	_scan_requests(homeless_session, false, false, NULL,
		       need_resend, need_resend_linger,
		       need_resend_command, sul);
      } else {
	ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
		      << dendl;
	monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
	monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
	p = need_resend.erase(p);
	_check_op_pool_dne(op, nullptr);
      } else {
	++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused?
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    _maybe_request_map();
  }

  // resend requests
  for (auto p = need_resend.begin();
       p != need_resend.end(); ++p) {
    auto op = p->second;
    auto s = op->session;
    bool mapped_session = false;
    if (!s) {
      int r = _map_session(&op->target, &s, sul);
      ceph_assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    std::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      if (!op->session->is_homeless() && !op->target.paused) {
	logger->inc(l_osdc_op_resend);
	_send_op(op);
      }
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s);
  }
  for (auto p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    ceph_assert(op->session);
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (auto p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    auto c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
	_send_command(c);
      }
    }
  }

  _dump_active();

  // finish any Contexts that were waiting on a map update
  auto p = waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
	 p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (auto& [c, ec] : p->second) {
      ca::post(std::move(c), ec);
    }
    waiting_for_map.erase(p++);
  }

  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}

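// Blocklist events: when enabled, each map update records the addresses
// newly blocklisted so a consumer (e.g. librados) can later fetch them
// with consume_blocklist_events().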
void Objecter::enable_blocklist_events()
{
  unique_lock wl(rwlock);

  blocklist_events_enabled = true;
}

void Objecter::consume_blocklist_events(std::set<entity_addr_t> *events)
{
  unique_lock wl(rwlock);

  if (events->empty()) {
    events->swap(blocklist_events);
  } else {
    for (const auto &i : blocklist_events) {
      events->insert(i);
    }
    blocklist_events.clear();
  }
}

void Objecter::emit_blocklist_events(const OSDMap::Incremental &inc)
{
  if (!blocklist_events_enabled) {
    return;
  }

  for (const auto &i : inc.new_blocklist) {
    blocklist_events.insert(i.first);
  }
}

void Objecter::emit_blocklist_events(const OSDMap &old_osd_map,
				     const OSDMap &new_osd_map)
{
  if (!blocklist_events_enabled) {
    return;
  }

  std::set<entity_addr_t> old_set;
  std::set<entity_addr_t> new_set;

  old_osd_map.get_blocklist(&old_set);
  new_osd_map.get_blocklist(&new_set);

  std::set<entity_addr_t> delta_set;
  std::set_difference(
    new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  blocklist_events.insert(delta_set.begin(), delta_set.end());
}

// op pool check

void Objecter::CB_Op_Map_Latest::operator()(bs::error_code e,
					    version_t latest, version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled)
    return;

  lgeneric_subdout(objecter->cct, objecter, 10)
    << "op_map_latest r=" << e << " tid=" << tid
    << " latest " << latest << dendl;

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_ops.find(tid);
  if (iter == objecter->check_latest_map_ops.end()) {
    lgeneric_subdout(objecter->cct, objecter, 10)
      << "op_map_latest op " << tid << " not found" << dendl;
    return;
  }

  Op *op = iter->second;
  objecter->check_latest_map_ops.erase(iter);

  lgeneric_subdout(objecter->cct, objecter, 20)
    << "op_map_latest op " << op << dendl;

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  unique_lock sl(op->session->lock, defer_lock);
  objecter->_check_op_pool_dne(op, &sl);

  op->put();
}

int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
				snapid_t *snap) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  for (auto p = pg_pool.snaps.begin();
       p != pg_pool.snaps.end();
       ++p) {
    if (p->second.name == snap_name) {
      *snap = p->first;
      return 0;
    }
  }
  return -ENOENT;
}

int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
				 pool_snap_info_t *info) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  auto p = pg_pool.snaps.find(snap);
  if (p == pg_pool.snaps.end())
    return -ENOENT;
  *info = p->second;

  return 0;
}

int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
{
  shared_lock rl(rwlock);

  const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
  if (!pi)
    return -ENOENT;
  for (auto p = pi->snaps.begin();
       p != pi->snaps.end();
       ++p) {
    snaps->push_back(p->first);
  }
  return 0;
}

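// An op whose pool has disappeared can only be failed once we hold a map
// at least as new as map_dne_bound, the first epoch known to lack the
// pool; until then we keep asking the monitor for newer maps.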
// sl may be unlocked.
void Objecter::_check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl)
{
  // rwlock is locked unique

  if (op->target.pool_ever_existed) {
    // the pool previously existed and now it does not, which means it
    // was deleted.
    op->map_dne_bound = osdmap->get_epoch();
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " pool previously exists but now does not"
		   << dendl;
  } else {
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // we had a new enough map
      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		     << " concluding pool " << op->target.base_pgid.pool()
		     << " dne" << dendl;
      if (op->has_completion()) {
	num_in_flight--;
	op->complete(osdc_errc::pool_dne, -ENOENT);
      }

      OSDSession *s = op->session;
      if (s) {
	ceph_assert(s != NULL);
	ceph_assert(sl->mutex() == &s->lock);
	bool session_locked = sl->owns_lock();
	if (!session_locked) {
	  sl->lock();
	}
	_finish_op(op, 0);
	if (!session_locked) {
	  sl->unlock();
	}
      } else {
	_finish_op(op, 0); // no session
      }
    }
  } else {
    _send_op_map_check(op);
  }
}

// sl may be unlocked.
void Objecter::_check_op_pool_eio(Op *op, std::unique_lock<std::shared_mutex> *sl)
{
  // rwlock is locked unique

  // we had a new enough map
  ldout(cct, 10) << "check_op_pool_eio tid " << op->tid
		 << " concluding pool " << op->target.base_pgid.pool()
		 << " has eio" << dendl;
  if (op->has_completion()) {
    num_in_flight--;
    op->complete(osdc_errc::pool_eio, -EIO);
  }

  OSDSession *s = op->session;
  if (s) {
    ceph_assert(s != NULL);
    ceph_assert(sl->mutex() == &s->lock);
    bool session_locked = sl->owns_lock();
    if (!session_locked) {
      sl->lock();
    }
    _finish_op(op, 0);
    if (!session_locked) {
      sl->unlock();
    }
  } else {
    _finish_op(op, 0); // no session
  }
}

void Objecter::_send_op_map_check(Op *op)
{
  // rwlock is locked unique
  // ask the monitor
  if (check_latest_map_ops.count(op->tid) == 0) {
    op->get();
    check_latest_map_ops[op->tid] = op;
    monc->get_version("osdmap", CB_Op_Map_Latest(this, op->tid));
  }
}

void Objecter::_op_cancel_map_check(Op *op)
{
  // rwlock is locked unique
  auto iter = check_latest_map_ops.find(op->tid);
  if (iter != check_latest_map_ops.end()) {
    Op *op = iter->second;
    op->put();
    check_latest_map_ops.erase(iter);
  }
}

// linger pool check

void Objecter::CB_Linger_Map_Latest::operator()(bs::error_code e,
						version_t latest,
						version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_lingers.find(linger_id);
  if (iter == objecter->check_latest_map_lingers.end()) {
    return;
  }

  auto op = iter->second;
  objecter->check_latest_map_lingers.erase(iter);

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  bool unregister;
  objecter->_check_linger_pool_dne(op, &unregister);

  if (unregister) {
    objecter->_linger_cancel(op);
  }

  op->put();
}

void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
  // rwlock is locked unique

  *need_unregister = false;

  if (op->register_gen > 0) {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " pool previously existed but now does not"
		   << dendl;
    op->map_dne_bound = osdmap->get_epoch();
  } else {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      std::unique_lock wl{op->watch_lock};
      if (op->on_reg_commit) {
	op->on_reg_commit->defer(std::move(op->on_reg_commit),
				 osdc_errc::pool_dne, cb::list{});
	op->on_reg_commit = nullptr;
      }
      if (op->on_notify_finish) {
	op->on_notify_finish->defer(std::move(op->on_notify_finish),
				    osdc_errc::pool_dne, cb::list{});
	op->on_notify_finish = nullptr;
      }
      *need_unregister = true;
    }
  } else {
    _send_linger_map_check(op);
  }
}

void Objecter::_check_linger_pool_eio(LingerOp *op)
{
  // rwlock is locked unique

  std::unique_lock wl{op->watch_lock};
  if (op->on_reg_commit) {
    op->on_reg_commit->defer(std::move(op->on_reg_commit),
			     osdc_errc::pool_dne, cb::list{});
    op->on_reg_commit = nullptr;
  }
  if (op->on_notify_finish) {
    op->on_notify_finish->defer(std::move(op->on_notify_finish),
				osdc_errc::pool_dne, cb::list{});
    op->on_notify_finish = nullptr;
  }
}

void Objecter::_send_linger_map_check(LingerOp *op)
{
  // ask the monitor
  if (check_latest_map_lingers.count(op->linger_id) == 0) {
    op->get();
    check_latest_map_lingers[op->linger_id] = op;
    monc->get_version("osdmap", CB_Linger_Map_Latest(this, op->linger_id));
  }
}

void Objecter::_linger_cancel_map_check(LingerOp *op)
{
  // rwlock is locked unique

  auto iter = check_latest_map_lingers.find(op->linger_id);
  if (iter != check_latest_map_lingers.end()) {
    LingerOp *op = iter->second;
    op->put();
    check_latest_map_lingers.erase(iter);
  }
}

// command pool check

f67539c2
TL
1749void Objecter::CB_Command_Map_Latest::operator()(bs::error_code e,
1750 version_t latest, version_t)
7c673cae 1751{
f67539c2
TL
1752 if (e == bs::errc::resource_unavailable_try_again ||
1753 e == bs::errc::operation_canceled) {
7c673cae
FG
1754 // ignore callback; we will retry in resend_mon_ops()
1755 return;
1756 }
1757
1758 unique_lock wl(objecter->rwlock);
1759
f67539c2 1760 auto iter = objecter->check_latest_map_commands.find(tid);
7c673cae
FG
1761 if (iter == objecter->check_latest_map_commands.end()) {
1762 return;
1763 }
1764
f67539c2 1765 auto c = iter->second;
7c673cae
FG
1766 objecter->check_latest_map_commands.erase(iter);
1767
1768 if (c->map_dne_bound == 0)
1769 c->map_dne_bound = latest;
1770
f67539c2 1771 unique_lock sul(c->session->lock);
7c673cae 1772 objecter->_check_command_map_dne(c);
28e407b8 1773 sul.unlock();
7c673cae
FG
1774
1775 c->put();
1776}
1777
1778void Objecter::_check_command_map_dne(CommandOp *c)
1779{
1780 // rwlock is locked unique
28e407b8 1781 // session is locked unique
7c673cae
FG
1782
1783 ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
1784 << " current " << osdmap->get_epoch()
1785 << " map_dne_bound " << c->map_dne_bound
1786 << dendl;
1787 if (c->map_dne_bound > 0) {
1788 if (osdmap->get_epoch() >= c->map_dne_bound) {
f67539c2
TL
1789 _finish_command(c, osdcode(c->map_check_error),
1790 std::move(c->map_check_error_str), {});
7c673cae
FG
1791 }
1792 } else {
1793 _send_command_map_check(c);
1794 }
1795}
1796
1797void Objecter::_send_command_map_check(CommandOp *c)
1798{
1799 // rwlock is locked unique
28e407b8 1800 // session is locked unique
7c673cae
FG
1801
1802 // ask the monitor
1803 if (check_latest_map_commands.count(c->tid) == 0) {
1804 c->get();
1805 check_latest_map_commands[c->tid] = c;
f67539c2 1806 monc->get_version("osdmap", CB_Command_Map_Latest(this, c->tid));
7c673cae
FG
1807 }
1808}
1809
1810void Objecter::_command_cancel_map_check(CommandOp *c)
1811{
 1812	  // rwlock is locked unique
1813
f67539c2 1814 auto iter = check_latest_map_commands.find(c->tid);
7c673cae 1815 if (iter != check_latest_map_commands.end()) {
f67539c2 1816 auto c = iter->second;
7c673cae
FG
1817 c->put();
1818 check_latest_map_commands.erase(iter);
1819 }
1820}
1821
1822
1823/**
1824 * Look up OSDSession by OSD id.
1825 *
1826 * @returns 0 on success, or -EAGAIN if the lock context requires
1827 * promotion to write.
1828 */
f67539c2
TL
1829int Objecter::_get_session(int osd, OSDSession **session,
1830 shunique_lock<ceph::shared_mutex>& sul)
7c673cae 1831{
11fdf7f2 1832 ceph_assert(sul && sul.mutex() == &rwlock);
7c673cae
FG
1833
1834 if (osd < 0) {
1835 *session = homeless_session;
1836 ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
1837 << dendl;
1838 return 0;
1839 }
1840
f67539c2 1841 auto p = osd_sessions.find(osd);
7c673cae 1842 if (p != osd_sessions.end()) {
f67539c2 1843 auto s = p->second;
7c673cae
FG
1844 s->get();
1845 *session = s;
1846 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1847 << s->get_nref() << dendl;
1848 return 0;
1849 }
1850 if (!sul.owns_lock()) {
1851 return -EAGAIN;
1852 }
f67539c2 1853 auto s = new OSDSession(cct, osd);
7c673cae 1854 osd_sessions[osd] = s;
11fdf7f2
TL
1855 s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
1856 s->con->set_priv(RefCountedPtr{s});
7c673cae
FG
1857 logger->inc(l_osdc_osd_session_open);
1858 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1859 s->get();
1860 *session = s;
1861 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1862 << s->get_nref() << dendl;
1863 return 0;
1864}
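// Editorial sketch (not part of the original source): the caller-side
// retry pattern implied by _get_session()'s -EAGAIN contract, assuming a
// shunique_lock 'sul' currently held shared on rwlock; _op_submit()
// below follows this same shape.
//
//   OSDSession *s = nullptr;
//   int r = _get_session(osd, &s, sul);
//   if (r == -EAGAIN) {
//     sul.unlock();  // drop the shared lock...
//     sul.lock();    // ...retake it exclusively so a session can be created
//     r = _get_session(osd, &s, sul);
//   }
//   ceph_assert(r == 0);  // s may still be the homeless session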
1865
1866void Objecter::put_session(Objecter::OSDSession *s)
1867{
1868 if (s && !s->is_homeless()) {
1869 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1870 << s->get_nref() << dendl;
1871 s->put();
1872 }
1873}
1874
1875void Objecter::get_session(Objecter::OSDSession *s)
1876{
11fdf7f2 1877 ceph_assert(s != NULL);
7c673cae
FG
1878
1879 if (!s->is_homeless()) {
1880 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1881 << s->get_nref() << dendl;
1882 s->get();
1883 }
1884}
1885
1886void Objecter::_reopen_session(OSDSession *s)
1887{
91327a77 1888 // rwlock is locked unique
7c673cae
FG
1889 // s->lock is locked
1890
11fdf7f2 1891 auto addrs = osdmap->get_addrs(s->osd);
7c673cae 1892 ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
11fdf7f2 1893 << addrs << dendl;
7c673cae
FG
1894 if (s->con) {
1895 s->con->set_priv(NULL);
1896 s->con->mark_down();
1897 logger->inc(l_osdc_osd_session_close);
1898 }
11fdf7f2
TL
1899 s->con = messenger->connect_to_osd(addrs);
1900 s->con->set_priv(RefCountedPtr{s});
7c673cae
FG
1901 s->incarnation++;
1902 logger->inc(l_osdc_osd_session_open);
1903}
1904
1905void Objecter::close_session(OSDSession *s)
1906{
1907 // rwlock is locked unique
1908
1909 ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
1910 if (s->con) {
1911 s->con->set_priv(NULL);
1912 s->con->mark_down();
1913 logger->inc(l_osdc_osd_session_close);
1914 }
f67539c2 1915 unique_lock sl(s->lock);
7c673cae
FG
1916
1917 std::list<LingerOp*> homeless_lingers;
1918 std::list<CommandOp*> homeless_commands;
1919 std::list<Op*> homeless_ops;
1920
1921 while (!s->linger_ops.empty()) {
f67539c2 1922 auto i = s->linger_ops.begin();
7c673cae
FG
1923 ldout(cct, 10) << " linger_op " << i->first << dendl;
1924 homeless_lingers.push_back(i->second);
1925 _session_linger_op_remove(s, i->second);
1926 }
1927
1928 while (!s->ops.empty()) {
f67539c2 1929 auto i = s->ops.begin();
7c673cae
FG
1930 ldout(cct, 10) << " op " << i->first << dendl;
1931 homeless_ops.push_back(i->second);
1932 _session_op_remove(s, i->second);
1933 }
1934
1935 while (!s->command_ops.empty()) {
f67539c2 1936 auto i = s->command_ops.begin();
7c673cae
FG
1937 ldout(cct, 10) << " command_op " << i->first << dendl;
1938 homeless_commands.push_back(i->second);
1939 _session_command_op_remove(s, i->second);
1940 }
1941
1942 osd_sessions.erase(s->osd);
1943 sl.unlock();
1944 put_session(s);
1945
1946 // Assign any leftover ops to the homeless session
1947 {
f67539c2
TL
1948 unique_lock hsl(homeless_session->lock);
1949 for (auto i = homeless_lingers.begin();
7c673cae
FG
1950 i != homeless_lingers.end(); ++i) {
1951 _session_linger_op_assign(homeless_session, *i);
1952 }
f67539c2 1953 for (auto i = homeless_ops.begin();
7c673cae
FG
1954 i != homeless_ops.end(); ++i) {
1955 _session_op_assign(homeless_session, *i);
1956 }
f67539c2 1957 for (auto i = homeless_commands.begin();
7c673cae
FG
1958 i != homeless_commands.end(); ++i) {
1959 _session_command_op_assign(homeless_session, *i);
1960 }
1961 }
1962
1963 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1964}
1965
9f95a23c 1966void Objecter::wait_for_osd_map(epoch_t e)
7c673cae
FG
1967{
1968 unique_lock l(rwlock);
9f95a23c 1969 if (osdmap->get_epoch() >= e) {
7c673cae
FG
1970 l.unlock();
1971 return;
1972 }
1973
f67539c2
TL
1974 ca::waiter<bs::error_code> w;
1975 waiting_for_map[e].emplace_back(OpCompletion::create(
1976 service.get_executor(),
1977 w.ref()),
1978 bs::error_code{});
7c673cae 1979 l.unlock();
f67539c2 1980 w.wait();
7c673cae
FG
1981}
1982
f67539c2
TL
1983void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
1984 std::unique_ptr<OpCompletion> fin,
1985 std::unique_lock<ceph::shared_mutex>&& l)
7c673cae 1986{
f67539c2 1987 ceph_assert(fin);
7c673cae 1988 if (osdmap->get_epoch() >= newest) {
11fdf7f2 1989 ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
f67539c2
TL
1990 l.unlock();
1991 ca::defer(std::move(fin), bs::error_code{});
1992 } else {
1993 ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
1994 _wait_for_new_map(std::move(fin), newest, bs::error_code{});
1995 l.unlock();
7c673cae 1996 }
7c673cae
FG
1997}
1998
1999void Objecter::maybe_request_map()
2000{
2001 shared_lock rl(rwlock);
2002 _maybe_request_map();
2003}
2004
2005void Objecter::_maybe_request_map()
2006{
2007 // rwlock is locked
2008 int flag = 0;
2009 if (_osdmap_full_flag()
2010 || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
2011 || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
2012 ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
2013 "osd map (FULL flag is set)" << dendl;
2014 } else {
2015 ldout(cct, 10)
2016 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
2017 flag = CEPH_SUBSCRIBE_ONETIME;
2018 }
2019 epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
2020 if (monc->sub_want("osdmap", epoch, flag)) {
2021 monc->renew_subs();
2022 }
2023}
2024
f67539c2
TL
2025void Objecter::_wait_for_new_map(std::unique_ptr<OpCompletion> c, epoch_t epoch,
2026 bs::error_code ec)
7c673cae
FG
2027{
2028 // rwlock is locked unique
f67539c2 2029 waiting_for_map[epoch].emplace_back(std::move(c), ec);
7c673cae
FG
2030 _maybe_request_map();
2031}
2032
2033
2034/**
2035 * Use this together with wait_for_map: this is a pre-check to avoid
2036 * allocating a Context for wait_for_map if we can see that we
2037 * definitely already have the epoch.
2038 *
2039 * This does *not* replace the need to handle the return value of
2040 * wait_for_map: just because we don't have it in this pre-check
2041 * doesn't mean we won't have it when calling back into wait_for_map,
2042 * since the objecter lock is dropped in between.
2043 */
2044bool Objecter::have_map(const epoch_t epoch)
2045{
2046 shared_lock rl(rwlock);
2047 if (osdmap->get_epoch() >= epoch) {
2048 return true;
2049 } else {
2050 return false;
2051 }
2052}
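// Editorial sketch (not in the original source): how this pre-check is
// meant to pair with a waiter, here assuming wait_for_osd_map() from
// above; the epoch can still race in between, which is exactly the
// caveat the comment above spells out.
//
//   if (!objecter->have_map(e)) {
//     objecter->wait_for_osd_map(e);  // blocks until an osdmap >= e arrives
//   }
//   // proceed with work that depends on epoch e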
2053
7c673cae
FG
2054void Objecter::_kick_requests(OSDSession *session,
2055 map<uint64_t, LingerOp *>& lresend)
2056{
2057 // rwlock is locked unique
2058
2059 // clear backoffs
2060 session->backoffs.clear();
2061 session->backoffs_by_id.clear();
2062
2063 // resend ops
2064 map<ceph_tid_t,Op*> resend; // resend in tid order
f67539c2 2065 for (auto p = session->ops.begin(); p != session->ops.end();) {
7c673cae
FG
2066 Op *op = p->second;
2067 ++p;
7c673cae
FG
2068 if (op->should_resend) {
2069 if (!op->target.paused)
2070 resend[op->tid] = op;
2071 } else {
2072 _op_cancel_map_check(op);
2073 _cancel_linger_op(op);
2074 }
2075 }
2076
11fdf7f2 2077 logger->inc(l_osdc_op_resend, resend.size());
7c673cae
FG
2078 while (!resend.empty()) {
2079 _send_op(resend.begin()->second);
2080 resend.erase(resend.begin());
2081 }
2082
2083 // resend lingers
11fdf7f2 2084 logger->inc(l_osdc_linger_resend, session->linger_ops.size());
f67539c2 2085 for (auto j = session->linger_ops.begin();
7c673cae
FG
2086 j != session->linger_ops.end(); ++j) {
2087 LingerOp *op = j->second;
2088 op->get();
11fdf7f2 2089 ceph_assert(lresend.count(j->first) == 0);
7c673cae
FG
2090 lresend[j->first] = op;
2091 }
2092
2093 // resend commands
11fdf7f2 2094 logger->inc(l_osdc_command_resend, session->command_ops.size());
7c673cae 2095 map<uint64_t,CommandOp*> cresend; // resend in order
f67539c2 2096 for (auto k = session->command_ops.begin();
7c673cae 2097 k != session->command_ops.end(); ++k) {
7c673cae
FG
2098 cresend[k->first] = k->second;
2099 }
2100 while (!cresend.empty()) {
2101 _send_command(cresend.begin()->second);
2102 cresend.erase(cresend.begin());
2103 }
2104}
2105
2106void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
f67539c2 2107 unique_lock<ceph::shared_mutex>& ul)
7c673cae 2108{
11fdf7f2 2109 ceph_assert(ul.owns_lock());
7c673cae
FG
2110 shunique_lock sul(std::move(ul));
2111 while (!lresend.empty()) {
2112 LingerOp *op = lresend.begin()->second;
2113 if (!op->canceled) {
2114 _send_linger(op, sul);
2115 }
2116 op->put();
2117 lresend.erase(lresend.begin());
2118 }
11fdf7f2 2119 ul = sul.release_to_unique();
7c673cae
FG
2120}
2121
2122void Objecter::start_tick()
2123{
11fdf7f2 2124 ceph_assert(tick_event == 0);
7c673cae
FG
2125 tick_event =
2126 timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2127 &Objecter::tick, this);
2128}
2129
2130void Objecter::tick()
2131{
2132 shared_lock rl(rwlock);
2133
2134 ldout(cct, 10) << "tick" << dendl;
2135
2136 // we are only called by C_Tick
2137 tick_event = 0;
2138
31f18b77 2139 if (!initialized) {
7c673cae
FG
2140 // we raced with shutdown
2141 ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
2142 return;
2143 }
2144
2145 set<OSDSession*> toping;
2146
2147
2148 // look for laggy requests
11fdf7f2 2149 auto cutoff = ceph::coarse_mono_clock::now();
7c673cae
FG
2150 cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout); // timeout
2151
2152 unsigned laggy_ops = 0;
2153
f67539c2 2154 for (auto siter = osd_sessions.begin();
7c673cae 2155 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
2156 auto s = siter->second;
2157 scoped_lock l(s->lock);
7c673cae 2158 bool found = false;
f67539c2
TL
2159 for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
2160 auto op = p->second;
11fdf7f2 2161 ceph_assert(op->session);
7c673cae
FG
2162 if (op->stamp < cutoff) {
2163 ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
2164 << " is laggy" << dendl;
2165 found = true;
2166 ++laggy_ops;
2167 }
2168 }
f67539c2 2169 for (auto p = s->linger_ops.begin();
7c673cae
FG
2170 p != s->linger_ops.end();
2171 ++p) {
f67539c2
TL
2172 auto op = p->second;
2173 std::unique_lock wl(op->watch_lock);
11fdf7f2 2174 ceph_assert(op->session);
7c673cae
FG
2175 ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
2176 << " (osd." << op->session->osd << ")" << dendl;
2177 found = true;
2178 if (op->is_watch && op->registered && !op->last_error)
2179 _send_linger_ping(op);
2180 }
f67539c2 2181 for (auto p = s->command_ops.begin();
7c673cae
FG
2182 p != s->command_ops.end();
2183 ++p) {
f67539c2 2184 auto op = p->second;
11fdf7f2 2185 ceph_assert(op->session);
7c673cae
FG
2186 ldout(cct, 10) << " pinging osd that serves command tid " << p->first
2187 << " (osd." << op->session->osd << ")" << dendl;
2188 found = true;
2189 }
2190 if (found)
2191 toping.insert(s);
2192 }
31f18b77 2193 if (num_homeless_ops || !toping.empty()) {
7c673cae
FG
2194 _maybe_request_map();
2195 }
2196
2197 logger->set(l_osdc_op_laggy, laggy_ops);
2198 logger->set(l_osdc_osd_laggy, toping.size());
2199
2200 if (!toping.empty()) {
2201 // send a ping to these osds, to ensure we detect any session resets
2202 // (osd reply message policy is lossy)
f67539c2 2203 for (auto i = toping.begin(); i != toping.end(); ++i) {
7c673cae
FG
2204 (*i)->con->send_message(new MPing);
2205 }
2206 }
2207
11fdf7f2 2208 // Make sure we don't reschedule if we wake up after shutdown
31f18b77 2209 if (initialized) {
7c673cae
FG
2210 tick_event = timer.reschedule_me(ceph::make_timespan(
2211 cct->_conf->objecter_tick_interval));
2212 }
2213}
2214
2215void Objecter::resend_mon_ops()
2216{
2217 unique_lock wl(rwlock);
2218
2219 ldout(cct, 10) << "resend_mon_ops" << dendl;
2220
f67539c2 2221 for (auto p = poolstat_ops.begin(); p != poolstat_ops.end(); ++p) {
7c673cae
FG
2222 _poolstat_submit(p->second);
2223 logger->inc(l_osdc_poolstat_resend);
2224 }
2225
f67539c2 2226 for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
7c673cae
FG
2227 _fs_stats_submit(p->second);
2228 logger->inc(l_osdc_statfs_resend);
2229 }
2230
f67539c2 2231 for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
7c673cae
FG
2232 _pool_op_submit(p->second);
2233 logger->inc(l_osdc_poolop_resend);
2234 }
2235
f67539c2 2236 for (auto p = check_latest_map_ops.begin();
7c673cae
FG
2237 p != check_latest_map_ops.end();
2238 ++p) {
f67539c2 2239 monc->get_version("osdmap", CB_Op_Map_Latest(this, p->second->tid));
7c673cae
FG
2240 }
2241
f67539c2 2242 for (auto p = check_latest_map_lingers.begin();
7c673cae
FG
2243 p != check_latest_map_lingers.end();
2244 ++p) {
f67539c2 2245 monc->get_version("osdmap", CB_Linger_Map_Latest(this, p->second->linger_id));
7c673cae
FG
2246 }
2247
f67539c2 2248 for (auto p = check_latest_map_commands.begin();
7c673cae
FG
2249 p != check_latest_map_commands.end();
2250 ++p) {
f67539c2 2251 monc->get_version("osdmap", CB_Command_Map_Latest(this, p->second->tid));
7c673cae
FG
2252 }
2253}
2254
2255// read | write ---------------------------
2256
2257void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2258{
2259 shunique_lock rl(rwlock, ceph::acquire_shared);
2260 ceph_tid_t tid = 0;
2261 if (!ptid)
2262 ptid = &tid;
31f18b77 2263 op->trace.event("op submit");
7c673cae
FG
2264 _op_submit_with_budget(op, rl, ptid, ctx_budget);
2265}
2266
f67539c2
TL
2267void Objecter::_op_submit_with_budget(Op *op,
2268 shunique_lock<ceph::shared_mutex>& sul,
7c673cae
FG
2269 ceph_tid_t *ptid,
2270 int *ctx_budget)
2271{
11fdf7f2 2272 ceph_assert(initialized);
7c673cae 2273
11fdf7f2
TL
2274 ceph_assert(op->ops.size() == op->out_bl.size());
2275 ceph_assert(op->ops.size() == op->out_rval.size());
2276 ceph_assert(op->ops.size() == op->out_handler.size());
7c673cae
FG
2277
2278 // throttle. before we look at any state, because
2279 // _take_op_budget() may drop our lock while it blocks.
2280 if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
2281 int op_budget = _take_op_budget(op, sul);
2282 // take and pass out the budget for the first OP
2283 // in the context session
2284 if (ctx_budget && (*ctx_budget == -1)) {
2285 *ctx_budget = op_budget;
2286 }
2287 }
2288
2289 if (osd_timeout > timespan(0)) {
2290 if (op->tid == 0)
31f18b77 2291 op->tid = ++last_tid;
7c673cae
FG
2292 auto tid = op->tid;
2293 op->ontimeout = timer.add_event(osd_timeout,
2294 [this, tid]() {
2295 op_cancel(tid, -ETIMEDOUT); });
2296 }
2297
2298 _op_submit(op, sul, ptid);
2299}
2300
2301void Objecter::_send_op_account(Op *op)
2302{
31f18b77 2303 inflight_ops++;
7c673cae
FG
2304
2305 // add to gather set(s)
f67539c2 2306 if (op->has_completion()) {
31f18b77 2307 num_in_flight++;
7c673cae
FG
2308 } else {
2309 ldout(cct, 20) << " note: not requesting reply" << dendl;
2310 }
2311
2312 logger->inc(l_osdc_op_active);
2313 logger->inc(l_osdc_op);
f67539c2 2314 logger->inc(l_osdc_oplen_avg, op->ops.size());
7c673cae
FG
2315
2316 if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
2317 (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
2318 logger->inc(l_osdc_op_rmw);
2319 else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
2320 logger->inc(l_osdc_op_w);
2321 else if (op->target.flags & CEPH_OSD_FLAG_READ)
2322 logger->inc(l_osdc_op_r);
2323
2324 if (op->target.flags & CEPH_OSD_FLAG_PGOP)
2325 logger->inc(l_osdc_op_pg);
2326
f67539c2 2327 for (auto p = op->ops.begin(); p != op->ops.end(); ++p) {
7c673cae
FG
2328 int code = l_osdc_osdop_other;
2329 switch (p->op.op) {
2330 case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
2331 case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
2332 case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
2333 case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
2334 case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
2335 case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
2336 case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
2337 case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
2338 case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
2339 case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
2340 case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
2341 case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
2342 case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
2343 case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
2344 case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
2345 case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
2346 case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
7c673cae
FG
2347
2348 // OMAP read operations
2349 case CEPH_OSD_OP_OMAPGETVALS:
2350 case CEPH_OSD_OP_OMAPGETKEYS:
2351 case CEPH_OSD_OP_OMAPGETHEADER:
2352 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
2353 case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;
2354
2355 // OMAP write operations
2356 case CEPH_OSD_OP_OMAPSETVALS:
2357 case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;
2358
2359 // OMAP del operations
2360 case CEPH_OSD_OP_OMAPCLEAR:
2361 case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;
2362
2363 case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
2364 case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
2365 case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
2366 }
2367 if (code)
2368 logger->inc(code);
2369 }
2370}
2371
f67539c2 2372void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid)
7c673cae
FG
2373{
2374 // rwlock is locked
2375
2376 ldout(cct, 10) << __func__ << " op " << op << dendl;
2377
2378 // pick target
11fdf7f2 2379 ceph_assert(op->session == NULL);
7c673cae
FG
2380 OSDSession *s = NULL;
2381
20effc67
TL
2382 bool check_for_latest_map = false;
2383 int r = _calc_target(&op->target, nullptr);
2384 switch(r) {
2385 case RECALC_OP_TARGET_POOL_DNE:
2386 check_for_latest_map = true;
2387 break;
2388 case RECALC_OP_TARGET_POOL_EIO:
2389 if (op->has_completion()) {
2390 op->complete(osdc_errc::pool_eio, -EIO);
2391 }
2392 return;
2393 }
7c673cae
FG
2394
2395 // Try to get a session, including a retry if we need to take write lock
20effc67 2396 r = _get_session(op->target.osd, &s, sul);
7c673cae 2397 if (r == -EAGAIN ||
11fdf7f2
TL
2398 (check_for_latest_map && sul.owns_lock_shared()) ||
2399 cct->_conf->objecter_debug_inject_relock_delay) {
7c673cae
FG
2400 epoch_t orig_epoch = osdmap->get_epoch();
2401 sul.unlock();
2402 if (cct->_conf->objecter_debug_inject_relock_delay) {
2403 sleep(1);
2404 }
2405 sul.lock();
2406 if (orig_epoch != osdmap->get_epoch()) {
2407 // map changed; recalculate mapping
2408 ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
2409 << dendl;
2410 check_for_latest_map = _calc_target(&op->target, nullptr)
2411 == RECALC_OP_TARGET_POOL_DNE;
2412 if (s) {
2413 put_session(s);
2414 s = NULL;
2415 r = -EAGAIN;
2416 }
2417 }
2418 }
2419 if (r == -EAGAIN) {
11fdf7f2 2420 ceph_assert(s == NULL);
7c673cae
FG
2421 r = _get_session(op->target.osd, &s, sul);
2422 }
11fdf7f2
TL
2423 ceph_assert(r == 0);
2424 ceph_assert(s); // may be homeless
7c673cae
FG
2425
2426 _send_op_account(op);
2427
2428 // send?
2429
11fdf7f2 2430 ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
7c673cae 2431
7c673cae 2432 bool need_send = false;
9f95a23c
TL
2433 if (op->target.paused) {
2434 ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
7c673cae 2435 << dendl;
7c673cae
FG
2436 _maybe_request_map();
2437 } else if (!s->is_homeless()) {
2438 need_send = true;
2439 } else {
2440 _maybe_request_map();
2441 }
2442
f67539c2 2443 unique_lock sl(s->lock);
7c673cae 2444 if (op->tid == 0)
31f18b77 2445 op->tid = ++last_tid;
7c673cae
FG
2446
2447 ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
2448 << " '" << op->target.base_oloc << "' '"
2449 << op->target.target_oloc << "' " << op->ops << " tid "
2450 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
2451 << dendl;
2452
2453 _session_op_assign(s, op);
2454
2455 if (need_send) {
11fdf7f2 2456 _send_op(op);
7c673cae
FG
2457 }
2458
2459 // Last chance to touch Op here, after giving up session lock it can
2460 // be freed at any time by response handler.
2461 ceph_tid_t tid = op->tid;
2462 if (check_for_latest_map) {
2463 _send_op_map_check(op);
2464 }
2465 if (ptid)
2466 *ptid = tid;
2467 op = NULL;
2468
2469 sl.unlock();
2470 put_session(s);
2471
31f18b77 2472 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
7c673cae
FG
2473}
2474
2475int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2476{
11fdf7f2 2477 ceph_assert(initialized);
7c673cae 2478
f67539c2 2479 unique_lock sl(s->lock);
7c673cae 2480
f67539c2 2481 auto p = s->ops.find(tid);
7c673cae
FG
2482 if (p == s->ops.end()) {
2483 ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
2484 << s->osd << dendl;
2485 return -ENOENT;
2486 }
2487
11fdf7f2 2488#if 0
7c673cae 2489 if (s->con) {
9f95a23c 2490 ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
7c673cae
FG
2491 << " on " << s->con << dendl;
2492 s->con->revoke_rx_buffer(tid);
2493 }
11fdf7f2 2494#endif
7c673cae
FG
2495
2496 ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
2497 << dendl;
2498 Op *op = p->second;
f67539c2 2499 if (op->has_completion()) {
31f18b77 2500 num_in_flight--;
f67539c2 2501 op->complete(osdcode(r), r);
7c673cae
FG
2502 }
2503 _op_cancel_map_check(op);
2504 _finish_op(op, r);
2505 sl.unlock();
2506
2507 return 0;
2508}
2509
2510int Objecter::op_cancel(ceph_tid_t tid, int r)
2511{
2512 int ret = 0;
2513
2514 unique_lock wl(rwlock);
2515 ret = _op_cancel(tid, r);
2516
2517 return ret;
2518}
2519
94b18763
FG
2520int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
2521{
2522 unique_lock wl(rwlock);
2523 ldout(cct,10) << __func__ << " " << tids << dendl;
2524 for (auto tid : tids) {
2525 _op_cancel(tid, r);
2526 }
2527 return 0;
2528}
2529
7c673cae
FG
2530int Objecter::_op_cancel(ceph_tid_t tid, int r)
2531{
2532 int ret = 0;
2533
2534 ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
2535 << dendl;
2536
2537start:
2538
f67539c2 2539 for (auto siter = osd_sessions.begin();
7c673cae
FG
2540 siter != osd_sessions.end(); ++siter) {
2541 OSDSession *s = siter->second;
f67539c2 2542 shared_lock sl(s->lock);
7c673cae
FG
2543 if (s->ops.find(tid) != s->ops.end()) {
2544 sl.unlock();
2545 ret = op_cancel(s, tid, r);
2546 if (ret == -ENOENT) {
2547 /* oh no! raced, maybe tid moved to another session, restarting */
2548 goto start;
2549 }
2550 return ret;
2551 }
2552 }
2553
2554 ldout(cct, 5) << __func__ << ": tid " << tid
2555 << " not found in live sessions" << dendl;
2556
2557 // Handle case where the op is in homeless session
f67539c2 2558 shared_lock sl(homeless_session->lock);
7c673cae
FG
2559 if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
2560 sl.unlock();
2561 ret = op_cancel(homeless_session, tid, r);
2562 if (ret == -ENOENT) {
2563 /* oh no! raced, maybe tid moved to another session, restarting */
2564 goto start;
2565 } else {
2566 return ret;
2567 }
2568 } else {
2569 sl.unlock();
2570 }
2571
2572 ldout(cct, 5) << __func__ << ": tid " << tid
2573 << " not found in homeless session" << dendl;
2574
2575 return ret;
2576}
2577
2578
2579epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
2580{
2581 unique_lock wl(rwlock);
2582
2583 std::vector<ceph_tid_t> to_cancel;
2584 bool found = false;
2585
f67539c2 2586 for (auto siter = osd_sessions.begin();
7c673cae
FG
2587 siter != osd_sessions.end(); ++siter) {
2588 OSDSession *s = siter->second;
f67539c2
TL
2589 shared_lock sl(s->lock);
2590 for (auto op_i = s->ops.begin();
7c673cae
FG
2591 op_i != s->ops.end(); ++op_i) {
2592 if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
2593 && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
2594 to_cancel.push_back(op_i->first);
2595 }
2596 }
2597 sl.unlock();
2598
f67539c2 2599 for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
7c673cae
FG
2600 int cancel_result = op_cancel(s, *titer, r);
2601 // We hold rwlock across search and cancellation, so cancels
2602 // should always succeed
11fdf7f2 2603 ceph_assert(cancel_result == 0);
7c673cae
FG
2604 }
2605 if (!found && to_cancel.size())
2606 found = true;
2607 to_cancel.clear();
2608 }
2609
2610 const epoch_t epoch = osdmap->get_epoch();
2611
2612 wl.unlock();
2613
2614 if (found) {
2615 return epoch;
2616 } else {
2617 return -1;
2618 }
2619}
2620
2621bool Objecter::is_pg_changed(
2622 int oldprimary,
2623 const vector<int>& oldacting,
2624 int newprimary,
2625 const vector<int>& newacting,
2626 bool any_change)
2627{
9f95a23c 2628 if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
7c673cae
FG
2629 oldprimary,
2630 oldacting,
2631 newprimary,
2632 newacting))
2633 return true;
2634 if (any_change && oldacting != newacting)
2635 return true;
 2636	  return false;      // same primary (though replicas may have changed)
2637}
2638
2639bool Objecter::target_should_be_paused(op_target_t *t)
2640{
2641 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2642 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
2643 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
9f95a23c 2644 (t->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi)));
7c673cae
FG
2645
2646 return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
2647 (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
2648 (osdmap->get_epoch() < epoch_barrier);
2649}
2650
2651/**
2652 * Locking public accessor for _osdmap_full_flag
2653 */
2654bool Objecter::osdmap_full_flag() const
2655{
2656 shared_lock rl(rwlock);
2657
2658 return _osdmap_full_flag();
2659}
2660
2661bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2662{
2663 shared_lock rl(rwlock);
2664
2665 if (_osdmap_full_flag()) {
2666 return true;
2667 }
2668
2669 return _osdmap_pool_full(pool_id);
2670}
2671
2672bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2673{
2674 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2675 if (pool == NULL) {
2676 ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2677 return false;
2678 }
2679
2680 return _osdmap_pool_full(*pool);
2681}
2682
2683bool Objecter::_osdmap_has_pool_full() const
2684{
f67539c2 2685 for (auto it = osdmap->get_pools().begin();
7c673cae
FG
2686 it != osdmap->get_pools().end(); ++it) {
2687 if (_osdmap_pool_full(it->second))
2688 return true;
2689 }
2690 return false;
2691}
2692
7c673cae
FG
2693/**
2694 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2695 */
2696bool Objecter::_osdmap_full_flag() const
2697{
11fdf7f2 2698	  // Ignore the FULL flag if the caller has not set honor_pool_full
9f95a23c 2699 return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
7c673cae
FG
2700}
2701
2702void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2703{
2704 for (map<int64_t, pg_pool_t>::const_iterator it
2705 = osdmap->get_pools().begin();
2706 it != osdmap->get_pools().end(); ++it) {
2707 if (pool_full_map.find(it->first) == pool_full_map.end()) {
2708 pool_full_map[it->first] = _osdmap_pool_full(it->second);
2709 } else {
2710 pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2711 pool_full_map[it->first];
2712 }
2713 }
2714}
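// Editorial note (not in the original source): the merge above is
// deliberately sticky. Once a pool has been recorded as full in
// pool_full_map, a later call that finds the pool no longer full leaves
// the entry true (logical OR), so callers learn whether the pool was
// full at any point while they were sampling.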
2715
2716int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2717 const string& ns)
2718{
2719 shared_lock rl(rwlock);
2720 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2721 if (!p)
2722 return -ENOENT;
2723 return p->hash_key(key, ns);
2724}
2725
2726int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2727 const string& ns)
2728{
2729 shared_lock rl(rwlock);
2730 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2731 if (!p)
2732 return -ENOENT;
2733 return p->raw_hash_to_pg(p->hash_key(key, ns));
2734}
2735
11fdf7f2
TL
2736void Objecter::_prune_snapc(
2737 const mempool::osdmap::map<int64_t,
9f95a23c 2738 snap_interval_set_t>& new_removed_snaps,
11fdf7f2
TL
2739 Op *op)
2740{
2741 bool match = false;
2742 auto i = new_removed_snaps.find(op->target.base_pgid.pool());
2743 if (i != new_removed_snaps.end()) {
2744 for (auto s : op->snapc.snaps) {
2745 if (i->second.contains(s)) {
2746 match = true;
2747 break;
2748 }
2749 }
2750 if (match) {
2751 vector<snapid_t> new_snaps;
2752 for (auto s : op->snapc.snaps) {
2753 if (!i->second.contains(s)) {
2754 new_snaps.push_back(s);
2755 }
2756 }
2757 op->snapc.snaps.swap(new_snaps);
2758 ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
2759 << " (was " << new_snaps << ")" << dendl;
2760 }
2761 }
2762}
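// Editorial worked example (not in the original source): if
// op->snapc.snaps is {12, 9, 4} and the new_removed_snaps interval set
// for the op's pool contains 9, the snap context is rewritten in place
// to {12, 4}; an op none of whose snaps appear in the removed set is
// left untouched (match stays false).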
2763
7c673cae
FG
2764int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
2765{
2766 // rwlock is locked
2767 bool is_read = t->flags & CEPH_OSD_FLAG_READ;
2768 bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
2769 t->epoch = osdmap->get_epoch();
2770 ldout(cct,20) << __func__ << " epoch " << t->epoch
2771 << " base " << t->base_oid << " " << t->base_oloc
2772 << " precalc_pgid " << (int)t->precalc_pgid
2773 << " pgid " << t->base_pgid
2774 << (is_read ? " is_read" : "")
2775 << (is_write ? " is_write" : "")
2776 << dendl;
2777
2778 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2779 if (!pi) {
2780 t->osd = -1;
2781 return RECALC_OP_TARGET_POOL_DNE;
2782 }
20effc67
TL
2783
2784 if (pi->has_flag(pg_pool_t::FLAG_EIO)) {
2785 return RECALC_OP_TARGET_POOL_EIO;
2786 }
2787
7c673cae
FG
2788 ldout(cct,30) << __func__ << " base pi " << pi
2789 << " pg_num " << pi->get_pg_num() << dendl;
2790
2791 bool force_resend = false;
2792 if (osdmap->get_epoch() == pi->last_force_op_resend) {
2793 if (t->last_force_resend < pi->last_force_op_resend) {
2794 t->last_force_resend = pi->last_force_op_resend;
2795 force_resend = true;
2796 } else if (t->last_force_resend == 0) {
2797 force_resend = true;
2798 }
2799 }
2800
2801 // apply tiering
2802 t->target_oid = t->base_oid;
2803 t->target_oloc = t->base_oloc;
2804 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
2805 if (is_read && pi->has_read_tier())
2806 t->target_oloc.pool = pi->read_tier;
2807 if (is_write && pi->has_write_tier())
2808 t->target_oloc.pool = pi->write_tier;
2809 pi = osdmap->get_pg_pool(t->target_oloc.pool);
2810 if (!pi) {
2811 t->osd = -1;
2812 return RECALC_OP_TARGET_POOL_DNE;
2813 }
2814 }
2815
2816 pg_t pgid;
2817 if (t->precalc_pgid) {
11fdf7f2
TL
2818 ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
2819 ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
2820 ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
7c673cae
FG
2821 pgid = t->base_pgid;
2822 } else {
2823 int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
2824 pgid);
2825 if (ret == -ENOENT) {
2826 t->osd = -1;
2827 return RECALC_OP_TARGET_POOL_DNE;
2828 }
2829 }
2830 ldout(cct,20) << __func__ << " target " << t->target_oid << " "
2831 << t->target_oloc << " -> pgid " << pgid << dendl;
2832 ldout(cct,30) << __func__ << " target pi " << pi
2833 << " pg_num " << pi->get_pg_num() << dendl;
2834 t->pool_ever_existed = true;
2835
2836 int size = pi->size;
2837 int min_size = pi->min_size;
2838 unsigned pg_num = pi->get_pg_num();
9f95a23c 2839 unsigned pg_num_mask = pi->get_pg_num_mask();
11fdf7f2 2840 unsigned pg_num_pending = pi->get_pg_num_pending();
7c673cae
FG
2841 int up_primary, acting_primary;
2842 vector<int> up, acting;
9f95a23c
TL
2843 ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
2844 pg_t actual_pgid(actual_ps, pgid.pool());
20effc67
TL
2845 if (!lookup_pg_mapping(actual_pgid, osdmap->get_epoch(), &up, &up_primary,
2846 &acting, &acting_primary)) {
9f95a23c
TL
2847 osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
2848 &acting, &acting_primary);
2849 pg_mapping_t pg_mapping(osdmap->get_epoch(),
2850 up, up_primary, acting, acting_primary);
2851 update_pg_mapping(actual_pgid, std::move(pg_mapping));
2852 }
7c673cae 2853 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
c07f9fc5 2854 bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
7c673cae
FG
2855 unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
2856 pg_t prev_pgid(prev_seed, pgid.pool());
2857 if (any_change && PastIntervals::is_new_interval(
2858 t->acting_primary,
2859 acting_primary,
2860 t->acting,
2861 acting,
2862 t->up_primary,
2863 up_primary,
2864 t->up,
2865 up,
2866 t->size,
2867 size,
2868 t->min_size,
2869 min_size,
2870 t->pg_num,
2871 pg_num,
11fdf7f2
TL
2872 t->pg_num_pending,
2873 pg_num_pending,
7c673cae
FG
2874 t->sort_bitwise,
2875 sort_bitwise,
c07f9fc5
FG
2876 t->recovery_deletes,
2877 recovery_deletes,
f67539c2
TL
2878 t->peering_crush_bucket_count,
2879 pi->peering_crush_bucket_count,
2880 t->peering_crush_bucket_target,
2881 pi->peering_crush_bucket_target,
2882 t->peering_crush_bucket_barrier,
2883 pi->peering_crush_bucket_barrier,
2884 t->peering_crush_mandatory_member,
2885 pi->peering_crush_mandatory_member,
7c673cae
FG
2886 prev_pgid)) {
2887 force_resend = true;
2888 }
2889
2890 bool unpaused = false;
f64942e4
AA
2891 bool should_be_paused = target_should_be_paused(t);
2892 if (t->paused && !should_be_paused) {
7c673cae
FG
2893 unpaused = true;
2894 }
9f95a23c
TL
2895 if (t->paused != should_be_paused) {
2896 ldout(cct, 10) << __func__ << " paused " << t->paused
2897 << " -> " << should_be_paused << dendl;
2898 t->paused = should_be_paused;
2899 }
7c673cae
FG
2900
2901 bool legacy_change =
2902 t->pgid != pgid ||
2903 is_pg_changed(
2904 t->acting_primary, t->acting, acting_primary, acting,
2905 t->used_replica || any_change);
11fdf7f2 2906 bool split_or_merge = false;
7c673cae 2907 if (t->pg_num) {
11fdf7f2
TL
2908 split_or_merge =
2909 prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
2910 prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
2911 prev_pgid.is_merge_target(t->pg_num, pg_num);
7c673cae
FG
2912 }
2913
11fdf7f2 2914 if (legacy_change || split_or_merge || force_resend) {
7c673cae 2915 t->pgid = pgid;
20effc67 2916 t->acting = std::move(acting);
7c673cae
FG
2917 t->acting_primary = acting_primary;
2918 t->up_primary = up_primary;
20effc67 2919 t->up = std::move(up);
7c673cae
FG
2920 t->size = size;
2921 t->min_size = min_size;
2922 t->pg_num = pg_num;
9f95a23c 2923 t->pg_num_mask = pg_num_mask;
11fdf7f2 2924 t->pg_num_pending = pg_num_pending;
9f95a23c
TL
2925 spg_t spgid(actual_pgid);
2926 if (pi->is_erasure()) {
20effc67
TL
2927 for (uint8_t i = 0; i < t->acting.size(); ++i) {
2928 if (t->acting[i] == acting_primary) {
9f95a23c
TL
2929 spgid.reset_shard(shard_id_t(i));
2930 break;
2931 }
2932 }
2933 }
2934 t->actual_pgid = spgid;
7c673cae 2935 t->sort_bitwise = sort_bitwise;
c07f9fc5 2936 t->recovery_deletes = recovery_deletes;
f67539c2
TL
2937 t->peering_crush_bucket_count = pi->peering_crush_bucket_count;
2938 t->peering_crush_bucket_target = pi->peering_crush_bucket_target;
2939 t->peering_crush_bucket_barrier = pi->peering_crush_bucket_barrier;
2940 t->peering_crush_mandatory_member = pi->peering_crush_mandatory_member;
7c673cae
FG
2941 ldout(cct, 10) << __func__ << " "
2942 << " raw pgid " << pgid << " -> actual " << t->actual_pgid
20effc67 2943 << " acting " << t->acting
7c673cae
FG
2944 << " primary " << acting_primary << dendl;
2945 t->used_replica = false;
e306af50
TL
2946 if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
2947 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
20effc67 2948 !is_write && pi->is_replicated() && t->acting.size() > 1) {
7c673cae 2949 int osd;
20effc67 2950 ceph_assert(is_read && t->acting[0] == acting_primary);
e306af50 2951 if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
20effc67 2952 int p = rand() % t->acting.size();
7c673cae
FG
2953 if (p)
2954 t->used_replica = true;
20effc67
TL
2955 osd = t->acting[p];
2956 ldout(cct, 10) << " chose random osd." << osd << " of " << t->acting
7c673cae 2957 << dendl;
e306af50 2958 } else {
7c673cae
FG
2959 // look for a local replica. prefer the primary if the
2960 // distance is the same.
2961 int best = -1;
2962 int best_locality = 0;
20effc67 2963 for (unsigned i = 0; i < t->acting.size(); ++i) {
7c673cae 2964 int locality = osdmap->crush->get_common_ancestor_distance(
20effc67 2965 cct, t->acting[i], crush_location);
7c673cae 2966 ldout(cct, 20) << __func__ << " localize: rank " << i
20effc67 2967 << " osd." << t->acting[i]
7c673cae
FG
2968 << " locality " << locality << dendl;
2969 if (i == 0 ||
2970 (locality >= 0 && best_locality >= 0 &&
2971 locality < best_locality) ||
2972 (best_locality < 0 && locality >= 0)) {
2973 best = i;
2974 best_locality = locality;
2975 if (i)
2976 t->used_replica = true;
2977 }
2978 }
11fdf7f2 2979 ceph_assert(best >= 0);
20effc67 2980 osd = t->acting[best];
7c673cae
FG
2981 }
2982 t->osd = osd;
e306af50
TL
2983 } else {
2984 t->osd = acting_primary;
7c673cae
FG
2985 }
2986 }
2987 if (legacy_change || unpaused || force_resend) {
2988 return RECALC_OP_TARGET_NEED_RESEND;
2989 }
11fdf7f2 2990 if (split_or_merge &&
9f95a23c 2991 (osdmap->require_osd_release >= ceph_release_t::luminous ||
91327a77
AA
2992 HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
2993 RESEND_ON_SPLIT))) {
7c673cae
FG
2994 return RECALC_OP_TARGET_NEED_RESEND;
2995 }
2996 return RECALC_OP_TARGET_NO_ACTION;
2997}
2998
2999int Objecter::_map_session(op_target_t *target, OSDSession **s,
f67539c2 3000 shunique_lock<ceph::shared_mutex>& sul)
7c673cae
FG
3001{
3002 _calc_target(target, nullptr);
3003 return _get_session(target->osd, s, sul);
3004}
3005
3006void Objecter::_session_op_assign(OSDSession *to, Op *op)
3007{
3008 // to->lock is locked
11fdf7f2
TL
3009 ceph_assert(op->session == NULL);
3010 ceph_assert(op->tid);
7c673cae
FG
3011
3012 get_session(to);
3013 op->session = to;
3014 to->ops[op->tid] = op;
3015
3016 if (to->is_homeless()) {
31f18b77 3017 num_homeless_ops++;
7c673cae
FG
3018 }
3019
3020 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3021}
3022
3023void Objecter::_session_op_remove(OSDSession *from, Op *op)
3024{
11fdf7f2 3025 ceph_assert(op->session == from);
7c673cae
FG
3026 // from->lock is locked
3027
3028 if (from->is_homeless()) {
31f18b77 3029 num_homeless_ops--;
7c673cae
FG
3030 }
3031
3032 from->ops.erase(op->tid);
3033 put_session(from);
3034 op->session = NULL;
3035
3036 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3037}
3038
3039void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
3040{
3041 // to lock is locked unique
11fdf7f2 3042 ceph_assert(op->session == NULL);
7c673cae
FG
3043
3044 if (to->is_homeless()) {
31f18b77 3045 num_homeless_ops++;
7c673cae
FG
3046 }
3047
3048 get_session(to);
3049 op->session = to;
3050 to->linger_ops[op->linger_id] = op;
3051
3052 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
3053 << dendl;
3054}
3055
3056void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
3057{
11fdf7f2 3058 ceph_assert(from == op->session);
7c673cae
FG
3059 // from->lock is locked unique
3060
3061 if (from->is_homeless()) {
31f18b77 3062 num_homeless_ops--;
7c673cae
FG
3063 }
3064
3065 from->linger_ops.erase(op->linger_id);
3066 put_session(from);
3067 op->session = NULL;
3068
3069 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
3070 << dendl;
3071}
3072
3073void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
3074{
11fdf7f2 3075 ceph_assert(from == op->session);
7c673cae
FG
3076 // from->lock is locked
3077
3078 if (from->is_homeless()) {
31f18b77 3079 num_homeless_ops--;
7c673cae
FG
3080 }
3081
3082 from->command_ops.erase(op->tid);
3083 put_session(from);
3084 op->session = NULL;
3085
3086 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3087}
3088
3089void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
3090{
3091 // to->lock is locked
11fdf7f2
TL
3092 ceph_assert(op->session == NULL);
3093 ceph_assert(op->tid);
7c673cae
FG
3094
3095 if (to->is_homeless()) {
31f18b77 3096 num_homeless_ops++;
7c673cae
FG
3097 }
3098
3099 get_session(to);
3100 op->session = to;
3101 to->command_ops[op->tid] = op;
3102
3103 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3104}
3105
3106int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
f67539c2 3107 shunique_lock<ceph::shared_mutex>& sul)
7c673cae
FG
3108{
3109 // rwlock is locked unique
3110
3111 int r = _calc_target(&linger_op->target, nullptr, true);
3112 if (r == RECALC_OP_TARGET_NEED_RESEND) {
3113 ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
3114 << " pgid " << linger_op->target.pgid
3115 << " acting " << linger_op->target.acting << dendl;
3116
3117 OSDSession *s = NULL;
3118 r = _get_session(linger_op->target.osd, &s, sul);
11fdf7f2 3119 ceph_assert(r == 0);
7c673cae
FG
3120
3121 if (linger_op->session != s) {
3122 // NB locking two sessions (s and linger_op->session) at the
3123 // same time here is only safe because we are the only one that
f67539c2
TL
3124 // takes two, and we are holding rwlock for write. We use
3125 // std::shared_mutex in OSDSession because lockdep doesn't know
3126 // that.
3127 unique_lock sl(s->lock);
7c673cae
FG
3128 _session_linger_op_remove(linger_op->session, linger_op);
3129 _session_linger_op_assign(s, linger_op);
3130 }
3131
3132 put_session(s);
3133 return RECALC_OP_TARGET_NEED_RESEND;
3134 }
3135 return r;
3136}
3137
3138void Objecter::_cancel_linger_op(Op *op)
3139{
3140 ldout(cct, 15) << "cancel_op " << op->tid << dendl;
3141
11fdf7f2 3142 ceph_assert(!op->should_resend);
f67539c2
TL
3143 if (op->has_completion()) {
3144 op->onfinish = nullptr;
31f18b77 3145 num_in_flight--;
7c673cae
FG
3146 }
3147
3148 _finish_op(op, 0);
3149}
3150
3151void Objecter::_finish_op(Op *op, int r)
3152{
11fdf7f2 3153 ldout(cct, 15) << __func__ << " " << op->tid << dendl;
7c673cae
FG
3154
3155 // op->session->lock is locked unique or op->session is null
3156
11fdf7f2
TL
3157 if (!op->ctx_budgeted && op->budget >= 0) {
3158 put_op_budget_bytes(op->budget);
3159 op->budget = -1;
3160 }
7c673cae
FG
3161
3162 if (op->ontimeout && r != -ETIMEDOUT)
3163 timer.cancel_event(op->ontimeout);
3164
3165 if (op->session) {
3166 _session_op_remove(op->session, op);
3167 }
3168
3169 logger->dec(l_osdc_op_active);
3170
11fdf7f2 3171 ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
7c673cae 3172
31f18b77 3173 inflight_ops--;
7c673cae
FG
3174
3175 op->put();
3176}
3177
f67539c2 3178Objecter::MOSDOp *Objecter::_prepare_osd_op(Op *op)
7c673cae
FG
3179{
3180 // rwlock is locked
3181
3182 int flags = op->target.flags;
3183 flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
20effc67 3184 flags |= CEPH_OSD_FLAG_SUPPORTSPOOLEIO;
7c673cae
FG
3185
3186 // Nothing checks this any longer, but needed for compatibility with
3187 // pre-luminous osds
3188 flags |= CEPH_OSD_FLAG_ONDISK;
3189
9f95a23c 3190 if (!honor_pool_full)
7c673cae
FG
3191 flags |= CEPH_OSD_FLAG_FULL_FORCE;
3192
3193 op->target.paused = false;
11fdf7f2 3194 op->stamp = ceph::coarse_mono_clock::now();
7c673cae
FG
3195
3196 hobject_t hobj = op->target.get_hobj();
f67539c2
TL
3197 auto m = new MOSDOp(client_inc, op->tid,
3198 hobj, op->target.actual_pgid,
3199 osdmap->get_epoch(),
3200 flags, op->features);
7c673cae
FG
3201
3202 m->set_snapid(op->snapid);
3203 m->set_snap_seq(op->snapc.seq);
3204 m->set_snaps(op->snapc.snaps);
3205
3206 m->ops = op->ops;
3207 m->set_mtime(op->mtime);
3208 m->set_retry_attempt(op->attempts++);
31f18b77
FG
3209
3210 if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
3211 op->trace.init("op", &trace_endpoint);
3212 }
7c673cae
FG
3213
3214 if (op->priority)
3215 m->set_priority(op->priority);
3216 else
3217 m->set_priority(cct->_conf->osd_client_op_priority);
3218
3219 if (op->reqid != osd_reqid_t()) {
3220 m->set_reqid(op->reqid);
3221 }
3222
3223 logger->inc(l_osdc_op_send);
b32b8144
FG
3224 ssize_t sum = 0;
3225 for (unsigned i = 0; i < m->ops.size(); i++) {
3226 sum += m->ops[i].indata.length();
3227 }
3228 logger->inc(l_osdc_op_send_bytes, sum);
7c673cae
FG
3229
3230 return m;
3231}
3232
11fdf7f2 3233void Objecter::_send_op(Op *op)
7c673cae
FG
3234{
3235 // rwlock is locked
3236 // op->session->lock is locked
3237
3238 // backoff?
7c673cae
FG
3239 auto p = op->session->backoffs.find(op->target.actual_pgid);
3240 if (p != op->session->backoffs.end()) {
b32b8144 3241 hobject_t hoid = op->target.get_hobj();
7c673cae
FG
3242 auto q = p->second.lower_bound(hoid);
3243 if (q != p->second.begin()) {
3244 --q;
3245 if (hoid >= q->second.end) {
3246 ++q;
3247 }
3248 }
3249 if (q != p->second.end()) {
3250 ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
3251 << "," << q->second.end << ")" << dendl;
3252 int r = cmp(hoid, q->second.begin);
3253 if (r == 0 || (r > 0 && hoid < q->second.end)) {
3254 ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
3255 << " id " << q->second.id << " on " << hoid
3256 << ", queuing " << op << " tid " << op->tid << dendl;
3257 return;
3258 }
3259 }
3260 }
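  // Editorial worked example (not in the original source): for a
  // registered backoff over [b, e), the lower_bound()/--q dance above
  // picks the candidate interval for hoid, so an op with b <= hoid < e
  // returns here and stays queued on the session, while ops strictly
  // below b or at/above e fall through and are sent normally.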
3261
11fdf7f2
TL
3262 ceph_assert(op->tid > 0);
3263 MOSDOp *m = _prepare_osd_op(op);
7c673cae
FG
3264
3265 if (op->target.actual_pgid != m->get_spg()) {
3266 ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
3267 << m->get_spg() << " to " << op->target.actual_pgid
3268 << ", updating and reencoding" << dendl;
3269 m->set_spg(op->target.actual_pgid);
3270 m->clear_payload(); // reencode
3271 }
3272
3273 ldout(cct, 15) << "_send_op " << op->tid << " to "
3274 << op->target.actual_pgid << " on osd." << op->session->osd
3275 << dendl;
3276
3277 ConnectionRef con = op->session->con;
11fdf7f2 3278 ceph_assert(con);
7c673cae 3279
11fdf7f2 3280#if 0
9f95a23c 3281 // preallocated rx ceph::buffer?
7c673cae 3282 if (op->con) {
9f95a23c 3283 ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
7c673cae
FG
3284 << op->con << dendl;
3285 op->con->revoke_rx_buffer(op->tid);
3286 }
3287 if (op->outbl &&
3288 op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582
3289 op->outbl->length()) {
11fdf7f2 3290 op->outbl->invalidate_crc(); // messenger writes through c_str()
9f95a23c 3291 ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
7c673cae
FG
3292 << dendl;
3293 op->con = con;
3294 op->con->post_rx_buffer(op->tid, *op->outbl);
3295 }
11fdf7f2 3296#endif
7c673cae
FG
3297
3298 op->incarnation = op->session->incarnation;
3299
31f18b77
FG
3300 if (op->trace.valid()) {
3301 m->trace.init("op msg", nullptr, &op->trace);
3302 }
7c673cae
FG
3303 op->session->con->send_message(m);
3304}
3305
f67539c2 3306int Objecter::calc_op_budget(const bc::small_vector_base<OSDOp>& ops)
7c673cae
FG
3307{
3308 int op_budget = 0;
f67539c2 3309 for (auto i = ops.begin(); i != ops.end(); ++i) {
7c673cae
FG
3310 if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3311 op_budget += i->indata.length();
3312 } else if (ceph_osd_op_mode_read(i->op.op)) {
a8e16298
TL
3313 if (ceph_osd_op_uses_extent(i->op.op)) {
3314 if ((int64_t)i->op.extent.length > 0)
3315 op_budget += (int64_t)i->op.extent.length;
7c673cae 3316 } else if (ceph_osd_op_type_attr(i->op.op)) {
a8e16298 3317 op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
7c673cae
FG
3318 }
3319 }
3320 }
3321 return op_budget;
3322}
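// Editorial worked example (not in the original source): a compound op
// holding a 4096-byte write plus a cmpxattr with an 8-byte name and a
// 16-byte comparison value budgets 4096 + 8 + 16 = 4120. Write-mode ops
// charge their indata length; read-mode extent ops (e.g. CEPH_OSD_OP_READ)
// charge op.extent.length instead.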
3323
3324void Objecter::_throttle_op(Op *op,
f67539c2 3325 shunique_lock<ceph::shared_mutex>& sul,
7c673cae
FG
3326 int op_budget)
3327{
11fdf7f2 3328 ceph_assert(sul && sul.mutex() == &rwlock);
7c673cae
FG
3329 bool locked_for_write = sul.owns_lock();
3330
3331 if (!op_budget)
11fdf7f2 3332 op_budget = calc_op_budget(op->ops);
7c673cae
FG
3333 if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3334 sul.unlock();
3335 op_throttle_bytes.get(op_budget);
3336 if (locked_for_write)
3337 sul.lock();
3338 else
3339 sul.lock_shared();
3340 }
3341 if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3342 sul.unlock();
3343 op_throttle_ops.get(1);
3344 if (locked_for_write)
3345 sul.lock();
3346 else
3347 sul.lock_shared();
3348 }
3349}
3350
11fdf7f2 3351int Objecter::take_linger_budget(LingerOp *info)
7c673cae 3352{
11fdf7f2 3353 return 1;
7c673cae
FG
3354}
3355
3356/* This function DOES put the passed message before returning */
3357void Objecter::handle_osd_op_reply(MOSDOpReply *m)
3358{
3359 ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
3360
3361 // get pio
3362 ceph_tid_t tid = m->get_tid();
3363
3364 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3365 if (!initialized) {
7c673cae
FG
3366 m->put();
3367 return;
3368 }
3369
3370 ConnectionRef con = m->get_connection();
11fdf7f2
TL
3371 auto priv = con->get_priv();
3372 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
3373 if (!s || s->con != con) {
3374 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
7c673cae
FG
3375 m->put();
3376 return;
3377 }
3378
f67539c2 3379 unique_lock sl(s->lock);
7c673cae
FG
3380
3381 map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
3382 if (iter == s->ops.end()) {
3383 ldout(cct, 7) << "handle_osd_op_reply " << tid
3384 << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
3385 " onnvram" : " ack"))
3386 << " ... stray" << dendl;
3387 sl.unlock();
7c673cae
FG
3388 m->put();
3389 return;
3390 }
3391
3392 ldout(cct, 7) << "handle_osd_op_reply " << tid
3393 << (m->is_ondisk() ? " ondisk" :
3394 (m->is_onnvram() ? " onnvram" : " ack"))
3395 << " uv " << m->get_user_version()
3396 << " in " << m->get_pg()
3397 << " attempt " << m->get_retry_attempt()
3398 << dendl;
3399 Op *op = iter->second;
31f18b77 3400 op->trace.event("osd op reply");
7c673cae
FG
3401
3402 if (retry_writes_after_first_reply && op->attempts == 1 &&
3403 (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
3404 ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
f67539c2 3405 if (op->has_completion()) {
31f18b77 3406 num_in_flight--;
7c673cae
FG
3407 }
3408 _session_op_remove(s, op);
3409 sl.unlock();
7c673cae
FG
3410
3411 _op_submit(op, sul, NULL);
3412 m->put();
3413 return;
3414 }
3415
3416 if (m->get_retry_attempt() >= 0) {
3417 if (m->get_retry_attempt() != (op->attempts - 1)) {
3418 ldout(cct, 7) << " ignoring reply from attempt "
3419 << m->get_retry_attempt()
3420 << " from " << m->get_source_inst()
3421 << "; last attempt " << (op->attempts - 1) << " sent to "
3422 << op->session->con->get_peer_addr() << dendl;
3423 m->put();
3424 sl.unlock();
7c673cae
FG
3425 return;
3426 }
3427 } else {
3428 // we don't know the request attempt because the server is old, so
3429 // just accept this one. we may do ACK callbacks we shouldn't
3430 // have, but that is better than doing callbacks out of order.
3431 }
3432
f67539c2 3433 decltype(op->onfinish) onfinish;
7c673cae
FG
3434
3435 int rc = m->get_result();
3436
3437 if (m->is_redirect_reply()) {
3438 ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
f67539c2 3439 if (op->has_completion())
31f18b77 3440 num_in_flight--;
7c673cae
FG
3441 _session_op_remove(s, op);
3442 sl.unlock();
7c673cae
FG
3443
3444 // FIXME: two redirects could race and reorder
3445
3446 op->tid = 0;
3447 m->get_redirect().combine_with_locator(op->target.target_oloc,
3448 op->target.target_oid.name);
f64942e4
AA
3449 op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
3450 CEPH_OSD_FLAG_IGNORE_CACHE |
3451 CEPH_OSD_FLAG_IGNORE_OVERLAY);
7c673cae
FG
3452 _op_submit(op, sul, NULL);
3453 m->put();
3454 return;
3455 }
3456
3457 if (rc == -EAGAIN) {
3458 ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
f67539c2 3459 if (op->has_completion())
c07f9fc5
FG
3460 num_in_flight--;
3461 _session_op_remove(s, op);
7c673cae 3462 sl.unlock();
c07f9fc5
FG
3463
3464 op->tid = 0;
3465 op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3466 CEPH_OSD_FLAG_LOCALIZE_READS);
3467 op->target.pgid = pg_t();
3468 _op_submit(op, sul, NULL);
7c673cae
FG
3469 m->put();
3470 return;
3471 }
3472
3473 sul.unlock();
3474
3475 if (op->objver)
3476 *op->objver = m->get_user_version();
3477 if (op->reply_epoch)
3478 *op->reply_epoch = m->get_map_epoch();
3479 if (op->data_offset)
3480 *op->data_offset = m->get_header().data_off;
3481
3482 // got data?
3483 if (op->outbl) {
11fdf7f2 3484#if 0
7c673cae
FG
3485 if (op->con)
3486 op->con->revoke_rx_buffer(op->tid);
11fdf7f2
TL
3487#endif
3488 auto& bl = m->get_data();
3489 if (op->outbl->length() == bl.length() &&
3490 bl.get_num_buffers() <= 1) {
 3491	      // this is here to keep previous users who *relied* on getting data
3492 // read into existing buffers happy. Notably,
3493 // libradosstriper::RadosStriperImpl::aio_read().
3494 ldout(cct,10) << __func__ << " copying resulting " << bl.length()
9f95a23c 3495 << " into existing ceph::buffer of length " << op->outbl->length()
11fdf7f2 3496 << dendl;
f67539c2
TL
3497 cb::list t;
3498 t = std::move(*op->outbl);
11fdf7f2 3499 t.invalidate_crc(); // we're overwriting the raw buffers via c_str()
9f95a23c 3500 bl.begin().copy(bl.length(), t.c_str());
11fdf7f2
TL
3501 op->outbl->substr_of(t, 0, bl.length());
3502 } else {
3503 m->claim_data(*op->outbl);
3504 }
7c673cae
FG
3505 op->outbl = 0;
3506 }
3507
3508 // per-op result demuxing
3509 vector<OSDOp> out_ops;
3510 m->claim_ops(out_ops);
3511
3512 if (out_ops.size() != op->ops.size())
3513 ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
3514 << " != request ops " << op->ops
3515 << " from " << m->get_source_inst() << dendl;
3516
f67539c2
TL
3517 ceph_assert(op->ops.size() == op->out_bl.size());
3518 ceph_assert(op->ops.size() == op->out_rval.size());
3519 ceph_assert(op->ops.size() == op->out_ec.size());
3520 ceph_assert(op->ops.size() == op->out_handler.size());
3521 auto pb = op->out_bl.begin();
3522 auto pr = op->out_rval.begin();
3523 auto pe = op->out_ec.begin();
3524 auto ph = op->out_handler.begin();
11fdf7f2
TL
3525 ceph_assert(op->out_bl.size() == op->out_rval.size());
3526 ceph_assert(op->out_bl.size() == op->out_handler.size());
f67539c2 3527 auto p = out_ops.begin();
7c673cae
FG
3528 for (unsigned i = 0;
3529 p != out_ops.end() && pb != op->out_bl.end();
f67539c2 3530 ++i, ++p, ++pb, ++pr, ++pe, ++ph) {
7c673cae
FG
3531 ldout(cct, 10) << " op " << i << " rval " << p->rval
3532 << " len " << p->outdata.length() << dendl;
3533 if (*pb)
3534 **pb = p->outdata;
3535 // set rval before running handlers so that handlers
3536 // can change it if e.g. decoding fails
3537 if (*pr)
31f18b77 3538 **pr = ceph_to_hostos_errno(p->rval);
f67539c2
TL
3539 if (*pe)
3540 **pe = p->rval < 0 ? bs::error_code(-p->rval, osd_category()) :
3541 bs::error_code();
7c673cae 3542 if (*ph) {
f67539c2
TL
3543 std::move((*ph))(p->rval < 0 ?
3544 bs::error_code(-p->rval, osd_category()) :
3545 bs::error_code(),
3546 p->rval, p->outdata);
7c673cae
FG
3547 }
3548 }
3549
3550 // NOTE: since we only ever request ONDISK, we assume we will
3551 // only ever get back one (type of) ack.
3552
f67539c2 3553 if (op->has_completion()) {
31f18b77 3554 num_in_flight--;
f67539c2
TL
3555 onfinish = std::move(op->onfinish);
3556 op->onfinish = nullptr;
7c673cae
FG
3557 }
3558 logger->inc(l_osdc_op_reply);
3559
3560 /* get it before we call _finish_op() */
3561 auto completion_lock = s->get_lock(op->target.base_oid);
3562
3563 ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
3564 _finish_op(op, 0);
3565
31f18b77 3566 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
7c673cae
FG
3567
3568 // serialize completions
3569 if (completion_lock.mutex()) {
3570 completion_lock.lock();
3571 }
3572 sl.unlock();
3573
3574 // do callbacks
f67539c2
TL
3575 if (Op::has_completion(onfinish)) {
3576 Op::complete(std::move(onfinish), osdcode(rc), rc);
7c673cae
FG
3577 }
3578 if (completion_lock.mutex()) {
3579 completion_lock.unlock();
3580 }
3581
3582 m->put();
7c673cae
FG
3583}
3584
3585void Objecter::handle_osd_backoff(MOSDBackoff *m)
3586{
3587 ldout(cct, 10) << __func__ << " " << *m << dendl;
3588 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3589 if (!initialized) {
7c673cae
FG
3590 m->put();
3591 return;
3592 }
3593
3594 ConnectionRef con = m->get_connection();
11fdf7f2
TL
3595 auto priv = con->get_priv();
3596 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
3597 if (!s || s->con != con) {
3598 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
7c673cae
FG
3599 m->put();
3600 return;
3601 }
3602
3603 get_session(s);
7c673cae 3604
f67539c2 3605 unique_lock sl(s->lock);
7c673cae
FG
3606
3607 switch (m->op) {
3608 case CEPH_OSD_BACKOFF_OP_BLOCK:
3609 {
3610 // register
3611 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3612 s->backoffs_by_id.insert(make_pair(m->id, &b));
3613 b.pgid = m->pgid;
3614 b.id = m->id;
3615 b.begin = m->begin;
3616 b.end = m->end;
3617
3618 // ack with original backoff's epoch so that the osd can discard this if
3619 // there was a pg split.
f67539c2
TL
3620 auto r = new MOSDBackoff(m->pgid, m->map_epoch,
3621 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3622 m->id, m->begin, m->end);
7c673cae
FG
3623 // this priority must match the MOSDOps from _prepare_osd_op
3624 r->set_priority(cct->_conf->osd_client_op_priority);
3625 con->send_message(r);
3626 }
3627 break;
3628
3629 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3630 {
3631 auto p = s->backoffs_by_id.find(m->id);
3632 if (p != s->backoffs_by_id.end()) {
3633 OSDBackoff *b = p->second;
3634 if (b->begin != m->begin &&
3635 b->end != m->end) {
3636 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3637 << " unblock on ["
3638 << m->begin << "," << m->end << ") but backoff is ["
3639 << b->begin << "," << b->end << ")" << dendl;
3640 // hrmpf, unblock it anyway.
3641 }
3642 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3643 << " id " << b->id
3644 << " [" << b->begin << "," << b->end
3645 << ")" << dendl;
3646 auto spgp = s->backoffs.find(b->pgid);
11fdf7f2 3647 ceph_assert(spgp != s->backoffs.end());
7c673cae
FG
3648 spgp->second.erase(b->begin);
3649 if (spgp->second.empty()) {
3650 s->backoffs.erase(spgp);
3651 }
3652 s->backoffs_by_id.erase(p);
3653
3654 // check for any ops to resend
3655 for (auto& q : s->ops) {
3656 if (q.second->target.actual_pgid == m->pgid) {
3657 int r = q.second->target.contained_by(m->begin, m->end);
3658 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3659 << q.second->target.get_hobj() << dendl;
3660 if (r) {
3661 _send_op(q.second);
3662 }
3663 }
3664 }
3665 } else {
3666 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3667 << " unblock on ["
3668 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3669 }
3670 }
3671 break;
3672
3673 default:
3674 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3675 }
3676
3677 sul.unlock();
3678 sl.unlock();
3679
3680 m->put();
3681 put_session(s);
3682}
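// Bookkeeping sketch for the handler above (illustrative only): BLOCK
// registers the interval under two indexes so it can be found either
// by (pgid, begin) or by id:
//
//   s->backoffs[pgid][begin]  -> OSDBackoff{pgid, id, begin, end}
//   s->backoffs_by_id[id]     -> pointer to that same entry
//
// UNBLOCK looks the interval up by id, erases it from both maps, and
// resends any ops in the session whose target is contained in
// [begin, end).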
3683
3684uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3685 uint32_t pos)
3686{
3687 shared_lock rl(rwlock);
3688 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3689 pos, list_context->pool_id, string());
11fdf7f2 3690 ldout(cct, 10) << __func__ << " " << list_context
7c673cae
FG
3691 << " pos " << pos << " -> " << list_context->pos << dendl;
3692 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3693 list_context->current_pg = actual.ps();
3694 list_context->at_end_of_pool = false;
3695 return pos;
3696}
3697
3698uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3699 const hobject_t& cursor)
3700{
3701 shared_lock rl(rwlock);
3702 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3703 list_context->pos = cursor;
3704 list_context->at_end_of_pool = false;
3705 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3706 list_context->current_pg = actual.ps();
3707 list_context->sort_bitwise = true;
3708 return list_context->current_pg;
3709}
3710
3711void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3712 hobject_t *cursor)
3713{
3714 shared_lock rl(rwlock);
3715 if (list_context->list.empty()) {
3716 *cursor = list_context->pos;
3717 } else {
3718 const librados::ListObjectImpl& entry = list_context->list.front();
3719 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3720 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3721 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3722 }
3723}
3724
3725void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3726{
3727 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3728 << " pool_snap_seq " << list_context->pool_snap_seq
3729 << " max_entries " << list_context->max_entries
3730 << " list_context " << list_context
3731 << " onfinish " << onfinish
3732 << " current_pg " << list_context->current_pg
3733 << " pos " << list_context->pos << dendl;
3734
3735 shared_lock rl(rwlock);
3736 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3737 if (!pool) { // pool is gone
3738 rl.unlock();
3739 put_nlist_context_budget(list_context);
3740 onfinish->complete(-ENOENT);
3741 return;
3742 }
3743 int pg_num = pool->get_pg_num();
3744 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3745
3746 if (list_context->pos.is_min()) {
3747 // fresh listing: record the sort order and pg count we started with
3748 list_context->sort_bitwise = sort_bitwise;
3749 list_context->starting_pg_num = pg_num;
3750 }
3751 if (list_context->sort_bitwise != sort_bitwise) {
3752 list_context->pos = hobject_t(
3753 object_t(), string(), CEPH_NOSNAP,
3754 list_context->current_pg, list_context->pool_id, string());
3755 list_context->sort_bitwise = sort_bitwise;
3756 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3757 << list_context->pos << dendl;
3758 }
3759 if (list_context->starting_pg_num != pg_num) {
3760 if (!sort_bitwise) {
3761 // start reading from the beginning; the pgs have changed
3762 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3763 list_context->pos = collection_list_handle_t();
3764 }
3765 list_context->starting_pg_num = pg_num;
3766 }
3767
3768 if (list_context->pos.is_max()) {
3769 ldout(cct, 20) << __func__ << " end of pool, list "
3770 << list_context->list << dendl;
3771 if (list_context->list.empty()) {
3772 list_context->at_end_of_pool = true;
3773 }
3774 // release the listing context's budget once all
3775 // OPs (in the session) are finished
3776 put_nlist_context_budget(list_context);
3777 onfinish->complete(0);
3778 return;
3779 }
3780
3781 ObjectOperation op;
3782 op.pg_nls(list_context->max_entries, list_context->filter,
3783 list_context->pos, osdmap->get_epoch());
3784 list_context->bl.clear();
f67539c2 3785 auto onack = new C_NList(list_context, onfinish, this);
7c673cae
FG
3786 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3787
3788 // note current_pg in case we don't have (or lose) SORTBITWISE
3789 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3790 rl.unlock();
3791
3792 pg_read(list_context->current_pg, oloc, op,
3793 &list_context->bl, 0, onack, &onack->epoch,
3794 &list_context->ctx_budget);
3795}
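// Caller-side sketch (hypothetical; C_Batch is an assumed Context
// subclass owned by the caller):
//
//   auto ctx = new NListContext;
//   ctx->pool_id = pool_id;
//   ctx->max_entries = 1024;
//   objecter->list_nobjects(ctx, new C_Batch(ctx));
//   // In C_Batch::finish(): consume ctx->list; if !ctx->at_end_of_pool,
//   // call list_nobjects() again to fetch the next batch.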
3796
3797void Objecter::_nlist_reply(NListContext *list_context, int r,
3798 Context *final_finish, epoch_t reply_epoch)
3799{
3800 ldout(cct, 10) << __func__ << " " << list_context << dendl;
3801
11fdf7f2 3802 auto iter = list_context->bl.cbegin();
7c673cae 3803 pg_nls_response_t response;
11fdf7f2 3804 decode(response, iter);
7c673cae 3805 if (!iter.end()) {
9f95a23c 3806 // we do this as legacy.
f67539c2 3807 cb::list legacy_extra_info;
9f95a23c 3808 decode(legacy_extra_info, iter);
7c673cae
FG
3809 }
3810
3811 // if the osd returns 1 (newer code) or a MAX handle, it means we
3812 // hit the end of the pg.
3813 if ((response.handle.is_max() || r == 1) &&
3814 !list_context->sort_bitwise) {
3815 // legacy OSD and !sortbitwise, figure out the next PG on our own
3816 ++list_context->current_pg;
3817 if (list_context->current_pg == list_context->starting_pg_num) {
3818 // end of pool
3819 list_context->pos = hobject_t::get_max();
3820 } else {
3821 // next pg
3822 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3823 list_context->current_pg,
3824 list_context->pool_id, string());
3825 }
3826 } else {
3827 list_context->pos = response.handle;
3828 }
3829
3830 int response_size = response.entries.size();
3831 ldout(cct, 20) << " response.entries.size " << response_size
3832 << ", response.entries " << response.entries
3833 << ", handle " << response.handle
3834 << ", tentative new pos " << list_context->pos << dendl;
7c673cae 3835 if (response_size) {
f67539c2
TL
3836 std::move(response.entries.begin(), response.entries.end(),
3837 std::back_inserter(list_context->list));
3838 response.entries.clear();
7c673cae
FG
3839 }
3840
3841 if (list_context->list.size() >= list_context->max_entries) {
3842 ldout(cct, 20) << " hit max, returning results so far, "
3843 << list_context->list << dendl;
3844 // release the listing context's budget once all
3845 // OPs (in the session) are finished
3846 put_nlist_context_budget(list_context);
3847 final_finish->complete(0);
3848 return;
3849 }
3850
3851 // continue!
3852 list_nobjects(list_context, final_finish);
3853}
3854
3855void Objecter::put_nlist_context_budget(NListContext *list_context)
3856{
3857 if (list_context->ctx_budget >= 0) {
3858 ldout(cct, 10) << " release listing context's budget " <<
3859 list_context->ctx_budget << dendl;
3860 put_op_budget_bytes(list_context->ctx_budget);
3861 list_context->ctx_budget = -1;
3862 }
3863}
3864
3865// snapshots
3866
f67539c2
TL
3867void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name,
3868 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3869{
3870 unique_lock wl(rwlock);
3871 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3872 << snap_name << dendl;
3873
3874 const pg_pool_t *p = osdmap->get_pg_pool(pool);
f67539c2
TL
3875 if (!p) {
3876 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3877 return;
3878 }
3879 if (p->snap_exists(snap_name)) {
3880 onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists,
3881 cb::list{});
3882 return;
3883 }
7c673cae 3884
f67539c2 3885 auto op = new PoolOp;
31f18b77 3886 op->tid = ++last_tid;
7c673cae
FG
3887 op->pool = pool;
3888 op->name = snap_name;
f67539c2 3889 op->onfinish = std::move(onfinish);
7c673cae
FG
3890 op->pool_op = POOL_OP_CREATE_SNAP;
3891 pool_ops[op->tid] = op;
3892
3893 pool_op_submit(op);
7c673cae
FG
3894}
3895
f67539c2
TL
3896struct CB_SelfmanagedSnap {
3897 std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> fin;
3898 CB_SelfmanagedSnap(decltype(fin)&& fin)
3899 : fin(std::move(fin)) {}
3900 void operator()(bs::error_code ec, const cb::list& bl) {
3901 snapid_t snapid = 0;
3902 if (!ec) {
11fdf7f2 3903 try {
f67539c2
TL
3904 auto p = bl.cbegin();
3905 decode(snapid, p);
3906 } catch (const cb::error& e) {
3907 ec = e.code();
11fdf7f2 3908 }
7c673cae 3909 }
f67539c2 3910 fin->defer(std::move(fin), ec, snapid);
7c673cae
FG
3911 }
3912};
3913
f67539c2
TL
3914void Objecter::allocate_selfmanaged_snap(
3915 int64_t pool,
3916 std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> onfinish)
7c673cae
FG
3917{
3918 unique_lock wl(rwlock);
3919 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
f67539c2 3920 auto op = new PoolOp;
31f18b77 3921 op->tid = ++last_tid;
7c673cae 3922 op->pool = pool;
f67539c2
TL
3923 op->onfinish = PoolOp::OpComp::create(
3924 service.get_executor(),
3925 CB_SelfmanagedSnap(std::move(onfinish)));
7c673cae
FG
3926 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3927 pool_ops[op->tid] = op;
3928
3929 pool_op_submit(op);
7c673cae
FG
3930}
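// Caller-side sketch (hypothetical): wrap a lambda into the completion
// type expected above. `ex` is an assumed boost::asio executor.
//
//   using SnapComp = ca::Completion<void(bs::error_code, snapid_t)>;
//   objecter->allocate_selfmanaged_snap(
//     pool_id,
//     SnapComp::create(ex, [](bs::error_code ec, snapid_t snap) {
//       // on success, record `snap` in the caller's SnapContext
//     }));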
3931
f67539c2
TL
3932void Objecter::delete_pool_snap(
3933 int64_t pool, std::string_view snap_name,
3934 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3935{
3936 unique_lock wl(rwlock);
3937 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3938 << snap_name << dendl;
3939
3940 const pg_pool_t *p = osdmap->get_pg_pool(pool);
f67539c2
TL
3941 if (!p) {
3942 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3943 return;
3944 }
7c673cae 3945
f67539c2
TL
3946 if (!p->snap_exists(snap_name)) {
3947 onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{});
3948 return;
3949 }
3950
3951 auto op = new PoolOp;
31f18b77 3952 op->tid = ++last_tid;
7c673cae
FG
3953 op->pool = pool;
3954 op->name = snap_name;
f67539c2 3955 op->onfinish = std::move(onfinish);
7c673cae
FG
3956 op->pool_op = POOL_OP_DELETE_SNAP;
3957 pool_ops[op->tid] = op;
3958
3959 pool_op_submit(op);
7c673cae
FG
3960}
3961
f67539c2
TL
3962void Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3963 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3964{
3965 unique_lock wl(rwlock);
3966 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3967 << snap << dendl;
f67539c2 3968 auto op = new PoolOp;
31f18b77 3969 op->tid = ++last_tid;
7c673cae 3970 op->pool = pool;
f67539c2 3971 op->onfinish = std::move(onfinish);
7c673cae
FG
3972 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3973 op->snapid = snap;
3974 pool_ops[op->tid] = op;
3975
3976 pool_op_submit(op);
7c673cae
FG
3977}
3978
f67539c2
TL
3979void Objecter::create_pool(std::string_view name,
3980 decltype(PoolOp::onfinish)&& onfinish,
3981 int crush_rule)
7c673cae
FG
3982{
3983 unique_lock wl(rwlock);
3984 ldout(cct, 10) << "create_pool name=" << name << dendl;
3985
f67539c2
TL
3986 if (osdmap->lookup_pg_pool_name(name) >= 0) {
3987 onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{});
3988 return;
3989 }
7c673cae 3990
f67539c2 3991 auto op = new PoolOp;
31f18b77 3992 op->tid = ++last_tid;
7c673cae
FG
3993 op->pool = 0;
3994 op->name = name;
f67539c2 3995 op->onfinish = std::move(onfinish);
7c673cae
FG
3996 op->pool_op = POOL_OP_CREATE;
3997 pool_ops[op->tid] = op;
7c673cae
FG
3998 op->crush_rule = crush_rule;
3999
4000 pool_op_submit(op);
7c673cae
FG
4001}
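// Caller-side sketch (hypothetical): PoolOp::OpComp is the completion
// type this file already uses for pool operations; `ex` is an assumed
// executor.
//
//   objecter->create_pool(
//     "mypool",
//     Objecter::PoolOp::OpComp::create(
//       ex, [](bs::error_code ec, cb::list) {
//         // ec == osdc_errc::pool_exists if the name is already taken
//       }));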
4002
f67539c2
TL
4003void Objecter::delete_pool(int64_t pool,
4004 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
4005{
4006 unique_lock wl(rwlock);
4007 ldout(cct, 10) << "delete_pool " << pool << dendl;
4008
4009 if (!osdmap->have_pg_pool(pool))
f67539c2
TL
4010 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
4011 else
4012 _do_delete_pool(pool, std::move(onfinish));
7c673cae
FG
4013}
4014
f67539c2
TL
4015void Objecter::delete_pool(std::string_view pool_name,
4016 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
4017{
4018 unique_lock wl(rwlock);
4019 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
4020
4021 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
4022 if (pool < 0)
f67539c2
TL
4023 // This only returns one error: -ENOENT.
4024 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
4025 else
4026 _do_delete_pool(pool, std::move(onfinish));
7c673cae
FG
4027}
4028
f67539c2
TL
4029void Objecter::_do_delete_pool(int64_t pool,
4030 decltype(PoolOp::onfinish)&& onfinish)
4031
7c673cae 4032{
f67539c2 4033 auto op = new PoolOp;
31f18b77 4034 op->tid = ++last_tid;
7c673cae
FG
4035 op->pool = pool;
4036 op->name = "delete";
f67539c2 4037 op->onfinish = std::move(onfinish);
7c673cae
FG
4038 op->pool_op = POOL_OP_DELETE;
4039 pool_ops[op->tid] = op;
4040 pool_op_submit(op);
4041}
4042
7c673cae
FG
4043void Objecter::pool_op_submit(PoolOp *op)
4044{
4045 // rwlock is locked
4046 if (mon_timeout > timespan(0)) {
4047 op->ontimeout = timer.add_event(mon_timeout,
4048 [this, op]() {
4049 pool_op_cancel(op->tid, -ETIMEDOUT); });
4050 }
4051 _pool_op_submit(op);
4052}
4053
4054void Objecter::_pool_op_submit(PoolOp *op)
4055{
4056 // rwlock is locked unique
4057
4058 ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
f67539c2
TL
4059 auto m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4060 op->name, op->pool_op,
4061 last_seen_osdmap_version);
7c673cae
FG
4062 if (op->snapid) m->snapid = op->snapid;
4063 if (op->crush_rule) m->crush_rule = op->crush_rule;
4064 monc->send_mon_message(m);
11fdf7f2 4065 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4066
4067 logger->inc(l_osdc_poolop_send);
4068}
4069
4070/**
4071 * Handle a reply to a PoolOp message. Check that we sent the message
f67539c2 4072 * and give the caller responsibility for the returned cb::list.
7c673cae
FG
4073 * Then either call the finisher or stash the PoolOp, depending on
4074 * whether we have a new enough map.
4075 * Lastly, clean up the message and PoolOp.
4076 */
4077void Objecter::handle_pool_op_reply(MPoolOpReply *m)
4078{
f67539c2
TL
4079 int rc = m->replyCode;
4080 auto ec = rc < 0 ? bs::error_code(-rc, mon_category()) : bs::error_code();
11fdf7f2 4081 FUNCTRACE(cct);
7c673cae 4082 shunique_lock sul(rwlock, acquire_shared);
31f18b77 4083 if (!initialized) {
7c673cae
FG
4084 sul.unlock();
4085 m->put();
4086 return;
4087 }
4088
4089 ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
4090 ceph_tid_t tid = m->get_tid();
f67539c2 4091 auto iter = pool_ops.find(tid);
7c673cae
FG
4092 if (iter != pool_ops.end()) {
4093 PoolOp *op = iter->second;
4094 ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
4095 << ceph_pool_op_name(op->pool_op) << dendl;
f67539c2 4096 cb::list bl{std::move(m->response_data)};
7c673cae
FG
4097 if (m->version > last_seen_osdmap_version)
4098 last_seen_osdmap_version = m->version;
4099 if (osdmap->get_epoch() < m->epoch) {
4100 sul.unlock();
4101 sul.lock();
4102 // recheck op existence since we have let go of rwlock
4103 // (for promotion) above.
4104 iter = pool_ops.find(tid);
4105 if (iter == pool_ops.end())
4106 goto done; // op is gone.
4107 if (osdmap->get_epoch() < m->epoch) {
4108 ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
4109 << " before calling back" << dendl;
f67539c2
TL
4110 _wait_for_new_map(OpCompletion::create(
4111 service.get_executor(),
4112 [o = std::move(op->onfinish),
4113 bl = std::move(bl)](
4114 bs::error_code ec) mutable {
4115 o->defer(std::move(o), ec, bl);
4116 }),
4117 m->epoch,
4118 ec);
7c673cae
FG
4119 } else {
4120 // map epoch changed, probably because a MOSDMap message
4121 // sneaked in. Do caller-specified callback now or else
4122 // we lose it forever.
11fdf7f2 4123 ceph_assert(op->onfinish);
f67539c2 4124 op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
7c673cae
FG
4125 }
4126 } else {
11fdf7f2 4127 ceph_assert(op->onfinish);
f67539c2 4128 op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
7c673cae 4129 }
f67539c2 4130 op->onfinish = nullptr;
7c673cae
FG
4131 if (!sul.owns_lock()) {
4132 sul.unlock();
4133 sul.lock();
4134 }
4135 iter = pool_ops.find(tid);
4136 if (iter != pool_ops.end()) {
4137 _finish_pool_op(op, 0);
4138 }
4139 } else {
4140 ldout(cct, 10) << "unknown request " << tid << dendl;
4141 }
4142
4143done:
4144 // Not strictly necessary, since we'll release it on return.
4145 sul.unlock();
4146
4147 ldout(cct, 10) << "done" << dendl;
4148 m->put();
4149}
4150
4151int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4152{
11fdf7f2 4153 ceph_assert(initialized);
7c673cae
FG
4154
4155 unique_lock wl(rwlock);
4156
f67539c2 4157 auto it = pool_ops.find(tid);
7c673cae
FG
4158 if (it == pool_ops.end()) {
4159 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4160 return -ENOENT;
4161 }
4162
4163 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4164
4165 PoolOp *op = it->second;
4166 if (op->onfinish)
f67539c2 4167 op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{});
7c673cae
FG
4168
4169 _finish_pool_op(op, r);
4170 return 0;
4171}
4172
4173void Objecter::_finish_pool_op(PoolOp *op, int r)
4174{
4175 // rwlock is locked unique
4176 pool_ops.erase(op->tid);
4177 logger->set(l_osdc_poolop_active, pool_ops.size());
4178
4179 if (op->ontimeout && r != -ETIMEDOUT) {
4180 timer.cancel_event(op->ontimeout);
4181 }
4182
4183 delete op;
4184}
4185
4186// pool stats
4187
f67539c2
TL
4188void Objecter::get_pool_stats(
4189 const std::vector<std::string>& pools,
4190 decltype(PoolStatOp::onfinish)&& onfinish)
7c673cae
FG
4191{
4192 ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4193
f67539c2 4194 auto op = new PoolStatOp;
31f18b77 4195 op->tid = ++last_tid;
7c673cae 4196 op->pools = pools;
f67539c2 4197 op->onfinish = std::move(onfinish);
7c673cae
FG
4198 if (mon_timeout > timespan(0)) {
4199 op->ontimeout = timer.add_event(mon_timeout,
4200 [this, op]() {
4201 pool_stat_op_cancel(op->tid,
4202 -ETIMEDOUT); });
4203 } else {
4204 op->ontimeout = 0;
4205 }
4206
4207 unique_lock wl(rwlock);
4208
4209 poolstat_ops[op->tid] = op;
4210
4211 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4212
4213 _poolstat_submit(op);
4214}
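// Caller-side sketch (hypothetical; PoolStatOp::OpComp is assumed to
// name the completion type, by analogy with PoolOp::OpComp; the
// signature follows the defer() calls below):
//
//   objecter->get_pool_stats(
//     {"rbd", "cephfs_data"},
//     Objecter::PoolStatOp::OpComp::create(
//       ex, [](bs::error_code ec,
//              bc::flat_map<std::string, pool_stat_t> stats,
//              bool per_pool) {
//         // per_pool reports whether the mon used per-pool accounting
//       }));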
4215
4216void Objecter::_poolstat_submit(PoolStatOp *op)
4217{
4218 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4219 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4220 op->pools,
4221 last_seen_pgmap_version));
11fdf7f2 4222 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4223
4224 logger->inc(l_osdc_poolstat_send);
4225}
4226
4227void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4228{
4229 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4230 ceph_tid_t tid = m->get_tid();
4231
4232 unique_lock wl(rwlock);
31f18b77 4233 if (!initialized) {
7c673cae
FG
4234 m->put();
4235 return;
4236 }
4237
f67539c2 4238 auto iter = poolstat_ops.find(tid);
7c673cae
FG
4239 if (iter != poolstat_ops.end()) {
4240 PoolStatOp *op = poolstat_ops[tid];
4241 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
7c673cae
FG
4242 if (m->version > last_seen_pgmap_version) {
4243 last_seen_pgmap_version = m->version;
4244 }
f67539c2
TL
4245 op->onfinish->defer(std::move(op->onfinish), bs::error_code{},
4246 std::move(m->pool_stats), m->per_pool);
7c673cae
FG
4247 _finish_pool_stat_op(op, 0);
4248 } else {
4249 ldout(cct, 10) << "unknown request " << tid << dendl;
4250 }
4251 ldout(cct, 10) << "done" << dendl;
4252 m->put();
4253}
4254
4255int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4256{
11fdf7f2 4257 ceph_assert(initialized);
7c673cae
FG
4258
4259 unique_lock wl(rwlock);
4260
f67539c2 4261 auto it = poolstat_ops.find(tid);
7c673cae
FG
4262 if (it == poolstat_ops.end()) {
4263 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4264 return -ENOENT;
4265 }
4266
4267 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4268
f67539c2 4269 auto op = it->second;
7c673cae 4270 if (op->onfinish)
f67539c2
TL
4271 op->onfinish->defer(std::move(op->onfinish), osdcode(r),
4272 bc::flat_map<std::string, pool_stat_t>{}, false);
7c673cae
FG
4273 _finish_pool_stat_op(op, r);
4274 return 0;
4275}
4276
4277void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4278{
4279 // rwlock is locked unique
4280
4281 poolstat_ops.erase(op->tid);
4282 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4283
4284 if (op->ontimeout && r != -ETIMEDOUT)
4285 timer.cancel_event(op->ontimeout);
4286
4287 delete op;
4288}
4289
20effc67 4290void Objecter::get_fs_stats(std::optional<int64_t> poolid,
f67539c2 4291 decltype(StatfsOp::onfinish)&& onfinish)
7c673cae
FG
4292{
4293 ldout(cct, 10) << "get_fs_stats" << dendl;
4294 unique_lock l(rwlock);
4295
f67539c2 4296 auto op = new StatfsOp;
31f18b77 4297 op->tid = ++last_tid;
f67539c2
TL
4298 op->data_pool = poolid;
4299 op->onfinish = std::move(onfinish);
7c673cae
FG
4300 if (mon_timeout > timespan(0)) {
4301 op->ontimeout = timer.add_event(mon_timeout,
4302 [this, op]() {
4303 statfs_op_cancel(op->tid,
4304 -ETIMEDOUT); });
4305 } else {
4306 op->ontimeout = 0;
4307 }
4308 statfs_ops[op->tid] = op;
4309
4310 logger->set(l_osdc_statfs_active, statfs_ops.size());
4311
4312 _fs_stats_submit(op);
4313}
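// Caller-side sketch (hypothetical; StatfsOp::OpComp assumed by analogy
// with PoolOp::OpComp):
//
//   objecter->get_fs_stats(
//     std::nullopt, // or a specific data pool id
//     Objecter::StatfsOp::OpComp::create(
//       ex, [](bs::error_code ec, const ceph_statfs& st) {
//         // st describes cluster-wide (or per-pool) usage on success
//       }));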
4314
4315void Objecter::_fs_stats_submit(StatfsOp *op)
4316{
4317 // rwlock is locked unique
4318
4319 ldout(cct, 10) << "fs_stats_submit " << op->tid << dendl;
4320 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
d2e6a577 4321 op->data_pool,
7c673cae 4322 last_seen_pgmap_version));
11fdf7f2 4323 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4324
4325 logger->inc(l_osdc_statfs_send);
4326}
4327
4328void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4329{
4330 unique_lock wl(rwlock);
31f18b77 4331 if (!initialized) {
7c673cae
FG
4332 m->put();
4333 return;
4334 }
4335
4336 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4337 ceph_tid_t tid = m->get_tid();
4338
4339 if (statfs_ops.count(tid)) {
4340 StatfsOp *op = statfs_ops[tid];
4341 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
7c673cae
FG
4342 if (m->h.version > last_seen_pgmap_version)
4343 last_seen_pgmap_version = m->h.version;
f67539c2 4344 op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st);
7c673cae
FG
4345 _finish_statfs_op(op, 0);
4346 } else {
4347 ldout(cct, 10) << "unknown request " << tid << dendl;
4348 }
4349 m->put();
4350 ldout(cct, 10) << "done" << dendl;
4351}
4352
4353int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4354{
11fdf7f2 4355 ceph_assert(initialized);
7c673cae
FG
4356
4357 unique_lock wl(rwlock);
4358
f67539c2 4359 auto it = statfs_ops.find(tid);
7c673cae
FG
4360 if (it == statfs_ops.end()) {
4361 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4362 return -ENOENT;
4363 }
4364
4365 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4366
f67539c2 4367 auto op = it->second;
7c673cae 4368 if (op->onfinish)
f67539c2 4369 op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{});
7c673cae
FG
4370 _finish_statfs_op(op, r);
4371 return 0;
4372}
4373
4374void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4375{
4376 // rwlock is locked unique
4377
4378 statfs_ops.erase(op->tid);
4379 logger->set(l_osdc_statfs_active, statfs_ops.size());
4380
4381 if (op->ontimeout && r != -ETIMEDOUT)
4382 timer.cancel_event(op->ontimeout);
4383
4384 delete op;
4385}
4386
4387// scatter/gather
4388
4389void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
f67539c2
TL
4390 vector<cb::list>& resultbl,
4391 cb::list *bl, Context *onfinish)
7c673cae
FG
4392{
4393 // all done
4394 ldout(cct, 15) << "_sg_read_finish" << dendl;
4395
4396 if (extents.size() > 1) {
4397 Striper::StripedReadResult r;
f67539c2
TL
4398 auto bit = resultbl.begin();
4399 for (auto eit = extents.begin();
7c673cae
FG
4400 eit != extents.end();
4401 ++eit, ++bit) {
4402 r.add_partial_result(cct, *bit, eit->buffer_extents);
4403 }
4404 bl->clear();
4405 r.assemble_result(cct, *bl, false);
4406 } else {
4407 ldout(cct, 15) << " only one frag" << dendl;
f67539c2 4408 *bl = std::move(resultbl[0]);
7c673cae
FG
4409 }
4410
4411 // done
4412 uint64_t bytes_read = bl->length();
4413 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4414
4415 if (onfinish) {
4416 onfinish->complete(bytes_read); // bytes actually read
4417 }
4418}
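// Worked example (illustrative): a read striped across two extents,
// where extent 0 maps to caller bytes [0, 4096) and extent 1 to
// [4096, 8192), arrives as resultbl = {frag0, frag1}. The loop above
// registers each fragment at its buffer_extents offsets, and
// assemble_result() then emits one contiguous 8192-byte bl.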
4419
4420
4421void Objecter::ms_handle_connect(Connection *con)
4422{
4423 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
31f18b77 4424 if (!initialized)
7c673cae
FG
4425 return;
4426
4427 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4428 resend_mon_ops();
4429}
4430
4431bool Objecter::ms_handle_reset(Connection *con)
4432{
31f18b77 4433 if (!initialized)
7c673cae
FG
4434 return false;
4435 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
91327a77 4436 unique_lock wl(rwlock);
11fdf7f2
TL
4437
4438 auto priv = con->get_priv();
4439 auto session = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4440 if (session) {
4441 ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
4442 << " osd." << session->osd << dendl;
a8e16298
TL
4443 // the session may already have been closed if a just-handled osdmap
4444 // marked the osd down
4445 if (!(initialized && osdmap->is_up(session->osd))) {
4446 ldout(cct, 1) << "ms_handle_reset aborted, initialized=" << initialized << dendl;
7c673cae
FG
4447 wl.unlock();
4448 return false;
4449 }
4450 map<uint64_t, LingerOp *> lresend;
f67539c2 4451 unique_lock sl(session->lock);
7c673cae
FG
4452 _reopen_session(session);
4453 _kick_requests(session, lresend);
4454 sl.unlock();
4455 _linger_ops_resend(lresend, wl);
4456 wl.unlock();
4457 maybe_request_map();
7c673cae
FG
4458 }
4459 return true;
4460 }
4461 return false;
4462}
4463
4464void Objecter::ms_handle_remote_reset(Connection *con)
4465{
4466 /*
4467 * treat these the same.
4468 */
4469 ms_handle_reset(con);
4470}
4471
4472bool Objecter::ms_handle_refused(Connection *con)
4473{
4474 // just log for now
4475 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4476 int osd = osdmap->identify_osd(con->get_peer_addr());
4477 if (osd >= 0) {
4478 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4479 }
4480 }
4481 return false;
4482}
4483
7c673cae
FG
4484void Objecter::op_target_t::dump(Formatter *f) const
4485{
4486 f->dump_stream("pg") << pgid;
4487 f->dump_int("osd", osd);
4488 f->dump_stream("object_id") << base_oid;
4489 f->dump_stream("object_locator") << base_oloc;
4490 f->dump_stream("target_object_id") << target_oid;
4491 f->dump_stream("target_object_locator") << target_oloc;
4492 f->dump_int("paused", (int)paused);
4493 f->dump_int("used_replica", (int)used_replica);
4494 f->dump_int("precalc_pgid", (int)precalc_pgid);
4495}
4496
4497void Objecter::_dump_active(OSDSession *s)
4498{
f67539c2 4499 for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
7c673cae
FG
4500 Op *op = p->second;
4501 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4502 << "\tosd." << (op->session ? op->session->osd : -1)
4503 << "\t" << op->target.base_oid
4504 << "\t" << op->ops << dendl;
4505 }
4506}
4507
4508void Objecter::_dump_active()
4509{
31f18b77 4510 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
7c673cae 4511 << dendl;
f67539c2 4512 for (auto siter = osd_sessions.begin();
7c673cae 4513 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4514 auto s = siter->second;
4515 shared_lock sl(s->lock);
7c673cae
FG
4516 _dump_active(s);
4517 sl.unlock();
4518 }
4519 _dump_active(homeless_session);
4520}
4521
4522void Objecter::dump_active()
4523{
4524 shared_lock rl(rwlock);
4525 _dump_active();
4526 rl.unlock();
4527}
4528
4529void Objecter::dump_requests(Formatter *fmt)
4530{
4531 // Read-lock on Objecter held here
4532 fmt->open_object_section("requests");
4533 dump_ops(fmt);
4534 dump_linger_ops(fmt);
4535 dump_pool_ops(fmt);
4536 dump_pool_stat_ops(fmt);
4537 dump_statfs_ops(fmt);
4538 dump_command_ops(fmt);
4539 fmt->close_section(); // requests object
4540}
4541
4542void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4543{
f67539c2 4544 for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
7c673cae 4545 Op *op = p->second;
f67539c2 4546 auto age = std::chrono::duration<double>(ceph::coarse_mono_clock::now() - op->stamp);
7c673cae
FG
4547 fmt->open_object_section("op");
4548 fmt->dump_unsigned("tid", op->tid);
4549 op->target.dump(fmt);
4550 fmt->dump_stream("last_sent") << op->stamp;
92f5a8d4 4551 fmt->dump_float("age", age.count());
7c673cae
FG
4552 fmt->dump_int("attempts", op->attempts);
4553 fmt->dump_stream("snapid") << op->snapid;
4554 fmt->dump_stream("snap_context") << op->snapc;
4555 fmt->dump_stream("mtime") << op->mtime;
4556
4557 fmt->open_array_section("osd_ops");
f67539c2 4558 for (auto it = op->ops.begin(); it != op->ops.end(); ++it) {
7c673cae
FG
4559 fmt->dump_stream("osd_op") << *it;
4560 }
4561 fmt->close_section(); // osd_ops array
4562
4563 fmt->close_section(); // op object
4564 }
4565}
4566
4567void Objecter::dump_ops(Formatter *fmt)
4568{
4569 // Read-lock on Objecter held
4570 fmt->open_array_section("ops");
f67539c2 4571 for (auto siter = osd_sessions.begin();
7c673cae
FG
4572 siter != osd_sessions.end(); ++siter) {
4573 OSDSession *s = siter->second;
f67539c2 4574 shared_lock sl(s->lock);
7c673cae
FG
4575 _dump_ops(s, fmt);
4576 sl.unlock();
4577 }
4578 _dump_ops(homeless_session, fmt);
4579 fmt->close_section(); // ops array
4580}
4581
4582void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4583{
f67539c2
TL
4584 for (auto p = s->linger_ops.begin(); p != s->linger_ops.end(); ++p) {
4585 auto op = p->second;
7c673cae
FG
4586 fmt->open_object_section("linger_op");
4587 fmt->dump_unsigned("linger_id", op->linger_id);
4588 op->target.dump(fmt);
4589 fmt->dump_stream("snapid") << op->snap;
4590 fmt->dump_stream("registered") << op->registered;
4591 fmt->close_section(); // linger_op object
4592 }
4593}
4594
4595void Objecter::dump_linger_ops(Formatter *fmt)
4596{
4597 // We have a read-lock on the objecter
4598 fmt->open_array_section("linger_ops");
f67539c2 4599 for (auto siter = osd_sessions.begin();
7c673cae 4600 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4601 auto s = siter->second;
4602 shared_lock sl(s->lock);
7c673cae
FG
4603 _dump_linger_ops(s, fmt);
4604 sl.unlock();
4605 }
4606 _dump_linger_ops(homeless_session, fmt);
4607 fmt->close_section(); // linger_ops array
4608}
4609
4610void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4611{
f67539c2
TL
4612 for (auto p = s->command_ops.begin(); p != s->command_ops.end(); ++p) {
4613 auto op = p->second;
7c673cae
FG
4614 fmt->open_object_section("command_op");
4615 fmt->dump_unsigned("command_id", op->tid);
4616 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4617 fmt->open_array_section("command");
f67539c2 4618 for (auto q = op->cmd.begin(); q != op->cmd.end(); ++q)
7c673cae
FG
4619 fmt->dump_string("word", *q);
4620 fmt->close_section();
4621 if (op->target_osd >= 0)
4622 fmt->dump_int("target_osd", op->target_osd);
4623 else
4624 fmt->dump_stream("target_pg") << op->target_pg;
4625 fmt->close_section(); // command_op object
4626 }
4627}
4628
4629void Objecter::dump_command_ops(Formatter *fmt)
4630{
4631 // We have a read-lock on the Objecter here
4632 fmt->open_array_section("command_ops");
f67539c2 4633 for (auto siter = osd_sessions.begin();
7c673cae 4634 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4635 auto s = siter->second;
4636 shared_lock sl(s->lock);
7c673cae
FG
4637 _dump_command_ops(s, fmt);
4638 sl.unlock();
4639 }
4640 _dump_command_ops(homeless_session, fmt);
4641 fmt->close_section(); // command_ops array
4642}
4643
4644void Objecter::dump_pool_ops(Formatter *fmt) const
4645{
4646 fmt->open_array_section("pool_ops");
f67539c2
TL
4647 for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
4648 auto op = p->second;
7c673cae
FG
4649 fmt->open_object_section("pool_op");
4650 fmt->dump_unsigned("tid", op->tid);
4651 fmt->dump_int("pool", op->pool);
4652 fmt->dump_string("name", op->name);
4653 fmt->dump_int("operation_type", op->pool_op);
7c673cae
FG
4654 fmt->dump_unsigned("crush_rule", op->crush_rule);
4655 fmt->dump_stream("snapid") << op->snapid;
4656 fmt->dump_stream("last_sent") << op->last_submit;
4657 fmt->close_section(); // pool_op object
4658 }
4659 fmt->close_section(); // pool_ops array
4660}
4661
4662void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4663{
4664 fmt->open_array_section("pool_stat_ops");
f67539c2 4665 for (auto p = poolstat_ops.begin();
7c673cae
FG
4666 p != poolstat_ops.end();
4667 ++p) {
4668 PoolStatOp *op = p->second;
4669 fmt->open_object_section("pool_stat_op");
4670 fmt->dump_unsigned("tid", op->tid);
4671 fmt->dump_stream("last_sent") << op->last_submit;
4672
4673 fmt->open_array_section("pools");
f67539c2
TL
4674 for (const auto& it : op->pools) {
4675 fmt->dump_string("pool", it);
7c673cae
FG
4676 }
4677 fmt->close_section(); // pools array
4678
4679 fmt->close_section(); // pool_stat_op object
4680 }
4681 fmt->close_section(); // pool_stat_ops array
4682}
4683
4684void Objecter::dump_statfs_ops(Formatter *fmt) const
4685{
4686 fmt->open_array_section("statfs_ops");
f67539c2
TL
4687 for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
4688 auto op = p->second;
7c673cae
FG
4689 fmt->open_object_section("statfs_op");
4690 fmt->dump_unsigned("tid", op->tid);
4691 fmt->dump_stream("last_sent") << op->last_submit;
4692 fmt->close_section(); // statfs_op object
4693 }
4694 fmt->close_section(); // statfs_ops array
4695}
4696
4697Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4698 m_objecter(objecter)
4699{
4700}
4701
9f95a23c
TL
4702int Objecter::RequestStateHook::call(std::string_view command,
4703 const cmdmap_t& cmdmap,
4704 Formatter *f,
4705 std::ostream& ss,
f67539c2 4706 cb::list& out)
7c673cae 4707{
7c673cae
FG
4708 shared_lock rl(m_objecter->rwlock);
4709 m_objecter->dump_requests(f);
9f95a23c 4710 return 0;
7c673cae
FG
4711}
4712
f67539c2 4713void Objecter::blocklist_self(bool set)
7c673cae 4714{
f67539c2 4715 ldout(cct, 10) << "blocklist_self " << (set ? "add" : "rm") << dendl;
7c673cae
FG
4716
4717 vector<string> cmd;
f67539c2 4718 cmd.push_back("{\"prefix\":\"osd blocklist\", ");
7c673cae 4719 if (set)
f67539c2 4720 cmd.push_back("\"blocklistop\":\"add\",");
7c673cae 4721 else
f67539c2 4722 cmd.push_back("\"blocklistop\":\"rm\",");
7c673cae 4723 stringstream ss;
f67539c2 4724 // this is somewhat imprecise in that we are blocklisting our first addr only
11fdf7f2 4725 ss << messenger->get_myaddrs().front().get_legacy_str();
7c673cae
FG
4726 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4727
f67539c2 4728 auto m = new MMonCommand(monc->get_fsid());
7c673cae
FG
4729 m->cmd = cmd;
4730
f67539c2
TL
4731 // NOTE: no fallback to legacy blacklist command implemented here
4732 // since this is only used for test code.
4733
7c673cae
FG
4734 monc->send_mon_message(m);
4735}
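// For reference, the fragments pushed above compose a mon command of
// roughly this shape (addr is this client's first messenger address):
//
//   {"prefix":"osd blocklist", "blocklistop":"add", "addr":"<legacy addr>"}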
4736
4737// commands
4738
4739void Objecter::handle_command_reply(MCommandReply *m)
4740{
4741 unique_lock wl(rwlock);
31f18b77 4742 if (!initialized) {
7c673cae
FG
4743 m->put();
4744 return;
4745 }
4746
4747 ConnectionRef con = m->get_connection();
11fdf7f2
TL
4748 auto priv = con->get_priv();
4749 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4750 if (!s || s->con != con) {
4751 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
4752 m->put();
7c673cae
FG
4753 return;
4754 }
4755
f67539c2
TL
4756 shared_lock sl(s->lock);
4757 auto p = s->command_ops.find(m->get_tid());
7c673cae
FG
4758 if (p == s->command_ops.end()) {
4759 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4760 << " not found" << dendl;
4761 m->put();
4762 sl.unlock();
7c673cae
FG
4763 return;
4764 }
4765
4766 CommandOp *c = p->second;
4767 if (!c->session ||
4768 m->get_connection() != c->session->con) {
4769 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4770 << " got reply from wrong connection "
4771 << m->get_connection() << " " << m->get_source_inst()
4772 << dendl;
4773 m->put();
4774 sl.unlock();
7c673cae
FG
4775 return;
4776 }
f67539c2 4777
9f95a23c
TL
4778 if (m->r == -EAGAIN) {
4779 ldout(cct,10) << __func__ << " tid " << m->get_tid()
4780 << " got EAGAIN, requesting map and resending" << dendl;
4781 // NOTE: This might resend twice... once now, and once again when
4782 // we get an updated osdmap and the PG is found to have moved.
4783 _maybe_request_map();
4784 _send_command(c);
4785 m->put();
4786 sl.unlock();
4787 return;
4788 }
4789
7c673cae
FG
4790 sl.unlock();
4791
f67539c2
TL
4792 unique_lock sul(s->lock);
4793 _finish_command(c, m->r < 0 ? bs::error_code(-m->r, osd_category()) :
4794 bs::error_code(), std::move(m->rs),
4795 std::move(m->get_data()));
28e407b8
AA
4796 sul.unlock();
4797
7c673cae 4798 m->put();
7c673cae
FG
4799}
4800
522d829b
TL
4801Objecter::LingerOp::LingerOp(Objecter *o, uint64_t linger_id)
4802 : objecter(o),
4803 linger_id(linger_id),
4804 watch_lock(ceph::make_shared_mutex(
4805 fmt::format("LingerOp::watch_lock #{}", linger_id)))
4806{}
4807
7c673cae
FG
4808void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
4809{
4810 shunique_lock sul(rwlock, ceph::acquire_unique);
4811
31f18b77 4812 ceph_tid_t tid = ++last_tid;
7c673cae
FG
4813 ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
4814 c->tid = tid;
4815
4816 {
f67539c2 4817 unique_lock hs_wl(homeless_session->lock);
7c673cae
FG
4818 _session_command_op_assign(homeless_session, c);
4819 }
4820
4821 _calc_command_target(c, sul);
4822 _assign_command_session(c, sul);
4823 if (osd_timeout > timespan(0)) {
4824 c->ontimeout = timer.add_event(osd_timeout,
4825 [this, c, tid]() {
f67539c2
TL
4826 command_op_cancel(
4827 c->session, tid,
4828 osdc_errc::timed_out); });
7c673cae
FG
4829 }
4830
4831 if (!c->session->is_homeless()) {
4832 _send_command(c);
4833 } else {
4834 _maybe_request_map();
4835 }
4836 if (c->map_check_error)
4837 _send_command_map_check(c);
f67539c2
TL
4838 if (ptid)
4839 *ptid = tid;
7c673cae
FG
4840
4841 logger->inc(l_osdc_command_active);
4842}
4843
f67539c2
TL
4844int Objecter::_calc_command_target(CommandOp *c,
4845 shunique_lock<ceph::shared_mutex>& sul)
7c673cae 4846{
11fdf7f2 4847 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4848
4849 c->map_check_error = 0;
4850
4851 // ignore overlays, just like we do with pg ops
4852 c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4853
4854 if (c->target_osd >= 0) {
4855 if (!osdmap->exists(c->target_osd)) {
4856 c->map_check_error = -ENOENT;
4857 c->map_check_error_str = "osd dne";
4858 c->target.osd = -1;
4859 return RECALC_OP_TARGET_OSD_DNE;
4860 }
4861 if (osdmap->is_down(c->target_osd)) {
4862 c->map_check_error = -ENXIO;
4863 c->map_check_error_str = "osd down";
4864 c->target.osd = -1;
4865 return RECALC_OP_TARGET_OSD_DOWN;
4866 }
4867 c->target.osd = c->target_osd;
4868 } else {
4869 int ret = _calc_target(&(c->target), nullptr, true);
4870 if (ret == RECALC_OP_TARGET_POOL_DNE) {
4871 c->map_check_error = -ENOENT;
4872 c->map_check_error_str = "pool dne";
4873 c->target.osd = -1;
4874 return ret;
4875 } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4876 c->map_check_error = -ENXIO;
4877 c->map_check_error_str = "osd down";
4878 c->target.osd = -1;
4879 return ret;
4880 }
4881 }
4882
4883 OSDSession *s;
4884 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4885 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4886
4887 if (c->session != s) {
4888 put_session(s);
4889 return RECALC_OP_TARGET_NEED_RESEND;
4890 }
4891
4892 put_session(s);
4893
4894 ldout(cct, 20) << "_calc_command_target " << c->tid << " no change, "
4895 << c->session << dendl;
4896
4897 return RECALC_OP_TARGET_NO_ACTION;
4898}
4899
4900void Objecter::_assign_command_session(CommandOp *c,
f67539c2 4901 shunique_lock<ceph::shared_mutex>& sul)
7c673cae 4902{
11fdf7f2 4903 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4904
4905 OSDSession *s;
4906 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4907 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4908
4909 if (c->session != s) {
4910 if (c->session) {
4911 OSDSession *cs = c->session;
f67539c2 4912 unique_lock csl(cs->lock);
7c673cae
FG
4913 _session_command_op_remove(c->session, c);
4914 csl.unlock();
4915 }
f67539c2 4916 unique_lock sl(s->lock);
7c673cae
FG
4917 _session_command_op_assign(s, c);
4918 }
4919
4920 put_session(s);
4921}
4922
4923void Objecter::_send_command(CommandOp *c)
4924{
4925 ldout(cct, 10) << "_send_command " << c->tid << dendl;
11fdf7f2
TL
4926 ceph_assert(c->session);
4927 ceph_assert(c->session->con);
f67539c2 4928 auto m = new MCommand(monc->monmap.fsid);
7c673cae
FG
4929 m->cmd = c->cmd;
4930 m->set_data(c->inbl);
4931 m->set_tid(c->tid);
4932 c->session->con->send_message(m);
4933 logger->inc(l_osdc_command_send);
4934}
4935
f67539c2
TL
4936int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid,
4937 bs::error_code ec)
7c673cae 4938{
11fdf7f2 4939 ceph_assert(initialized);
7c673cae
FG
4940
4941 unique_lock wl(rwlock);
4942
f67539c2 4943 auto it = s->command_ops.find(tid);
7c673cae
FG
4944 if (it == s->command_ops.end()) {
4945 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4946 return -ENOENT;
4947 }
4948
4949 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4950
4951 CommandOp *op = it->second;
4952 _command_cancel_map_check(op);
f67539c2
TL
4953 unique_lock sl(op->session->lock);
4954 _finish_command(op, ec, {}, {});
28e407b8 4955 sl.unlock();
7c673cae
FG
4956 return 0;
4957}
4958
f67539c2
TL
4959void Objecter::_finish_command(CommandOp *c, bs::error_code ec,
4960 string&& rs, cb::list&& bl)
7c673cae
FG
4961{
4962 // rwlock is locked unique
28e407b8 4963 // session lock is locked
7c673cae 4964
f67539c2 4965 ldout(cct, 10) << "_finish_command " << c->tid << " = " << ec << " "
7c673cae 4966 << rs << dendl;
f67539c2 4967
7c673cae 4968 if (c->onfinish)
f67539c2 4969 c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl));
7c673cae 4970
f67539c2 4971 if (c->ontimeout && ec != bs::errc::timed_out)
7c673cae
FG
4972 timer.cancel_event(c->ontimeout);
4973
7c673cae 4974 _session_command_op_remove(c->session, c);
7c673cae
FG
4975
4976 c->put();
4977
4978 logger->dec(l_osdc_command_active);
4979}
4980
4981Objecter::OSDSession::~OSDSession()
4982{
4983 // Caller is responsible for re-assigning or
4984 // destroying any ops that were assigned to us
11fdf7f2
TL
4985 ceph_assert(ops.empty());
4986 ceph_assert(linger_ops.empty());
4987 ceph_assert(command_ops.empty());
7c673cae
FG
4988}
4989
f67539c2
TL
4990Objecter::Objecter(CephContext *cct,
4991 Messenger *m, MonClient *mc,
4992 boost::asio::io_context& service) :
4993 Dispatcher(cct), messenger(m), monc(mc), service(service)
f91f0fd5
TL
4994{
4995 mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
4996 osd_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
4997}
9f95a23c 4998
7c673cae
FG
4999Objecter::~Objecter()
5000{
11fdf7f2
TL
5001 ceph_assert(homeless_session->get_nref() == 1);
5002 ceph_assert(num_homeless_ops == 0);
7c673cae
FG
5003 homeless_session->put();
5004
11fdf7f2
TL
5005 ceph_assert(osd_sessions.empty());
5006 ceph_assert(poolstat_ops.empty());
5007 ceph_assert(statfs_ops.empty());
5008 ceph_assert(pool_ops.empty());
5009 ceph_assert(waiting_for_map.empty());
5010 ceph_assert(linger_ops.empty());
5011 ceph_assert(check_latest_map_lingers.empty());
5012 ceph_assert(check_latest_map_ops.empty());
5013 ceph_assert(check_latest_map_commands.empty());
7c673cae 5014
11fdf7f2
TL
5015 ceph_assert(!m_request_state_hook);
5016 ceph_assert(!logger);
7c673cae
FG
5017}
5018
5019/**
5020 * Wait until this OSD map epoch is received before
5021 * sending any more operations to OSDs. Use this
5022 * when it is known that the client can't trust
5023 * anything from before this epoch (e.g. due to
f67539c2 5024 * client blocklist at this epoch).
7c673cae
FG
5025 */
5026void Objecter::set_epoch_barrier(epoch_t epoch)
5027{
5028 unique_lock wl(rwlock);
5029
5030 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
5031 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
5032 << dendl;
5033 if (epoch > epoch_barrier) {
5034 epoch_barrier = epoch;
5035 _maybe_request_map();
5036 }
5037}
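// Usage sketch (hypothetical): a client that learns it was blocklisted
// at epoch E fences itself until its map catches up:
//
//   objecter->set_epoch_barrier(E);
//
// Per the comment above, no further ops are sent to OSDs until the
// barrier epoch has been received.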
5038
5039
5040
5041hobject_t Objecter::enumerate_objects_begin()
5042{
5043 return hobject_t();
5044}
5045
5046hobject_t Objecter::enumerate_objects_end()
5047{
5048 return hobject_t::get_max();
5049}
5050
f67539c2
TL
5051template<typename T>
5052struct EnumerationContext {
5053 Objecter* objecter;
7c673cae 5054 const hobject_t end;
f67539c2
TL
5055 const cb::list filter;
5056 uint32_t max;
5057 const object_locator_t oloc;
5058 std::vector<T> ls;
5059private:
5060 fu2::unique_function<void(bs::error_code,
5061 std::vector<T>,
5062 hobject_t) &&> on_finish;
5063public:
5064 epoch_t epoch = 0;
5065 int budget = -1;
5066
5067 EnumerationContext(Objecter* objecter,
5068 hobject_t end, cb::list filter,
5069 uint32_t max, object_locator_t oloc,
5070 decltype(on_finish) on_finish)
5071 : objecter(objecter), end(std::move(end)), filter(std::move(filter)),
5072 max(max), oloc(std::move(oloc)), on_finish(std::move(on_finish)) {}
5073
5074 void operator()(bs::error_code ec,
5075 std::vector<T> v,
5076 hobject_t h) && {
5077 if (budget >= 0) {
5078 objecter->put_op_budget_bytes(budget);
5079 budget = -1;
5080 }
5081
5082 std::move(on_finish)(ec, std::move(v), std::move(h));
5083 }
5084};
7c673cae 5085
f67539c2
TL
5086template<typename T>
5087struct CB_EnumerateReply {
5088 cb::list bl;
7c673cae 5089
f67539c2
TL
5090 Objecter* objecter;
5091 std::unique_ptr<EnumerationContext<T>> ctx;
7c673cae 5092
f67539c2
TL
5093 CB_EnumerateReply(Objecter* objecter,
5094 std::unique_ptr<EnumerationContext<T>>&& ctx) :
5095 objecter(objecter), ctx(std::move(ctx)) {}
5096
5097 void operator()(bs::error_code ec) {
5098 objecter->_enumerate_reply(std::move(bl), ec, std::move(ctx));
7c673cae
FG
5099 }
5100};
5101
f67539c2 5102template<typename T>
7c673cae 5103void Objecter::enumerate_objects(
f67539c2
TL
5104 int64_t pool_id,
5105 std::string_view ns,
5106 hobject_t start,
5107 hobject_t end,
5108 const uint32_t max,
5109 const cb::list& filter_bl,
5110 fu2::unique_function<void(bs::error_code,
5111 std::vector<T>,
5112 hobject_t) &&> on_finish) {
7c673cae
FG
5113 if (!end.is_max() && start > end) {
5114 lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
f67539c2 5115 std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
7c673cae
FG
5116 return;
5117 }
5118
5119 if (max < 1) {
5120 lderr(cct) << __func__ << ": result size may not be zero" << dendl;
f67539c2 5121 std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
7c673cae
FG
5122 return;
5123 }
5124
5125 if (start.is_max()) {
f67539c2 5126 std::move(on_finish)({}, {}, {});
7c673cae
FG
5127 return;
5128 }
5129
5130 shared_lock rl(rwlock);
11fdf7f2 5131 ceph_assert(osdmap->get_epoch());
7c673cae
FG
5132 if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
5133 rl.unlock();
5134 lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
f67539c2 5135 std::move(on_finish)(osdc_errc::not_supported, {}, {});
7c673cae
FG
5136 return;
5137 }
f67539c2 5138 const pg_pool_t* p = osdmap->get_pg_pool(pool_id);
7c673cae
FG
5139 if (!p) {
5140 lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
5141 << osdmap->get_epoch() << dendl;
5142 rl.unlock();
f67539c2 5143 std::move(on_finish)(osdc_errc::pool_dne, {}, {});
7c673cae
FG
5144 return;
5145 } else {
5146 rl.unlock();
5147 }
5148
f67539c2
TL
5149 _issue_enumerate(start,
5150 std::make_unique<EnumerationContext<T>>(
5151 this, std::move(end), filter_bl,
5152 max, object_locator_t{pool_id, ns},
5153 std::move(on_finish)));
5154}
5155
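// Caller-side sketch (hypothetical): enumerate up to 100 entries from a
// pool's default namespace across the whole hash range.
//
//   objecter->enumerate_objects<librados::ListObjectImpl>(
//     pool_id, "",
//     objecter->enumerate_objects_begin(),
//     objecter->enumerate_objects_end(),
//     100, {}, // no PGLS filter
//     [](bs::error_code ec, std::vector<librados::ListObjectImpl> ls,
//        hobject_t next) {
//       // pass `next` as the new start to continue from where we left off
//     });
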
5156template
5157void Objecter::enumerate_objects<librados::ListObjectImpl>(
5158 int64_t pool_id,
5159 std::string_view ns,
5160 hobject_t start,
5161 hobject_t end,
5162 const uint32_t max,
5163 const cb::list& filter_bl,
5164 fu2::unique_function<void(bs::error_code,
5165 std::vector<librados::ListObjectImpl>,
5166 hobject_t) &&> on_finish);
5167
5168template
5169void Objecter::enumerate_objects<neorados::Entry>(
5170 int64_t pool_id,
5171 std::string_view ns,
5172 hobject_t start,
5173 hobject_t end,
5174 const uint32_t max,
5175 const cb::list& filter_bl,
5176 fu2::unique_function<void(bs::error_code,
5177 std::vector<neorados::Entry>,
5178 hobject_t) &&> on_finish);
5179
5180
5181
5182template<typename T>
5183void Objecter::_issue_enumerate(hobject_t start,
5184 std::unique_ptr<EnumerationContext<T>> ctx) {
7c673cae 5185 ObjectOperation op;
f67539c2
TL
5186 auto c = ctx.get();
5187 op.pg_nls(c->max, c->filter, start, osdmap->get_epoch());
5188 auto on_ack = std::make_unique<CB_EnumerateReply<T>>(this, std::move(ctx));
5189 // I hate having to do this. Try to find a cleaner way
5190 // later.
5191 auto epoch = &c->epoch;
5192 auto budget = &c->budget;
5193 auto pbl = &on_ack->bl;
7c673cae
FG
5194
5195 // Issue. See you later in _enumerate_reply
f67539c2
TL
5196 pg_read(start.get_hash(),
5197 c->oloc, op, pbl, 0,
5198 Op::OpComp::create(service.get_executor(),
5199 [c = std::move(on_ack)]
5200 (bs::error_code ec) mutable {
5201 (*c)(ec);
5202 }), epoch, budget);
5203}
5204
5205template
5206void Objecter::_issue_enumerate<librados::ListObjectImpl>(
5207 hobject_t start,
5208 std::unique_ptr<EnumerationContext<librados::ListObjectImpl>> ctx);
5209template
5210void Objecter::_issue_enumerate<neorados::Entry>(
5211 hobject_t start, std::unique_ptr<EnumerationContext<neorados::Entry>> ctx);

template<typename T>
void Objecter::_enumerate_reply(
  cb::list&& bl,
  bs::error_code ec,
  std::unique_ptr<EnumerationContext<T>>&& ctx)
{
  if (ec) {
    std::move(*ctx)(ec, {}, {});
    return;
  }

  // Decode the results
  auto iter = bl.cbegin();
  pg_nls_response_template<T> response;

  try {
    response.decode(iter);
    if (!iter.end()) {
      // extra_info isn't used anywhere. We decode it solely to preserve
      // backward compatibility.
      cb::list legacy_extra_info;
      decode(legacy_extra_info, iter);
    }
  } catch (const bs::system_error& e) {
    std::move(*ctx)(e.code(), {}, {});
    return;
  }

  shared_lock rl(rwlock);
  auto pool = osdmap->get_pg_pool(ctx->oloc.get_pool());
  rl.unlock();
  if (!pool) {
    // Pool is gone; drop any results, which are now meaningless.
    std::move(*ctx)(osdc_errc::pool_dne, {}, {});
    return;
  }

  // Pick the position to resume from: the OSD's cursor if it is still
  // within the requested range, otherwise clamp to 'end'.
  hobject_t next;
  if (response.handle <= ctx->end) {
    next = response.handle;
  } else {
    next = ctx->end;

    // drop anything after 'end'
    while (!response.entries.empty()) {
      uint32_t hash = response.entries.back().locator.empty() ?
        pool->hash_key(response.entries.back().oid,
                       response.entries.back().nspace) :
        pool->hash_key(response.entries.back().locator,
                       response.entries.back().nspace);
      hobject_t last(response.entries.back().oid,
                     response.entries.back().locator,
                     CEPH_NOSNAP,
                     hash,
                     ctx->oloc.get_pool(),
                     response.entries.back().nspace);
      if (last < ctx->end)
        break;
      response.entries.pop_back();
    }
  }

  if (response.entries.size() <= ctx->max) {
    // The whole reply fits within the caller's remaining quota.
    ctx->max -= response.entries.size();
    std::move(response.entries.begin(), response.entries.end(),
              std::back_inserter(ctx->ls));
  } else {
    // More entries than requested: take exactly 'max' and resume at the
    // first entry we did not consume.
    auto i = response.entries.begin();
    while (ctx->max > 0) {
      ctx->ls.push_back(std::move(*i));
      --(ctx->max);
      ++i;
    }
    uint32_t hash =
      i->locator.empty() ?
      pool->hash_key(i->oid, i->nspace) :
      pool->hash_key(i->locator, i->nspace);

    next = hobject_t{i->oid, i->locator,
                     CEPH_NOSNAP,
                     hash,
                     ctx->oloc.get_pool(),
                     i->nspace};
  }

  if (next == ctx->end || ctx->max == 0) {
    // Range exhausted or quota filled: deliver the results.
    std::move(*ctx)(ec, std::move(ctx->ls), std::move(next));
  } else {
    _issue_enumerate(next, std::move(ctx));
  }
}
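
// A compressed sketch of the quota/cursor arithmetic above (illustrative
// only, not part of the build; all names are hypothetical). Given a reply
// and a remaining quota 'max', either absorb everything, or take exactly
// 'max' entries and resume at the first entry left behind:
//
//   #include <cstdint>
//   #include <vector>
//
//   // Returns the index to resume from; 'max' is decremented in place.
//   std::size_t absorb(std::vector<int>& out, std::vector<int>& reply,
//                      std::uint32_t& max) {
//     if (reply.size() <= max) {
//       max -= static_cast<std::uint32_t>(reply.size());
//       out.insert(out.end(), reply.begin(), reply.end());
//       return reply.size();        // consumed the whole reply
//     }
//     out.insert(out.end(), reply.begin(), reply.begin() + max);
//     std::size_t next = max;       // like 'next' above: first unconsumed
//     max = 0;
//     return next;
//   }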

template
void Objecter::_enumerate_reply<librados::ListObjectImpl>(
  cb::list&& bl,
  bs::error_code ec,
  std::unique_ptr<EnumerationContext<librados::ListObjectImpl>>&& ctx);

template
void Objecter::_enumerate_reply<neorados::Entry>(
  cb::list&& bl,
  bs::error_code ec,
  std::unique_ptr<EnumerationContext<neorados::Entry>>&& ctx);

namespace {
  using namespace librados;

  // Decode each buffer in 'bls' into a T and append it to 'items'.
  template <typename T>
  void do_decode(std::vector<T>& items, std::vector<cb::list>& bls)
  {
    for (auto bl : bls) {
      auto p = bl.cbegin();
      T t;
      decode(t, p);
      items.push_back(t);
    }
  }

  struct C_ObjectOperation_scrub_ls : public Context {
    cb::list bl;
    uint32_t* interval;
    std::vector<inconsistent_obj_t>* objects = nullptr;
    std::vector<inconsistent_snapset_t>* snapsets = nullptr;
    int* rval;

    C_ObjectOperation_scrub_ls(uint32_t* interval,
                               std::vector<inconsistent_obj_t>* objects,
                               int* rval)
      : interval(interval), objects(objects), rval(rval) {}
    C_ObjectOperation_scrub_ls(uint32_t* interval,
                               std::vector<inconsistent_snapset_t>* snapsets,
                               int* rval)
      : interval(interval), snapsets(snapsets), rval(rval) {}
    void finish(int r) override {
      if (r < 0 && r != -EAGAIN) {
        if (rval)
          *rval = r;
        return;
      }

      // -EAGAIN is treated as success here; the payload is decoded either
      // way so the caller can pick up the interval the OSD reports.
      if (rval)
        *rval = 0;

      try {
        decode();
      } catch (cb::error&) {
        if (rval)
          *rval = -EIO;
      }
    }
  private:
    void decode() {
      scrub_ls_result_t result;
      auto p = bl.cbegin();
      result.decode(p);
      *interval = result.interval;
      if (objects) {
        do_decode(*objects, result.vals);
      } else {
        do_decode(*snapsets, result.vals);
      }
    }
  };

  template <typename T>
  void do_scrub_ls(::ObjectOperation* op,
                   const scrub_ls_arg_t& arg,
                   std::vector<T>* items,
                   uint32_t* interval,
                   int* rval)
  {
    OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
    op->flags |= CEPH_OSD_FLAG_PGOP;
    ceph_assert(interval);
    arg.encode(osd_op.indata);
    unsigned p = op->ops.size() - 1;
    // Route this op's output buffer and return value through the handler
    // so the reply is decoded into 'items' and 'interval'.
    auto h = new C_ObjectOperation_scrub_ls{interval, items, rval};
    op->set_handler(h);
    op->out_bl[p] = &h->bl;
    op->out_rval[p] = rval;
  }
}

void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
                                 uint64_t max_to_get,
                                 std::vector<librados::inconsistent_obj_t>* objects,
                                 uint32_t* interval,
                                 int* rval)
{
  scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
  do_scrub_ls(this, arg, objects, interval, rval);
}

void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
                                 uint64_t max_to_get,
                                 std::vector<librados::inconsistent_snapset_t>* snapsets,
                                 uint32_t* interval,
                                 int* rval)
{
  scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
  do_scrub_ls(this, arg, snapsets, interval, rval);
}
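
// Usage sketch for the scrub_ls overloads above (illustrative only, not
// part of the build). Note that 'interval' is in/out: the value passed in
// is encoded into the request, and the reply handler overwrites it with
// the interval the OSD reports. The handler treats -EAGAIN as non-fatal
// and still decodes the payload, so a caller can re-issue with the
// updated interval:
//
//   ::ObjectOperation op;
//   std::vector<librados::inconsistent_obj_t> objects;
//   uint32_t interval = 0;  // first call; the OSD echoes back its interval
//   int rval = 0;
//   op.scrub_ls(librados::object_id_t{}, 100 /* max_to_get */,
//               &objects, &interval, &rval);
//   // ...submit 'op' as a PG op against the PG being scrubbed, then
//   // inspect 'rval', 'interval', and 'objects'...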