1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
19 #include "osd/OSDMap.h"
20 #include "osd/error_code.h"
23 #include "mon/MonClient.h"
24 #include "mon/error_code.h"
26 #include "msg/Messenger.h"
27 #include "msg/Message.h"
29 #include "messages/MPing.h"
30 #include "messages/MOSDOp.h"
31 #include "messages/MOSDOpReply.h"
32 #include "messages/MOSDBackoff.h"
33 #include "messages/MOSDMap.h"
35 #include "messages/MPoolOp.h"
36 #include "messages/MPoolOpReply.h"
38 #include "messages/MGetPoolStats.h"
39 #include "messages/MGetPoolStatsReply.h"
40 #include "messages/MStatfs.h"
41 #include "messages/MStatfsReply.h"
43 #include "messages/MMonCommand.h"
45 #include "messages/MCommand.h"
46 #include "messages/MCommandReply.h"
48 #include "messages/MWatchNotify.h"
51 #include "common/Cond.h"
52 #include "common/config.h"
53 #include "common/perf_counters.h"
54 #include "common/scrub_types.h"
55 #include "include/str_list.h"
56 #include "common/errno.h"
57 #include "common/EventTrace.h"
58 #include "common/async/waiter.h"
59 #include "error_code.h"
66 using std::ostringstream
;
70 using std::stringstream
;
75 using ceph::Formatter
;
77 using std::defer_lock
;
78 using std::scoped_lock
;
79 using std::shared_lock
;
80 using std::unique_lock
;
82 using ceph::real_time
;
83 using ceph::real_clock
;
85 using ceph::mono_clock
;
86 using ceph::mono_time
;
90 using ceph::shunique_lock
;
91 using ceph::acquire_shared
;
92 using ceph::acquire_unique
;
94 namespace bc
= boost::container
;
95 namespace bs
= boost::system
;
96 namespace ca
= ceph::async
;
97 namespace cb
= ceph::buffer
;
99 #define dout_subsys ceph_subsys_objecter
101 #define dout_prefix *_dout << messenger->get_myname() << ".objecter "
105 l_osdc_first
= 123200,
109 l_osdc_op_send_bytes
,
124 l_osdc_osdop_writefull
,
125 l_osdc_osdop_writesame
,
128 l_osdc_osdop_truncate
,
131 l_osdc_osdop_sparse_read
,
132 l_osdc_osdop_clonerange
,
133 l_osdc_osdop_getxattr
,
134 l_osdc_osdop_setxattr
,
135 l_osdc_osdop_cmpxattr
,
136 l_osdc_osdop_rmxattr
,
137 l_osdc_osdop_resetxattrs
,
141 l_osdc_osdop_src_cmpxattr
,
143 l_osdc_osdop_pgls_filter
,
146 l_osdc_linger_active
,
148 l_osdc_linger_resend
,
151 l_osdc_poolop_active
,
153 l_osdc_poolop_resend
,
155 l_osdc_poolstat_active
,
156 l_osdc_poolstat_send
,
157 l_osdc_poolstat_resend
,
159 l_osdc_statfs_active
,
161 l_osdc_statfs_resend
,
163 l_osdc_command_active
,
165 l_osdc_command_resend
,
172 l_osdc_osd_session_open
,
173 l_osdc_osd_session_close
,
176 l_osdc_osdop_omap_wr
,
177 l_osdc_osdop_omap_rd
,
178 l_osdc_osdop_omap_del
,
184 inline bs::error_code
osdcode(int r
) {
185 return (r
< 0) ? bs::error_code(-r
, osd_category()) : bs::error_code();
189 // config obs ----------------------------
191 class Objecter::RequestStateHook
: public AdminSocketHook
{
192 Objecter
*m_objecter
;
194 explicit RequestStateHook(Objecter
*objecter
);
195 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
198 cb::list
& out
) override
;
201 std::unique_lock
<std::mutex
> Objecter::OSDSession::get_lock(object_t
& oid
)
203 if (oid
.name
.empty())
206 static constexpr uint32_t HASH_PRIME
= 1021;
207 uint32_t h
= ceph_str_hash_linux(oid
.name
.c_str(), oid
.name
.size())
210 return {completion_locks
[h
% num_locks
], std::defer_lock
};
213 const char** Objecter::get_tracked_conf_keys() const
215 static const char *config_keys
[] = {
217 "rados_mon_op_timeout",
218 "rados_osd_op_timeout",
225 void Objecter::handle_conf_change(const ConfigProxy
& conf
,
226 const std::set
<std::string
> &changed
)
228 if (changed
.count("crush_location")) {
229 update_crush_location();
231 if (changed
.count("rados_mon_op_timeout")) {
232 mon_timeout
= conf
.get_val
<std::chrono::seconds
>("rados_mon_op_timeout");
234 if (changed
.count("rados_osd_op_timeout")) {
235 osd_timeout
= conf
.get_val
<std::chrono::seconds
>("rados_osd_op_timeout");
239 void Objecter::update_crush_location()
241 unique_lock
wl(rwlock
);
242 crush_location
= cct
->crush_location
.get_location();
245 // messages ------------------------------
248 * initialize only internal data structures, don't initiate cluster interaction
250 void Objecter::init()
252 ceph_assert(!initialized
);
255 PerfCountersBuilder
pcb(cct
, "objecter", l_osdc_first
, l_osdc_last
);
257 pcb
.add_u64(l_osdc_op_active
, "op_active", "Operations active", "actv",
258 PerfCountersBuilder::PRIO_CRITICAL
);
259 pcb
.add_u64(l_osdc_op_laggy
, "op_laggy", "Laggy operations");
260 pcb
.add_u64_counter(l_osdc_op_send
, "op_send", "Sent operations");
261 pcb
.add_u64_counter(l_osdc_op_send_bytes
, "op_send_bytes", "Sent data", NULL
, 0, unit_t(UNIT_BYTES
));
262 pcb
.add_u64_counter(l_osdc_op_resend
, "op_resend", "Resent operations");
263 pcb
.add_u64_counter(l_osdc_op_reply
, "op_reply", "Operation reply");
264 pcb
.add_u64_avg(l_osdc_oplen_avg
, "oplen_avg", "Average length of operation vector");
266 pcb
.add_u64_counter(l_osdc_op
, "op", "Operations");
267 pcb
.add_u64_counter(l_osdc_op_r
, "op_r", "Read operations", "rd",
268 PerfCountersBuilder::PRIO_CRITICAL
);
269 pcb
.add_u64_counter(l_osdc_op_w
, "op_w", "Write operations", "wr",
270 PerfCountersBuilder::PRIO_CRITICAL
);
271 pcb
.add_u64_counter(l_osdc_op_rmw
, "op_rmw", "Read-modify-write operations",
272 "rdwr", PerfCountersBuilder::PRIO_INTERESTING
);
273 pcb
.add_u64_counter(l_osdc_op_pg
, "op_pg", "PG operation");
275 pcb
.add_u64_counter(l_osdc_osdop_stat
, "osdop_stat", "Stat operations");
276 pcb
.add_u64_counter(l_osdc_osdop_create
, "osdop_create",
277 "Create object operations");
278 pcb
.add_u64_counter(l_osdc_osdop_read
, "osdop_read", "Read operations");
279 pcb
.add_u64_counter(l_osdc_osdop_write
, "osdop_write", "Write operations");
280 pcb
.add_u64_counter(l_osdc_osdop_writefull
, "osdop_writefull",
281 "Write full object operations");
282 pcb
.add_u64_counter(l_osdc_osdop_writesame
, "osdop_writesame",
283 "Write same operations");
284 pcb
.add_u64_counter(l_osdc_osdop_append
, "osdop_append",
286 pcb
.add_u64_counter(l_osdc_osdop_zero
, "osdop_zero",
287 "Set object to zero operations");
288 pcb
.add_u64_counter(l_osdc_osdop_truncate
, "osdop_truncate",
289 "Truncate object operations");
290 pcb
.add_u64_counter(l_osdc_osdop_delete
, "osdop_delete",
291 "Delete object operations");
292 pcb
.add_u64_counter(l_osdc_osdop_mapext
, "osdop_mapext",
293 "Map extent operations");
294 pcb
.add_u64_counter(l_osdc_osdop_sparse_read
, "osdop_sparse_read",
295 "Sparse read operations");
296 pcb
.add_u64_counter(l_osdc_osdop_clonerange
, "osdop_clonerange",
297 "Clone range operations");
298 pcb
.add_u64_counter(l_osdc_osdop_getxattr
, "osdop_getxattr",
299 "Get xattr operations");
300 pcb
.add_u64_counter(l_osdc_osdop_setxattr
, "osdop_setxattr",
301 "Set xattr operations");
302 pcb
.add_u64_counter(l_osdc_osdop_cmpxattr
, "osdop_cmpxattr",
303 "Xattr comparison operations");
304 pcb
.add_u64_counter(l_osdc_osdop_rmxattr
, "osdop_rmxattr",
305 "Remove xattr operations");
306 pcb
.add_u64_counter(l_osdc_osdop_resetxattrs
, "osdop_resetxattrs",
307 "Reset xattr operations");
308 pcb
.add_u64_counter(l_osdc_osdop_call
, "osdop_call",
309 "Call (execute) operations");
310 pcb
.add_u64_counter(l_osdc_osdop_watch
, "osdop_watch",
311 "Watch by object operations");
312 pcb
.add_u64_counter(l_osdc_osdop_notify
, "osdop_notify",
313 "Notify about object operations");
314 pcb
.add_u64_counter(l_osdc_osdop_src_cmpxattr
, "osdop_src_cmpxattr",
315 "Extended attribute comparison in multi operations");
316 pcb
.add_u64_counter(l_osdc_osdop_pgls
, "osdop_pgls");
317 pcb
.add_u64_counter(l_osdc_osdop_pgls_filter
, "osdop_pgls_filter");
318 pcb
.add_u64_counter(l_osdc_osdop_other
, "osdop_other", "Other operations");
320 pcb
.add_u64(l_osdc_linger_active
, "linger_active",
321 "Active lingering operations");
322 pcb
.add_u64_counter(l_osdc_linger_send
, "linger_send",
323 "Sent lingering operations");
324 pcb
.add_u64_counter(l_osdc_linger_resend
, "linger_resend",
325 "Resent lingering operations");
326 pcb
.add_u64_counter(l_osdc_linger_ping
, "linger_ping",
327 "Sent pings to lingering operations");
329 pcb
.add_u64(l_osdc_poolop_active
, "poolop_active",
330 "Active pool operations");
331 pcb
.add_u64_counter(l_osdc_poolop_send
, "poolop_send",
332 "Sent pool operations");
333 pcb
.add_u64_counter(l_osdc_poolop_resend
, "poolop_resend",
334 "Resent pool operations");
336 pcb
.add_u64(l_osdc_poolstat_active
, "poolstat_active",
337 "Active get pool stat operations");
338 pcb
.add_u64_counter(l_osdc_poolstat_send
, "poolstat_send",
339 "Pool stat operations sent");
340 pcb
.add_u64_counter(l_osdc_poolstat_resend
, "poolstat_resend",
341 "Resent pool stats");
343 pcb
.add_u64(l_osdc_statfs_active
, "statfs_active", "Statfs operations");
344 pcb
.add_u64_counter(l_osdc_statfs_send
, "statfs_send", "Sent FS stats");
345 pcb
.add_u64_counter(l_osdc_statfs_resend
, "statfs_resend",
348 pcb
.add_u64(l_osdc_command_active
, "command_active", "Active commands");
349 pcb
.add_u64_counter(l_osdc_command_send
, "command_send",
351 pcb
.add_u64_counter(l_osdc_command_resend
, "command_resend",
354 pcb
.add_u64(l_osdc_map_epoch
, "map_epoch", "OSD map epoch");
355 pcb
.add_u64_counter(l_osdc_map_full
, "map_full",
356 "Full OSD maps received");
357 pcb
.add_u64_counter(l_osdc_map_inc
, "map_inc",
358 "Incremental OSD maps received");
360 pcb
.add_u64(l_osdc_osd_sessions
, "osd_sessions",
361 "Open sessions"); // open sessions
362 pcb
.add_u64_counter(l_osdc_osd_session_open
, "osd_session_open",
364 pcb
.add_u64_counter(l_osdc_osd_session_close
, "osd_session_close",
366 pcb
.add_u64(l_osdc_osd_laggy
, "osd_laggy", "Laggy OSD sessions");
368 pcb
.add_u64_counter(l_osdc_osdop_omap_wr
, "omap_wr",
369 "OSD OMAP write operations");
370 pcb
.add_u64_counter(l_osdc_osdop_omap_rd
, "omap_rd",
371 "OSD OMAP read operations");
372 pcb
.add_u64_counter(l_osdc_osdop_omap_del
, "omap_del",
373 "OSD OMAP delete operations");
375 logger
= pcb
.create_perf_counters();
376 cct
->get_perfcounters_collection()->add(logger
);
379 m_request_state_hook
= new RequestStateHook(this);
380 auto admin_socket
= cct
->get_admin_socket();
381 int ret
= admin_socket
->register_command("objecter_requests",
382 m_request_state_hook
,
383 "show in-progress osd requests");
385 /* Don't warn on EEXIST, happens if multiple ceph clients
386 * are instantiated from one process */
387 if (ret
< 0 && ret
!= -EEXIST
) {
388 lderr(cct
) << "error registering admin socket command: "
389 << cpp_strerror(ret
) << dendl
;
392 update_crush_location();
394 cct
->_conf
.add_observer(this);
400 * ok, cluster interaction can happen
402 void Objecter::start(const OSDMap
* o
)
404 shared_lock
rl(rwlock
);
408 osdmap
->deepish_copy_from(*o
);
409 prune_pg_mapping(osdmap
->get_pools());
410 } else if (osdmap
->get_epoch() == 0) {
411 _maybe_request_map();
415 void Objecter::shutdown()
417 ceph_assert(initialized
);
419 unique_lock
wl(rwlock
);
424 cct
->_conf
.remove_observer(this);
427 while (!osd_sessions
.empty()) {
428 auto p
= osd_sessions
.begin();
429 close_session(p
->second
);
432 while(!check_latest_map_lingers
.empty()) {
433 auto i
= check_latest_map_lingers
.begin();
435 check_latest_map_lingers
.erase(i
->first
);
438 while(!check_latest_map_ops
.empty()) {
439 auto i
= check_latest_map_ops
.begin();
441 check_latest_map_ops
.erase(i
->first
);
444 while(!check_latest_map_commands
.empty()) {
445 auto i
= check_latest_map_commands
.begin();
447 check_latest_map_commands
.erase(i
->first
);
450 while(!poolstat_ops
.empty()) {
451 auto i
= poolstat_ops
.begin();
453 poolstat_ops
.erase(i
->first
);
456 while(!statfs_ops
.empty()) {
457 auto i
= statfs_ops
.begin();
459 statfs_ops
.erase(i
->first
);
462 while(!pool_ops
.empty()) {
463 auto i
= pool_ops
.begin();
465 pool_ops
.erase(i
->first
);
468 ldout(cct
, 20) << __func__
<< " clearing up homeless session..." << dendl
;
469 while(!homeless_session
->linger_ops
.empty()) {
470 auto i
= homeless_session
->linger_ops
.begin();
471 ldout(cct
, 10) << " linger_op " << i
->first
<< dendl
;
472 LingerOp
*lop
= i
->second
;
474 std::unique_lock
swl(homeless_session
->lock
);
475 _session_linger_op_remove(homeless_session
, lop
);
477 linger_ops
.erase(lop
->linger_id
);
478 linger_ops_set
.erase(lop
);
482 while(!homeless_session
->ops
.empty()) {
483 auto i
= homeless_session
->ops
.begin();
484 ldout(cct
, 10) << " op " << i
->first
<< dendl
;
487 std::unique_lock
swl(homeless_session
->lock
);
488 _session_op_remove(homeless_session
, op
);
493 while(!homeless_session
->command_ops
.empty()) {
494 auto i
= homeless_session
->command_ops
.begin();
495 ldout(cct
, 10) << " command_op " << i
->first
<< dendl
;
496 auto cop
= i
->second
;
498 std::unique_lock
swl(homeless_session
->lock
);
499 _session_command_op_remove(homeless_session
, cop
);
505 if (timer
.cancel_event(tick_event
)) {
506 ldout(cct
, 10) << " successfully canceled tick" << dendl
;
512 cct
->get_perfcounters_collection()->remove(logger
);
517 // Let go of Objecter write lock so timer thread can shutdown
520 // Outside of lock to avoid cycle WRT calls to RequestStateHook
521 // This is safe because we guarantee no concurrent calls to
522 // shutdown() with the ::initialized check at start.
523 if (m_request_state_hook
) {
524 auto admin_socket
= cct
->get_admin_socket();
525 admin_socket
->unregister_commands(m_request_state_hook
);
526 delete m_request_state_hook
;
527 m_request_state_hook
= NULL
;
531 void Objecter::_send_linger(LingerOp
*info
,
532 ceph::shunique_lock
<ceph::shared_mutex
>& sul
)
534 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
536 fu2::unique_function
<Op::OpSig
> oncommit
;
538 std::shared_lock
watchl(info
->watch_lock
);
539 cb::list
*poutbl
= nullptr;
540 if (info
->registered
&& info
->is_watch
) {
541 ldout(cct
, 15) << "send_linger " << info
->linger_id
<< " reconnect"
543 opv
.push_back(OSDOp());
544 opv
.back().op
.op
= CEPH_OSD_OP_WATCH
;
545 opv
.back().op
.watch
.cookie
= info
->get_cookie();
546 opv
.back().op
.watch
.op
= CEPH_OSD_WATCH_OP_RECONNECT
;
547 opv
.back().op
.watch
.gen
= ++info
->register_gen
;
548 oncommit
= CB_Linger_Reconnect(this, info
);
550 ldout(cct
, 15) << "send_linger " << info
->linger_id
<< " register"
553 // TODO Augment ca::Completion with an equivalent of
554 // target so we can handle these cases better.
555 auto c
= std::make_unique
<CB_Linger_Commit
>(this, info
);
556 if (!info
->is_watch
) {
560 oncommit
= [c
= std::move(c
)](bs::error_code ec
) mutable {
565 auto o
= new Op(info
->target
.base_oid
, info
->target
.base_oloc
,
566 std::move(opv
), info
->target
.flags
| CEPH_OSD_FLAG_READ
,
567 std::move(oncommit
), info
->pobjver
);
569 o
->snapid
= info
->snap
;
570 o
->snapc
= info
->snapc
;
571 o
->mtime
= info
->mtime
;
573 o
->target
= info
->target
;
576 // do not resend this; we will send a new op to reregister
577 o
->should_resend
= false;
578 o
->ctx_budgeted
= true;
580 if (info
->register_tid
) {
581 // repeat send. cancel old registration op, if any.
582 std::unique_lock
sl(info
->session
->lock
);
583 if (info
->session
->ops
.count(info
->register_tid
)) {
584 auto o
= info
->session
->ops
[info
->register_tid
];
585 _op_cancel_map_check(o
);
586 _cancel_linger_op(o
);
591 _op_submit_with_budget(o
, sul
, &info
->register_tid
, &info
->ctx_budget
);
593 logger
->inc(l_osdc_linger_send
);
596 void Objecter::_linger_commit(LingerOp
*info
, bs::error_code ec
,
599 std::unique_lock
wl(info
->watch_lock
);
600 ldout(cct
, 10) << "_linger_commit " << info
->linger_id
<< dendl
;
601 if (info
->on_reg_commit
) {
602 info
->on_reg_commit
->defer(std::move(info
->on_reg_commit
),
604 info
->on_reg_commit
.reset();
606 if (ec
&& info
->on_notify_finish
) {
607 info
->on_notify_finish
->defer(std::move(info
->on_notify_finish
),
609 info
->on_notify_finish
.reset();
612 // only tell the user the first time we do this
613 info
->registered
= true;
614 info
->pobjver
= NULL
;
616 if (!info
->is_watch
) {
617 // make note of the notify_id
618 auto p
= outbl
.cbegin();
620 decode(info
->notify_id
, p
);
621 ldout(cct
, 10) << "_linger_commit notify_id=" << info
->notify_id
624 catch (cb::error
& e
) {
629 class CB_DoWatchError
{
631 boost::intrusive_ptr
<Objecter::LingerOp
> info
;
634 CB_DoWatchError(Objecter
*o
, Objecter::LingerOp
*i
,
636 : objecter(o
), info(i
), ec(ec
) {
637 info
->_queued_async();
640 std::unique_lock
wl(objecter
->rwlock
);
641 bool canceled
= info
->canceled
;
645 info
->handle(ec
, 0, info
->get_cookie(), 0, {});
648 info
->finished_async();
652 bs::error_code
Objecter::_normalize_watch_error(bs::error_code ec
)
654 // translate ENOENT -> ENOTCONN so that a delete->disconnection
655 // notification and a failure to reconnect because we raced with
656 // the delete appear the same to the user.
657 if (ec
== bs::errc::no_such_file_or_directory
)
658 ec
= bs::error_code(ENOTCONN
, osd_category());
662 void Objecter::_linger_reconnect(LingerOp
*info
, bs::error_code ec
)
664 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " = " << ec
665 << " (last_error " << info
->last_error
<< ")" << dendl
;
666 std::unique_lock
wl(info
->watch_lock
);
668 if (!info
->last_error
) {
669 ec
= _normalize_watch_error(ec
);
671 boost::asio::defer(finish_strand
, CB_DoWatchError(this, info
, ec
));
676 info
->last_error
= ec
;
679 void Objecter::_send_linger_ping(LingerOp
*info
)
681 // rwlock is locked unique
682 // info->session->lock is locked
684 if (cct
->_conf
->objecter_inject_no_watch_ping
) {
685 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " SKIPPING"
689 if (osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
)) {
690 ldout(cct
, 10) << __func__
<< " PAUSERD" << dendl
;
694 ceph::coarse_mono_time now
= ceph::coarse_mono_clock::now();
695 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " now " << now
699 opv
[0].op
.op
= CEPH_OSD_OP_WATCH
;
700 opv
[0].op
.watch
.cookie
= info
->get_cookie();
701 opv
[0].op
.watch
.op
= CEPH_OSD_WATCH_OP_PING
;
702 opv
[0].op
.watch
.gen
= info
->register_gen
;
704 Op
*o
= new Op(info
->target
.base_oid
, info
->target
.base_oloc
,
705 std::move(opv
), info
->target
.flags
| CEPH_OSD_FLAG_READ
,
706 CB_Linger_Ping(this, info
, now
),
708 o
->target
= info
->target
;
709 o
->should_resend
= false;
712 _session_op_assign(info
->session
, o
);
714 info
->ping_tid
= o
->tid
;
716 logger
->inc(l_osdc_linger_ping
);
719 void Objecter::_linger_ping(LingerOp
*info
, bs::error_code ec
, ceph::coarse_mono_time sent
,
720 uint32_t register_gen
)
722 std::unique_lock
l(info
->watch_lock
);
723 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
724 << " sent " << sent
<< " gen " << register_gen
<< " = " << ec
725 << " (last_error " << info
->last_error
726 << " register_gen " << info
->register_gen
<< ")" << dendl
;
727 if (info
->register_gen
== register_gen
) {
729 info
->watch_valid_thru
= sent
;
730 } else if (ec
&& !info
->last_error
) {
731 ec
= _normalize_watch_error(ec
);
732 info
->last_error
= ec
;
734 boost::asio::defer(finish_strand
, CB_DoWatchError(this, info
, ec
));
738 ldout(cct
, 20) << " ignoring old gen" << dendl
;
742 tl::expected
<ceph::timespan
,
743 bs::error_code
> Objecter::linger_check(LingerOp
*info
)
745 std::shared_lock
l(info
->watch_lock
);
747 ceph::coarse_mono_time stamp
= info
->watch_valid_thru
;
748 if (!info
->watch_pending_async
.empty())
749 stamp
= std::min(info
->watch_valid_thru
, info
->watch_pending_async
.front());
750 auto age
= ceph::coarse_mono_clock::now() - stamp
;
752 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
753 << " err " << info
->last_error
754 << " age " << age
<< dendl
;
755 if (info
->last_error
)
756 return tl::unexpected(info
->last_error
);
757 // return a safe upper bound (we are truncating to ms)
761 void Objecter::linger_cancel(LingerOp
*info
)
763 unique_lock
wl(rwlock
);
764 _linger_cancel(info
);
768 void Objecter::_linger_cancel(LingerOp
*info
)
770 // rwlock is locked unique
771 ldout(cct
, 20) << __func__
<< " linger_id=" << info
->linger_id
<< dendl
;
772 if (!info
->canceled
) {
773 OSDSession
*s
= info
->session
;
774 std::unique_lock
sl(s
->lock
);
775 _session_linger_op_remove(s
, info
);
778 linger_ops
.erase(info
->linger_id
);
779 linger_ops_set
.erase(info
);
780 ceph_assert(linger_ops
.size() == linger_ops_set
.size());
782 info
->canceled
= true;
785 logger
->dec(l_osdc_linger_active
);
791 Objecter::LingerOp
*Objecter::linger_register(const object_t
& oid
,
792 const object_locator_t
& oloc
,
795 unique_lock
l(rwlock
);
797 auto info
= new LingerOp(this, ++max_linger_id
);
798 info
->target
.base_oid
= oid
;
799 info
->target
.base_oloc
= oloc
;
800 if (info
->target
.base_oloc
.key
== oid
)
801 info
->target
.base_oloc
.key
.clear();
802 info
->target
.flags
= flags
;
803 info
->watch_valid_thru
= ceph::coarse_mono_clock::now();
804 ldout(cct
, 10) << __func__
<< " info " << info
805 << " linger_id " << info
->linger_id
806 << " cookie " << info
->get_cookie()
808 linger_ops
[info
->linger_id
] = info
;
809 linger_ops_set
.insert(info
);
810 ceph_assert(linger_ops
.size() == linger_ops_set
.size());
812 info
->get(); // for the caller
816 ceph_tid_t
Objecter::linger_watch(LingerOp
*info
,
818 const SnapContext
& snapc
,
821 decltype(info
->on_reg_commit
)&& oncommit
,
824 info
->is_watch
= true;
827 info
->target
.flags
|= CEPH_OSD_FLAG_WRITE
;
830 info
->pobjver
= objver
;
831 info
->on_reg_commit
= std::move(oncommit
);
833 info
->ctx_budget
= take_linger_budget(info
);
835 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
836 _linger_submit(info
, sul
);
837 logger
->inc(l_osdc_linger_active
);
840 return info
->linger_id
;
843 ceph_tid_t
Objecter::linger_notify(LingerOp
*info
,
845 snapid_t snap
, cb::list
& inbl
,
846 decltype(LingerOp::on_reg_commit
)&& onfinish
,
850 info
->target
.flags
|= CEPH_OSD_FLAG_READ
;
853 info
->pobjver
= objver
;
854 info
->on_reg_commit
= std::move(onfinish
);
855 info
->ctx_budget
= take_linger_budget(info
);
857 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
858 _linger_submit(info
, sul
);
859 logger
->inc(l_osdc_linger_active
);
862 return info
->linger_id
;
865 void Objecter::_linger_submit(LingerOp
*info
,
866 ceph::shunique_lock
<ceph::shared_mutex
>& sul
)
868 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
869 ceph_assert(info
->linger_id
);
870 ceph_assert(info
->ctx_budget
!= -1); // caller needs to have taken budget already!
872 // Populate Op::target
873 OSDSession
*s
= NULL
;
874 int r
= _calc_target(&info
->target
, nullptr);
876 case RECALC_OP_TARGET_POOL_EIO
:
877 _check_linger_pool_eio(info
);
881 // Create LingerOp<->OSDSession relation
882 r
= _get_session(info
->target
.osd
, &s
, sul
);
884 unique_lock
sl(s
->lock
);
885 _session_linger_op_assign(s
, info
);
889 _send_linger(info
, sul
);
892 struct CB_DoWatchNotify
{
894 boost::intrusive_ptr
<Objecter::LingerOp
> info
;
895 boost::intrusive_ptr
<MWatchNotify
> msg
;
896 CB_DoWatchNotify(Objecter
*o
, Objecter::LingerOp
*i
, MWatchNotify
*m
)
897 : objecter(o
), info(i
), msg(m
) {
898 info
->_queued_async();
901 objecter
->_do_watch_notify(std::move(info
), std::move(msg
));
905 void Objecter::handle_watch_notify(MWatchNotify
*m
)
907 shared_lock
l(rwlock
);
912 LingerOp
*info
= reinterpret_cast<LingerOp
*>(m
->cookie
);
913 if (linger_ops_set
.count(info
) == 0) {
914 ldout(cct
, 7) << __func__
<< " cookie " << m
->cookie
<< " dne" << dendl
;
917 std::unique_lock
wl(info
->watch_lock
);
918 if (m
->opcode
== CEPH_WATCH_EVENT_DISCONNECT
) {
919 if (!info
->last_error
) {
920 info
->last_error
= bs::error_code(ENOTCONN
, osd_category());
922 boost::asio::defer(finish_strand
, CB_DoWatchError(this, info
,
926 } else if (!info
->is_watch
) {
927 // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
928 // since we know the only user (librados) is safe to call in
929 // fast-dispatch context
930 if (info
->notify_id
&&
931 info
->notify_id
!= m
->notify_id
) {
932 ldout(cct
, 10) << __func__
<< " reply notify " << m
->notify_id
933 << " != " << info
->notify_id
<< ", ignoring" << dendl
;
934 } else if (info
->on_notify_finish
) {
935 info
->on_notify_finish
->defer(
936 std::move(info
->on_notify_finish
),
937 osdcode(m
->return_code
), std::move(m
->get_data()));
939 // if we race with reconnect we might get a second notify; only
940 // notify the caller once!
941 info
->on_notify_finish
= nullptr;
944 boost::asio::defer(finish_strand
, CB_DoWatchNotify(this, info
, m
));
948 void Objecter::_do_watch_notify(boost::intrusive_ptr
<LingerOp
> info
,
949 boost::intrusive_ptr
<MWatchNotify
> m
)
951 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
953 shared_lock
l(rwlock
);
954 ceph_assert(initialized
);
956 if (info
->canceled
) {
961 // notify completion?
962 ceph_assert(info
->is_watch
);
963 ceph_assert(info
->handle
);
964 ceph_assert(m
->opcode
!= CEPH_WATCH_EVENT_DISCONNECT
);
969 case CEPH_WATCH_EVENT_NOTIFY
:
970 info
->handle({}, m
->notify_id
, m
->cookie
, m
->notifier_gid
, std::move(m
->bl
));
975 info
->finished_async();
978 bool Objecter::ms_dispatch(Message
*m
)
980 ldout(cct
, 10) << __func__
<< " " << cct
<< " " << *m
<< dendl
;
981 switch (m
->get_type()) {
982 // these we exlusively handle
983 case CEPH_MSG_OSD_OPREPLY
:
984 handle_osd_op_reply(static_cast<MOSDOpReply
*>(m
));
987 case CEPH_MSG_OSD_BACKOFF
:
988 handle_osd_backoff(static_cast<MOSDBackoff
*>(m
));
991 case CEPH_MSG_WATCH_NOTIFY
:
992 handle_watch_notify(static_cast<MWatchNotify
*>(m
));
996 case MSG_COMMAND_REPLY
:
997 if (m
->get_source().type() == CEPH_ENTITY_TYPE_OSD
) {
998 handle_command_reply(static_cast<MCommandReply
*>(m
));
1004 case MSG_GETPOOLSTATSREPLY
:
1005 handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply
*>(m
));
1008 case CEPH_MSG_POOLOP_REPLY
:
1009 handle_pool_op_reply(static_cast<MPoolOpReply
*>(m
));
1012 case CEPH_MSG_STATFS_REPLY
:
1013 handle_fs_stats_reply(static_cast<MStatfsReply
*>(m
));
1016 // these we give others a chance to inspect
1019 case CEPH_MSG_OSD_MAP
:
1020 handle_osd_map(static_cast<MOSDMap
*>(m
));
1026 void Objecter::_scan_requests(
1030 map
<int64_t, bool> *pool_full_map
,
1031 map
<ceph_tid_t
, Op
*>& need_resend
,
1032 list
<LingerOp
*>& need_resend_linger
,
1033 map
<ceph_tid_t
, CommandOp
*>& need_resend_command
,
1034 ceph::shunique_lock
<ceph::shared_mutex
>& sul
)
1036 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
1038 list
<LingerOp
*> unregister_lingers
;
1040 std::unique_lock
sl(s
->lock
);
1042 // check for changed linger mappings (_before_ regular ops)
1043 auto lp
= s
->linger_ops
.begin();
1044 while (lp
!= s
->linger_ops
.end()) {
1045 auto op
= lp
->second
;
1046 ceph_assert(op
->session
== s
);
1047 // check_linger_pool_dne() may touch linger_ops; prevent iterator
1050 ldout(cct
, 10) << " checking linger op " << op
->linger_id
<< dendl
;
1051 bool unregister
, force_resend_writes
= cluster_full
;
1052 int r
= _recalc_linger_op_target(op
, sul
);
1054 force_resend_writes
= force_resend_writes
||
1055 (*pool_full_map
)[op
->target
.base_oloc
.pool
];
1057 case RECALC_OP_TARGET_NO_ACTION
:
1058 if (!skipped_map
&& !force_resend_writes
)
1061 case RECALC_OP_TARGET_NEED_RESEND
:
1062 need_resend_linger
.push_back(op
);
1063 _linger_cancel_map_check(op
);
1065 case RECALC_OP_TARGET_POOL_DNE
:
1066 _check_linger_pool_dne(op
, &unregister
);
1068 ldout(cct
, 10) << " need to unregister linger op "
1069 << op
->linger_id
<< dendl
;
1071 unregister_lingers
.push_back(op
);
1074 case RECALC_OP_TARGET_POOL_EIO
:
1075 _check_linger_pool_eio(op
);
1076 ldout(cct
, 10) << " need to unregister linger op "
1077 << op
->linger_id
<< dendl
;
1079 unregister_lingers
.push_back(op
);
1084 // check for changed request mappings
1085 auto p
= s
->ops
.begin();
1086 while (p
!= s
->ops
.end()) {
1088 ++p
; // check_op_pool_dne() may touch ops; prevent iterator invalidation
1089 ldout(cct
, 10) << " checking op " << op
->tid
<< dendl
;
1090 _prune_snapc(osdmap
->get_new_removed_snaps(), op
);
1091 bool force_resend_writes
= cluster_full
;
1093 force_resend_writes
= force_resend_writes
||
1094 (*pool_full_map
)[op
->target
.base_oloc
.pool
];
1095 int r
= _calc_target(&op
->target
,
1096 op
->session
? op
->session
->con
.get() : nullptr);
1098 case RECALC_OP_TARGET_NO_ACTION
:
1099 if (!skipped_map
&& !(force_resend_writes
&& op
->target
.respects_full()))
1102 case RECALC_OP_TARGET_NEED_RESEND
:
1103 _session_op_remove(op
->session
, op
);
1104 need_resend
[op
->tid
] = op
;
1105 _op_cancel_map_check(op
);
1107 case RECALC_OP_TARGET_POOL_DNE
:
1108 _check_op_pool_dne(op
, &sl
);
1110 case RECALC_OP_TARGET_POOL_EIO
:
1111 _check_op_pool_eio(op
, &sl
);
1117 auto cp
= s
->command_ops
.begin();
1118 while (cp
!= s
->command_ops
.end()) {
1119 auto c
= cp
->second
;
1121 ldout(cct
, 10) << " checking command " << c
->tid
<< dendl
;
1122 bool force_resend_writes
= cluster_full
;
1124 force_resend_writes
= force_resend_writes
||
1125 (*pool_full_map
)[c
->target_pg
.pool()];
1126 int r
= _calc_command_target(c
, sul
);
1128 case RECALC_OP_TARGET_NO_ACTION
:
1129 // resend if skipped map; otherwise do nothing.
1130 if (!skipped_map
&& !force_resend_writes
)
1133 case RECALC_OP_TARGET_NEED_RESEND
:
1134 need_resend_command
[c
->tid
] = c
;
1135 _session_command_op_remove(c
->session
, c
);
1136 _command_cancel_map_check(c
);
1138 case RECALC_OP_TARGET_POOL_DNE
:
1139 case RECALC_OP_TARGET_OSD_DNE
:
1140 case RECALC_OP_TARGET_OSD_DOWN
:
1141 _check_command_map_dne(c
);
1148 for (auto iter
= unregister_lingers
.begin();
1149 iter
!= unregister_lingers
.end();
1151 _linger_cancel(*iter
);
1156 void Objecter::handle_osd_map(MOSDMap
*m
)
1158 ceph::shunique_lock
sul(rwlock
, acquire_unique
);
1162 ceph_assert(osdmap
);
1164 if (m
->fsid
!= monc
->get_fsid()) {
1165 ldout(cct
, 0) << "handle_osd_map fsid " << m
->fsid
1166 << " != " << monc
->get_fsid() << dendl
;
1170 bool was_pauserd
= osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
);
1171 bool cluster_full
= _osdmap_full_flag();
1172 bool was_pausewr
= osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
) || cluster_full
||
1173 _osdmap_has_pool_full();
1174 map
<int64_t, bool> pool_full_map
;
1175 for (auto it
= osdmap
->get_pools().begin();
1176 it
!= osdmap
->get_pools().end(); ++it
)
1177 pool_full_map
[it
->first
] = _osdmap_pool_full(it
->second
);
1180 list
<LingerOp
*> need_resend_linger
;
1181 map
<ceph_tid_t
, Op
*> need_resend
;
1182 map
<ceph_tid_t
, CommandOp
*> need_resend_command
;
1184 if (m
->get_last() <= osdmap
->get_epoch()) {
1185 ldout(cct
, 3) << "handle_osd_map ignoring epochs ["
1186 << m
->get_first() << "," << m
->get_last()
1187 << "] <= " << osdmap
->get_epoch() << dendl
;
1189 ldout(cct
, 3) << "handle_osd_map got epochs ["
1190 << m
->get_first() << "," << m
->get_last()
1191 << "] > " << osdmap
->get_epoch() << dendl
;
1193 if (osdmap
->get_epoch()) {
1194 bool skipped_map
= false;
1195 // we want incrementals
1196 for (epoch_t e
= osdmap
->get_epoch() + 1;
1200 if (osdmap
->get_epoch() == e
-1 &&
1201 m
->incremental_maps
.count(e
)) {
1202 ldout(cct
, 3) << "handle_osd_map decoding incremental epoch " << e
1204 OSDMap::Incremental
inc(m
->incremental_maps
[e
]);
1205 osdmap
->apply_incremental(inc
);
1207 emit_blocklist_events(inc
);
1209 logger
->inc(l_osdc_map_inc
);
1211 else if (m
->maps
.count(e
)) {
1212 ldout(cct
, 3) << "handle_osd_map decoding full epoch " << e
<< dendl
;
1213 auto new_osdmap
= std::make_unique
<OSDMap
>();
1214 new_osdmap
->decode(m
->maps
[e
]);
1216 emit_blocklist_events(*osdmap
, *new_osdmap
);
1217 osdmap
= std::move(new_osdmap
);
1219 logger
->inc(l_osdc_map_full
);
1222 if (e
>= m
->get_oldest()) {
1223 ldout(cct
, 3) << "handle_osd_map requesting missing epoch "
1224 << osdmap
->get_epoch()+1 << dendl
;
1225 _maybe_request_map();
1228 ldout(cct
, 3) << "handle_osd_map missing epoch "
1229 << osdmap
->get_epoch()+1
1230 << ", jumping to " << m
->get_oldest() << dendl
;
1231 e
= m
->get_oldest() - 1;
1235 logger
->set(l_osdc_map_epoch
, osdmap
->get_epoch());
1237 prune_pg_mapping(osdmap
->get_pools());
1238 cluster_full
= cluster_full
|| _osdmap_full_flag();
1239 update_pool_full_map(pool_full_map
);
1241 // check all outstanding requests on every epoch
1242 for (auto& i
: need_resend
) {
1243 _prune_snapc(osdmap
->get_new_removed_snaps(), i
.second
);
1245 _scan_requests(homeless_session
, skipped_map
, cluster_full
,
1246 &pool_full_map
, need_resend
,
1247 need_resend_linger
, need_resend_command
, sul
);
1248 for (auto p
= osd_sessions
.begin();
1249 p
!= osd_sessions
.end(); ) {
1251 _scan_requests(s
, skipped_map
, cluster_full
,
1252 &pool_full_map
, need_resend
,
1253 need_resend_linger
, need_resend_command
, sul
);
1255 // osd down or addr change?
1256 if (!osdmap
->is_up(s
->osd
) ||
1258 s
->con
->get_peer_addrs() != osdmap
->get_addrs(s
->osd
))) {
1263 ceph_assert(e
== osdmap
->get_epoch());
1267 // first map. we want the full thing.
1268 if (m
->maps
.count(m
->get_last())) {
1269 for (auto p
= osd_sessions
.begin();
1270 p
!= osd_sessions
.end(); ++p
) {
1271 OSDSession
*s
= p
->second
;
1272 _scan_requests(s
, false, false, NULL
, need_resend
,
1273 need_resend_linger
, need_resend_command
, sul
);
1275 ldout(cct
, 3) << "handle_osd_map decoding full epoch "
1276 << m
->get_last() << dendl
;
1277 osdmap
->decode(m
->maps
[m
->get_last()]);
1278 prune_pg_mapping(osdmap
->get_pools());
1280 _scan_requests(homeless_session
, false, false, NULL
,
1281 need_resend
, need_resend_linger
,
1282 need_resend_command
, sul
);
1284 ldout(cct
, 3) << "handle_osd_map hmm, i want a full map, requesting"
1286 monc
->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME
);
1292 // make sure need_resend targets reflect latest map
1293 for (auto p
= need_resend
.begin(); p
!= need_resend
.end(); ) {
1295 if (op
->target
.epoch
< osdmap
->get_epoch()) {
1296 ldout(cct
, 10) << __func__
<< " checking op " << p
->first
<< dendl
;
1297 int r
= _calc_target(&op
->target
, nullptr);
1298 if (r
== RECALC_OP_TARGET_POOL_DNE
) {
1299 p
= need_resend
.erase(p
);
1300 _check_op_pool_dne(op
, nullptr);
1309 bool pauserd
= osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
);
1310 bool pausewr
= osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
) || _osdmap_full_flag()
1311 || _osdmap_has_pool_full();
1314 if (was_pauserd
|| was_pausewr
|| pauserd
|| pausewr
||
1315 osdmap
->get_epoch() < epoch_barrier
) {
1316 _maybe_request_map();
1320 for (auto p
= need_resend
.begin();
1321 p
!= need_resend
.end(); ++p
) {
1322 auto op
= p
->second
;
1323 auto s
= op
->session
;
1324 bool mapped_session
= false;
1326 int r
= _map_session(&op
->target
, &s
, sul
);
1327 ceph_assert(r
== 0);
1328 mapped_session
= true;
1332 std::unique_lock
sl(s
->lock
);
1333 if (mapped_session
) {
1334 _session_op_assign(s
, op
);
1336 if (op
->should_resend
) {
1337 if (!op
->session
->is_homeless() && !op
->target
.paused
) {
1338 logger
->inc(l_osdc_op_resend
);
1342 _op_cancel_map_check(op
);
1343 _cancel_linger_op(op
);
1348 for (auto p
= need_resend_linger
.begin();
1349 p
!= need_resend_linger
.end(); ++p
) {
1351 ceph_assert(op
->session
);
1352 if (!op
->session
->is_homeless()) {
1353 logger
->inc(l_osdc_linger_resend
);
1354 _send_linger(op
, sul
);
1357 for (auto p
= need_resend_command
.begin();
1358 p
!= need_resend_command
.end(); ++p
) {
1360 if (c
->target
.osd
>= 0) {
1361 _assign_command_session(c
, sul
);
1362 if (c
->session
&& !c
->session
->is_homeless()) {
1370 // finish any Contexts that were waiting on a map update
1371 auto p
= waiting_for_map
.begin();
1372 while (p
!= waiting_for_map
.end() &&
1373 p
->first
<= osdmap
->get_epoch()) {
1374 //go through the list and call the onfinish methods
1375 for (auto& [c
, ec
] : p
->second
) {
1376 ca::post(std::move(c
), ec
);
1378 waiting_for_map
.erase(p
++);
1381 monc
->sub_got("osdmap", osdmap
->get_epoch());
1383 if (!waiting_for_map
.empty()) {
1384 _maybe_request_map();
1388 void Objecter::enable_blocklist_events()
1390 unique_lock
wl(rwlock
);
1392 blocklist_events_enabled
= true;
1395 void Objecter::consume_blocklist_events(std::set
<entity_addr_t
> *events
)
1397 unique_lock
wl(rwlock
);
1399 if (events
->empty()) {
1400 events
->swap(blocklist_events
);
1402 for (const auto &i
: blocklist_events
) {
1405 blocklist_events
.clear();
1409 void Objecter::emit_blocklist_events(const OSDMap::Incremental
&inc
)
1411 if (!blocklist_events_enabled
) {
1415 for (const auto &i
: inc
.new_blocklist
) {
1416 blocklist_events
.insert(i
.first
);
1420 void Objecter::emit_blocklist_events(const OSDMap
&old_osd_map
,
1421 const OSDMap
&new_osd_map
)
1423 if (!blocklist_events_enabled
) {
1427 std::set
<entity_addr_t
> old_set
;
1428 std::set
<entity_addr_t
> new_set
;
1429 std::set
<entity_addr_t
> old_range_set
;
1430 std::set
<entity_addr_t
> new_range_set
;
1432 old_osd_map
.get_blocklist(&old_set
, &old_range_set
);
1433 new_osd_map
.get_blocklist(&new_set
, &new_range_set
);
1435 std::set
<entity_addr_t
> delta_set
;
1436 std::set_difference(
1437 new_set
.begin(), new_set
.end(), old_set
.begin(), old_set
.end(),
1438 std::inserter(delta_set
, delta_set
.begin()));
1439 std::set_difference(
1440 new_range_set
.begin(), new_range_set
.end(),
1441 old_range_set
.begin(), old_range_set
.end(),
1442 std::inserter(delta_set
, delta_set
.begin()));
1443 blocklist_events
.insert(delta_set
.begin(), delta_set
.end());
1448 void Objecter::CB_Op_Map_Latest::operator()(bs::error_code e
,
1449 version_t latest
, version_t
)
1451 if (e
== bs::errc::resource_unavailable_try_again
||
1452 e
== bs::errc::operation_canceled
)
1455 lgeneric_subdout(objecter
->cct
, objecter
, 10)
1456 << "op_map_latest r=" << e
<< " tid=" << tid
1457 << " latest " << latest
<< dendl
;
1459 unique_lock
wl(objecter
->rwlock
);
1461 auto iter
= objecter
->check_latest_map_ops
.find(tid
);
1462 if (iter
== objecter
->check_latest_map_ops
.end()) {
1463 lgeneric_subdout(objecter
->cct
, objecter
, 10)
1464 << "op_map_latest op "<< tid
<< " not found" << dendl
;
1468 Op
*op
= iter
->second
;
1469 objecter
->check_latest_map_ops
.erase(iter
);
1471 lgeneric_subdout(objecter
->cct
, objecter
, 20)
1472 << "op_map_latest op "<< op
<< dendl
;
1474 if (op
->map_dne_bound
== 0)
1475 op
->map_dne_bound
= latest
;
1477 unique_lock
sl(op
->session
->lock
, defer_lock
);
1478 objecter
->_check_op_pool_dne(op
, &sl
);
1483 int Objecter::pool_snap_by_name(int64_t poolid
, const char *snap_name
,
1484 snapid_t
*snap
) const
1486 shared_lock
rl(rwlock
);
1488 auto& pools
= osdmap
->get_pools();
1489 auto iter
= pools
.find(poolid
);
1490 if (iter
== pools
.end()) {
1493 const pg_pool_t
& pg_pool
= iter
->second
;
1494 for (auto p
= pg_pool
.snaps
.begin();
1495 p
!= pg_pool
.snaps
.end();
1497 if (p
->second
.name
== snap_name
) {
1505 int Objecter::pool_snap_get_info(int64_t poolid
, snapid_t snap
,
1506 pool_snap_info_t
*info
) const
1508 shared_lock
rl(rwlock
);
1510 auto& pools
= osdmap
->get_pools();
1511 auto iter
= pools
.find(poolid
);
1512 if (iter
== pools
.end()) {
1515 const pg_pool_t
& pg_pool
= iter
->second
;
1516 auto p
= pg_pool
.snaps
.find(snap
);
1517 if (p
== pg_pool
.snaps
.end())
1524 int Objecter::pool_snap_list(int64_t poolid
, vector
<uint64_t> *snaps
)
1526 shared_lock
rl(rwlock
);
1528 const pg_pool_t
*pi
= osdmap
->get_pg_pool(poolid
);
1531 for (auto p
= pi
->snaps
.begin();
1532 p
!= pi
->snaps
.end();
1534 snaps
->push_back(p
->first
);
1539 // sl may be unlocked.
1540 void Objecter::_check_op_pool_dne(Op
*op
, std::unique_lock
<std::shared_mutex
> *sl
)
1542 // rwlock is locked unique
1544 if (op
->target
.pool_ever_existed
) {
1545 // the pool previously existed and now it does not, which means it
1547 op
->map_dne_bound
= osdmap
->get_epoch();
1548 ldout(cct
, 10) << "check_op_pool_dne tid " << op
->tid
1549 << " pool previously exists but now does not"
1552 ldout(cct
, 10) << "check_op_pool_dne tid " << op
->tid
1553 << " current " << osdmap
->get_epoch()
1554 << " map_dne_bound " << op
->map_dne_bound
1557 if (op
->map_dne_bound
> 0) {
1558 if (osdmap
->get_epoch() >= op
->map_dne_bound
) {
1559 // we had a new enough map
1560 ldout(cct
, 10) << "check_op_pool_dne tid " << op
->tid
1561 << " concluding pool " << op
->target
.base_pgid
.pool()
1563 if (op
->has_completion()) {
1565 op
->complete(osdc_errc::pool_dne
, -ENOENT
);
1568 OSDSession
*s
= op
->session
;
1570 ceph_assert(s
!= NULL
);
1571 ceph_assert(sl
->mutex() == &s
->lock
);
1572 bool session_locked
= sl
->owns_lock();
1573 if (!session_locked
) {
1577 if (!session_locked
) {
1581 _finish_op(op
, 0); // no session
1585 _send_op_map_check(op
);
1589 // sl may be unlocked.
1590 void Objecter::_check_op_pool_eio(Op
*op
, std::unique_lock
<std::shared_mutex
> *sl
)
1592 // rwlock is locked unique
1594 // we had a new enough map
1595 ldout(cct
, 10) << "check_op_pool_eio tid " << op
->tid
1596 << " concluding pool " << op
->target
.base_pgid
.pool()
1597 << " has eio" << dendl
;
1598 if (op
->has_completion()) {
1600 op
->complete(osdc_errc::pool_eio
, -EIO
);
1603 OSDSession
*s
= op
->session
;
1605 ceph_assert(s
!= NULL
);
1606 ceph_assert(sl
->mutex() == &s
->lock
);
1607 bool session_locked
= sl
->owns_lock();
1608 if (!session_locked
) {
1612 if (!session_locked
) {
1616 _finish_op(op
, 0); // no session
1620 void Objecter::_send_op_map_check(Op
*op
)
1622 // rwlock is locked unique
1624 if (check_latest_map_ops
.count(op
->tid
) == 0) {
1626 check_latest_map_ops
[op
->tid
] = op
;
1627 monc
->get_version("osdmap", CB_Op_Map_Latest(this, op
->tid
));
1631 void Objecter::_op_cancel_map_check(Op
*op
)
1633 // rwlock is locked unique
1634 auto iter
= check_latest_map_ops
.find(op
->tid
);
1635 if (iter
!= check_latest_map_ops
.end()) {
1636 Op
*op
= iter
->second
;
1638 check_latest_map_ops
.erase(iter
);
1642 // linger pool check
1644 void Objecter::CB_Linger_Map_Latest::operator()(bs::error_code e
,
1648 if (e
== bs::errc::resource_unavailable_try_again
||
1649 e
== bs::errc::operation_canceled
) {
1650 // ignore callback; we will retry in resend_mon_ops()
1654 unique_lock
wl(objecter
->rwlock
);
1656 auto iter
= objecter
->check_latest_map_lingers
.find(linger_id
);
1657 if (iter
== objecter
->check_latest_map_lingers
.end()) {
1661 auto op
= iter
->second
;
1662 objecter
->check_latest_map_lingers
.erase(iter
);
1664 if (op
->map_dne_bound
== 0)
1665 op
->map_dne_bound
= latest
;
1668 objecter
->_check_linger_pool_dne(op
, &unregister
);
1671 objecter
->_linger_cancel(op
);
1677 void Objecter::_check_linger_pool_dne(LingerOp
*op
, bool *need_unregister
)
1679 // rwlock is locked unique
1681 *need_unregister
= false;
1683 if (op
->register_gen
> 0) {
1684 ldout(cct
, 10) << "_check_linger_pool_dne linger_id " << op
->linger_id
1685 << " pool previously existed but now does not"
1687 op
->map_dne_bound
= osdmap
->get_epoch();
1689 ldout(cct
, 10) << "_check_linger_pool_dne linger_id " << op
->linger_id
1690 << " current " << osdmap
->get_epoch()
1691 << " map_dne_bound " << op
->map_dne_bound
1694 if (op
->map_dne_bound
> 0) {
1695 if (osdmap
->get_epoch() >= op
->map_dne_bound
) {
1696 std::unique_lock wl
{op
->watch_lock
};
1697 if (op
->on_reg_commit
) {
1698 op
->on_reg_commit
->defer(std::move(op
->on_reg_commit
),
1699 osdc_errc::pool_dne
, cb::list
{});
1700 op
->on_reg_commit
= nullptr;
1702 if (op
->on_notify_finish
) {
1703 op
->on_notify_finish
->defer(std::move(op
->on_notify_finish
),
1704 osdc_errc::pool_dne
, cb::list
{});
1705 op
->on_notify_finish
= nullptr;
1707 *need_unregister
= true;
1710 _send_linger_map_check(op
);
1714 void Objecter::_check_linger_pool_eio(LingerOp
*op
)
1716 // rwlock is locked unique
1718 std::unique_lock wl
{op
->watch_lock
};
1719 if (op
->on_reg_commit
) {
1720 op
->on_reg_commit
->defer(std::move(op
->on_reg_commit
),
1721 osdc_errc::pool_dne
, cb::list
{});
1722 op
->on_reg_commit
= nullptr;
1724 if (op
->on_notify_finish
) {
1725 op
->on_notify_finish
->defer(std::move(op
->on_notify_finish
),
1726 osdc_errc::pool_dne
, cb::list
{});
1727 op
->on_notify_finish
= nullptr;
1731 void Objecter::_send_linger_map_check(LingerOp
*op
)
1734 if (check_latest_map_lingers
.count(op
->linger_id
) == 0) {
1736 check_latest_map_lingers
[op
->linger_id
] = op
;
1737 monc
->get_version("osdmap", CB_Linger_Map_Latest(this, op
->linger_id
));
1741 void Objecter::_linger_cancel_map_check(LingerOp
*op
)
1743 // rwlock is locked unique
1745 auto iter
= check_latest_map_lingers
.find(op
->linger_id
);
1746 if (iter
!= check_latest_map_lingers
.end()) {
1747 LingerOp
*op
= iter
->second
;
1749 check_latest_map_lingers
.erase(iter
);
1753 // command pool check
1755 void Objecter::CB_Command_Map_Latest::operator()(bs::error_code e
,
1756 version_t latest
, version_t
)
1758 if (e
== bs::errc::resource_unavailable_try_again
||
1759 e
== bs::errc::operation_canceled
) {
1760 // ignore callback; we will retry in resend_mon_ops()
1764 unique_lock
wl(objecter
->rwlock
);
1766 auto iter
= objecter
->check_latest_map_commands
.find(tid
);
1767 if (iter
== objecter
->check_latest_map_commands
.end()) {
1771 auto c
= iter
->second
;
1772 objecter
->check_latest_map_commands
.erase(iter
);
1774 if (c
->map_dne_bound
== 0)
1775 c
->map_dne_bound
= latest
;
1777 unique_lock
sul(c
->session
->lock
);
1778 objecter
->_check_command_map_dne(c
);
1784 void Objecter::_check_command_map_dne(CommandOp
*c
)
1786 // rwlock is locked unique
1787 // session is locked unique
1789 ldout(cct
, 10) << "_check_command_map_dne tid " << c
->tid
1790 << " current " << osdmap
->get_epoch()
1791 << " map_dne_bound " << c
->map_dne_bound
1793 if (c
->map_dne_bound
> 0) {
1794 if (osdmap
->get_epoch() >= c
->map_dne_bound
) {
1795 _finish_command(c
, osdcode(c
->map_check_error
),
1796 std::move(c
->map_check_error_str
), {});
1799 _send_command_map_check(c
);
1803 void Objecter::_send_command_map_check(CommandOp
*c
)
1805 // rwlock is locked unique
1806 // session is locked unique
1809 if (check_latest_map_commands
.count(c
->tid
) == 0) {
1811 check_latest_map_commands
[c
->tid
] = c
;
1812 monc
->get_version("osdmap", CB_Command_Map_Latest(this, c
->tid
));
1816 void Objecter::_command_cancel_map_check(CommandOp
*c
)
1818 // rwlock is locked uniqe
1820 auto iter
= check_latest_map_commands
.find(c
->tid
);
1821 if (iter
!= check_latest_map_commands
.end()) {
1822 auto c
= iter
->second
;
1824 check_latest_map_commands
.erase(iter
);
1830 * Look up OSDSession by OSD id.
1832 * @returns 0 on success, or -EAGAIN if the lock context requires
1833 * promotion to write.
1835 int Objecter::_get_session(int osd
, OSDSession
**session
,
1836 shunique_lock
<ceph::shared_mutex
>& sul
)
1838 ceph_assert(sul
&& sul
.mutex() == &rwlock
);
1841 *session
= homeless_session
;
1842 ldout(cct
, 20) << __func__
<< " osd=" << osd
<< " returning homeless"
1847 auto p
= osd_sessions
.find(osd
);
1848 if (p
!= osd_sessions
.end()) {
1852 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << osd
<< " "
1853 << s
->get_nref() << dendl
;
1856 if (!sul
.owns_lock()) {
1859 auto s
= new OSDSession(cct
, osd
);
1860 osd_sessions
[osd
] = s
;
1861 s
->con
= messenger
->connect_to_osd(osdmap
->get_addrs(osd
));
1862 s
->con
->set_priv(RefCountedPtr
{s
});
1863 logger
->inc(l_osdc_osd_session_open
);
1864 logger
->set(l_osdc_osd_sessions
, osd_sessions
.size());
1867 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << osd
<< " "
1868 << s
->get_nref() << dendl
;
1872 void Objecter::put_session(Objecter::OSDSession
*s
)
1874 if (s
&& !s
->is_homeless()) {
1875 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << s
->osd
<< " "
1876 << s
->get_nref() << dendl
;
1881 void Objecter::get_session(Objecter::OSDSession
*s
)
1883 ceph_assert(s
!= NULL
);
1885 if (!s
->is_homeless()) {
1886 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << s
->osd
<< " "
1887 << s
->get_nref() << dendl
;
1892 void Objecter::_reopen_session(OSDSession
*s
)
1894 // rwlock is locked unique
1895 // s->lock is locked
1897 auto addrs
= osdmap
->get_addrs(s
->osd
);
1898 ldout(cct
, 10) << "reopen_session osd." << s
->osd
<< " session, addr now "
1901 s
->con
->set_priv(NULL
);
1902 s
->con
->mark_down();
1903 logger
->inc(l_osdc_osd_session_close
);
1905 s
->con
= messenger
->connect_to_osd(addrs
);
1906 s
->con
->set_priv(RefCountedPtr
{s
});
1908 logger
->inc(l_osdc_osd_session_open
);
1911 void Objecter::close_session(OSDSession
*s
)
1913 // rwlock is locked unique
1915 ldout(cct
, 10) << "close_session for osd." << s
->osd
<< dendl
;
1917 s
->con
->set_priv(NULL
);
1918 s
->con
->mark_down();
1919 logger
->inc(l_osdc_osd_session_close
);
1921 unique_lock
sl(s
->lock
);
1923 std::list
<LingerOp
*> homeless_lingers
;
1924 std::list
<CommandOp
*> homeless_commands
;
1925 std::list
<Op
*> homeless_ops
;
1927 while (!s
->linger_ops
.empty()) {
1928 auto i
= s
->linger_ops
.begin();
1929 ldout(cct
, 10) << " linger_op " << i
->first
<< dendl
;
1930 homeless_lingers
.push_back(i
->second
);
1931 _session_linger_op_remove(s
, i
->second
);
1934 while (!s
->ops
.empty()) {
1935 auto i
= s
->ops
.begin();
1936 ldout(cct
, 10) << " op " << i
->first
<< dendl
;
1937 homeless_ops
.push_back(i
->second
);
1938 _session_op_remove(s
, i
->second
);
1941 while (!s
->command_ops
.empty()) {
1942 auto i
= s
->command_ops
.begin();
1943 ldout(cct
, 10) << " command_op " << i
->first
<< dendl
;
1944 homeless_commands
.push_back(i
->second
);
1945 _session_command_op_remove(s
, i
->second
);
1948 osd_sessions
.erase(s
->osd
);
1952 // Assign any leftover ops to the homeless session
1954 unique_lock
hsl(homeless_session
->lock
);
1955 for (auto i
= homeless_lingers
.begin();
1956 i
!= homeless_lingers
.end(); ++i
) {
1957 _session_linger_op_assign(homeless_session
, *i
);
1959 for (auto i
= homeless_ops
.begin();
1960 i
!= homeless_ops
.end(); ++i
) {
1961 _session_op_assign(homeless_session
, *i
);
1963 for (auto i
= homeless_commands
.begin();
1964 i
!= homeless_commands
.end(); ++i
) {
1965 _session_command_op_assign(homeless_session
, *i
);
1969 logger
->set(l_osdc_osd_sessions
, osd_sessions
.size());
1972 void Objecter::wait_for_osd_map(epoch_t e
)
1974 unique_lock
l(rwlock
);
1975 if (osdmap
->get_epoch() >= e
) {
1980 ca::waiter
<bs::error_code
> w
;
1981 waiting_for_map
[e
].emplace_back(OpCompletion::create(
1982 service
.get_executor(),
1989 void Objecter::_get_latest_version(epoch_t oldest
, epoch_t newest
,
1990 std::unique_ptr
<OpCompletion
> fin
,
1991 std::unique_lock
<ceph::shared_mutex
>&& l
)
1994 if (osdmap
->get_epoch() >= newest
) {
1995 ldout(cct
, 10) << __func__
<< " latest " << newest
<< ", have it" << dendl
;
1997 ca::defer(std::move(fin
), bs::error_code
{});
1999 ldout(cct
, 10) << __func__
<< " latest " << newest
<< ", waiting" << dendl
;
2000 _wait_for_new_map(std::move(fin
), newest
, bs::error_code
{});
2005 void Objecter::maybe_request_map()
2007 shared_lock
rl(rwlock
);
2008 _maybe_request_map();
2011 void Objecter::_maybe_request_map()
2015 if (_osdmap_full_flag()
2016 || osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
)
2017 || osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
)) {
2018 ldout(cct
, 10) << "_maybe_request_map subscribing (continuous) to next "
2019 "osd map (FULL flag is set)" << dendl
;
2022 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl
;
2023 flag
= CEPH_SUBSCRIBE_ONETIME
;
2025 epoch_t epoch
= osdmap
->get_epoch() ? osdmap
->get_epoch()+1 : 0;
2026 if (monc
->sub_want("osdmap", epoch
, flag
)) {
2031 void Objecter::_wait_for_new_map(std::unique_ptr
<OpCompletion
> c
, epoch_t epoch
,
2034 // rwlock is locked unique
2035 waiting_for_map
[epoch
].emplace_back(std::move(c
), ec
);
2036 _maybe_request_map();
2041 * Use this together with wait_for_map: this is a pre-check to avoid
2042 * allocating a Context for wait_for_map if we can see that we
2043 * definitely already have the epoch.
2045 * This does *not* replace the need to handle the return value of
2046 * wait_for_map: just because we don't have it in this pre-check
2047 * doesn't mean we won't have it when calling back into wait_for_map,
2048 * since the objecter lock is dropped in between.
2050 bool Objecter::have_map(const epoch_t epoch
)
2052 shared_lock
rl(rwlock
);
2053 if (osdmap
->get_epoch() >= epoch
) {
2060 void Objecter::_kick_requests(OSDSession
*session
,
2061 map
<uint64_t, LingerOp
*>& lresend
)
2063 // rwlock is locked unique
2066 session
->backoffs
.clear();
2067 session
->backoffs_by_id
.clear();
2070 map
<ceph_tid_t
,Op
*> resend
; // resend in tid order
2071 for (auto p
= session
->ops
.begin(); p
!= session
->ops
.end();) {
2074 if (op
->should_resend
) {
2075 if (!op
->target
.paused
)
2076 resend
[op
->tid
] = op
;
2078 _op_cancel_map_check(op
);
2079 _cancel_linger_op(op
);
2083 logger
->inc(l_osdc_op_resend
, resend
.size());
2084 while (!resend
.empty()) {
2085 _send_op(resend
.begin()->second
);
2086 resend
.erase(resend
.begin());
2090 logger
->inc(l_osdc_linger_resend
, session
->linger_ops
.size());
2091 for (auto j
= session
->linger_ops
.begin();
2092 j
!= session
->linger_ops
.end(); ++j
) {
2093 LingerOp
*op
= j
->second
;
2095 ceph_assert(lresend
.count(j
->first
) == 0);
2096 lresend
[j
->first
] = op
;
2100 logger
->inc(l_osdc_command_resend
, session
->command_ops
.size());
2101 map
<uint64_t,CommandOp
*> cresend
; // resend in order
2102 for (auto k
= session
->command_ops
.begin();
2103 k
!= session
->command_ops
.end(); ++k
) {
2104 cresend
[k
->first
] = k
->second
;
2106 while (!cresend
.empty()) {
2107 _send_command(cresend
.begin()->second
);
2108 cresend
.erase(cresend
.begin());
2112 void Objecter::_linger_ops_resend(map
<uint64_t, LingerOp
*>& lresend
,
2113 unique_lock
<ceph::shared_mutex
>& ul
)
2115 ceph_assert(ul
.owns_lock());
2116 shunique_lock
sul(std::move(ul
));
2117 while (!lresend
.empty()) {
2118 LingerOp
*op
= lresend
.begin()->second
;
2119 if (!op
->canceled
) {
2120 _send_linger(op
, sul
);
2123 lresend
.erase(lresend
.begin());
2125 ul
= sul
.release_to_unique();
2128 void Objecter::start_tick()
2130 ceph_assert(tick_event
== 0);
2132 timer
.add_event(ceph::make_timespan(cct
->_conf
->objecter_tick_interval
),
2133 &Objecter::tick
, this);
2136 void Objecter::tick()
2138 shared_lock
rl(rwlock
);
2140 ldout(cct
, 10) << "tick" << dendl
;
2142 // we are only called by C_Tick
2146 // we raced with shutdown
2147 ldout(cct
, 10) << __func__
<< " raced with shutdown" << dendl
;
2151 set
<OSDSession
*> toping
;
2154 // look for laggy requests
2155 auto cutoff
= ceph::coarse_mono_clock::now();
2156 cutoff
-= ceph::make_timespan(cct
->_conf
->objecter_timeout
); // timeout
2158 unsigned laggy_ops
= 0;
2160 for (auto siter
= osd_sessions
.begin();
2161 siter
!= osd_sessions
.end(); ++siter
) {
2162 auto s
= siter
->second
;
2163 scoped_lock
l(s
->lock
);
2165 for (auto p
= s
->ops
.begin(); p
!= s
->ops
.end(); ++p
) {
2166 auto op
= p
->second
;
2167 ceph_assert(op
->session
);
2168 if (op
->stamp
< cutoff
) {
2169 ldout(cct
, 2) << " tid " << p
->first
<< " on osd." << op
->session
->osd
2170 << " is laggy" << dendl
;
2175 for (auto p
= s
->linger_ops
.begin();
2176 p
!= s
->linger_ops
.end();
2178 auto op
= p
->second
;
2179 std::unique_lock
wl(op
->watch_lock
);
2180 ceph_assert(op
->session
);
2181 ldout(cct
, 10) << " pinging osd that serves lingering tid " << p
->first
2182 << " (osd." << op
->session
->osd
<< ")" << dendl
;
2184 if (op
->is_watch
&& op
->registered
&& !op
->last_error
)
2185 _send_linger_ping(op
);
2187 for (auto p
= s
->command_ops
.begin();
2188 p
!= s
->command_ops
.end();
2190 auto op
= p
->second
;
2191 ceph_assert(op
->session
);
2192 ldout(cct
, 10) << " pinging osd that serves command tid " << p
->first
2193 << " (osd." << op
->session
->osd
<< ")" << dendl
;
2199 if (num_homeless_ops
|| !toping
.empty()) {
2200 _maybe_request_map();
2203 logger
->set(l_osdc_op_laggy
, laggy_ops
);
2204 logger
->set(l_osdc_osd_laggy
, toping
.size());
2206 if (!toping
.empty()) {
2207 // send a ping to these osds, to ensure we detect any session resets
2208 // (osd reply message policy is lossy)
2209 for (auto i
= toping
.begin(); i
!= toping
.end(); ++i
) {
2210 (*i
)->con
->send_message(new MPing
);
2214 // Make sure we don't reschedule if we wake up after shutdown
2216 tick_event
= timer
.reschedule_me(ceph::make_timespan(
2217 cct
->_conf
->objecter_tick_interval
));
2221 void Objecter::resend_mon_ops()
2223 unique_lock
wl(rwlock
);
2225 ldout(cct
, 10) << "resend_mon_ops" << dendl
;
2227 for (auto p
= poolstat_ops
.begin(); p
!= poolstat_ops
.end(); ++p
) {
2228 _poolstat_submit(p
->second
);
2229 logger
->inc(l_osdc_poolstat_resend
);
2232 for (auto p
= statfs_ops
.begin(); p
!= statfs_ops
.end(); ++p
) {
2233 _fs_stats_submit(p
->second
);
2234 logger
->inc(l_osdc_statfs_resend
);
2237 for (auto p
= pool_ops
.begin(); p
!= pool_ops
.end(); ++p
) {
2238 _pool_op_submit(p
->second
);
2239 logger
->inc(l_osdc_poolop_resend
);
2242 for (auto p
= check_latest_map_ops
.begin();
2243 p
!= check_latest_map_ops
.end();
2245 monc
->get_version("osdmap", CB_Op_Map_Latest(this, p
->second
->tid
));
2248 for (auto p
= check_latest_map_lingers
.begin();
2249 p
!= check_latest_map_lingers
.end();
2251 monc
->get_version("osdmap", CB_Linger_Map_Latest(this, p
->second
->linger_id
));
2254 for (auto p
= check_latest_map_commands
.begin();
2255 p
!= check_latest_map_commands
.end();
2257 monc
->get_version("osdmap", CB_Command_Map_Latest(this, p
->second
->tid
));
2261 // read | write ---------------------------
2263 void Objecter::op_submit(Op
*op
, ceph_tid_t
*ptid
, int *ctx_budget
)
2265 shunique_lock
rl(rwlock
, ceph::acquire_shared
);
2269 op
->trace
.event("op submit");
2270 _op_submit_with_budget(op
, rl
, ptid
, ctx_budget
);
2273 void Objecter::_op_submit_with_budget(Op
*op
,
2274 shunique_lock
<ceph::shared_mutex
>& sul
,
2278 ceph_assert(initialized
);
2280 ceph_assert(op
->ops
.size() == op
->out_bl
.size());
2281 ceph_assert(op
->ops
.size() == op
->out_rval
.size());
2282 ceph_assert(op
->ops
.size() == op
->out_handler
.size());
2284 // throttle. before we look at any state, because
2285 // _take_op_budget() may drop our lock while it blocks.
2286 if (!op
->ctx_budgeted
|| (ctx_budget
&& (*ctx_budget
== -1))) {
2287 int op_budget
= _take_op_budget(op
, sul
);
2288 // take and pass out the budget for the first OP
2289 // in the context session
2290 if (ctx_budget
&& (*ctx_budget
== -1)) {
2291 *ctx_budget
= op_budget
;
2295 if (osd_timeout
> timespan(0)) {
2297 op
->tid
= ++last_tid
;
2299 op
->ontimeout
= timer
.add_event(osd_timeout
,
2301 op_cancel(tid
, -ETIMEDOUT
); });
2304 _op_submit(op
, sul
, ptid
);
2307 void Objecter::_send_op_account(Op
*op
)
2311 // add to gather set(s)
2312 if (op
->has_completion()) {
2315 ldout(cct
, 20) << " note: not requesting reply" << dendl
;
2318 logger
->inc(l_osdc_op_active
);
2319 logger
->inc(l_osdc_op
);
2320 logger
->inc(l_osdc_oplen_avg
, op
->ops
.size());
2322 if ((op
->target
.flags
& (CEPH_OSD_FLAG_READ
| CEPH_OSD_FLAG_WRITE
)) ==
2323 (CEPH_OSD_FLAG_READ
|CEPH_OSD_FLAG_WRITE
))
2324 logger
->inc(l_osdc_op_rmw
);
2325 else if (op
->target
.flags
& CEPH_OSD_FLAG_WRITE
)
2326 logger
->inc(l_osdc_op_w
);
2327 else if (op
->target
.flags
& CEPH_OSD_FLAG_READ
)
2328 logger
->inc(l_osdc_op_r
);
2330 if (op
->target
.flags
& CEPH_OSD_FLAG_PGOP
)
2331 logger
->inc(l_osdc_op_pg
);
2333 for (auto p
= op
->ops
.begin(); p
!= op
->ops
.end(); ++p
) {
2334 int code
= l_osdc_osdop_other
;
2336 case CEPH_OSD_OP_STAT
: code
= l_osdc_osdop_stat
; break;
2337 case CEPH_OSD_OP_CREATE
: code
= l_osdc_osdop_create
; break;
2338 case CEPH_OSD_OP_READ
: code
= l_osdc_osdop_read
; break;
2339 case CEPH_OSD_OP_WRITE
: code
= l_osdc_osdop_write
; break;
2340 case CEPH_OSD_OP_WRITEFULL
: code
= l_osdc_osdop_writefull
; break;
2341 case CEPH_OSD_OP_WRITESAME
: code
= l_osdc_osdop_writesame
; break;
2342 case CEPH_OSD_OP_APPEND
: code
= l_osdc_osdop_append
; break;
2343 case CEPH_OSD_OP_ZERO
: code
= l_osdc_osdop_zero
; break;
2344 case CEPH_OSD_OP_TRUNCATE
: code
= l_osdc_osdop_truncate
; break;
2345 case CEPH_OSD_OP_DELETE
: code
= l_osdc_osdop_delete
; break;
2346 case CEPH_OSD_OP_MAPEXT
: code
= l_osdc_osdop_mapext
; break;
2347 case CEPH_OSD_OP_SPARSE_READ
: code
= l_osdc_osdop_sparse_read
; break;
2348 case CEPH_OSD_OP_GETXATTR
: code
= l_osdc_osdop_getxattr
; break;
2349 case CEPH_OSD_OP_SETXATTR
: code
= l_osdc_osdop_setxattr
; break;
2350 case CEPH_OSD_OP_CMPXATTR
: code
= l_osdc_osdop_cmpxattr
; break;
2351 case CEPH_OSD_OP_RMXATTR
: code
= l_osdc_osdop_rmxattr
; break;
2352 case CEPH_OSD_OP_RESETXATTRS
: code
= l_osdc_osdop_resetxattrs
; break;
2354 // OMAP read operations
2355 case CEPH_OSD_OP_OMAPGETVALS
:
2356 case CEPH_OSD_OP_OMAPGETKEYS
:
2357 case CEPH_OSD_OP_OMAPGETHEADER
:
2358 case CEPH_OSD_OP_OMAPGETVALSBYKEYS
:
2359 case CEPH_OSD_OP_OMAP_CMP
: code
= l_osdc_osdop_omap_rd
; break;
2361 // OMAP write operations
2362 case CEPH_OSD_OP_OMAPSETVALS
:
2363 case CEPH_OSD_OP_OMAPSETHEADER
: code
= l_osdc_osdop_omap_wr
; break;
2365 // OMAP del operations
2366 case CEPH_OSD_OP_OMAPCLEAR
:
2367 case CEPH_OSD_OP_OMAPRMKEYS
: code
= l_osdc_osdop_omap_del
; break;
2369 case CEPH_OSD_OP_CALL
: code
= l_osdc_osdop_call
; break;
2370 case CEPH_OSD_OP_WATCH
: code
= l_osdc_osdop_watch
; break;
2371 case CEPH_OSD_OP_NOTIFY
: code
= l_osdc_osdop_notify
; break;
2378 void Objecter::_op_submit(Op
*op
, shunique_lock
<ceph::shared_mutex
>& sul
, ceph_tid_t
*ptid
)
2382 ldout(cct
, 10) << __func__
<< " op " << op
<< dendl
;
2385 ceph_assert(op
->session
== NULL
);
2386 OSDSession
*s
= NULL
;
2388 bool check_for_latest_map
= false;
2389 int r
= _calc_target(&op
->target
, nullptr);
2391 case RECALC_OP_TARGET_POOL_DNE
:
2392 check_for_latest_map
= true;
2394 case RECALC_OP_TARGET_POOL_EIO
:
2395 if (op
->has_completion()) {
2396 op
->complete(osdc_errc::pool_eio
, -EIO
);
2401 // Try to get a session, including a retry if we need to take write lock
2402 r
= _get_session(op
->target
.osd
, &s
, sul
);
2404 (check_for_latest_map
&& sul
.owns_lock_shared()) ||
2405 cct
->_conf
->objecter_debug_inject_relock_delay
) {
2406 epoch_t orig_epoch
= osdmap
->get_epoch();
2408 if (cct
->_conf
->objecter_debug_inject_relock_delay
) {
2412 if (orig_epoch
!= osdmap
->get_epoch()) {
2413 // map changed; recalculate mapping
2414 ldout(cct
, 10) << __func__
<< " relock raced with osdmap, recalc target"
2416 check_for_latest_map
= _calc_target(&op
->target
, nullptr)
2417 == RECALC_OP_TARGET_POOL_DNE
;
2426 ceph_assert(s
== NULL
);
2427 r
= _get_session(op
->target
.osd
, &s
, sul
);
2429 ceph_assert(r
== 0);
2430 ceph_assert(s
); // may be homeless
2432 _send_op_account(op
);
2436 ceph_assert(op
->target
.flags
& (CEPH_OSD_FLAG_READ
|CEPH_OSD_FLAG_WRITE
));
2438 bool need_send
= false;
2439 if (op
->target
.paused
) {
2440 ldout(cct
, 10) << " tid " << op
->tid
<< " op " << op
<< " is paused"
2442 _maybe_request_map();
2443 } else if (!s
->is_homeless()) {
2446 _maybe_request_map();
2449 unique_lock
sl(s
->lock
);
2451 op
->tid
= ++last_tid
;
2453 ldout(cct
, 10) << "_op_submit oid " << op
->target
.base_oid
2454 << " '" << op
->target
.base_oloc
<< "' '"
2455 << op
->target
.target_oloc
<< "' " << op
->ops
<< " tid "
2456 << op
->tid
<< " osd." << (!s
->is_homeless() ? s
->osd
: -1)
2459 _session_op_assign(s
, op
);
2465 // Last chance to touch Op here, after giving up session lock it can
2466 // be freed at any time by response handler.
2467 ceph_tid_t tid
= op
->tid
;
2468 if (check_for_latest_map
) {
2469 _send_op_map_check(op
);
2478 ldout(cct
, 5) << num_in_flight
<< " in flight" << dendl
;
2481 int Objecter::op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
)
2483 ceph_assert(initialized
);
2485 unique_lock
sl(s
->lock
);
2487 auto p
= s
->ops
.find(tid
);
2488 if (p
== s
->ops
.end()) {
2489 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne in session "
2496 ldout(cct
, 20) << " revoking rx ceph::buffer for " << tid
2497 << " on " << s
->con
<< dendl
;
2498 s
->con
->revoke_rx_buffer(tid
);
2502 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " in session " << s
->osd
2505 if (op
->has_completion()) {
2507 op
->complete(osdcode(r
), r
);
2509 _op_cancel_map_check(op
);
2516 int Objecter::op_cancel(ceph_tid_t tid
, int r
)
2520 unique_lock
wl(rwlock
);
2521 ret
= _op_cancel(tid
, r
);
2526 int Objecter::op_cancel(const vector
<ceph_tid_t
>& tids
, int r
)
2528 unique_lock
wl(rwlock
);
2529 ldout(cct
,10) << __func__
<< " " << tids
<< dendl
;
2530 for (auto tid
: tids
) {
2536 int Objecter::_op_cancel(ceph_tid_t tid
, int r
)
2540 ldout(cct
, 5) << __func__
<< ": cancelling tid " << tid
<< " r=" << r
2545 for (auto siter
= osd_sessions
.begin();
2546 siter
!= osd_sessions
.end(); ++siter
) {
2547 OSDSession
*s
= siter
->second
;
2548 shared_lock
sl(s
->lock
);
2549 if (s
->ops
.find(tid
) != s
->ops
.end()) {
2551 ret
= op_cancel(s
, tid
, r
);
2552 if (ret
== -ENOENT
) {
2553 /* oh no! raced, maybe tid moved to another session, restarting */
2560 ldout(cct
, 5) << __func__
<< ": tid " << tid
2561 << " not found in live sessions" << dendl
;
2563 // Handle case where the op is in homeless session
2564 shared_lock
sl(homeless_session
->lock
);
2565 if (homeless_session
->ops
.find(tid
) != homeless_session
->ops
.end()) {
2567 ret
= op_cancel(homeless_session
, tid
, r
);
2568 if (ret
== -ENOENT
) {
2569 /* oh no! raced, maybe tid moved to another session, restarting */
2578 ldout(cct
, 5) << __func__
<< ": tid " << tid
2579 << " not found in homeless session" << dendl
;
2585 epoch_t
Objecter::op_cancel_writes(int r
, int64_t pool
)
2587 unique_lock
wl(rwlock
);
2589 std::vector
<ceph_tid_t
> to_cancel
;
2592 for (auto siter
= osd_sessions
.begin();
2593 siter
!= osd_sessions
.end(); ++siter
) {
2594 OSDSession
*s
= siter
->second
;
2595 shared_lock
sl(s
->lock
);
2596 for (auto op_i
= s
->ops
.begin();
2597 op_i
!= s
->ops
.end(); ++op_i
) {
2598 if (op_i
->second
->target
.flags
& CEPH_OSD_FLAG_WRITE
2599 && (pool
== -1 || op_i
->second
->target
.target_oloc
.pool
== pool
)) {
2600 to_cancel
.push_back(op_i
->first
);
2605 for (auto titer
= to_cancel
.begin(); titer
!= to_cancel
.end(); ++titer
) {
2606 int cancel_result
= op_cancel(s
, *titer
, r
);
2607 // We hold rwlock across search and cancellation, so cancels
2608 // should always succeed
2609 ceph_assert(cancel_result
== 0);
2611 if (!found
&& to_cancel
.size())
2616 const epoch_t epoch
= osdmap
->get_epoch();
2627 bool Objecter::is_pg_changed(
2629 const vector
<int>& oldacting
,
2631 const vector
<int>& newacting
,
2634 if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
2640 if (any_change
&& oldacting
!= newacting
)
2642 return false; // same primary (tho replicas may have changed)
2645 bool Objecter::target_should_be_paused(op_target_t
*t
)
2647 const pg_pool_t
*pi
= osdmap
->get_pg_pool(t
->base_oloc
.pool
);
2648 bool pauserd
= osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
);
2649 bool pausewr
= osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
) ||
2650 (t
->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi
)));
2652 return (t
->flags
& CEPH_OSD_FLAG_READ
&& pauserd
) ||
2653 (t
->flags
& CEPH_OSD_FLAG_WRITE
&& pausewr
) ||
2654 (osdmap
->get_epoch() < epoch_barrier
);
2658 * Locking public accessor for _osdmap_full_flag
2660 bool Objecter::osdmap_full_flag() const
2662 shared_lock
rl(rwlock
);
2664 return _osdmap_full_flag();
2667 bool Objecter::osdmap_pool_full(const int64_t pool_id
) const
2669 shared_lock
rl(rwlock
);
2671 if (_osdmap_full_flag()) {
2675 return _osdmap_pool_full(pool_id
);
2678 bool Objecter::_osdmap_pool_full(const int64_t pool_id
) const
2680 const pg_pool_t
*pool
= osdmap
->get_pg_pool(pool_id
);
2682 ldout(cct
, 4) << __func__
<< ": DNE pool " << pool_id
<< dendl
;
2686 return _osdmap_pool_full(*pool
);
2689 bool Objecter::_osdmap_has_pool_full() const
2691 for (auto it
= osdmap
->get_pools().begin();
2692 it
!= osdmap
->get_pools().end(); ++it
) {
2693 if (_osdmap_pool_full(it
->second
))
2700 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2702 bool Objecter::_osdmap_full_flag() const
2704 // Ignore the FULL flag if the caller does not have honor_osdmap_full
2705 return osdmap
->test_flag(CEPH_OSDMAP_FULL
) && honor_pool_full
;
2708 void Objecter::update_pool_full_map(map
<int64_t, bool>& pool_full_map
)
2710 for (map
<int64_t, pg_pool_t
>::const_iterator it
2711 = osdmap
->get_pools().begin();
2712 it
!= osdmap
->get_pools().end(); ++it
) {
2713 if (pool_full_map
.find(it
->first
) == pool_full_map
.end()) {
2714 pool_full_map
[it
->first
] = _osdmap_pool_full(it
->second
);
2716 pool_full_map
[it
->first
] = _osdmap_pool_full(it
->second
) ||
2717 pool_full_map
[it
->first
];
2722 int64_t Objecter::get_object_hash_position(int64_t pool
, const string
& key
,
2725 shared_lock
rl(rwlock
);
2726 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
2729 return p
->hash_key(key
, ns
);
2732 int64_t Objecter::get_object_pg_hash_position(int64_t pool
, const string
& key
,
2735 shared_lock
rl(rwlock
);
2736 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
2739 return p
->raw_hash_to_pg(p
->hash_key(key
, ns
));
2742 void Objecter::_prune_snapc(
2743 const mempool::osdmap::map
<int64_t,
2744 snap_interval_set_t
>& new_removed_snaps
,
2748 auto i
= new_removed_snaps
.find(op
->target
.base_pgid
.pool());
2749 if (i
!= new_removed_snaps
.end()) {
2750 for (auto s
: op
->snapc
.snaps
) {
2751 if (i
->second
.contains(s
)) {
2757 vector
<snapid_t
> new_snaps
;
2758 for (auto s
: op
->snapc
.snaps
) {
2759 if (!i
->second
.contains(s
)) {
2760 new_snaps
.push_back(s
);
2763 op
->snapc
.snaps
.swap(new_snaps
);
2764 ldout(cct
,10) << __func__
<< " op " << op
->tid
<< " snapc " << op
->snapc
2765 << " (was " << new_snaps
<< ")" << dendl
;
2770 int Objecter::_calc_target(op_target_t
*t
, Connection
*con
, bool any_change
)
2773 bool is_read
= t
->flags
& CEPH_OSD_FLAG_READ
;
2774 bool is_write
= t
->flags
& CEPH_OSD_FLAG_WRITE
;
2775 t
->epoch
= osdmap
->get_epoch();
2776 ldout(cct
,20) << __func__
<< " epoch " << t
->epoch
2777 << " base " << t
->base_oid
<< " " << t
->base_oloc
2778 << " precalc_pgid " << (int)t
->precalc_pgid
2779 << " pgid " << t
->base_pgid
2780 << (is_read
? " is_read" : "")
2781 << (is_write
? " is_write" : "")
2784 const pg_pool_t
*pi
= osdmap
->get_pg_pool(t
->base_oloc
.pool
);
2787 return RECALC_OP_TARGET_POOL_DNE
;
2790 if (pi
->has_flag(pg_pool_t::FLAG_EIO
)) {
2791 return RECALC_OP_TARGET_POOL_EIO
;
2794 ldout(cct
,30) << __func__
<< " base pi " << pi
2795 << " pg_num " << pi
->get_pg_num() << dendl
;
2797 bool force_resend
= false;
2798 if (osdmap
->get_epoch() == pi
->last_force_op_resend
) {
2799 if (t
->last_force_resend
< pi
->last_force_op_resend
) {
2800 t
->last_force_resend
= pi
->last_force_op_resend
;
2801 force_resend
= true;
2802 } else if (t
->last_force_resend
== 0) {
2803 force_resend
= true;
2808 t
->target_oid
= t
->base_oid
;
2809 t
->target_oloc
= t
->base_oloc
;
2810 if ((t
->flags
& CEPH_OSD_FLAG_IGNORE_OVERLAY
) == 0) {
2811 if (is_read
&& pi
->has_read_tier())
2812 t
->target_oloc
.pool
= pi
->read_tier
;
2813 if (is_write
&& pi
->has_write_tier())
2814 t
->target_oloc
.pool
= pi
->write_tier
;
2815 pi
= osdmap
->get_pg_pool(t
->target_oloc
.pool
);
2818 return RECALC_OP_TARGET_POOL_DNE
;
2823 if (t
->precalc_pgid
) {
2824 ceph_assert(t
->flags
& CEPH_OSD_FLAG_IGNORE_OVERLAY
);
2825 ceph_assert(t
->base_oid
.name
.empty()); // make sure this is a pg op
2826 ceph_assert(t
->base_oloc
.pool
== (int64_t)t
->base_pgid
.pool());
2827 pgid
= t
->base_pgid
;
2829 int ret
= osdmap
->object_locator_to_pg(t
->target_oid
, t
->target_oloc
,
2831 if (ret
== -ENOENT
) {
2833 return RECALC_OP_TARGET_POOL_DNE
;
2836 ldout(cct
,20) << __func__
<< " target " << t
->target_oid
<< " "
2837 << t
->target_oloc
<< " -> pgid " << pgid
<< dendl
;
2838 ldout(cct
,30) << __func__
<< " target pi " << pi
2839 << " pg_num " << pi
->get_pg_num() << dendl
;
2840 t
->pool_ever_existed
= true;
2842 int size
= pi
->size
;
2843 int min_size
= pi
->min_size
;
2844 unsigned pg_num
= pi
->get_pg_num();
2845 unsigned pg_num_mask
= pi
->get_pg_num_mask();
2846 unsigned pg_num_pending
= pi
->get_pg_num_pending();
2847 int up_primary
, acting_primary
;
2848 vector
<int> up
, acting
;
2849 ps_t actual_ps
= ceph_stable_mod(pgid
.ps(), pg_num
, pg_num_mask
);
2850 pg_t
actual_pgid(actual_ps
, pgid
.pool());
2851 if (!lookup_pg_mapping(actual_pgid
, osdmap
->get_epoch(), &up
, &up_primary
,
2852 &acting
, &acting_primary
)) {
2853 osdmap
->pg_to_up_acting_osds(actual_pgid
, &up
, &up_primary
,
2854 &acting
, &acting_primary
);
2855 pg_mapping_t
pg_mapping(osdmap
->get_epoch(),
2856 up
, up_primary
, acting
, acting_primary
);
2857 update_pg_mapping(actual_pgid
, std::move(pg_mapping
));
2859 bool sort_bitwise
= osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
);
2860 bool recovery_deletes
= osdmap
->test_flag(CEPH_OSDMAP_RECOVERY_DELETES
);
2861 unsigned prev_seed
= ceph_stable_mod(pgid
.ps(), t
->pg_num
, t
->pg_num_mask
);
2862 pg_t
prev_pgid(prev_seed
, pgid
.pool());
2863 if (any_change
&& PastIntervals::is_new_interval(
2882 t
->recovery_deletes
,
2884 t
->peering_crush_bucket_count
,
2885 pi
->peering_crush_bucket_count
,
2886 t
->peering_crush_bucket_target
,
2887 pi
->peering_crush_bucket_target
,
2888 t
->peering_crush_bucket_barrier
,
2889 pi
->peering_crush_bucket_barrier
,
2890 t
->peering_crush_mandatory_member
,
2891 pi
->peering_crush_mandatory_member
,
2893 force_resend
= true;
2896 bool unpaused
= false;
2897 bool should_be_paused
= target_should_be_paused(t
);
2898 if (t
->paused
&& !should_be_paused
) {
2901 if (t
->paused
!= should_be_paused
) {
2902 ldout(cct
, 10) << __func__
<< " paused " << t
->paused
2903 << " -> " << should_be_paused
<< dendl
;
2904 t
->paused
= should_be_paused
;
2907 bool legacy_change
=
2910 t
->acting_primary
, t
->acting
, acting_primary
, acting
,
2911 t
->used_replica
|| any_change
);
2912 bool split_or_merge
= false;
2915 prev_pgid
.is_split(t
->pg_num
, pg_num
, nullptr) ||
2916 prev_pgid
.is_merge_source(t
->pg_num
, pg_num
, nullptr) ||
2917 prev_pgid
.is_merge_target(t
->pg_num
, pg_num
);
2920 if (legacy_change
|| split_or_merge
|| force_resend
) {
2922 t
->acting
= std::move(acting
);
2923 t
->acting_primary
= acting_primary
;
2924 t
->up_primary
= up_primary
;
2925 t
->up
= std::move(up
);
2927 t
->min_size
= min_size
;
2929 t
->pg_num_mask
= pg_num_mask
;
2930 t
->pg_num_pending
= pg_num_pending
;
2931 spg_t
spgid(actual_pgid
);
2932 if (pi
->is_erasure()) {
2933 for (uint8_t i
= 0; i
< t
->acting
.size(); ++i
) {
2934 if (t
->acting
[i
] == acting_primary
) {
2935 spgid
.reset_shard(shard_id_t(i
));
2940 t
->actual_pgid
= spgid
;
2941 t
->sort_bitwise
= sort_bitwise
;
2942 t
->recovery_deletes
= recovery_deletes
;
2943 t
->peering_crush_bucket_count
= pi
->peering_crush_bucket_count
;
2944 t
->peering_crush_bucket_target
= pi
->peering_crush_bucket_target
;
2945 t
->peering_crush_bucket_barrier
= pi
->peering_crush_bucket_barrier
;
2946 t
->peering_crush_mandatory_member
= pi
->peering_crush_mandatory_member
;
2947 ldout(cct
, 10) << __func__
<< " "
2948 << " raw pgid " << pgid
<< " -> actual " << t
->actual_pgid
2949 << " acting " << t
->acting
2950 << " primary " << acting_primary
<< dendl
;
2951 t
->used_replica
= false;
2952 if ((t
->flags
& (CEPH_OSD_FLAG_BALANCE_READS
|
2953 CEPH_OSD_FLAG_LOCALIZE_READS
)) &&
2954 !is_write
&& pi
->is_replicated() && t
->acting
.size() > 1) {
2956 ceph_assert(is_read
&& t
->acting
[0] == acting_primary
);
2957 if (t
->flags
& CEPH_OSD_FLAG_BALANCE_READS
) {
2958 int p
= rand() % t
->acting
.size();
2960 t
->used_replica
= true;
2962 ldout(cct
, 10) << " chose random osd." << osd
<< " of " << t
->acting
2965 // look for a local replica. prefer the primary if the
2966 // distance is the same.
2968 int best_locality
= 0;
2969 for (unsigned i
= 0; i
< t
->acting
.size(); ++i
) {
2970 int locality
= osdmap
->crush
->get_common_ancestor_distance(
2971 cct
, t
->acting
[i
], crush_location
);
2972 ldout(cct
, 20) << __func__
<< " localize: rank " << i
2973 << " osd." << t
->acting
[i
]
2974 << " locality " << locality
<< dendl
;
2976 (locality
>= 0 && best_locality
>= 0 &&
2977 locality
< best_locality
) ||
2978 (best_locality
< 0 && locality
>= 0)) {
2980 best_locality
= locality
;
2982 t
->used_replica
= true;
2985 ceph_assert(best
>= 0);
2986 osd
= t
->acting
[best
];
2990 t
->osd
= acting_primary
;
2993 if (legacy_change
|| unpaused
|| force_resend
) {
2994 return RECALC_OP_TARGET_NEED_RESEND
;
2996 if (split_or_merge
&&
2997 (osdmap
->require_osd_release
>= ceph_release_t::luminous
||
2998 HAVE_FEATURE(osdmap
->get_xinfo(acting_primary
).features
,
2999 RESEND_ON_SPLIT
))) {
3000 return RECALC_OP_TARGET_NEED_RESEND
;
3002 return RECALC_OP_TARGET_NO_ACTION
;
3005 int Objecter::_map_session(op_target_t
*target
, OSDSession
**s
,
3006 shunique_lock
<ceph::shared_mutex
>& sul
)
3008 _calc_target(target
, nullptr);
3009 return _get_session(target
->osd
, s
, sul
);
3012 void Objecter::_session_op_assign(OSDSession
*to
, Op
*op
)
3014 // to->lock is locked
3015 ceph_assert(op
->session
== NULL
);
3016 ceph_assert(op
->tid
);
3020 to
->ops
[op
->tid
] = op
;
3022 if (to
->is_homeless()) {
3026 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->tid
<< dendl
;
3029 void Objecter::_session_op_remove(OSDSession
*from
, Op
*op
)
3031 ceph_assert(op
->session
== from
);
3032 // from->lock is locked
3034 if (from
->is_homeless()) {
3038 from
->ops
.erase(op
->tid
);
3042 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->tid
<< dendl
;
3045 void Objecter::_session_linger_op_assign(OSDSession
*to
, LingerOp
*op
)
3047 // to lock is locked unique
3048 ceph_assert(op
->session
== NULL
);
3050 if (to
->is_homeless()) {
3056 to
->linger_ops
[op
->linger_id
] = op
;
3058 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->linger_id
3062 void Objecter::_session_linger_op_remove(OSDSession
*from
, LingerOp
*op
)
3064 ceph_assert(from
== op
->session
);
3065 // from->lock is locked unique
3067 if (from
->is_homeless()) {
3071 from
->linger_ops
.erase(op
->linger_id
);
3075 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->linger_id
3079 void Objecter::_session_command_op_remove(OSDSession
*from
, CommandOp
*op
)
3081 ceph_assert(from
== op
->session
);
3082 // from->lock is locked
3084 if (from
->is_homeless()) {
3088 from
->command_ops
.erase(op
->tid
);
3092 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->tid
<< dendl
;
3095 void Objecter::_session_command_op_assign(OSDSession
*to
, CommandOp
*op
)
3097 // to->lock is locked
3098 ceph_assert(op
->session
== NULL
);
3099 ceph_assert(op
->tid
);
3101 if (to
->is_homeless()) {
3107 to
->command_ops
[op
->tid
] = op
;
3109 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->tid
<< dendl
;
3112 int Objecter::_recalc_linger_op_target(LingerOp
*linger_op
,
3113 shunique_lock
<ceph::shared_mutex
>& sul
)
3115 // rwlock is locked unique
3117 int r
= _calc_target(&linger_op
->target
, nullptr, true);
3118 if (r
== RECALC_OP_TARGET_NEED_RESEND
) {
3119 ldout(cct
, 10) << "recalc_linger_op_target tid " << linger_op
->linger_id
3120 << " pgid " << linger_op
->target
.pgid
3121 << " acting " << linger_op
->target
.acting
<< dendl
;
3123 OSDSession
*s
= NULL
;
3124 r
= _get_session(linger_op
->target
.osd
, &s
, sul
);
3125 ceph_assert(r
== 0);
3127 if (linger_op
->session
!= s
) {
3128 // NB locking two sessions (s and linger_op->session) at the
3129 // same time here is only safe because we are the only one that
3130 // takes two, and we are holding rwlock for write. We use
3131 // std::shared_mutex in OSDSession because lockdep doesn't know
3133 unique_lock
sl(s
->lock
);
3134 _session_linger_op_remove(linger_op
->session
, linger_op
);
3135 _session_linger_op_assign(s
, linger_op
);
3139 return RECALC_OP_TARGET_NEED_RESEND
;
3144 void Objecter::_cancel_linger_op(Op
*op
)
3146 ldout(cct
, 15) << "cancel_op " << op
->tid
<< dendl
;
3148 ceph_assert(!op
->should_resend
);
3149 if (op
->has_completion()) {
3150 op
->onfinish
= nullptr;
3157 void Objecter::_finish_op(Op
*op
, int r
)
3159 ldout(cct
, 15) << __func__
<< " " << op
->tid
<< dendl
;
3161 // op->session->lock is locked unique or op->session is null
3163 if (!op
->ctx_budgeted
&& op
->budget
>= 0) {
3164 put_op_budget_bytes(op
->budget
);
3168 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
3169 timer
.cancel_event(op
->ontimeout
);
3172 _session_op_remove(op
->session
, op
);
3175 logger
->dec(l_osdc_op_active
);
3177 ceph_assert(check_latest_map_ops
.find(op
->tid
) == check_latest_map_ops
.end());
3184 Objecter::MOSDOp
*Objecter::_prepare_osd_op(Op
*op
)
3188 int flags
= op
->target
.flags
;
3189 flags
|= CEPH_OSD_FLAG_KNOWN_REDIR
;
3190 flags
|= CEPH_OSD_FLAG_SUPPORTSPOOLEIO
;
3192 // Nothing checks this any longer, but needed for compatibility with
3193 // pre-luminous osds
3194 flags
|= CEPH_OSD_FLAG_ONDISK
;
3196 if (!honor_pool_full
)
3197 flags
|= CEPH_OSD_FLAG_FULL_FORCE
;
3199 op
->target
.paused
= false;
3200 op
->stamp
= ceph::coarse_mono_clock::now();
3202 hobject_t hobj
= op
->target
.get_hobj();
3203 auto m
= new MOSDOp(client_inc
, op
->tid
,
3204 hobj
, op
->target
.actual_pgid
,
3205 osdmap
->get_epoch(),
3206 flags
, op
->features
);
3208 m
->set_snapid(op
->snapid
);
3209 m
->set_snap_seq(op
->snapc
.seq
);
3210 m
->set_snaps(op
->snapc
.snaps
);
3213 m
->set_mtime(op
->mtime
);
3214 m
->set_retry_attempt(op
->attempts
++);
3216 if (!op
->trace
.valid() && cct
->_conf
->osdc_blkin_trace_all
) {
3217 op
->trace
.init("op", &trace_endpoint
);
3221 m
->set_priority(op
->priority
);
3223 m
->set_priority(cct
->_conf
->osd_client_op_priority
);
3225 if (op
->reqid
!= osd_reqid_t()) {
3226 m
->set_reqid(op
->reqid
);
3229 logger
->inc(l_osdc_op_send
);
3231 for (unsigned i
= 0; i
< m
->ops
.size(); i
++) {
3232 sum
+= m
->ops
[i
].indata
.length();
3234 logger
->inc(l_osdc_op_send_bytes
, sum
);
3239 void Objecter::_send_op(Op
*op
)
3242 // op->session->lock is locked
3245 auto p
= op
->session
->backoffs
.find(op
->target
.actual_pgid
);
3246 if (p
!= op
->session
->backoffs
.end()) {
3247 hobject_t hoid
= op
->target
.get_hobj();
3248 auto q
= p
->second
.lower_bound(hoid
);
3249 if (q
!= p
->second
.begin()) {
3251 if (hoid
>= q
->second
.end
) {
3255 if (q
!= p
->second
.end()) {
3256 ldout(cct
, 20) << __func__
<< " ? " << q
->first
<< " [" << q
->second
.begin
3257 << "," << q
->second
.end
<< ")" << dendl
;
3258 int r
= cmp(hoid
, q
->second
.begin
);
3259 if (r
== 0 || (r
> 0 && hoid
< q
->second
.end
)) {
3260 ldout(cct
, 10) << __func__
<< " backoff " << op
->target
.actual_pgid
3261 << " id " << q
->second
.id
<< " on " << hoid
3262 << ", queuing " << op
<< " tid " << op
->tid
<< dendl
;
3268 ceph_assert(op
->tid
> 0);
3269 MOSDOp
*m
= _prepare_osd_op(op
);
3271 if (op
->target
.actual_pgid
!= m
->get_spg()) {
3272 ldout(cct
, 10) << __func__
<< " " << op
->tid
<< " pgid change from "
3273 << m
->get_spg() << " to " << op
->target
.actual_pgid
3274 << ", updating and reencoding" << dendl
;
3275 m
->set_spg(op
->target
.actual_pgid
);
3276 m
->clear_payload(); // reencode
3279 ldout(cct
, 15) << "_send_op " << op
->tid
<< " to "
3280 << op
->target
.actual_pgid
<< " on osd." << op
->session
->osd
3283 ConnectionRef con
= op
->session
->con
;
3287 // preallocated rx ceph::buffer?
3289 ldout(cct
, 20) << " revoking rx ceph::buffer for " << op
->tid
<< " on "
3290 << op
->con
<< dendl
;
3291 op
->con
->revoke_rx_buffer(op
->tid
);
3294 op
->ontimeout
== 0 && // only post rx_buffer if no timeout; see #9582
3295 op
->outbl
->length()) {
3296 op
->outbl
->invalidate_crc(); // messenger writes through c_str()
3297 ldout(cct
, 20) << " posting rx ceph::buffer for " << op
->tid
<< " on " << con
3300 op
->con
->post_rx_buffer(op
->tid
, *op
->outbl
);
3304 op
->incarnation
= op
->session
->incarnation
;
3306 if (op
->trace
.valid()) {
3307 m
->trace
.init("op msg", nullptr, &op
->trace
);
3309 op
->session
->con
->send_message(m
);
3312 int Objecter::calc_op_budget(const bc::small_vector_base
<OSDOp
>& ops
)
3315 for (auto i
= ops
.begin(); i
!= ops
.end(); ++i
) {
3316 if (i
->op
.op
& CEPH_OSD_OP_MODE_WR
) {
3317 op_budget
+= i
->indata
.length();
3318 } else if (ceph_osd_op_mode_read(i
->op
.op
)) {
3319 if (ceph_osd_op_uses_extent(i
->op
.op
)) {
3320 if ((int64_t)i
->op
.extent
.length
> 0)
3321 op_budget
+= (int64_t)i
->op
.extent
.length
;
3322 } else if (ceph_osd_op_type_attr(i
->op
.op
)) {
3323 op_budget
+= i
->op
.xattr
.name_len
+ i
->op
.xattr
.value_len
;
3330 void Objecter::_throttle_op(Op
*op
,
3331 shunique_lock
<ceph::shared_mutex
>& sul
,
3334 ceph_assert(sul
&& sul
.mutex() == &rwlock
);
3335 bool locked_for_write
= sul
.owns_lock();
3338 op_budget
= calc_op_budget(op
->ops
);
3339 if (!op_throttle_bytes
.get_or_fail(op_budget
)) { //couldn't take right now
3341 op_throttle_bytes
.get(op_budget
);
3342 if (locked_for_write
)
3347 if (!op_throttle_ops
.get_or_fail(1)) { //couldn't take right now
3349 op_throttle_ops
.get(1);
3350 if (locked_for_write
)
3357 int Objecter::take_linger_budget(LingerOp
*info
)
3362 /* This function DOES put the passed message before returning */
3363 void Objecter::handle_osd_op_reply(MOSDOpReply
*m
)
3365 ldout(cct
, 10) << "in handle_osd_op_reply" << dendl
;
3368 ceph_tid_t tid
= m
->get_tid();
3370 shunique_lock
sul(rwlock
, ceph::acquire_shared
);
3376 ConnectionRef con
= m
->get_connection();
3377 auto priv
= con
->get_priv();
3378 auto s
= static_cast<OSDSession
*>(priv
.get());
3379 if (!s
|| s
->con
!= con
) {
3380 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
3385 unique_lock
sl(s
->lock
);
3387 map
<ceph_tid_t
, Op
*>::iterator iter
= s
->ops
.find(tid
);
3388 if (iter
== s
->ops
.end()) {
3389 ldout(cct
, 7) << "handle_osd_op_reply " << tid
3390 << (m
->is_ondisk() ? " ondisk" : (m
->is_onnvram() ?
3391 " onnvram" : " ack"))
3392 << " ... stray" << dendl
;
3398 ldout(cct
, 7) << "handle_osd_op_reply " << tid
3399 << (m
->is_ondisk() ? " ondisk" :
3400 (m
->is_onnvram() ? " onnvram" : " ack"))
3401 << " uv " << m
->get_user_version()
3402 << " in " << m
->get_pg()
3403 << " attempt " << m
->get_retry_attempt()
3405 Op
*op
= iter
->second
;
3406 op
->trace
.event("osd op reply");
3408 if (retry_writes_after_first_reply
&& op
->attempts
== 1 &&
3409 (op
->target
.flags
& CEPH_OSD_FLAG_WRITE
)) {
3410 ldout(cct
, 7) << "retrying write after first reply: " << tid
<< dendl
;
3411 if (op
->has_completion()) {
3414 _session_op_remove(s
, op
);
3417 _op_submit(op
, sul
, NULL
);
3422 if (m
->get_retry_attempt() >= 0) {
3423 if (m
->get_retry_attempt() != (op
->attempts
- 1)) {
3424 ldout(cct
, 7) << " ignoring reply from attempt "
3425 << m
->get_retry_attempt()
3426 << " from " << m
->get_source_inst()
3427 << "; last attempt " << (op
->attempts
- 1) << " sent to "
3428 << op
->session
->con
->get_peer_addr() << dendl
;
3434 // we don't know the request attempt because the server is old, so
3435 // just accept this one. we may do ACK callbacks we shouldn't
3436 // have, but that is better than doing callbacks out of order.
3439 decltype(op
->onfinish
) onfinish
;
3441 int rc
= m
->get_result();
3443 if (m
->is_redirect_reply()) {
3444 ldout(cct
, 5) << " got redirect reply; redirecting" << dendl
;
3445 if (op
->has_completion())
3447 _session_op_remove(s
, op
);
3450 // FIXME: two redirects could race and reorder
3453 m
->get_redirect().combine_with_locator(op
->target
.target_oloc
,
3454 op
->target
.target_oid
.name
);
3455 op
->target
.flags
|= (CEPH_OSD_FLAG_REDIRECTED
|
3456 CEPH_OSD_FLAG_IGNORE_CACHE
|
3457 CEPH_OSD_FLAG_IGNORE_OVERLAY
);
3458 _op_submit(op
, sul
, NULL
);
3463 if (rc
== -EAGAIN
) {
3464 ldout(cct
, 7) << " got -EAGAIN, resubmitting" << dendl
;
3465 if (op
->has_completion())
3467 _session_op_remove(s
, op
);
3471 op
->target
.flags
&= ~(CEPH_OSD_FLAG_BALANCE_READS
|
3472 CEPH_OSD_FLAG_LOCALIZE_READS
);
3473 op
->target
.pgid
= pg_t();
3474 _op_submit(op
, sul
, NULL
);
3482 *op
->objver
= m
->get_user_version();
3483 if (op
->reply_epoch
)
3484 *op
->reply_epoch
= m
->get_map_epoch();
3485 if (op
->data_offset
)
3486 *op
->data_offset
= m
->get_header().data_off
;
3492 op
->con
->revoke_rx_buffer(op
->tid
);
3494 auto& bl
= m
->get_data();
3495 if (op
->outbl
->length() == bl
.length() &&
3496 bl
.get_num_buffers() <= 1) {
3497 // this is here to keep previous users to *relied* on getting data
3498 // read into existing buffers happy. Notably,
3499 // libradosstriper::RadosStriperImpl::aio_read().
3500 ldout(cct
,10) << __func__
<< " copying resulting " << bl
.length()
3501 << " into existing ceph::buffer of length " << op
->outbl
->length()
3504 t
= std::move(*op
->outbl
);
3505 t
.invalidate_crc(); // we're overwriting the raw buffers via c_str()
3506 bl
.begin().copy(bl
.length(), t
.c_str());
3507 op
->outbl
->substr_of(t
, 0, bl
.length());
3509 m
->claim_data(*op
->outbl
);
3514 // per-op result demuxing
3515 vector
<OSDOp
> out_ops
;
3516 m
->claim_ops(out_ops
);
3518 if (out_ops
.size() != op
->ops
.size())
3519 ldout(cct
, 0) << "WARNING: tid " << op
->tid
<< " reply ops " << out_ops
3520 << " != request ops " << op
->ops
3521 << " from " << m
->get_source_inst() << dendl
;
3523 ceph_assert(op
->ops
.size() == op
->out_bl
.size());
3524 ceph_assert(op
->ops
.size() == op
->out_rval
.size());
3525 ceph_assert(op
->ops
.size() == op
->out_ec
.size());
3526 ceph_assert(op
->ops
.size() == op
->out_handler
.size());
3527 auto pb
= op
->out_bl
.begin();
3528 auto pr
= op
->out_rval
.begin();
3529 auto pe
= op
->out_ec
.begin();
3530 auto ph
= op
->out_handler
.begin();
3531 ceph_assert(op
->out_bl
.size() == op
->out_rval
.size());
3532 ceph_assert(op
->out_bl
.size() == op
->out_handler
.size());
3533 auto p
= out_ops
.begin();
3534 for (unsigned i
= 0;
3535 p
!= out_ops
.end() && pb
!= op
->out_bl
.end();
3536 ++i
, ++p
, ++pb
, ++pr
, ++pe
, ++ph
) {
3537 ldout(cct
, 10) << " op " << i
<< " rval " << p
->rval
3538 << " len " << p
->outdata
.length() << dendl
;
3541 // set rval before running handlers so that handlers
3542 // can change it if e.g. decoding fails
3544 **pr
= ceph_to_hostos_errno(p
->rval
);
3546 **pe
= p
->rval
< 0 ? bs::error_code(-p
->rval
, osd_category()) :
3549 std::move((*ph
))(p
->rval
< 0 ?
3550 bs::error_code(-p
->rval
, osd_category()) :
3552 p
->rval
, p
->outdata
);
3556 // NOTE: we assume that since we only request ONDISK ever we will
3557 // only ever get back one (type of) ack ever.
3559 if (op
->has_completion()) {
3561 onfinish
= std::move(op
->onfinish
);
3562 op
->onfinish
= nullptr;
3564 logger
->inc(l_osdc_op_reply
);
3566 /* get it before we call _finish_op() */
3567 auto completion_lock
= s
->get_lock(op
->target
.base_oid
);
3569 ldout(cct
, 15) << "handle_osd_op_reply completed tid " << tid
<< dendl
;
3572 ldout(cct
, 5) << num_in_flight
<< " in flight" << dendl
;
3574 // serialize completions
3575 if (completion_lock
.mutex()) {
3576 completion_lock
.lock();
3581 if (Op::has_completion(onfinish
)) {
3582 Op::complete(std::move(onfinish
), osdcode(rc
), rc
);
3584 if (completion_lock
.mutex()) {
3585 completion_lock
.unlock();
3591 void Objecter::handle_osd_backoff(MOSDBackoff
*m
)
3593 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
3594 shunique_lock
sul(rwlock
, ceph::acquire_shared
);
3600 ConnectionRef con
= m
->get_connection();
3601 auto priv
= con
->get_priv();
3602 auto s
= static_cast<OSDSession
*>(priv
.get());
3603 if (!s
|| s
->con
!= con
) {
3604 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
3611 unique_lock
sl(s
->lock
);
3614 case CEPH_OSD_BACKOFF_OP_BLOCK
:
3617 OSDBackoff
& b
= s
->backoffs
[m
->pgid
][m
->begin
];
3618 s
->backoffs_by_id
.insert(make_pair(m
->id
, &b
));
3624 // ack with original backoff's epoch so that the osd can discard this if
3625 // there was a pg split.
3626 auto r
= new MOSDBackoff(m
->pgid
, m
->map_epoch
,
3627 CEPH_OSD_BACKOFF_OP_ACK_BLOCK
,
3628 m
->id
, m
->begin
, m
->end
);
3629 // this priority must match the MOSDOps from _prepare_osd_op
3630 r
->set_priority(cct
->_conf
->osd_client_op_priority
);
3631 con
->send_message(r
);
3635 case CEPH_OSD_BACKOFF_OP_UNBLOCK
:
3637 auto p
= s
->backoffs_by_id
.find(m
->id
);
3638 if (p
!= s
->backoffs_by_id
.end()) {
3639 OSDBackoff
*b
= p
->second
;
3640 if (b
->begin
!= m
->begin
&&
3642 lderr(cct
) << __func__
<< " got " << m
->pgid
<< " id " << m
->id
3644 << m
->begin
<< "," << m
->end
<< ") but backoff is ["
3645 << b
->begin
<< "," << b
->end
<< ")" << dendl
;
3646 // hrmpf, unblock it anyway.
3648 ldout(cct
, 10) << __func__
<< " unblock backoff " << b
->pgid
3650 << " [" << b
->begin
<< "," << b
->end
3652 auto spgp
= s
->backoffs
.find(b
->pgid
);
3653 ceph_assert(spgp
!= s
->backoffs
.end());
3654 spgp
->second
.erase(b
->begin
);
3655 if (spgp
->second
.empty()) {
3656 s
->backoffs
.erase(spgp
);
3658 s
->backoffs_by_id
.erase(p
);
3660 // check for any ops to resend
3661 for (auto& q
: s
->ops
) {
3662 if (q
.second
->target
.actual_pgid
== m
->pgid
) {
3663 int r
= q
.second
->target
.contained_by(m
->begin
, m
->end
);
3664 ldout(cct
, 20) << __func__
<< " contained_by " << r
<< " on "
3665 << q
.second
->target
.get_hobj() << dendl
;
3672 lderr(cct
) << __func__
<< " " << m
->pgid
<< " id " << m
->id
3674 << m
->begin
<< "," << m
->end
<< ") but backoff dne" << dendl
;
3680 ldout(cct
, 10) << __func__
<< " unrecognized op " << (int)m
->op
<< dendl
;
3690 uint32_t Objecter::list_nobjects_seek(NListContext
*list_context
,
3693 shared_lock
rl(rwlock
);
3694 list_context
->pos
= hobject_t(object_t(), string(), CEPH_NOSNAP
,
3695 pos
, list_context
->pool_id
, string());
3696 ldout(cct
, 10) << __func__
<< " " << list_context
3697 << " pos " << pos
<< " -> " << list_context
->pos
<< dendl
;
3698 pg_t actual
= osdmap
->raw_pg_to_pg(pg_t(pos
, list_context
->pool_id
));
3699 list_context
->current_pg
= actual
.ps();
3700 list_context
->at_end_of_pool
= false;
3704 uint32_t Objecter::list_nobjects_seek(NListContext
*list_context
,
3705 const hobject_t
& cursor
)
3707 shared_lock
rl(rwlock
);
3708 ldout(cct
, 10) << "list_nobjects_seek " << list_context
<< dendl
;
3709 list_context
->pos
= cursor
;
3710 list_context
->at_end_of_pool
= false;
3711 pg_t actual
= osdmap
->raw_pg_to_pg(pg_t(cursor
.get_hash(), list_context
->pool_id
));
3712 list_context
->current_pg
= actual
.ps();
3713 list_context
->sort_bitwise
= true;
3714 return list_context
->current_pg
;
3717 void Objecter::list_nobjects_get_cursor(NListContext
*list_context
,
3720 shared_lock
rl(rwlock
);
3721 if (list_context
->list
.empty()) {
3722 *cursor
= list_context
->pos
;
3724 const librados::ListObjectImpl
& entry
= list_context
->list
.front();
3725 const string
*key
= (entry
.locator
.empty() ? &entry
.oid
: &entry
.locator
);
3726 uint32_t h
= osdmap
->get_pg_pool(list_context
->pool_id
)->hash_key(*key
, entry
.nspace
);
3727 *cursor
= hobject_t(entry
.oid
, entry
.locator
, list_context
->pool_snap_seq
, h
, list_context
->pool_id
, entry
.nspace
);
3731 void Objecter::list_nobjects(NListContext
*list_context
, Context
*onfinish
)
3733 ldout(cct
, 10) << __func__
<< " pool_id " << list_context
->pool_id
3734 << " pool_snap_seq " << list_context
->pool_snap_seq
3735 << " max_entries " << list_context
->max_entries
3736 << " list_context " << list_context
3737 << " onfinish " << onfinish
3738 << " current_pg " << list_context
->current_pg
3739 << " pos " << list_context
->pos
<< dendl
;
3741 shared_lock
rl(rwlock
);
3742 const pg_pool_t
*pool
= osdmap
->get_pg_pool(list_context
->pool_id
);
3743 if (!pool
) { // pool is gone
3745 put_nlist_context_budget(list_context
);
3746 onfinish
->complete(-ENOENT
);
3749 int pg_num
= pool
->get_pg_num();
3750 bool sort_bitwise
= osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
);
3752 if (list_context
->pos
.is_min()) {
3753 list_context
->starting_pg_num
= 0;
3754 list_context
->sort_bitwise
= sort_bitwise
;
3755 list_context
->starting_pg_num
= pg_num
;
3757 if (list_context
->sort_bitwise
!= sort_bitwise
) {
3758 list_context
->pos
= hobject_t(
3759 object_t(), string(), CEPH_NOSNAP
,
3760 list_context
->current_pg
, list_context
->pool_id
, string());
3761 list_context
->sort_bitwise
= sort_bitwise
;
3762 ldout(cct
, 10) << " hobject sort order changed, restarting this pg at "
3763 << list_context
->pos
<< dendl
;
3765 if (list_context
->starting_pg_num
!= pg_num
) {
3766 if (!sort_bitwise
) {
3767 // start reading from the beginning; the pgs have changed
3768 ldout(cct
, 10) << " pg_num changed; restarting with " << pg_num
<< dendl
;
3769 list_context
->pos
= collection_list_handle_t();
3771 list_context
->starting_pg_num
= pg_num
;
3774 if (list_context
->pos
.is_max()) {
3775 ldout(cct
, 20) << __func__
<< " end of pool, list "
3776 << list_context
->list
<< dendl
;
3777 if (list_context
->list
.empty()) {
3778 list_context
->at_end_of_pool
= true;
3780 // release the listing context's budget once all
3781 // OPs (in the session) are finished
3782 put_nlist_context_budget(list_context
);
3783 onfinish
->complete(0);
3788 op
.pg_nls(list_context
->max_entries
, list_context
->filter
,
3789 list_context
->pos
, osdmap
->get_epoch());
3790 list_context
->bl
.clear();
3791 auto onack
= new C_NList(list_context
, onfinish
, this);
3792 object_locator_t
oloc(list_context
->pool_id
, list_context
->nspace
);
3794 // note current_pg in case we don't have (or lose) SORTBITWISE
3795 list_context
->current_pg
= pool
->raw_hash_to_pg(list_context
->pos
.get_hash());
3798 pg_read(list_context
->current_pg
, oloc
, op
,
3799 &list_context
->bl
, 0, onack
, &onack
->epoch
,
3800 &list_context
->ctx_budget
);
3803 void Objecter::_nlist_reply(NListContext
*list_context
, int r
,
3804 Context
*final_finish
, epoch_t reply_epoch
)
3806 ldout(cct
, 10) << __func__
<< " " << list_context
<< dendl
;
3808 auto iter
= list_context
->bl
.cbegin();
3809 pg_nls_response_t response
;
3810 decode(response
, iter
);
3812 // we do this as legacy.
3813 cb::list legacy_extra_info
;
3814 decode(legacy_extra_info
, iter
);
3817 // if the osd returns 1 (newer code), or handle MAX, it means we
3818 // hit the end of the pg.
3819 if ((response
.handle
.is_max() || r
== 1) &&
3820 !list_context
->sort_bitwise
) {
3821 // legacy OSD and !sortbitwise, figure out the next PG on our own
3822 ++list_context
->current_pg
;
3823 if (list_context
->current_pg
== list_context
->starting_pg_num
) {
3825 list_context
->pos
= hobject_t::get_max();
3828 list_context
->pos
= hobject_t(object_t(), string(), CEPH_NOSNAP
,
3829 list_context
->current_pg
,
3830 list_context
->pool_id
, string());
3833 list_context
->pos
= response
.handle
;
3836 int response_size
= response
.entries
.size();
3837 ldout(cct
, 20) << " response.entries.size " << response_size
3838 << ", response.entries " << response
.entries
3839 << ", handle " << response
.handle
3840 << ", tentative new pos " << list_context
->pos
<< dendl
;
3841 if (response_size
) {
3842 std::move(response
.entries
.begin(), response
.entries
.end(),
3843 std::back_inserter(list_context
->list
));
3844 response
.entries
.clear();
3847 if (list_context
->list
.size() >= list_context
->max_entries
) {
3848 ldout(cct
, 20) << " hit max, returning results so far, "
3849 << list_context
->list
<< dendl
;
3850 // release the listing context's budget once all
3851 // OPs (in the session) are finished
3852 put_nlist_context_budget(list_context
);
3853 final_finish
->complete(0);
3858 list_nobjects(list_context
, final_finish
);
3861 void Objecter::put_nlist_context_budget(NListContext
*list_context
)
3863 if (list_context
->ctx_budget
>= 0) {
3864 ldout(cct
, 10) << " release listing context's budget " <<
3865 list_context
->ctx_budget
<< dendl
;
3866 put_op_budget_bytes(list_context
->ctx_budget
);
3867 list_context
->ctx_budget
= -1;
3873 void Objecter::create_pool_snap(int64_t pool
, std::string_view snap_name
,
3874 decltype(PoolOp::onfinish
)&& onfinish
)
3876 unique_lock
wl(rwlock
);
3877 ldout(cct
, 10) << "create_pool_snap; pool: " << pool
<< "; snap: "
3878 << snap_name
<< dendl
;
3880 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
3882 onfinish
->defer(std::move(onfinish
), osdc_errc::pool_dne
, cb::list
{});
3885 if (p
->snap_exists(snap_name
)) {
3886 onfinish
->defer(std::move(onfinish
), osdc_errc::snapshot_exists
,
3891 auto op
= new PoolOp
;
3892 op
->tid
= ++last_tid
;
3894 op
->name
= snap_name
;
3895 op
->onfinish
= std::move(onfinish
);
3896 op
->pool_op
= POOL_OP_CREATE_SNAP
;
3897 pool_ops
[op
->tid
] = op
;
3902 struct CB_SelfmanagedSnap
{
3903 std::unique_ptr
<ca::Completion
<void(bs::error_code
, snapid_t
)>> fin
;
3904 CB_SelfmanagedSnap(decltype(fin
)&& fin
)
3905 : fin(std::move(fin
)) {}
3906 void operator()(bs::error_code ec
, const cb::list
& bl
) {
3907 snapid_t snapid
= 0;
3910 auto p
= bl
.cbegin();
3912 } catch (const cb::error
& e
) {
3916 fin
->defer(std::move(fin
), ec
, snapid
);
3920 void Objecter::allocate_selfmanaged_snap(
3922 std::unique_ptr
<ca::Completion
<void(bs::error_code
, snapid_t
)>> onfinish
)
3924 unique_lock
wl(rwlock
);
3925 ldout(cct
, 10) << "allocate_selfmanaged_snap; pool: " << pool
<< dendl
;
3926 auto op
= new PoolOp
;
3927 op
->tid
= ++last_tid
;
3929 op
->onfinish
= PoolOp::OpComp::create(
3930 service
.get_executor(),
3931 CB_SelfmanagedSnap(std::move(onfinish
)));
3932 op
->pool_op
= POOL_OP_CREATE_UNMANAGED_SNAP
;
3933 pool_ops
[op
->tid
] = op
;
3938 void Objecter::delete_pool_snap(
3939 int64_t pool
, std::string_view snap_name
,
3940 decltype(PoolOp::onfinish
)&& onfinish
)
3942 unique_lock
wl(rwlock
);
3943 ldout(cct
, 10) << "delete_pool_snap; pool: " << pool
<< "; snap: "
3944 << snap_name
<< dendl
;
3946 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
3948 onfinish
->defer(std::move(onfinish
), osdc_errc::pool_dne
, cb::list
{});
3952 if (!p
->snap_exists(snap_name
)) {
3953 onfinish
->defer(std::move(onfinish
), osdc_errc::snapshot_dne
, cb::list
{});
3957 auto op
= new PoolOp
;
3958 op
->tid
= ++last_tid
;
3960 op
->name
= snap_name
;
3961 op
->onfinish
= std::move(onfinish
);
3962 op
->pool_op
= POOL_OP_DELETE_SNAP
;
3963 pool_ops
[op
->tid
] = op
;
3968 void Objecter::delete_selfmanaged_snap(int64_t pool
, snapid_t snap
,
3969 decltype(PoolOp::onfinish
)&& onfinish
)
3971 unique_lock
wl(rwlock
);
3972 ldout(cct
, 10) << "delete_selfmanaged_snap; pool: " << pool
<< "; snap: "
3974 auto op
= new PoolOp
;
3975 op
->tid
= ++last_tid
;
3977 op
->onfinish
= std::move(onfinish
);
3978 op
->pool_op
= POOL_OP_DELETE_UNMANAGED_SNAP
;
3980 pool_ops
[op
->tid
] = op
;
3985 void Objecter::create_pool(std::string_view name
,
3986 decltype(PoolOp::onfinish
)&& onfinish
,
3989 unique_lock
wl(rwlock
);
3990 ldout(cct
, 10) << "create_pool name=" << name
<< dendl
;
3992 if (osdmap
->lookup_pg_pool_name(name
) >= 0) {
3993 onfinish
->defer(std::move(onfinish
), osdc_errc::pool_exists
, cb::list
{});
3997 auto op
= new PoolOp
;
3998 op
->tid
= ++last_tid
;
4001 op
->onfinish
= std::move(onfinish
);
4002 op
->pool_op
= POOL_OP_CREATE
;
4003 pool_ops
[op
->tid
] = op
;
4004 op
->crush_rule
= crush_rule
;
4009 void Objecter::delete_pool(int64_t pool
,
4010 decltype(PoolOp::onfinish
)&& onfinish
)
4012 unique_lock
wl(rwlock
);
4013 ldout(cct
, 10) << "delete_pool " << pool
<< dendl
;
4015 if (!osdmap
->have_pg_pool(pool
))
4016 onfinish
->defer(std::move(onfinish
), osdc_errc::pool_dne
, cb::list
{});
4018 _do_delete_pool(pool
, std::move(onfinish
));
4021 void Objecter::delete_pool(std::string_view pool_name
,
4022 decltype(PoolOp::onfinish
)&& onfinish
)
4024 unique_lock
wl(rwlock
);
4025 ldout(cct
, 10) << "delete_pool " << pool_name
<< dendl
;
4027 int64_t pool
= osdmap
->lookup_pg_pool_name(pool_name
);
4029 // This only returns one error: -ENOENT.
4030 onfinish
->defer(std::move(onfinish
), osdc_errc::pool_dne
, cb::list
{});
4032 _do_delete_pool(pool
, std::move(onfinish
));
4035 void Objecter::_do_delete_pool(int64_t pool
,
4036 decltype(PoolOp::onfinish
)&& onfinish
)
4039 auto op
= new PoolOp
;
4040 op
->tid
= ++last_tid
;
4042 op
->name
= "delete";
4043 op
->onfinish
= std::move(onfinish
);
4044 op
->pool_op
= POOL_OP_DELETE
;
4045 pool_ops
[op
->tid
] = op
;
4049 void Objecter::pool_op_submit(PoolOp
*op
)
4052 if (mon_timeout
> timespan(0)) {
4053 op
->ontimeout
= timer
.add_event(mon_timeout
,
4055 pool_op_cancel(op
->tid
, -ETIMEDOUT
); });
4057 _pool_op_submit(op
);
4060 void Objecter::_pool_op_submit(PoolOp
*op
)
4062 // rwlock is locked unique
4064 ldout(cct
, 10) << "pool_op_submit " << op
->tid
<< dendl
;
4065 auto m
= new MPoolOp(monc
->get_fsid(), op
->tid
, op
->pool
,
4066 op
->name
, op
->pool_op
,
4067 last_seen_osdmap_version
);
4068 if (op
->snapid
) m
->snapid
= op
->snapid
;
4069 if (op
->crush_rule
) m
->crush_rule
= op
->crush_rule
;
4070 monc
->send_mon_message(m
);
4071 op
->last_submit
= ceph::coarse_mono_clock::now();
4073 logger
->inc(l_osdc_poolop_send
);
4077 * Handle a reply to a PoolOp message. Check that we sent the message
4078 * and give the caller responsibility for the returned cb::list.
4079 * Then either call the finisher or stash the PoolOp, depending on if we
4080 * have a new enough map.
4081 * Lastly, clean up the message and PoolOp.
4083 void Objecter::handle_pool_op_reply(MPoolOpReply
*m
)
4085 int rc
= m
->replyCode
;
4086 auto ec
= rc
< 0 ? bs::error_code(-rc
, mon_category()) : bs::error_code();
4088 shunique_lock
sul(rwlock
, acquire_shared
);
4095 ldout(cct
, 10) << "handle_pool_op_reply " << *m
<< dendl
;
4096 ceph_tid_t tid
= m
->get_tid();
4097 auto iter
= pool_ops
.find(tid
);
4098 if (iter
!= pool_ops
.end()) {
4099 PoolOp
*op
= iter
->second
;
4100 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< " Op: "
4101 << ceph_pool_op_name(op
->pool_op
) << dendl
;
4102 cb::list bl
{std::move(m
->response_data
)};
4103 if (m
->version
> last_seen_osdmap_version
)
4104 last_seen_osdmap_version
= m
->version
;
4105 if (osdmap
->get_epoch() < m
->epoch
) {
4108 // recheck op existence since we have let go of rwlock
4109 // (for promotion) above.
4110 iter
= pool_ops
.find(tid
);
4111 if (iter
== pool_ops
.end())
4112 goto done
; // op is gone.
4113 if (osdmap
->get_epoch() < m
->epoch
) {
4114 ldout(cct
, 20) << "waiting for client to reach epoch " << m
->epoch
4115 << " before calling back" << dendl
;
4116 _wait_for_new_map(OpCompletion::create(
4117 service
.get_executor(),
4118 [o
= std::move(op
->onfinish
),
4119 bl
= std::move(bl
)](
4120 bs::error_code ec
) mutable {
4121 o
->defer(std::move(o
), ec
, bl
);
4126 // map epoch changed, probably because a MOSDMap message
4127 // sneaked in. Do caller-specified callback now or else
4128 // we lose it forever.
4129 ceph_assert(op
->onfinish
);
4130 op
->onfinish
->defer(std::move(op
->onfinish
), ec
, std::move(bl
));
4133 ceph_assert(op
->onfinish
);
4134 op
->onfinish
->defer(std::move(op
->onfinish
), ec
, std::move(bl
));
4136 op
->onfinish
= nullptr;
4137 if (!sul
.owns_lock()) {
4141 iter
= pool_ops
.find(tid
);
4142 if (iter
!= pool_ops
.end()) {
4143 _finish_pool_op(op
, 0);
4146 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4150 // Not strictly necessary, since we'll release it on return.
4153 ldout(cct
, 10) << "done" << dendl
;
4157 int Objecter::pool_op_cancel(ceph_tid_t tid
, int r
)
4159 ceph_assert(initialized
);
4161 unique_lock
wl(rwlock
);
4163 auto it
= pool_ops
.find(tid
);
4164 if (it
== pool_ops
.end()) {
4165 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4169 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4171 PoolOp
*op
= it
->second
;
4173 op
->onfinish
->defer(std::move(op
->onfinish
), osdcode(r
), cb::list
{});
4175 _finish_pool_op(op
, r
);
4179 void Objecter::_finish_pool_op(PoolOp
*op
, int r
)
4181 // rwlock is locked unique
4182 pool_ops
.erase(op
->tid
);
4183 logger
->set(l_osdc_poolop_active
, pool_ops
.size());
4185 if (op
->ontimeout
&& r
!= -ETIMEDOUT
) {
4186 timer
.cancel_event(op
->ontimeout
);
4194 void Objecter::get_pool_stats(
4195 const std::vector
<std::string
>& pools
,
4196 decltype(PoolStatOp::onfinish
)&& onfinish
)
4198 ldout(cct
, 10) << "get_pool_stats " << pools
<< dendl
;
4200 auto op
= new PoolStatOp
;
4201 op
->tid
= ++last_tid
;
4203 op
->onfinish
= std::move(onfinish
);
4204 if (mon_timeout
> timespan(0)) {
4205 op
->ontimeout
= timer
.add_event(mon_timeout
,
4207 pool_stat_op_cancel(op
->tid
,
4213 unique_lock
wl(rwlock
);
4215 poolstat_ops
[op
->tid
] = op
;
4217 logger
->set(l_osdc_poolstat_active
, poolstat_ops
.size());
4219 _poolstat_submit(op
);
4222 void Objecter::_poolstat_submit(PoolStatOp
*op
)
4224 ldout(cct
, 10) << "_poolstat_submit " << op
->tid
<< dendl
;
4225 monc
->send_mon_message(new MGetPoolStats(monc
->get_fsid(), op
->tid
,
4227 last_seen_pgmap_version
));
4228 op
->last_submit
= ceph::coarse_mono_clock::now();
4230 logger
->inc(l_osdc_poolstat_send
);
4233 void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply
*m
)
4235 ldout(cct
, 10) << "handle_get_pool_stats_reply " << *m
<< dendl
;
4236 ceph_tid_t tid
= m
->get_tid();
4238 unique_lock
wl(rwlock
);
4244 auto iter
= poolstat_ops
.find(tid
);
4245 if (iter
!= poolstat_ops
.end()) {
4246 PoolStatOp
*op
= poolstat_ops
[tid
];
4247 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< dendl
;
4248 if (m
->version
> last_seen_pgmap_version
) {
4249 last_seen_pgmap_version
= m
->version
;
4251 op
->onfinish
->defer(std::move(op
->onfinish
), bs::error_code
{},
4252 std::move(m
->pool_stats
), m
->per_pool
);
4253 _finish_pool_stat_op(op
, 0);
4255 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4257 ldout(cct
, 10) << "done" << dendl
;
4261 int Objecter::pool_stat_op_cancel(ceph_tid_t tid
, int r
)
4263 ceph_assert(initialized
);
4265 unique_lock
wl(rwlock
);
4267 auto it
= poolstat_ops
.find(tid
);
4268 if (it
== poolstat_ops
.end()) {
4269 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4273 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4275 auto op
= it
->second
;
4277 op
->onfinish
->defer(std::move(op
->onfinish
), osdcode(r
),
4278 bc::flat_map
<std::string
, pool_stat_t
>{}, false);
4279 _finish_pool_stat_op(op
, r
);
4283 void Objecter::_finish_pool_stat_op(PoolStatOp
*op
, int r
)
4285 // rwlock is locked unique
4287 poolstat_ops
.erase(op
->tid
);
4288 logger
->set(l_osdc_poolstat_active
, poolstat_ops
.size());
4290 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
4291 timer
.cancel_event(op
->ontimeout
);
4296 void Objecter::get_fs_stats(std::optional
<int64_t> poolid
,
4297 decltype(StatfsOp::onfinish
)&& onfinish
)
4299 ldout(cct
, 10) << "get_fs_stats" << dendl
;
4300 unique_lock
l(rwlock
);
4302 auto op
= new StatfsOp
;
4303 op
->tid
= ++last_tid
;
4304 op
->data_pool
= poolid
;
4305 op
->onfinish
= std::move(onfinish
);
4306 if (mon_timeout
> timespan(0)) {
4307 op
->ontimeout
= timer
.add_event(mon_timeout
,
4309 statfs_op_cancel(op
->tid
,
4314 statfs_ops
[op
->tid
] = op
;
4316 logger
->set(l_osdc_statfs_active
, statfs_ops
.size());
4318 _fs_stats_submit(op
);
4321 void Objecter::_fs_stats_submit(StatfsOp
*op
)
4323 // rwlock is locked unique
4325 ldout(cct
, 10) << "fs_stats_submit" << op
->tid
<< dendl
;
4326 monc
->send_mon_message(new MStatfs(monc
->get_fsid(), op
->tid
,
4328 last_seen_pgmap_version
));
4329 op
->last_submit
= ceph::coarse_mono_clock::now();
4331 logger
->inc(l_osdc_statfs_send
);
4334 void Objecter::handle_fs_stats_reply(MStatfsReply
*m
)
4336 unique_lock
wl(rwlock
);
4342 ldout(cct
, 10) << "handle_fs_stats_reply " << *m
<< dendl
;
4343 ceph_tid_t tid
= m
->get_tid();
4345 if (statfs_ops
.count(tid
)) {
4346 StatfsOp
*op
= statfs_ops
[tid
];
4347 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< dendl
;
4348 if (m
->h
.version
> last_seen_pgmap_version
)
4349 last_seen_pgmap_version
= m
->h
.version
;
4350 op
->onfinish
->defer(std::move(op
->onfinish
), bs::error_code
{}, m
->h
.st
);
4351 _finish_statfs_op(op
, 0);
4353 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4356 ldout(cct
, 10) << "done" << dendl
;
4359 int Objecter::statfs_op_cancel(ceph_tid_t tid
, int r
)
4361 ceph_assert(initialized
);
4363 unique_lock
wl(rwlock
);
4365 auto it
= statfs_ops
.find(tid
);
4366 if (it
== statfs_ops
.end()) {
4367 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4371 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4373 auto op
= it
->second
;
4375 op
->onfinish
->defer(std::move(op
->onfinish
), osdcode(r
), ceph_statfs
{});
4376 _finish_statfs_op(op
, r
);
4380 void Objecter::_finish_statfs_op(StatfsOp
*op
, int r
)
4382 // rwlock is locked unique
4384 statfs_ops
.erase(op
->tid
);
4385 logger
->set(l_osdc_statfs_active
, statfs_ops
.size());
4387 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
4388 timer
.cancel_event(op
->ontimeout
);
4395 void Objecter::_sg_read_finish(vector
<ObjectExtent
>& extents
,
4396 vector
<cb::list
>& resultbl
,
4397 cb::list
*bl
, Context
*onfinish
)
4400 ldout(cct
, 15) << "_sg_read_finish" << dendl
;
4402 if (extents
.size() > 1) {
4403 Striper::StripedReadResult r
;
4404 auto bit
= resultbl
.begin();
4405 for (auto eit
= extents
.begin();
4406 eit
!= extents
.end();
4408 r
.add_partial_result(cct
, *bit
, eit
->buffer_extents
);
4411 r
.assemble_result(cct
, *bl
, false);
4413 ldout(cct
, 15) << " only one frag" << dendl
;
4414 *bl
= std::move(resultbl
[0]);
4418 uint64_t bytes_read
= bl
->length();
4419 ldout(cct
, 7) << "_sg_read_finish " << bytes_read
<< " bytes" << dendl
;
4422 onfinish
->complete(bytes_read
);// > 0 ? bytes_read:m->get_result());
4427 void Objecter::ms_handle_connect(Connection
*con
)
4429 ldout(cct
, 10) << "ms_handle_connect " << con
<< dendl
;
4433 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
4437 bool Objecter::ms_handle_reset(Connection
*con
)
4441 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
4442 unique_lock
wl(rwlock
);
4444 auto priv
= con
->get_priv();
4445 auto session
= static_cast<OSDSession
*>(priv
.get());
4447 ldout(cct
, 1) << "ms_handle_reset " << con
<< " session " << session
4448 << " osd." << session
->osd
<< dendl
;
4449 // the session maybe had been closed if new osdmap just handled
4450 // says the osd down
4451 if (!(initialized
&& osdmap
->is_up(session
->osd
))) {
4452 ldout(cct
, 1) << "ms_handle_reset aborted,initialized=" << initialized
<< dendl
;
4456 map
<uint64_t, LingerOp
*> lresend
;
4457 unique_lock
sl(session
->lock
);
4458 _reopen_session(session
);
4459 _kick_requests(session
, lresend
);
4461 _linger_ops_resend(lresend
, wl
);
4463 maybe_request_map();
4470 void Objecter::ms_handle_remote_reset(Connection
*con
)
4473 * treat these the same.
4475 ms_handle_reset(con
);
4478 bool Objecter::ms_handle_refused(Connection
*con
)
4481 if (osdmap
&& (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
)) {
4482 int osd
= osdmap
->identify_osd(con
->get_peer_addr());
4484 ldout(cct
, 1) << "ms_handle_refused on osd." << osd
<< dendl
;
4490 void Objecter::op_target_t::dump(Formatter
*f
) const
4492 f
->dump_stream("pg") << pgid
;
4493 f
->dump_int("osd", osd
);
4494 f
->dump_stream("object_id") << base_oid
;
4495 f
->dump_stream("object_locator") << base_oloc
;
4496 f
->dump_stream("target_object_id") << target_oid
;
4497 f
->dump_stream("target_object_locator") << target_oloc
;
4498 f
->dump_int("paused", (int)paused
);
4499 f
->dump_int("used_replica", (int)used_replica
);
4500 f
->dump_int("precalc_pgid", (int)precalc_pgid
);
4503 void Objecter::_dump_active(OSDSession
*s
)
4505 for (auto p
= s
->ops
.begin(); p
!= s
->ops
.end(); ++p
) {
4507 ldout(cct
, 20) << op
->tid
<< "\t" << op
->target
.pgid
4508 << "\tosd." << (op
->session
? op
->session
->osd
: -1)
4509 << "\t" << op
->target
.base_oid
4510 << "\t" << op
->ops
<< dendl
;
4514 void Objecter::_dump_active()
4516 ldout(cct
, 20) << "dump_active .. " << num_homeless_ops
<< " homeless"
4518 for (auto siter
= osd_sessions
.begin();
4519 siter
!= osd_sessions
.end(); ++siter
) {
4520 auto s
= siter
->second
;
4521 shared_lock
sl(s
->lock
);
4525 _dump_active(homeless_session
);
4528 void Objecter::dump_active()
4530 shared_lock
rl(rwlock
);
4535 void Objecter::dump_requests(Formatter
*fmt
)
4537 // Read-lock on Objecter held here
4538 fmt
->open_object_section("requests");
4540 dump_linger_ops(fmt
);
4542 dump_pool_stat_ops(fmt
);
4543 dump_statfs_ops(fmt
);
4544 dump_command_ops(fmt
);
4545 fmt
->close_section(); // requests object
4548 void Objecter::_dump_ops(const OSDSession
*s
, Formatter
*fmt
)
4550 for (auto p
= s
->ops
.begin(); p
!= s
->ops
.end(); ++p
) {
4552 auto age
= std::chrono::duration
<double>(ceph::coarse_mono_clock::now() - op
->stamp
);
4553 fmt
->open_object_section("op");
4554 fmt
->dump_unsigned("tid", op
->tid
);
4555 op
->target
.dump(fmt
);
4556 fmt
->dump_stream("last_sent") << op
->stamp
;
4557 fmt
->dump_float("age", age
.count());
4558 fmt
->dump_int("attempts", op
->attempts
);
4559 fmt
->dump_stream("snapid") << op
->snapid
;
4560 fmt
->dump_stream("snap_context") << op
->snapc
;
4561 fmt
->dump_stream("mtime") << op
->mtime
;
4563 fmt
->open_array_section("osd_ops");
4564 for (auto it
= op
->ops
.begin(); it
!= op
->ops
.end(); ++it
) {
4565 fmt
->dump_stream("osd_op") << *it
;
4567 fmt
->close_section(); // osd_ops array
4569 fmt
->close_section(); // op object
4573 void Objecter::dump_ops(Formatter
*fmt
)
4575 // Read-lock on Objecter held
4576 fmt
->open_array_section("ops");
4577 for (auto siter
= osd_sessions
.begin();
4578 siter
!= osd_sessions
.end(); ++siter
) {
4579 OSDSession
*s
= siter
->second
;
4580 shared_lock
sl(s
->lock
);
4584 _dump_ops(homeless_session
, fmt
);
4585 fmt
->close_section(); // ops array
4588 void Objecter::_dump_linger_ops(const OSDSession
*s
, Formatter
*fmt
)
4590 for (auto p
= s
->linger_ops
.begin(); p
!= s
->linger_ops
.end(); ++p
) {
4591 auto op
= p
->second
;
4592 fmt
->open_object_section("linger_op");
4593 fmt
->dump_unsigned("linger_id", op
->linger_id
);
4594 op
->target
.dump(fmt
);
4595 fmt
->dump_stream("snapid") << op
->snap
;
4596 fmt
->dump_stream("registered") << op
->registered
;
4597 fmt
->close_section(); // linger_op object
4601 void Objecter::dump_linger_ops(Formatter
*fmt
)
4603 // We have a read-lock on the objecter
4604 fmt
->open_array_section("linger_ops");
4605 for (auto siter
= osd_sessions
.begin();
4606 siter
!= osd_sessions
.end(); ++siter
) {
4607 auto s
= siter
->second
;
4608 shared_lock
sl(s
->lock
);
4609 _dump_linger_ops(s
, fmt
);
4612 _dump_linger_ops(homeless_session
, fmt
);
4613 fmt
->close_section(); // linger_ops array
4616 void Objecter::_dump_command_ops(const OSDSession
*s
, Formatter
*fmt
)
4618 for (auto p
= s
->command_ops
.begin(); p
!= s
->command_ops
.end(); ++p
) {
4619 auto op
= p
->second
;
4620 fmt
->open_object_section("command_op");
4621 fmt
->dump_unsigned("command_id", op
->tid
);
4622 fmt
->dump_int("osd", op
->session
? op
->session
->osd
: -1);
4623 fmt
->open_array_section("command");
4624 for (auto q
= op
->cmd
.begin(); q
!= op
->cmd
.end(); ++q
)
4625 fmt
->dump_string("word", *q
);
4626 fmt
->close_section();
4627 if (op
->target_osd
>= 0)
4628 fmt
->dump_int("target_osd", op
->target_osd
);
4630 fmt
->dump_stream("target_pg") << op
->target_pg
;
4631 fmt
->close_section(); // command_op object
4635 void Objecter::dump_command_ops(Formatter
*fmt
)
4637 // We have a read-lock on the Objecter here
4638 fmt
->open_array_section("command_ops");
4639 for (auto siter
= osd_sessions
.begin();
4640 siter
!= osd_sessions
.end(); ++siter
) {
4641 auto s
= siter
->second
;
4642 shared_lock
sl(s
->lock
);
4643 _dump_command_ops(s
, fmt
);
4646 _dump_command_ops(homeless_session
, fmt
);
4647 fmt
->close_section(); // command_ops array
4650 void Objecter::dump_pool_ops(Formatter
*fmt
) const
4652 fmt
->open_array_section("pool_ops");
4653 for (auto p
= pool_ops
.begin(); p
!= pool_ops
.end(); ++p
) {
4654 auto op
= p
->second
;
4655 fmt
->open_object_section("pool_op");
4656 fmt
->dump_unsigned("tid", op
->tid
);
4657 fmt
->dump_int("pool", op
->pool
);
4658 fmt
->dump_string("name", op
->name
);
4659 fmt
->dump_int("operation_type", op
->pool_op
);
4660 fmt
->dump_unsigned("crush_rule", op
->crush_rule
);
4661 fmt
->dump_stream("snapid") << op
->snapid
;
4662 fmt
->dump_stream("last_sent") << op
->last_submit
;
4663 fmt
->close_section(); // pool_op object
4665 fmt
->close_section(); // pool_ops array
4668 void Objecter::dump_pool_stat_ops(Formatter
*fmt
) const
4670 fmt
->open_array_section("pool_stat_ops");
4671 for (auto p
= poolstat_ops
.begin();
4672 p
!= poolstat_ops
.end();
4674 PoolStatOp
*op
= p
->second
;
4675 fmt
->open_object_section("pool_stat_op");
4676 fmt
->dump_unsigned("tid", op
->tid
);
4677 fmt
->dump_stream("last_sent") << op
->last_submit
;
4679 fmt
->open_array_section("pools");
4680 for (const auto& it
: op
->pools
) {
4681 fmt
->dump_string("pool", it
);
4683 fmt
->close_section(); // pools array
4685 fmt
->close_section(); // pool_stat_op object
4687 fmt
->close_section(); // pool_stat_ops array
4690 void Objecter::dump_statfs_ops(Formatter
*fmt
) const
4692 fmt
->open_array_section("statfs_ops");
4693 for (auto p
= statfs_ops
.begin(); p
!= statfs_ops
.end(); ++p
) {
4694 auto op
= p
->second
;
4695 fmt
->open_object_section("statfs_op");
4696 fmt
->dump_unsigned("tid", op
->tid
);
4697 fmt
->dump_stream("last_sent") << op
->last_submit
;
4698 fmt
->close_section(); // statfs_op object
4700 fmt
->close_section(); // statfs_ops array
4703 Objecter::RequestStateHook::RequestStateHook(Objecter
*objecter
) :
4704 m_objecter(objecter
)
4708 int Objecter::RequestStateHook::call(std::string_view command
,
4709 const cmdmap_t
& cmdmap
,
4714 shared_lock
rl(m_objecter
->rwlock
);
4715 m_objecter
->dump_requests(f
);
4719 void Objecter::blocklist_self(bool set
)
4721 ldout(cct
, 10) << "blocklist_self " << (set
? "add" : "rm") << dendl
;
4724 cmd
.push_back("{\"prefix\":\"osd blocklist\", ");
4726 cmd
.push_back("\"blocklistop\":\"add\",");
4728 cmd
.push_back("\"blocklistop\":\"rm\",");
4730 // this is somewhat imprecise in that we are blocklisting our first addr only
4731 ss
<< messenger
->get_myaddrs().front().get_legacy_str();
4732 cmd
.push_back("\"addr\":\"" + ss
.str() + "\"");
4734 auto m
= new MMonCommand(monc
->get_fsid());
4737 // NOTE: no fallback to legacy blacklist command implemented here
4738 // since this is only used for test code.
4740 monc
->send_mon_message(m
);
4745 void Objecter::handle_command_reply(MCommandReply
*m
)
4747 unique_lock
wl(rwlock
);
4753 ConnectionRef con
= m
->get_connection();
4754 auto priv
= con
->get_priv();
4755 auto s
= static_cast<OSDSession
*>(priv
.get());
4756 if (!s
|| s
->con
!= con
) {
4757 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
4762 shared_lock
sl(s
->lock
);
4763 auto p
= s
->command_ops
.find(m
->get_tid());
4764 if (p
== s
->command_ops
.end()) {
4765 ldout(cct
, 10) << "handle_command_reply tid " << m
->get_tid()
4766 << " not found" << dendl
;
4772 CommandOp
*c
= p
->second
;
4774 m
->get_connection() != c
->session
->con
) {
4775 ldout(cct
, 10) << "handle_command_reply tid " << m
->get_tid()
4776 << " got reply from wrong connection "
4777 << m
->get_connection() << " " << m
->get_source_inst()
4784 if (m
->r
== -EAGAIN
) {
4785 ldout(cct
,10) << __func__
<< " tid " << m
->get_tid()
4786 << " got EAGAIN, requesting map and resending" << dendl
;
4787 // NOTE: This might resend twice... once now, and once again when
4788 // we get an updated osdmap and the PG is found to have moved.
4789 _maybe_request_map();
4798 unique_lock
sul(s
->lock
);
4799 _finish_command(c
, m
->r
< 0 ? bs::error_code(-m
->r
, osd_category()) :
4800 bs::error_code(), std::move(m
->rs
),
4801 std::move(m
->get_data()));
4807 Objecter::LingerOp::LingerOp(Objecter
*o
, uint64_t linger_id
)
4809 linger_id(linger_id
),
4810 watch_lock(ceph::make_shared_mutex(
4811 fmt::format("LingerOp::watch_lock #{}", linger_id
)))
4814 void Objecter::submit_command(CommandOp
*c
, ceph_tid_t
*ptid
)
4816 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
4818 ceph_tid_t tid
= ++last_tid
;
4819 ldout(cct
, 10) << "_submit_command " << tid
<< " " << c
->cmd
<< dendl
;
4823 unique_lock
hs_wl(homeless_session
->lock
);
4824 _session_command_op_assign(homeless_session
, c
);
4827 _calc_command_target(c
, sul
);
4828 _assign_command_session(c
, sul
);
4829 if (osd_timeout
> timespan(0)) {
4830 c
->ontimeout
= timer
.add_event(osd_timeout
,
4834 osdc_errc::timed_out
); });
4837 if (!c
->session
->is_homeless()) {
4840 _maybe_request_map();
4842 if (c
->map_check_error
)
4843 _send_command_map_check(c
);
4847 logger
->inc(l_osdc_command_active
);
4850 int Objecter::_calc_command_target(CommandOp
*c
,
4851 shunique_lock
<ceph::shared_mutex
>& sul
)
4853 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
4855 c
->map_check_error
= 0;
4857 // ignore overlays, just like we do with pg ops
4858 c
->target
.flags
|= CEPH_OSD_FLAG_IGNORE_OVERLAY
;
4860 if (c
->target_osd
>= 0) {
4861 if (!osdmap
->exists(c
->target_osd
)) {
4862 c
->map_check_error
= -ENOENT
;
4863 c
->map_check_error_str
= "osd dne";
4865 return RECALC_OP_TARGET_OSD_DNE
;
4867 if (osdmap
->is_down(c
->target_osd
)) {
4868 c
->map_check_error
= -ENXIO
;
4869 c
->map_check_error_str
= "osd down";
4871 return RECALC_OP_TARGET_OSD_DOWN
;
4873 c
->target
.osd
= c
->target_osd
;
4875 int ret
= _calc_target(&(c
->target
), nullptr, true);
4876 if (ret
== RECALC_OP_TARGET_POOL_DNE
) {
4877 c
->map_check_error
= -ENOENT
;
4878 c
->map_check_error_str
= "pool dne";
4881 } else if (ret
== RECALC_OP_TARGET_OSD_DOWN
) {
4882 c
->map_check_error
= -ENXIO
;
4883 c
->map_check_error_str
= "osd down";
4890 int r
= _get_session(c
->target
.osd
, &s
, sul
);
4891 ceph_assert(r
!= -EAGAIN
); /* shouldn't happen as we're holding the write lock */
4893 if (c
->session
!= s
) {
4895 return RECALC_OP_TARGET_NEED_RESEND
;
4900 ldout(cct
, 20) << "_recalc_command_target " << c
->tid
<< " no change, "
4901 << c
->session
<< dendl
;
4903 return RECALC_OP_TARGET_NO_ACTION
;
4906 void Objecter::_assign_command_session(CommandOp
*c
,
4907 shunique_lock
<ceph::shared_mutex
>& sul
)
4909 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
4912 int r
= _get_session(c
->target
.osd
, &s
, sul
);
4913 ceph_assert(r
!= -EAGAIN
); /* shouldn't happen as we're holding the write lock */
4915 if (c
->session
!= s
) {
4917 OSDSession
*cs
= c
->session
;
4918 unique_lock
csl(cs
->lock
);
4919 _session_command_op_remove(c
->session
, c
);
4922 unique_lock
sl(s
->lock
);
4923 _session_command_op_assign(s
, c
);
4929 void Objecter::_send_command(CommandOp
*c
)
4931 ldout(cct
, 10) << "_send_command " << c
->tid
<< dendl
;
4932 ceph_assert(c
->session
);
4933 ceph_assert(c
->session
->con
);
4934 auto m
= new MCommand(monc
->monmap
.fsid
);
4936 m
->set_data(c
->inbl
);
4938 c
->session
->con
->send_message(m
);
4939 logger
->inc(l_osdc_command_send
);
// Cancel a pending command op identified by (session, tid), completing it
// with error code `ec` and empty result string/buffer.
// NOTE(review): extraction dropped lines here (the `ec` parameter
// declaration and the return statements); visible text kept byte-identical.
4942 int Objecter::command_op_cancel(OSDSession
*s
, ceph_tid_t tid
,
4945 ceph_assert(initialized
);
// Exclusive rwlock: we may mutate session/op state below.
4947 unique_lock
wl(rwlock
);
4949 auto it
= s
->command_ops
.find(tid
);
// Unknown tid: nothing to cancel (presumably returns an error — TODO confirm).
4950 if (it
== s
->command_ops
.end()) {
4951 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4955 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4957 CommandOp
*op
= it
->second
;
// Drop any outstanding map check registered for this op.
4958 _command_cancel_map_check(op
);
// _finish_command requires the session lock to be held.
4959 unique_lock
sl(op
->session
->lock
);
4960 _finish_command(op
, ec
, {}, {});
// Complete a CommandOp: deliver the result to its completion, cancel its
// timeout event (unless we are finishing *because* of a timeout), detach it
// from its session, and decrement the active-commands counter.
// NOTE(review): extraction dropped some lines (e.g. the op's final
// release/put — TODO confirm); visible text kept byte-identical.
4965 void Objecter::_finish_command(CommandOp
*c
, bs::error_code ec
,
4966 string
&& rs
, cb::list
&& bl
)
4968 // rwlock is locked unique
4969 // session lock is locked
4971 ldout(cct
, 10) << "_finish_command " << c
->tid
<< " = " << ec
<< " "
// Hand the error code, status string, and output buffer to the caller's
// completion; defer() also consumes the completion itself.
4975 c
->onfinish
->defer(std::move(c
->onfinish
), ec
, std::move(rs
), std::move(bl
));
// If the op timed out, the timer callback is what brought us here, so the
// event must not be cancelled; otherwise remove the pending timeout.
4977 if (c
->ontimeout
&& ec
!= bs::errc::timed_out
)
4978 timer
.cancel_event(c
->ontimeout
);
4980 _session_command_op_remove(c
->session
, c
);
4984 logger
->dec(l_osdc_command_active
);
// An OSDSession must be drained before destruction: all regular ops, linger
// ops, and command ops must already have been re-assigned or destroyed.
4987 Objecter::OSDSession::~OSDSession()
4989 // Caller is responsible for re-assigning or
4990 // destroying any ops that were assigned to us
4991 ceph_assert(ops
.empty());
4992 ceph_assert(linger_ops
.empty());
4993 ceph_assert(command_ops
.empty());
// Construct an Objecter bound to a messenger, monitor client, and asio
// io_context. Reads the mon/osd op timeouts from the Ceph configuration.
4996 Objecter::Objecter(CephContext
*cct
,
4997 Messenger
*m
, MonClient
*mc
,
4998 boost::asio::io_context
& service
) :
4999 Dispatcher(cct
), messenger(m
), monc(mc
), service(service
)
// Per-operation timeouts, in seconds, from config (0 presumably means no
// timeout — TODO confirm).
5001 mon_timeout
= cct
->_conf
.get_val
<std::chrono::seconds
>("rados_mon_op_timeout");
5002 osd_timeout
= cct
->_conf
.get_val
<std::chrono::seconds
>("rados_osd_op_timeout");
// Destructor sanity checks: by the time the Objecter is destroyed every
// tracking structure must already be empty (the caller is expected to have
// shut the Objecter down first), and only the homeless session may remain,
// with a single reference held by us.
5005 Objecter::~Objecter()
5007 ceph_assert(homeless_session
->get_nref() == 1);
5008 ceph_assert(num_homeless_ops
== 0);
// Drop our last reference to the homeless session.
5009 homeless_session
->put();
5011 ceph_assert(osd_sessions
.empty());
5012 ceph_assert(poolstat_ops
.empty());
5013 ceph_assert(statfs_ops
.empty());
5014 ceph_assert(pool_ops
.empty());
5015 ceph_assert(waiting_for_map
.empty());
5016 ceph_assert(linger_ops
.empty());
5017 ceph_assert(check_latest_map_lingers
.empty());
5018 ceph_assert(check_latest_map_ops
.empty());
5019 ceph_assert(check_latest_map_commands
.empty());
// The admin-socket hook and perf counters must already be unregistered.
5021 ceph_assert(!m_request_state_hook
);
5022 ceph_assert(!logger
);
5026 * Wait until this OSD map epoch is received before
5027 * sending any more operations to OSDs. Use this
5028 * when it is known that the client can't trust
5029 * anything from before this epoch (e.g. due to
5030 * client blocklist at this epoch).
5032 void Objecter::set_epoch_barrier(epoch_t epoch
)
// Exclusive rwlock: epoch_barrier is shared state.
5034 unique_lock
wl(rwlock
);
5036 ldout(cct
, 7) << __func__
<< ": barrier " << epoch
<< " (was "
5037 << epoch_barrier
<< ") current epoch " << osdmap
->get_epoch()
// The barrier only ever moves forward; a lower epoch is ignored.
5039 if (epoch
> epoch_barrier
) {
5040 epoch_barrier
= epoch
;
// Ask the monitors for a newer map so the barrier can eventually clear.
5041 _maybe_request_map();
// Starting cursor for object enumeration (body lost in extraction;
// presumably returns a default-constructed/minimum hobject_t — TODO confirm).
5047 hobject_t
Objecter::enumerate_objects_begin()
// Sentinel cursor marking the end of object enumeration: the maximum
// hobject_t.
5052 hobject_t
Objecter::enumerate_objects_end()
5054 return hobject_t::get_max();
// Per-enumeration state threaded through the enumerate_objects /
// _issue_enumerate / _enumerate_reply round trips: the end cursor, the
// PGLS filter, the locator, and the caller's completion.
// NOTE(review): extraction dropped several members (e.g. the objecter
// pointer, max, budget, the result vector parameter of on_finish and of
// operator()); visible text kept byte-identical.
5057 template<typename T
>
5058 struct EnumerationContext
{
5060 const hobject_t end
;
5061 const cb::list filter
;
5063 const object_locator_t oloc
;
// Caller's completion: (error, <results — dropped line>, next cursor).
5066 fu2::unique_function
<void(bs::error_code
,
5068 hobject_t
) &&> on_finish
;
5073 EnumerationContext(Objecter
* objecter
,
5074 hobject_t end
, cb::list filter
,
5075 uint32_t max
, object_locator_t oloc
,
5076 decltype(on_finish
) on_finish
)
5077 : objecter(objecter
), end(std::move(end
)), filter(std::move(filter
)),
5078 max(max
), oloc(std::move(oloc
)), on_finish(std::move(on_finish
)) {}
// Final step: return any held op budget, then invoke the caller's
// completion with the outcome.
5080 void operator()(bs::error_code ec
,
,
5084 objecter
->put_op_budget_bytes(budget
);
5088 std::move(on_finish
)(ec
, std::move(v
), std::move(h
));
// Completion functor for one PGLS read: owns the reply buffer (member `bl`,
// declaration dropped in extraction) and the enumeration context, and hands
// both to Objecter::_enumerate_reply when the read completes.
5092 template<typename T
>
5093 struct CB_EnumerateReply
{
5097 std::unique_ptr
<EnumerationContext
<T
>> ctx
;
5099 CB_EnumerateReply(Objecter
* objecter
,
5100 std::unique_ptr
<EnumerationContext
<T
>>&& ctx
) :
5101 objecter(objecter
), ctx(std::move(ctx
)) {}
// Invoked with the read's error code; forwards buffer + context onward.
5103 void operator()(bs::error_code ec
) {
5104 objecter
->_enumerate_reply(std::move(bl
), ec
, std::move(ctx
));
// Public entry point for object enumeration. Validates arguments and
// cluster state, then kicks off the first PGLS via _issue_enumerate.
// Error paths complete on_finish immediately with an osdc_errc code:
//  - start > end (when end is not the max sentinel): precondition_violated
//  - zero result size: precondition_violated
//  - start already at max sentinel: success with empty results
//  - cluster lacks SORTBITWISE: not_supported
//  - pool does not exist in the current osdmap: pool_dne
// NOTE(review): extraction dropped several parameter lines (pool id, start,
// end, max) and the early `return`s after each error completion; visible
// text kept byte-identical.
5108 template<typename T
>
5109 void Objecter::enumerate_objects(
5111 std::string_view ns
,
5115 const cb::list
& filter_bl
,
5116 fu2::unique_function
<void(bs::error_code
,
5118 hobject_t
) &&> on_finish
) {
5119 if (!end
.is_max() && start
> end
) {
5120 lderr(cct
) << __func__
<< ": start " << start
<< " > end " << end
<< dendl
;
5121 std::move(on_finish
)(osdc_errc::precondition_violated
, {}, {});
5126 lderr(cct
) << __func__
<< ": result size may not be zero" << dendl
;
5127 std::move(on_finish
)(osdc_errc::precondition_violated
, {}, {});
// Starting at the max sentinel means there is nothing left to list.
5131 if (start
.is_max()) {
5132 std::move(on_finish
)({}, {}, {});
// Shared rwlock is enough: we only read the osdmap here.
5136 shared_lock
rl(rwlock
);
5137 ceph_assert(osdmap
->get_epoch());
5138 if (!osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
)) {
5140 lderr(cct
) << __func__
<< ": SORTBITWISE cluster flag not set" << dendl
;
5141 std::move(on_finish
)(osdc_errc::not_supported
, {}, {});
5144 const pg_pool_t
* p
= osdmap
->get_pg_pool(pool_id
);
5146 lderr(cct
) << __func__
<< ": pool " << pool_id
<< " DNE in osd epoch "
5147 << osdmap
->get_epoch() << dendl
;
5149 std::move(on_finish
)(osdc_errc::pool_dne
, {}, {});
// All checks passed: build the enumeration context and issue the first
// PGLS round.
5155 _issue_enumerate(start
,
5156 std::make_unique
<EnumerationContext
<T
>>(
5157 this, std::move(end
), filter_bl
,
5158 max
, object_locator_t
{pool_id
, ns
},
5159 std::move(on_finish
)));
// Explicit instantiations of enumerate_objects for the two result-entry
// types used by the librados and neorados front ends.
5163 void Objecter::enumerate_objects
<librados::ListObjectImpl
>(
5165 std::string_view ns
,
5169 const cb::list
& filter_bl
,
5170 fu2::unique_function
<void(bs::error_code
,
5171 std::vector
<librados::ListObjectImpl
>,
5172 hobject_t
) &&> on_finish
);
5175 void Objecter::enumerate_objects
<neorados::Entry
>(
5177 std::string_view ns
,
5181 const cb::list
& filter_bl
,
5182 fu2::unique_function
<void(bs::error_code
,
5183 std::vector
<neorados::Entry
>,
5184 hobject_t
) &&> on_finish
);
// Issue one PGLS round for an in-progress enumeration: build a pg_nls op
// from the context's max/filter and the current cursor, wrap the context in
// a CB_EnumerateReply, and submit via pg_read. The reply is handled in
// _enumerate_reply.
// NOTE(review): extraction dropped lines (the ObjectOperation declaration,
// the `c` alias for the context, and the tail of the completion lambda);
// visible text kept byte-identical.
5188 template<typename T
>
5189 void Objecter::_issue_enumerate(hobject_t start
,
5190 std::unique_ptr
<EnumerationContext
<T
>> ctx
) {
5193 op
.pg_nls(c
->max
, c
->filter
, start
, osdmap
->get_epoch());
5194 auto on_ack
= std::make_unique
<CB_EnumerateReply
<T
>>(this, std::move(ctx
));
5195 // I hate having to do this. Try to find a cleaner way
// Raw pointers into the context/reply object so pg_read can fill them in
// while ownership lives inside the completion lambda below.
5197 auto epoch
= &c
->epoch
;
5198 auto budget
= &c
->budget
;
5199 auto pbl
= &on_ack
->bl
;
5201 // Issue. See you later in _enumerate_reply
5202 pg_read(start
.get_hash(),
5203 c
->oloc
, op
, pbl
, 0,
5204 Op::OpComp::create(service
.get_executor(),
// Move the reply object into the completion; it is invoked with the
// read's error code when the PGLS finishes.
5205 [c
= std::move(on_ack
)]
5206 (bs::error_code ec
) mutable {
// Explicit instantiations of _issue_enumerate for librados and neorados
// entry types.
5212 void Objecter::_issue_enumerate
<librados::ListObjectImpl
>(
5214 std::unique_ptr
<EnumerationContext
<librados::ListObjectImpl
>> ctx
);
5216 void Objecter::_issue_enumerate
<neorados::Entry
>(
5217 hobject_t start
, std::unique_ptr
<EnumerationContext
<neorados::Entry
>> ctx
);
// Handle one PGLS reply for an enumeration:
//  1. On error, complete the context immediately with the error.
//  2. Decode the pg_nls response (plus a legacy extra_info blob kept only
//     for wire compatibility); decode failures complete with e.code().
//  3. If the pool vanished mid-enumeration, complete with pool_dne.
//  4. Clamp the server's continuation handle to ctx->end, dropping any
//     returned entries that sort past 'end' (hash computed from locator or
//     oid as appropriate).
//  5. Accumulate up to ctx->max entries into ctx->ls; if the batch
//     overflows max, take only the first max entries and recompute the next
//     cursor from the first unconsumed entry.
//  6. Either complete the context (cursor reached end or max exhausted) or
//     recurse into _issue_enumerate for the next round.
// NOTE(review): extraction dropped many lines (parameter list head, `next`
// declaration, try block, several else branches, snapid arguments to the
// hobject_t constructors, loop increments); visible text kept
// byte-identical.
5219 template<typename T
>
5220 void Objecter::_enumerate_reply(
5223 std::unique_ptr
<EnumerationContext
<T
>>&& ctx
)
// Error from the read itself: propagate and stop.
5226 std::move(*ctx
)(ec
, {}, {});
5230 // Decode the results
5231 auto iter
= bl
.cbegin();
5232 pg_nls_response_template
<T
> response
;
5235 response
.decode(iter
);
5237 // extra_info isn't used anywhere. We do this solely to preserve
5238 // backward compatibility
5239 cb::list legacy_extra_info
;
5240 decode(legacy_extra_info
, iter
);
5242 } catch (const bs::system_error
& e
) {
// Malformed reply: surface the decode error to the caller.
5243 std::move(*ctx
)(e
.code(), {}, {});
// Shared rwlock: we only read the osdmap/pool below.
5247 shared_lock
rl(rwlock
);
5248 auto pool
= osdmap
->get_pg_pool(ctx
->oloc
.get_pool());
5251 // pool is gone, drop any results which are now meaningless.
5252 std::move(*ctx
)(osdc_errc::pool_dne
, {}, {});
// Continuation handle within range: use it as the next cursor.
5257 if ((response
.handle
<= ctx
->end
)) {
5258 next
= response
.handle
;
5262 // drop anything after 'end'
5263 while (!response
.entries
.empty()) {
// Hash the trailing entry by its locator if set, else by its oid.
5264 uint32_t hash
= response
.entries
.back().locator
.empty() ?
5265 pool
->hash_key(response
.entries
.back().oid
,
5266 response
.entries
.back().nspace
) :
5267 pool
->hash_key(response
.entries
.back().locator
,
5268 response
.entries
.back().nspace
);
5269 hobject_t
last(response
.entries
.back().oid
,
5270 response
.entries
.back().locator
,
5273 ctx
->oloc
.get_pool(),
5274 response
.entries
.back().nspace
);
// Stop trimming once the tail entry is before 'end'.
5275 if (last
< ctx
->end
)
5277 response
.entries
.pop_back();
// Whole batch fits in the remaining budget: move it all over.
5281 if (response
.entries
.size() <= ctx
->max
) {
5282 ctx
->max
-= response
.entries
.size();
5283 std::move(response
.entries
.begin(), response
.entries
.end(),
5284 std::back_inserter(ctx
->ls
));
// Batch exceeds budget: consume entries one by one until max is spent,
// then derive the next cursor from the first unconsumed entry.
5286 auto i
= response
.entries
.begin();
5287 while (ctx
->max
> 0) {
5288 ctx
->ls
.push_back(std::move(*i
));
5293 i
->locator
.empty() ?
5294 pool
->hash_key(i
->oid
, i
->nspace
) :
5295 pool
->hash_key(i
->locator
, i
->nspace
);
5297 next
= hobject_t
{i
->oid
, i
->locator
,
5300 ctx
->oloc
.get_pool(),
// Done when the cursor hit 'end' or the budget is exhausted; otherwise
// issue the next PGLS round.
5304 if (next
== ctx
->end
|| ctx
->max
== 0) {
5305 std::move(*ctx
)(ec
, std::move(ctx
->ls
), std::move(next
));
5307 _issue_enumerate(next
, std::move(ctx
));
// Explicit instantiations of _enumerate_reply for librados and neorados
// entry types.
5312 void Objecter::_enumerate_reply
<librados::ListObjectImpl
>(
5315 std::unique_ptr
<EnumerationContext
<librados::ListObjectImpl
>>&& ctx
);
5318 void Objecter::_enumerate_reply
<neorados::Entry
>(
5321 std::unique_ptr
<EnumerationContext
<neorados::Entry
>>&& ctx
);
5324 using namespace librados
;
// Decode each bufferlist in `bls` into an element appended to `items`.
// NOTE(review): the loop body's decode call was dropped in extraction —
// presumably each bl decodes one T into items; TODO confirm. Visible text
// kept byte-identical. The by-value `auto bl` copy is also worth revisiting
// once the full body is restored.
5326 template <typename T
>
5327 void do_decode(std::vector
<T
>& items
, std::vector
<cb::list
>& bls
)
5329 for (auto bl
: bls
) {
5330 auto p
= bl
.cbegin();
// Completion context for a SCRUBLS op: decodes the scrub_ls_result_t from
// the reply buffer into either an inconsistent-object list or an
// inconsistent-snapset list (whichever pointer was supplied at
// construction), and records the scrub interval.
// NOTE(review): extraction dropped members (interval, rval, bl) and several
// lines of finish() (the rval parameter name, try block, decode(result, p),
// the objects/snapsets branch conditions); visible text kept byte-identical.
5337 struct C_ObjectOperation_scrub_ls
: public Context
{
// Exactly one of these output vectors is non-null per constructed instance.
5340 std::vector
<inconsistent_obj_t
> *objects
= nullptr;
5341 std::vector
<inconsistent_snapset_t
> *snapsets
= nullptr;
5344 C_ObjectOperation_scrub_ls(uint32_t* interval
,
5345 std::vector
<inconsistent_obj_t
>* objects
,
5347 : interval(interval
), objects(objects
), rval(rval
) {}
5348 C_ObjectOperation_scrub_ls(uint32_t* interval
,
5349 std::vector
<inconsistent_snapset_t
>* snapsets
,
5351 : interval(interval
), snapsets(snapsets
), rval(rval
) {}
5352 void finish(int r
) override
{
// Hard failure (other than EAGAIN) skips decoding entirely.
5353 if (r
< 0 && r
!= -EAGAIN
) {
5364 } catch (cb::error
&) {
5371 scrub_ls_result_t result
;
5372 auto p
= bl
.cbegin();
5374 *interval
= result
.interval
;
5376 do_decode(*objects
, result
.vals
);
5378 do_decode(*snapsets
, result
.vals
);
// Append a CEPH_OSD_OP_SCRUBLS op to `op`, encoding `arg` as its input and
// wiring a C_ObjectOperation_scrub_ls handler so the reply is decoded into
// `items` with the scrub interval and rval recorded.
// NOTE(review): extraction dropped the interval/rval parameter lines and
// the out_handler assignment for `h`; visible text kept byte-identical.
5383 template <typename T
>
5384 void do_scrub_ls(::ObjectOperation
* op
,
5385 const scrub_ls_arg_t
& arg
,
5386 std::vector
<T
> *items
,
5390 OSDOp
& osd_op
= op
->add_op(CEPH_OSD_OP_SCRUBLS
);
// SCRUBLS is a PG-level operation.
5391 op
->flags
|= CEPH_OSD_FLAG_PGOP
;
5392 ceph_assert(interval
);
5393 arg
.encode(osd_op
.indata
);
// Index of the op we just appended, used to register its output slots.
5394 unsigned p
= op
->ops
.size() - 1;
5395 auto h
= new C_ObjectOperation_scrub_ls
{interval
, items
, rval
};
5397 op
->out_bl
[p
] = &h
->bl
;
5398 op
->out_rval
[p
] = rval
;
// scrub_ls overload listing inconsistent *objects* (arg type field 0).
// NOTE(review): extraction dropped the interval/rval parameter lines;
// visible text kept byte-identical.
5402 void ::ObjectOperation::scrub_ls(const librados::object_id_t
& start_after
,
5403 uint64_t max_to_get
,
5404 std::vector
<librados::inconsistent_obj_t
>* objects
,
// Second field 0 selects the object listing (vs 1 for snapsets below).
5408 scrub_ls_arg_t arg
= {*interval
, 0, start_after
, max_to_get
};
5409 do_scrub_ls(this, arg
, objects
, interval
, rval
);
// scrub_ls overload listing inconsistent *snapsets* (arg type field 1).
// NOTE(review): extraction dropped the interval/rval parameter lines;
// visible text kept byte-identical.
5412 void ::ObjectOperation::scrub_ls(const librados::object_id_t
& start_after
,
5413 uint64_t max_to_get
,
5414 std::vector
<librados::inconsistent_snapset_t
> *snapsets
,
// Second field 1 selects the snapset listing (vs 0 for objects above).
5418 scrub_ls_arg_t arg
= {*interval
, 1, start_after
, max_to_get
};
5419 do_scrub_ls(this, arg
, snapsets
, interval
, rval
);