// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <algorithm>
#include <cerrno>

#include "Objecter.h"
#include "osd/OSDMap.h"
#include "osd/error_code.h"
#include "Filer.h"

#include "mon/MonClient.h"
#include "mon/error_code.h"

#include "msg/Messenger.h"
#include "msg/Message.h"

#include "messages/MPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDMap.h"

#include "messages/MPoolOp.h"
#include "messages/MPoolOpReply.h"

#include "messages/MGetPoolStats.h"
#include "messages/MGetPoolStatsReply.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"

#include "messages/MMonCommand.h"

#include "messages/MCommand.h"
#include "messages/MCommandReply.h"

#include "messages/MWatchNotify.h"


#include "common/Cond.h"
#include "common/config.h"
#include "common/perf_counters.h"
#include "common/scrub_types.h"
#include "include/str_list.h"
#include "common/errno.h"
#include "common/EventTrace.h"
#include "common/async/waiter.h"
#include "error_code.h"


using std::list;
using std::make_pair;
using std::map;
using std::ostream;
using std::ostringstream;
using std::pair;
using std::set;
using std::string;
using std::stringstream;
using std::vector;

using ceph::decode;
using ceph::encode;
using ceph::Formatter;

using std::defer_lock;

using ceph::real_time;
using ceph::real_clock;

using ceph::mono_clock;
using ceph::mono_time;

using ceph::timespan;

using ceph::shunique_lock;
using ceph::acquire_shared;
using ceph::acquire_unique;

namespace bc = boost::container;
namespace bs = boost::system;
namespace ca = ceph::async;
namespace cb = ceph::buffer;

#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
#define dout_prefix *_dout << messenger->get_myname() << ".objecter "


enum {
  l_osdc_first = 123200,
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,
  l_osdc_oplen_avg,

  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};

namespace {
inline bs::error_code osdcode(int r) {
  return (r < 0) ? bs::error_code(-r, osd_category()) : bs::error_code();
}
}
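// Bridge from classic negative-errno returns to boost::system error
// codes in the OSD category; zero and positive values map to "no
// error".  osd_category() defines equivalence with the generic error
// conditions, so e.g. osdcode(-ENOENT) compares equal to
// bs::errc::no_such_file_or_directory.  The watch error paths below
// rely on exactly that comparison.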
7c673cae
FG
185
186// config obs ----------------------------
187
7c673cae
FG
188class Objecter::RequestStateHook : public AdminSocketHook {
189 Objecter *m_objecter;
190public:
191 explicit RequestStateHook(Objecter *objecter);
9f95a23c
TL
192 int call(std::string_view command, const cmdmap_t& cmdmap,
193 Formatter *f,
194 std::ostream& ss,
f67539c2 195 cb::list& out) override;
7c673cae
FG
196};
197
f67539c2 198std::unique_lock<std::mutex> Objecter::OSDSession::get_lock(object_t& oid)
7c673cae
FG
199{
200 if (oid.name.empty())
f67539c2 201 return {};
7c673cae
FG
202
203 static constexpr uint32_t HASH_PRIME = 1021;
204 uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
205 % HASH_PRIME;
206
f67539c2 207 return {completion_locks[h % num_locks], std::defer_lock};
7c673cae
FG
208}
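// Completion callbacks for the same object must be serialized, but one
// global mutex would serialize every completion on the session.
// get_lock() therefore shards completions across num_locks mutexes,
// hashing the object name and folding through a prime (1021) before
// reducing modulo num_locks to spread clustered names more evenly.
// Hedged usage sketch (hypothetical caller, not code from this file):
//
//   auto sl = session->get_lock(op->target.base_oid);
//   if (sl.mutex())  // an empty oid yields a guard with no mutex
//     sl.lock();     // returned with std::defer_lock, so lock explicitly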

const char** Objecter::get_tracked_conf_keys() const
{
  static const char *config_keys[] = {
    "crush_location",
    "rados_mon_op_timeout",
    "rados_osd_op_timeout",
    NULL
  };
  return config_keys;
}


void Objecter::handle_conf_change(const ConfigProxy& conf,
                                  const std::set <std::string> &changed)
{
  if (changed.count("crush_location")) {
    update_crush_location();
  }
  if (changed.count("rados_mon_op_timeout")) {
    mon_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
  }
  if (changed.count("rados_osd_op_timeout")) {
    osd_timeout = conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
  }
}

void Objecter::update_crush_location()
{
  unique_lock wl(rwlock);
  crush_location = cct->crush_location.get_location();
}

// messages ------------------------------

/*
 * initialize only internal data structures, don't initiate cluster interaction
 */
void Objecter::init()
{
  ceph_assert(!initialized);

  if (!logger) {
    PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);

    pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
                PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
    pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
    pcb.add_u64_avg(l_osdc_oplen_avg, "oplen_avg", "Average length of operation vector");

    pcb.add_u64_counter(l_osdc_op, "op", "Operations");
    pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
                        PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
                        PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
                        "rdwr", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");

    pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
    pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
                        "Create object operations");
    pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
    pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
    pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
                        "Write full object operations");
    pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
                        "Write same operations");
    pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
                        "Append operation");
    pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
                        "Set object to zero operations");
    pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
                        "Truncate object operations");
    pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
                        "Delete object operations");
    pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
                        "Map extent operations");
    pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
                        "Sparse read operations");
    pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
                        "Clone range operations");
    pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
                        "Get xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
                        "Set xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
                        "Xattr comparison operations");
    pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
                        "Remove xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
                        "Reset xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
                        "Call (execute) operations");
    pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
                        "Watch by object operations");
    pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
                        "Notify about object operations");
    pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
                        "Extended attribute comparison in multi operations");
    pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
    pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
    pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");

    pcb.add_u64(l_osdc_linger_active, "linger_active",
                "Active lingering operations");
    pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
                        "Sent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
                        "Resent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
                        "Sent pings to lingering operations");

    pcb.add_u64(l_osdc_poolop_active, "poolop_active",
                "Active pool operations");
    pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
                        "Sent pool operations");
    pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
                        "Resent pool operations");

    pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
                "Active get pool stat operations");
    pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
                        "Pool stat operations sent");
    pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
                        "Resent pool stats");

    pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
    pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
    pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
                        "Resent FS stats");

    pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
    pcb.add_u64_counter(l_osdc_command_send, "command_send",
                        "Sent commands");
    pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
                        "Resent commands");

    pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
    pcb.add_u64_counter(l_osdc_map_full, "map_full",
                        "Full OSD maps received");
    pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
                        "Incremental OSD maps received");

    pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
                "Open sessions");
    pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
                        "Sessions opened");
    pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
                        "Sessions closed");
    pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");

    pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
                        "OSD OMAP write operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
                        "OSD OMAP read operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
                        "OSD OMAP delete operations");

    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  m_request_state_hook = new RequestStateHook(this);
  auto admin_socket = cct->get_admin_socket();
  int ret = admin_socket->register_command("objecter_requests",
                                           m_request_state_hook,
                                           "show in-progress osd requests");

  /* Don't warn on EEXIST, happens if multiple ceph clients
   * are instantiated from one process */
  if (ret < 0 && ret != -EEXIST) {
    lderr(cct) << "error registering admin socket command: "
               << cpp_strerror(ret) << dendl;
  }

  update_crush_location();

  cct->_conf.add_observer(this);

  initialized = true;
}
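// Typical lifecycle, as a hedged sketch (the constructor arguments and
// surrounding object names here are assumptions, not code from this
// file):
//
//   Objecter objecter(cct, messenger, &monc, poolctx);
//   objecter.init();      // counters + admin socket only, no cluster I/O
//   objecter.start();     // from here on we may request and apply maps
//   ...
//   objecter.shutdown();  // tears down sessions and in-flight state
//
// init() deliberately performs no cluster interaction so it can run
// before the messenger is started; start() is the first point that may
// ask the monitors for an osdmap.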

/*
 * ok, cluster interaction can happen
 */
void Objecter::start(const OSDMap* o)
{
  shared_lock rl(rwlock);

  start_tick();
  if (o) {
    osdmap->deepish_copy_from(*o);
    prune_pg_mapping(osdmap->get_pools());
  } else if (osdmap->get_epoch() == 0) {
    _maybe_request_map();
  }
}

void Objecter::shutdown()
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  initialized = false;

  wl.unlock();
  cct->_conf.remove_observer(this);
  wl.lock();

  while (!osd_sessions.empty()) {
    auto p = osd_sessions.begin();
    close_session(p->second);
  }

  while(!check_latest_map_lingers.empty()) {
    auto i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while(!check_latest_map_ops.empty()) {
    auto i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while(!check_latest_map_commands.empty()) {
    auto i = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  while(!poolstat_ops.empty()) {
    auto i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while(!statfs_ops.empty()) {
    auto i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while(!pool_ops.empty()) {
    auto i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
  while(!homeless_session->linger_ops.empty()) {
    auto i = homeless_session->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    LingerOp *lop = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_linger_op_remove(homeless_session, lop);
    }
    linger_ops.erase(lop->linger_id);
    linger_ops_set.erase(lop);
    lop->put();
  }

  while(!homeless_session->ops.empty()) {
    auto i = homeless_session->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    auto op = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_op_remove(homeless_session, op);
    }
    op->put();
  }

  while(!homeless_session->command_ops.empty()) {
    auto i = homeless_session->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    auto cop = i->second;
    {
      std::unique_lock swl(homeless_session->lock);
      _session_command_op_remove(homeless_session, cop);
    }
    cop->put();
  }

  if (tick_event) {
    if (timer.cancel_event(tick_event)) {
      ldout(cct, 10) << " successfully canceled tick" << dendl;
    }
    tick_event = 0;
  }

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }

  // Let go of Objecter write lock so timer thread can shutdown
  wl.unlock();

  // Outside of lock to avoid cycle WRT calls to RequestStateHook
  // This is safe because we guarantee no concurrent calls to
  // shutdown() with the ::initialized check at start.
  if (m_request_state_hook) {
    auto admin_socket = cct->get_admin_socket();
    admin_socket->unregister_commands(m_request_state_hook);
    delete m_request_state_hook;
    m_request_state_hook = NULL;
  }
}

void Objecter::_send_linger(LingerOp *info,
                            ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  fu2::unique_function<Op::OpSig> oncommit;
  osdc_opvec opv;
  std::shared_lock watchl(info->watch_lock);
  cb::list *poutbl = nullptr;
  if (info->registered && info->is_watch) {
    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
                   << dendl;
    opv.push_back(OSDOp());
    opv.back().op.op = CEPH_OSD_OP_WATCH;
    opv.back().op.watch.cookie = info->get_cookie();
    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
    opv.back().op.watch.gen = ++info->register_gen;
    oncommit = CB_Linger_Reconnect(this, info);
  } else {
    ldout(cct, 15) << "send_linger " << info->linger_id << " register"
                   << dendl;
    opv = info->ops;
    // TODO Augment ca::Completion with an equivalent of
    // target so we can handle these cases better.
    auto c = std::make_unique<CB_Linger_Commit>(this, info);
    if (!info->is_watch) {
      info->notify_id = 0;
      poutbl = &c->outbl;
    }
    oncommit = [c = std::move(c)](bs::error_code ec) mutable {
      std::move(*c)(ec);
    };
  }
  watchl.unlock();
  auto o = new Op(info->target.base_oid, info->target.base_oloc,
                  std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
                  std::move(oncommit), info->pobjver);
  o->outbl = poutbl;
  o->snapid = info->snap;
  o->snapc = info->snapc;
  o->mtime = info->mtime;

  o->target = info->target;
  o->tid = ++last_tid;

  // do not resend this; we will send a new op to reregister
  o->should_resend = false;
  o->ctx_budgeted = true;

  if (info->register_tid) {
    // repeat send. cancel old registration op, if any.
    std::unique_lock sl(info->session->lock);
    if (info->session->ops.count(info->register_tid)) {
      auto o = info->session->ops[info->register_tid];
      _op_cancel_map_check(o);
      _cancel_linger_op(o);
    }
    sl.unlock();
  }

  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);

  logger->inc(l_osdc_linger_send);
}
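// A linger op is the persistent half of watch/notify: the first
// submission registers the watch (or fires the notify), and every
// later pass through _send_linger() for an established watch sends a
// RECONNECT carrying a bumped register_gen so acks from an earlier
// incarnation can be recognized and ignored.  The Op built here is
// marked should_resend = false on purpose: on a map change we come
// back through _send_linger() and build a fresh op rather than
// replaying this one.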

void Objecter::_linger_commit(LingerOp *info, bs::error_code ec,
                              cb::list& outbl)
{
  std::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
  if (info->on_reg_commit) {
    info->on_reg_commit->defer(std::move(info->on_reg_commit),
                               ec, cb::list{});
    info->on_reg_commit.reset();
  }
  if (ec && info->on_notify_finish) {
    info->on_notify_finish->defer(std::move(info->on_notify_finish),
                                  ec, cb::list{});
    info->on_notify_finish.reset();
  }

  // only tell the user the first time we do this
  info->registered = true;
  info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    auto p = outbl.cbegin();
    try {
      decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
                     << dendl;
    }
    catch (cb::error& e) {
    }
  }
}

class CB_DoWatchError {
  Objecter *objecter;
  boost::intrusive_ptr<Objecter::LingerOp> info;
  bs::error_code ec;
public:
  CB_DoWatchError(Objecter *o, Objecter::LingerOp *i,
                  bs::error_code ec)
    : objecter(o), info(i), ec(ec) {
    info->_queued_async();
  }
  void operator()() {
    std::unique_lock wl(objecter->rwlock);
    bool canceled = info->canceled;
    wl.unlock();

    if (!canceled) {
      info->handle(ec, 0, info->get_cookie(), 0, {});
    }

    info->finished_async();
  }
};

bs::error_code Objecter::_normalize_watch_error(bs::error_code ec)
{
  // translate ENOENT -> ENOTCONN so that a delete->disconnection
  // notification and a failure to reconnect because we raced with
  // the delete appear the same to the user.
  if (ec == bs::errc::no_such_file_or_directory)
    ec = bs::error_code(ENOTCONN, osd_category());
  return ec;
}

void Objecter::_linger_reconnect(LingerOp *info, bs::error_code ec)
{
  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << ec
                 << " (last_error " << info->last_error << ")" << dendl;
  std::unique_lock wl(info->watch_lock);
  if (ec) {
    if (!info->last_error) {
      ec = _normalize_watch_error(ec);
      if (info->handle) {
        boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
      }
    }
  }

  info->last_error = ec;
}

void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  if (cct->_conf->objecter_inject_no_watch_ping) {
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
                   << dendl;
    return;
  }
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
                 << dendl;

  osdc_opvec opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen;

  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
                 std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
                 CB_Linger_Ping(this, info, now),
                 nullptr, nullptr);
  o->target = info->target;
  o->should_resend = false;
  _send_op_account(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o);
  info->ping_tid = o->tid;

  logger->inc(l_osdc_linger_ping);
}

void Objecter::_linger_ping(LingerOp *info, bs::error_code ec,
                            ceph::coarse_mono_time sent,
                            uint32_t register_gen)
{
  std::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id
                 << " sent " << sent << " gen " << register_gen << " = " << ec
                 << " (last_error " << info->last_error
                 << " register_gen " << info->register_gen << ")" << dendl;
  if (info->register_gen == register_gen) {
    if (!ec) {
      info->watch_valid_thru = sent;
    } else if (ec && !info->last_error) {
      ec = _normalize_watch_error(ec);
      info->last_error = ec;
      if (info->handle) {
        boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
      }
    }
  } else {
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}

tl::expected<ceph::timespan,
             bs::error_code> Objecter::linger_check(LingerOp *info)
{
  std::shared_lock l(info->watch_lock);

  ceph::coarse_mono_time stamp = info->watch_valid_thru;
  if (!info->watch_pending_async.empty())
    stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
  auto age = ceph::coarse_mono_clock::now() - stamp;

  ldout(cct, 10) << __func__ << " " << info->linger_id
                 << " err " << info->last_error
                 << " age " << age << dendl;
  if (info->last_error)
    return tl::unexpected(info->last_error);
  // return a safe upper bound (we are truncating to ms)
  return age;
}
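// linger_check() reports how stale the watch is instead of a boolean;
// callers compare the age against their own timeout.  Hedged usage
// sketch (caller-side names are assumptions):
//
//   auto r = objecter->linger_check(linger_op);
//   if (!r)
//     return r.error();   // watch is already broken
//   if (*r > watch_timeout)
//     ...                 // treat the watch as expired, re-register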

void Objecter::linger_cancel(LingerOp *info)
{
  unique_lock wl(rwlock);
  _linger_cancel(info);
  info->put();
}

void Objecter::_linger_cancel(LingerOp *info)
{
  // rwlock is locked unique
  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
  if (!info->canceled) {
    OSDSession *s = info->session;
    std::unique_lock sl(s->lock);
    _session_linger_op_remove(s, info);
    sl.unlock();

    linger_ops.erase(info->linger_id);
    linger_ops_set.erase(info);
    ceph_assert(linger_ops.size() == linger_ops_set.size());

    info->canceled = true;
    info->put();

    logger->dec(l_osdc_linger_active);
  }
}



Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
                                              const object_locator_t& oloc,
                                              int flags)
{
  unique_lock l(rwlock);
  // Acquire linger ID
  auto info = new LingerOp(this, ++max_linger_id);
  info->target.base_oid = oid;
  info->target.base_oloc = oloc;
  if (info->target.base_oloc.key == oid)
    info->target.base_oloc.key.clear();
  info->target.flags = flags;
  info->watch_valid_thru = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " info " << info
                 << " linger_id " << info->linger_id
                 << " cookie " << info->get_cookie()
                 << dendl;
  linger_ops[info->linger_id] = info;
  linger_ops_set.insert(info);
  ceph_assert(linger_ops.size() == linger_ops_set.size());

  info->get(); // for the caller
  return info;
}

ceph_tid_t Objecter::linger_watch(LingerOp *info,
                                  ObjectOperation& op,
                                  const SnapContext& snapc,
                                  real_time mtime,
                                  cb::list& inbl,
                                  decltype(info->on_reg_commit)&& oncommit,
                                  version_t *objver)
{
  info->is_watch = true;
  info->snapc = snapc;
  info->mtime = mtime;
  info->target.flags |= CEPH_OSD_FLAG_WRITE;
  info->ops = op.ops;
  info->inbl = inbl;
  info->pobjver = objver;
  info->on_reg_commit = std::move(oncommit);

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  op.clear();
  return info->linger_id;
}

ceph_tid_t Objecter::linger_notify(LingerOp *info,
                                   ObjectOperation& op,
                                   snapid_t snap, cb::list& inbl,
                                   decltype(LingerOp::on_reg_commit)&& onfinish,
                                   version_t *objver)
{
  info->snap = snap;
  info->target.flags |= CEPH_OSD_FLAG_READ;
  info->ops = op.ops;
  info->inbl = inbl;
  info->pobjver = objver;
  info->on_reg_commit = std::move(onfinish);
  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  op.clear();
  return info->linger_id;
}
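// linger_register() only allocates and indexes the LingerOp;
// linger_watch()/linger_notify() fill it in and submit.  Watch is a
// write (it mutates the object's watcher list) while notify is a read.
// Hedged sketch of a watch registration (assumes ObjectOperation::watch
// has this shape; caller names are illustrative, not code from this
// file):
//
//   ObjectOperation wr;
//   wr.watch(cookie, CEPH_OSD_WATCH_OP_WATCH);
//   auto info = objecter->linger_register(oid, oloc, 0);
//   objecter->linger_watch(info, wr, snapc, ceph::real_clock::now(),
//                          inbl, std::move(oncommit), &objver);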

void Objecter::_linger_submit(LingerOp *info,
                              ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
  ceph_assert(info->linger_id);
  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!

  // Populate Op::target
  OSDSession *s = NULL;
  _calc_target(&info->target, nullptr);

  // Create LingerOp<->OSDSession relation
  int r = _get_session(info->target.osd, &s, sul);
  ceph_assert(r == 0);
  unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  put_session(s);

  _send_linger(info, sul);
}

struct CB_DoWatchNotify {
  Objecter *objecter;
  boost::intrusive_ptr<Objecter::LingerOp> info;
  boost::intrusive_ptr<MWatchNotify> msg;
  CB_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->_queued_async();
  }
  void operator()() {
    objecter->_do_watch_notify(std::move(info), std::move(msg));
  }
};

void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  std::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    if (!info->last_error) {
      info->last_error = bs::error_code(ENOTCONN, osd_category());
      if (info->handle) {
        boost::asio::defer(finish_strand, CB_DoWatchError(this, info,
                                                          info->last_error));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
        info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
                     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->on_notify_finish->defer(
        std::move(info->on_notify_finish),
        osdcode(m->return_code), std::move(m->get_data()));

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
      info->on_notify_finish = nullptr;
    }
  } else {
    boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
  }
}
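// Delivery context differs by event: DISCONNECT errors and watch NOTIFY
// events are deferred onto finish_strand so user callbacks never run in
// the messenger's fast-dispatch thread, while NOTIFY_COMPLETE acks are
// finished inline because the only consumer (librados) is known to be
// fast-dispatch safe.  Note the cookie is the LingerOp pointer itself,
// which is why it is validated against linger_ops_set before being
// dereferenced.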

void Objecter::_do_watch_notify(boost::intrusive_ptr<LingerOp> info,
                                boost::intrusive_ptr<MWatchNotify> m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;

  shared_lock l(rwlock);
  ceph_assert(initialized);

  if (info->canceled) {
    l.unlock();
    goto out;
  }

  // notify completion?
  ceph_assert(info->is_watch);
  ceph_assert(info->handle);
  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);

  l.unlock();

  switch (m->opcode) {
  case CEPH_WATCH_EVENT_NOTIFY:
    info->handle({}, m->notify_id, m->cookie, m->notifier_gid, std::move(m->bl));
    break;
  }

 out:
  info->finished_async();
}

bool Objecter::ms_dispatch(Message *m)
{
  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
  switch (m->get_type()) {
    // these we exclusively handle
  case CEPH_MSG_OSD_OPREPLY:
    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
    return true;

  case CEPH_MSG_OSD_BACKOFF:
    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
    return true;

  case CEPH_MSG_WATCH_NOTIFY:
    handle_watch_notify(static_cast<MWatchNotify*>(m));
    m->put();
    return true;

  case MSG_COMMAND_REPLY:
    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    } else {
      return false;
    }

  case MSG_GETPOOLSTATSREPLY:
    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
    return true;

  case CEPH_MSG_POOLOP_REPLY:
    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
    return true;

  case CEPH_MSG_STATFS_REPLY:
    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
    return true;

  // these we give others a chance to inspect

  // MDS, OSD
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    return false;
  }
  return false;
}
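// ms_dispatch() follows the Dispatcher convention: returning true means
// the message was consumed here, returning false offers it to the next
// dispatcher on the messenger.  CEPH_MSG_OSD_MAP is deliberately both
// handled and passed on (return false), since other subsystems sharing
// the messenger also want map updates.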

void Objecter::_scan_requests(
  OSDSession *s,
  bool skipped_map,
  bool cluster_full,
  map<int64_t, bool> *pool_full_map,
  map<ceph_tid_t, Op*>& need_resend,
  list<LingerOp*>& need_resend_linger,
  map<ceph_tid_t, CommandOp*>& need_resend_command,
  ceph::shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  list<LingerOp*> unregister_lingers;

  std::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  auto lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    auto op = lp->second;
    ceph_assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !force_resend_writes)
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
        ldout(cct, 10) << " need to unregister linger op "
                       << op->linger_id << dendl;
        op->get();
        unregister_lingers.push_back(op);
      }
      break;
    }
  }

  // check for changed request mappings
  auto p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    _prune_snapc(osdmap->get_new_removed_snaps(), op);
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
                         op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !(force_resend_writes && op->target.respects_full()))
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      _session_op_remove(op->session, op);
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    }
  }

  // commands
  auto cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    auto c = cp->second;
    ++cp;
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!skipped_map && !force_resend_writes)
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      _session_command_op_remove(c->session, c);
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  sl.unlock();

  for (auto iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put();
  }
}
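// _scan_requests() classifies every op on one session after a map
// change: NO_ACTION ops stay put unless we skipped an epoch (we may
// have missed an interval change) or a full flag forces writes to
// resend; NEED_RESEND ops are detached from the session and queued for
// handle_osd_map() to re-home; POOL_DNE ops enter the map_dne_bound
// protocol below.  Lingers are scanned before regular ops so watch
// re-registration is queued ahead of ordinary resends.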

void Objecter::handle_osd_map(MOSDMap *m)
{
  ceph::shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  ceph_assert(osdmap);

  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
                  << " != " << monc->get_fsid() << dendl;
    return;
  }

  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (auto it = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);


  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
                  << m->get_first() << "," << m->get_last()
                  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
                  << m->get_first() << "," << m->get_last()
                  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
           e <= m->get_last();
           e++) {

        if (osdmap->get_epoch() == e-1 &&
            m->incremental_maps.count(e)) {
          ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
                        << dendl;
          OSDMap::Incremental inc(m->incremental_maps[e]);
          osdmap->apply_incremental(inc);

          emit_blocklist_events(inc);

          logger->inc(l_osdc_map_inc);
        }
        else if (m->maps.count(e)) {
          ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
          auto new_osdmap = std::make_unique<OSDMap>();
          new_osdmap->decode(m->maps[e]);

          emit_blocklist_events(*osdmap, *new_osdmap);
          osdmap = std::move(new_osdmap);

          logger->inc(l_osdc_map_full);
        }
        else {
          if (e >= m->get_oldest()) {
            ldout(cct, 3) << "handle_osd_map requesting missing epoch "
                          << osdmap->get_epoch()+1 << dendl;
            _maybe_request_map();
            break;
          }
          ldout(cct, 3) << "handle_osd_map missing epoch "
                        << osdmap->get_epoch()+1
                        << ", jumping to " << m->get_oldest() << dendl;
          e = m->get_oldest() - 1;
          skipped_map = true;
          continue;
        }
        logger->set(l_osdc_map_epoch, osdmap->get_epoch());

        prune_pg_mapping(osdmap->get_pools());
        cluster_full = cluster_full || _osdmap_full_flag();
        update_pool_full_map(pool_full_map);

        // check all outstanding requests on every epoch
        for (auto& i : need_resend) {
          _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
        }
        _scan_requests(homeless_session, skipped_map, cluster_full,
                       &pool_full_map, need_resend,
                       need_resend_linger, need_resend_command, sul);
        for (auto p = osd_sessions.begin();
             p != osd_sessions.end(); ) {
          auto s = p->second;
          _scan_requests(s, skipped_map, cluster_full,
                         &pool_full_map, need_resend,
                         need_resend_linger, need_resend_command, sul);
          ++p;
          // osd down or addr change?
          if (!osdmap->is_up(s->osd) ||
              (s->con &&
               s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
            close_session(s);
          }
        }

        ceph_assert(e == osdmap->get_epoch());
      }

    } else {
      // first map. we want the full thing.
      if (m->maps.count(m->get_last())) {
        for (auto p = osd_sessions.begin();
             p != osd_sessions.end(); ++p) {
          OSDSession *s = p->second;
          _scan_requests(s, false, false, NULL, need_resend,
                         need_resend_linger, need_resend_command, sul);
        }
        ldout(cct, 3) << "handle_osd_map decoding full epoch "
                      << m->get_last() << dendl;
        osdmap->decode(m->maps[m->get_last()]);
        prune_pg_mapping(osdmap->get_pools());

        _scan_requests(homeless_session, false, false, NULL,
                       need_resend, need_resend_linger,
                       need_resend_command, sul);
      } else {
        ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
                      << dendl;
        monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
        monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
        p = need_resend.erase(p);
        _check_op_pool_dne(op, nullptr);
      } else {
        ++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused?
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    _maybe_request_map();
  }

  // resend requests
  for (auto p = need_resend.begin();
       p != need_resend.end(); ++p) {
    auto op = p->second;
    auto s = op->session;
    bool mapped_session = false;
    if (!s) {
      int r = _map_session(&op->target, &s, sul);
      ceph_assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    std::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      if (!op->session->is_homeless() && !op->target.paused) {
        logger->inc(l_osdc_op_resend);
        _send_op(op);
      }
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s);
  }
  for (auto p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    ceph_assert(op->session);
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (auto p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    auto c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
        _send_command(c);
      }
    }
  }

  _dump_active();

  // finish any Contexts that were waiting on a map update
  auto p = waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
         p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (auto& [c, ec] : p->second) {
      ca::post(std::move(c), ec);
    }
    waiting_for_map.erase(p++);
  }

  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}
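// Map consumption walks forward one epoch at a time, preferring
// incrementals (cheap deltas) and falling back to full maps.  If
// neither form is available for some epoch we jump ahead to
// m->get_oldest() and set skipped_map, which makes _scan_requests()
// resend everything, since any unseen epoch may have moved a PG.  Only
// once the map is current are ops re-homed and resent, and finally any
// contexts in waiting_for_map up to the new epoch are completed.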

void Objecter::enable_blocklist_events()
{
  unique_lock wl(rwlock);

  blocklist_events_enabled = true;
}

void Objecter::consume_blocklist_events(std::set<entity_addr_t> *events)
{
  unique_lock wl(rwlock);

  if (events->empty()) {
    events->swap(blocklist_events);
  } else {
    for (const auto &i : blocklist_events) {
      events->insert(i);
    }
    blocklist_events.clear();
  }
}

void Objecter::emit_blocklist_events(const OSDMap::Incremental &inc)
{
  if (!blocklist_events_enabled) {
    return;
  }

  for (const auto &i : inc.new_blocklist) {
    blocklist_events.insert(i.first);
  }
}

void Objecter::emit_blocklist_events(const OSDMap &old_osd_map,
                                     const OSDMap &new_osd_map)
{
  if (!blocklist_events_enabled) {
    return;
  }

  std::set<entity_addr_t> old_set;
  std::set<entity_addr_t> new_set;

  old_osd_map.get_blocklist(&old_set);
  new_osd_map.get_blocklist(&new_set);

  std::set<entity_addr_t> delta_set;
  std::set_difference(
    new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  blocklist_events.insert(delta_set.begin(), delta_set.end());
}
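// Incremental maps list new blocklist entries explicitly; a full map
// swap requires diffing the old and new blocklists, which
// std::set_difference does in one pass since get_blocklist() fills
// ordered sets.  Hedged consumer sketch (the polling loop is
// illustrative, not code from this file):
//
//   objecter->enable_blocklist_events();
//   ...
//   std::set<entity_addr_t> events;
//   objecter->consume_blocklist_events(&events);
//   for (const auto& a : events)
//     std::cout << "blocklisted: " << a << std::endl;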

// op pool check

void Objecter::CB_Op_Map_Latest::operator()(bs::error_code e,
                                            version_t latest, version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled)
    return;

  lgeneric_subdout(objecter->cct, objecter, 10)
    << "op_map_latest r=" << e << " tid=" << tid
    << " latest " << latest << dendl;

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_ops.find(tid);
  if (iter == objecter->check_latest_map_ops.end()) {
    lgeneric_subdout(objecter->cct, objecter, 10)
      << "op_map_latest op "<< tid << " not found" << dendl;
    return;
  }

  Op *op = iter->second;
  objecter->check_latest_map_ops.erase(iter);

  lgeneric_subdout(objecter->cct, objecter, 20)
    << "op_map_latest op "<< op << dendl;

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  unique_lock sl(op->session->lock, defer_lock);
  objecter->_check_op_pool_dne(op, &sl);

  op->put();
}

int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
                                snapid_t *snap) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  for (auto p = pg_pool.snaps.begin();
       p != pg_pool.snaps.end();
       ++p) {
    if (p->second.name == snap_name) {
      *snap = p->first;
      return 0;
    }
  }
  return -ENOENT;
}

int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
                                 pool_snap_info_t *info) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  auto p = pg_pool.snaps.find(snap);
  if (p == pg_pool.snaps.end())
    return -ENOENT;
  *info = p->second;

  return 0;
}

int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
{
  shared_lock rl(rwlock);

  const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
  if (!pi)
    return -ENOENT;
  for (auto p = pi->snaps.begin();
       p != pi->snaps.end();
       ++p) {
    snaps->push_back(p->first);
  }
  return 0;
}

// sl may be unlocked.
void Objecter::_check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl)
{
  // rwlock is locked unique

  if (op->target.pool_ever_existed) {
    // the pool previously existed and now it does not, which means it
    // was deleted.
    op->map_dne_bound = osdmap->get_epoch();
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                   << " pool previously exists but now does not"
                   << dendl;
  } else {
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                   << " current " << osdmap->get_epoch()
                   << " map_dne_bound " << op->map_dne_bound
                   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // we had a new enough map
      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                     << " concluding pool " << op->target.base_pgid.pool()
                     << " dne" << dendl;
      if (op->has_completion()) {
        num_in_flight--;
        op->complete(osdc_errc::pool_dne, -ENOENT);
      }

      OSDSession *s = op->session;
      if (s) {
        ceph_assert(s != NULL);
        ceph_assert(sl->mutex() == &s->lock);
        bool session_locked = sl->owns_lock();
        if (!session_locked) {
          sl->lock();
        }
        _finish_op(op, 0);
        if (!session_locked) {
          sl->unlock();
        }
      } else {
        _finish_op(op, 0); // no session
      }
    }
  } else {
    _send_op_map_check(op);
  }
}
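// map_dne_bound is the earliest epoch at which "pool does not exist"
// can be asserted: until the local osdmap reaches that epoch we cannot
// distinguish a deleted pool from a stale map, so the op parks in
// check_latest_map_ops while _send_op_map_check() asks the monitor for
// the newest map version.  Once the bound is met the op completes with
// osdc_errc::pool_dne instead of hanging forever.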

void Objecter::_send_op_map_check(Op *op)
{
  // rwlock is locked unique
  // ask the monitor
  if (check_latest_map_ops.count(op->tid) == 0) {
    op->get();
    check_latest_map_ops[op->tid] = op;
    monc->get_version("osdmap", CB_Op_Map_Latest(this, op->tid));
  }
}

void Objecter::_op_cancel_map_check(Op *op)
{
  // rwlock is locked unique
  auto iter = check_latest_map_ops.find(op->tid);
  if (iter != check_latest_map_ops.end()) {
    Op *op = iter->second;
    op->put();
    check_latest_map_ops.erase(iter);
  }
}

// linger pool check

void Objecter::CB_Linger_Map_Latest::operator()(bs::error_code e,
                                                version_t latest,
                                                version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_lingers.find(linger_id);
  if (iter == objecter->check_latest_map_lingers.end()) {
    return;
  }

  auto op = iter->second;
  objecter->check_latest_map_lingers.erase(iter);

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  bool unregister;
  objecter->_check_linger_pool_dne(op, &unregister);

  if (unregister) {
    objecter->_linger_cancel(op);
  }

  op->put();
}

void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
  // rwlock is locked unique

  *need_unregister = false;

  if (op->register_gen > 0) {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                   << " pool previously existed but now does not"
                   << dendl;
    op->map_dne_bound = osdmap->get_epoch();
  } else {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                   << " current " << osdmap->get_epoch()
                   << " map_dne_bound " << op->map_dne_bound
                   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      std::unique_lock wl{op->watch_lock};
      if (op->on_reg_commit) {
        op->on_reg_commit->defer(std::move(op->on_reg_commit),
                                 osdc_errc::pool_dne, cb::list{});
        op->on_reg_commit = nullptr;
      }
      if (op->on_notify_finish) {
        op->on_notify_finish->defer(std::move(op->on_notify_finish),
                                    osdc_errc::pool_dne, cb::list{});
        op->on_notify_finish = nullptr;
      }
      *need_unregister = true;
    }
  } else {
    _send_linger_map_check(op);
  }
}

void Objecter::_send_linger_map_check(LingerOp *op)
{
  // ask the monitor
  if (check_latest_map_lingers.count(op->linger_id) == 0) {
    op->get();
    check_latest_map_lingers[op->linger_id] = op;
    monc->get_version("osdmap", CB_Linger_Map_Latest(this, op->linger_id));
  }
}

void Objecter::_linger_cancel_map_check(LingerOp *op)
{
  // rwlock is locked unique

  auto iter = check_latest_map_lingers.find(op->linger_id);
  if (iter != check_latest_map_lingers.end()) {
    LingerOp *op = iter->second;
    op->put();
    check_latest_map_lingers.erase(iter);
  }
}

// command pool check

void Objecter::CB_Command_Map_Latest::operator()(bs::error_code e,
                                                 version_t latest, version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_commands.find(tid);
  if (iter == objecter->check_latest_map_commands.end()) {
    return;
  }

  auto c = iter->second;
  objecter->check_latest_map_commands.erase(iter);

  if (c->map_dne_bound == 0)
    c->map_dne_bound = latest;

  unique_lock sul(c->session->lock);
  objecter->_check_command_map_dne(c);
  sul.unlock();

  c->put();
}

void Objecter::_check_command_map_dne(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
                 << " current " << osdmap->get_epoch()
                 << " map_dne_bound " << c->map_dne_bound
                 << dendl;
  if (c->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= c->map_dne_bound) {
      _finish_command(c, osdcode(c->map_check_error),
                      std::move(c->map_check_error_str), {});
    }
  } else {
    _send_command_map_check(c);
  }
}

void Objecter::_send_command_map_check(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  // ask the monitor
  if (check_latest_map_commands.count(c->tid) == 0) {
    c->get();
    check_latest_map_commands[c->tid] = c;
    monc->get_version("osdmap", CB_Command_Map_Latest(this, c->tid));
  }
}

void Objecter::_command_cancel_map_check(CommandOp *c)
{
  // rwlock is locked unique
1747
f67539c2 1748 auto iter = check_latest_map_commands.find(c->tid);
7c673cae 1749 if (iter != check_latest_map_commands.end()) {
f67539c2 1750 auto c = iter->second;
7c673cae
FG
1751 c->put();
1752 check_latest_map_commands.erase(iter);
1753 }
1754}

/**
 * Look up OSDSession by OSD id.
 *
 * @returns 0 on success, or -EAGAIN if the lock context requires
 * promotion to write.
 */
int Objecter::_get_session(int osd, OSDSession **session,
                           shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul && sul.mutex() == &rwlock);

  if (osd < 0) {
    *session = homeless_session;
    ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
                   << dendl;
    return 0;
  }

  auto p = osd_sessions.find(osd);
  if (p != osd_sessions.end()) {
    auto s = p->second;
    s->get();
    *session = s;
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
                   << s->get_nref() << dendl;
    return 0;
  }
  if (!sul.owns_lock()) {
    return -EAGAIN;
  }
  auto s = new OSDSession(cct, osd);
  osd_sessions[osd] = s;
  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
  s->con->set_priv(RefCountedPtr{s});
  logger->inc(l_osdc_osd_session_open);
  logger->set(l_osdc_osd_sessions, osd_sessions.size());
  s->get();
  *session = s;
  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
                 << s->get_nref() << dendl;
  return 0;
}
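
/*
 * Hedged usage sketch (condensed from _op_submit() further down, not a
 * separate API): a caller holding only the shared lock must handle
 * -EAGAIN by re-acquiring rwlock exclusively and retrying, since
 * creating a new OSDSession mutates osd_sessions.
 *
 *   OSDSession *s = nullptr;
 *   int r = _get_session(osd, &s, sul);  // shared lock: may be -EAGAIN
 *   if (r == -EAGAIN) {
 *     sul.unlock();
 *     sul.lock();                        // now unique
 *     r = _get_session(osd, &s, sul);    // may create; must succeed
 *   }
 *   ceph_assert(r == 0);
 *   ...
 *   put_session(s);                      // drop the ref taken above
 */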

void Objecter::put_session(Objecter::OSDSession *s)
{
  if (s && !s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
                   << s->get_nref() << dendl;
    s->put();
  }
}

void Objecter::get_session(Objecter::OSDSession *s)
{
  ceph_assert(s != NULL);

  if (!s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
                   << s->get_nref() << dendl;
    s->get();
  }
}

void Objecter::_reopen_session(OSDSession *s)
{
  // rwlock is locked unique
  // s->lock is locked

  auto addrs = osdmap->get_addrs(s->osd);
  ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
                 << addrs << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  s->con = messenger->connect_to_osd(addrs);
  s->con->set_priv(RefCountedPtr{s});
  s->incarnation++;
  logger->inc(l_osdc_osd_session_open);
}

void Objecter::close_session(OSDSession *s)
{
  // rwlock is locked unique

  ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  unique_lock sl(s->lock);

  std::list<LingerOp*> homeless_lingers;
  std::list<CommandOp*> homeless_commands;
  std::list<Op*> homeless_ops;

  while (!s->linger_ops.empty()) {
    auto i = s->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    homeless_lingers.push_back(i->second);
    _session_linger_op_remove(s, i->second);
  }

  while (!s->ops.empty()) {
    auto i = s->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    homeless_ops.push_back(i->second);
    _session_op_remove(s, i->second);
  }

  while (!s->command_ops.empty()) {
    auto i = s->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    homeless_commands.push_back(i->second);
    _session_command_op_remove(s, i->second);
  }

  osd_sessions.erase(s->osd);
  sl.unlock();
  put_session(s);

  // Assign any leftover ops to the homeless session
  {
    unique_lock hsl(homeless_session->lock);
    for (auto i = homeless_lingers.begin();
         i != homeless_lingers.end(); ++i) {
      _session_linger_op_assign(homeless_session, *i);
    }
    for (auto i = homeless_ops.begin();
         i != homeless_ops.end(); ++i) {
      _session_op_assign(homeless_session, *i);
    }
    for (auto i = homeless_commands.begin();
         i != homeless_commands.end(); ++i) {
      _session_command_op_assign(homeless_session, *i);
    }
  }

  logger->set(l_osdc_osd_sessions, osd_sessions.size());
}

void Objecter::wait_for_osd_map(epoch_t e)
{
  unique_lock l(rwlock);
  if (osdmap->get_epoch() >= e) {
    l.unlock();
    return;
  }

  ca::waiter<bs::error_code> w;
  waiting_for_map[e].emplace_back(OpCompletion::create(
                                    service.get_executor(),
                                    w.ref()),
                                  bs::error_code{});
  l.unlock();
  w.wait();
}

void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
                                   std::unique_ptr<OpCompletion> fin,
                                   std::unique_lock<ceph::shared_mutex>&& l)
{
  ceph_assert(fin);
  if (osdmap->get_epoch() >= newest) {
    ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
    l.unlock();
    ca::defer(std::move(fin), bs::error_code{});
  } else {
    ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
    _wait_for_new_map(std::move(fin), newest, bs::error_code{});
    l.unlock();
  }
}

void Objecter::maybe_request_map()
{
  shared_lock rl(rwlock);
  _maybe_request_map();
}

void Objecter::_maybe_request_map()
{
  // rwlock is locked
  int flag = 0;
  if (_osdmap_full_flag()
      || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
      || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
    ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
      "osd map (FULL flag is set)" << dendl;
  } else {
    ldout(cct, 10)
      << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
    flag = CEPH_SUBSCRIBE_ONETIME;
  }
  epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
  if (monc->sub_want("osdmap", epoch, flag)) {
    monc->renew_subs();
  }
}

void Objecter::_wait_for_new_map(std::unique_ptr<OpCompletion> c, epoch_t epoch,
                                 bs::error_code ec)
{
  // rwlock is locked unique
  waiting_for_map[epoch].emplace_back(std::move(c), ec);
  _maybe_request_map();
}


/**
 * Use this together with wait_for_map: this is a pre-check to avoid
 * allocating a Context for wait_for_map if we can see that we
 * definitely already have the epoch.
 *
 * This does *not* replace the need to handle the return value of
 * wait_for_map: just because we don't have it in this pre-check
 * doesn't mean we won't have it when calling back into wait_for_map,
 * since the objecter lock is dropped in between.
 */
bool Objecter::have_map(const epoch_t epoch)
{
  shared_lock rl(rwlock);
  if (osdmap->get_epoch() >= epoch) {
    return true;
  } else {
    return false;
  }
}
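
/*
 * Hedged example of the pre-check idiom described above ('objecter'
 * and 'e' are illustrative names, not code from this file):
 *
 *   if (!objecter->have_map(e))       // cheap check under shared lock
 *     objecter->wait_for_osd_map(e);  // blocks until epoch e arrives
 */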

void Objecter::_kick_requests(OSDSession *session,
                              map<uint64_t, LingerOp *>& lresend)
{
  // rwlock is locked unique

  // clear backoffs
  session->backoffs.clear();
  session->backoffs_by_id.clear();

  // resend ops
  map<ceph_tid_t,Op*> resend;  // resend in tid order
  for (auto p = session->ops.begin(); p != session->ops.end();) {
    Op *op = p->second;
    ++p;
    if (op->should_resend) {
      if (!op->target.paused)
        resend[op->tid] = op;
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
  }

  logger->inc(l_osdc_op_resend, resend.size());
  while (!resend.empty()) {
    _send_op(resend.begin()->second);
    resend.erase(resend.begin());
  }

  // resend lingers
  logger->inc(l_osdc_linger_resend, session->linger_ops.size());
  for (auto j = session->linger_ops.begin();
       j != session->linger_ops.end(); ++j) {
    LingerOp *op = j->second;
    op->get();
    ceph_assert(lresend.count(j->first) == 0);
    lresend[j->first] = op;
  }

  // resend commands
  logger->inc(l_osdc_command_resend, session->command_ops.size());
  map<uint64_t,CommandOp*> cresend;  // resend in order
  for (auto k = session->command_ops.begin();
       k != session->command_ops.end(); ++k) {
    cresend[k->first] = k->second;
  }
  while (!cresend.empty()) {
    _send_command(cresend.begin()->second);
    cresend.erase(cresend.begin());
  }
}

void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
                                  unique_lock<ceph::shared_mutex>& ul)
{
  ceph_assert(ul.owns_lock());
  shunique_lock sul(std::move(ul));
  while (!lresend.empty()) {
    LingerOp *op = lresend.begin()->second;
    if (!op->canceled) {
      _send_linger(op, sul);
    }
    op->put();
    lresend.erase(lresend.begin());
  }
  ul = sul.release_to_unique();
}

void Objecter::start_tick()
{
  ceph_assert(tick_event == 0);
  tick_event =
    timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
                    &Objecter::tick, this);
}

void Objecter::tick()
{
  shared_lock rl(rwlock);

  ldout(cct, 10) << "tick" << dendl;

  // we are only called by C_Tick
  tick_event = 0;

  if (!initialized) {
    // we raced with shutdown
    ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
    return;
  }

  set<OSDSession*> toping;

  // look for laggy requests
  auto cutoff = ceph::coarse_mono_clock::now();
  cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout

  unsigned laggy_ops = 0;

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    auto s = siter->second;
    scoped_lock l(s->lock);
    bool found = false;
    for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
      auto op = p->second;
      ceph_assert(op->session);
      if (op->stamp < cutoff) {
        ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
                      << " is laggy" << dendl;
        found = true;
        ++laggy_ops;
      }
    }
    for (auto p = s->linger_ops.begin();
         p != s->linger_ops.end();
         ++p) {
      auto op = p->second;
      std::unique_lock wl(op->watch_lock);
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
      if (op->is_watch && op->registered && !op->last_error)
        _send_linger_ping(op);
    }
    for (auto p = s->command_ops.begin();
         p != s->command_ops.end();
         ++p) {
      auto op = p->second;
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves command tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
    }
    if (found)
      toping.insert(s);
  }
  if (num_homeless_ops || !toping.empty()) {
    _maybe_request_map();
  }

  logger->set(l_osdc_op_laggy, laggy_ops);
  logger->set(l_osdc_osd_laggy, toping.size());

  if (!toping.empty()) {
    // send a ping to these osds, to ensure we detect any session resets
    // (osd reply message policy is lossy)
    for (auto i = toping.begin(); i != toping.end(); ++i) {
      (*i)->con->send_message(new MPing);
    }
  }

  // Make sure we don't reschedule if we wake up after shutdown
  if (initialized) {
    tick_event = timer.reschedule_me(ceph::make_timespan(
                                       cct->_conf->objecter_tick_interval));
  }
}
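
/*
 * Worked example for the laggy scan above (values illustrative): with
 * objecter_timeout = 10s, an op whose stamp is t=100s counts as laggy
 * on any tick after t=110s.  Its session gets an MPing because the OSD
 * reply policy is lossy: a silently reset connection would otherwise
 * never surface, and the ping forces a reset notification so the ops
 * get resent.
 */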

void Objecter::resend_mon_ops()
{
  unique_lock wl(rwlock);

  ldout(cct, 10) << "resend_mon_ops" << dendl;

  for (auto p = poolstat_ops.begin(); p != poolstat_ops.end(); ++p) {
    _poolstat_submit(p->second);
    logger->inc(l_osdc_poolstat_resend);
  }

  for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
    _fs_stats_submit(p->second);
    logger->inc(l_osdc_statfs_resend);
  }

  for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
    _pool_op_submit(p->second);
    logger->inc(l_osdc_poolop_resend);
  }

  for (auto p = check_latest_map_ops.begin();
       p != check_latest_map_ops.end();
       ++p) {
    monc->get_version("osdmap", CB_Op_Map_Latest(this, p->second->tid));
  }

  for (auto p = check_latest_map_lingers.begin();
       p != check_latest_map_lingers.end();
       ++p) {
    monc->get_version("osdmap", CB_Linger_Map_Latest(this, p->second->linger_id));
  }

  for (auto p = check_latest_map_commands.begin();
       p != check_latest_map_commands.end();
       ++p) {
    monc->get_version("osdmap", CB_Command_Map_Latest(this, p->second->tid));
  }
}

// read | write ---------------------------

void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
{
  shunique_lock rl(rwlock, ceph::acquire_shared);
  ceph_tid_t tid = 0;
  if (!ptid)
    ptid = &tid;
  op->trace.event("op submit");
  _op_submit_with_budget(op, rl, ptid, ctx_budget);
}

void Objecter::_op_submit_with_budget(Op *op,
                                      shunique_lock<ceph::shared_mutex>& sul,
                                      ceph_tid_t *ptid,
                                      int *ctx_budget)
{
  ceph_assert(initialized);

  ceph_assert(op->ops.size() == op->out_bl.size());
  ceph_assert(op->ops.size() == op->out_rval.size());
  ceph_assert(op->ops.size() == op->out_handler.size());

  // throttle.  before we look at any state, because
  // _take_op_budget() may drop our lock while it blocks.
  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
    int op_budget = _take_op_budget(op, sul);
    // take and pass out the budget for the first OP
    // in the context session
    if (ctx_budget && (*ctx_budget == -1)) {
      *ctx_budget = op_budget;
    }
  }

  if (osd_timeout > timespan(0)) {
    if (op->tid == 0)
      op->tid = ++last_tid;
    auto tid = op->tid;
    op->ontimeout = timer.add_event(osd_timeout,
                                    [this, tid]() {
                                      op_cancel(tid, -ETIMEDOUT); });
  }

  _op_submit(op, sul, ptid);
}

void Objecter::_send_op_account(Op *op)
{
  inflight_ops++;

  // add to gather set(s)
  if (op->has_completion()) {
    num_in_flight++;
  } else {
    ldout(cct, 20) << " note: not requesting reply" << dendl;
  }

  logger->inc(l_osdc_op_active);
  logger->inc(l_osdc_op);
  logger->inc(l_osdc_oplen_avg, op->ops.size());

  if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
      (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
    logger->inc(l_osdc_op_rmw);
  else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
    logger->inc(l_osdc_op_w);
  else if (op->target.flags & CEPH_OSD_FLAG_READ)
    logger->inc(l_osdc_op_r);

  if (op->target.flags & CEPH_OSD_FLAG_PGOP)
    logger->inc(l_osdc_op_pg);

  for (auto p = op->ops.begin(); p != op->ops.end(); ++p) {
    int code = l_osdc_osdop_other;
    switch (p->op.op) {
    case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
    case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
    case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
    case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
    case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
    case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
    case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
    case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
    case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
    case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
    case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
    case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
    case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
    case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
    case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
    case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
    case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;

    // OMAP read operations
    case CEPH_OSD_OP_OMAPGETVALS:
    case CEPH_OSD_OP_OMAPGETKEYS:
    case CEPH_OSD_OP_OMAPGETHEADER:
    case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;

    // OMAP write operations
    case CEPH_OSD_OP_OMAPSETVALS:
    case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;

    // OMAP del operations
    case CEPH_OSD_OP_OMAPCLEAR:
    case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;

    case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
    case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
    case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
    }
    if (code)
      logger->inc(code);
  }
}

void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid)
{
  // rwlock is locked

  ldout(cct, 10) << __func__ << " op " << op << dendl;

  // pick target
  ceph_assert(op->session == NULL);
  OSDSession *s = NULL;

  bool check_for_latest_map = _calc_target(&op->target, nullptr)
    == RECALC_OP_TARGET_POOL_DNE;

  // Try to get a session, including a retry if we need to take write lock
  int r = _get_session(op->target.osd, &s, sul);
  if (r == -EAGAIN ||
      (check_for_latest_map && sul.owns_lock_shared()) ||
      cct->_conf->objecter_debug_inject_relock_delay) {
    epoch_t orig_epoch = osdmap->get_epoch();
    sul.unlock();
    if (cct->_conf->objecter_debug_inject_relock_delay) {
      sleep(1);
    }
    sul.lock();
    if (orig_epoch != osdmap->get_epoch()) {
      // map changed; recalculate mapping
      ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
                     << dendl;
      check_for_latest_map = _calc_target(&op->target, nullptr)
        == RECALC_OP_TARGET_POOL_DNE;
      if (s) {
        put_session(s);
        s = NULL;
        r = -EAGAIN;
      }
    }
  }
  if (r == -EAGAIN) {
    ceph_assert(s == NULL);
    r = _get_session(op->target.osd, &s, sul);
  }
  ceph_assert(r == 0);
  ceph_assert(s);  // may be homeless

  _send_op_account(op);

  // send?

  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

  bool need_send = false;
  if (op->target.paused) {
    ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
                   << dendl;
    _maybe_request_map();
  } else if (!s->is_homeless()) {
    need_send = true;
  } else {
    _maybe_request_map();
  }

  unique_lock sl(s->lock);
  if (op->tid == 0)
    op->tid = ++last_tid;

  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
                 << " '" << op->target.base_oloc << "' '"
                 << op->target.target_oloc << "' " << op->ops << " tid "
                 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
                 << dendl;

  _session_op_assign(s, op);

  if (need_send) {
    _send_op(op);
  }

  // Last chance to touch Op here, after giving up session lock it can
  // be freed at any time by response handler.
  ceph_tid_t tid = op->tid;
  if (check_for_latest_map) {
    _send_op_map_check(op);
  }
  if (ptid)
    *ptid = tid;
  op = NULL;

  sl.unlock();
  put_session(s);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}

int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  unique_lock sl(s->lock);

  auto p = s->ops.find(tid);
  if (p == s->ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
                   << s->osd << dendl;
    return -ENOENT;
  }

#if 0
  if (s->con) {
    ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
                   << " on " << s->con << dendl;
    s->con->revoke_rx_buffer(tid);
  }
#endif

  ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
                 << dendl;
  Op *op = p->second;
  if (op->has_completion()) {
    num_in_flight--;
    op->complete(osdcode(r), r);
  }
  _op_cancel_map_check(op);
  _finish_op(op, r);
  sl.unlock();

  return 0;
}

int Objecter::op_cancel(ceph_tid_t tid, int r)
{
  int ret = 0;

  unique_lock wl(rwlock);
  ret = _op_cancel(tid, r);

  return ret;
}

int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
{
  unique_lock wl(rwlock);
  ldout(cct,10) << __func__ << " " << tids << dendl;
  for (auto tid : tids) {
    _op_cancel(tid, r);
  }
  return 0;
}

int Objecter::_op_cancel(ceph_tid_t tid, int r)
{
  int ret = 0;

  ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
                << dendl;

start:

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    shared_lock sl(s->lock);
    if (s->ops.find(tid) != s->ops.end()) {
      sl.unlock();
      ret = op_cancel(s, tid, r);
      if (ret == -ENOENT) {
        /* oh no! raced, maybe tid moved to another session, restarting */
        goto start;
      }
      return ret;
    }
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
                << " not found in live sessions" << dendl;

  // Handle case where the op is in homeless session
  shared_lock sl(homeless_session->lock);
  if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
    sl.unlock();
    ret = op_cancel(homeless_session, tid, r);
    if (ret == -ENOENT) {
      /* oh no! raced, maybe tid moved to another session, restarting */
      goto start;
    } else {
      return ret;
    }
  } else {
    sl.unlock();
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
                << " not found in homeless session" << dendl;

  return ret;
}
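
/*
 * Hedged usage sketch for cancellation ('objecter' and 'op' are
 * illustrative names):
 *
 *   ceph_tid_t tid;
 *   objecter->op_submit(op, &tid);
 *   ...
 *   int r = objecter->op_cancel(tid, -ECANCELED);
 *   // r == 0 once cancelled; a tid that already completed simply
 *   // falls through all sessions above and returns 0 with nothing to
 *   // do (a concurrent move between sessions triggers the -ENOENT
 *   // restart instead).
 */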


epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
{
  unique_lock wl(rwlock);

  std::vector<ceph_tid_t> to_cancel;
  bool found = false;

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    shared_lock sl(s->lock);
    for (auto op_i = s->ops.begin();
         op_i != s->ops.end(); ++op_i) {
      if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
          && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
        to_cancel.push_back(op_i->first);
      }
    }
    sl.unlock();

    for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
      int cancel_result = op_cancel(s, *titer, r);
      // We hold rwlock across search and cancellation, so cancels
      // should always succeed
      ceph_assert(cancel_result == 0);
    }
    if (!found && to_cancel.size())
      found = true;
    to_cancel.clear();
  }

  const epoch_t epoch = osdmap->get_epoch();

  wl.unlock();

  if (found) {
    return epoch;
  } else {
    return -1;
  }
}

bool Objecter::is_pg_changed(
  int oldprimary,
  const vector<int>& oldacting,
  int newprimary,
  const vector<int>& newacting,
  bool any_change)
{
  if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
        oldprimary,
        oldacting,
        newprimary,
        newacting))
    return true;
  if (any_change && oldacting != newacting)
    return true;
  return false;      // same primary (though replicas may have changed)
}

bool Objecter::target_should_be_paused(op_target_t *t)
{
  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
    (t->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi)));

  return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
    (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
    (osdmap->get_epoch() < epoch_barrier);
}

/**
 * Locking public accessor for _osdmap_full_flag
 */
bool Objecter::osdmap_full_flag() const
{
  shared_lock rl(rwlock);

  return _osdmap_full_flag();
}

bool Objecter::osdmap_pool_full(const int64_t pool_id) const
{
  shared_lock rl(rwlock);

  if (_osdmap_full_flag()) {
    return true;
  }

  return _osdmap_pool_full(pool_id);
}

bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
{
  const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
  if (pool == NULL) {
    ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
    return false;
  }

  return _osdmap_pool_full(*pool);
}

bool Objecter::_osdmap_has_pool_full() const
{
  for (auto it = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it) {
    if (_osdmap_pool_full(it->second))
      return true;
  }
  return false;
}

/**
 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
 */
bool Objecter::_osdmap_full_flag() const
{
  // Ignore the FULL flag if the caller does not have honor_pool_full set
  return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
}

void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
{
  for (map<int64_t, pg_pool_t>::const_iterator it
         = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it) {
    if (pool_full_map.find(it->first) == pool_full_map.end()) {
      pool_full_map[it->first] = _osdmap_pool_full(it->second);
    } else {
      pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
        pool_full_map[it->first];
    }
  }
}

int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
                                           const string& ns)
{
  shared_lock rl(rwlock);
  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -ENOENT;
  return p->hash_key(key, ns);
}

int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
                                              const string& ns)
{
  shared_lock rl(rwlock);
  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -ENOENT;
  return p->raw_hash_to_pg(p->hash_key(key, ns));
}
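
/*
 * Worked example (hedged): raw_hash_to_pg() folds the key hash onto
 * the pool with ceph_stable_mod(hash, pg_num, pg_num_mask).  For
 * pg_num = 12 (mask 15): hash 9 -> 9 & 15 = 9, already < 12, so PG
 * seed 9; hash 13 -> 13 & 15 = 13 >= 12, so re-mask with 7 -> PG seed 5.
 */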

void Objecter::_prune_snapc(
  const mempool::osdmap::map<int64_t,
                             snap_interval_set_t>& new_removed_snaps,
  Op *op)
{
  bool match = false;
  auto i = new_removed_snaps.find(op->target.base_pgid.pool());
  if (i != new_removed_snaps.end()) {
    for (auto s : op->snapc.snaps) {
      if (i->second.contains(s)) {
        match = true;
        break;
      }
    }
    if (match) {
      vector<snapid_t> new_snaps;
      for (auto s : op->snapc.snaps) {
        if (!i->second.contains(s)) {
          new_snaps.push_back(s);
        }
      }
      op->snapc.snaps.swap(new_snaps);
      ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
                    << " (was " << new_snaps << ")" << dendl;
    }
  }
}
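
/*
 * Worked example (values illustrative): if op->snapc.snaps is {8,6,4}
 * and the incoming map reports snap 6 removed for the op's pool, the
 * rebuild above leaves {8,4}; the log line prints the swapped-out old
 * set, and snapc.seq is untouched.
 */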

int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
  // rwlock is locked
  bool is_read = t->flags & CEPH_OSD_FLAG_READ;
  bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
  t->epoch = osdmap->get_epoch();
  ldout(cct,20) << __func__ << " epoch " << t->epoch
                << " base " << t->base_oid << " " << t->base_oloc
                << " precalc_pgid " << (int)t->precalc_pgid
                << " pgid " << t->base_pgid
                << (is_read ? " is_read" : "")
                << (is_write ? " is_write" : "")
                << dendl;

  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  if (!pi) {
    t->osd = -1;
    return RECALC_OP_TARGET_POOL_DNE;
  }
  ldout(cct,30) << __func__ << " base pi " << pi
                << " pg_num " << pi->get_pg_num() << dendl;

  bool force_resend = false;
  if (osdmap->get_epoch() == pi->last_force_op_resend) {
    if (t->last_force_resend < pi->last_force_op_resend) {
      t->last_force_resend = pi->last_force_op_resend;
      force_resend = true;
    } else if (t->last_force_resend == 0) {
      force_resend = true;
    }
  }

  // apply tiering
  t->target_oid = t->base_oid;
  t->target_oloc = t->base_oloc;
  if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
    if (is_read && pi->has_read_tier())
      t->target_oloc.pool = pi->read_tier;
    if (is_write && pi->has_write_tier())
      t->target_oloc.pool = pi->write_tier;
    pi = osdmap->get_pg_pool(t->target_oloc.pool);
    if (!pi) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }

  pg_t pgid;
  if (t->precalc_pgid) {
    ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
    ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
    ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
    pgid = t->base_pgid;
  } else {
    int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
                                           pgid);
    if (ret == -ENOENT) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }
  ldout(cct,20) << __func__ << " target " << t->target_oid << " "
                << t->target_oloc << " -> pgid " << pgid << dendl;
  ldout(cct,30) << __func__ << " target pi " << pi
                << " pg_num " << pi->get_pg_num() << dendl;
  t->pool_ever_existed = true;

  int size = pi->size;
  int min_size = pi->min_size;
  unsigned pg_num = pi->get_pg_num();
  unsigned pg_num_mask = pi->get_pg_num_mask();
  unsigned pg_num_pending = pi->get_pg_num_pending();
  int up_primary, acting_primary;
  vector<int> up, acting;
  ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
  pg_t actual_pgid(actual_ps, pgid.pool());
  pg_mapping_t pg_mapping;
  pg_mapping.epoch = osdmap->get_epoch();
  if (lookup_pg_mapping(actual_pgid, &pg_mapping)) {
    up = pg_mapping.up;
    up_primary = pg_mapping.up_primary;
    acting = pg_mapping.acting;
    acting_primary = pg_mapping.acting_primary;
  } else {
    osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
                                 &acting, &acting_primary);
    pg_mapping_t pg_mapping(osdmap->get_epoch(),
                            up, up_primary, acting, acting_primary);
    update_pg_mapping(actual_pgid, std::move(pg_mapping));
  }
  bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
  bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
  unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
  pg_t prev_pgid(prev_seed, pgid.pool());
  if (any_change && PastIntervals::is_new_interval(
        t->acting_primary,
        acting_primary,
        t->acting,
        acting,
        t->up_primary,
        up_primary,
        t->up,
        up,
        t->size,
        size,
        t->min_size,
        min_size,
        t->pg_num,
        pg_num,
        t->pg_num_pending,
        pg_num_pending,
        t->sort_bitwise,
        sort_bitwise,
        t->recovery_deletes,
        recovery_deletes,
        t->peering_crush_bucket_count,
        pi->peering_crush_bucket_count,
        t->peering_crush_bucket_target,
        pi->peering_crush_bucket_target,
        t->peering_crush_bucket_barrier,
        pi->peering_crush_bucket_barrier,
        t->peering_crush_mandatory_member,
        pi->peering_crush_mandatory_member,
        prev_pgid)) {
    force_resend = true;
  }

  bool unpaused = false;
  bool should_be_paused = target_should_be_paused(t);
  if (t->paused && !should_be_paused) {
    unpaused = true;
  }
  if (t->paused != should_be_paused) {
    ldout(cct, 10) << __func__ << " paused " << t->paused
                   << " -> " << should_be_paused << dendl;
    t->paused = should_be_paused;
  }

  bool legacy_change =
    t->pgid != pgid ||
    is_pg_changed(
      t->acting_primary, t->acting, acting_primary, acting,
      t->used_replica || any_change);
  bool split_or_merge = false;
  if (t->pg_num) {
    split_or_merge =
      prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_target(t->pg_num, pg_num);
  }

  if (legacy_change || split_or_merge || force_resend) {
    t->pgid = pgid;
    t->acting = acting;
    t->acting_primary = acting_primary;
    t->up_primary = up_primary;
    t->up = up;
    t->size = size;
    t->min_size = min_size;
    t->pg_num = pg_num;
    t->pg_num_mask = pg_num_mask;
    t->pg_num_pending = pg_num_pending;
    spg_t spgid(actual_pgid);
    if (pi->is_erasure()) {
      for (uint8_t i = 0; i < acting.size(); ++i) {
        if (acting[i] == acting_primary) {
          spgid.reset_shard(shard_id_t(i));
          break;
        }
      }
    }
    t->actual_pgid = spgid;
    t->sort_bitwise = sort_bitwise;
    t->recovery_deletes = recovery_deletes;
    t->peering_crush_bucket_count = pi->peering_crush_bucket_count;
    t->peering_crush_bucket_target = pi->peering_crush_bucket_target;
    t->peering_crush_bucket_barrier = pi->peering_crush_bucket_barrier;
    t->peering_crush_mandatory_member = pi->peering_crush_mandatory_member;
    ldout(cct, 10) << __func__ << " "
                   << " raw pgid " << pgid << " -> actual " << t->actual_pgid
                   << " acting " << acting
                   << " primary " << acting_primary << dendl;
    t->used_replica = false;
    if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
                     CEPH_OSD_FLAG_LOCALIZE_READS)) &&
        !is_write && pi->is_replicated() && acting.size() > 1) {
      int osd;
      ceph_assert(is_read && acting[0] == acting_primary);
      if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
        int p = rand() % acting.size();
        if (p)
          t->used_replica = true;
        osd = acting[p];
        ldout(cct, 10) << " chose random osd." << osd << " of " << acting
                       << dendl;
      } else {
        // look for a local replica.  prefer the primary if the
        // distance is the same.
        int best = -1;
        int best_locality = 0;
        for (unsigned i = 0; i < acting.size(); ++i) {
          int locality = osdmap->crush->get_common_ancestor_distance(
            cct, acting[i], crush_location);
          ldout(cct, 20) << __func__ << " localize: rank " << i
                         << " osd." << acting[i]
                         << " locality " << locality << dendl;
          if (i == 0 ||
              (locality >= 0 && best_locality >= 0 &&
               locality < best_locality) ||
              (best_locality < 0 && locality >= 0)) {
            best = i;
            best_locality = locality;
            if (i)
              t->used_replica = true;
          }
        }
        ceph_assert(best >= 0);
        osd = acting[best];
      }
      t->osd = osd;
    } else {
      t->osd = acting_primary;
    }
  }
  if (legacy_change || unpaused || force_resend) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  if (split_or_merge &&
      (osdmap->require_osd_release >= ceph_release_t::luminous ||
       HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
                    RESEND_ON_SPLIT))) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return RECALC_OP_TARGET_NO_ACTION;
}

int Objecter::_map_session(op_target_t *target, OSDSession **s,
                           shunique_lock<ceph::shared_mutex>& sul)
{
  _calc_target(target, nullptr);
  return _get_session(target->osd, s, sul);
}

void Objecter::_session_op_assign(OSDSession *to, Op *op)
{
  // to->lock is locked
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  get_session(to);
  op->session = to;
  to->ops[op->tid] = op;

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}

void Objecter::_session_op_remove(OSDSession *from, Op *op)
{
  ceph_assert(op->session == from);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}

void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
{
  // to lock is locked unique
  ceph_assert(op->session == NULL);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->linger_ops[op->linger_id] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
                 << dendl;
}

void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
{
  ceph_assert(from == op->session);
  // from->lock is locked unique

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->linger_ops.erase(op->linger_id);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
                 << dendl;
}

void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
{
  ceph_assert(from == op->session);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->command_ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}

void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
{
  // to->lock is locked
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->command_ops[op->tid] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}

int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
                                       shunique_lock<ceph::shared_mutex>& sul)
{
  // rwlock is locked unique

  int r = _calc_target(&linger_op->target, nullptr, true);
  if (r == RECALC_OP_TARGET_NEED_RESEND) {
    ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
                   << " pgid " << linger_op->target.pgid
                   << " acting " << linger_op->target.acting << dendl;

    OSDSession *s = NULL;
    r = _get_session(linger_op->target.osd, &s, sul);
    ceph_assert(r == 0);

    if (linger_op->session != s) {
      // NB locking two sessions (s and linger_op->session) at the
      // same time here is only safe because we are the only one that
      // takes two, and we are holding rwlock for write.  We use
      // std::shared_mutex in OSDSession because lockdep doesn't know
      // that.
      unique_lock sl(s->lock);
      _session_linger_op_remove(linger_op->session, linger_op);
      _session_linger_op_assign(s, linger_op);
    }

    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return r;
}

void Objecter::_cancel_linger_op(Op *op)
{
  ldout(cct, 15) << "cancel_op " << op->tid << dendl;

  ceph_assert(!op->should_resend);
  if (op->has_completion()) {
    op->onfinish = nullptr;
    num_in_flight--;
  }

  _finish_op(op, 0);
}

void Objecter::_finish_op(Op *op, int r)
{
  ldout(cct, 15) << __func__ << " " << op->tid << dendl;

  // op->session->lock is locked unique or op->session is null

  if (!op->ctx_budgeted && op->budget >= 0) {
    put_op_budget_bytes(op->budget);
    op->budget = -1;
  }

  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  if (op->session) {
    _session_op_remove(op->session, op);
  }

  logger->dec(l_osdc_op_active);

  ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());

  inflight_ops--;

  op->put();
}

Objecter::MOSDOp *Objecter::_prepare_osd_op(Op *op)
{
  // rwlock is locked

  int flags = op->target.flags;
  flags |= CEPH_OSD_FLAG_KNOWN_REDIR;

  // Nothing checks this any longer, but needed for compatibility with
  // pre-luminous osds
  flags |= CEPH_OSD_FLAG_ONDISK;

  if (!honor_pool_full)
    flags |= CEPH_OSD_FLAG_FULL_FORCE;

  op->target.paused = false;
  op->stamp = ceph::coarse_mono_clock::now();

  hobject_t hobj = op->target.get_hobj();
  auto m = new MOSDOp(client_inc, op->tid,
                      hobj, op->target.actual_pgid,
                      osdmap->get_epoch(),
                      flags, op->features);

  m->set_snapid(op->snapid);
  m->set_snap_seq(op->snapc.seq);
  m->set_snaps(op->snapc.snaps);

  m->ops = op->ops;
  m->set_mtime(op->mtime);
  m->set_retry_attempt(op->attempts++);

  if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
    op->trace.init("op", &trace_endpoint);
  }

  if (op->priority)
    m->set_priority(op->priority);
  else
    m->set_priority(cct->_conf->osd_client_op_priority);

  if (op->reqid != osd_reqid_t()) {
    m->set_reqid(op->reqid);
  }

  logger->inc(l_osdc_op_send);
  ssize_t sum = 0;
  for (unsigned i = 0; i < m->ops.size(); i++) {
    sum += m->ops[i].indata.length();
  }
  logger->inc(l_osdc_op_send_bytes, sum);

  return m;
}

void Objecter::_send_op(Op *op)
{
  // rwlock is locked
  // op->session->lock is locked

  // backoff?
  auto p = op->session->backoffs.find(op->target.actual_pgid);
  if (p != op->session->backoffs.end()) {
    hobject_t hoid = op->target.get_hobj();
    auto q = p->second.lower_bound(hoid);
    if (q != p->second.begin()) {
      --q;
      if (hoid >= q->second.end) {
        ++q;
      }
    }
    if (q != p->second.end()) {
      ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
                     << "," << q->second.end << ")" << dendl;
      int r = cmp(hoid, q->second.begin);
      if (r == 0 || (r > 0 && hoid < q->second.end)) {
        ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
                       << " id " << q->second.id << " on " << hoid
                       << ", queuing " << op << " tid " << op->tid << dendl;
        return;
      }
    }
  }

  ceph_assert(op->tid > 0);
  MOSDOp *m = _prepare_osd_op(op);

  if (op->target.actual_pgid != m->get_spg()) {
    ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
                   << m->get_spg() << " to " << op->target.actual_pgid
                   << ", updating and reencoding" << dendl;
    m->set_spg(op->target.actual_pgid);
    m->clear_payload();  // reencode
  }

  ldout(cct, 15) << "_send_op " << op->tid << " to "
                 << op->target.actual_pgid << " on osd." << op->session->osd
                 << dendl;

  ConnectionRef con = op->session->con;
  ceph_assert(con);

#if 0
  // preallocated rx ceph::buffer?
  if (op->con) {
    ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
                   << op->con << dendl;
    op->con->revoke_rx_buffer(op->tid);
  }
  if (op->outbl &&
      op->ontimeout == 0 &&  // only post rx_buffer if no timeout; see #9582
      op->outbl->length()) {
    op->outbl->invalidate_crc();  // messenger writes through c_str()
    ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
                   << dendl;
    op->con = con;
    op->con->post_rx_buffer(op->tid, *op->outbl);
  }
#endif

  op->incarnation = op->session->incarnation;

  if (op->trace.valid()) {
    m->trace.init("op msg", nullptr, &op->trace);
  }
  op->session->con->send_message(m);
}

int Objecter::calc_op_budget(const bc::small_vector_base<OSDOp>& ops)
{
  int op_budget = 0;
  for (auto i = ops.begin(); i != ops.end(); ++i) {
    if (i->op.op & CEPH_OSD_OP_MODE_WR) {
      op_budget += i->indata.length();
    } else if (ceph_osd_op_mode_read(i->op.op)) {
      if (ceph_osd_op_uses_extent(i->op.op)) {
        if ((int64_t)i->op.extent.length > 0)
          op_budget += (int64_t)i->op.extent.length;
      } else if (ceph_osd_op_type_attr(i->op.op)) {
        op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
      }
    }
  }
  return op_budget;
}
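
/*
 * Worked example (hedged): a compound op with a 4 KiB WRITE and a READ
 * of extent length 8 KiB is budgeted 4096 + 8192 = 12288 bytes; an
 * attr read such as GETXATTR contributes name_len + value_len instead.
 * This budget is what _throttle_op() below charges against
 * op_throttle_bytes before the op may proceed.
 */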

void Objecter::_throttle_op(Op *op,
                            shunique_lock<ceph::shared_mutex>& sul,
                            int op_budget)
{
  ceph_assert(sul && sul.mutex() == &rwlock);
  bool locked_for_write = sul.owns_lock();

  if (!op_budget)
    op_budget = calc_op_budget(op->ops);
  if (!op_throttle_bytes.get_or_fail(op_budget)) { // couldn't take right now
    sul.unlock();
    op_throttle_bytes.get(op_budget);
    if (locked_for_write)
      sul.lock();
    else
      sul.lock_shared();
  }
  if (!op_throttle_ops.get_or_fail(1)) { // couldn't take right now
    sul.unlock();
    op_throttle_ops.get(1);
    if (locked_for_write)
      sul.lock();
    else
      sul.lock_shared();
  }
}

int Objecter::take_linger_budget(LingerOp *info)
{
  return 1;
}

/* This function DOES put the passed message before returning */
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
{
  ldout(cct, 10) << "in handle_osd_op_reply" << dendl;

  // get pio
  ceph_tid_t tid = m->get_tid();

  shunique_lock sul(rwlock, ceph::acquire_shared);
  if (!initialized) {
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  unique_lock sl(s->lock);

  map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
  if (iter == s->ops.end()) {
    ldout(cct, 7) << "handle_osd_op_reply " << tid
                  << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
                                                    " onnvram" : " ack"))
                  << " ... stray" << dendl;
    sl.unlock();
    m->put();
    return;
  }

  ldout(cct, 7) << "handle_osd_op_reply " << tid
                << (m->is_ondisk() ? " ondisk" :
                    (m->is_onnvram() ? " onnvram" : " ack"))
                << " uv " << m->get_user_version()
                << " in " << m->get_pg()
                << " attempt " << m->get_retry_attempt()
                << dendl;
  Op *op = iter->second;
  op->trace.event("osd op reply");

  if (retry_writes_after_first_reply && op->attempts == 1 &&
      (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
    ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
    if (op->has_completion()) {
      num_in_flight--;
    }
    _session_op_remove(s, op);
    sl.unlock();

    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  if (m->get_retry_attempt() >= 0) {
    if (m->get_retry_attempt() != (op->attempts - 1)) {
      ldout(cct, 7) << " ignoring reply from attempt "
                    << m->get_retry_attempt()
                    << " from " << m->get_source_inst()
                    << "; last attempt " << (op->attempts - 1) << " sent to "
                    << op->session->con->get_peer_addr() << dendl;
      m->put();
      sl.unlock();
      return;
    }
  } else {
    // we don't know the request attempt because the server is old, so
    // just accept this one.  we may do ACK callbacks we shouldn't
    // have, but that is better than doing callbacks out of order.
  }

  decltype(op->onfinish) onfinish;

  int rc = m->get_result();

  if (m->is_redirect_reply()) {
    ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
    if (op->has_completion())
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();

    // FIXME: two redirects could race and reorder

    op->tid = 0;
    m->get_redirect().combine_with_locator(op->target.target_oloc,
                                           op->target.target_oid.name);
    op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
                         CEPH_OSD_FLAG_IGNORE_CACHE |
                         CEPH_OSD_FLAG_IGNORE_OVERLAY);
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  if (rc == -EAGAIN) {
    ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
    if (op->has_completion())
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();

    op->tid = 0;
    op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
                          CEPH_OSD_FLAG_LOCALIZE_READS);
    op->target.pgid = pg_t();
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  sul.unlock();

  if (op->objver)
    *op->objver = m->get_user_version();
  if (op->reply_epoch)
    *op->reply_epoch = m->get_map_epoch();
  if (op->data_offset)
    *op->data_offset = m->get_header().data_off;

  // got data?
  if (op->outbl) {
#if 0
    if (op->con)
      op->con->revoke_rx_buffer(op->tid);
#endif
    auto& bl = m->get_data();
    if (op->outbl->length() == bl.length() &&
        bl.get_num_buffers() <= 1) {
      // this is here to keep previous users who *relied* on getting data
      // read into existing buffers happy.  Notably,
      // libradosstriper::RadosStriperImpl::aio_read().
      ldout(cct,10) << __func__ << " copying resulting " << bl.length()
                    << " into existing ceph::buffer of length " << op->outbl->length()
                    << dendl;
      cb::list t;
      t = std::move(*op->outbl);
      t.invalidate_crc();  // we're overwriting the raw buffers via c_str()
      bl.begin().copy(bl.length(), t.c_str());
      op->outbl->substr_of(t, 0, bl.length());
    } else {
      m->claim_data(*op->outbl);
    }
    op->outbl = 0;
  }

  // per-op result demuxing
  vector<OSDOp> out_ops;
  m->claim_ops(out_ops);

  if (out_ops.size() != op->ops.size())
    ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
                  << " != request ops " << op->ops
                  << " from " << m->get_source_inst() << dendl;

  ceph_assert(op->ops.size() == op->out_bl.size());
  ceph_assert(op->ops.size() == op->out_rval.size());
  ceph_assert(op->ops.size() == op->out_ec.size());
  ceph_assert(op->ops.size() == op->out_handler.size());
  auto pb = op->out_bl.begin();
  auto pr = op->out_rval.begin();
  auto pe = op->out_ec.begin();
  auto ph = op->out_handler.begin();
  ceph_assert(op->out_bl.size() == op->out_rval.size());
  ceph_assert(op->out_bl.size() == op->out_handler.size());
  auto p = out_ops.begin();
  for (unsigned i = 0;
       p != out_ops.end() && pb != op->out_bl.end();
       ++i, ++p, ++pb, ++pr, ++pe, ++ph) {
    ldout(cct, 10) << " op " << i << " rval " << p->rval
                   << " len " << p->outdata.length() << dendl;
    if (*pb)
      **pb = p->outdata;
    // set rval before running handlers so that handlers
    // can change it if e.g. decoding fails
    if (*pr)
      **pr = ceph_to_hostos_errno(p->rval);
    if (*pe)
      **pe = p->rval < 0 ? bs::error_code(-p->rval, osd_category()) :
        bs::error_code();
    if (*ph) {
      std::move((*ph))(p->rval < 0 ?
                       bs::error_code(-p->rval, osd_category()) :
                       bs::error_code(),
                       p->rval, p->outdata);
    }
  }

  // NOTE: we assume that since we only request ONDISK ever we will
  // only ever get back one (type of) ack ever.

  if (op->has_completion()) {
    num_in_flight--;
    onfinish = std::move(op->onfinish);
    op->onfinish = nullptr;
  }
  logger->inc(l_osdc_op_reply);

  /* get it before we call _finish_op() */
  auto completion_lock = s->get_lock(op->target.base_oid);

  ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
  _finish_op(op, 0);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;

  // serialize completions
  if (completion_lock.mutex()) {
    completion_lock.lock();
  }
  sl.unlock();

  // do callbacks
  if (Op::has_completion(onfinish)) {
    Op::complete(std::move(onfinish), osdcode(rc), rc);
  }
  if (completion_lock.mutex()) {
    completion_lock.unlock();
  }

  m->put();
}
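
/*
 * Hedged sketch of the per-op demux contract used above: for an op
 * built as {STAT, READ}, the reply's out_ops line up by index with
 * op->ops, so
 *
 *   *op->out_rval[1]  // READ's rval (host errno convention)
 *   *op->out_bl[1]    // READ's returned data, when a buffer was wired
 *
 * and out_handler[1], when set, runs before the final completion and
 * may rewrite the rval (e.g. when decoding the payload fails).
 */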
3508
3509void Objecter::handle_osd_backoff(MOSDBackoff *m)
3510{
3511 ldout(cct, 10) << __func__ << " " << *m << dendl;
3512 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3513 if (!initialized) {
7c673cae
FG
3514 m->put();
3515 return;
3516 }
3517
3518 ConnectionRef con = m->get_connection();
11fdf7f2
TL
3519 auto priv = con->get_priv();
3520 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
3521 if (!s || s->con != con) {
3522 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
7c673cae
FG
3523 m->put();
3524 return;
3525 }
3526
3527 get_session(s);
7c673cae 3528
f67539c2 3529 unique_lock sl(s->lock);
7c673cae
FG
3530
3531 switch (m->op) {
3532 case CEPH_OSD_BACKOFF_OP_BLOCK:
3533 {
3534 // register
3535 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3536 s->backoffs_by_id.insert(make_pair(m->id, &b));
3537 b.pgid = m->pgid;
3538 b.id = m->id;
3539 b.begin = m->begin;
3540 b.end = m->end;
3541
3542 // ack with original backoff's epoch so that the osd can discard this if
3543 // there was a pg split.
f67539c2
TL
3544 auto r = new MOSDBackoff(m->pgid, m->map_epoch,
3545 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3546 m->id, m->begin, m->end);
7c673cae
FG
3547 // this priority must match the MOSDOps from _prepare_osd_op
3548 r->set_priority(cct->_conf->osd_client_op_priority);
3549 con->send_message(r);
3550 }
3551 break;
3552
3553 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3554 {
3555 auto p = s->backoffs_by_id.find(m->id);
3556 if (p != s->backoffs_by_id.end()) {
3557 OSDBackoff *b = p->second;
3558 if (b->begin != m->begin ||
3559 b->end != m->end) {
3560 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3561 << " unblock on ["
3562 << m->begin << "," << m->end << ") but backoff is ["
3563 << b->begin << "," << b->end << ")" << dendl;
3564 // hrmpf, unblock it anyway.
3565 }
3566 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3567 << " id " << b->id
3568 << " [" << b->begin << "," << b->end
3569 << ")" << dendl;
3570 auto spgp = s->backoffs.find(b->pgid);
11fdf7f2 3571 ceph_assert(spgp != s->backoffs.end());
7c673cae
FG
3572 spgp->second.erase(b->begin);
3573 if (spgp->second.empty()) {
3574 s->backoffs.erase(spgp);
3575 }
3576 s->backoffs_by_id.erase(p);
3577
3578 // check for any ops to resend
3579 for (auto& q : s->ops) {
3580 if (q.second->target.actual_pgid == m->pgid) {
3581 int r = q.second->target.contained_by(m->begin, m->end);
3582 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3583 << q.second->target.get_hobj() << dendl;
3584 if (r) {
3585 _send_op(q.second);
3586 }
3587 }
3588 }
3589 } else {
3590 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3591 << " unblock on ["
3592 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3593 }
3594 }
3595 break;
3596
3597 default:
3598 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3599 }
3600
3601 sul.unlock();
3602 sl.unlock();
3603
3604 m->put();
3605 put_session(s);
3606}
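// Illustrative summary: backoffs are half-open hobject ranges keyed by pg.
// A BLOCK for, say, pg 1.7 over ["a","m") registers the range and is acked;
// the matching UNBLOCK erases it and resends any queued op whose
// target.contained_by(begin, end) is nonzero, as in the loop above.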
3607
3608uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3609 uint32_t pos)
3610{
3611 shared_lock rl(rwlock);
3612 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3613 pos, list_context->pool_id, string());
11fdf7f2 3614 ldout(cct, 10) << __func__ << " " << list_context
7c673cae
FG
3615 << " pos " << pos << " -> " << list_context->pos << dendl;
3616 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3617 list_context->current_pg = actual.ps();
3618 list_context->at_end_of_pool = false;
3619 return pos;
3620}
3621
3622uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3623 const hobject_t& cursor)
3624{
3625 shared_lock rl(rwlock);
3626 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3627 list_context->pos = cursor;
3628 list_context->at_end_of_pool = false;
3629 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3630 list_context->current_pg = actual.ps();
3631 list_context->sort_bitwise = true;
3632 return list_context->current_pg;
3633}
3634
3635void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3636 hobject_t *cursor)
3637{
3638 shared_lock rl(rwlock);
3639 if (list_context->list.empty()) {
3640 *cursor = list_context->pos;
3641 } else {
3642 const librados::ListObjectImpl& entry = list_context->list.front();
3643 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3644 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3645 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3646 }
3647}
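// A minimal cursor round-trip (hypothetical caller; assumes a populated
// NListContext* ctx):
//
//   hobject_t cur;
//   objecter->list_nobjects_get_cursor(ctx, &cur);  // remember position
//   ...
//   objecter->list_nobjects_seek(ctx, cur);         // resume from it later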
3648
3649void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3650{
3651 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3652 << " pool_snap_seq " << list_context->pool_snap_seq
3653 << " max_entries " << list_context->max_entries
3654 << " list_context " << list_context
3655 << " onfinish " << onfinish
3656 << " current_pg " << list_context->current_pg
3657 << " pos " << list_context->pos << dendl;
3658
3659 shared_lock rl(rwlock);
3660 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3661 if (!pool) { // pool is gone
3662 rl.unlock();
3663 put_nlist_context_budget(list_context);
3664 onfinish->complete(-ENOENT);
3665 return;
3666 }
3667 int pg_num = pool->get_pg_num();
3668 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3669
3670 if (list_context->pos.is_min()) {
3671 list_context->starting_pg_num = 0;
3672 list_context->sort_bitwise = sort_bitwise;
3673 list_context->starting_pg_num = pg_num;
3674 }
3675 if (list_context->sort_bitwise != sort_bitwise) {
3676 list_context->pos = hobject_t(
3677 object_t(), string(), CEPH_NOSNAP,
3678 list_context->current_pg, list_context->pool_id, string());
3679 list_context->sort_bitwise = sort_bitwise;
3680 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3681 << list_context->pos << dendl;
3682 }
3683 if (list_context->starting_pg_num != pg_num) {
3684 if (!sort_bitwise) {
3685 // start reading from the beginning; the pgs have changed
3686 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3687 list_context->pos = collection_list_handle_t();
3688 }
3689 list_context->starting_pg_num = pg_num;
3690 }
3691
3692 if (list_context->pos.is_max()) {
3693 ldout(cct, 20) << __func__ << " end of pool, list "
3694 << list_context->list << dendl;
3695 if (list_context->list.empty()) {
3696 list_context->at_end_of_pool = true;
3697 }
3698 // release the listing context's budget once all
3699 // OPs (in the session) are finished
3700 put_nlist_context_budget(list_context);
3701 onfinish->complete(0);
3702 return;
3703 }
3704
3705 ObjectOperation op;
3706 op.pg_nls(list_context->max_entries, list_context->filter,
3707 list_context->pos, osdmap->get_epoch());
3708 list_context->bl.clear();
f67539c2 3709 auto onack = new C_NList(list_context, onfinish, this);
7c673cae
FG
3710 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3711
3712 // note current_pg in case we don't have (or lose) SORTBITWISE
3713 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3714 rl.unlock();
3715
3716 pg_read(list_context->current_pg, oloc, op,
3717 &list_context->bl, 0, onack, &onack->epoch,
3718 &list_context->ctx_budget);
3719}
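// A minimal sketch of driving a full listing (hypothetical caller; assumes
// ctx->pool_id, ctx->nspace and ctx->max_entries were set up beforehand,
// as librados does):
//
//   objecter->list_nobjects(ctx, new LambdaContext([=](int r) {
//     // consume ctx->list, then call list_nobjects() again with a fresh
//     // Context until ctx->at_end_of_pool is set
//   }));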
3720
3721void Objecter::_nlist_reply(NListContext *list_context, int r,
3722 Context *final_finish, epoch_t reply_epoch)
3723{
3724 ldout(cct, 10) << __func__ << " " << list_context << dendl;
3725
11fdf7f2 3726 auto iter = list_context->bl.cbegin();
7c673cae 3727 pg_nls_response_t response;
11fdf7f2 3728 decode(response, iter);
7c673cae 3729 if (!iter.end()) {
9f95a23c 3730 // legacy extra_info; decode it solely for backward compatibility.
f67539c2 3731 cb::list legacy_extra_info;
9f95a23c 3732 decode(legacy_extra_info, iter);
7c673cae
FG
3733 }
3734
3735 // if the osd returns 1 (newer code), or the handle is MAX, it means
3736 // we hit the end of the pg.
3737 if ((response.handle.is_max() || r == 1) &&
3738 !list_context->sort_bitwise) {
3739 // legacy OSD and !sortbitwise, figure out the next PG on our own
3740 ++list_context->current_pg;
3741 if (list_context->current_pg == list_context->starting_pg_num) {
3742 // end of pool
3743 list_context->pos = hobject_t::get_max();
3744 } else {
3745 // next pg
3746 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3747 list_context->current_pg,
3748 list_context->pool_id, string());
3749 }
3750 } else {
3751 list_context->pos = response.handle;
3752 }
3753
3754 int response_size = response.entries.size();
3755 ldout(cct, 20) << " response.entries.size " << response_size
3756 << ", response.entries " << response.entries
3757 << ", handle " << response.handle
3758 << ", tentative new pos " << list_context->pos << dendl;
7c673cae 3759 if (response_size) {
f67539c2
TL
3760 std::move(response.entries.begin(), response.entries.end(),
3761 std::back_inserter(list_context->list));
3762 response.entries.clear();
7c673cae
FG
3763 }
3764
3765 if (list_context->list.size() >= list_context->max_entries) {
3766 ldout(cct, 20) << " hit max, returning results so far, "
3767 << list_context->list << dendl;
3768 // release the listing context's budget once all
3769 // OPs (in the session) are finished
3770 put_nlist_context_budget(list_context);
3771 final_finish->complete(0);
3772 return;
3773 }
3774
3775 // continue!
3776 list_nobjects(list_context, final_finish);
3777}
3778
3779void Objecter::put_nlist_context_budget(NListContext *list_context)
3780{
3781 if (list_context->ctx_budget >= 0) {
3782 ldout(cct, 10) << " release listing context's budget " <<
3783 list_context->ctx_budget << dendl;
3784 put_op_budget_bytes(list_context->ctx_budget);
3785 list_context->ctx_budget = -1;
3786 }
3787}
3788
3789// snapshots
3790
f67539c2
TL
3791void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name,
3792 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3793{
3794 unique_lock wl(rwlock);
3795 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3796 << snap_name << dendl;
3797
3798 const pg_pool_t *p = osdmap->get_pg_pool(pool);
f67539c2
TL
3799 if (!p) {
3800 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3801 return;
3802 }
3803 if (p->snap_exists(snap_name)) {
3804 onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists,
3805 cb::list{});
3806 return;
3807 }
7c673cae 3808
f67539c2 3809 auto op = new PoolOp;
31f18b77 3810 op->tid = ++last_tid;
7c673cae
FG
3811 op->pool = pool;
3812 op->name = snap_name;
f67539c2 3813 op->onfinish = std::move(onfinish);
7c673cae
FG
3814 op->pool_op = POOL_OP_CREATE_SNAP;
3815 pool_ops[op->tid] = op;
3816
3817 pool_op_submit(op);
7c673cae
FG
3818}
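// A hypothetical caller, for illustration; the completion signature
// (bs::error_code, cb::list) matches how PoolOp::onfinish is deferred above:
//
//   objecter->create_pool_snap(pool_id, "nightly",
//     Objecter::PoolOp::OpComp::create(
//       ex,  // some boost::asio executor
//       [](bs::error_code ec, cb::list) {
//         // ec is osdc_errc::snapshot_exists if the name is taken
//       }));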
3819
f67539c2
TL
3820struct CB_SelfmanagedSnap {
3821 std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> fin;
3822 CB_SelfmanagedSnap(decltype(fin)&& fin)
3823 : fin(std::move(fin)) {}
3824 void operator()(bs::error_code ec, const cb::list& bl) {
3825 snapid_t snapid = 0;
3826 if (!ec) {
11fdf7f2 3827 try {
f67539c2
TL
3828 auto p = bl.cbegin();
3829 decode(snapid, p);
3830 } catch (const cb::error& e) {
3831 ec = e.code();
11fdf7f2 3832 }
7c673cae 3833 }
f67539c2 3834 fin->defer(std::move(fin), ec, snapid);
7c673cae
FG
3835 }
3836};
3837
f67539c2
TL
3838void Objecter::allocate_selfmanaged_snap(
3839 int64_t pool,
3840 std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> onfinish)
7c673cae
FG
3841{
3842 unique_lock wl(rwlock);
3843 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
f67539c2 3844 auto op = new PoolOp;
31f18b77 3845 op->tid = ++last_tid;
7c673cae 3846 op->pool = pool;
f67539c2
TL
3847 op->onfinish = PoolOp::OpComp::create(
3848 service.get_executor(),
3849 CB_SelfmanagedSnap(std::move(onfinish)));
7c673cae
FG
3850 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3851 pool_ops[op->tid] = op;
3852
3853 pool_op_submit(op);
7c673cae
FG
3854}
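// Caller-side sketch (hypothetical names): the completion receives the
// snapid that CB_SelfmanagedSnap decoded from the reply payload.
//
//   objecter->allocate_selfmanaged_snap(pool_id,
//     ca::Completion<void(bs::error_code, snapid_t)>::create(
//       ex, [](bs::error_code ec, snapid_t snap) {
//         // on success, snap holds the newly allocated snapshot id
//       }));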
3855
f67539c2
TL
3856void Objecter::delete_pool_snap(
3857 int64_t pool, std::string_view snap_name,
3858 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3859{
3860 unique_lock wl(rwlock);
3861 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3862 << snap_name << dendl;
3863
3864 const pg_pool_t *p = osdmap->get_pg_pool(pool);
f67539c2
TL
3865 if (!p) {
3866 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3867 return;
3868 }
7c673cae 3869
f67539c2
TL
3870 if (!p->snap_exists(snap_name)) {
3871 onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{});
3872 return;
3873 }
3874
3875 auto op = new PoolOp;
31f18b77 3876 op->tid = ++last_tid;
7c673cae
FG
3877 op->pool = pool;
3878 op->name = snap_name;
f67539c2 3879 op->onfinish = std::move(onfinish);
7c673cae
FG
3880 op->pool_op = POOL_OP_DELETE_SNAP;
3881 pool_ops[op->tid] = op;
3882
3883 pool_op_submit(op);
7c673cae
FG
3884}
3885
f67539c2
TL
3886void Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3887 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3888{
3889 unique_lock wl(rwlock);
3890 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3891 << snap << dendl;
f67539c2 3892 auto op = new PoolOp;
31f18b77 3893 op->tid = ++last_tid;
7c673cae 3894 op->pool = pool;
f67539c2 3895 op->onfinish = std::move(onfinish);
7c673cae
FG
3896 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3897 op->snapid = snap;
3898 pool_ops[op->tid] = op;
3899
3900 pool_op_submit(op);
7c673cae
FG
3901}
3902
f67539c2
TL
3903void Objecter::create_pool(std::string_view name,
3904 decltype(PoolOp::onfinish)&& onfinish,
3905 int crush_rule)
7c673cae
FG
3906{
3907 unique_lock wl(rwlock);
3908 ldout(cct, 10) << "create_pool name=" << name << dendl;
3909
f67539c2
TL
3910 if (osdmap->lookup_pg_pool_name(name) >= 0) {
3911 onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{});
3912 return;
3913 }
7c673cae 3914
f67539c2 3915 auto op = new PoolOp;
31f18b77 3916 op->tid = ++last_tid;
7c673cae
FG
3917 op->pool = 0;
3918 op->name = name;
f67539c2 3919 op->onfinish = std::move(onfinish);
7c673cae
FG
3920 op->pool_op = POOL_OP_CREATE;
3921 pool_ops[op->tid] = op;
7c673cae
FG
3922 op->crush_rule = crush_rule;
3923
3924 pool_op_submit(op);
7c673cae
FG
3925}
3926
f67539c2
TL
3927void Objecter::delete_pool(int64_t pool,
3928 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3929{
3930 unique_lock wl(rwlock);
3931 ldout(cct, 10) << "delete_pool " << pool << dendl;
3932
3933 if (!osdmap->have_pg_pool(pool))
f67539c2
TL
3934 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3935 else
3936 _do_delete_pool(pool, std::move(onfinish));
7c673cae
FG
3937}
3938
f67539c2
TL
3939void Objecter::delete_pool(std::string_view pool_name,
3940 decltype(PoolOp::onfinish)&& onfinish)
7c673cae
FG
3941{
3942 unique_lock wl(rwlock);
3943 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
3944
3945 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
3946 if (pool < 0)
f67539c2
TL
3947 // This only returns one error: -ENOENT.
3948 onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
3949 else
3950 _do_delete_pool(pool, std::move(onfinish));
7c673cae
FG
3951}
3952
f67539c2
TL
3953void Objecter::_do_delete_pool(int64_t pool,
3954 decltype(PoolOp::onfinish)&& onfinish)
3955
7c673cae 3956{
f67539c2 3957 auto op = new PoolOp;
31f18b77 3958 op->tid = ++last_tid;
7c673cae
FG
3959 op->pool = pool;
3960 op->name = "delete";
f67539c2 3961 op->onfinish = std::move(onfinish);
7c673cae
FG
3962 op->pool_op = POOL_OP_DELETE;
3963 pool_ops[op->tid] = op;
3964 pool_op_submit(op);
3965}
3966
7c673cae
FG
3967void Objecter::pool_op_submit(PoolOp *op)
3968{
3969 // rwlock is locked
3970 if (mon_timeout > timespan(0)) {
3971 op->ontimeout = timer.add_event(mon_timeout,
3972 [this, op]() {
3973 pool_op_cancel(op->tid, -ETIMEDOUT); });
3974 }
3975 _pool_op_submit(op);
3976}
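// Timeout lifecycle: the event registered above fires pool_op_cancel(tid,
// -ETIMEDOUT), which defers onfinish with osdcode(-ETIMEDOUT) and finishes
// the op. Conversely, _finish_pool_op() cancels ontimeout on every other
// completion path (r != -ETIMEDOUT), so the timer cannot fire for an op
// that already completed.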
3977
3978void Objecter::_pool_op_submit(PoolOp *op)
3979{
3980 // rwlock is locked unique
3981
3982 ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
f67539c2
TL
3983 auto m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
3984 op->name, op->pool_op,
3985 last_seen_osdmap_version);
7c673cae
FG
3986 if (op->snapid) m->snapid = op->snapid;
3987 if (op->crush_rule) m->crush_rule = op->crush_rule;
3988 monc->send_mon_message(m);
11fdf7f2 3989 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
3990
3991 logger->inc(l_osdc_poolop_send);
3992}
3993
3994/**
3995 * Handle a reply to a PoolOp message. Check that we sent the message
f67539c2 3996 * and give the caller responsibility for the returned cb::list.
7c673cae
FG
3997 * Then either call the finisher or stash the PoolOp, depending on
3998 * whether we have a new enough map.
3999 * Lastly, clean up the message and PoolOp.
4000 */
4001void Objecter::handle_pool_op_reply(MPoolOpReply *m)
4002{
f67539c2
TL
4003 int rc = m->replyCode;
4004 auto ec = rc < 0 ? bs::error_code(-rc, mon_category()) : bs::error_code();
11fdf7f2 4005 FUNCTRACE(cct);
7c673cae 4006 shunique_lock sul(rwlock, acquire_shared);
31f18b77 4007 if (!initialized) {
7c673cae
FG
4008 sul.unlock();
4009 m->put();
4010 return;
4011 }
4012
4013 ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
4014 ceph_tid_t tid = m->get_tid();
f67539c2 4015 auto iter = pool_ops.find(tid);
7c673cae
FG
4016 if (iter != pool_ops.end()) {
4017 PoolOp *op = iter->second;
4018 ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
4019 << ceph_pool_op_name(op->pool_op) << dendl;
f67539c2 4020 cb::list bl{std::move(m->response_data)};
7c673cae
FG
4021 if (m->version > last_seen_osdmap_version)
4022 last_seen_osdmap_version = m->version;
4023 if (osdmap->get_epoch() < m->epoch) {
4024 sul.unlock();
4025 sul.lock();
4026 // recheck op existence since we have let go of rwlock
4027 // (for promotion) above.
4028 iter = pool_ops.find(tid);
4029 if (iter == pool_ops.end())
4030 goto done; // op is gone.
4031 if (osdmap->get_epoch() < m->epoch) {
4032 ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
4033 << " before calling back" << dendl;
f67539c2
TL
4034 _wait_for_new_map(OpCompletion::create(
4035 service.get_executor(),
4036 [o = std::move(op->onfinish),
4037 bl = std::move(bl)](
4038 bs::error_code ec) mutable {
4039 o->defer(std::move(o), ec, bl);
4040 }),
4041 m->epoch,
4042 ec);
7c673cae
FG
4043 } else {
4044 // map epoch changed, probably because a MOSDMap message
4045 // sneaked in. Do caller-specified callback now or else
4046 // we lose it forever.
11fdf7f2 4047 ceph_assert(op->onfinish);
f67539c2 4048 op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
7c673cae
FG
4049 }
4050 } else {
11fdf7f2 4051 ceph_assert(op->onfinish);
f67539c2 4052 op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
7c673cae 4053 }
f67539c2 4054 op->onfinish = nullptr;
7c673cae
FG
4055 if (!sul.owns_lock()) {
4056 sul.unlock();
4057 sul.lock();
4058 }
4059 iter = pool_ops.find(tid);
4060 if (iter != pool_ops.end()) {
4061 _finish_pool_op(op, 0);
4062 }
4063 } else {
4064 ldout(cct, 10) << "unknown request " << tid << dendl;
4065 }
4066
4067done:
4068 // Not strictly necessary, since we'll release it on return.
4069 sul.unlock();
4070
4071 ldout(cct, 10) << "done" << dendl;
4072 m->put();
4073}
4074
4075int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4076{
11fdf7f2 4077 ceph_assert(initialized);
7c673cae
FG
4078
4079 unique_lock wl(rwlock);
4080
f67539c2 4081 auto it = pool_ops.find(tid);
7c673cae
FG
4082 if (it == pool_ops.end()) {
4083 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4084 return -ENOENT;
4085 }
4086
4087 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4088
4089 PoolOp *op = it->second;
4090 if (op->onfinish)
f67539c2 4091 op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{});
7c673cae
FG
4092
4093 _finish_pool_op(op, r);
4094 return 0;
4095}
4096
4097void Objecter::_finish_pool_op(PoolOp *op, int r)
4098{
4099 // rwlock is locked unique
4100 pool_ops.erase(op->tid);
4101 logger->set(l_osdc_poolop_active, pool_ops.size());
4102
4103 if (op->ontimeout && r != -ETIMEDOUT) {
4104 timer.cancel_event(op->ontimeout);
4105 }
4106
4107 delete op;
4108}
4109
4110// pool stats
4111
f67539c2
TL
4112void Objecter::get_pool_stats(
4113 const std::vector<std::string>& pools,
4114 decltype(PoolStatOp::onfinish)&& onfinish)
7c673cae
FG
4115{
4116 ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4117
f67539c2 4118 auto op = new PoolStatOp;
31f18b77 4119 op->tid = ++last_tid;
7c673cae 4120 op->pools = pools;
f67539c2 4121 op->onfinish = std::move(onfinish);
7c673cae
FG
4122 if (mon_timeout > timespan(0)) {
4123 op->ontimeout = timer.add_event(mon_timeout,
4124 [this, op]() {
4125 pool_stat_op_cancel(op->tid,
4126 -ETIMEDOUT); });
4127 } else {
4128 op->ontimeout = 0;
4129 }
4130
4131 unique_lock wl(rwlock);
4132
4133 poolstat_ops[op->tid] = op;
4134
4135 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4136
4137 _poolstat_submit(op);
4138}
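// Hypothetical usage, assuming PoolStatOp exposes the same OpComp helper
// as PoolOp; the completion signature matches the defer calls in the
// reply and cancel paths below:
//
//   objecter->get_pool_stats({"rbd", "cephfs_data"},
//     Objecter::PoolStatOp::OpComp::create(
//       ex, [](bs::error_code ec,
//              bc::flat_map<std::string, pool_stat_t> stats,
//              bool per_pool) {
//         // inspect stats["rbd"], etc.
//       }));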
4139
4140void Objecter::_poolstat_submit(PoolStatOp *op)
4141{
4142 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4143 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4144 op->pools,
4145 last_seen_pgmap_version));
11fdf7f2 4146 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4147
4148 logger->inc(l_osdc_poolstat_send);
4149}
4150
4151void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4152{
4153 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4154 ceph_tid_t tid = m->get_tid();
4155
4156 unique_lock wl(rwlock);
31f18b77 4157 if (!initialized) {
7c673cae
FG
4158 m->put();
4159 return;
4160 }
4161
f67539c2 4162 auto iter = poolstat_ops.find(tid);
7c673cae
FG
4163 if (iter != poolstat_ops.end()) {
4164 PoolStatOp *op = iter->second;
4165 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
7c673cae
FG
4166 if (m->version > last_seen_pgmap_version) {
4167 last_seen_pgmap_version = m->version;
4168 }
f67539c2
TL
4169 op->onfinish->defer(std::move(op->onfinish), bs::error_code{},
4170 std::move(m->pool_stats), m->per_pool);
7c673cae
FG
4171 _finish_pool_stat_op(op, 0);
4172 } else {
4173 ldout(cct, 10) << "unknown request " << tid << dendl;
4174 }
4175 ldout(cct, 10) << "done" << dendl;
4176 m->put();
4177}
4178
4179int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4180{
11fdf7f2 4181 ceph_assert(initialized);
7c673cae
FG
4182
4183 unique_lock wl(rwlock);
4184
f67539c2 4185 auto it = poolstat_ops.find(tid);
7c673cae
FG
4186 if (it == poolstat_ops.end()) {
4187 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4188 return -ENOENT;
4189 }
4190
4191 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4192
f67539c2 4193 auto op = it->second;
7c673cae 4194 if (op->onfinish)
f67539c2
TL
4195 op->onfinish->defer(std::move(op->onfinish), osdcode(r),
4196 bc::flat_map<std::string, pool_stat_t>{}, false);
7c673cae
FG
4197 _finish_pool_stat_op(op, r);
4198 return 0;
4199}
4200
4201void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4202{
4203 // rwlock is locked unique
4204
4205 poolstat_ops.erase(op->tid);
4206 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4207
4208 if (op->ontimeout && r != -ETIMEDOUT)
4209 timer.cancel_event(op->ontimeout);
4210
4211 delete op;
4212}
4213
f67539c2
TL
4214void Objecter::get_fs_stats(boost::optional<int64_t> poolid,
4215 decltype(StatfsOp::onfinish)&& onfinish)
7c673cae
FG
4216{
4217 ldout(cct, 10) << "get_fs_stats" << dendl;
4218 unique_lock l(rwlock);
4219
f67539c2 4220 auto op = new StatfsOp;
31f18b77 4221 op->tid = ++last_tid;
f67539c2
TL
4222 op->data_pool = poolid;
4223 op->onfinish = std::move(onfinish);
7c673cae
FG
4224 if (mon_timeout > timespan(0)) {
4225 op->ontimeout = timer.add_event(mon_timeout,
4226 [this, op]() {
4227 statfs_op_cancel(op->tid,
4228 -ETIMEDOUT); });
4229 } else {
4230 op->ontimeout = 0;
4231 }
4232 statfs_ops[op->tid] = op;
4233
4234 logger->set(l_osdc_statfs_active, statfs_ops.size());
4235
4236 _fs_stats_submit(op);
4237}
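// Hypothetical usage, assuming StatfsOp exposes the same OpComp helper as
// PoolOp; the completion receives (bs::error_code, ceph_statfs), matching
// handle_fs_stats_reply() below:
//
//   objecter->get_fs_stats(boost::none,  // or a specific data pool id
//     Objecter::StatfsOp::OpComp::create(
//       ex, [](bs::error_code ec, const ceph_statfs& st) {
//         // st.kb, st.kb_used, st.kb_avail, st.num_objects
//       }));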
4238
4239void Objecter::_fs_stats_submit(StatfsOp *op)
4240{
4241 // rwlock is locked unique
4242
4243 ldout(cct, 10) << "fs_stats_submit " << op->tid << dendl;
4244 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
d2e6a577 4245 op->data_pool,
7c673cae 4246 last_seen_pgmap_version));
11fdf7f2 4247 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4248
4249 logger->inc(l_osdc_statfs_send);
4250}
4251
4252void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4253{
4254 unique_lock wl(rwlock);
31f18b77 4255 if (!initialized) {
7c673cae
FG
4256 m->put();
4257 return;
4258 }
4259
4260 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4261 ceph_tid_t tid = m->get_tid();
4262
4263 if (statfs_ops.count(tid)) {
4264 StatfsOp *op = statfs_ops[tid];
4265 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
7c673cae
FG
4266 if (m->h.version > last_seen_pgmap_version)
4267 last_seen_pgmap_version = m->h.version;
f67539c2 4268 op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st);
7c673cae
FG
4269 _finish_statfs_op(op, 0);
4270 } else {
4271 ldout(cct, 10) << "unknown request " << tid << dendl;
4272 }
4273 m->put();
4274 ldout(cct, 10) << "done" << dendl;
4275}
4276
4277int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4278{
11fdf7f2 4279 ceph_assert(initialized);
7c673cae
FG
4280
4281 unique_lock wl(rwlock);
4282
f67539c2 4283 auto it = statfs_ops.find(tid);
7c673cae
FG
4284 if (it == statfs_ops.end()) {
4285 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4286 return -ENOENT;
4287 }
4288
4289 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4290
f67539c2 4291 auto op = it->second;
7c673cae 4292 if (op->onfinish)
f67539c2 4293 op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{});
7c673cae
FG
4294 _finish_statfs_op(op, r);
4295 return 0;
4296}
4297
4298void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4299{
4300 // rwlock is locked unique
4301
4302 statfs_ops.erase(op->tid);
4303 logger->set(l_osdc_statfs_active, statfs_ops.size());
4304
4305 if (op->ontimeout && r != -ETIMEDOUT)
4306 timer.cancel_event(op->ontimeout);
4307
4308 delete op;
4309}
4310
4311// scatter/gather
4312
4313void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
f67539c2
TL
4314 vector<cb::list>& resultbl,
4315 cb::list *bl, Context *onfinish)
7c673cae
FG
4316{
4317 // all done
4318 ldout(cct, 15) << "_sg_read_finish" << dendl;
4319
4320 if (extents.size() > 1) {
4321 Striper::StripedReadResult r;
f67539c2
TL
4322 auto bit = resultbl.begin();
4323 for (auto eit = extents.begin();
7c673cae
FG
4324 eit != extents.end();
4325 ++eit, ++bit) {
4326 r.add_partial_result(cct, *bit, eit->buffer_extents);
4327 }
4328 bl->clear();
4329 r.assemble_result(cct, *bl, false);
4330 } else {
4331 ldout(cct, 15) << " only one frag" << dendl;
f67539c2 4332 *bl = std::move(resultbl[0]);
7c673cae
FG
4333 }
4334
4335 // done
4336 uint64_t bytes_read = bl->length();
4337 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4338
4339 if (onfinish) {
4340 onfinish->complete(bytes_read);
4341 }
4342}
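// The multi-extent branch above, in isolation (illustrative): each
// extent's buffer_extents says where its fragment lands in the logical
// result, so reassembly is just
//
//   Striper::StripedReadResult r;
//   r.add_partial_result(cct, frag_bl, extent.buffer_extents);  // per frag
//   r.assemble_result(cct, *bl, false);  // stitch fragments in order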
4343
4344
4345void Objecter::ms_handle_connect(Connection *con)
4346{
4347 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
31f18b77 4348 if (!initialized)
7c673cae
FG
4349 return;
4350
4351 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4352 resend_mon_ops();
4353}
4354
4355bool Objecter::ms_handle_reset(Connection *con)
4356{
31f18b77 4357 if (!initialized)
7c673cae
FG
4358 return false;
4359 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
91327a77 4360 unique_lock wl(rwlock);
11fdf7f2
TL
4361
4362 auto priv = con->get_priv();
4363 auto session = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4364 if (session) {
4365 ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
4366 << " osd." << session->osd << dendl;
a8e16298
TL
4367 // the session may already have been closed if a just-handled osdmap
4368 // marked the osd down
4369 if (!(initialized && osdmap->is_up(session->osd))) {
4370 ldout(cct, 1) << "ms_handle_reset aborted, initialized=" << initialized << dendl;
7c673cae
FG
4371 wl.unlock();
4372 return false;
4373 }
4374 map<uint64_t, LingerOp *> lresend;
f67539c2 4375 unique_lock sl(session->lock);
7c673cae
FG
4376 _reopen_session(session);
4377 _kick_requests(session, lresend);
4378 sl.unlock();
4379 _linger_ops_resend(lresend, wl);
4380 wl.unlock();
4381 maybe_request_map();
7c673cae
FG
4382 }
4383 return true;
4384 }
4385 return false;
4386}
4387
4388void Objecter::ms_handle_remote_reset(Connection *con)
4389{
4390 /*
4391 * treat these the same.
4392 */
4393 ms_handle_reset(con);
4394}
4395
4396bool Objecter::ms_handle_refused(Connection *con)
4397{
4398 // just log for now
4399 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4400 int osd = osdmap->identify_osd(con->get_peer_addr());
4401 if (osd >= 0) {
4402 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4403 }
4404 }
4405 return false;
4406}
4407
7c673cae
FG
4408void Objecter::op_target_t::dump(Formatter *f) const
4409{
4410 f->dump_stream("pg") << pgid;
4411 f->dump_int("osd", osd);
4412 f->dump_stream("object_id") << base_oid;
4413 f->dump_stream("object_locator") << base_oloc;
4414 f->dump_stream("target_object_id") << target_oid;
4415 f->dump_stream("target_object_locator") << target_oloc;
4416 f->dump_int("paused", (int)paused);
4417 f->dump_int("used_replica", (int)used_replica);
4418 f->dump_int("precalc_pgid", (int)precalc_pgid);
4419}
4420
4421void Objecter::_dump_active(OSDSession *s)
4422{
f67539c2 4423 for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
7c673cae
FG
4424 Op *op = p->second;
4425 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4426 << "\tosd." << (op->session ? op->session->osd : -1)
4427 << "\t" << op->target.base_oid
4428 << "\t" << op->ops << dendl;
4429 }
4430}
4431
4432void Objecter::_dump_active()
4433{
31f18b77 4434 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
7c673cae 4435 << dendl;
f67539c2 4436 for (auto siter = osd_sessions.begin();
7c673cae 4437 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4438 auto s = siter->second;
4439 shared_lock sl(s->lock);
7c673cae
FG
4440 _dump_active(s);
4441 sl.unlock();
4442 }
4443 _dump_active(homeless_session);
4444}
4445
4446void Objecter::dump_active()
4447{
4448 shared_lock rl(rwlock);
4449 _dump_active();
4450 rl.unlock();
4451}
4452
4453void Objecter::dump_requests(Formatter *fmt)
4454{
4455 // Read-lock on Objecter held here
4456 fmt->open_object_section("requests");
4457 dump_ops(fmt);
4458 dump_linger_ops(fmt);
4459 dump_pool_ops(fmt);
4460 dump_pool_stat_ops(fmt);
4461 dump_statfs_ops(fmt);
4462 dump_command_ops(fmt);
4463 fmt->close_section(); // requests object
4464}
4465
4466void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4467{
f67539c2 4468 for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
7c673cae 4469 Op *op = p->second;
f67539c2 4470 auto age = std::chrono::duration<double>(ceph::coarse_mono_clock::now() - op->stamp);
7c673cae
FG
4471 fmt->open_object_section("op");
4472 fmt->dump_unsigned("tid", op->tid);
4473 op->target.dump(fmt);
4474 fmt->dump_stream("last_sent") << op->stamp;
92f5a8d4 4475 fmt->dump_float("age", age.count());
7c673cae
FG
4476 fmt->dump_int("attempts", op->attempts);
4477 fmt->dump_stream("snapid") << op->snapid;
4478 fmt->dump_stream("snap_context") << op->snapc;
4479 fmt->dump_stream("mtime") << op->mtime;
4480
4481 fmt->open_array_section("osd_ops");
f67539c2 4482 for (auto it = op->ops.begin(); it != op->ops.end(); ++it) {
7c673cae
FG
4483 fmt->dump_stream("osd_op") << *it;
4484 }
4485 fmt->close_section(); // osd_ops array
4486
4487 fmt->close_section(); // op object
4488 }
4489}
4490
4491void Objecter::dump_ops(Formatter *fmt)
4492{
4493 // Read-lock on Objecter held
4494 fmt->open_array_section("ops");
f67539c2 4495 for (auto siter = osd_sessions.begin();
7c673cae
FG
4496 siter != osd_sessions.end(); ++siter) {
4497 OSDSession *s = siter->second;
f67539c2 4498 shared_lock sl(s->lock);
7c673cae
FG
4499 _dump_ops(s, fmt);
4500 sl.unlock();
4501 }
4502 _dump_ops(homeless_session, fmt);
4503 fmt->close_section(); // ops array
4504}
4505
4506void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4507{
f67539c2
TL
4508 for (auto p = s->linger_ops.begin(); p != s->linger_ops.end(); ++p) {
4509 auto op = p->second;
7c673cae
FG
4510 fmt->open_object_section("linger_op");
4511 fmt->dump_unsigned("linger_id", op->linger_id);
4512 op->target.dump(fmt);
4513 fmt->dump_stream("snapid") << op->snap;
4514 fmt->dump_stream("registered") << op->registered;
4515 fmt->close_section(); // linger_op object
4516 }
4517}
4518
4519void Objecter::dump_linger_ops(Formatter *fmt)
4520{
4521 // We have a read-lock on the objecter
4522 fmt->open_array_section("linger_ops");
f67539c2 4523 for (auto siter = osd_sessions.begin();
7c673cae 4524 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4525 auto s = siter->second;
4526 shared_lock sl(s->lock);
7c673cae
FG
4527 _dump_linger_ops(s, fmt);
4528 sl.unlock();
4529 }
4530 _dump_linger_ops(homeless_session, fmt);
4531 fmt->close_section(); // linger_ops array
4532}
4533
4534void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4535{
f67539c2
TL
4536 for (auto p = s->command_ops.begin(); p != s->command_ops.end(); ++p) {
4537 auto op = p->second;
7c673cae
FG
4538 fmt->open_object_section("command_op");
4539 fmt->dump_unsigned("command_id", op->tid);
4540 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4541 fmt->open_array_section("command");
f67539c2 4542 for (auto q = op->cmd.begin(); q != op->cmd.end(); ++q)
7c673cae
FG
4543 fmt->dump_string("word", *q);
4544 fmt->close_section();
4545 if (op->target_osd >= 0)
4546 fmt->dump_int("target_osd", op->target_osd);
4547 else
4548 fmt->dump_stream("target_pg") << op->target_pg;
4549 fmt->close_section(); // command_op object
4550 }
4551}
4552
4553void Objecter::dump_command_ops(Formatter *fmt)
4554{
4555 // We have a read-lock on the Objecter here
4556 fmt->open_array_section("command_ops");
f67539c2 4557 for (auto siter = osd_sessions.begin();
7c673cae 4558 siter != osd_sessions.end(); ++siter) {
f67539c2
TL
4559 auto s = siter->second;
4560 shared_lock sl(s->lock);
7c673cae
FG
4561 _dump_command_ops(s, fmt);
4562 sl.unlock();
4563 }
4564 _dump_command_ops(homeless_session, fmt);
4565 fmt->close_section(); // command_ops array
4566}
4567
4568void Objecter::dump_pool_ops(Formatter *fmt) const
4569{
4570 fmt->open_array_section("pool_ops");
f67539c2
TL
4571 for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
4572 auto op = p->second;
7c673cae
FG
4573 fmt->open_object_section("pool_op");
4574 fmt->dump_unsigned("tid", op->tid);
4575 fmt->dump_int("pool", op->pool);
4576 fmt->dump_string("name", op->name);
4577 fmt->dump_int("operation_type", op->pool_op);
7c673cae
FG
4578 fmt->dump_unsigned("crush_rule", op->crush_rule);
4579 fmt->dump_stream("snapid") << op->snapid;
4580 fmt->dump_stream("last_sent") << op->last_submit;
4581 fmt->close_section(); // pool_op object
4582 }
4583 fmt->close_section(); // pool_ops array
4584}
4585
4586void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4587{
4588 fmt->open_array_section("pool_stat_ops");
f67539c2 4589 for (auto p = poolstat_ops.begin();
7c673cae
FG
4590 p != poolstat_ops.end();
4591 ++p) {
4592 PoolStatOp *op = p->second;
4593 fmt->open_object_section("pool_stat_op");
4594 fmt->dump_unsigned("tid", op->tid);
4595 fmt->dump_stream("last_sent") << op->last_submit;
4596
4597 fmt->open_array_section("pools");
f67539c2
TL
4598 for (const auto& it : op->pools) {
4599 fmt->dump_string("pool", it);
7c673cae
FG
4600 }
4601 fmt->close_section(); // pools array
4602
4603 fmt->close_section(); // pool_stat_op object
4604 }
4605 fmt->close_section(); // pool_stat_ops array
4606}
4607
4608void Objecter::dump_statfs_ops(Formatter *fmt) const
4609{
4610 fmt->open_array_section("statfs_ops");
f67539c2
TL
4611 for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
4612 auto op = p->second;
7c673cae
FG
4613 fmt->open_object_section("statfs_op");
4614 fmt->dump_unsigned("tid", op->tid);
4615 fmt->dump_stream("last_sent") << op->last_submit;
4616 fmt->close_section(); // statfs_op object
4617 }
4618 fmt->close_section(); // statfs_ops array
4619}
4620
4621Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4622 m_objecter(objecter)
4623{
4624}
4625
9f95a23c
TL
4626int Objecter::RequestStateHook::call(std::string_view command,
4627 const cmdmap_t& cmdmap,
4628 Formatter *f,
4629 std::ostream& ss,
f67539c2 4630 cb::list& out)
7c673cae 4631{
7c673cae
FG
4632 shared_lock rl(m_objecter->rwlock);
4633 m_objecter->dump_requests(f);
9f95a23c 4634 return 0;
7c673cae
FG
4635}
4636
f67539c2 4637void Objecter::blocklist_self(bool set)
7c673cae 4638{
f67539c2 4639 ldout(cct, 10) << "blocklist_self " << (set ? "add" : "rm") << dendl;
7c673cae
FG
4640
4641 vector<string> cmd;
f67539c2 4642 cmd.push_back("{\"prefix\":\"osd blocklist\", ");
7c673cae 4643 if (set)
f67539c2 4644 cmd.push_back("\"blocklistop\":\"add\",");
7c673cae 4645 else
f67539c2 4646 cmd.push_back("\"blocklistop\":\"rm\",");
7c673cae 4647 stringstream ss;
f67539c2 4648 // this is somewhat imprecise in that we are blocklisting our first addr only
11fdf7f2 4649 ss << messenger->get_myaddrs().front().get_legacy_str();
7c673cae
FG
4650 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4651
f67539c2 4652 auto m = new MMonCommand(monc->get_fsid());
7c673cae
FG
4653 m->cmd = cmd;
4654
f67539c2
TL
4655 // NOTE: no fallback to legacy blacklist command implemented here
4656 // since this is only used for test code.
4657
7c673cae
FG
4658 monc->send_mon_message(m);
4659}
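// Illustrative: with set=true and a hypothetical legacy addr
// "10.0.0.1:0/1234", cmd ends up holding the three JSON fragments
//   '{"prefix":"osd blocklist", ' + '"blocklistop":"add",' +
//   '"addr":"10.0.0.1:0/1234"'
// which together form the mon command sent below.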
4660
4661// commands
4662
4663void Objecter::handle_command_reply(MCommandReply *m)
4664{
4665 unique_lock wl(rwlock);
31f18b77 4666 if (!initialized) {
7c673cae
FG
4667 m->put();
4668 return;
4669 }
4670
4671 ConnectionRef con = m->get_connection();
11fdf7f2
TL
4672 auto priv = con->get_priv();
4673 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4674 if (!s || s->con != con) {
4675 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
4676 m->put();
7c673cae
FG
4677 return;
4678 }
4679
f67539c2
TL
4680 shared_lock sl(s->lock);
4681 auto p = s->command_ops.find(m->get_tid());
7c673cae
FG
4682 if (p == s->command_ops.end()) {
4683 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4684 << " not found" << dendl;
4685 m->put();
4686 sl.unlock();
7c673cae
FG
4687 return;
4688 }
4689
4690 CommandOp *c = p->second;
4691 if (!c->session ||
4692 m->get_connection() != c->session->con) {
4693 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4694 << " got reply from wrong connection "
4695 << m->get_connection() << " " << m->get_source_inst()
4696 << dendl;
4697 m->put();
4698 sl.unlock();
7c673cae
FG
4699 return;
4700 }
f67539c2 4701
9f95a23c
TL
4702 if (m->r == -EAGAIN) {
4703 ldout(cct,10) << __func__ << " tid " << m->get_tid()
4704 << " got EAGAIN, requesting map and resending" << dendl;
4705 // NOTE: This might resend twice... once now, and once again when
4706 // we get an updated osdmap and the PG is found to have moved.
4707 _maybe_request_map();
4708 _send_command(c);
4709 m->put();
4710 sl.unlock();
4711 return;
4712 }
4713
7c673cae
FG
4714 sl.unlock();
4715
f67539c2
TL
4716 unique_lock sul(s->lock);
4717 _finish_command(c, m->r < 0 ? bs::error_code(-m->r, osd_category()) :
4718 bs::error_code(), std::move(m->rs),
4719 std::move(m->get_data()));
28e407b8
AA
4720 sul.unlock();
4721
7c673cae 4722 m->put();
7c673cae
FG
4723}
4724
522d829b
TL
4725Objecter::LingerOp::LingerOp(Objecter *o, uint64_t linger_id)
4726 : objecter(o),
4727 linger_id(linger_id),
4728 watch_lock(ceph::make_shared_mutex(
4729 fmt::format("LingerOp::watch_lock #{}", linger_id)))
4730{}
4731
7c673cae
FG
4732void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
4733{
4734 shunique_lock sul(rwlock, ceph::acquire_unique);
4735
31f18b77 4736 ceph_tid_t tid = ++last_tid;
7c673cae
FG
4737 ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
4738 c->tid = tid;
4739
4740 {
f67539c2 4741 unique_lock hs_wl(homeless_session->lock);
7c673cae
FG
4742 _session_command_op_assign(homeless_session, c);
4743 }
4744
4745 _calc_command_target(c, sul);
4746 _assign_command_session(c, sul);
4747 if (osd_timeout > timespan(0)) {
4748 c->ontimeout = timer.add_event(osd_timeout,
4749 [this, c, tid]() {
f67539c2
TL
4750 command_op_cancel(
4751 c->session, tid,
4752 osdc_errc::timed_out); });
7c673cae
FG
4753 }
4754
4755 if (!c->session->is_homeless()) {
4756 _send_command(c);
4757 } else {
4758 _maybe_request_map();
4759 }
4760 if (c->map_check_error)
4761 _send_command_map_check(c);
f67539c2
TL
4762 if (ptid)
4763 *ptid = tid;
7c673cae
FG
4764
4765 logger->inc(l_osdc_command_active);
4766}
4767
f67539c2
TL
4768int Objecter::_calc_command_target(CommandOp *c,
4769 shunique_lock<ceph::shared_mutex>& sul)
7c673cae 4770{
11fdf7f2 4771 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4772
4773 c->map_check_error = 0;
4774
4775 // ignore overlays, just like we do with pg ops
4776 c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4777
4778 if (c->target_osd >= 0) {
4779 if (!osdmap->exists(c->target_osd)) {
4780 c->map_check_error = -ENOENT;
4781 c->map_check_error_str = "osd dne";
4782 c->target.osd = -1;
4783 return RECALC_OP_TARGET_OSD_DNE;
4784 }
4785 if (osdmap->is_down(c->target_osd)) {
4786 c->map_check_error = -ENXIO;
4787 c->map_check_error_str = "osd down";
4788 c->target.osd = -1;
4789 return RECALC_OP_TARGET_OSD_DOWN;
4790 }
4791 c->target.osd = c->target_osd;
4792 } else {
4793 int ret = _calc_target(&(c->target), nullptr, true);
4794 if (ret == RECALC_OP_TARGET_POOL_DNE) {
4795 c->map_check_error = -ENOENT;
4796 c->map_check_error_str = "pool dne";
4797 c->target.osd = -1;
4798 return ret;
4799 } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4800 c->map_check_error = -ENXIO;
4801 c->map_check_error_str = "osd down";
4802 c->target.osd = -1;
4803 return ret;
4804 }
4805 }
4806
4807 OSDSession *s;
4808 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4809 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4810
4811 if (c->session != s) {
4812 put_session(s);
4813 return RECALC_OP_TARGET_NEED_RESEND;
4814 }
4815
4816 put_session(s);
4817
4818 ldout(cct, 20) << "_calc_command_target " << c->tid << " no change, "
4819 << c->session << dendl;
4820
4821 return RECALC_OP_TARGET_NO_ACTION;
4822}
4823
4824void Objecter::_assign_command_session(CommandOp *c,
f67539c2 4825 shunique_lock<ceph::shared_mutex>& sul)
7c673cae 4826{
11fdf7f2 4827 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4828
4829 OSDSession *s;
4830 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4831 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4832
4833 if (c->session != s) {
4834 if (c->session) {
4835 OSDSession *cs = c->session;
f67539c2 4836 unique_lock csl(cs->lock);
7c673cae
FG
4837 _session_command_op_remove(c->session, c);
4838 csl.unlock();
4839 }
f67539c2 4840 unique_lock sl(s->lock);
7c673cae
FG
4841 _session_command_op_assign(s, c);
4842 }
4843
4844 put_session(s);
4845}
4846
4847void Objecter::_send_command(CommandOp *c)
4848{
4849 ldout(cct, 10) << "_send_command " << c->tid << dendl;
11fdf7f2
TL
4850 ceph_assert(c->session);
4851 ceph_assert(c->session->con);
f67539c2 4852 auto m = new MCommand(monc->monmap.fsid);
7c673cae
FG
4853 m->cmd = c->cmd;
4854 m->set_data(c->inbl);
4855 m->set_tid(c->tid);
4856 c->session->con->send_message(m);
4857 logger->inc(l_osdc_command_send);
4858}
4859
f67539c2
TL
4860int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid,
4861 bs::error_code ec)
7c673cae 4862{
11fdf7f2 4863 ceph_assert(initialized);
7c673cae
FG
4864
4865 unique_lock wl(rwlock);
4866
f67539c2 4867 auto it = s->command_ops.find(tid);
7c673cae
FG
4868 if (it == s->command_ops.end()) {
4869 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4870 return -ENOENT;
4871 }
4872
4873 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4874
4875 CommandOp *op = it->second;
4876 _command_cancel_map_check(op);
f67539c2
TL
4877 unique_lock sl(op->session->lock);
4878 _finish_command(op, ec, {}, {});
28e407b8 4879 sl.unlock();
7c673cae
FG
4880 return 0;
4881}
4882
f67539c2
TL
4883void Objecter::_finish_command(CommandOp *c, bs::error_code ec,
4884 string&& rs, cb::list&& bl)
7c673cae
FG
4885{
4886 // rwlock is locked unique
28e407b8 4887 // session lock is locked
7c673cae 4888
f67539c2 4889 ldout(cct, 10) << "_finish_command " << c->tid << " = " << ec << " "
7c673cae 4890 << rs << dendl;
f67539c2 4891
7c673cae 4892 if (c->onfinish)
f67539c2 4893 c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl));
7c673cae 4894
f67539c2 4895 if (c->ontimeout && ec != bs::errc::timed_out)
7c673cae
FG
4896 timer.cancel_event(c->ontimeout);
4897
7c673cae 4898 _session_command_op_remove(c->session, c);
7c673cae
FG
4899
4900 c->put();
4901
4902 logger->dec(l_osdc_command_active);
4903}
4904
4905Objecter::OSDSession::~OSDSession()
4906{
4907 // Caller is responsible for re-assigning or
4908 // destroying any ops that were assigned to us
11fdf7f2
TL
4909 ceph_assert(ops.empty());
4910 ceph_assert(linger_ops.empty());
4911 ceph_assert(command_ops.empty());
7c673cae
FG
4912}
4913
f67539c2
TL
4914Objecter::Objecter(CephContext *cct,
4915 Messenger *m, MonClient *mc,
4916 boost::asio::io_context& service) :
4917 Dispatcher(cct), messenger(m), monc(mc), service(service)
f91f0fd5
TL
4918{
4919 mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
4920 osd_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
4921}
9f95a23c 4922
7c673cae
FG
4923Objecter::~Objecter()
4924{
11fdf7f2
TL
4925 ceph_assert(homeless_session->get_nref() == 1);
4926 ceph_assert(num_homeless_ops == 0);
7c673cae
FG
4927 homeless_session->put();
4928
11fdf7f2
TL
4929 ceph_assert(osd_sessions.empty());
4930 ceph_assert(poolstat_ops.empty());
4931 ceph_assert(statfs_ops.empty());
4932 ceph_assert(pool_ops.empty());
4933 ceph_assert(waiting_for_map.empty());
4934 ceph_assert(linger_ops.empty());
4935 ceph_assert(check_latest_map_lingers.empty());
4936 ceph_assert(check_latest_map_ops.empty());
4937 ceph_assert(check_latest_map_commands.empty());
7c673cae 4938
11fdf7f2
TL
4939 ceph_assert(!m_request_state_hook);
4940 ceph_assert(!logger);
7c673cae
FG
4941}
4942
4943/**
4944 * Wait until this OSD map epoch is received before
4945 * sending any more operations to OSDs. Use this
4946 * when it is known that the client can't trust
4947 * anything from before this epoch (e.g. due to
f67539c2 4948 * client blocklist at this epoch).
7c673cae
FG
4949 */
4950void Objecter::set_epoch_barrier(epoch_t epoch)
4951{
4952 unique_lock wl(rwlock);
4953
4954 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
4955 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
4956 << dendl;
4957 if (epoch > epoch_barrier) {
4958 epoch_barrier = epoch;
4959 _maybe_request_map();
4960 }
4961}
4962
4963
4964
4965hobject_t Objecter::enumerate_objects_begin()
4966{
4967 return hobject_t();
4968}
4969
4970hobject_t Objecter::enumerate_objects_end()
4971{
4972 return hobject_t::get_max();
4973}
4974
f67539c2
TL
4975template<typename T>
4976struct EnumerationContext {
4977 Objecter* objecter;
7c673cae 4978 const hobject_t end;
f67539c2
TL
4979 const cb::list filter;
4980 uint32_t max;
4981 const object_locator_t oloc;
4982 std::vector<T> ls;
4983private:
4984 fu2::unique_function<void(bs::error_code,
4985 std::vector<T>,
4986 hobject_t) &&> on_finish;
4987public:
4988 epoch_t epoch = 0;
4989 int budget = -1;
4990
4991 EnumerationContext(Objecter* objecter,
4992 hobject_t end, cb::list filter,
4993 uint32_t max, object_locator_t oloc,
4994 decltype(on_finish) on_finish)
4995 : objecter(objecter), end(std::move(end)), filter(std::move(filter)),
4996 max(max), oloc(std::move(oloc)), on_finish(std::move(on_finish)) {}
4997
4998 void operator()(bs::error_code ec,
4999 std::vector<T> v,
5000 hobject_t h) && {
5001 if (budget >= 0) {
5002 objecter->put_op_budget_bytes(budget);
5003 budget = -1;
5004 }
5005
5006 std::move(on_finish)(ec, std::move(v), std::move(h));
5007 }
5008};
7c673cae 5009
f67539c2
TL
5010template<typename T>
5011struct CB_EnumerateReply {
5012 cb::list bl;
7c673cae 5013
f67539c2
TL
5014 Objecter* objecter;
5015 std::unique_ptr<EnumerationContext<T>> ctx;
7c673cae 5016
f67539c2
TL
5017 CB_EnumerateReply(Objecter* objecter,
5018 std::unique_ptr<EnumerationContext<T>>&& ctx) :
5019 objecter(objecter), ctx(std::move(ctx)) {}
5020
5021 void operator()(bs::error_code ec) {
5022 objecter->_enumerate_reply(std::move(bl), ec, std::move(ctx));
7c673cae
FG
5023 }
5024};
5025
f67539c2 5026template<typename T>
7c673cae 5027void Objecter::enumerate_objects(
f67539c2
TL
5028 int64_t pool_id,
5029 std::string_view ns,
5030 hobject_t start,
5031 hobject_t end,
5032 const uint32_t max,
5033 const cb::list& filter_bl,
5034 fu2::unique_function<void(bs::error_code,
5035 std::vector<T>,
5036 hobject_t) &&> on_finish) {
7c673cae
FG
5037 if (!end.is_max() && start > end) {
5038 lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
f67539c2 5039 std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
7c673cae
FG
5040 return;
5041 }
5042
5043 if (max < 1) {
5044 lderr(cct) << __func__ << ": result size may not be zero" << dendl;
f67539c2 5045 std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
7c673cae
FG
5046 return;
5047 }
5048
5049 if (start.is_max()) {
f67539c2 5050 std::move(on_finish)({}, {}, {});
7c673cae
FG
5051 return;
5052 }
5053
5054 shared_lock rl(rwlock);
11fdf7f2 5055 ceph_assert(osdmap->get_epoch());
7c673cae
FG
5056 if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
5057 rl.unlock();
5058 lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
f67539c2 5059 std::move(on_finish)(osdc_errc::not_supported, {}, {});
7c673cae
FG
5060 return;
5061 }
f67539c2 5062 const pg_pool_t* p = osdmap->get_pg_pool(pool_id);
7c673cae
FG
5063 if (!p) {
5064 lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
5065 << osdmap->get_epoch() << dendl;
5066 rl.unlock();
f67539c2 5067 std::move(on_finish)(osdc_errc::pool_dne, {}, {});
7c673cae
FG
5068 return;
5069 } else {
5070 rl.unlock();
5071 }
5072
f67539c2
TL
5073 _issue_enumerate(start,
5074 std::make_unique<EnumerationContext<T>>(
5075 this, std::move(end), filter_bl,
5076 max, object_locator_t{pool_id, ns},
5077 std::move(on_finish)));
5078}
5079
5080template
5081void Objecter::enumerate_objects<librados::ListObjectImpl>(
5082 int64_t pool_id,
5083 std::string_view ns,
5084 hobject_t start,
5085 hobject_t end,
5086 const uint32_t max,
5087 const cb::list& filter_bl,
5088 fu2::unique_function<void(bs::error_code,
5089 std::vector<librados::ListObjectImpl>,
5090 hobject_t) &&> on_finish);
5091
5092template
5093void Objecter::enumerate_objects<neorados::Entry>(
5094 int64_t pool_id,
5095 std::string_view ns,
5096 hobject_t start,
5097 hobject_t end,
5098 const uint32_t max,
5099 const cb::list& filter_bl,
5100 fu2::unique_function<void(bs::error_code,
5101 std::vector<neorados::Entry>,
5102 hobject_t) &&> on_finish);
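// A hypothetical librados-flavored call, for illustration:
//
//   objecter->enumerate_objects<librados::ListObjectImpl>(
//     pool_id, ns,
//     objecter->enumerate_objects_begin(),
//     objecter->enumerate_objects_end(),
//     1000, filter_bl,
//     [](bs::error_code ec, std::vector<librados::ListObjectImpl> ls,
//        hobject_t next) {
//       // pass `next` back as `start` to continue the enumeration
//     });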
5103
5104
5105
5106template<typename T>
5107void Objecter::_issue_enumerate(hobject_t start,
5108 std::unique_ptr<EnumerationContext<T>> ctx) {
7c673cae 5109 ObjectOperation op;
f67539c2
TL
5110 auto c = ctx.get();
5111 op.pg_nls(c->max, c->filter, start, osdmap->get_epoch());
5112 auto on_ack = std::make_unique<CB_EnumerateReply<T>>(this, std::move(ctx));
5113 // I hate having to do this. Try to find a cleaner way
5114 // later.
5115 auto epoch = &c->epoch;
5116 auto budget = &c->budget;
5117 auto pbl = &on_ack->bl;
7c673cae
FG
5118
5119 // Issue. See you later in _enumerate_reply
f67539c2
TL
5120 pg_read(start.get_hash(),
5121 c->oloc, op, pbl, 0,
5122 Op::OpComp::create(service.get_executor(),
5123 [c = std::move(on_ack)]
5124 (bs::error_code ec) mutable {
5125 (*c)(ec);
5126 }), epoch, budget);
5127}
5128
5129template
5130void Objecter::_issue_enumerate<librados::ListObjectImpl>(
5131 hobject_t start,
5132 std::unique_ptr<EnumerationContext<librados::ListObjectImpl>> ctx);
5133template
5134void Objecter::_issue_enumerate<neorados::Entry>(
5135 hobject_t start, std::unique_ptr<EnumerationContext<neorados::Entry>> ctx);
5136
5137template<typename T>
7c673cae 5138void Objecter::_enumerate_reply(
f67539c2
TL
5139 cb::list&& bl,
5140 bs::error_code ec,
5141 std::unique_ptr<EnumerationContext<T>>&& ctx)
5142{
5143 if (ec) {
5144 std::move(*ctx)(ec, {}, {});
7c673cae
FG
5145 return;
5146 }
5147
7c673cae 5148 // Decode the results
11fdf7f2 5149 auto iter = bl.cbegin();
f67539c2
TL
5150 pg_nls_response_template<T> response;
5151
5152 try {
5153 response.decode(iter);
5154 if (!iter.end()) {
5155 // extra_info isn't used anywhere. We do this solely to preserve
5156 // backward compatibility
5157 cb::list legacy_extra_info;
5158 decode(legacy_extra_info, iter);
5159 }
5160 } catch (const bs::system_error& e) {
5161 std::move(*ctx)(e.code(), {}, {});
5162 return;
5163 }
7c673cae 5164
f67539c2
TL
5165 shared_lock rl(rwlock);
5166 auto pool = osdmap->get_pg_pool(ctx->oloc.get_pool());
5167 rl.unlock();
5168 if (!pool) {
5169 // pool is gone, drop any results which are now meaningless.
5170 std::move(*ctx)(osdc_errc::pool_dne, {}, {});
5171 return;
7c673cae
FG
5172 }
5173
f67539c2
TL
5174 hobject_t next;
5175 if ((response.handle <= ctx->end)) {
5176 next = response.handle;
7c673cae 5177 } else {
f67539c2 5178 next = ctx->end;
7c673cae
FG
5179
5180 // drop anything after 'end'
7c673cae
FG
5181 while (!response.entries.empty()) {
5182 uint32_t hash = response.entries.back().locator.empty() ?
5183 pool->hash_key(response.entries.back().oid,
5184 response.entries.back().nspace) :
5185 pool->hash_key(response.entries.back().locator,
5186 response.entries.back().nspace);
5187 hobject_t last(response.entries.back().oid,
5188 response.entries.back().locator,
5189 CEPH_NOSNAP,
5190 hash,
f67539c2 5191 ctx->oloc.get_pool(),
7c673cae 5192 response.entries.back().nspace);
f67539c2 5193 if (last < ctx->end)
7c673cae 5194 break;
7c673cae
FG
5195 response.entries.pop_back();
5196 }
7c673cae 5197 }
f67539c2
TL
5198
5199 if (response.entries.size() <= ctx->max) {
5200 ctx->max -= response.entries.size();
5201 std::move(response.entries.begin(), response.entries.end(),
5202 std::back_inserter(ctx->ls));
5203 } else {
5204 auto i = response.entries.begin();
5205 while (ctx->max > 0) {
5206 ctx->ls.push_back(std::move(*i));
5207 --(ctx->max);
5208 ++i;
5209 }
5210 uint32_t hash =
5211 i->locator.empty() ?
5212 pool->hash_key(i->oid, i->nspace) :
5213 pool->hash_key(i->locator, i->nspace);
5214
5215 next = hobject_t{i->oid, i->locator,
5216 CEPH_NOSNAP,
5217 hash,
5218 ctx->oloc.get_pool(),
5219 i->nspace};
7c673cae
FG
5220 }
5221
f67539c2
TL
5222 if (next == ctx->end || ctx->max == 0) {
5223 std::move(*ctx)(ec, std::move(ctx->ls), std::move(next));
5224 } else {
5225 _issue_enumerate(next, std::move(ctx));
5226 }
7c673cae
FG
5227}
5228
f67539c2
TL
5229template
5230void Objecter::_enumerate_reply<librados::ListObjectImpl>(
5231 cb::list&& bl,
5232 bs::error_code ec,
5233 std::unique_ptr<EnumerationContext<librados::ListObjectImpl>>&& ctx);
5234
5235template
5236void Objecter::_enumerate_reply<neorados::Entry>(
5237 cb::list&& bl,
5238 bs::error_code ec,
5239 std::unique_ptr<EnumerationContext<neorados::Entry>>&& ctx);
5240
7c673cae
FG
5241namespace {
5242 using namespace librados;
5243
5244 template <typename T>
f67539c2 5245 void do_decode(std::vector<T>& items, std::vector<cb::list>& bls)
7c673cae
FG
5246 {
5247 for (auto& bl : bls) {
11fdf7f2 5248 auto p = bl.cbegin();
7c673cae
FG
5249 T t;
5250 decode(t, p);
5251 items.push_back(t);
5252 }
5253 }
5254
5255 struct C_ObjectOperation_scrub_ls : public Context {
f67539c2
TL
5256 cb::list bl;
5257 uint32_t* interval;
7c673cae
FG
5258 std::vector<inconsistent_obj_t> *objects = nullptr;
5259 std::vector<inconsistent_snapset_t> *snapsets = nullptr;
f67539c2 5260 int* rval;
7c673cae 5261
f67539c2
TL
5262 C_ObjectOperation_scrub_ls(uint32_t* interval,
5263 std::vector<inconsistent_obj_t>* objects,
5264 int* rval)
7c673cae 5265 : interval(interval), objects(objects), rval(rval) {}
f67539c2
TL
5266 C_ObjectOperation_scrub_ls(uint32_t* interval,
5267 std::vector<inconsistent_snapset_t>* snapsets,
5268 int* rval)
7c673cae
FG
5269 : interval(interval), snapsets(snapsets), rval(rval) {}
5270 void finish(int r) override {
5271 if (r < 0 && r != -EAGAIN) {
5272 if (rval)
5273 *rval = r;
5274 return;
5275 }
5276
5277 if (rval)
5278 *rval = 0;
5279
5280 try {
5281 decode();
f67539c2 5282 } catch (cb::error&) {
7c673cae
FG
5283 if (rval)
5284 *rval = -EIO;
5285 }
5286 }
5287 private:
5288 void decode() {
5289 scrub_ls_result_t result;
11fdf7f2 5290 auto p = bl.cbegin();
7c673cae
FG
5291 result.decode(p);
5292 *interval = result.interval;
5293 if (objects) {
5294 do_decode(*objects, result.vals);
5295 } else {
5296 do_decode(*snapsets, result.vals);
5297 }
5298 }
5299 };
5300
5301 template <typename T>
f67539c2 5302 void do_scrub_ls(::ObjectOperation* op,
7c673cae
FG
5303 const scrub_ls_arg_t& arg,
5304 std::vector<T> *items,
f67539c2
TL
5305 uint32_t* interval,
5306 int* rval)
7c673cae
FG
5307 {
5308 OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
5309 op->flags |= CEPH_OSD_FLAG_PGOP;
11fdf7f2 5310 ceph_assert(interval);
7c673cae
FG
5311 arg.encode(osd_op.indata);
5312 unsigned p = op->ops.size() - 1;
f67539c2
TL
5313 auto h = new C_ObjectOperation_scrub_ls{interval, items, rval};
5314 op->set_handler(h);
7c673cae
FG
5315 op->out_bl[p] = &h->bl;
5316 op->out_rval[p] = rval;
5317 }
5318}
5319
5320void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5321 uint64_t max_to_get,
f67539c2
TL
5322 std::vector<librados::inconsistent_obj_t>* objects,
5323 uint32_t* interval,
5324 int* rval)
7c673cae
FG
5325{
5326 scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
5327 do_scrub_ls(this, arg, objects, interval, rval);
5328}
5329
5330void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5331 uint64_t max_to_get,
5332 std::vector<librados::inconsistent_snapset_t> *snapsets,
5333 uint32_t *interval,
5334 int *rval)
5335{
5336 scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
5337 do_scrub_ls(this, arg, snapsets, interval, rval);
5338}
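// Hypothetical usage sketch: seed *interval with the scrub interval in
// hand, issue the op, and retry if it comes back -EAGAIN -- the handler
// above still decodes on -EAGAIN, so *interval is refreshed for the retry.
//
//   ::ObjectOperation op;
//   uint32_t interval = expected_interval;  // assumed known to the caller
//   int rval = 0;
//   std::vector<librados::inconsistent_obj_t> objs;
//   op.scrub_ls(start_after, max_to_get, &objs, &interval, &rval);
//   // do_scrub_ls() already set CEPH_OSD_FLAG_PGOP; submit against the PG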