// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <algorithm>
#include <cerrno>

#include "Objecter.h"
#include "osd/OSDMap.h"
#include "osd/error_code.h"
#include "Filer.h"

#include "mon/MonClient.h"
#include "mon/error_code.h"

#include "msg/Messenger.h"
#include "msg/Message.h"

#include "messages/MPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDMap.h"

#include "messages/MPoolOp.h"
#include "messages/MPoolOpReply.h"

#include "messages/MGetPoolStats.h"
#include "messages/MGetPoolStatsReply.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"

#include "messages/MMonCommand.h"

#include "messages/MCommand.h"
#include "messages/MCommandReply.h"

#include "messages/MWatchNotify.h"


#include "common/Cond.h"
#include "common/config.h"
#include "common/perf_counters.h"
#include "common/scrub_types.h"
#include "include/str_list.h"
#include "common/errno.h"
#include "common/EventTrace.h"
#include "common/async/waiter.h"
#include "error_code.h"


using std::list;
using std::make_pair;
using std::map;
using std::ostream;
using std::ostringstream;
using std::pair;
using std::set;
using std::string;
using std::stringstream;
using std::vector;

using ceph::decode;
using ceph::encode;
using ceph::Formatter;

using std::defer_lock;
using std::scoped_lock;
using std::shared_lock;
using std::unique_lock;

using ceph::real_time;
using ceph::real_clock;

using ceph::mono_clock;
using ceph::mono_time;

using ceph::timespan;

using ceph::shunique_lock;
using ceph::acquire_shared;
using ceph::acquire_unique;

namespace bc = boost::container;
namespace bs = boost::system;
namespace ca = ceph::async;
namespace cb = ceph::buffer;

#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
#define dout_prefix *_dout << messenger->get_myname() << ".objecter "


enum {
  l_osdc_first = 123200,
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,
  l_osdc_op_latency,
  l_osdc_op_inflight,
  l_osdc_oplen_avg,

  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};

namespace {
inline bs::error_code osdcode(int r) {
  return (r < 0) ? bs::error_code(-r, osd_category()) : bs::error_code();
}
}
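
// Note: osdcode() bridges the legacy negative-errno convention used
// throughout the OSD client (e.g. r = -ENOENT) and the boost::system
// error_code world; non-negative r maps to the default-constructed
// "success" error_code. A minimal illustration (values hypothetical):
//
//   bs::error_code ec = osdcode(-ENOENT); // ec.value() == ENOENT, truthy
//   bs::error_code ok = osdcode(0);       // !ok, i.e. success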

// config obs ----------------------------

class Objecter::RequestStateHook : public AdminSocketHook {
  Objecter *m_objecter;
public:
  explicit RequestStateHook(Objecter *objecter);
  int call(std::string_view command, const cmdmap_t& cmdmap,
	   const bufferlist&,
	   Formatter *f,
	   std::ostream& ss,
	   cb::list& out) override;
};

std::unique_lock<std::mutex> Objecter::OSDSession::get_lock(object_t& oid)
{
  if (oid.name.empty())
    return {};

  static constexpr uint32_t HASH_PRIME = 1021;
  uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
    % HASH_PRIME;

  return {completion_locks[h % num_locks], std::defer_lock};
}
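
// get_lock() stripes object names across the session's small fixed pool of
// completion_locks by hashing the name, so completions for the same object
// are serialized without one mutex per object. The returned unique_lock is
// built with std::defer_lock, i.e. the caller decides when (and whether) to
// actually acquire it; an empty name yields a lock guarding nothing.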

const char** Objecter::get_tracked_conf_keys() const
{
  static const char *config_keys[] = {
    "crush_location",
    "rados_mon_op_timeout",
    "rados_osd_op_timeout",
    NULL
  };
  return config_keys;
}


void Objecter::handle_conf_change(const ConfigProxy& conf,
				  const std::set <std::string> &changed)
{
  if (changed.count("crush_location")) {
    update_crush_location();
  }
  if (changed.count("rados_mon_op_timeout")) {
    mon_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
  }
  if (changed.count("rados_osd_op_timeout")) {
    osd_timeout = conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
  }
}

void Objecter::update_crush_location()
{
  unique_lock wl(rwlock);
  crush_location = cct->crush_location.get_location();
}

// messages ------------------------------

/*
 * initialize only internal data structures, don't initiate cluster interaction
 */
void Objecter::init()
{
  ceph_assert(!initialized);

  if (!logger) {
    PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);

    pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
		PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
    pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
    pcb.add_time_avg(l_osdc_op_latency, "op_latency", "Operation latency");
    pcb.add_u64(l_osdc_op_inflight, "op_inflight", "Operations in flight");
    pcb.add_u64_avg(l_osdc_oplen_avg, "oplen_avg", "Average length of operation vector");

    pcb.add_u64_counter(l_osdc_op, "op", "Operations");
    pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
			"rdwr", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");

    pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
    pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
			"Create object operations");
    pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
    pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
    pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
			"Write full object operations");
    pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
			"Write same operations");
    pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
			"Append operation");
    pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
			"Set object to zero operations");
    pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
			"Truncate object operations");
    pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
			"Delete object operations");
    pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
			"Map extent operations");
    pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
			"Sparse read operations");
    pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
			"Clone range operations");
    pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
			"Get xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
			"Set xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
			"Xattr comparison operations");
    pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
			"Remove xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
			"Reset xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
			"Call (execute) operations");
    pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
			"Watch by object operations");
    pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
			"Notify about object operations");
    pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
			"Extended attribute comparison in multi operations");
    pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
    pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
    pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");

    pcb.add_u64(l_osdc_linger_active, "linger_active",
		"Active lingering operations");
    pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
			"Sent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
			"Resent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
			"Sent pings to lingering operations");

    pcb.add_u64(l_osdc_poolop_active, "poolop_active",
		"Active pool operations");
    pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
			"Sent pool operations");
    pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
			"Resent pool operations");

    pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
		"Active get pool stat operations");
    pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
			"Pool stat operations sent");
    pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
			"Resent pool stats");

    pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
    pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
    pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
			"Resent FS stats");

    pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
    pcb.add_u64_counter(l_osdc_command_send, "command_send",
			"Sent commands");
    pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
			"Resent commands");

    pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
    pcb.add_u64_counter(l_osdc_map_full, "map_full",
			"Full OSD maps received");
    pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
			"Incremental OSD maps received");

    pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
		"Open sessions"); // open sessions
    pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
			"Sessions opened");
    pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
			"Sessions closed");
    pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");

    pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
			"OSD OMAP write operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
			"OSD OMAP read operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
			"OSD OMAP delete operations");

    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  m_request_state_hook = new RequestStateHook(this);
  auto admin_socket = cct->get_admin_socket();
  int ret = admin_socket->register_command("objecter_requests",
					   m_request_state_hook,
					   "show in-progress osd requests");

  /* Don't warn on EEXIST, happens if multiple ceph clients
   * are instantiated from one process */
  if (ret < 0 && ret != -EEXIST) {
    lderr(cct) << "error registering admin socket command: "
	       << cpp_strerror(ret) << dendl;
  }

  update_crush_location();

  cct->_conf.add_observer(this);

  initialized = true;
}
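
// The "objecter_requests" admin socket command registered above lets an
// operator inspect in-flight OSD requests at runtime; for a librados client
// this is typically invoked as something like (socket path is an assumption,
// it varies by deployment):
//
//   ceph daemon /var/run/ceph/ceph-client.<id>.asok objecter_requests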

/*
 * ok, cluster interaction can happen
 */
void Objecter::start(const OSDMap* o)
{
  shared_lock rl(rwlock);

  start_tick();
  if (o) {
    osdmap->deepish_copy_from(*o);
    prune_pg_mapping(osdmap->get_pools());
  } else if (osdmap->get_epoch() == 0) {
    _maybe_request_map();
  }
}

void Objecter::shutdown()
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  initialized = false;

  wl.unlock();
  cct->_conf.remove_observer(this);
  wl.lock();

  while (!osd_sessions.empty()) {
    auto p = osd_sessions.begin();
    close_session(p->second);
  }

  while(!check_latest_map_lingers.empty()) {
    auto i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while(!check_latest_map_ops.empty()) {
    auto i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while(!check_latest_map_commands.empty()) {
    auto i = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  while(!poolstat_ops.empty()) {
    auto i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while(!statfs_ops.empty()) {
    auto i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while(!pool_ops.empty()) {
    auto i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
  while(!homeless_session->linger_ops.empty()) {
    auto i = homeless_session->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    LingerOp *lop = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_linger_op_remove(homeless_session, lop);
    }
    linger_ops.erase(lop->linger_id);
    linger_ops_set.erase(lop);
    lop->put();
  }

  while(!homeless_session->ops.empty()) {
    auto i = homeless_session->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    auto op = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_op_remove(homeless_session, op);
    }
    op->put();
  }

  while(!homeless_session->command_ops.empty()) {
    auto i = homeless_session->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    auto cop = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_command_op_remove(homeless_session, cop);
    }
    cop->put();
  }

  if (tick_event) {
    if (timer.cancel_event(tick_event)) {
      ldout(cct, 10) << " successfully canceled tick" << dendl;
    }
    tick_event = 0;
  }

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }

  // Let go of Objecter write lock so timer thread can shutdown
  wl.unlock();

  // Outside of lock to avoid cycle WRT calls to RequestStateHook
  // This is safe because we guarantee no concurrent calls to
  // shutdown() with the ::initialized check at start.
  if (m_request_state_hook) {
    auto admin_socket = cct->get_admin_socket();
    admin_socket->unregister_commands(m_request_state_hook);
    delete m_request_state_hook;
    m_request_state_hook = NULL;
  }
}

void Objecter::_send_linger(LingerOp *info,
			    ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  fu2::unique_function<Op::OpSig> oncommit;
  osdc_opvec opv;
  std::shared_lock watchl(info->watch_lock);
  cb::list *poutbl = nullptr;
  if (info->registered && info->is_watch) {
    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
		   << dendl;
    opv.push_back(OSDOp());
    opv.back().op.op = CEPH_OSD_OP_WATCH;
    opv.back().op.watch.cookie = info->get_cookie();
    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
    opv.back().op.watch.gen = ++info->register_gen;
    oncommit = CB_Linger_Reconnect(this, info);
  } else {
    ldout(cct, 15) << "send_linger " << info->linger_id << " register"
		   << dendl;
    opv = info->ops;
    // TODO Augment ca::Completion with an equivalent of
    // target so we can handle these cases better.
    auto c = std::make_unique<CB_Linger_Commit>(this, info);
    if (!info->is_watch) {
      info->notify_id = 0;
      poutbl = &c->outbl;
    }
    oncommit = [c = std::move(c)](bs::error_code ec) mutable {
      std::move(*c)(ec);
    };
  }
  watchl.unlock();
  auto o = new Op(info->target.base_oid, info->target.base_oloc,
		  std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
		  std::move(oncommit), info->pobjver);
  o->outbl = poutbl;
  o->snapid = info->snap;
  o->snapc = info->snapc;
  o->mtime = info->mtime;

  o->target = info->target;
  o->tid = ++last_tid;

  // do not resend this; we will send a new op to reregister
  o->should_resend = false;
  o->ctx_budgeted = true;

  if (info->register_tid) {
    // repeat send. cancel old registration op, if any.
    std::unique_lock sl(info->session->lock);
    if (info->session->ops.count(info->register_tid)) {
      auto o = info->session->ops[info->register_tid];
      _op_cancel_map_check(o);
      _cancel_linger_op(o);
    }
    sl.unlock();
  }

  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);

  logger->inc(l_osdc_linger_send);
}
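
// _send_linger() thus has two paths: for an already-registered watch it
// sends a lightweight CEPH_OSD_OP_WATCH/RECONNECT with a bumped register_gen
// (so acks from older generations can be ignored), otherwise it replays the
// original op vector to (re)register, wiring the reply buffer through
// CB_Linger_Commit so a notify can recover its notify_id. The generated op
// is deliberately should_resend = false: on a map change we come back here
// and build a fresh registration instead of resending a stale one.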

void Objecter::_linger_commit(LingerOp *info, bs::error_code ec,
			      cb::list& outbl)
{
  std::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
  if (info->on_reg_commit) {
    info->on_reg_commit->defer(std::move(info->on_reg_commit),
			       ec, cb::list{});
    info->on_reg_commit.reset();
  }
  if (ec && info->on_notify_finish) {
    info->on_notify_finish->defer(std::move(info->on_notify_finish),
				  ec, cb::list{});
    info->on_notify_finish.reset();
  }

  // only tell the user the first time we do this
  info->registered = true;
  info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    auto p = outbl.cbegin();
    try {
      decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
		     << dendl;
    }
    catch (cb::error& e) {
    }
  }
}

class CB_DoWatchError {
  Objecter *objecter;
  boost::intrusive_ptr<Objecter::LingerOp> info;
  bs::error_code ec;
public:
  CB_DoWatchError(Objecter *o, Objecter::LingerOp *i,
		  bs::error_code ec)
    : objecter(o), info(i), ec(ec) {
    info->_queued_async();
  }
  void operator()() {
    std::unique_lock wl(objecter->rwlock);
    bool canceled = info->canceled;
    wl.unlock();

    if (!canceled) {
      info->handle(ec, 0, info->get_cookie(), 0, {});
    }

    info->finished_async();
  }
};

bs::error_code Objecter::_normalize_watch_error(bs::error_code ec)
{
  // translate ENOENT -> ENOTCONN so that a delete->disconnection
  // notification and a failure to reconnect because we raced with
  // the delete appear the same to the user.
  if (ec == bs::errc::no_such_file_or_directory)
    ec = bs::error_code(ENOTCONN, osd_category());
  return ec;
}

void Objecter::_linger_reconnect(LingerOp *info, bs::error_code ec)
{
  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << ec
		 << " (last_error " << info->last_error << ")" << dendl;
  std::unique_lock wl(info->watch_lock);
  if (ec) {
    if (!info->last_error) {
      ec = _normalize_watch_error(ec);
      if (info->handle) {
	boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
      }
    }
  }

  info->last_error = ec;
}

void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  if (cct->_conf->objecter_inject_no_watch_ping) {
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
		   << dendl;
    return;
  }
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
		 << dendl;

  osdc_opvec opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen;

  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
		 std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
		 CB_Linger_Ping(this, info, now),
		 nullptr, nullptr);
  o->target = info->target;
  o->should_resend = false;
  _send_op_account(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o);
  info->ping_tid = o->tid;

  logger->inc(l_osdc_linger_ping);
}

void Objecter::_linger_ping(LingerOp *info, bs::error_code ec,
			    ceph::coarse_mono_time sent,
			    uint32_t register_gen)
{
  std::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " sent " << sent << " gen " << register_gen << " = " << ec
		 << " (last_error " << info->last_error
		 << " register_gen " << info->register_gen << ")" << dendl;
  if (info->register_gen == register_gen) {
    if (!ec) {
      info->watch_valid_thru = sent;
    } else if (ec && !info->last_error) {
      ec = _normalize_watch_error(ec);
      info->last_error = ec;
      if (info->handle) {
	boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
      }
    }
  } else {
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}

tl::expected<ceph::timespan,
	     bs::error_code> Objecter::linger_check(LingerOp *info)
{
  std::shared_lock l(info->watch_lock);

  ceph::coarse_mono_time stamp = info->watch_valid_thru;
  if (!info->watch_pending_async.empty())
    stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
  auto age = ceph::coarse_mono_clock::now() - stamp;

  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " err " << info->last_error
		 << " age " << age << dendl;
  if (info->last_error)
    return tl::unexpected(info->last_error);
  // return a safe upper bound (we are truncating to ms)
  return age;
}
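
// linger_check() backs librados' watch-validity query: it reports how stale
// the watch could be (time since the last confirmed ping, accounting for any
// still-pending async events) or, if the watch has already failed, the
// stored error. A sketch of how a caller might consume it (caller-side
// names are assumptions):
//
//   if (auto age = objecter->linger_check(info); age) {
//     // watch believed valid as of *age ago
//   } else {
//     // age.error() holds the watch error, e.g. ENOTCONN
//   }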

void Objecter::linger_cancel(LingerOp *info)
{
  unique_lock wl(rwlock);
  _linger_cancel(info);
  info->put();
}

void Objecter::_linger_cancel(LingerOp *info)
{
  // rwlock is locked unique
  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
  if (!info->canceled) {
    OSDSession *s = info->session;
    std::unique_lock sl(s->lock);
    _session_linger_op_remove(s, info);
    sl.unlock();

    linger_ops.erase(info->linger_id);
    linger_ops_set.erase(info);
    ceph_assert(linger_ops.size() == linger_ops_set.size());

    info->canceled = true;
    info->put();

    logger->dec(l_osdc_linger_active);
  }
}



Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
					      const object_locator_t& oloc,
					      int flags)
{
  unique_lock l(rwlock);
  // Acquire linger ID
  auto info = new LingerOp(this, ++max_linger_id);
  info->target.base_oid = oid;
  info->target.base_oloc = oloc;
  if (info->target.base_oloc.key == oid)
    info->target.base_oloc.key.clear();
  info->target.flags = flags;
  info->watch_valid_thru = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " info " << info
		 << " linger_id " << info->linger_id
		 << " cookie " << info->get_cookie()
		 << dendl;
  linger_ops[info->linger_id] = info;
  linger_ops_set.insert(info);
  ceph_assert(linger_ops.size() == linger_ops_set.size());

  info->get(); // for the caller
  return info;
}
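
// Linger-op lifecycle, as implemented above and below: linger_register()
// allocates a LingerOp and hands the caller a reference; the caller then
// submits it via linger_watch() (persistent watch, WRITE flag) or
// linger_notify() (one-shot notify, READ flag); the Objecter keeps it alive
// across OSD map changes by re-running _send_linger(); finally
// linger_cancel() unlinks it from its session and drops the registration
// reference.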

ceph_tid_t Objecter::linger_watch(LingerOp *info,
				  ObjectOperation& op,
				  const SnapContext& snapc,
				  real_time mtime,
				  cb::list& inbl,
				  decltype(info->on_reg_commit)&& oncommit,
				  version_t *objver)
{
  info->is_watch = true;
  info->snapc = snapc;
  info->mtime = mtime;
  info->target.flags |= CEPH_OSD_FLAG_WRITE;
  info->ops = op.ops;
  info->inbl = inbl;
  info->pobjver = objver;
  info->on_reg_commit = std::move(oncommit);

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  op.clear();
  return info->linger_id;
}

ceph_tid_t Objecter::linger_notify(LingerOp *info,
				   ObjectOperation& op,
				   snapid_t snap, cb::list& inbl,
				   decltype(LingerOp::on_reg_commit)&& onfinish,
				   version_t *objver)
{
  info->snap = snap;
  info->target.flags |= CEPH_OSD_FLAG_READ;
  info->ops = op.ops;
  info->inbl = inbl;
  info->pobjver = objver;
  info->on_reg_commit = std::move(onfinish);
  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  op.clear();
  return info->linger_id;
}

void Objecter::_linger_submit(LingerOp *info,
			      ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
  ceph_assert(info->linger_id);
  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!

  // Populate Op::target
  OSDSession *s = NULL;
  int r = _calc_target(&info->target, nullptr);
  switch (r) {
  case RECALC_OP_TARGET_POOL_EIO:
    _check_linger_pool_eio(info);
    return;
  }

  // Create LingerOp<->OSDSession relation
  r = _get_session(info->target.osd, &s, sul);
  ceph_assert(r == 0);
  unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  put_session(s);

  _send_linger(info, sul);
}

struct CB_DoWatchNotify {
  Objecter *objecter;
  boost::intrusive_ptr<Objecter::LingerOp> info;
  boost::intrusive_ptr<MWatchNotify> msg;
  CB_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->_queued_async();
  }
  void operator()() {
    objecter->_do_watch_notify(std::move(info), std::move(msg));
  }
};

void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  std::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    if (!info->last_error) {
      info->last_error = bs::error_code(ENOTCONN, osd_category());
      if (info->handle) {
	boost::asio::defer(finish_strand, CB_DoWatchError(this, info,
							  info->last_error));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
	info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
		     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->on_notify_finish->defer(
	std::move(info->on_notify_finish),
	osdcode(m->return_code), std::move(m->get_data()));

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
      info->on_notify_finish = nullptr;
    }
  } else {
    boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
  }
}

void Objecter::_do_watch_notify(boost::intrusive_ptr<LingerOp> info,
				boost::intrusive_ptr<MWatchNotify> m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;

  shared_lock l(rwlock);
  ceph_assert(initialized);

  if (info->canceled) {
    l.unlock();
    goto out;
  }

  // notify completion?
  ceph_assert(info->is_watch);
  ceph_assert(info->handle);
  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);

  l.unlock();

  switch (m->opcode) {
  case CEPH_WATCH_EVENT_NOTIFY:
    info->handle({}, m->notify_id, m->cookie, m->notifier_gid, std::move(m->bl));
    break;
  }

 out:
  info->finished_async();
}

bool Objecter::ms_dispatch(Message *m)
{
  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
  switch (m->get_type()) {
  // these we exclusively handle
  case CEPH_MSG_OSD_OPREPLY:
    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
    return true;

  case CEPH_MSG_OSD_BACKOFF:
    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
    return true;

  case CEPH_MSG_WATCH_NOTIFY:
    handle_watch_notify(static_cast<MWatchNotify*>(m));
    m->put();
    return true;

  case MSG_COMMAND_REPLY:
    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    } else {
      return false;
    }

  case MSG_GETPOOLSTATSREPLY:
    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
    return true;

  case CEPH_MSG_POOLOP_REPLY:
    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
    return true;

  case CEPH_MSG_STATFS_REPLY:
    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
    return true;

  // these we give others a chance to inspect

  // MDS, OSD
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    return false;
  }
  return false;
}
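
// ms_dispatch() follows the Messenger dispatcher convention: returning true
// tells the Messenger the message was handled here, returning false lets
// other dispatchers registered on the same Messenger see it. That is why
// CEPH_MSG_OSD_MAP is processed above but still returns false: co-located
// users of the messenger (MDS, OSD) also need to observe new maps.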

void Objecter::_scan_requests(
  OSDSession *s,
  bool skipped_map,
  bool cluster_full,
  map<int64_t, bool> *pool_full_map,
  map<ceph_tid_t, Op*>& need_resend,
  list<LingerOp*>& need_resend_linger,
  map<ceph_tid_t, CommandOp*>& need_resend_command,
  ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  list<LingerOp*> unregister_lingers;

  std::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  auto lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    auto op = lp->second;
    ceph_assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
	ldout(cct, 10) << " need to unregister linger op "
		       << op->linger_id << dendl;
	op->get();
	unregister_lingers.push_back(op);
      }
      break;
    case RECALC_OP_TARGET_POOL_EIO:
      _check_linger_pool_eio(op);
      ldout(cct, 10) << " need to unregister linger op "
		     << op->linger_id << dendl;
      op->get();
      unregister_lingers.push_back(op);
      break;
    }
  }

  // check for changed request mappings
  auto p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    _prune_snapc(osdmap->get_new_removed_snaps(), op);
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
			 op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !(force_resend_writes && op->target.respects_full()))
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      _session_op_remove(op->session, op);
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    case RECALC_OP_TARGET_POOL_EIO:
      _check_op_pool_eio(op, &sl);
      break;
    }
  }

  // commands
  auto cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    auto c = cp->second;
    ++cp;
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      _session_command_op_remove(c->session, c);
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  sl.unlock();

  for (auto iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put();
  }
}
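
// _scan_requests() is the per-session half of map handling: for each linger
// op, regular op, and command on the session it recomputes the target and
// sorts the result into one of three buckets -- leave alone, queue for
// resend (collected in the caller's need_resend* containers), or
// fail/unregister because the pool is gone or flagged EIO. handle_osd_map()
// below drives it once per session per new epoch, then performs the actual
// resends after the map state has settled.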

void Objecter::handle_osd_map(MOSDMap *m)
{
  ceph::shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  ceph_assert(osdmap);

  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
		  << " != " << monc->get_fsid() << dendl;
    return;
  }

  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (auto it = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);


  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
	   e <= m->get_last();
	   e++) {

	if (osdmap->get_epoch() == e-1 &&
	    m->incremental_maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
			<< dendl;
	  OSDMap::Incremental inc(m->incremental_maps[e]);
	  osdmap->apply_incremental(inc);

	  emit_blocklist_events(inc);

	  logger->inc(l_osdc_map_inc);
	}
	else if (m->maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
	  auto new_osdmap = std::make_unique<OSDMap>();
	  new_osdmap->decode(m->maps[e]);

	  emit_blocklist_events(*osdmap, *new_osdmap);
	  osdmap = std::move(new_osdmap);

	  logger->inc(l_osdc_map_full);
	}
	else {
	  if (e >= m->cluster_osdmap_trim_lower_bound) {
	    ldout(cct, 3) << "handle_osd_map requesting missing epoch "
			  << osdmap->get_epoch()+1 << dendl;
	    _maybe_request_map();
	    break;
	  }
	  ldout(cct, 3) << "handle_osd_map missing epoch "
			<< osdmap->get_epoch()+1
			<< ", jumping to "
			<< m->cluster_osdmap_trim_lower_bound << dendl;
	  e = m->cluster_osdmap_trim_lower_bound - 1;
	  skipped_map = true;
	  continue;
	}
	logger->set(l_osdc_map_epoch, osdmap->get_epoch());

	prune_pg_mapping(osdmap->get_pools());
	cluster_full = cluster_full || _osdmap_full_flag();
	update_pool_full_map(pool_full_map);

	// check all outstanding requests on every epoch
	for (auto& i : need_resend) {
	  _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
	}
	_scan_requests(homeless_session, skipped_map, cluster_full,
		       &pool_full_map, need_resend,
		       need_resend_linger, need_resend_command, sul);
	for (auto p = osd_sessions.begin();
	     p != osd_sessions.end(); ) {
	  auto s = p->second;
	  _scan_requests(s, skipped_map, cluster_full,
			 &pool_full_map, need_resend,
			 need_resend_linger, need_resend_command, sul);
	  ++p;
	  // osd down or addr change?
	  if (!osdmap->is_up(s->osd) ||
	      (s->con &&
	       s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
	    close_session(s);
	  }
	}

	ceph_assert(e == osdmap->get_epoch());
      }

    } else {
      // first map. we want the full thing.
      if (m->maps.count(m->get_last())) {
	for (auto p = osd_sessions.begin();
	     p != osd_sessions.end(); ++p) {
	  OSDSession *s = p->second;
	  _scan_requests(s, false, false, NULL, need_resend,
			 need_resend_linger, need_resend_command, sul);
	}
	ldout(cct, 3) << "handle_osd_map decoding full epoch "
		      << m->get_last() << dendl;
	osdmap->decode(m->maps[m->get_last()]);
	prune_pg_mapping(osdmap->get_pools());

	_scan_requests(homeless_session, false, false, NULL,
		       need_resend, need_resend_linger,
		       need_resend_command, sul);
      } else {
	ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
		      << dendl;
	monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
	monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
	p = need_resend.erase(p);
	_check_op_pool_dne(op, nullptr);
      } else {
	++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused?
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    _maybe_request_map();
  }

  // resend requests
  for (auto p = need_resend.begin();
       p != need_resend.end(); ++p) {
    auto op = p->second;
    auto s = op->session;
    bool mapped_session = false;
    if (!s) {
      int r = _map_session(&op->target, &s, sul);
      ceph_assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    std::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      if (!op->session->is_homeless() && !op->target.paused) {
	logger->inc(l_osdc_op_resend);
	_send_op(op);
      }
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s);
  }
  for (auto p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    ceph_assert(op->session);
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (auto p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    auto c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
	_send_command(c);
      }
    }
  }

  _dump_active();

  // finish any Contexts that were waiting on a map update
  auto p = waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
	 p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (auto& [c, ec] : p->second) {
      ca::post(std::move(c), ec);
    }
    waiting_for_map.erase(p++);
  }

  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}

void Objecter::enable_blocklist_events()
{
  unique_lock wl(rwlock);

  blocklist_events_enabled = true;
}

void Objecter::consume_blocklist_events(std::set<entity_addr_t> *events)
{
  unique_lock wl(rwlock);

  if (events->empty()) {
    events->swap(blocklist_events);
  } else {
    for (const auto &i : blocklist_events) {
      events->insert(i);
    }
    blocklist_events.clear();
  }
}
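
// consume_blocklist_events() drains the pending-event set into the caller's
// set; when the caller's set is empty, swap() moves the elements in O(1)
// instead of copying them one by one, which is why the two branches exist.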

void Objecter::emit_blocklist_events(const OSDMap::Incremental &inc)
{
  if (!blocklist_events_enabled) {
    return;
  }

  for (const auto &i : inc.new_blocklist) {
    blocklist_events.insert(i.first);
  }
}

void Objecter::emit_blocklist_events(const OSDMap &old_osd_map,
				     const OSDMap &new_osd_map)
{
  if (!blocklist_events_enabled) {
    return;
  }

  std::set<entity_addr_t> old_set;
  std::set<entity_addr_t> new_set;
  std::set<entity_addr_t> old_range_set;
  std::set<entity_addr_t> new_range_set;

  old_osd_map.get_blocklist(&old_set, &old_range_set);
  new_osd_map.get_blocklist(&new_set, &new_range_set);

  std::set<entity_addr_t> delta_set;
  std::set_difference(
    new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  std::set_difference(
    new_range_set.begin(), new_range_set.end(),
    old_range_set.begin(), old_range_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  blocklist_events.insert(delta_set.begin(), delta_set.end());
}

// op pool check

void Objecter::CB_Op_Map_Latest::operator()(bs::error_code e,
					    version_t latest, version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled)
    return;

  lgeneric_subdout(objecter->cct, objecter, 10)
    << "op_map_latest r=" << e << " tid=" << tid
    << " latest " << latest << dendl;

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_ops.find(tid);
  if (iter == objecter->check_latest_map_ops.end()) {
    lgeneric_subdout(objecter->cct, objecter, 10)
      << "op_map_latest op "<< tid << " not found" << dendl;
    return;
  }

  Op *op = iter->second;
  objecter->check_latest_map_ops.erase(iter);

  lgeneric_subdout(objecter->cct, objecter, 20)
    << "op_map_latest op "<< op << dendl;

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  unique_lock sl(op->session->lock, defer_lock);
  objecter->_check_op_pool_dne(op, &sl);

  op->put();
}

int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
				snapid_t *snap) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  for (auto p = pg_pool.snaps.begin();
       p != pg_pool.snaps.end();
       ++p) {
    if (p->second.name == snap_name) {
      *snap = p->first;
      return 0;
    }
  }
  return -ENOENT;
}

int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
				 pool_snap_info_t *info) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  auto p = pg_pool.snaps.find(snap);
  if (p == pg_pool.snaps.end())
    return -ENOENT;
  *info = p->second;

  return 0;
}

int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
{
  shared_lock rl(rwlock);

  const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
  if (!pi)
    return -ENOENT;
  for (auto p = pi->snaps.begin();
       p != pi->snaps.end();
       ++p) {
    snaps->push_back(p->first);
  }
  return 0;
}
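
// These three pool-snap accessors only read the locally cached OSDMap under
// the shared lock and use the classic negative-errno convention. A minimal
// caller-side sketch (poolid and snapshot name are assumptions):
//
//   snapid_t snap;
//   if (objecter->pool_snap_by_name(poolid, "daily", &snap) == 0) {
//     pool_snap_info_t info;
//     objecter->pool_snap_get_info(poolid, snap, &info);
//   }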

// sl may be unlocked.
void Objecter::_check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl)
{
  // rwlock is locked unique

  if (op->target.pool_ever_existed) {
    // the pool previously existed and now it does not, which means it
    // was deleted.
    op->map_dne_bound = osdmap->get_epoch();
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " pool previously exists but now does not"
		   << dendl;
  } else {
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // we had a new enough map
      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		     << " concluding pool " << op->target.base_pgid.pool()
		     << " dne" << dendl;
      if (op->has_completion()) {
	num_in_flight--;
	op->complete(osdc_errc::pool_dne, -ENOENT);
      }

      OSDSession *s = op->session;
      if (s) {
	ceph_assert(s != NULL);
	ceph_assert(sl->mutex() == &s->lock);
	bool session_locked = sl->owns_lock();
	if (!session_locked) {
	  sl->lock();
	}
	_finish_op(op, 0);
	if (!session_locked) {
	  sl->unlock();
	}
      } else {
	_finish_op(op, 0); // no session
      }
    }
  } else {
    _send_op_map_check(op);
  }
}
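
// map_dne_bound is the first epoch at which the pool is known not to exist.
// Until we have seen a map at least that new we cannot distinguish "pool
// never existed" from "our map is stale", so _send_op_map_check() first asks
// the monitor for the latest map version; only once osdmap->get_epoch()
// reaches the bound is the op failed with pool_dne.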
1594
20effc67
TL
1595// sl may be unlocked.
1596void Objecter::_check_op_pool_eio(Op *op, std::unique_lock<std::shared_mutex> *sl)
1597{
1598 // rwlock is locked unique
1599
1600 // we had a new enough map
1601 ldout(cct, 10) << "check_op_pool_eio tid " << op->tid
1602 << " concluding pool " << op->target.base_pgid.pool()
1603 << " has eio" << dendl;
1604 if (op->has_completion()) {
1605 num_in_flight--;
1606 op->complete(osdc_errc::pool_eio, -EIO);
1607 }
1608
1609 OSDSession *s = op->session;
1610 if (s) {
1611 ceph_assert(s != NULL);
1612 ceph_assert(sl->mutex() == &s->lock);
1613 bool session_locked = sl->owns_lock();
1614 if (!session_locked) {
1615 sl->lock();
1616 }
1617 _finish_op(op, 0);
1618 if (!session_locked) {
1619 sl->unlock();
1620 }
1621 } else {
1622 _finish_op(op, 0); // no session
1623 }
1624}
1625
7c673cae
FG
1626void Objecter::_send_op_map_check(Op *op)
1627{
1628 // rwlock is locked unique
1629 // ask the monitor
1630 if (check_latest_map_ops.count(op->tid) == 0) {
1631 op->get();
1632 check_latest_map_ops[op->tid] = op;
f67539c2 1633 monc->get_version("osdmap", CB_Op_Map_Latest(this, op->tid));
7c673cae
FG
1634 }
1635}
1636
1637void Objecter::_op_cancel_map_check(Op *op)
1638{
1639 // rwlock is locked unique
  auto iter = check_latest_map_ops.find(op->tid);
  if (iter != check_latest_map_ops.end()) {
    Op *op = iter->second;
    op->put();
    check_latest_map_ops.erase(iter);
  }
}

// linger pool check

void Objecter::CB_Linger_Map_Latest::operator()(bs::error_code e,
                                                version_t latest,
                                                version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_lingers.find(linger_id);
  if (iter == objecter->check_latest_map_lingers.end()) {
    return;
  }

  auto op = iter->second;
  objecter->check_latest_map_lingers.erase(iter);

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  bool unregister;
  objecter->_check_linger_pool_dne(op, &unregister);

  if (unregister) {
    objecter->_linger_cancel(op);
  }

  op->put();
}

void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
  // rwlock is locked unique

  *need_unregister = false;

  if (op->register_gen > 0) {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                   << " pool previously existed but now does not"
                   << dendl;
    op->map_dne_bound = osdmap->get_epoch();
  } else {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                   << " current " << osdmap->get_epoch()
                   << " map_dne_bound " << op->map_dne_bound
                   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      std::unique_lock wl{op->watch_lock};
      if (op->on_reg_commit) {
        op->on_reg_commit->defer(std::move(op->on_reg_commit),
                                 osdc_errc::pool_dne, cb::list{});
        op->on_reg_commit = nullptr;
      }
      if (op->on_notify_finish) {
        op->on_notify_finish->defer(std::move(op->on_notify_finish),
                                    osdc_errc::pool_dne, cb::list{});
        op->on_notify_finish = nullptr;
      }
      *need_unregister = true;
    }
  } else {
    _send_linger_map_check(op);
  }
}

void Objecter::_check_linger_pool_eio(LingerOp *op)
{
  // rwlock is locked unique

  std::unique_lock wl{op->watch_lock};
  if (op->on_reg_commit) {
    op->on_reg_commit->defer(std::move(op->on_reg_commit),
                             osdc_errc::pool_dne, cb::list{});
    op->on_reg_commit = nullptr;
  }
  if (op->on_notify_finish) {
    op->on_notify_finish->defer(std::move(op->on_notify_finish),
                                osdc_errc::pool_dne, cb::list{});
    op->on_notify_finish = nullptr;
  }
}

void Objecter::_send_linger_map_check(LingerOp *op)
{
  // ask the monitor
  if (check_latest_map_lingers.count(op->linger_id) == 0) {
    op->get();
    check_latest_map_lingers[op->linger_id] = op;
    monc->get_version("osdmap", CB_Linger_Map_Latest(this, op->linger_id));
  }
}

void Objecter::_linger_cancel_map_check(LingerOp *op)
{
  // rwlock is locked unique

  auto iter = check_latest_map_lingers.find(op->linger_id);
  if (iter != check_latest_map_lingers.end()) {
    LingerOp *op = iter->second;
    op->put();
    check_latest_map_lingers.erase(iter);
  }
}

// command pool check

void Objecter::CB_Command_Map_Latest::operator()(bs::error_code e,
                                                 version_t latest, version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_commands.find(tid);
  if (iter == objecter->check_latest_map_commands.end()) {
    return;
  }

  auto c = iter->second;
  objecter->check_latest_map_commands.erase(iter);

  if (c->map_dne_bound == 0)
    c->map_dne_bound = latest;

  unique_lock sul(c->session->lock);
  objecter->_check_command_map_dne(c);
  sul.unlock();

  c->put();
}

void Objecter::_check_command_map_dne(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
                 << " current " << osdmap->get_epoch()
                 << " map_dne_bound " << c->map_dne_bound
                 << dendl;
  if (c->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= c->map_dne_bound) {
      _finish_command(c, osdcode(c->map_check_error),
                      std::move(c->map_check_error_str), {});
    }
  } else {
    _send_command_map_check(c);
  }
}

void Objecter::_send_command_map_check(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  // ask the monitor
  if (check_latest_map_commands.count(c->tid) == 0) {
    c->get();
    check_latest_map_commands[c->tid] = c;
    monc->get_version("osdmap", CB_Command_Map_Latest(this, c->tid));
  }
}

void Objecter::_command_cancel_map_check(CommandOp *c)
{
  // rwlock is locked unique

  auto iter = check_latest_map_commands.find(c->tid);
  if (iter != check_latest_map_commands.end()) {
    auto c = iter->second;
    c->put();
    check_latest_map_commands.erase(iter);
  }
}


/**
 * Look up OSDSession by OSD id.
 *
 * @returns 0 on success, or -EAGAIN if the lock context requires
 * promotion to write.
 */
int Objecter::_get_session(int osd, OSDSession **session,
                           shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul && sul.mutex() == &rwlock);

  if (osd < 0) {
    *session = homeless_session;
    ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
                   << dendl;
    return 0;
  }

  auto p = osd_sessions.find(osd);
  if (p != osd_sessions.end()) {
    auto s = p->second;
    s->get();
    *session = s;
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
                   << s->get_nref() << dendl;
    return 0;
  }
  if (!sul.owns_lock()) {
    return -EAGAIN;
  }
  auto s = new OSDSession(cct, osd);
  osd_sessions[osd] = s;
  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
  s->con->set_priv(RefCountedPtr{s});
  logger->inc(l_osdc_osd_session_open);
  logger->set(l_osdc_osd_sessions, osd_sessions.size());
  s->get();
  *session = s;
  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
                 << s->get_nref() << dendl;
  return 0;
}
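
// Illustrative sketch (not part of the build): the retry protocol the
// comment above describes. A caller holding only the shared side of
// rwlock gets -EAGAIN when the session does not exist yet, promotes
// the shunique_lock to unique, and retries, mirroring what
// _op_submit() does further down:
//
//   OSDSession *s = nullptr;
//   int r = _get_session(osd, &s, sul);
//   if (r == -EAGAIN) {
//     sul.unlock();       // drop the shared lock
//     sul.lock();         // reacquire as unique (write) lock
//     r = _get_session(osd, &s, sul);
//   }
//   ceph_assert(r == 0);  // s may be the homeless session
//   // ... use s ...
//   put_session(s);       // drop the ref _get_session() took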

void Objecter::put_session(Objecter::OSDSession *s)
{
  if (s && !s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
                   << s->get_nref() << dendl;
    s->put();
  }
}

void Objecter::get_session(Objecter::OSDSession *s)
{
  ceph_assert(s != NULL);

  if (!s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
                   << s->get_nref() << dendl;
    s->get();
  }
}

void Objecter::_reopen_session(OSDSession *s)
{
  // rwlock is locked unique
  // s->lock is locked

  auto addrs = osdmap->get_addrs(s->osd);
  ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
                 << addrs << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  s->con = messenger->connect_to_osd(addrs);
  s->con->set_priv(RefCountedPtr{s});
  s->incarnation++;
  logger->inc(l_osdc_osd_session_open);
}

void Objecter::close_session(OSDSession *s)
{
  // rwlock is locked unique

  ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  unique_lock sl(s->lock);

  std::list<LingerOp*> homeless_lingers;
  std::list<CommandOp*> homeless_commands;
  std::list<Op*> homeless_ops;

  while (!s->linger_ops.empty()) {
    auto i = s->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    homeless_lingers.push_back(i->second);
    _session_linger_op_remove(s, i->second);
  }

  while (!s->ops.empty()) {
    auto i = s->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    homeless_ops.push_back(i->second);
    _session_op_remove(s, i->second);
  }

  while (!s->command_ops.empty()) {
    auto i = s->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    homeless_commands.push_back(i->second);
    _session_command_op_remove(s, i->second);
  }

  osd_sessions.erase(s->osd);
  sl.unlock();
  put_session(s);

  // Assign any leftover ops to the homeless session
  {
    unique_lock hsl(homeless_session->lock);
    for (auto i = homeless_lingers.begin();
         i != homeless_lingers.end(); ++i) {
      _session_linger_op_assign(homeless_session, *i);
    }
    for (auto i = homeless_ops.begin();
         i != homeless_ops.end(); ++i) {
      _session_op_assign(homeless_session, *i);
    }
    for (auto i = homeless_commands.begin();
         i != homeless_commands.end(); ++i) {
      _session_command_op_assign(homeless_session, *i);
    }
  }

  logger->set(l_osdc_osd_sessions, osd_sessions.size());
}

void Objecter::wait_for_osd_map(epoch_t e)
{
  unique_lock l(rwlock);
  if (osdmap->get_epoch() >= e) {
    l.unlock();
    return;
  }

  ca::waiter<bs::error_code> w;
  waiting_for_map[e].emplace_back(OpCompletion::create(
                                    service.get_executor(),
                                    w.ref()),
                                  bs::error_code{});
  l.unlock();
  w.wait();
}

void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
                                   std::unique_ptr<OpCompletion> fin,
                                   std::unique_lock<ceph::shared_mutex>&& l)
{
  ceph_assert(fin);
  if (osdmap->get_epoch() >= newest) {
    ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
    l.unlock();
    ca::defer(std::move(fin), bs::error_code{});
  } else {
    ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
    _wait_for_new_map(std::move(fin), newest, bs::error_code{});
    l.unlock();
  }
}

void Objecter::maybe_request_map()
{
  shared_lock rl(rwlock);
  _maybe_request_map();
}

void Objecter::_maybe_request_map()
{
  // rwlock is locked
  int flag = 0;
  if (_osdmap_full_flag()
      || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
      || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
    ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
      "osd map (FULL flag is set)" << dendl;
  } else {
    ldout(cct, 10)
      << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
    flag = CEPH_SUBSCRIBE_ONETIME;
  }
  epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
  if (monc->sub_want("osdmap", epoch, flag)) {
    monc->renew_subs();
  }
}

void Objecter::_wait_for_new_map(std::unique_ptr<OpCompletion> c, epoch_t epoch,
                                 bs::error_code ec)
{
  // rwlock is locked unique
  waiting_for_map[epoch].emplace_back(std::move(c), ec);
  _maybe_request_map();
}


/**
 * Use this together with wait_for_map: this is a pre-check to avoid
 * allocating a Context for wait_for_map if we can see that we
 * definitely already have the epoch.
 *
 * This does *not* replace the need to handle the return value of
 * wait_for_map: just because we don't have it in this pre-check
 * doesn't mean we won't have it when calling back into wait_for_map,
 * since the objecter lock is dropped in between.
 */
bool Objecter::have_map(const epoch_t epoch)
{
  shared_lock rl(rwlock);
  if (osdmap->get_epoch() >= epoch) {
    return true;
  } else {
    return false;
  }
}
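
// Illustrative sketch (not part of the build): the pre-check pattern
// the comment above describes, paired with the templated wait_for_map()
// declared in Objecter.h; the epoch variable and the lambda body are
// hypothetical caller-side code:
//
//   if (!objecter->have_map(e)) {
//     // may still complete immediately: the map can arrive between
//     // the pre-check and this call, since rwlock is dropped in between
//     objecter->wait_for_map(e, [](bs::error_code ec) {
//       // re-drive the waiting operation here
//     });
//   }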

void Objecter::_kick_requests(OSDSession *session,
                              map<uint64_t, LingerOp *>& lresend)
{
  // rwlock is locked unique

  // clear backoffs
  session->backoffs.clear();
  session->backoffs_by_id.clear();

  // resend ops
  map<ceph_tid_t,Op*> resend;  // resend in tid order
  for (auto p = session->ops.begin(); p != session->ops.end();) {
    Op *op = p->second;
    ++p;
    if (op->should_resend) {
      if (!op->target.paused)
        resend[op->tid] = op;
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
  }

  logger->inc(l_osdc_op_resend, resend.size());
  while (!resend.empty()) {
    _send_op(resend.begin()->second);
    resend.erase(resend.begin());
  }

  // resend lingers
  logger->inc(l_osdc_linger_resend, session->linger_ops.size());
  for (auto j = session->linger_ops.begin();
       j != session->linger_ops.end(); ++j) {
    LingerOp *op = j->second;
    op->get();
    ceph_assert(lresend.count(j->first) == 0);
    lresend[j->first] = op;
  }

  // resend commands
  logger->inc(l_osdc_command_resend, session->command_ops.size());
  map<uint64_t,CommandOp*> cresend;  // resend in order
  for (auto k = session->command_ops.begin();
       k != session->command_ops.end(); ++k) {
    cresend[k->first] = k->second;
  }
  while (!cresend.empty()) {
    _send_command(cresend.begin()->second);
    cresend.erase(cresend.begin());
  }
}

void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
                                  unique_lock<ceph::shared_mutex>& ul)
{
  ceph_assert(ul.owns_lock());
  shunique_lock sul(std::move(ul));
  while (!lresend.empty()) {
    LingerOp *op = lresend.begin()->second;
    if (!op->canceled) {
      _send_linger(op, sul);
    }
    op->put();
    lresend.erase(lresend.begin());
  }
  ul = sul.release_to_unique();
}

void Objecter::start_tick()
{
  ceph_assert(tick_event == 0);
  tick_event =
    timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
                    &Objecter::tick, this);
}

void Objecter::tick()
{
  shared_lock rl(rwlock);

  ldout(cct, 10) << "tick" << dendl;

  // we are only called by C_Tick
  tick_event = 0;

  if (!initialized) {
    // we raced with shutdown
    ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
    return;
  }

  set<OSDSession*> toping;


  // look for laggy requests
  auto cutoff = ceph::coarse_mono_clock::now();
  cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout

  unsigned laggy_ops = 0;

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    auto s = siter->second;
    scoped_lock l(s->lock);
    bool found = false;
    for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
      auto op = p->second;
      ceph_assert(op->session);
      if (op->stamp < cutoff) {
        ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
                      << " is laggy" << dendl;
        found = true;
        ++laggy_ops;
      }
    }
    for (auto p = s->linger_ops.begin();
         p != s->linger_ops.end();
         ++p) {
      auto op = p->second;
      std::unique_lock wl(op->watch_lock);
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
      if (op->is_watch && op->registered && !op->last_error)
        _send_linger_ping(op);
    }
    for (auto p = s->command_ops.begin();
         p != s->command_ops.end();
         ++p) {
      auto op = p->second;
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves command tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
    }
    if (found)
      toping.insert(s);
  }
  if (num_homeless_ops || !toping.empty()) {
    _maybe_request_map();
  }

  logger->set(l_osdc_op_laggy, laggy_ops);
  logger->set(l_osdc_osd_laggy, toping.size());

  if (!toping.empty()) {
    // send a ping to these osds, to ensure we detect any session resets
    // (osd reply message policy is lossy)
    for (auto i = toping.begin(); i != toping.end(); ++i) {
      (*i)->con->send_message(new MPing);
    }
  }

  // Make sure we don't reschedule if we wake up after shutdown
  if (initialized) {
    tick_event = timer.reschedule_me(ceph::make_timespan(
                                       cct->_conf->objecter_tick_interval));
  }
}
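
// Worked example (illustrative): with objecter_timeout set to 10
// seconds, cutoff = now - 10s, so any op stamped more than ten seconds
// ago satisfies op->stamp < cutoff and is counted as laggy; its
// session is pinged so that a reset of the lossy OSD connection is
// detected and the ops get resent.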

void Objecter::resend_mon_ops()
{
  unique_lock wl(rwlock);

  ldout(cct, 10) << "resend_mon_ops" << dendl;

  for (auto p = poolstat_ops.begin(); p != poolstat_ops.end(); ++p) {
    _poolstat_submit(p->second);
    logger->inc(l_osdc_poolstat_resend);
  }

  for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
    _fs_stats_submit(p->second);
    logger->inc(l_osdc_statfs_resend);
  }

  for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
    _pool_op_submit(p->second);
    logger->inc(l_osdc_poolop_resend);
  }

  for (auto p = check_latest_map_ops.begin();
       p != check_latest_map_ops.end();
       ++p) {
    monc->get_version("osdmap", CB_Op_Map_Latest(this, p->second->tid));
  }

  for (auto p = check_latest_map_lingers.begin();
       p != check_latest_map_lingers.end();
       ++p) {
    monc->get_version("osdmap", CB_Linger_Map_Latest(this, p->second->linger_id));
  }

  for (auto p = check_latest_map_commands.begin();
       p != check_latest_map_commands.end();
       ++p) {
    monc->get_version("osdmap", CB_Command_Map_Latest(this, p->second->tid));
  }
}

// read | write ---------------------------

void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
{
  shunique_lock rl(rwlock, ceph::acquire_shared);
  ceph_tid_t tid = 0;
  if (!ptid)
    ptid = &tid;
  op->trace.event("op submit");
  _op_submit_with_budget(op, rl, ptid, ctx_budget);
}

void Objecter::_op_submit_with_budget(Op *op,
                                      shunique_lock<ceph::shared_mutex>& sul,
                                      ceph_tid_t *ptid,
                                      int *ctx_budget)
{
  ceph_assert(initialized);

  ceph_assert(op->ops.size() == op->out_bl.size());
  ceph_assert(op->ops.size() == op->out_rval.size());
  ceph_assert(op->ops.size() == op->out_handler.size());

  // throttle.  before we look at any state, because
  // _take_op_budget() may drop our lock while it blocks.
  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
    int op_budget = _take_op_budget(op, sul);
    // take and pass out the budget for the first OP
    // in the context session
    if (ctx_budget && (*ctx_budget == -1)) {
      *ctx_budget = op_budget;
    }
  }

  if (osd_timeout > timespan(0)) {
    if (op->tid == 0)
      op->tid = ++last_tid;
    auto tid = op->tid;
    op->ontimeout = timer.add_event(osd_timeout,
                                    [this, tid]() {
                                      op_cancel(tid, -ETIMEDOUT); });
  }

  _op_submit(op, sul, ptid);
}
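
// Illustrative sketch (not part of the build): the intended use of
// ctx_budget for context-budgeted submission. The first op of a
// context passes -1, takes the throttle budget once, and receives the
// amount back out; later ops of the same context carry the
// ctx_budgeted flag and skip the throttle. Caller-side names here are
// hypothetical:
//
//   int ctx_budget = -1;
//   objecter->op_submit(first_op, &first_tid, &ctx_budget);
//   // ctx_budget is now >= 0; ctx_budgeted ops reuse it
//   objecter->op_submit(next_op, &next_tid, &ctx_budget);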

void Objecter::_send_op_account(Op *op)
{
  inflight_ops++;

  // add to gather set(s)
  if (op->has_completion()) {
    num_in_flight++;
  } else {
    ldout(cct, 20) << " note: not requesting reply" << dendl;
  }

  logger->inc(l_osdc_op_active);
  logger->inc(l_osdc_op);
  logger->inc(l_osdc_oplen_avg, op->ops.size());

  if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
      (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
    logger->inc(l_osdc_op_rmw);
  else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
    logger->inc(l_osdc_op_w);
  else if (op->target.flags & CEPH_OSD_FLAG_READ)
    logger->inc(l_osdc_op_r);

  if (op->target.flags & CEPH_OSD_FLAG_PGOP)
    logger->inc(l_osdc_op_pg);

  for (auto p = op->ops.begin(); p != op->ops.end(); ++p) {
    int code = l_osdc_osdop_other;
    switch (p->op.op) {
    case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
    case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
    case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
    case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
    case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
    case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
    case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
    case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
    case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
    case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
    case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
    case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
    case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
    case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
    case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
    case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
    case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;

    // OMAP read operations
    case CEPH_OSD_OP_OMAPGETVALS:
    case CEPH_OSD_OP_OMAPGETKEYS:
    case CEPH_OSD_OP_OMAPGETHEADER:
    case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;

    // OMAP write operations
    case CEPH_OSD_OP_OMAPSETVALS:
    case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;

    // OMAP del operations
    case CEPH_OSD_OP_OMAPCLEAR:
    case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;

    case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
    case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
    case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
    }
    if (code)
      logger->inc(code);
  }
}

void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid)
{
  // rwlock is locked

  ldout(cct, 10) << __func__ << " op " << op << dendl;

  // pick target
  ceph_assert(op->session == NULL);
  OSDSession *s = NULL;

  bool check_for_latest_map = false;
  int r = _calc_target(&op->target, nullptr);
  switch(r) {
  case RECALC_OP_TARGET_POOL_DNE:
    check_for_latest_map = true;
    break;
  case RECALC_OP_TARGET_POOL_EIO:
    if (op->has_completion()) {
      op->complete(osdc_errc::pool_eio, -EIO);
    }
    return;
  }

  // Try to get a session, including a retry if we need to take write lock
  r = _get_session(op->target.osd, &s, sul);
  if (r == -EAGAIN ||
      (check_for_latest_map && sul.owns_lock_shared()) ||
      cct->_conf->objecter_debug_inject_relock_delay) {
    epoch_t orig_epoch = osdmap->get_epoch();
    sul.unlock();
    if (cct->_conf->objecter_debug_inject_relock_delay) {
      sleep(1);
    }
    sul.lock();
    if (orig_epoch != osdmap->get_epoch()) {
      // map changed; recalculate mapping
      ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
                     << dendl;
      check_for_latest_map = _calc_target(&op->target, nullptr)
        == RECALC_OP_TARGET_POOL_DNE;
      if (s) {
        put_session(s);
        s = NULL;
        r = -EAGAIN;
      }
    }
  }
  if (r == -EAGAIN) {
    ceph_assert(s == NULL);
    r = _get_session(op->target.osd, &s, sul);
  }
  ceph_assert(r == 0);
  ceph_assert(s);  // may be homeless

  _send_op_account(op);

  // send?

  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

  bool need_send = false;
  if (op->target.paused) {
    ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
                   << dendl;
    _maybe_request_map();
  } else if (!s->is_homeless()) {
    need_send = true;
  } else {
    _maybe_request_map();
  }

  unique_lock sl(s->lock);
  if (op->tid == 0)
    op->tid = ++last_tid;

  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
                 << " '" << op->target.base_oloc << "' '"
                 << op->target.target_oloc << "' " << op->ops << " tid "
                 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
                 << dendl;

  _session_op_assign(s, op);

  if (need_send) {
    _send_op(op);
  }

  // Last chance to touch Op here, after giving up session lock it can
  // be freed at any time by response handler.
  ceph_tid_t tid = op->tid;
  if (check_for_latest_map) {
    _send_op_map_check(op);
  }
  if (ptid)
    *ptid = tid;
  op = NULL;

  sl.unlock();
  put_session(s);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}

int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  unique_lock sl(s->lock);

  auto p = s->ops.find(tid);
  if (p == s->ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
                   << s->osd << dendl;
    return -ENOENT;
  }

#if 0
  if (s->con) {
    ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
                   << " on " << s->con << dendl;
    s->con->revoke_rx_buffer(tid);
  }
#endif

  ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
                 << dendl;
  Op *op = p->second;
  if (op->has_completion()) {
    num_in_flight--;
    op->complete(osdcode(r), r);
  }
  _op_cancel_map_check(op);
  _finish_op(op, r);
  sl.unlock();

  return 0;
}

int Objecter::op_cancel(ceph_tid_t tid, int r)
{
  int ret = 0;

  unique_lock wl(rwlock);
  ret = _op_cancel(tid, r);

  return ret;
}

int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
{
  unique_lock wl(rwlock);
  ldout(cct,10) << __func__ << " " << tids << dendl;
  for (auto tid : tids) {
    _op_cancel(tid, r);
  }
  return 0;
}

int Objecter::_op_cancel(ceph_tid_t tid, int r)
{
  int ret = 0;

  ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
                << dendl;

start:

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    shared_lock sl(s->lock);
    if (s->ops.find(tid) != s->ops.end()) {
      sl.unlock();
      ret = op_cancel(s, tid, r);
      if (ret == -ENOENT) {
        /* oh no! raced, maybe tid moved to another session, restarting */
        goto start;
      }
      return ret;
    }
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
                << " not found in live sessions" << dendl;

  // Handle case where the op is in homeless session
  shared_lock sl(homeless_session->lock);
  if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
    sl.unlock();
    ret = op_cancel(homeless_session, tid, r);
    if (ret == -ENOENT) {
      /* oh no! raced, maybe tid moved to another session, restarting */
      goto start;
    } else {
      return ret;
    }
  } else {
    sl.unlock();
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
                << " not found in homeless session" << dendl;

  return ret;
}


epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
{
  unique_lock wl(rwlock);

  std::vector<ceph_tid_t> to_cancel;
  bool found = false;

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    shared_lock sl(s->lock);
    for (auto op_i = s->ops.begin();
         op_i != s->ops.end(); ++op_i) {
      if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
          && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
        to_cancel.push_back(op_i->first);
      }
    }
    sl.unlock();

    for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
      int cancel_result = op_cancel(s, *titer, r);
      // We hold rwlock across search and cancellation, so cancels
      // should always succeed
      ceph_assert(cancel_result == 0);
    }
    if (!found && to_cancel.size())
      found = true;
    to_cancel.clear();
  }

  const epoch_t epoch = osdmap->get_epoch();

  wl.unlock();

  if (found) {
    return epoch;
  } else {
    return -1;
  }
}

bool Objecter::is_pg_changed(
  int oldprimary,
  const vector<int>& oldacting,
  int newprimary,
  const vector<int>& newacting,
  bool any_change)
{
  if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
        oldprimary,
        oldacting,
        newprimary,
        newacting))
    return true;
  if (any_change && oldacting != newacting)
    return true;
  return false;      // same primary (tho replicas may have changed)
}

bool Objecter::target_should_be_paused(op_target_t *t)
{
  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
    (t->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi)));

  return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
    (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
    (osdmap->get_epoch() < epoch_barrier);
}

/**
 * Locking public accessor for _osdmap_full_flag
 */
bool Objecter::osdmap_full_flag() const
{
  shared_lock rl(rwlock);

  return _osdmap_full_flag();
}

bool Objecter::osdmap_pool_full(const int64_t pool_id) const
{
  shared_lock rl(rwlock);

  if (_osdmap_full_flag()) {
    return true;
  }

  return _osdmap_pool_full(pool_id);
}

bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
{
  const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
  if (pool == NULL) {
    ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
    return false;
  }

  return _osdmap_pool_full(*pool);
}

bool Objecter::_osdmap_has_pool_full() const
{
  for (auto it = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it) {
    if (_osdmap_pool_full(it->second))
      return true;
  }
  return false;
}

/**
 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
 */
bool Objecter::_osdmap_full_flag() const
{
  // Ignore the FULL flag if the caller does not have honor_pool_full set
  return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
}

void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
{
  for (map<int64_t, pg_pool_t>::const_iterator it
         = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it) {
    if (pool_full_map.find(it->first) == pool_full_map.end()) {
      pool_full_map[it->first] = _osdmap_pool_full(it->second);
    } else {
      pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
        pool_full_map[it->first];
    }
  }
}

int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
                                           const string& ns)
{
  shared_lock rl(rwlock);
  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -ENOENT;
  return p->hash_key(key, ns);
}

int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
                                              const string& ns)
{
  shared_lock rl(rwlock);
  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -ENOENT;
  return p->raw_hash_to_pg(p->hash_key(key, ns));
}

void Objecter::_prune_snapc(
  const mempool::osdmap::map<int64_t,
  snap_interval_set_t>& new_removed_snaps,
  Op *op)
{
  bool match = false;
  auto i = new_removed_snaps.find(op->target.base_pgid.pool());
  if (i != new_removed_snaps.end()) {
    for (auto s : op->snapc.snaps) {
      if (i->second.contains(s)) {
        match = true;
        break;
      }
    }
    if (match) {
      vector<snapid_t> new_snaps;
      for (auto s : op->snapc.snaps) {
        if (!i->second.contains(s)) {
          new_snaps.push_back(s);
        }
      }
      op->snapc.snaps.swap(new_snaps);
      ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
                    << " (was " << new_snaps << ")" << dendl;
    }
  }
}
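
// Worked example (illustrative): if op->snapc.snaps is {8, 5, 2} and
// the incoming map reports snap 5 as newly removed for the op's pool,
// the first pass sets match, the second pass rebuilds the vector
// without 5, and the op proceeds with snapc.snaps == {8, 2}. After
// the swap, new_snaps holds the old list, which is what the
// "(was ...)" log line prints.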

int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
  // rwlock is locked
  bool is_read = t->flags & CEPH_OSD_FLAG_READ;
  bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
  t->epoch = osdmap->get_epoch();
  ldout(cct,20) << __func__ << " epoch " << t->epoch
                << " base " << t->base_oid << " " << t->base_oloc
                << " precalc_pgid " << (int)t->precalc_pgid
                << " pgid " << t->base_pgid
                << (is_read ? " is_read" : "")
                << (is_write ? " is_write" : "")
                << dendl;

  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  if (!pi) {
    t->osd = -1;
    return RECALC_OP_TARGET_POOL_DNE;
  }

  if (pi->has_flag(pg_pool_t::FLAG_EIO)) {
    return RECALC_OP_TARGET_POOL_EIO;
  }

  ldout(cct,30) << __func__ << " base pi " << pi
                << " pg_num " << pi->get_pg_num() << dendl;

  bool force_resend = false;
  if (osdmap->get_epoch() == pi->last_force_op_resend) {
    if (t->last_force_resend < pi->last_force_op_resend) {
      t->last_force_resend = pi->last_force_op_resend;
      force_resend = true;
    } else if (t->last_force_resend == 0) {
      force_resend = true;
    }
  }

  // apply tiering
  t->target_oid = t->base_oid;
  t->target_oloc = t->base_oloc;
  if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
    if (is_read && pi->has_read_tier())
      t->target_oloc.pool = pi->read_tier;
    if (is_write && pi->has_write_tier())
      t->target_oloc.pool = pi->write_tier;
    pi = osdmap->get_pg_pool(t->target_oloc.pool);
    if (!pi) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }

  pg_t pgid;
  if (t->precalc_pgid) {
    ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
    ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
    ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
    pgid = t->base_pgid;
  } else {
    int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
                                           pgid);
    if (ret == -ENOENT) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }
  ldout(cct,20) << __func__ << " target " << t->target_oid << " "
                << t->target_oloc << " -> pgid " << pgid << dendl;
  ldout(cct,30) << __func__ << " target pi " << pi
                << " pg_num " << pi->get_pg_num() << dendl;
  t->pool_ever_existed = true;

  int size = pi->size;
  int min_size = pi->min_size;
  unsigned pg_num = pi->get_pg_num();
  unsigned pg_num_mask = pi->get_pg_num_mask();
  unsigned pg_num_pending = pi->get_pg_num_pending();
  int up_primary, acting_primary;
  vector<int> up, acting;
  ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
  pg_t actual_pgid(actual_ps, pgid.pool());
  if (!lookup_pg_mapping(actual_pgid, osdmap->get_epoch(), &up, &up_primary,
                         &acting, &acting_primary)) {
    osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
                                 &acting, &acting_primary);
    pg_mapping_t pg_mapping(osdmap->get_epoch(),
                            up, up_primary, acting, acting_primary);
    update_pg_mapping(actual_pgid, std::move(pg_mapping));
  }
  bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
  bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
  unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
  pg_t prev_pgid(prev_seed, pgid.pool());
  if (any_change && PastIntervals::is_new_interval(
        t->acting_primary,
        acting_primary,
        t->acting,
        acting,
        t->up_primary,
        up_primary,
        t->up,
        up,
        t->size,
        size,
        t->min_size,
        min_size,
        t->pg_num,
        pg_num,
        t->pg_num_pending,
        pg_num_pending,
        t->sort_bitwise,
        sort_bitwise,
        t->recovery_deletes,
        recovery_deletes,
        t->peering_crush_bucket_count,
        pi->peering_crush_bucket_count,
        t->peering_crush_bucket_target,
        pi->peering_crush_bucket_target,
        t->peering_crush_bucket_barrier,
        pi->peering_crush_bucket_barrier,
        t->peering_crush_mandatory_member,
        pi->peering_crush_mandatory_member,
        prev_pgid)) {
    force_resend = true;
  }

  bool unpaused = false;
  bool should_be_paused = target_should_be_paused(t);
  if (t->paused && !should_be_paused) {
    unpaused = true;
  }
  if (t->paused != should_be_paused) {
    ldout(cct, 10) << __func__ << " paused " << t->paused
                   << " -> " << should_be_paused << dendl;
    t->paused = should_be_paused;
  }

  bool legacy_change =
    t->pgid != pgid ||
    is_pg_changed(
      t->acting_primary, t->acting, acting_primary, acting,
      t->used_replica || any_change);
  bool split_or_merge = false;
  if (t->pg_num) {
    split_or_merge =
      prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_target(t->pg_num, pg_num);
  }

  if (legacy_change || split_or_merge || force_resend) {
    t->pgid = pgid;
    t->acting = std::move(acting);
    t->acting_primary = acting_primary;
    t->up_primary = up_primary;
    t->up = std::move(up);
    t->size = size;
    t->min_size = min_size;
    t->pg_num = pg_num;
    t->pg_num_mask = pg_num_mask;
    t->pg_num_pending = pg_num_pending;
    spg_t spgid(actual_pgid);
    if (pi->is_erasure()) {
      for (uint8_t i = 0; i < t->acting.size(); ++i) {
        if (t->acting[i] == acting_primary) {
          spgid.reset_shard(shard_id_t(i));
          break;
        }
      }
    }
    t->actual_pgid = spgid;
    t->sort_bitwise = sort_bitwise;
    t->recovery_deletes = recovery_deletes;
    t->peering_crush_bucket_count = pi->peering_crush_bucket_count;
    t->peering_crush_bucket_target = pi->peering_crush_bucket_target;
    t->peering_crush_bucket_barrier = pi->peering_crush_bucket_barrier;
    t->peering_crush_mandatory_member = pi->peering_crush_mandatory_member;
    ldout(cct, 10) << __func__ << " "
                   << " raw pgid " << pgid << " -> actual " << t->actual_pgid
                   << " acting " << t->acting
                   << " primary " << acting_primary << dendl;
    t->used_replica = false;
    if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
                     CEPH_OSD_FLAG_LOCALIZE_READS)) &&
        !is_write && pi->is_replicated() && t->acting.size() > 1) {
      int osd;
      ceph_assert(is_read && t->acting[0] == acting_primary);
      if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
        int p = rand() % t->acting.size();
        if (p)
          t->used_replica = true;
        osd = t->acting[p];
        ldout(cct, 10) << " chose random osd." << osd << " of " << t->acting
                       << dendl;
      } else {
        // look for a local replica.  prefer the primary if the
        // distance is the same.
        int best = -1;
        int best_locality = 0;
        for (unsigned i = 0; i < t->acting.size(); ++i) {
          int locality = osdmap->crush->get_common_ancestor_distance(
            cct, t->acting[i], crush_location);
          ldout(cct, 20) << __func__ << " localize: rank " << i
                         << " osd." << t->acting[i]
                         << " locality " << locality << dendl;
          if (i == 0 ||
              (locality >= 0 && best_locality >= 0 &&
               locality < best_locality) ||
              (best_locality < 0 && locality >= 0)) {
            best = i;
            best_locality = locality;
            if (i)
              t->used_replica = true;
          }
        }
        ceph_assert(best >= 0);
        osd = t->acting[best];
      }
      t->osd = osd;
    } else {
      t->osd = acting_primary;
    }
  }
  if (legacy_change || unpaused || force_resend) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  if (split_or_merge &&
      (osdmap->require_osd_release >= ceph_release_t::luminous ||
       HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
                    RESEND_ON_SPLIT))) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return RECALC_OP_TARGET_NO_ACTION;
}

int Objecter::_map_session(op_target_t *target, OSDSession **s,
                           shunique_lock<ceph::shared_mutex>& sul)
{
  _calc_target(target, nullptr);
  return _get_session(target->osd, s, sul);
}

void Objecter::_session_op_assign(OSDSession *to, Op *op)
{
  // to->lock is locked
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  get_session(to);
  op->session = to;
  to->ops[op->tid] = op;

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}

void Objecter::_session_op_remove(OSDSession *from, Op *op)
{
  ceph_assert(op->session == from);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}

void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
{
  // to lock is locked unique
  ceph_assert(op->session == NULL);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->linger_ops[op->linger_id] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
                 << dendl;
}

void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
{
  ceph_assert(from == op->session);
  // from->lock is locked unique

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->linger_ops.erase(op->linger_id);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
                 << dendl;
}

void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
{
  ceph_assert(from == op->session);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->command_ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}

void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
{
  // to->lock is locked
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->command_ops[op->tid] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}

int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
                                       shunique_lock<ceph::shared_mutex>& sul)
{
  // rwlock is locked unique

  int r = _calc_target(&linger_op->target, nullptr, true);
  if (r == RECALC_OP_TARGET_NEED_RESEND) {
    ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
                   << " pgid " << linger_op->target.pgid
                   << " acting " << linger_op->target.acting << dendl;

    OSDSession *s = NULL;
    r = _get_session(linger_op->target.osd, &s, sul);
    ceph_assert(r == 0);

    if (linger_op->session != s) {
      // NB locking two sessions (s and linger_op->session) at the
      // same time here is only safe because we are the only one that
      // takes two, and we are holding rwlock for write.  We use
      // std::shared_mutex in OSDSession because lockdep doesn't know
      // that.
      unique_lock sl(s->lock);
      _session_linger_op_remove(linger_op->session, linger_op);
      _session_linger_op_assign(s, linger_op);
    }

    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return r;
}

void Objecter::_cancel_linger_op(Op *op)
{
  ldout(cct, 15) << "cancel_op " << op->tid << dendl;

  ceph_assert(!op->should_resend);
  if (op->has_completion()) {
    op->onfinish = nullptr;
    num_in_flight--;
  }

  _finish_op(op, 0);
}

void Objecter::_finish_op(Op *op, int r)
{
  ldout(cct, 15) << __func__ << " " << op->tid << dendl;

  // op->session->lock is locked unique or op->session is null

  if (!op->ctx_budgeted && op->budget >= 0) {
    put_op_budget_bytes(op->budget);
    op->budget = -1;
  }

  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  if (op->session) {
    _session_op_remove(op->session, op);
  }

  logger->dec(l_osdc_op_active);

  ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());

  inflight_ops--;

  op->put();
}

MOSDOp *Objecter::_prepare_osd_op(Op *op)
{
  // rwlock is locked

  int flags = op->target.flags;
  flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
  flags |= CEPH_OSD_FLAG_SUPPORTSPOOLEIO;

  // Nothing checks this any longer, but needed for compatibility with
  // pre-luminous osds
  flags |= CEPH_OSD_FLAG_ONDISK;

  if (!honor_pool_full)
    flags |= CEPH_OSD_FLAG_FULL_FORCE;

  op->target.paused = false;
  op->stamp = ceph::coarse_mono_clock::now();

  hobject_t hobj = op->target.get_hobj();
  auto m = new MOSDOp(client_inc, op->tid,
                      hobj, op->target.actual_pgid,
                      osdmap->get_epoch(),
                      flags, op->features);

  m->set_snapid(op->snapid);
  m->set_snap_seq(op->snapc.seq);
  m->set_snaps(op->snapc.snaps);

  m->ops = op->ops;
  m->set_mtime(op->mtime);
  m->set_retry_attempt(op->attempts++);

  if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
    op->trace.init("op", &trace_endpoint);
  }

  if (op->priority)
    m->set_priority(op->priority);
  else
    m->set_priority(cct->_conf->osd_client_op_priority);

  if (op->reqid != osd_reqid_t()) {
    m->set_reqid(op->reqid);
  }

  logger->inc(l_osdc_op_send);
  ssize_t sum = 0;
  for (unsigned i = 0; i < m->ops.size(); i++) {
    sum += m->ops[i].indata.length();
  }
  logger->inc(l_osdc_op_send_bytes, sum);

  return m;
}

void Objecter::_send_op(Op *op)
{
  // rwlock is locked
  // op->session->lock is locked

  // backoff?
  auto p = op->session->backoffs.find(op->target.actual_pgid);
  if (p != op->session->backoffs.end()) {
    hobject_t hoid = op->target.get_hobj();
    auto q = p->second.lower_bound(hoid);
    if (q != p->second.begin()) {
      --q;
      if (hoid >= q->second.end) {
        ++q;
      }
    }
    if (q != p->second.end()) {
      ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
                     << "," << q->second.end << ")" << dendl;
      int r = cmp(hoid, q->second.begin);
      if (r == 0 || (r > 0 && hoid < q->second.end)) {
        ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
                       << " id " << q->second.id << " on " << hoid
                       << ", queuing " << op << " tid " << op->tid << dendl;
        return;
      }
    }
  }

  ceph_assert(op->tid > 0);
  MOSDOp *m = _prepare_osd_op(op);

  if (op->target.actual_pgid != m->get_spg()) {
    ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
                   << m->get_spg() << " to " << op->target.actual_pgid
                   << ", updating and reencoding" << dendl;
    m->set_spg(op->target.actual_pgid);
    m->clear_payload();  // reencode
  }

  ldout(cct, 15) << "_send_op " << op->tid << " to "
                 << op->target.actual_pgid << " on osd." << op->session->osd
                 << dendl;

  ConnectionRef con = op->session->con;
  ceph_assert(con);

#if 0
  // preallocated rx ceph::buffer?
  if (op->con) {
    ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
                   << op->con << dendl;
    op->con->revoke_rx_buffer(op->tid);
  }
  if (op->outbl &&
      op->ontimeout == 0 &&  // only post rx_buffer if no timeout; see #9582
      op->outbl->length()) {
    op->outbl->invalidate_crc();  // messenger writes through c_str()
    ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
                   << dendl;
    op->con = con;
    op->con->post_rx_buffer(op->tid, *op->outbl);
  }
#endif

  op->incarnation = op->session->incarnation;

  if (op->trace.valid()) {
    m->trace.init("op msg", nullptr, &op->trace);
  }
  op->session->con->send_message(m);
}
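
// Worked example (illustrative): suppose the session has one backoff
// registered for this op's PG covering [b, e). lower_bound(hoid)
// returns the first interval whose begin >= hoid, so the code steps
// back once to consider the interval that could contain hoid, then
// steps forward again if hoid is already past that interval's end.
// The op is held back (left queued on the session rather than sent)
// iff b <= hoid < e; otherwise it falls through to _prepare_osd_op()
// and goes out on the wire.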

int Objecter::calc_op_budget(const bc::small_vector_base<OSDOp>& ops)
{
  int op_budget = 0;
  for (auto i = ops.begin(); i != ops.end(); ++i) {
    if (i->op.op & CEPH_OSD_OP_MODE_WR) {
      op_budget += i->indata.length();
    } else if (ceph_osd_op_mode_read(i->op.op)) {
      if (ceph_osd_op_uses_extent(i->op.op)) {
        if ((int64_t)i->op.extent.length > 0)
          op_budget += (int64_t)i->op.extent.length;
      } else if (ceph_osd_op_type_attr(i->op.op)) {
        op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
      }
    }
  }
  return op_budget;
}
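
// Worked example (illustrative): a compound op carrying a 4096-byte
// write (indata.length() == 4096) and an 8192-byte extent read
// (op.extent.length == 8192) is budgeted at 4096 + 8192 = 12288
// bytes; a read-mode xattr op is charged name_len + value_len
// instead, since it has no extent.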
3335
3336void Objecter::_throttle_op(Op *op,
f67539c2 3337 shunique_lock<ceph::shared_mutex>& sul,
7c673cae
FG
3338 int op_budget)
3339{
11fdf7f2 3340 ceph_assert(sul && sul.mutex() == &rwlock);
7c673cae
FG
3341 bool locked_for_write = sul.owns_lock();
3342
3343 if (!op_budget)
11fdf7f2 3344 op_budget = calc_op_budget(op->ops);
7c673cae
FG
3345 if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3346 sul.unlock();
3347 op_throttle_bytes.get(op_budget);
3348 if (locked_for_write)
3349 sul.lock();
3350 else
3351 sul.lock_shared();
3352 }
3353 if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3354 sul.unlock();
3355 op_throttle_ops.get(1);
3356 if (locked_for_write)
3357 sul.lock();
3358 else
3359 sul.lock_shared();
3360 }
3361}
3362
11fdf7f2 3363int Objecter::take_linger_budget(LingerOp *info)
7c673cae 3364{
11fdf7f2 3365 return 1;
7c673cae
FG
3366}
3367
3368/* This function DOES put the passed message before returning */
3369void Objecter::handle_osd_op_reply(MOSDOpReply *m)
3370{
3371 ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
3372
3373 // get pio
3374 ceph_tid_t tid = m->get_tid();
3375
3376 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3377 if (!initialized) {
7c673cae
FG
3378 m->put();
3379 return;
3380 }
3381
3382 ConnectionRef con = m->get_connection();
11fdf7f2
TL
3383 auto priv = con->get_priv();
3384 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
3385 if (!s || s->con != con) {
3386 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
7c673cae
FG
3387 m->put();
3388 return;
3389 }
3390
f67539c2 3391 unique_lock sl(s->lock);
7c673cae
FG
3392
3393 map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
3394 if (iter == s->ops.end()) {
3395 ldout(cct, 7) << "handle_osd_op_reply " << tid
3396 << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
3397 " onnvram" : " ack"))
3398 << " ... stray" << dendl;
3399 sl.unlock();
7c673cae
FG
3400 m->put();
3401 return;
3402 }
3403
3404 ldout(cct, 7) << "handle_osd_op_reply " << tid
3405 << (m->is_ondisk() ? " ondisk" :
3406 (m->is_onnvram() ? " onnvram" : " ack"))
3407 << " uv " << m->get_user_version()
3408 << " in " << m->get_pg()
3409 << " attempt " << m->get_retry_attempt()
3410 << dendl;
3411 Op *op = iter->second;
31f18b77 3412 op->trace.event("osd op reply");
7c673cae
FG
3413
3414 if (retry_writes_after_first_reply && op->attempts == 1 &&
3415 (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
3416 ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
f67539c2 3417 if (op->has_completion()) {
31f18b77 3418 num_in_flight--;
7c673cae
FG
3419 }
3420 _session_op_remove(s, op);
3421 sl.unlock();
7c673cae
FG
3422
3423 _op_submit(op, sul, NULL);
3424 m->put();
3425 return;
3426 }
3427
3428 if (m->get_retry_attempt() >= 0) {
3429 if (m->get_retry_attempt() != (op->attempts - 1)) {
3430 ldout(cct, 7) << " ignoring reply from attempt "
3431 << m->get_retry_attempt()
3432 << " from " << m->get_source_inst()
3433 << "; last attempt " << (op->attempts - 1) << " sent to "
3434 << op->session->con->get_peer_addr() << dendl;
3435 m->put();
3436 sl.unlock();
7c673cae
FG
3437 return;
3438 }
3439 } else {
3440 // we don't know the request attempt because the server is old, so
3441 // just accept this one. we may do ACK callbacks we shouldn't
3442 // have, but that is better than doing callbacks out of order.
3443 }
3444
f67539c2 3445 decltype(op->onfinish) onfinish;
7c673cae
FG
3446
3447 int rc = m->get_result();
3448
3449 if (m->is_redirect_reply()) {
3450 ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
f67539c2 3451 if (op->has_completion())
31f18b77 3452 num_in_flight--;
7c673cae
FG
3453 _session_op_remove(s, op);
3454 sl.unlock();
7c673cae
FG
3455
3456 // FIXME: two redirects could race and reorder
3457
3458 op->tid = 0;
3459 m->get_redirect().combine_with_locator(op->target.target_oloc,
3460 op->target.target_oid.name);
f64942e4
AA
3461 op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
3462 CEPH_OSD_FLAG_IGNORE_CACHE |
3463 CEPH_OSD_FLAG_IGNORE_OVERLAY);
7c673cae
FG
3464 _op_submit(op, sul, NULL);
3465 m->put();
3466 return;
3467 }
3468
3469 if (rc == -EAGAIN) {
3470 ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
f67539c2 3471 if (op->has_completion())
c07f9fc5
FG
3472 num_in_flight--;
3473 _session_op_remove(s, op);
7c673cae 3474 sl.unlock();
c07f9fc5
FG
3475
3476 op->tid = 0;
3477 op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3478 CEPH_OSD_FLAG_LOCALIZE_READS);
3479 op->target.pgid = pg_t();
3480 _op_submit(op, sul, NULL);
7c673cae
FG
3481 m->put();
3482 return;
3483 }
3484
3485 sul.unlock();
3486
3487 if (op->objver)
3488 *op->objver = m->get_user_version();
3489 if (op->reply_epoch)
3490 *op->reply_epoch = m->get_map_epoch();
3491 if (op->data_offset)
3492 *op->data_offset = m->get_header().data_off;
3493
3494 // got data?
3495 if (op->outbl) {
11fdf7f2 3496#if 0
7c673cae
FG
3497 if (op->con)
3498 op->con->revoke_rx_buffer(op->tid);
11fdf7f2
TL
3499#endif
3500 auto& bl = m->get_data();
3501 if (op->outbl->length() == bl.length() &&
3502 bl.get_num_buffers() <= 1) {
3503 // this is here to keep previous users to *relied* on getting data
3504 // read into existing buffers happy. Notably,
3505 // libradosstriper::RadosStriperImpl::aio_read().
3506 ldout(cct,10) << __func__ << " copying resulting " << bl.length()
9f95a23c 3507 << " into existing ceph::buffer of length " << op->outbl->length()
11fdf7f2 3508 << dendl;
f67539c2
TL
3509 cb::list t;
3510 t = std::move(*op->outbl);
11fdf7f2 3511 t.invalidate_crc(); // we're overwriting the raw buffers via c_str()
9f95a23c 3512 bl.begin().copy(bl.length(), t.c_str());
11fdf7f2
TL
3513 op->outbl->substr_of(t, 0, bl.length());
3514 } else {
3515 m->claim_data(*op->outbl);
3516 }
7c673cae
FG
3517 op->outbl = 0;
3518 }
3519
3520 // per-op result demuxing
3521 vector<OSDOp> out_ops;
3522 m->claim_ops(out_ops);
3523
3524 if (out_ops.size() != op->ops.size())
3525 ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
3526 << " != request ops " << op->ops
3527 << " from " << m->get_source_inst() << dendl;
3528
f67539c2
TL
3529 ceph_assert(op->ops.size() == op->out_bl.size());
3530 ceph_assert(op->ops.size() == op->out_rval.size());
3531 ceph_assert(op->ops.size() == op->out_ec.size());
3532 ceph_assert(op->ops.size() == op->out_handler.size());
3533 auto pb = op->out_bl.begin();
3534 auto pr = op->out_rval.begin();
3535 auto pe = op->out_ec.begin();
3536 auto ph = op->out_handler.begin();
11fdf7f2
TL
3537 ceph_assert(op->out_bl.size() == op->out_rval.size());
3538 ceph_assert(op->out_bl.size() == op->out_handler.size());
f67539c2 3539 auto p = out_ops.begin();
7c673cae
FG
3540 for (unsigned i = 0;
3541 p != out_ops.end() && pb != op->out_bl.end();
f67539c2 3542 ++i, ++p, ++pb, ++pr, ++pe, ++ph) {
7c673cae
FG
3543 ldout(cct, 10) << " op " << i << " rval " << p->rval
3544 << " len " << p->outdata.length() << dendl;
3545 if (*pb)
3546 **pb = p->outdata;
3547 // set rval before running handlers so that handlers
3548 // can change it if e.g. decoding fails
3549 if (*pr)
31f18b77 3550 **pr = ceph_to_hostos_errno(p->rval);
f67539c2
TL
3551 if (*pe)
3552 **pe = p->rval < 0 ? bs::error_code(-p->rval, osd_category()) :
3553 bs::error_code();
7c673cae 3554 if (*ph) {
f67539c2
TL
3555 std::move((*ph))(p->rval < 0 ?
3556 bs::error_code(-p->rval, osd_category()) :
3557 bs::error_code(),
3558 p->rval, p->outdata);
7c673cae
FG
3559 }
3560 }
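  // Editorial note (not upstream): the four parallel iterators above walk the
  // per-op output slots registered at submit time -- *pb receives each op's
  // raw outdata, *pr its rval translated to a host errno, *pe the error_code
  // view of the same rval, and *ph an optional handler that may decode the
  // outdata and overwrite the result it was handed if decoding fails.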
3561
3562 // NOTE: we assume that since we only ever request ONDISK, we will
3563 // only ever get back one (type of) ack.
3564
f67539c2 3565 if (op->has_completion()) {
31f18b77 3566 num_in_flight--;
f67539c2
TL
3567 onfinish = std::move(op->onfinish);
3568 op->onfinish = nullptr;
7c673cae
FG
3569 }
3570 logger->inc(l_osdc_op_reply);
1e59de90
TL
3571 logger->tinc(l_osdc_op_latency, ceph::coarse_mono_time::clock::now() - op->stamp);
3572 logger->set(l_osdc_op_inflight, num_in_flight);
7c673cae
FG
3573
3574 /* get it before we call _finish_op() */
3575 auto completion_lock = s->get_lock(op->target.base_oid);
3576
3577 ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
3578 _finish_op(op, 0);
3579
31f18b77 3580 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
7c673cae
FG
3581
3582 // serialize completions
3583 if (completion_lock.mutex()) {
3584 completion_lock.lock();
3585 }
3586 sl.unlock();
3587
3588 // do callbacks
f67539c2
TL
3589 if (Op::has_completion(onfinish)) {
3590 Op::complete(std::move(onfinish), osdcode(rc), rc);
7c673cae
FG
3591 }
3592 if (completion_lock.mutex()) {
3593 completion_lock.unlock();
3594 }
3595
3596 m->put();
7c673cae
FG
3597}
3598
3599void Objecter::handle_osd_backoff(MOSDBackoff *m)
3600{
3601 ldout(cct, 10) << __func__ << " " << *m << dendl;
3602 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3603 if (!initialized) {
7c673cae
FG
3604 m->put();
3605 return;
3606 }
3607
3608 ConnectionRef con = m->get_connection();
11fdf7f2
TL
3609 auto priv = con->get_priv();
3610 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
3611 if (!s || s->con != con) {
3612 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
7c673cae
FG
3613 m->put();
3614 return;
3615 }
3616
3617 get_session(s);
7c673cae 3618
f67539c2 3619 unique_lock sl(s->lock);
7c673cae
FG
3620
3621 switch (m->op) {
3622 case CEPH_OSD_BACKOFF_OP_BLOCK:
3623 {
3624 // register
3625 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3626 s->backoffs_by_id.insert(make_pair(m->id, &b));
3627 b.pgid = m->pgid;
3628 b.id = m->id;
3629 b.begin = m->begin;
3630 b.end = m->end;
3631
3632 // ack with original backoff's epoch so that the osd can discard this if
3633 // there was a pg split.
f67539c2
TL
3634 auto r = new MOSDBackoff(m->pgid, m->map_epoch,
3635 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3636 m->id, m->begin, m->end);
7c673cae
FG
3637 // this priority must match the MOSDOps from _prepare_osd_op
3638 r->set_priority(cct->_conf->osd_client_op_priority);
3639 con->send_message(r);
3640 }
3641 break;
3642
3643 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3644 {
3645 auto p = s->backoffs_by_id.find(m->id);
3646 if (p != s->backoffs_by_id.end()) {
3647 OSDBackoff *b = p->second;
3648 if (b->begin != m->begin &&
3649 b->end != m->end) {
3650 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3651 << " unblock on ["
3652 << m->begin << "," << m->end << ") but backoff is ["
3653 << b->begin << "," << b->end << ")" << dendl;
3654 // hrmpf, unblock it anyway.
3655 }
3656 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3657 << " id " << b->id
3658 << " [" << b->begin << "," << b->end
3659 << ")" << dendl;
3660 auto spgp = s->backoffs.find(b->pgid);
11fdf7f2 3661 ceph_assert(spgp != s->backoffs.end());
7c673cae
FG
3662 spgp->second.erase(b->begin);
3663 if (spgp->second.empty()) {
3664 s->backoffs.erase(spgp);
3665 }
3666 s->backoffs_by_id.erase(p);
3667
3668 // check for any ops to resend
3669 for (auto& q : s->ops) {
3670 if (q.second->target.actual_pgid == m->pgid) {
3671 int r = q.second->target.contained_by(m->begin, m->end);
3672 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3673 << q.second->target.get_hobj() << dendl;
3674 if (r) {
3675 _send_op(q.second);
3676 }
3677 }
3678 }
3679 } else {
3680 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3681 << " unblock on ["
3682 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3683 }
3684 }
3685 break;
3686
3687 default:
3688 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3689 }
3690
3691 sul.unlock();
3692 sl.unlock();
3693
3694 m->put();
3695 put_session(s);
3696}
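// Editorial summary (not upstream): BLOCK registers the [begin,end) range
// under (pgid, begin), indexes it by id, and ACKs with the epoch carried in
// the request so the OSD can discard an ACK that predates a pg split;
// UNBLOCK erases the registration and immediately resends any queued op
// whose target falls within the released range.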
3697
3698uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3699 uint32_t pos)
3700{
3701 shared_lock rl(rwlock);
3702 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3703 pos, list_context->pool_id, string());
11fdf7f2 3704 ldout(cct, 10) << __func__ << " " << list_context
7c673cae
FG
3705 << " pos " << pos << " -> " << list_context->pos << dendl;
3706 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3707 list_context->current_pg = actual.ps();
3708 list_context->at_end_of_pool = false;
3709 return pos;
3710}
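// A minimal usage sketch (editorial; 'objecter' and 'ctx' are hypothetical):
//
//   // resume a pool listing at raw hash position 42
//   uint32_t pos = objecter->list_nobjects_seek(ctx, 42u);
//   // ctx->pos now encodes CEPH_NOSNAP + hash 42 within ctx->pool_id, and
//   // ctx->current_pg is the placement seed after raw_pg_to_pg folding.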
3711
3712uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3713 const hobject_t& cursor)
3714{
3715 shared_lock rl(rwlock);
3716 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3717 list_context->pos = cursor;
3718 list_context->at_end_of_pool = false;
3719 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3720 list_context->current_pg = actual.ps();
3721 list_context->sort_bitwise = true;
3722 return list_context->current_pg;
3723}
3724
3725void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3726 hobject_t *cursor)
3727{
3728 shared_lock rl(rwlock);
3729 if (list_context->list.empty()) {
3730 *cursor = list_context->pos;
3731 } else {
3732 const librados::ListObjectImpl& entry = list_context->list.front();
3733 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3734 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3735 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3736 }
3737}
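// Editorial note (not upstream): the cursor for a non-empty batch is rebuilt
// from the *front* of the pending list -- hashing the locator key when one is
// set, the oid otherwise -- so a caller that saves this cursor and later
// seeks back to it re-lists everything not yet consumed from the batch.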
3738
3739void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3740{
3741 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3742 << " pool_snap_seq " << list_context->pool_snap_seq
3743 << " max_entries " << list_context->max_entries
3744 << " list_context " << list_context
3745 << " onfinish " << onfinish
3746 << " current_pg " << list_context->current_pg
3747 << " pos " << list_context->pos << dendl;
3748
3749 shared_lock rl(rwlock);
3750 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3751 if (!pool) { // pool is gone
3752 rl.unlock();
3753 put_nlist_context_budget(list_context);
3754 onfinish->complete(-ENOENT);
3755 return;
3756 }
3757 int pg_num = pool->get_pg_num();
3758 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3759
3760 if (list_context->pos.is_min()) {
3761 list_context->starting_pg_num = 0;
3762 list_context->sort_bitwise = sort_bitwise;
3763 list_context->starting_pg_num = pg_num;
3764 }
3765 if (list_context->sort_bitwise != sort_bitwise) {
3766 list_context->pos = hobject_t(
3767 object_t(), string(), CEPH_NOSNAP,
3768 list_context->current_pg, list_context->pool_id, string());
3769 list_context->sort_bitwise = sort_bitwise;
3770 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3771 << list_context->pos << dendl;
3772 }
3773 if (list_context->starting_pg_num != pg_num) {
3774 if (!sort_bitwise) {
3775 // start reading from the beginning; the pgs have changed
3776 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3777 list_context->pos = collection_list_handle_t();
3778 }
3779 list_context->starting_pg_num = pg_num;
3780 }
3781
3782 if (list_context->pos.is_max()) {
3783 ldout(cct, 20) << __func__ << " end of pool, list "
3784 << list_context->list << dendl;
3785 if (list_context->list.empty()) {
3786 list_context->at_end_of_pool = true;
3787 }
3788 // release the listing context's budget once all
3789 // OPs (in the session) are finished
3790 put_nlist_context_budget(list_context);
3791 onfinish->complete(0);
3792 return;
3793 }
3794
3795 ObjectOperation op;
3796 op.pg_nls(list_context->max_entries, list_context->filter,
3797 list_context->pos, osdmap->get_epoch());
3798 list_context->bl.clear();
f67539c2 3799 auto onack = new C_NList(list_context, onfinish, this);
7c673cae
FG
3800 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3801
3802 // note current_pg in case we don't have (or lose) SORTBITWISE
3803 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3804 rl.unlock();
3805
3806 pg_read(list_context->current_pg, oloc, op,
3807 &list_context->bl, 0, onack, &onack->epoch,
3808 &list_context->ctx_budget);
3809}
3810
3811void Objecter::_nlist_reply(NListContext *list_context, int r,
3812 Context *final_finish, epoch_t reply_epoch)
3813{
3814 ldout(cct, 10) << __func__ << " " << list_context << dendl;
3815
11fdf7f2 3816 auto iter = list_context->bl.cbegin();
7c673cae 3817 pg_nls_response_t response;
11fdf7f2 3818 decode(response, iter);
7c673cae 3819 if (!iter.end()) {
9f95a23c 3820 // legacy extra_info blob; unused, decoded only to consume the payload.
f67539c2 3821 cb::list legacy_extra_info;
9f95a23c 3822 decode(legacy_extra_info, iter);
7c673cae
FG
3823 }
3824
3825 // if the osd returns 1 (newer code), or a MAX handle, it means we
3826 // hit the end of the pg.
3827 if ((response.handle.is_max() || r == 1) &&
3828 !list_context->sort_bitwise) {
3829 // legacy OSD and !sortbitwise, figure out the next PG on our own
3830 ++list_context->current_pg;
3831 if (list_context->current_pg == list_context->starting_pg_num) {
3832 // end of pool
3833 list_context->pos = hobject_t::get_max();
3834 } else {
3835 // next pg
3836 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3837 list_context->current_pg,
3838 list_context->pool_id, string());
3839 }
3840 } else {
3841 list_context->pos = response.handle;
3842 }
3843
3844 int response_size = response.entries.size();
3845 ldout(cct, 20) << " response.entries.size " << response_size
3846 << ", response.entries " << response.entries
3847 << ", handle " << response.handle
3848 << ", tentative new pos " << list_context->pos << dendl;
7c673cae 3849 if (response_size) {
f67539c2
TL
3850 std::move(response.entries.begin(), response.entries.end(),
3851 std::back_inserter(list_context->list));
3852 response.entries.clear();
7c673cae
FG
3853 }
3854
3855 if (list_context->list.size() >= list_context->max_entries) {
3856 ldout(cct, 20) << " hit max, returning results so far, "
3857 << list_context->list << dendl;
3858 // release the listing context's budget once all
3859 // OPs (in the session) are finished
3860 put_nlist_context_budget(list_context);
3861 final_finish->complete(0);
3862 return;
3863 }
3864
3865 // continue!
3866 list_nobjects(list_context, final_finish);
3867}
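// Editorial summary (not upstream): a reply therefore ends one of three ways:
// enough entries accumulated (complete now with what we have), pool space
// exhausted (pos pinned at hobject_t::get_max()), or neither, in which case
// list_nobjects() is re-entered with the advanced cursor until one of the
// first two conditions holds.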
3868
3869void Objecter::put_nlist_context_budget(NListContext *list_context)
3870{
3871 if (list_context->ctx_budget >= 0) {
3872 ldout(cct, 10) << " release listing context's budget " <<
3873 list_context->ctx_budget << dendl;
3874 put_op_budget_bytes(list_context->ctx_budget);
3875 list_context->ctx_budget = -1;
3876 }
3877}
3878
3879// snapshots
3880
f67539c2
TL
3881void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name,
3882 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3883{
3884 unique_lock wl(rwlock);
3885 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3886 << snap_name << dendl;
3887
3888 const pg_pool_t *p = osdmap->get_pg_pool(pool);
f67539c2
TL
3889 if (!p) {
3890 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3891 return;
3892 }
3893 if (p->snap_exists(snap_name)) {
3894 onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists,
3895 cb::list{});
3896 return;
3897 }
7c673cae 3898
f67539c2 3899 auto op = new PoolOp;
31f18b77 3900 op->tid = ++last_tid;
7c673cae
FG
3901 op->pool = pool;
3902 op->name = snap_name;
f67539c2 3903 op->onfinish = std::move(onfinish);
7c673cae
FG
3904 op->pool_op = POOL_OP_CREATE_SNAP;
3905 pool_ops[op->tid] = op;
3906
3907 pool_op_submit(op);
7c673cae
FG
3908}
3909
f67539c2
TL
3910struct CB_SelfmanagedSnap {
3911 std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> fin;
3912 CB_SelfmanagedSnap(decltype(fin)&& fin)
3913 : fin(std::move(fin)) {}
3914 void operator()(bs::error_code ec, const cb::list& bl) {
3915 snapid_t snapid = 0;
3916 if (!ec) {
11fdf7f2 3917 try {
f67539c2
TL
3918 auto p = bl.cbegin();
3919 decode(snapid, p);
3920 } catch (const cb::error& e) {
3921 ec = e.code();
11fdf7f2 3922 }
7c673cae 3923 }
f67539c2 3924 fin->defer(std::move(fin), ec, snapid);
7c673cae
FG
3925 }
3926};
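// A hedged usage sketch (editorial; the executor wiring and lambda are
// illustrative, not upstream code):
//
//   objecter->allocate_selfmanaged_snap(
//     pool_id,
//     ca::Completion<void(bs::error_code, snapid_t)>::create(
//       executor,
//       [](bs::error_code ec, snapid_t snap) {
//         // on success, 'snap' is the snapid decoded from the reply payload
//       }));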
3927
f67539c2
TL
3928void Objecter::allocate_selfmanaged_snap(
3929 int64_t pool,
3930 std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> onfinish)
7c673cae
FG
3931{
3932 unique_lock wl(rwlock);
3933 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
f67539c2 3934 auto op = new PoolOp;
31f18b77 3935 op->tid = ++last_tid;
7c673cae 3936 op->pool = pool;
f67539c2
TL
3937 op->onfinish = PoolOp::OpComp::create(
3938 service.get_executor(),
3939 CB_SelfmanagedSnap(std::move(onfinish)));
7c673cae
FG
3940 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3941 pool_ops[op->tid] = op;
3942
3943 pool_op_submit(op);
7c673cae
FG
3944}
3945
f67539c2
TL
3946void Objecter::delete_pool_snap(
3947 int64_t pool, std::string_view snap_name,
3948 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3949{
3950 unique_lock wl(rwlock);
3951 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3952 << snap_name << dendl;
3953
3954 const pg_pool_t *p = osdmap->get_pg_pool(pool);
f67539c2
TL
3955 if (!p) {
3956 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3957 return;
3958 }
7c673cae 3959
f67539c2
TL
3960 if (!p->snap_exists(snap_name)) {
3961 onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{});
3962 return;
3963 }
3964
3965 auto op = new PoolOp;
31f18b77 3966 op->tid = ++last_tid;
7c673cae
FG
3967 op->pool = pool;
3968 op->name = snap_name;
f67539c2 3969 op->onfinish = std::move(onfinish);
7c673cae
FG
3970 op->pool_op = POOL_OP_DELETE_SNAP;
3971 pool_ops[op->tid] = op;
3972
3973 pool_op_submit(op);
7c673cae
FG
3974}
3975
f67539c2
TL
3976void Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3977 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3978{
3979 unique_lock wl(rwlock);
3980 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3981 << snap << dendl;
f67539c2 3982 auto op = new PoolOp;
31f18b77 3983 op->tid = ++last_tid;
7c673cae 3984 op->pool = pool;
f67539c2 3985 op->onfinish = std::move(onfinish);
7c673cae
FG
3986 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3987 op->snapid = snap;
3988 pool_ops[op->tid] = op;
3989
3990 pool_op_submit(op);
7c673cae
FG
3991}
3992
f67539c2
TL
3993void Objecter::create_pool(std::string_view name,
3994 decltype(PoolOp::onfinish)&& onfinish,
3995 int crush_rule)
7c673cae
FG
3996{
3997 unique_lock wl(rwlock);
3998 ldout(cct, 10) << "create_pool name=" << name << dendl;
3999
f67539c2
TL
4000 if (osdmap->lookup_pg_pool_name(name) >= 0) {
4001 onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{});
4002 return;
4003 }
7c673cae 4004
f67539c2 4005 auto op = new PoolOp;
31f18b77 4006 op->tid = ++last_tid;
7c673cae
FG
4007 op->pool = 0;
4008 op->name = name;
f67539c2 4009 op->onfinish = std::move(onfinish);
7c673cae
FG
4010 op->pool_op = POOL_OP_CREATE;
4011 pool_ops[op->tid] = op;
7c673cae
FG
4012 op->crush_rule = crush_rule;
4013
4014 pool_op_submit(op);
7c673cae
FG
4015}
4016
f67539c2
TL
4017void Objecter::delete_pool(int64_t pool,
4018 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
4019{
4020 unique_lock wl(rwlock);
4021 ldout(cct, 10) << "delete_pool " << pool << dendl;
4022
4023 if (!osdmap->have_pg_pool(pool))
f67539c2
TL
4024 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
4025 else
4026 _do_delete_pool(pool, std::move(onfinish));
7c673cae
FG
4027}
4028
f67539c2
TL
4029void Objecter::delete_pool(std::string_view pool_name,
4030 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
4031{
4032 unique_lock wl(rwlock);
4033 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
4034
4035 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
4036 if (pool < 0)
f67539c2
TL
4037 // This only returns one error: -ENOENT.
4038 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
4039 else
4040 _do_delete_pool(pool, std::move(onfinish));
7c673cae
FG
4041}
4042
f67539c2
TL
4043void Objecter::_do_delete_pool(int64_t pool,
4044 decltype(PoolOp::onfinish)&& onfinish)
4045
7c673cae 4046{
f67539c2 4047 auto op = new PoolOp;
31f18b77 4048 op->tid = ++last_tid;
7c673cae
FG
4049 op->pool = pool;
4050 op->name = "delete";
f67539c2 4051 op->onfinish = std::move(onfinish);
7c673cae
FG
4052 op->pool_op = POOL_OP_DELETE;
4053 pool_ops[op->tid] = op;
4054 pool_op_submit(op);
4055}
4056
7c673cae
FG
4057void Objecter::pool_op_submit(PoolOp *op)
4058{
4059 // rwlock is locked
4060 if (mon_timeout > timespan(0)) {
4061 op->ontimeout = timer.add_event(mon_timeout,
4062 [this, op]() {
4063 pool_op_cancel(op->tid, -ETIMEDOUT); });
4064 }
4065 _pool_op_submit(op);
4066}
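// Editorial note (not upstream): rados_mon_op_timeout drives this path --
// when nonzero, the timer fires pool_op_cancel(tid, -ETIMEDOUT), which defers
// the stored completion with osdcode(-ETIMEDOUT) and reaps the PoolOp, so a
// slow or unreachable mon becomes a timed-out callback rather than a hang.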
4067
4068void Objecter::_pool_op_submit(PoolOp *op)
4069{
4070 // rwlock is locked unique
4071
4072 ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
f67539c2
TL
4073 auto m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4074 op->name, op->pool_op,
4075 last_seen_osdmap_version);
7c673cae
FG
4076 if (op->snapid) m->snapid = op->snapid;
4077 if (op->crush_rule) m->crush_rule = op->crush_rule;
4078 monc->send_mon_message(m);
11fdf7f2 4079 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4080
4081 logger->inc(l_osdc_poolop_send);
4082}
4083
4084/**
4085 * Handle a reply to a PoolOp message. Check that we sent the message
f67539c2 4086 * and give the caller responsibility for the returned cb::list.
7c673cae
FG
4087 * Then either call the finisher or stash the PoolOp, depending on whether we
4088 * have a new enough map.
4089 * Lastly, clean up the message and PoolOp.
4090 */
4091void Objecter::handle_pool_op_reply(MPoolOpReply *m)
4092{
f67539c2
TL
4093 int rc = m->replyCode;
4094 auto ec = rc < 0 ? bs::error_code(-rc, mon_category()) : bs::error_code();
11fdf7f2 4095 FUNCTRACE(cct);
7c673cae 4096 shunique_lock sul(rwlock, acquire_shared);
31f18b77 4097 if (!initialized) {
7c673cae
FG
4098 sul.unlock();
4099 m->put();
4100 return;
4101 }
4102
4103 ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
4104 ceph_tid_t tid = m->get_tid();
f67539c2 4105 auto iter = pool_ops.find(tid);
7c673cae
FG
4106 if (iter != pool_ops.end()) {
4107 PoolOp *op = iter->second;
4108 ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
4109 << ceph_pool_op_name(op->pool_op) << dendl;
f67539c2 4110 cb::list bl{std::move(m->response_data)};
7c673cae
FG
4111 if (m->version > last_seen_osdmap_version)
4112 last_seen_osdmap_version = m->version;
4113 if (osdmap->get_epoch() < m->epoch) {
4114 sul.unlock();
4115 sul.lock();
4116 // recheck op existence since we have let go of rwlock
4117 // (for promotion) above.
4118 iter = pool_ops.find(tid);
4119 if (iter == pool_ops.end())
4120 goto done; // op is gone.
4121 if (osdmap->get_epoch() < m->epoch) {
4122 ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
4123 << " before calling back" << dendl;
f67539c2
TL
4124 _wait_for_new_map(OpCompletion::create(
4125 service.get_executor(),
4126 [o = std::move(op->onfinish),
4127 bl = std::move(bl)](
4128 bs::error_code ec) mutable {
4129 o->defer(std::move(o), ec, bl);
4130 }),
4131 m->epoch,
4132 ec);
7c673cae
FG
4133 } else {
4134 // map epoch changed, probably because a MOSDMap message
4135 // sneaked in. Do caller-specified callback now or else
4136 // we lose it forever.
11fdf7f2 4137 ceph_assert(op->onfinish);
f67539c2 4138 op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
7c673cae
FG
4139 }
4140 } else {
11fdf7f2 4141 ceph_assert(op->onfinish);
f67539c2 4142 op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
7c673cae 4143 }
f67539c2 4144 op->onfinish = nullptr;
7c673cae
FG
4145 if (!sul.owns_lock()) {
4146 sul.unlock();
4147 sul.lock();
4148 }
4149 iter = pool_ops.find(tid);
4150 if (iter != pool_ops.end()) {
4151 _finish_pool_op(op, 0);
4152 }
4153 } else {
4154 ldout(cct, 10) << "unknown request " << tid << dendl;
4155 }
4156
4157done:
4158 // Not strictly necessary, since we'll release it on return.
4159 sul.unlock();
4160
4161 ldout(cct, 10) << "done" << dendl;
4162 m->put();
4163}
4164
4165int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4166{
11fdf7f2 4167 ceph_assert(initialized);
7c673cae
FG
4168
4169 unique_lock wl(rwlock);
4170
f67539c2 4171 auto it = pool_ops.find(tid);
7c673cae
FG
4172 if (it == pool_ops.end()) {
4173 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4174 return -ENOENT;
4175 }
4176
4177 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4178
4179 PoolOp *op = it->second;
4180 if (op->onfinish)
f67539c2 4181 op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{});
7c673cae
FG
4182
4183 _finish_pool_op(op, r);
4184 return 0;
4185}
4186
4187void Objecter::_finish_pool_op(PoolOp *op, int r)
4188{
4189 // rwlock is locked unique
4190 pool_ops.erase(op->tid);
4191 logger->set(l_osdc_poolop_active, pool_ops.size());
4192
4193 if (op->ontimeout && r != -ETIMEDOUT) {
4194 timer.cancel_event(op->ontimeout);
4195 }
4196
4197 delete op;
4198}
4199
4200// pool stats
4201
f67539c2
TL
4202void Objecter::get_pool_stats(
4203 const std::vector<std::string>& pools,
4204 decltype(PoolStatOp::onfinish)&& onfinish)
7c673cae
FG
4205{
4206 ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4207
f67539c2 4208 auto op = new PoolStatOp;
31f18b77 4209 op->tid = ++last_tid;
7c673cae 4210 op->pools = pools;
f67539c2 4211 op->onfinish = std::move(onfinish);
7c673cae
FG
4212 if (mon_timeout > timespan(0)) {
4213 op->ontimeout = timer.add_event(mon_timeout,
4214 [this, op]() {
4215 pool_stat_op_cancel(op->tid,
4216 -ETIMEDOUT); });
4217 } else {
4218 op->ontimeout = 0;
4219 }
4220
4221 unique_lock wl(rwlock);
4222
4223 poolstat_ops[op->tid] = op;
4224
4225 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4226
4227 _poolstat_submit(op);
4228}
4229
4230void Objecter::_poolstat_submit(PoolStatOp *op)
4231{
4232 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4233 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4234 op->pools,
4235 last_seen_pgmap_version));
11fdf7f2 4236 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4237
4238 logger->inc(l_osdc_poolstat_send);
4239}
4240
4241void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4242{
4243 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4244 ceph_tid_t tid = m->get_tid();
4245
4246 unique_lock wl(rwlock);
31f18b77 4247 if (!initialized) {
7c673cae
FG
4248 m->put();
4249 return;
4250 }
4251
f67539c2 4252 auto iter = poolstat_ops.find(tid);
7c673cae
FG
4253 if (iter != poolstat_ops.end()) {
4254 PoolStatOp *op = poolstat_ops[tid];
4255 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
7c673cae
FG
4256 if (m->version > last_seen_pgmap_version) {
4257 last_seen_pgmap_version = m->version;
4258 }
f67539c2
TL
4259 op->onfinish->defer(std::move(op->onfinish), bs::error_code{},
4260 std::move(m->pool_stats), m->per_pool);
7c673cae
FG
4261 _finish_pool_stat_op(op, 0);
4262 } else {
4263 ldout(cct, 10) << "unknown request " << tid << dendl;
4264 }
4265 ldout(cct, 10) << "done" << dendl;
4266 m->put();
4267}
4268
4269int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4270{
11fdf7f2 4271 ceph_assert(initialized);
7c673cae
FG
4272
4273 unique_lock wl(rwlock);
4274
f67539c2 4275 auto it = poolstat_ops.find(tid);
7c673cae
FG
4276 if (it == poolstat_ops.end()) {
4277 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4278 return -ENOENT;
4279 }
4280
4281 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4282
f67539c2 4283 auto op = it->second;
7c673cae 4284 if (op->onfinish)
f67539c2
TL
4285 op->onfinish->defer(std::move(op->onfinish), osdcode(r),
4286 bc::flat_map<std::string, pool_stat_t>{}, false);
7c673cae
FG
4287 _finish_pool_stat_op(op, r);
4288 return 0;
4289}
4290
4291void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4292{
4293 // rwlock is locked unique
4294
4295 poolstat_ops.erase(op->tid);
4296 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4297
4298 if (op->ontimeout && r != -ETIMEDOUT)
4299 timer.cancel_event(op->ontimeout);
4300
4301 delete op;
4302}
4303
20effc67 4304void Objecter::get_fs_stats(std::optional<int64_t> poolid,
f67539c2 4305 decltype(StatfsOp::onfinish)&& onfinish)
7c673cae
FG
4306{
4307 ldout(cct, 10) << "get_fs_stats" << dendl;
4308 unique_lock l(rwlock);
4309
f67539c2 4310 auto op = new StatfsOp;
31f18b77 4311 op->tid = ++last_tid;
f67539c2
TL
4312 op->data_pool = poolid;
4313 op->onfinish = std::move(onfinish);
7c673cae
FG
4314 if (mon_timeout > timespan(0)) {
4315 op->ontimeout = timer.add_event(mon_timeout,
4316 [this, op]() {
4317 statfs_op_cancel(op->tid,
4318 -ETIMEDOUT); });
4319 } else {
4320 op->ontimeout = 0;
4321 }
4322 statfs_ops[op->tid] = op;
4323
4324 logger->set(l_osdc_statfs_active, statfs_ops.size());
4325
4326 _fs_stats_submit(op);
4327}
4328
4329void Objecter::_fs_stats_submit(StatfsOp *op)
4330{
4331 // rwlock is locked unique
4332
4333 ldout(cct, 10) << "fs_stats_submit " << op->tid << dendl;
4334 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
d2e6a577 4335 op->data_pool,
7c673cae 4336 last_seen_pgmap_version));
11fdf7f2 4337 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4338
4339 logger->inc(l_osdc_statfs_send);
4340}
4341
4342void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4343{
4344 unique_lock wl(rwlock);
31f18b77 4345 if (!initialized) {
7c673cae
FG
4346 m->put();
4347 return;
4348 }
4349
4350 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4351 ceph_tid_t tid = m->get_tid();
4352
4353 if (statfs_ops.count(tid)) {
4354 StatfsOp *op = statfs_ops[tid];
4355 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
7c673cae
FG
4356 if (m->h.version > last_seen_pgmap_version)
4357 last_seen_pgmap_version = m->h.version;
f67539c2 4358 op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st);
7c673cae
FG
4359 _finish_statfs_op(op, 0);
4360 } else {
4361 ldout(cct, 10) << "unknown request " << tid << dendl;
4362 }
4363 m->put();
4364 ldout(cct, 10) << "done" << dendl;
4365}
4366
4367int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4368{
11fdf7f2 4369 ceph_assert(initialized);
7c673cae
FG
4370
4371 unique_lock wl(rwlock);
4372
f67539c2 4373 auto it = statfs_ops.find(tid);
7c673cae
FG
4374 if (it == statfs_ops.end()) {
4375 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4376 return -ENOENT;
4377 }
4378
4379 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4380
f67539c2 4381 auto op = it->second;
7c673cae 4382 if (op->onfinish)
f67539c2 4383 op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{});
7c673cae
FG
4384 _finish_statfs_op(op, r);
4385 return 0;
4386}
4387
4388void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4389{
4390 // rwlock is locked unique
4391
4392 statfs_ops.erase(op->tid);
4393 logger->set(l_osdc_statfs_active, statfs_ops.size());
4394
4395 if (op->ontimeout && r != -ETIMEDOUT)
4396 timer.cancel_event(op->ontimeout);
4397
4398 delete op;
4399}
4400
4401// scatter/gather
4402
4403void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
f67539c2
TL
4404 vector<cb::list>& resultbl,
4405 cb::list *bl, Context *onfinish)
7c673cae
FG
4406{
4407 // all done
4408 ldout(cct, 15) << "_sg_read_finish" << dendl;
4409
4410 if (extents.size() > 1) {
4411 Striper::StripedReadResult r;
f67539c2
TL
4412 auto bit = resultbl.begin();
4413 for (auto eit = extents.begin();
7c673cae
FG
4414 eit != extents.end();
4415 ++eit, ++bit) {
4416 r.add_partial_result(cct, *bit, eit->buffer_extents);
4417 }
4418 bl->clear();
4419 r.assemble_result(cct, *bl, false);
4420 } else {
4421 ldout(cct, 15) << " only one frag" << dendl;
f67539c2 4422 *bl = std::move(resultbl[0]);
7c673cae
FG
4423 }
4424
4425 // done
4426 uint64_t bytes_read = bl->length();
4427 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4428
4429 if (onfinish) {
4430 onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result());
4431 }
4432}
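// Editorial note (not upstream): in the multi-extent case the Striper maps
// each fragment's bytes back through its buffer_extents into logical order,
// so per-object replies arriving in any order still assemble into one
// contiguous bl; the single-extent case skips assembly and moves the lone
// fragment directly.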
4433
4434
4435void Objecter::ms_handle_connect(Connection *con)
4436{
4437 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
31f18b77 4438 if (!initialized)
7c673cae
FG
4439 return;
4440
4441 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4442 resend_mon_ops();
4443}
4444
4445bool Objecter::ms_handle_reset(Connection *con)
4446{
31f18b77 4447 if (!initialized)
7c673cae
FG
4448 return false;
4449 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
91327a77 4450 unique_lock wl(rwlock);
11fdf7f2
TL
4451
4452 auto priv = con->get_priv();
4453 auto session = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4454 if (session) {
4455 ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
4456 << " osd." << session->osd << dendl;
a8e16298
TL
4457 // the session may already have been closed if a newly handled osdmap
4458 // marked the osd down
4459 if (!(initialized && osdmap->is_up(session->osd))) {
4460 ldout(cct, 1) << "ms_handle_reset aborted, initialized=" << initialized << dendl;
7c673cae
FG
4461 wl.unlock();
4462 return false;
4463 }
4464 map<uint64_t, LingerOp *> lresend;
f67539c2 4465 unique_lock sl(session->lock);
7c673cae
FG
4466 _reopen_session(session);
4467 _kick_requests(session, lresend);
4468 sl.unlock();
4469 _linger_ops_resend(lresend, wl);
4470 wl.unlock();
4471 maybe_request_map();
7c673cae
FG
4472 }
4473 return true;
4474 }
4475 return false;
4476}
4477
4478void Objecter::ms_handle_remote_reset(Connection *con)
4479{
4480 /*
4481 * treat these the same.
4482 */
4483 ms_handle_reset(con);
4484}
4485
4486bool Objecter::ms_handle_refused(Connection *con)
4487{
4488 // just log for now
4489 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4490 int osd = osdmap->identify_osd(con->get_peer_addr());
4491 if (osd >= 0) {
4492 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4493 }
4494 }
4495 return false;
4496}
4497
7c673cae
FG
4498void Objecter::op_target_t::dump(Formatter *f) const
4499{
4500 f->dump_stream("pg") << pgid;
4501 f->dump_int("osd", osd);
4502 f->dump_stream("object_id") << base_oid;
4503 f->dump_stream("object_locator") << base_oloc;
4504 f->dump_stream("target_object_id") << target_oid;
4505 f->dump_stream("target_object_locator") << target_oloc;
4506 f->dump_int("paused", (int)paused);
4507 f->dump_int("used_replica", (int)used_replica);
4508 f->dump_int("precalc_pgid", (int)precalc_pgid);
4509}
4510
4511void Objecter::_dump_active(OSDSession *s)
4512{
f67539c2 4513 for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
7c673cae
FG
4514 Op *op = p->second;
4515 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4516 << "\tosd." << (op->session ? op->session->osd : -1)
4517 << "\t" << op->target.base_oid
4518 << "\t" << op->ops << dendl;
4519 }
4520}
4521
4522void Objecter::_dump_active()
4523{
31f18b77 4524 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
7c673cae 4525 << dendl;
f67539c2 4526 for (auto siter = osd_sessions.begin();
7c673cae 4527 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4528 auto s = siter->second;
4529 shared_lock sl(s->lock);
7c673cae
FG
4530 _dump_active(s);
4531 sl.unlock();
4532 }
4533 _dump_active(homeless_session);
4534}
4535
4536void Objecter::dump_active()
4537{
4538 shared_lock rl(rwlock);
4539 _dump_active();
4540 rl.unlock();
4541}
4542
4543void Objecter::dump_requests(Formatter *fmt)
4544{
4545 // Read-lock on Objecter held here
4546 fmt->open_object_section("requests");
4547 dump_ops(fmt);
4548 dump_linger_ops(fmt);
4549 dump_pool_ops(fmt);
4550 dump_pool_stat_ops(fmt);
4551 dump_statfs_ops(fmt);
4552 dump_command_ops(fmt);
4553 fmt->close_section(); // requests object
4554}
4555
4556void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4557{
f67539c2 4558 for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
7c673cae 4559 Op *op = p->second;
f67539c2 4560 auto age = std::chrono::duration<double>(ceph::coarse_mono_clock::now() - op->stamp);
7c673cae
FG
4561 fmt->open_object_section("op");
4562 fmt->dump_unsigned("tid", op->tid);
4563 op->target.dump(fmt);
4564 fmt->dump_stream("last_sent") << op->stamp;
92f5a8d4 4565 fmt->dump_float("age", age.count());
7c673cae
FG
4566 fmt->dump_int("attempts", op->attempts);
4567 fmt->dump_stream("snapid") << op->snapid;
4568 fmt->dump_stream("snap_context") << op->snapc;
4569 fmt->dump_stream("mtime") << op->mtime;
4570
4571 fmt->open_array_section("osd_ops");
f67539c2 4572 for (auto it = op->ops.begin(); it != op->ops.end(); ++it) {
7c673cae
FG
4573 fmt->dump_stream("osd_op") << *it;
4574 }
4575 fmt->close_section(); // osd_ops array
4576
4577 fmt->close_section(); // op object
4578 }
4579}
4580
4581void Objecter::dump_ops(Formatter *fmt)
4582{
4583 // Read-lock on Objecter held
4584 fmt->open_array_section("ops");
f67539c2 4585 for (auto siter = osd_sessions.begin();
7c673cae
FG
4586 siter != osd_sessions.end(); ++siter) {
4587 OSDSession *s = siter->second;
f67539c2 4588 shared_lock sl(s->lock);
7c673cae
FG
4589 _dump_ops(s, fmt);
4590 sl.unlock();
4591 }
4592 _dump_ops(homeless_session, fmt);
4593 fmt->close_section(); // ops array
4594}
4595
4596void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4597{
f67539c2
TL
4598 for (auto p = s->linger_ops.begin(); p != s->linger_ops.end(); ++p) {
4599 auto op = p->second;
7c673cae
FG
4600 fmt->open_object_section("linger_op");
4601 fmt->dump_unsigned("linger_id", op->linger_id);
4602 op->target.dump(fmt);
4603 fmt->dump_stream("snapid") << op->snap;
4604 fmt->dump_stream("registered") << op->registered;
4605 fmt->close_section(); // linger_op object
4606 }
4607}
4608
4609void Objecter::dump_linger_ops(Formatter *fmt)
4610{
4611 // We have a read-lock on the objecter
4612 fmt->open_array_section("linger_ops");
f67539c2 4613 for (auto siter = osd_sessions.begin();
7c673cae 4614 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4615 auto s = siter->second;
4616 shared_lock sl(s->lock);
7c673cae
FG
4617 _dump_linger_ops(s, fmt);
4618 sl.unlock();
4619 }
4620 _dump_linger_ops(homeless_session, fmt);
4621 fmt->close_section(); // linger_ops array
4622}
4623
4624void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4625{
f67539c2
TL
4626 for (auto p = s->command_ops.begin(); p != s->command_ops.end(); ++p) {
4627 auto op = p->second;
7c673cae
FG
4628 fmt->open_object_section("command_op");
4629 fmt->dump_unsigned("command_id", op->tid);
4630 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4631 fmt->open_array_section("command");
f67539c2 4632 for (auto q = op->cmd.begin(); q != op->cmd.end(); ++q)
7c673cae
FG
4633 fmt->dump_string("word", *q);
4634 fmt->close_section();
4635 if (op->target_osd >= 0)
4636 fmt->dump_int("target_osd", op->target_osd);
4637 else
4638 fmt->dump_stream("target_pg") << op->target_pg;
4639 fmt->close_section(); // command_op object
4640 }
4641}
4642
4643void Objecter::dump_command_ops(Formatter *fmt)
4644{
4645 // We have a read-lock on the Objecter here
4646 fmt->open_array_section("command_ops");
f67539c2 4647 for (auto siter = osd_sessions.begin();
7c673cae 4648 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4649 auto s = siter->second;
4650 shared_lock sl(s->lock);
7c673cae
FG
4651 _dump_command_ops(s, fmt);
4652 sl.unlock();
4653 }
4654 _dump_command_ops(homeless_session, fmt);
4655 fmt->close_section(); // command_ops array
4656}
4657
4658void Objecter::dump_pool_ops(Formatter *fmt) const
4659{
4660 fmt->open_array_section("pool_ops");
f67539c2
TL
4661 for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
4662 auto op = p->second;
7c673cae
FG
4663 fmt->open_object_section("pool_op");
4664 fmt->dump_unsigned("tid", op->tid);
4665 fmt->dump_int("pool", op->pool);
4666 fmt->dump_string("name", op->name);
4667 fmt->dump_int("operation_type", op->pool_op);
7c673cae
FG
4668 fmt->dump_unsigned("crush_rule", op->crush_rule);
4669 fmt->dump_stream("snapid") << op->snapid;
4670 fmt->dump_stream("last_sent") << op->last_submit;
4671 fmt->close_section(); // pool_op object
4672 }
4673 fmt->close_section(); // pool_ops array
4674}
4675
4676void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4677{
4678 fmt->open_array_section("pool_stat_ops");
f67539c2 4679 for (auto p = poolstat_ops.begin();
7c673cae
FG
4680 p != poolstat_ops.end();
4681 ++p) {
4682 PoolStatOp *op = p->second;
4683 fmt->open_object_section("pool_stat_op");
4684 fmt->dump_unsigned("tid", op->tid);
4685 fmt->dump_stream("last_sent") << op->last_submit;
4686
4687 fmt->open_array_section("pools");
f67539c2
TL
4688 for (const auto& it : op->pools) {
4689 fmt->dump_string("pool", it);
7c673cae
FG
4690 }
4691 fmt->close_section(); // pools array
4692
4693 fmt->close_section(); // pool_stat_op object
4694 }
4695 fmt->close_section(); // pool_stat_ops array
4696}
4697
4698void Objecter::dump_statfs_ops(Formatter *fmt) const
4699{
4700 fmt->open_array_section("statfs_ops");
f67539c2
TL
4701 for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
4702 auto op = p->second;
7c673cae
FG
4703 fmt->open_object_section("statfs_op");
4704 fmt->dump_unsigned("tid", op->tid);
4705 fmt->dump_stream("last_sent") << op->last_submit;
4706 fmt->close_section(); // statfs_op object
4707 }
4708 fmt->close_section(); // statfs_ops array
4709}
4710
4711Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4712 m_objecter(objecter)
4713{
4714}
4715
9f95a23c
TL
4716int Objecter::RequestStateHook::call(std::string_view command,
4717 const cmdmap_t& cmdmap,
39ae355f 4718 const bufferlist&,
9f95a23c
TL
4719 Formatter *f,
4720 std::ostream& ss,
f67539c2 4721 cb::list& out)
7c673cae 4722{
7c673cae
FG
4723 shared_lock rl(m_objecter->rwlock);
4724 m_objecter->dump_requests(f);
9f95a23c 4725 return 0;
7c673cae
FG
4726}
4727
f67539c2 4728void Objecter::blocklist_self(bool set)
7c673cae 4729{
f67539c2 4730 ldout(cct, 10) << "blocklist_self " << (set ? "add" : "rm") << dendl;
7c673cae
FG
4731
4732 vector<string> cmd;
f67539c2 4733 cmd.push_back("{\"prefix\":\"osd blocklist\", ");
7c673cae 4734 if (set)
f67539c2 4735 cmd.push_back("\"blocklistop\":\"add\",");
7c673cae 4736 else
f67539c2 4737 cmd.push_back("\"blocklistop\":\"rm\",");
7c673cae 4738 stringstream ss;
f67539c2 4739 // this is somewhat imprecise in that we are blocklisting our first addr only
11fdf7f2 4740 ss << messenger->get_myaddrs().front().get_legacy_str();
7c673cae
FG
4741 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4742
f67539c2 4743 auto m = new MMonCommand(monc->get_fsid());
7c673cae
FG
4744 m->cmd = cmd;
4745
f67539c2
TL
4746 // NOTE: no fallback to legacy blacklist command implemented here
4747 // since this is only used for test code.
4748
7c673cae
FG
4749 monc->send_mon_message(m);
4750}
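// For illustration (editorial; the address is hypothetical): with a first
// messenger address of 10.0.0.1:0/1234, the fragments above concatenate into
// the mon command
//
//   {"prefix":"osd blocklist", "blocklistop":"add","addr":"10.0.0.1:0/1234"}
//
// (or "blocklistop":"rm" when set is false).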
4751
4752// commands
4753
4754void Objecter::handle_command_reply(MCommandReply *m)
4755{
4756 unique_lock wl(rwlock);
31f18b77 4757 if (!initialized) {
7c673cae
FG
4758 m->put();
4759 return;
4760 }
4761
4762 ConnectionRef con = m->get_connection();
11fdf7f2
TL
4763 auto priv = con->get_priv();
4764 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4765 if (!s || s->con != con) {
4766 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
4767 m->put();
7c673cae
FG
4768 return;
4769 }
4770
f67539c2
TL
4771 shared_lock sl(s->lock);
4772 auto p = s->command_ops.find(m->get_tid());
7c673cae
FG
4773 if (p == s->command_ops.end()) {
4774 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4775 << " not found" << dendl;
4776 m->put();
4777 sl.unlock();
7c673cae
FG
4778 return;
4779 }
4780
4781 CommandOp *c = p->second;
4782 if (!c->session ||
4783 m->get_connection() != c->session->con) {
4784 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4785 << " got reply from wrong connection "
4786 << m->get_connection() << " " << m->get_source_inst()
4787 << dendl;
4788 m->put();
4789 sl.unlock();
7c673cae
FG
4790 return;
4791 }
f67539c2 4792
9f95a23c
TL
4793 if (m->r == -EAGAIN) {
4794 ldout(cct,10) << __func__ << " tid " << m->get_tid()
4795 << " got EAGAIN, requesting map and resending" << dendl;
4796 // NOTE: This might resend twice... once now, and once again when
4797 // we get an updated osdmap and the PG is found to have moved.
4798 _maybe_request_map();
4799 _send_command(c);
4800 m->put();
4801 sl.unlock();
4802 return;
4803 }
4804
7c673cae
FG
4805 sl.unlock();
4806
f67539c2
TL
4807 unique_lock sul(s->lock);
4808 _finish_command(c, m->r < 0 ? bs::error_code(-m->r, osd_category()) :
4809 bs::error_code(), std::move(m->rs),
4810 std::move(m->get_data()));
28e407b8
AA
4811 sul.unlock();
4812
7c673cae 4813 m->put();
7c673cae
FG
4814}
4815
522d829b
TL
4816Objecter::LingerOp::LingerOp(Objecter *o, uint64_t linger_id)
4817 : objecter(o),
4818 linger_id(linger_id),
4819 watch_lock(ceph::make_shared_mutex(
4820 fmt::format("LingerOp::watch_lock #{}", linger_id)))
4821{}
4822
7c673cae
FG
4823void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
4824{
4825 shunique_lock sul(rwlock, ceph::acquire_unique);
4826
31f18b77 4827 ceph_tid_t tid = ++last_tid;
7c673cae
FG
4828 ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
4829 c->tid = tid;
4830
4831 {
f67539c2 4832 unique_lock hs_wl(homeless_session->lock);
7c673cae
FG
4833 _session_command_op_assign(homeless_session, c);
4834 }
4835
4836 _calc_command_target(c, sul);
4837 _assign_command_session(c, sul);
4838 if (osd_timeout > timespan(0)) {
4839 c->ontimeout = timer.add_event(osd_timeout,
4840 [this, c, tid]() {
f67539c2
TL
4841 command_op_cancel(
4842 c->session, tid,
4843 osdc_errc::timed_out); });
7c673cae
FG
4844 }
4845
4846 if (!c->session->is_homeless()) {
4847 _send_command(c);
4848 } else {
4849 _maybe_request_map();
4850 }
4851 if (c->map_check_error)
4852 _send_command_map_check(c);
f67539c2
TL
4853 if (ptid)
4854 *ptid = tid;
7c673cae
FG
4855
4856 logger->inc(l_osdc_command_active);
4857}
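// Editorial summary (not upstream): a new CommandOp is parked on the homeless
// session first, then retargeted; only when targeting lands it on a live
// session is it sent immediately, otherwise a map is requested and the op is
// resent once a newer osdmap identifies a reachable target.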
4858
f67539c2
TL
4859int Objecter::_calc_command_target(CommandOp *c,
4860 shunique_lock<ceph::shared_mutex>& sul)
7c673cae 4861{
11fdf7f2 4862 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4863
4864 c->map_check_error = 0;
4865
4866 // ignore overlays, just like we do with pg ops
4867 c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4868
4869 if (c->target_osd >= 0) {
4870 if (!osdmap->exists(c->target_osd)) {
4871 c->map_check_error = -ENOENT;
4872 c->map_check_error_str = "osd dne";
4873 c->target.osd = -1;
4874 return RECALC_OP_TARGET_OSD_DNE;
4875 }
4876 if (osdmap->is_down(c->target_osd)) {
4877 c->map_check_error = -ENXIO;
4878 c->map_check_error_str = "osd down";
4879 c->target.osd = -1;
4880 return RECALC_OP_TARGET_OSD_DOWN;
4881 }
4882 c->target.osd = c->target_osd;
4883 } else {
4884 int ret = _calc_target(&(c->target), nullptr, true);
4885 if (ret == RECALC_OP_TARGET_POOL_DNE) {
4886 c->map_check_error = -ENOENT;
4887 c->map_check_error_str = "pool dne";
4888 c->target.osd = -1;
4889 return ret;
4890 } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4891 c->map_check_error = -ENXIO;
4892 c->map_check_error_str = "osd down";
4893 c->target.osd = -1;
4894 return ret;
4895 }
4896 }
4897
4898 OSDSession *s;
4899 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4900 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4901
4902 if (c->session != s) {
4903 put_session(s);
4904 return RECALC_OP_TARGET_NEED_RESEND;
4905 }
4906
4907 put_session(s);
4908
4909 ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, "
4910 << c->session << dendl;
4911
4912 return RECALC_OP_TARGET_NO_ACTION;
4913}
4914
4915void Objecter::_assign_command_session(CommandOp *c,
f67539c2 4916 shunique_lock<ceph::shared_mutex>& sul)
7c673cae 4917{
11fdf7f2 4918 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4919
4920 OSDSession *s;
4921 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4922 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4923
4924 if (c->session != s) {
4925 if (c->session) {
4926 OSDSession *cs = c->session;
f67539c2 4927 unique_lock csl(cs->lock);
7c673cae
FG
4928 _session_command_op_remove(c->session, c);
4929 csl.unlock();
4930 }
f67539c2 4931 unique_lock sl(s->lock);
7c673cae
FG
4932 _session_command_op_assign(s, c);
4933 }
4934
4935 put_session(s);
4936}
4937
4938void Objecter::_send_command(CommandOp *c)
4939{
4940 ldout(cct, 10) << "_send_command " << c->tid << dendl;
11fdf7f2
TL
4941 ceph_assert(c->session);
4942 ceph_assert(c->session->con);
f67539c2 4943 auto m = new MCommand(monc->monmap.fsid);
7c673cae
FG
4944 m->cmd = c->cmd;
4945 m->set_data(c->inbl);
4946 m->set_tid(c->tid);
4947 c->session->con->send_message(m);
4948 logger->inc(l_osdc_command_send);
4949}
4950
f67539c2
TL
4951int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid,
4952 bs::error_code ec)
7c673cae 4953{
11fdf7f2 4954 ceph_assert(initialized);
7c673cae
FG
4955
4956 unique_lock wl(rwlock);
4957
f67539c2 4958 auto it = s->command_ops.find(tid);
7c673cae
FG
4959 if (it == s->command_ops.end()) {
4960 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4961 return -ENOENT;
4962 }
4963
4964 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4965
4966 CommandOp *op = it->second;
4967 _command_cancel_map_check(op);
f67539c2
TL
4968 unique_lock sl(op->session->lock);
4969 _finish_command(op, ec, {}, {});
28e407b8 4970 sl.unlock();
7c673cae
FG
4971 return 0;
4972}
4973
f67539c2
TL
4974void Objecter::_finish_command(CommandOp *c, bs::error_code ec,
4975 string&& rs, cb::list&& bl)
7c673cae
FG
4976{
4977 // rwlock is locked unique
28e407b8 4978 // session lock is locked
7c673cae 4979
f67539c2 4980 ldout(cct, 10) << "_finish_command " << c->tid << " = " << ec << " "
7c673cae 4981 << rs << dendl;
f67539c2 4982
7c673cae 4983 if (c->onfinish)
f67539c2 4984 c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl));
7c673cae 4985
f67539c2 4986 if (c->ontimeout && ec != bs::errc::timed_out)
7c673cae
FG
4987 timer.cancel_event(c->ontimeout);
4988
7c673cae 4989 _session_command_op_remove(c->session, c);
7c673cae
FG
4990
4991 c->put();
4992
4993 logger->dec(l_osdc_command_active);
4994}
4995
4996Objecter::OSDSession::~OSDSession()
4997{
4998 // Caller is responsible for re-assigning or
4999 // destroying any ops that were assigned to us
11fdf7f2
TL
5000 ceph_assert(ops.empty());
5001 ceph_assert(linger_ops.empty());
5002 ceph_assert(command_ops.empty());
7c673cae
FG
5003}
5004
f67539c2
TL
5005Objecter::Objecter(CephContext *cct,
5006 Messenger *m, MonClient *mc,
5007 boost::asio::io_context& service) :
5008 Dispatcher(cct), messenger(m), monc(mc), service(service)
f91f0fd5
TL
5009{
5010 mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
5011 osd_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
5012}
9f95a23c 5013
7c673cae
FG
5014Objecter::~Objecter()
5015{
11fdf7f2
TL
5016 ceph_assert(homeless_session->get_nref() == 1);
5017 ceph_assert(num_homeless_ops == 0);
7c673cae
FG
5018 homeless_session->put();
5019
11fdf7f2
TL
5020 ceph_assert(osd_sessions.empty());
5021 ceph_assert(poolstat_ops.empty());
5022 ceph_assert(statfs_ops.empty());
5023 ceph_assert(pool_ops.empty());
5024 ceph_assert(waiting_for_map.empty());
5025 ceph_assert(linger_ops.empty());
5026 ceph_assert(check_latest_map_lingers.empty());
5027 ceph_assert(check_latest_map_ops.empty());
5028 ceph_assert(check_latest_map_commands.empty());
7c673cae 5029
11fdf7f2
TL
5030 ceph_assert(!m_request_state_hook);
5031 ceph_assert(!logger);
7c673cae
FG
5032}
5033
5034/**
5035 * Wait until this OSD map epoch is received before
5036 * sending any more operations to OSDs. Use this
5037 * when it is known that the client can't trust
5038 * anything from before this epoch (e.g. due to
f67539c2 5039 * client blocklist at this epoch).
7c673cae
FG
5040 */
5041void Objecter::set_epoch_barrier(epoch_t epoch)
5042{
5043 unique_lock wl(rwlock);
5044
5045 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
5046 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
5047 << dendl;
5048 if (epoch > epoch_barrier) {
5049 epoch_barrier = epoch;
5050 _maybe_request_map();
5051 }
5052}
5053
5054
5055
5056hobject_t Objecter::enumerate_objects_begin()
5057{
5058 return hobject_t();
5059}
5060
5061hobject_t Objecter::enumerate_objects_end()
5062{
5063 return hobject_t::get_max();
5064}
5065
f67539c2
TL
5066template<typename T>
5067struct EnumerationContext {
5068 Objecter* objecter;
7c673cae 5069 const hobject_t end;
f67539c2
TL
5070 const cb::list filter;
5071 uint32_t max;
5072 const object_locator_t oloc;
5073 std::vector<T> ls;
5074private:
5075 fu2::unique_function<void(bs::error_code,
5076 std::vector<T>,
5077 hobject_t) &&> on_finish;
5078public:
5079 epoch_t epoch = 0;
5080 int budget = -1;
5081
5082 EnumerationContext(Objecter* objecter,
5083 hobject_t end, cb::list filter,
5084 uint32_t max, object_locator_t oloc,
5085 decltype(on_finish) on_finish)
5086 : objecter(objecter), end(std::move(end)), filter(std::move(filter)),
5087 max(max), oloc(std::move(oloc)), on_finish(std::move(on_finish)) {}
5088
5089 void operator()(bs::error_code ec,
5090 std::vector<T> v,
5091 hobject_t h) && {
5092 if (budget >= 0) {
5093 objecter->put_op_budget_bytes(budget);
5094 budget = -1;
5095 }
5096
5097 std::move(on_finish)(ec, std::move(v), std::move(h));
5098 }
5099};
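// Editorial note (not upstream): the context carries the op budget across
// hops; operator() returns the borrowed bytes via put_op_budget_bytes()
// before deferring on_finish, so a finished or failed hop cannot leak
// throttle budget.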
7c673cae 5100
f67539c2
TL
5101template<typename T>
5102struct CB_EnumerateReply {
5103 cb::list bl;
7c673cae 5104
f67539c2
TL
5105 Objecter* objecter;
5106 std::unique_ptr<EnumerationContext<T>> ctx;
7c673cae 5107
f67539c2
TL
5108 CB_EnumerateReply(Objecter* objecter,
5109 std::unique_ptr<EnumerationContext<T>>&& ctx) :
5110 objecter(objecter), ctx(std::move(ctx)) {}
5111
5112 void operator()(bs::error_code ec) {
5113 objecter->_enumerate_reply(std::move(bl), ec, std::move(ctx));
7c673cae
FG
5114 }
5115};
5116
f67539c2 5117template<typename T>
7c673cae 5118void Objecter::enumerate_objects(
f67539c2
TL
5119 int64_t pool_id,
5120 std::string_view ns,
5121 hobject_t start,
5122 hobject_t end,
5123 const uint32_t max,
5124 const cb::list& filter_bl,
5125 fu2::unique_function<void(bs::error_code,
5126 std::vector<T>,
5127 hobject_t) &&> on_finish) {
7c673cae
FG
5128 if (!end.is_max() && start > end) {
5129 lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
f67539c2 5130 std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
7c673cae
FG
5131 return;
5132 }
5133
5134 if (max < 1) {
5135 lderr(cct) << __func__ << ": result size may not be zero" << dendl;
f67539c2 5136 std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
7c673cae
FG
5137 return;
5138 }
5139
5140 if (start.is_max()) {
f67539c2 5141 std::move(on_finish)({}, {}, {});
7c673cae
FG
5142 return;
5143 }
5144
5145 shared_lock rl(rwlock);
11fdf7f2 5146 ceph_assert(osdmap->get_epoch());
7c673cae
FG
5147 if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
5148 rl.unlock();
5149 lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
f67539c2 5150 std::move(on_finish)(osdc_errc::not_supported, {}, {});
7c673cae
FG
5151 return;
5152 }
f67539c2 5153 const pg_pool_t* p = osdmap->get_pg_pool(pool_id);
7c673cae
FG
5154 if (!p) {
5155 lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
5156 << osdmap->get_epoch() << dendl;
5157 rl.unlock();
f67539c2 5158 std::move(on_finish)(osdc_errc::pool_dne, {}, {});
7c673cae
FG
5159 return;
5160 } else {
5161 rl.unlock();
5162 }
5163
f67539c2
TL
5164 _issue_enumerate(start,
5165 std::make_unique<EnumerationContext<T>>(
5166 this, std::move(end), filter_bl,
5167 max, object_locator_t{pool_id, ns},
5168 std::move(on_finish)));
5169}
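// A hedged usage sketch (editorial; the pool id, namespace and lambda are
// illustrative):
//
//   objecter->enumerate_objects<librados::ListObjectImpl>(
//     pool_id, "" /* ns */,
//     objecter->enumerate_objects_begin(),
//     objecter->enumerate_objects_end(),
//     128 /* max */, {} /* no filter */,
//     [](bs::error_code ec, std::vector<librados::ListObjectImpl> ls,
//        hobject_t next) {
//       // 'next' reaches enumerate_objects_end() once the pool is exhausted
//     });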
5170
5171template
5172void Objecter::enumerate_objects<librados::ListObjectImpl>(
5173 int64_t pool_id,
5174 std::string_view ns,
5175 hobject_t start,
5176 hobject_t end,
5177 const uint32_t max,
5178 const cb::list& filter_bl,
5179 fu2::unique_function<void(bs::error_code,
5180 std::vector<librados::ListObjectImpl>,
5181 hobject_t) &&> on_finish);
5182
5183template
5184void Objecter::enumerate_objects<neorados::Entry>(
5185 int64_t pool_id,
5186 std::string_view ns,
5187 hobject_t start,
5188 hobject_t end,
5189 const uint32_t max,
5190 const cb::list& filter_bl,
5191 fu2::unique_function<void(bs::error_code,
5192 std::vector<neorados::Entry>,
5193 hobject_t) &&> on_finish);
5194
5195
5196
5197template<typename T>
5198void Objecter::_issue_enumerate(hobject_t start,
5199 std::unique_ptr<EnumerationContext<T>> ctx) {
7c673cae 5200 ObjectOperation op;
f67539c2
TL
5201 auto c = ctx.get();
5202 op.pg_nls(c->max, c->filter, start, osdmap->get_epoch());
5203 auto on_ack = std::make_unique<CB_EnumerateReply<T>>(this, std::move(ctx));
5204 // I hate having to do this. Try to find a cleaner way
5205 // later.
5206 auto epoch = &c->epoch;
5207 auto budget = &c->budget;
5208 auto pbl = &on_ack->bl;
7c673cae
FG
5209
5210 // Issue. See you later in _enumerate_reply
f67539c2
TL
5211 pg_read(start.get_hash(),
5212 c->oloc, op, pbl, 0,
5213 Op::OpComp::create(service.get_executor(),
5214 [c = std::move(on_ack)]
5215 (bs::error_code ec) mutable {
5216 (*c)(ec);
5217 }), epoch, budget);
5218}
5219
5220template
5221void Objecter::_issue_enumerate<librados::ListObjectImpl>(
5222 hobject_t start,
5223 std::unique_ptr<EnumerationContext<librados::ListObjectImpl>> ctx);
5224template
5225void Objecter::_issue_enumerate<neorados::Entry>(
5226 hobject_t start, std::unique_ptr<EnumerationContext<neorados::Entry>> ctx);
5227
5228template<typename T>
7c673cae 5229void Objecter::_enumerate_reply(
f67539c2
TL
5230 cb::list&& bl,
5231 bs::error_code ec,
5232 std::unique_ptr<EnumerationContext<T>>&& ctx)
5233{
5234 if (ec) {
5235 std::move(*ctx)(ec, {}, {});
7c673cae
FG
5236 return;
5237 }
5238
7c673cae 5239 // Decode the results
11fdf7f2 5240 auto iter = bl.cbegin();
f67539c2
TL
5241 pg_nls_response_template<T> response;
5242
5243 try {
5244 response.decode(iter);
5245 if (!iter.end()) {
5246 // extra_info isn't used anywhere. We do this solely to preserve
5247 // backward compatibility
5248 cb::list legacy_extra_info;
5249 decode(legacy_extra_info, iter);
5250 }
5251 } catch (const bs::system_error& e) {
5252 std::move(*ctx)(e.code(), {}, {});
5253 return;
5254 }
7c673cae 5255
f67539c2
TL
5256 shared_lock rl(rwlock);
5257 auto pool = osdmap->get_pg_pool(ctx->oloc.get_pool());
5258 rl.unlock();
5259 if (!pool) {
5260 // pool is gone, drop any results which are now meaningless.
5261 std::move(*ctx)(osdc_errc::pool_dne, {}, {});
5262 return;
7c673cae
FG
5263 }
5264
f67539c2
TL
5265 hobject_t next;
5266 if ((response.handle <= ctx->end)) {
5267 next = response.handle;
7c673cae 5268 } else {
f67539c2 5269 next = ctx->end;
7c673cae
FG
5270
5271 // drop anything after 'end'
7c673cae
FG
5272 while (!response.entries.empty()) {
5273 uint32_t hash = response.entries.back().locator.empty() ?
5274 pool->hash_key(response.entries.back().oid,
5275 response.entries.back().nspace) :
5276 pool->hash_key(response.entries.back().locator,
5277 response.entries.back().nspace);
5278 hobject_t last(response.entries.back().oid,
5279 response.entries.back().locator,
5280 CEPH_NOSNAP,
5281 hash,
f67539c2 5282 ctx->oloc.get_pool(),
7c673cae 5283 response.entries.back().nspace);
f67539c2 5284 if (last < ctx->end)
7c673cae 5285 break;
7c673cae
FG
5286 response.entries.pop_back();
5287 }
7c673cae 5288 }
f67539c2
TL
5289
5290 if (response.entries.size() <= ctx->max) {
5291 ctx->max -= response.entries.size();
5292 std::move(response.entries.begin(), response.entries.end(),
5293 std::back_inserter(ctx->ls));
5294 } else {
5295 auto i = response.entries.begin();
5296 while (ctx->max > 0) {
5297 ctx->ls.push_back(std::move(*i));
5298 --(ctx->max);
5299 ++i;
5300 }
5301 uint32_t hash =
5302 i->locator.empty() ?
5303 pool->hash_key(i->oid, i->nspace) :
5304 pool->hash_key(i->locator, i->nspace);
5305
5306 next = hobject_t{i->oid, i->locator,
5307 CEPH_NOSNAP,
5308 hash,
5309 ctx->oloc.get_pool(),
5310 i->nspace};
7c673cae
FG
5311 }
5312
f67539c2
TL
5313 if (next == ctx->end || ctx->max == 0) {
5314 std::move(*ctx)(ec, std::move(ctx->ls), std::move(next));
5315 } else {
5316 _issue_enumerate(next, std::move(ctx));
5317 }
7c673cae
FG
5318}
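// Editor's sketch (hypothetical, plain STL types) of the paging logic in
// _enumerate_reply() above: clamp the server's cursor to 'end', trim entries
// at or past 'end', and stop early when the caller's 'max' budget runs out
// mid-page. Entries and cursors are modeled as plain ints.
#if 0 // example only; excluded from the build
#include <vector>
struct Page { std::vector<int> entries; int handle; };
// Folds one page into 'out'; returns the cursor to resume from.
static int fold_page(Page page, int end, unsigned& max, std::vector<int>& out)
{
  int next;
  if (page.handle <= end) {
    next = page.handle;
  } else {
    next = end;
    while (!page.entries.empty() && page.entries.back() >= end)
      page.entries.pop_back();       // drop anything at or past 'end'
  }
  if (page.entries.size() <= max) {
    max -= page.entries.size();
    out.insert(out.end(), page.entries.begin(), page.entries.end());
  } else {
    out.insert(out.end(), page.entries.begin(), page.entries.begin() + max);
    next = page.entries[max];        // resume at the first entry not taken
    max = 0;
  }
  return next; // caller: finish when next == end || max == 0, else re-issue
}
#endif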
5319
f67539c2
TL
5320template
5321void Objecter::_enumerate_reply<librados::ListObjectImpl>(
5322 cb::list&& bl,
5323 bs::error_code ec,
5324 std::unique_ptr<EnumerationContext<librados::ListObjectImpl>>&& ctx);
5325
5326template
5327void Objecter::_enumerate_reply<neorados::Entry>(
5328 cb::list&& bl,
5329 bs::error_code ec,
5330 std::unique_ptr<EnumerationContext<neorados::Entry>>&& ctx);
5331
7c673cae
FG
5332namespace {
5333 using namespace librados;
5334
5335 template <typename T>
f67539c2 5336 void do_decode(std::vector<T>& items, std::vector<cb::list>& bls)
7c673cae
FG
5337 {
5338 for (auto& bl : bls) {
11fdf7f2 5339 auto p = bl.cbegin();
7c673cae
FG
5340 T t;
5341 decode(t, p);
5342 items.push_back(std::move(t));
5343 }
5344 }
5345
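// Editor's note: completion handler for CEPH_OSD_OP_SCRUBLS. It decodes the
// scrub_ls_result_t payload in 'bl' into either 'objects' or 'snapsets'
// (whichever the caller wired up) and reports decode failures through 'rval'.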
5346 struct C_ObjectOperation_scrub_ls : public Context {
f67539c2
TL
5347 cb::list bl;
5348 uint32_t* interval;
7c673cae
FG
5349 std::vector<inconsistent_obj_t> *objects = nullptr;
5350 std::vector<inconsistent_snapset_t> *snapsets = nullptr;
f67539c2 5351 int* rval;
7c673cae 5352
f67539c2
TL
5353 C_ObjectOperation_scrub_ls(uint32_t* interval,
5354 std::vector<inconsistent_obj_t>* objects,
5355 int* rval)
7c673cae 5356 : interval(interval), objects(objects), rval(rval) {}
f67539c2
TL
5357 C_ObjectOperation_scrub_ls(uint32_t* interval,
5358 std::vector<inconsistent_snapset_t>* snapsets,
5359 int* rval)
7c673cae
FG
5360 : interval(interval), snapsets(snapsets), rval(rval) {}
5361 void finish(int r) override {
5362 if (r < 0 && r != -EAGAIN) {
5363 if (rval)
5364 *rval = r;
5365 return;
5366 }
5367
5368 if (rval)
5369 *rval = 0;
5370
5371 try {
5372 decode();
f67539c2 5373 } catch (cb::error&) {
7c673cae
FG
5374 if (rval)
5375 *rval = -EIO;
5376 }
5377 }
5378 private:
5379 void decode() {
5380 scrub_ls_result_t result;
11fdf7f2 5381 auto p = bl.cbegin();
7c673cae
FG
5382 result.decode(p);
5383 *interval = result.interval;
5384 if (objects) {
5385 do_decode(*objects, result.vals);
5386 } else {
5387 do_decode(*snapsets, result.vals);
5388 }
5389 }
5390 };
5391
5392 template <typename T>
f67539c2 5393 void do_scrub_ls(::ObjectOperation* op,
7c673cae
FG
5394 const scrub_ls_arg_t& arg,
5395 std::vector<T> *items,
f67539c2
TL
5396 uint32_t* interval,
5397 int* rval)
7c673cae
FG
5398 {
5399 OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
5400 op->flags |= CEPH_OSD_FLAG_PGOP;
11fdf7f2 5401 ceph_assert(interval);
7c673cae
FG
5402 arg.encode(osd_op.indata);
5403 unsigned p = op->ops.size() - 1;
f67539c2
TL
5404 auto h = new C_ObjectOperation_scrub_ls{interval, items, rval};
5405 op->set_handler(h);
7c673cae
FG
5406 op->out_bl[p] = &h->bl;
5407 op->out_rval[p] = rval;
5408 }
5409}
5410
5411void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5412 uint64_t max_to_get,
f67539c2
TL
5413 std::vector<librados::inconsistent_obj_t>* objects,
5414 uint32_t* interval,
5415 int* rval)
7c673cae
FG
5416{
5417 scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
5418 do_scrub_ls(this, arg, objects, interval, rval);
5419}
5420
5421void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5422 uint64_t max_to_get,
5423 std::vector<librados::inconsistent_snapset_t> *snapsets,
5424 uint32_t *interval,
5425 int *rval)
5426{
5427 scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
5428 do_scrub_ls(this, arg, snapsets, interval, rval);
5429}
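// Editor's sketch: composing a scrub-ls read with the overload above. Only
// the scrub_ls() signature comes from this file; 'interval', 'rval', and the
// surrounding wiring are assumptions for the example.
#if 0 // example only; excluded from the build
static void example_scrub_ls()
{
  ::ObjectOperation op;
  uint32_t interval = 0;  // 0 on the first call; echo the value back on retry
  int rval = 0;
  std::vector<librados::inconsistent_obj_t> objects;
  op.scrub_ls(librados::object_id_t{},  // start from the beginning
              100,                      // fetch at most 100 inconsistent objects
              &objects, &interval, &rval);
  // scrub_ls() sets CEPH_OSD_FLAG_PGOP itself; once the op completes,
  // 'objects' holds the decoded entries and 'interval' the scrub interval
  // to pass back on the next call.
}
#endif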