1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
19 #include "osd/OSDMap.h"
20 #include "osd/error_code.h"
23 #include "mon/MonClient.h"
24 #include "mon/error_code.h"
26 #include "msg/Messenger.h"
27 #include "msg/Message.h"
29 #include "messages/MPing.h"
30 #include "messages/MOSDOp.h"
31 #include "messages/MOSDOpReply.h"
32 #include "messages/MOSDBackoff.h"
33 #include "messages/MOSDMap.h"
35 #include "messages/MPoolOp.h"
36 #include "messages/MPoolOpReply.h"
38 #include "messages/MGetPoolStats.h"
39 #include "messages/MGetPoolStatsReply.h"
40 #include "messages/MStatfs.h"
41 #include "messages/MStatfsReply.h"
43 #include "messages/MMonCommand.h"
45 #include "messages/MCommand.h"
46 #include "messages/MCommandReply.h"
48 #include "messages/MWatchNotify.h"
51 #include "common/Cond.h"
52 #include "common/config.h"
53 #include "common/perf_counters.h"
54 #include "common/scrub_types.h"
55 #include "include/str_list.h"
56 #include "common/errno.h"
57 #include "common/EventTrace.h"
58 #include "common/async/waiter.h"
59 #include "error_code.h"
66 using std::ostringstream
;
70 using std::stringstream
;
75 using ceph::Formatter
;
77 using std::defer_lock
;
78 using std::scoped_lock
;
79 using std::shared_lock
;
80 using std::unique_lock
;
82 using ceph::real_time
;
83 using ceph::real_clock
;
85 using ceph::mono_clock
;
86 using ceph::mono_time
;
90 using ceph::shunique_lock
;
91 using ceph::acquire_shared
;
92 using ceph::acquire_unique
;
94 namespace bc
= boost::container
;
95 namespace bs
= boost::system
;
96 namespace ca
= ceph::async
;
97 namespace cb
= ceph::buffer
;
99 #define dout_subsys ceph_subsys_objecter
101 #define dout_prefix *_dout << messenger->get_myname() << ".objecter "
// NOTE(review): perf-counter index fragment (l_osdc_*) for the "objecter"
// PerfCounters instance created in Objecter::init(). This region is
// extraction-garbled: the enclosing `enum { ... };` lines and many
// enumerators referenced elsewhere in this file (e.g. l_osdc_op_active,
// l_osdc_op_send, l_osdc_last) are missing from this capture, and stray
// line-number tokens remain fused into the text. Do not hand-edit;
// restore from the canonical source.
105 l_osdc_first
= 123200,
109 l_osdc_op_send_bytes
,
126 l_osdc_osdop_writefull
,
127 l_osdc_osdop_writesame
,
130 l_osdc_osdop_truncate
,
133 l_osdc_osdop_sparse_read
,
134 l_osdc_osdop_clonerange
,
135 l_osdc_osdop_getxattr
,
136 l_osdc_osdop_setxattr
,
137 l_osdc_osdop_cmpxattr
,
138 l_osdc_osdop_rmxattr
,
139 l_osdc_osdop_resetxattrs
,
143 l_osdc_osdop_src_cmpxattr
,
145 l_osdc_osdop_pgls_filter
,
148 l_osdc_linger_active
,
150 l_osdc_linger_resend
,
153 l_osdc_poolop_active
,
155 l_osdc_poolop_resend
,
157 l_osdc_poolstat_active
,
158 l_osdc_poolstat_send
,
159 l_osdc_poolstat_resend
,
161 l_osdc_statfs_active
,
163 l_osdc_statfs_resend
,
165 l_osdc_command_active
,
167 l_osdc_command_resend
,
174 l_osdc_osd_session_open
,
175 l_osdc_osd_session_close
,
178 l_osdc_osdop_omap_wr
,
179 l_osdc_osdop_omap_rd
,
180 l_osdc_osdop_omap_del
,
186 inline bs::error_code
osdcode(int r
) {
187 return (r
< 0) ? bs::error_code(-r
, osd_category()) : bs::error_code();
191 // config obs ----------------------------
193 class Objecter::RequestStateHook
: public AdminSocketHook
{
194 Objecter
*m_objecter
;
196 explicit RequestStateHook(Objecter
*objecter
);
197 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
201 cb::list
& out
) override
;
204 std::unique_lock
<std::mutex
> Objecter::OSDSession::get_lock(object_t
& oid
)
206 if (oid
.name
.empty())
209 static constexpr uint32_t HASH_PRIME
= 1021;
210 uint32_t h
= ceph_str_hash_linux(oid
.name
.c_str(), oid
.name
.size())
213 return {completion_locks
[h
% num_locks
], std::defer_lock
};
216 const char** Objecter::get_tracked_conf_keys() const
218 static const char *config_keys
[] = {
220 "rados_mon_op_timeout",
221 "rados_osd_op_timeout",
228 void Objecter::handle_conf_change(const ConfigProxy
& conf
,
229 const std::set
<std::string
> &changed
)
231 if (changed
.count("crush_location")) {
232 update_crush_location();
234 if (changed
.count("rados_mon_op_timeout")) {
235 mon_timeout
= conf
.get_val
<std::chrono::seconds
>("rados_mon_op_timeout");
237 if (changed
.count("rados_osd_op_timeout")) {
238 osd_timeout
= conf
.get_val
<std::chrono::seconds
>("rados_osd_op_timeout");
242 void Objecter::update_crush_location()
244 unique_lock
wl(rwlock
);
245 crush_location
= cct
->crush_location
.get_location();
248 // messages ------------------------------
251 * initialize only internal data structures, don't initiate cluster interaction
253 void Objecter::init()
255 ceph_assert(!initialized
);
258 PerfCountersBuilder
pcb(cct
, "objecter", l_osdc_first
, l_osdc_last
);
260 pcb
.add_u64(l_osdc_op_active
, "op_active", "Operations active", "actv",
261 PerfCountersBuilder::PRIO_CRITICAL
);
262 pcb
.add_u64(l_osdc_op_laggy
, "op_laggy", "Laggy operations");
263 pcb
.add_u64_counter(l_osdc_op_send
, "op_send", "Sent operations");
264 pcb
.add_u64_counter(l_osdc_op_send_bytes
, "op_send_bytes", "Sent data", NULL
, 0, unit_t(UNIT_BYTES
));
265 pcb
.add_u64_counter(l_osdc_op_resend
, "op_resend", "Resent operations");
266 pcb
.add_u64_counter(l_osdc_op_reply
, "op_reply", "Operation reply");
267 pcb
.add_time_avg(l_osdc_op_latency
, "op_latency", "Operation latency");
268 pcb
.add_u64(l_osdc_op_inflight
, "op_inflight", "Operations in flight");
269 pcb
.add_u64_avg(l_osdc_oplen_avg
, "oplen_avg", "Average length of operation vector");
271 pcb
.add_u64_counter(l_osdc_op
, "op", "Operations");
272 pcb
.add_u64_counter(l_osdc_op_r
, "op_r", "Read operations", "rd",
273 PerfCountersBuilder::PRIO_CRITICAL
);
274 pcb
.add_u64_counter(l_osdc_op_w
, "op_w", "Write operations", "wr",
275 PerfCountersBuilder::PRIO_CRITICAL
);
276 pcb
.add_u64_counter(l_osdc_op_rmw
, "op_rmw", "Read-modify-write operations",
277 "rdwr", PerfCountersBuilder::PRIO_INTERESTING
);
278 pcb
.add_u64_counter(l_osdc_op_pg
, "op_pg", "PG operation");
280 pcb
.add_u64_counter(l_osdc_osdop_stat
, "osdop_stat", "Stat operations");
281 pcb
.add_u64_counter(l_osdc_osdop_create
, "osdop_create",
282 "Create object operations");
283 pcb
.add_u64_counter(l_osdc_osdop_read
, "osdop_read", "Read operations");
284 pcb
.add_u64_counter(l_osdc_osdop_write
, "osdop_write", "Write operations");
285 pcb
.add_u64_counter(l_osdc_osdop_writefull
, "osdop_writefull",
286 "Write full object operations");
287 pcb
.add_u64_counter(l_osdc_osdop_writesame
, "osdop_writesame",
288 "Write same operations");
289 pcb
.add_u64_counter(l_osdc_osdop_append
, "osdop_append",
291 pcb
.add_u64_counter(l_osdc_osdop_zero
, "osdop_zero",
292 "Set object to zero operations");
293 pcb
.add_u64_counter(l_osdc_osdop_truncate
, "osdop_truncate",
294 "Truncate object operations");
295 pcb
.add_u64_counter(l_osdc_osdop_delete
, "osdop_delete",
296 "Delete object operations");
297 pcb
.add_u64_counter(l_osdc_osdop_mapext
, "osdop_mapext",
298 "Map extent operations");
299 pcb
.add_u64_counter(l_osdc_osdop_sparse_read
, "osdop_sparse_read",
300 "Sparse read operations");
301 pcb
.add_u64_counter(l_osdc_osdop_clonerange
, "osdop_clonerange",
302 "Clone range operations");
303 pcb
.add_u64_counter(l_osdc_osdop_getxattr
, "osdop_getxattr",
304 "Get xattr operations");
305 pcb
.add_u64_counter(l_osdc_osdop_setxattr
, "osdop_setxattr",
306 "Set xattr operations");
307 pcb
.add_u64_counter(l_osdc_osdop_cmpxattr
, "osdop_cmpxattr",
308 "Xattr comparison operations");
309 pcb
.add_u64_counter(l_osdc_osdop_rmxattr
, "osdop_rmxattr",
310 "Remove xattr operations");
311 pcb
.add_u64_counter(l_osdc_osdop_resetxattrs
, "osdop_resetxattrs",
312 "Reset xattr operations");
313 pcb
.add_u64_counter(l_osdc_osdop_call
, "osdop_call",
314 "Call (execute) operations");
315 pcb
.add_u64_counter(l_osdc_osdop_watch
, "osdop_watch",
316 "Watch by object operations");
317 pcb
.add_u64_counter(l_osdc_osdop_notify
, "osdop_notify",
318 "Notify about object operations");
319 pcb
.add_u64_counter(l_osdc_osdop_src_cmpxattr
, "osdop_src_cmpxattr",
320 "Extended attribute comparison in multi operations");
321 pcb
.add_u64_counter(l_osdc_osdop_pgls
, "osdop_pgls");
322 pcb
.add_u64_counter(l_osdc_osdop_pgls_filter
, "osdop_pgls_filter");
323 pcb
.add_u64_counter(l_osdc_osdop_other
, "osdop_other", "Other operations");
325 pcb
.add_u64(l_osdc_linger_active
, "linger_active",
326 "Active lingering operations");
327 pcb
.add_u64_counter(l_osdc_linger_send
, "linger_send",
328 "Sent lingering operations");
329 pcb
.add_u64_counter(l_osdc_linger_resend
, "linger_resend",
330 "Resent lingering operations");
331 pcb
.add_u64_counter(l_osdc_linger_ping
, "linger_ping",
332 "Sent pings to lingering operations");
334 pcb
.add_u64(l_osdc_poolop_active
, "poolop_active",
335 "Active pool operations");
336 pcb
.add_u64_counter(l_osdc_poolop_send
, "poolop_send",
337 "Sent pool operations");
338 pcb
.add_u64_counter(l_osdc_poolop_resend
, "poolop_resend",
339 "Resent pool operations");
341 pcb
.add_u64(l_osdc_poolstat_active
, "poolstat_active",
342 "Active get pool stat operations");
343 pcb
.add_u64_counter(l_osdc_poolstat_send
, "poolstat_send",
344 "Pool stat operations sent");
345 pcb
.add_u64_counter(l_osdc_poolstat_resend
, "poolstat_resend",
346 "Resent pool stats");
348 pcb
.add_u64(l_osdc_statfs_active
, "statfs_active", "Statfs operations");
349 pcb
.add_u64_counter(l_osdc_statfs_send
, "statfs_send", "Sent FS stats");
350 pcb
.add_u64_counter(l_osdc_statfs_resend
, "statfs_resend",
353 pcb
.add_u64(l_osdc_command_active
, "command_active", "Active commands");
354 pcb
.add_u64_counter(l_osdc_command_send
, "command_send",
356 pcb
.add_u64_counter(l_osdc_command_resend
, "command_resend",
359 pcb
.add_u64(l_osdc_map_epoch
, "map_epoch", "OSD map epoch");
360 pcb
.add_u64_counter(l_osdc_map_full
, "map_full",
361 "Full OSD maps received");
362 pcb
.add_u64_counter(l_osdc_map_inc
, "map_inc",
363 "Incremental OSD maps received");
365 pcb
.add_u64(l_osdc_osd_sessions
, "osd_sessions",
366 "Open sessions"); // open sessions
367 pcb
.add_u64_counter(l_osdc_osd_session_open
, "osd_session_open",
369 pcb
.add_u64_counter(l_osdc_osd_session_close
, "osd_session_close",
371 pcb
.add_u64(l_osdc_osd_laggy
, "osd_laggy", "Laggy OSD sessions");
373 pcb
.add_u64_counter(l_osdc_osdop_omap_wr
, "omap_wr",
374 "OSD OMAP write operations");
375 pcb
.add_u64_counter(l_osdc_osdop_omap_rd
, "omap_rd",
376 "OSD OMAP read operations");
377 pcb
.add_u64_counter(l_osdc_osdop_omap_del
, "omap_del",
378 "OSD OMAP delete operations");
380 logger
= pcb
.create_perf_counters();
381 cct
->get_perfcounters_collection()->add(logger
);
384 m_request_state_hook
= new RequestStateHook(this);
385 auto admin_socket
= cct
->get_admin_socket();
386 int ret
= admin_socket
->register_command("objecter_requests",
387 m_request_state_hook
,
388 "show in-progress osd requests");
390 /* Don't warn on EEXIST, happens if multiple ceph clients
391 * are instantiated from one process */
392 if (ret
< 0 && ret
!= -EEXIST
) {
393 lderr(cct
) << "error registering admin socket command: "
394 << cpp_strerror(ret
) << dendl
;
397 update_crush_location();
399 cct
->_conf
.add_observer(this);
405 * ok, cluster interaction can happen
407 void Objecter::start(const OSDMap
* o
)
409 shared_lock
rl(rwlock
);
413 osdmap
->deepish_copy_from(*o
);
414 prune_pg_mapping(osdmap
->get_pools());
415 } else if (osdmap
->get_epoch() == 0) {
416 _maybe_request_map();
420 void Objecter::shutdown()
422 ceph_assert(initialized
);
424 unique_lock
wl(rwlock
);
429 cct
->_conf
.remove_observer(this);
432 while (!osd_sessions
.empty()) {
433 auto p
= osd_sessions
.begin();
434 close_session(p
->second
);
437 while(!check_latest_map_lingers
.empty()) {
438 auto i
= check_latest_map_lingers
.begin();
440 check_latest_map_lingers
.erase(i
->first
);
443 while(!check_latest_map_ops
.empty()) {
444 auto i
= check_latest_map_ops
.begin();
446 check_latest_map_ops
.erase(i
->first
);
449 while(!check_latest_map_commands
.empty()) {
450 auto i
= check_latest_map_commands
.begin();
452 check_latest_map_commands
.erase(i
->first
);
455 while(!poolstat_ops
.empty()) {
456 auto i
= poolstat_ops
.begin();
458 poolstat_ops
.erase(i
->first
);
461 while(!statfs_ops
.empty()) {
462 auto i
= statfs_ops
.begin();
464 statfs_ops
.erase(i
->first
);
467 while(!pool_ops
.empty()) {
468 auto i
= pool_ops
.begin();
470 pool_ops
.erase(i
->first
);
473 ldout(cct
, 20) << __func__
<< " clearing up homeless session..." << dendl
;
474 while(!homeless_session
->linger_ops
.empty()) {
475 auto i
= homeless_session
->linger_ops
.begin();
476 ldout(cct
, 10) << " linger_op " << i
->first
<< dendl
;
477 LingerOp
*lop
= i
->second
;
479 std::unique_lock
swl(homeless_session
->lock
);
480 _session_linger_op_remove(homeless_session
, lop
);
482 linger_ops
.erase(lop
->linger_id
);
483 linger_ops_set
.erase(lop
);
487 while(!homeless_session
->ops
.empty()) {
488 auto i
= homeless_session
->ops
.begin();
489 ldout(cct
, 10) << " op " << i
->first
<< dendl
;
492 std::unique_lock
swl(homeless_session
->lock
);
493 _session_op_remove(homeless_session
, op
);
498 while(!homeless_session
->command_ops
.empty()) {
499 auto i
= homeless_session
->command_ops
.begin();
500 ldout(cct
, 10) << " command_op " << i
->first
<< dendl
;
501 auto cop
= i
->second
;
503 std::unique_lock
swl(homeless_session
->lock
);
504 _session_command_op_remove(homeless_session
, cop
);
510 if (timer
.cancel_event(tick_event
)) {
511 ldout(cct
, 10) << " successfully canceled tick" << dendl
;
517 cct
->get_perfcounters_collection()->remove(logger
);
522 // Let go of Objecter write lock so timer thread can shutdown
525 // Outside of lock to avoid cycle WRT calls to RequestStateHook
526 // This is safe because we guarantee no concurrent calls to
527 // shutdown() with the ::initialized check at start.
528 if (m_request_state_hook
) {
529 auto admin_socket
= cct
->get_admin_socket();
530 admin_socket
->unregister_commands(m_request_state_hook
);
531 delete m_request_state_hook
;
532 m_request_state_hook
= NULL
;
536 void Objecter::_send_linger(LingerOp
*info
,
537 ceph::shunique_lock
<ceph::shared_mutex
>& sul
)
539 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
541 fu2::unique_function
<Op::OpSig
> oncommit
;
543 std::shared_lock
watchl(info
->watch_lock
);
544 cb::list
*poutbl
= nullptr;
545 if (info
->registered
&& info
->is_watch
) {
546 ldout(cct
, 15) << "send_linger " << info
->linger_id
<< " reconnect"
548 opv
.push_back(OSDOp());
549 opv
.back().op
.op
= CEPH_OSD_OP_WATCH
;
550 opv
.back().op
.watch
.cookie
= info
->get_cookie();
551 opv
.back().op
.watch
.op
= CEPH_OSD_WATCH_OP_RECONNECT
;
552 opv
.back().op
.watch
.gen
= ++info
->register_gen
;
553 oncommit
= CB_Linger_Reconnect(this, info
);
555 ldout(cct
, 15) << "send_linger " << info
->linger_id
<< " register"
558 // TODO Augment ca::Completion with an equivalent of
559 // target so we can handle these cases better.
560 auto c
= std::make_unique
<CB_Linger_Commit
>(this, info
);
561 if (!info
->is_watch
) {
565 oncommit
= [c
= std::move(c
)](bs::error_code ec
) mutable {
570 auto o
= new Op(info
->target
.base_oid
, info
->target
.base_oloc
,
571 std::move(opv
), info
->target
.flags
| CEPH_OSD_FLAG_READ
,
572 std::move(oncommit
), info
->pobjver
);
574 o
->snapid
= info
->snap
;
575 o
->snapc
= info
->snapc
;
576 o
->mtime
= info
->mtime
;
578 o
->target
= info
->target
;
581 // do not resend this; we will send a new op to reregister
582 o
->should_resend
= false;
583 o
->ctx_budgeted
= true;
585 if (info
->register_tid
) {
586 // repeat send. cancel old registration op, if any.
587 std::unique_lock
sl(info
->session
->lock
);
588 if (info
->session
->ops
.count(info
->register_tid
)) {
589 auto o
= info
->session
->ops
[info
->register_tid
];
590 _op_cancel_map_check(o
);
591 _cancel_linger_op(o
);
596 _op_submit_with_budget(o
, sul
, &info
->register_tid
, &info
->ctx_budget
);
598 logger
->inc(l_osdc_linger_send
);
601 void Objecter::_linger_commit(LingerOp
*info
, bs::error_code ec
,
604 std::unique_lock
wl(info
->watch_lock
);
605 ldout(cct
, 10) << "_linger_commit " << info
->linger_id
<< dendl
;
606 if (info
->on_reg_commit
) {
607 info
->on_reg_commit
->defer(std::move(info
->on_reg_commit
),
609 info
->on_reg_commit
.reset();
611 if (ec
&& info
->on_notify_finish
) {
612 info
->on_notify_finish
->defer(std::move(info
->on_notify_finish
),
614 info
->on_notify_finish
.reset();
617 // only tell the user the first time we do this
618 info
->registered
= true;
619 info
->pobjver
= NULL
;
621 if (!info
->is_watch
) {
622 // make note of the notify_id
623 auto p
= outbl
.cbegin();
625 decode(info
->notify_id
, p
);
626 ldout(cct
, 10) << "_linger_commit notify_id=" << info
->notify_id
629 catch (cb::error
& e
) {
634 class CB_DoWatchError
{
636 boost::intrusive_ptr
<Objecter::LingerOp
> info
;
639 CB_DoWatchError(Objecter
*o
, Objecter::LingerOp
*i
,
641 : objecter(o
), info(i
), ec(ec
) {
642 info
->_queued_async();
645 std::unique_lock
wl(objecter
->rwlock
);
646 bool canceled
= info
->canceled
;
650 info
->handle(ec
, 0, info
->get_cookie(), 0, {});
653 info
->finished_async();
657 bs::error_code
Objecter::_normalize_watch_error(bs::error_code ec
)
659 // translate ENOENT -> ENOTCONN so that a delete->disconnection
660 // notification and a failure to reconnect because we raced with
661 // the delete appear the same to the user.
662 if (ec
== bs::errc::no_such_file_or_directory
)
663 ec
= bs::error_code(ENOTCONN
, osd_category());
667 void Objecter::_linger_reconnect(LingerOp
*info
, bs::error_code ec
)
669 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " = " << ec
670 << " (last_error " << info
->last_error
<< ")" << dendl
;
671 std::unique_lock
wl(info
->watch_lock
);
673 if (!info
->last_error
) {
674 ec
= _normalize_watch_error(ec
);
676 boost::asio::defer(finish_strand
, CB_DoWatchError(this, info
, ec
));
681 info
->last_error
= ec
;
684 void Objecter::_send_linger_ping(LingerOp
*info
)
686 // rwlock is locked unique
687 // info->session->lock is locked
689 if (cct
->_conf
->objecter_inject_no_watch_ping
) {
690 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " SKIPPING"
694 if (osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
)) {
695 ldout(cct
, 10) << __func__
<< " PAUSERD" << dendl
;
699 ceph::coarse_mono_time now
= ceph::coarse_mono_clock::now();
700 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " now " << now
704 opv
[0].op
.op
= CEPH_OSD_OP_WATCH
;
705 opv
[0].op
.watch
.cookie
= info
->get_cookie();
706 opv
[0].op
.watch
.op
= CEPH_OSD_WATCH_OP_PING
;
707 opv
[0].op
.watch
.gen
= info
->register_gen
;
709 Op
*o
= new Op(info
->target
.base_oid
, info
->target
.base_oloc
,
710 std::move(opv
), info
->target
.flags
| CEPH_OSD_FLAG_READ
,
711 CB_Linger_Ping(this, info
, now
),
713 o
->target
= info
->target
;
714 o
->should_resend
= false;
717 _session_op_assign(info
->session
, o
);
719 info
->ping_tid
= o
->tid
;
721 logger
->inc(l_osdc_linger_ping
);
724 void Objecter::_linger_ping(LingerOp
*info
, bs::error_code ec
, ceph::coarse_mono_time sent
,
725 uint32_t register_gen
)
727 std::unique_lock
l(info
->watch_lock
);
728 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
729 << " sent " << sent
<< " gen " << register_gen
<< " = " << ec
730 << " (last_error " << info
->last_error
731 << " register_gen " << info
->register_gen
<< ")" << dendl
;
732 if (info
->register_gen
== register_gen
) {
734 info
->watch_valid_thru
= sent
;
735 } else if (ec
&& !info
->last_error
) {
736 ec
= _normalize_watch_error(ec
);
737 info
->last_error
= ec
;
739 boost::asio::defer(finish_strand
, CB_DoWatchError(this, info
, ec
));
743 ldout(cct
, 20) << " ignoring old gen" << dendl
;
747 tl::expected
<ceph::timespan
,
748 bs::error_code
> Objecter::linger_check(LingerOp
*info
)
750 std::shared_lock
l(info
->watch_lock
);
752 ceph::coarse_mono_time stamp
= info
->watch_valid_thru
;
753 if (!info
->watch_pending_async
.empty())
754 stamp
= std::min(info
->watch_valid_thru
, info
->watch_pending_async
.front());
755 auto age
= ceph::coarse_mono_clock::now() - stamp
;
757 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
758 << " err " << info
->last_error
759 << " age " << age
<< dendl
;
760 if (info
->last_error
)
761 return tl::unexpected(info
->last_error
);
762 // return a safe upper bound (we are truncating to ms)
766 void Objecter::linger_cancel(LingerOp
*info
)
768 unique_lock
wl(rwlock
);
769 _linger_cancel(info
);
773 void Objecter::_linger_cancel(LingerOp
*info
)
775 // rwlock is locked unique
776 ldout(cct
, 20) << __func__
<< " linger_id=" << info
->linger_id
<< dendl
;
777 if (!info
->canceled
) {
778 OSDSession
*s
= info
->session
;
779 std::unique_lock
sl(s
->lock
);
780 _session_linger_op_remove(s
, info
);
783 linger_ops
.erase(info
->linger_id
);
784 linger_ops_set
.erase(info
);
785 ceph_assert(linger_ops
.size() == linger_ops_set
.size());
787 info
->canceled
= true;
790 logger
->dec(l_osdc_linger_active
);
796 Objecter::LingerOp
*Objecter::linger_register(const object_t
& oid
,
797 const object_locator_t
& oloc
,
800 unique_lock
l(rwlock
);
802 auto info
= new LingerOp(this, ++max_linger_id
);
803 info
->target
.base_oid
= oid
;
804 info
->target
.base_oloc
= oloc
;
805 if (info
->target
.base_oloc
.key
== oid
)
806 info
->target
.base_oloc
.key
.clear();
807 info
->target
.flags
= flags
;
808 info
->watch_valid_thru
= ceph::coarse_mono_clock::now();
809 ldout(cct
, 10) << __func__
<< " info " << info
810 << " linger_id " << info
->linger_id
811 << " cookie " << info
->get_cookie()
813 linger_ops
[info
->linger_id
] = info
;
814 linger_ops_set
.insert(info
);
815 ceph_assert(linger_ops
.size() == linger_ops_set
.size());
817 info
->get(); // for the caller
821 ceph_tid_t
Objecter::linger_watch(LingerOp
*info
,
823 const SnapContext
& snapc
,
826 decltype(info
->on_reg_commit
)&& oncommit
,
829 info
->is_watch
= true;
832 info
->target
.flags
|= CEPH_OSD_FLAG_WRITE
;
835 info
->pobjver
= objver
;
836 info
->on_reg_commit
= std::move(oncommit
);
838 info
->ctx_budget
= take_linger_budget(info
);
840 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
841 _linger_submit(info
, sul
);
842 logger
->inc(l_osdc_linger_active
);
845 return info
->linger_id
;
848 ceph_tid_t
Objecter::linger_notify(LingerOp
*info
,
850 snapid_t snap
, cb::list
& inbl
,
851 decltype(LingerOp::on_reg_commit
)&& onfinish
,
855 info
->target
.flags
|= CEPH_OSD_FLAG_READ
;
858 info
->pobjver
= objver
;
859 info
->on_reg_commit
= std::move(onfinish
);
860 info
->ctx_budget
= take_linger_budget(info
);
862 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
863 _linger_submit(info
, sul
);
864 logger
->inc(l_osdc_linger_active
);
867 return info
->linger_id
;
870 void Objecter::_linger_submit(LingerOp
*info
,
871 ceph::shunique_lock
<ceph::shared_mutex
>& sul
)
873 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
874 ceph_assert(info
->linger_id
);
875 ceph_assert(info
->ctx_budget
!= -1); // caller needs to have taken budget already!
877 // Populate Op::target
878 OSDSession
*s
= NULL
;
879 int r
= _calc_target(&info
->target
, nullptr);
881 case RECALC_OP_TARGET_POOL_EIO
:
882 _check_linger_pool_eio(info
);
886 // Create LingerOp<->OSDSession relation
887 r
= _get_session(info
->target
.osd
, &s
, sul
);
889 unique_lock
sl(s
->lock
);
890 _session_linger_op_assign(s
, info
);
894 _send_linger(info
, sul
);
897 struct CB_DoWatchNotify
{
899 boost::intrusive_ptr
<Objecter::LingerOp
> info
;
900 boost::intrusive_ptr
<MWatchNotify
> msg
;
901 CB_DoWatchNotify(Objecter
*o
, Objecter::LingerOp
*i
, MWatchNotify
*m
)
902 : objecter(o
), info(i
), msg(m
) {
903 info
->_queued_async();
906 objecter
->_do_watch_notify(std::move(info
), std::move(msg
));
910 void Objecter::handle_watch_notify(MWatchNotify
*m
)
912 shared_lock
l(rwlock
);
917 LingerOp
*info
= reinterpret_cast<LingerOp
*>(m
->cookie
);
918 if (linger_ops_set
.count(info
) == 0) {
919 ldout(cct
, 7) << __func__
<< " cookie " << m
->cookie
<< " dne" << dendl
;
922 std::unique_lock
wl(info
->watch_lock
);
923 if (m
->opcode
== CEPH_WATCH_EVENT_DISCONNECT
) {
924 if (!info
->last_error
) {
925 info
->last_error
= bs::error_code(ENOTCONN
, osd_category());
927 boost::asio::defer(finish_strand
, CB_DoWatchError(this, info
,
931 } else if (!info
->is_watch
) {
932 // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
933 // since we know the only user (librados) is safe to call in
934 // fast-dispatch context
935 if (info
->notify_id
&&
936 info
->notify_id
!= m
->notify_id
) {
937 ldout(cct
, 10) << __func__
<< " reply notify " << m
->notify_id
938 << " != " << info
->notify_id
<< ", ignoring" << dendl
;
939 } else if (info
->on_notify_finish
) {
940 info
->on_notify_finish
->defer(
941 std::move(info
->on_notify_finish
),
942 osdcode(m
->return_code
), std::move(m
->get_data()));
944 // if we race with reconnect we might get a second notify; only
945 // notify the caller once!
946 info
->on_notify_finish
= nullptr;
949 boost::asio::defer(finish_strand
, CB_DoWatchNotify(this, info
, m
));
953 void Objecter::_do_watch_notify(boost::intrusive_ptr
<LingerOp
> info
,
954 boost::intrusive_ptr
<MWatchNotify
> m
)
956 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
958 shared_lock
l(rwlock
);
959 ceph_assert(initialized
);
961 if (info
->canceled
) {
966 // notify completion?
967 ceph_assert(info
->is_watch
);
968 ceph_assert(info
->handle
);
969 ceph_assert(m
->opcode
!= CEPH_WATCH_EVENT_DISCONNECT
);
974 case CEPH_WATCH_EVENT_NOTIFY
:
975 info
->handle({}, m
->notify_id
, m
->cookie
, m
->notifier_gid
, std::move(m
->bl
));
980 info
->finished_async();
983 bool Objecter::ms_dispatch(Message
*m
)
985 ldout(cct
, 10) << __func__
<< " " << cct
<< " " << *m
<< dendl
;
986 switch (m
->get_type()) {
987 // these we exlusively handle
988 case CEPH_MSG_OSD_OPREPLY
:
989 handle_osd_op_reply(static_cast<MOSDOpReply
*>(m
));
992 case CEPH_MSG_OSD_BACKOFF
:
993 handle_osd_backoff(static_cast<MOSDBackoff
*>(m
));
996 case CEPH_MSG_WATCH_NOTIFY
:
997 handle_watch_notify(static_cast<MWatchNotify
*>(m
));
1001 case MSG_COMMAND_REPLY
:
1002 if (m
->get_source().type() == CEPH_ENTITY_TYPE_OSD
) {
1003 handle_command_reply(static_cast<MCommandReply
*>(m
));
1009 case MSG_GETPOOLSTATSREPLY
:
1010 handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply
*>(m
));
1013 case CEPH_MSG_POOLOP_REPLY
:
1014 handle_pool_op_reply(static_cast<MPoolOpReply
*>(m
));
1017 case CEPH_MSG_STATFS_REPLY
:
1018 handle_fs_stats_reply(static_cast<MStatfsReply
*>(m
));
1021 // these we give others a chance to inspect
1024 case CEPH_MSG_OSD_MAP
:
1025 handle_osd_map(static_cast<MOSDMap
*>(m
));
1031 void Objecter::_scan_requests(
1035 map
<int64_t, bool> *pool_full_map
,
1036 map
<ceph_tid_t
, Op
*>& need_resend
,
1037 list
<LingerOp
*>& need_resend_linger
,
1038 map
<ceph_tid_t
, CommandOp
*>& need_resend_command
,
1039 ceph::shunique_lock
<ceph::shared_mutex
>& sul
)
1041 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
1043 list
<LingerOp
*> unregister_lingers
;
1045 std::unique_lock
sl(s
->lock
);
1047 // check for changed linger mappings (_before_ regular ops)
1048 auto lp
= s
->linger_ops
.begin();
1049 while (lp
!= s
->linger_ops
.end()) {
1050 auto op
= lp
->second
;
1051 ceph_assert(op
->session
== s
);
1052 // check_linger_pool_dne() may touch linger_ops; prevent iterator
1055 ldout(cct
, 10) << " checking linger op " << op
->linger_id
<< dendl
;
1056 bool unregister
, force_resend_writes
= cluster_full
;
1057 int r
= _recalc_linger_op_target(op
, sul
);
1059 force_resend_writes
= force_resend_writes
||
1060 (*pool_full_map
)[op
->target
.base_oloc
.pool
];
1062 case RECALC_OP_TARGET_NO_ACTION
:
1063 if (!skipped_map
&& !force_resend_writes
)
1066 case RECALC_OP_TARGET_NEED_RESEND
:
1067 need_resend_linger
.push_back(op
);
1068 _linger_cancel_map_check(op
);
1070 case RECALC_OP_TARGET_POOL_DNE
:
1071 _check_linger_pool_dne(op
, &unregister
);
1073 ldout(cct
, 10) << " need to unregister linger op "
1074 << op
->linger_id
<< dendl
;
1076 unregister_lingers
.push_back(op
);
1079 case RECALC_OP_TARGET_POOL_EIO
:
1080 _check_linger_pool_eio(op
);
1081 ldout(cct
, 10) << " need to unregister linger op "
1082 << op
->linger_id
<< dendl
;
1084 unregister_lingers
.push_back(op
);
1089 // check for changed request mappings
1090 auto p
= s
->ops
.begin();
1091 while (p
!= s
->ops
.end()) {
1093 ++p
; // check_op_pool_dne() may touch ops; prevent iterator invalidation
1094 ldout(cct
, 10) << " checking op " << op
->tid
<< dendl
;
1095 _prune_snapc(osdmap
->get_new_removed_snaps(), op
);
1096 bool force_resend_writes
= cluster_full
;
1098 force_resend_writes
= force_resend_writes
||
1099 (*pool_full_map
)[op
->target
.base_oloc
.pool
];
1100 int r
= _calc_target(&op
->target
,
1101 op
->session
? op
->session
->con
.get() : nullptr);
1103 case RECALC_OP_TARGET_NO_ACTION
:
1104 if (!skipped_map
&& !(force_resend_writes
&& op
->target
.respects_full()))
1107 case RECALC_OP_TARGET_NEED_RESEND
:
1108 _session_op_remove(op
->session
, op
);
1109 need_resend
[op
->tid
] = op
;
1110 _op_cancel_map_check(op
);
1112 case RECALC_OP_TARGET_POOL_DNE
:
1113 _check_op_pool_dne(op
, &sl
);
1115 case RECALC_OP_TARGET_POOL_EIO
:
1116 _check_op_pool_eio(op
, &sl
);
1122 auto cp
= s
->command_ops
.begin();
1123 while (cp
!= s
->command_ops
.end()) {
1124 auto c
= cp
->second
;
1126 ldout(cct
, 10) << " checking command " << c
->tid
<< dendl
;
1127 bool force_resend_writes
= cluster_full
;
1129 force_resend_writes
= force_resend_writes
||
1130 (*pool_full_map
)[c
->target_pg
.pool()];
1131 int r
= _calc_command_target(c
, sul
);
1133 case RECALC_OP_TARGET_NO_ACTION
:
1134 // resend if skipped map; otherwise do nothing.
1135 if (!skipped_map
&& !force_resend_writes
)
1138 case RECALC_OP_TARGET_NEED_RESEND
:
1139 need_resend_command
[c
->tid
] = c
;
1140 _session_command_op_remove(c
->session
, c
);
1141 _command_cancel_map_check(c
);
1143 case RECALC_OP_TARGET_POOL_DNE
:
1144 case RECALC_OP_TARGET_OSD_DNE
:
1145 case RECALC_OP_TARGET_OSD_DOWN
:
1146 _check_command_map_dne(c
);
1153 for (auto iter
= unregister_lingers
.begin();
1154 iter
!= unregister_lingers
.end();
1156 _linger_cancel(*iter
);
// Process an incoming MOSDMap from the monitor: advance our cached
// osdmap (via incrementals or full maps), then rescan and resend any
// requests whose targeting may have changed.
// NOTE(review): this excerpt was reconstructed from a mangled source
// dump with some lines elided; confirm against the canonical file.
void Objecter::handle_osd_map(MOSDMap *m)
{
  ceph::shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  ceph_assert(osdmap);

  // ignore maps from a foreign cluster
  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
		  << " != " << monc->get_fsid() << dendl;
    return;
  }

  // remember pre-update pause/full state so we can detect transitions
  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (auto it = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);

  // requests that must be retargeted/resent after the map update
  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    // stale message; nothing new for us
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
	   e <= m->get_last();
	   e++) {

	if (osdmap->get_epoch() == e-1 &&
	    m->incremental_maps.count(e)) {
	  // apply the incremental delta for the next epoch
	  ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
			<< dendl;
	  OSDMap::Incremental inc(m->incremental_maps[e]);
	  osdmap->apply_incremental(inc);

	  emit_blocklist_events(inc);

	  logger->inc(l_osdc_map_inc);
	}
	else if (m->maps.count(e)) {
	  // no usable incremental: decode a full map for this epoch
	  ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
	  auto new_osdmap = std::make_unique<OSDMap>();
	  new_osdmap->decode(m->maps[e]);

	  emit_blocklist_events(*osdmap, *new_osdmap);
	  osdmap = std::move(new_osdmap);

	  logger->inc(l_osdc_map_full);
	}
	else {
	  // epoch e is not in the message at all
	  if (e >= m->cluster_osdmap_trim_lower_bound) {
	    // it should still exist on the mon; go ask for it
	    ldout(cct, 3) << "handle_osd_map requesting missing epoch "
			  << osdmap->get_epoch()+1 << dendl;
	    _maybe_request_map();
	    break;
	  }
	  // the cluster has trimmed past e; jump ahead and note the skip
	  ldout(cct, 3) << "handle_osd_map missing epoch "
			<< osdmap->get_epoch()+1
			<< ", jumping to "
			<< m->cluster_osdmap_trim_lower_bound << dendl;
	  e = m->cluster_osdmap_trim_lower_bound - 1;
	  skipped_map = true;
	  continue;
	}
	logger->set(l_osdc_map_epoch, osdmap->get_epoch());

	prune_pg_mapping(osdmap->get_pools());
	cluster_full = cluster_full || _osdmap_full_flag();
	update_pool_full_map(pool_full_map);

	// check all outstanding requests on every epoch
	for (auto& i : need_resend) {
	  _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
	}
	_scan_requests(homeless_session, skipped_map, cluster_full,
		       &pool_full_map, need_resend,
		       need_resend_linger, need_resend_command, sul);
	for (auto p = osd_sessions.begin();
	     p != osd_sessions.end(); ) {
	  auto s = p->second;
	  _scan_requests(s, skipped_map, cluster_full,
			 &pool_full_map, need_resend,
			 need_resend_linger, need_resend_command, sul);
	  ++p;
	  // osd down or addr change?
	  if (!osdmap->is_up(s->osd) ||
	      (s->con &&
	       s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
	    close_session(s);
	  }
	}

	ceph_assert(e == osdmap->get_epoch());
      }

    } else {
      // first map. we want the full thing.
      if (m->maps.count(m->get_last())) {
	for (auto p = osd_sessions.begin();
	     p != osd_sessions.end(); ++p) {
	  OSDSession *s = p->second;
	  _scan_requests(s, false, false, NULL, need_resend,
			 need_resend_linger, need_resend_command, sul);
	}
	ldout(cct, 3) << "handle_osd_map decoding full epoch "
		      << m->get_last() << dendl;
	osdmap->decode(m->maps[m->get_last()]);
	prune_pg_mapping(osdmap->get_pools());

	_scan_requests(homeless_session, false, false, NULL,
		       need_resend, need_resend_linger,
		       need_resend_command, sul);
      } else {
	ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
		      << dendl;
	monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
	monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
	// the pool vanished: drop from resend set and fail/queue the op
	p = need_resend.erase(p);
	_check_op_pool_dne(op, nullptr);
      } else {
	++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused, or we are behind an epoch barrier: keep maps coming
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    _maybe_request_map();
  }

  // resend ops whose target may have moved
  for (auto p = need_resend.begin();
       p != need_resend.end(); ++p) {
    auto op = p->second;
    auto s = op->session;
    bool mapped_session = false;
    if (!s) {
      int r = _map_session(&op->target, &s, sul);
      ceph_assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    std::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      if (!op->session->is_homeless() && !op->target.paused) {
	logger->inc(l_osdc_op_resend);
	_send_op(op);
      }
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s);
  }
  for (auto p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    ceph_assert(op->session);
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (auto p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    auto c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
	_send_command(c);
      }
    }
  }

  // finish any Contexts that were waiting on a map update
  auto p = waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
	 p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (auto& [c, ec] : p->second) {
      ca::post(std::move(c), ec);
    }
    waiting_for_map.erase(p++);
  }

  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}
// Turn on accumulation of blocklist events (see consume_blocklist_events).
void Objecter::enable_blocklist_events()
{
  unique_lock wl(rwlock);

  blocklist_events_enabled = true;
}
// Hand accumulated blocklist events to the caller and clear our queue.
// If *events is empty we can cheaply swap; otherwise merge into it.
void Objecter::consume_blocklist_events(std::set<entity_addr_t> *events)
{
  unique_lock wl(rwlock);

  if (events->empty()) {
    events->swap(blocklist_events);
  } else {
    for (const auto &i : blocklist_events) {
      events->insert(i);
    }
    blocklist_events.clear();
  }
}
// Record newly-blocklisted addrs from an incremental map update.
// No-op unless blocklist events were enabled.
void Objecter::emit_blocklist_events(const OSDMap::Incremental &inc)
{
  if (!blocklist_events_enabled) {
    return;
  }

  for (const auto &i : inc.new_blocklist) {
    blocklist_events.insert(i.first);
  }
}
// Record blocklist entries that appear in new_osd_map but not in
// old_osd_map (set difference over both the plain and range blocklists).
void Objecter::emit_blocklist_events(const OSDMap &old_osd_map,
				     const OSDMap &new_osd_map)
{
  if (!blocklist_events_enabled) {
    return;
  }

  std::set<entity_addr_t> old_set;
  std::set<entity_addr_t> new_set;
  std::set<entity_addr_t> old_range_set;
  std::set<entity_addr_t> new_range_set;

  old_osd_map.get_blocklist(&old_set, &old_range_set);
  new_osd_map.get_blocklist(&new_set, &new_range_set);

  // delta = entries present in the new map only
  std::set<entity_addr_t> delta_set;
  std::set_difference(
    new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  std::set_difference(
    new_range_set.begin(), new_range_set.end(),
    old_range_set.begin(), old_range_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  blocklist_events.insert(delta_set.begin(), delta_set.end());
}
// Completion for monc->get_version("osdmap") issued by _send_op_map_check:
// record the latest map epoch as the op's "pool dne" decision bound and
// re-run the pool-existence check.
void Objecter::CB_Op_Map_Latest::operator()(bs::error_code e,
					    version_t latest, version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled)
    return;	// transient; resend_mon_ops() will retry

  lgeneric_subdout(objecter->cct, objecter, 10)
    << "op_map_latest r=" << e << " tid=" << tid
    << " latest " << latest << dendl;

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_ops.find(tid);
  if (iter == objecter->check_latest_map_ops.end()) {
    // op was canceled/completed while we waited on the mon
    lgeneric_subdout(objecter->cct, objecter, 10)
      << "op_map_latest op "<< tid << " not found" << dendl;
    return;
  }

  Op *op = iter->second;
  objecter->check_latest_map_ops.erase(iter);

  lgeneric_subdout(objecter->cct, objecter, 20)
    << "op_map_latest op "<< op << dendl;

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  unique_lock sl(op->session->lock, defer_lock);
  objecter->_check_op_pool_dne(op, &sl);

  // drop the ref taken when the op was queued for the map check
  op->put();
}
// Resolve a pool snapshot name to its snapid.
// @returns 0 and sets *snap on success; -ENOENT if pool or snap not found.
int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
				snapid_t *snap) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  for (auto p = pg_pool.snaps.begin();
       p != pg_pool.snaps.end();
       ++p) {
    if (p->second.name == snap_name) {
      *snap = p->first;
      return 0;
    }
  }
  return -ENOENT;
}
// Fetch metadata for one pool snapshot.
// @returns 0 and fills *info, or -ENOENT if pool or snap is unknown.
int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
				 pool_snap_info_t *info) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  auto p = pg_pool.snaps.find(snap);
  if (p == pg_pool.snaps.end())
    return -ENOENT;
  *info = p->second;

  return 0;
}
// Append all snapids of a pool to *snaps.
// @returns 0 on success, -ENOENT if the pool does not exist.
int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
{
  shared_lock rl(rwlock);

  const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
  if (!pi)
    return -ENOENT;
  for (auto p = pi->snaps.begin();
       p != pi->snaps.end();
       ++p) {
    snaps->push_back(p->first);
  }
  return 0;
}
// sl may be unlocked.
// Decide whether op's target pool definitively does not exist.  If our
// map is at/after the decision bound, fail the op with pool_dne;
// otherwise ask the monitor for the latest map epoch first.
void Objecter::_check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl)
{
  // rwlock is locked unique

  if (op->target.pool_ever_existed) {
    // the pool previously existed and now it does not, which means it
    // was deleted.
    op->map_dne_bound = osdmap->get_epoch();
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " pool previously exists but now does not"
		   << dendl;
  } else {
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // we had a new enough map
      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		     << " concluding pool " << op->target.base_pgid.pool()
		     << " dne" << dendl;
      if (op->has_completion()) {
	num_in_flight--;
	op->complete(osdc_errc::pool_dne, -ENOENT);
      }

      OSDSession *s = op->session;
      if (s) {
	ceph_assert(s != NULL);
	ceph_assert(sl->mutex() == &s->lock);
	bool session_locked = sl->owns_lock();
	// _finish_op requires the session lock; take it only if the
	// caller did not already hold it, and restore state after.
	if (!session_locked) {
	  sl->lock();
	}
	_finish_op(op, 0);
	if (!session_locked) {
	  sl->unlock();
	}
      } else {
	_finish_op(op, 0);	// no session
      }
    }
  } else {
    // no decision bound yet: query the monitor for the latest epoch
    _send_op_map_check(op);
  }
}
// sl may be unlocked.
// The op's target pool has the EIO flag set: fail the op immediately
// with pool_eio and tear it down.
void Objecter::_check_op_pool_eio(Op *op, std::unique_lock<std::shared_mutex> *sl)
{
  // rwlock is locked unique

  // we had a new enough map
  ldout(cct, 10) << "check_op_pool_eio tid " << op->tid
		 << " concluding pool " << op->target.base_pgid.pool()
		 << " has eio" << dendl;
  if (op->has_completion()) {
    num_in_flight--;
    op->complete(osdc_errc::pool_eio, -EIO);
  }

  OSDSession *s = op->session;
  if (s) {
    ceph_assert(s != NULL);
    ceph_assert(sl->mutex() == &s->lock);
    bool session_locked = sl->owns_lock();
    // _finish_op needs the session lock; acquire it if necessary
    if (!session_locked) {
      sl->lock();
    }
    _finish_op(op, 0);
    if (!session_locked) {
      sl->unlock();
    }
  } else {
    _finish_op(op, 0);	// no session
  }
}
// Ask the monitor for the latest osdmap version on behalf of op,
// unless a check for this tid is already pending.
void Objecter::_send_op_map_check(Op *op)
{
  // rwlock is locked unique
  // ask the monitor
  if (check_latest_map_ops.count(op->tid) == 0) {
    op->get();		// ref held by check_latest_map_ops
    check_latest_map_ops[op->tid] = op;
    monc->get_version("osdmap", CB_Op_Map_Latest(this, op->tid));
  }
}
// Cancel a pending map-version check for op, dropping the ref that
// _send_op_map_check took.
void Objecter::_op_cancel_map_check(Op *op)
{
  // rwlock is locked unique
  auto iter = check_latest_map_ops.find(op->tid);
  if (iter != check_latest_map_ops.end()) {
    Op *op = iter->second;
    op->put();
    check_latest_map_ops.erase(iter);
  }
}
1648 // linger pool check
// Completion for the linger variant of the "latest osdmap" query:
// set the linger op's decision bound and re-check pool existence,
// canceling the linger registration if the pool is gone.
void Objecter::CB_Linger_Map_Latest::operator()(bs::error_code e,
						version_t latest,
						version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_lingers.find(linger_id);
  if (iter == objecter->check_latest_map_lingers.end()) {
    // linger op vanished while we waited on the mon
    return;
  }

  auto op = iter->second;
  objecter->check_latest_map_lingers.erase(iter);

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  bool unregister;
  objecter->_check_linger_pool_dne(op, &unregister);

  if (unregister) {
    objecter->_linger_cancel(op);
  }

  // drop the ref taken by _send_linger_map_check
  op->put();
}
// Decide whether a linger op's pool definitively does not exist.  When
// the decision bound is reached, fail its registration/notify callbacks
// with pool_dne and tell the caller to unregister it.
void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
  // rwlock is locked unique

  *need_unregister = false;

  if (op->register_gen > 0) {
    // the op registered at least once, so the pool existed before
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " pool previously existed but now does not"
		   << dendl;
    op->map_dne_bound = osdmap->get_epoch();
  } else {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      std::unique_lock wl{op->watch_lock};
      if (op->on_reg_commit) {
	op->on_reg_commit->defer(std::move(op->on_reg_commit),
				 osdc_errc::pool_dne, cb::list{});
	op->on_reg_commit = nullptr;
      }
      if (op->on_notify_finish) {
	op->on_notify_finish->defer(std::move(op->on_notify_finish),
				    osdc_errc::pool_dne, cb::list{});
	op->on_notify_finish = nullptr;
      }
      *need_unregister = true;
    }
  } else {
    // no decision bound yet: query the monitor first
    _send_linger_map_check(op);
  }
}
1720 void Objecter::_check_linger_pool_eio(LingerOp
*op
)
1722 // rwlock is locked unique
1724 std::unique_lock wl
{op
->watch_lock
};
1725 if (op
->on_reg_commit
) {
1726 op
->on_reg_commit
->defer(std::move(op
->on_reg_commit
),
1727 osdc_errc::pool_dne
, cb::list
{});
1728 op
->on_reg_commit
= nullptr;
1730 if (op
->on_notify_finish
) {
1731 op
->on_notify_finish
->defer(std::move(op
->on_notify_finish
),
1732 osdc_errc::pool_dne
, cb::list
{});
1733 op
->on_notify_finish
= nullptr;
// Ask the monitor for the latest osdmap version on behalf of a linger
// op, unless a check for this linger_id is already pending.
void Objecter::_send_linger_map_check(LingerOp *op)
{
  // ask the monitor
  if (check_latest_map_lingers.count(op->linger_id) == 0) {
    op->get();		// ref held by check_latest_map_lingers
    check_latest_map_lingers[op->linger_id] = op;
    monc->get_version("osdmap", CB_Linger_Map_Latest(this, op->linger_id));
  }
}
// Cancel a pending map-version check for a linger op, dropping the ref
// that _send_linger_map_check took.
void Objecter::_linger_cancel_map_check(LingerOp *op)
{
  // rwlock is locked unique

  auto iter = check_latest_map_lingers.find(op->linger_id);
  if (iter != check_latest_map_lingers.end()) {
    LingerOp *op = iter->second;
    op->put();
    check_latest_map_lingers.erase(iter);
  }
}
1759 // command pool check
// Completion for the command variant of the "latest osdmap" query:
// set the command's decision bound and re-run its map-dne check.
void Objecter::CB_Command_Map_Latest::operator()(bs::error_code e,
						 version_t latest, version_t)
{
  if (e == bs::errc::resource_unavailable_try_again ||
      e == bs::errc::operation_canceled) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  auto iter = objecter->check_latest_map_commands.find(tid);
  if (iter == objecter->check_latest_map_commands.end()) {
    // command finished/canceled while we waited on the mon
    return;
  }

  auto c = iter->second;
  objecter->check_latest_map_commands.erase(iter);

  if (c->map_dne_bound == 0)
    c->map_dne_bound = latest;

  // _check_command_map_dne requires the session lock
  unique_lock sul(c->session->lock);
  objecter->_check_command_map_dne(c);
  sul.unlock();

  // drop the ref taken by _send_command_map_check
  c->put();
}
// Finish a command with its recorded map_check_error once our osdmap
// has reached the decision bound; otherwise query the monitor first.
void Objecter::_check_command_map_dne(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
		 << " current " << osdmap->get_epoch()
		 << " map_dne_bound " << c->map_dne_bound
		 << dendl;
  if (c->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= c->map_dne_bound) {
      _finish_command(c, osdcode(c->map_check_error),
		      std::move(c->map_check_error_str), {});
    }
  } else {
    _send_command_map_check(c);
  }
}
// Ask the monitor for the latest osdmap version on behalf of a command,
// unless a check for this tid is already pending.
void Objecter::_send_command_map_check(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  // ask the monitor
  if (check_latest_map_commands.count(c->tid) == 0) {
    c->get();		// ref held by check_latest_map_commands
    check_latest_map_commands[c->tid] = c;
    monc->get_version("osdmap", CB_Command_Map_Latest(this, c->tid));
  }
}
// Cancel a pending map-version check for a command, dropping the ref
// that _send_command_map_check took.
void Objecter::_command_cancel_map_check(CommandOp *c)
{
  // rwlock is locked unique
  auto iter = check_latest_map_commands.find(c->tid);
  if (iter != check_latest_map_commands.end()) {
    auto c = iter->second;
    c->put();
    check_latest_map_commands.erase(iter);
  }
}
/**
 * Look up OSDSession by OSD id.
 *
 * Returns the homeless session for osd < 0; otherwise returns an
 * existing session (with a ref taken) or creates a new one, which
 * requires the rwlock to be held exclusively.
 *
 * @returns 0 on success, or -EAGAIN if the lock context requires
 * promotion to write.
 */
int Objecter::_get_session(int osd, OSDSession **session,
			   shunique_lock<ceph::shared_mutex>& sul)
{
  ceph_assert(sul && sul.mutex() == &rwlock);

  if (osd < 0) {
    *session = homeless_session;
    ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
		   << dendl;
    return 0;
  }

  auto p = osd_sessions.find(osd);
  if (p != osd_sessions.end()) {
    auto s = p->second;
    s->get();
    *session = s;
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
		   << s->get_nref() << dendl;
    return 0;
  }
  // creating a session mutates osd_sessions: need the unique lock
  if (!sul.owns_lock()) {
    return -EAGAIN;
  }
  auto s = new OSDSession(cct, osd);
  osd_sessions[osd] = s;
  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
  s->con->set_priv(RefCountedPtr{s});
  logger->inc(l_osdc_osd_session_open);
  logger->set(l_osdc_osd_sessions, osd_sessions.size());
  s->get();
  *session = s;
  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
		 << s->get_nref() << dendl;
  return 0;
}
// Drop a session reference (the homeless session is not refcounted).
void Objecter::put_session(Objecter::OSDSession *s)
{
  if (s && !s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
		   << s->get_nref() << dendl;
    s->put();
  }
}
// Take a session reference (the homeless session is not refcounted).
void Objecter::get_session(Objecter::OSDSession *s)
{
  ceph_assert(s != NULL);

  if (!s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
		   << s->get_nref() << dendl;
    s->get();
  }
}
// Tear down the session's connection (if any) and reconnect to the
// OSD's current address from the osdmap.
void Objecter::_reopen_session(OSDSession *s)
{
  // rwlock is locked unique
  // s->lock is locked

  auto addrs = osdmap->get_addrs(s->osd);
  ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
		 << addrs << dendl;
  if (s->con) {
    s->con->set_priv(NULL);	// break the ref cycle before dropping
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  s->con = messenger->connect_to_osd(addrs);
  s->con->set_priv(RefCountedPtr{s});
  s->incarnation++;
  logger->inc(l_osdc_osd_session_open);
}
// Close an OSD session: drop its connection, detach all of its ops,
// and reassign them to the homeless session so they can be retargeted.
void Objecter::close_session(OSDSession *s)
{
  // rwlock is locked unique

  ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
  if (s->con) {
    s->con->set_priv(NULL);	// break the ref cycle before dropping
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  unique_lock sl(s->lock);

  std::list<LingerOp*> homeless_lingers;
  std::list<CommandOp*> homeless_commands;
  std::list<Op*> homeless_ops;

  while (!s->linger_ops.empty()) {
    auto i = s->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    homeless_lingers.push_back(i->second);
    _session_linger_op_remove(s, i->second);
  }

  while (!s->ops.empty()) {
    auto i = s->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    homeless_ops.push_back(i->second);
    _session_op_remove(s, i->second);
  }

  while (!s->command_ops.empty()) {
    auto i = s->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    homeless_commands.push_back(i->second);
    _session_command_op_remove(s, i->second);
  }

  osd_sessions.erase(s->osd);
  sl.unlock();
  put_session(s);

  // Assign any leftover ops to the homeless session
  {
    unique_lock hsl(homeless_session->lock);
    for (auto i = homeless_lingers.begin();
	 i != homeless_lingers.end(); ++i) {
      _session_linger_op_assign(homeless_session, *i);
    }
    for (auto i = homeless_ops.begin();
	 i != homeless_ops.end(); ++i) {
      _session_op_assign(homeless_session, *i);
    }
    for (auto i = homeless_commands.begin();
	 i != homeless_commands.end(); ++i) {
      _session_command_op_assign(homeless_session, *i);
    }
  }

  logger->set(l_osdc_osd_sessions, osd_sessions.size());
}
// Block the calling thread until our osdmap has reached epoch e.
void Objecter::wait_for_osd_map(epoch_t e)
{
  unique_lock l(rwlock);
  if (osdmap->get_epoch() >= e) {
    // already there; nothing to wait for
    l.unlock();
    return;
  }

  // queue a completion keyed on the target epoch and wait on it
  ca::waiter<bs::error_code> w;
  waiting_for_map[e].emplace_back(OpCompletion::create(
				    service.get_executor(),
				    w.ref()),
				  bs::error_code{});
  l.unlock();
  w.wait();
}
// Complete fin once our osdmap has reached `newest`; fires immediately
// if we already have it.  Consumes (unlocks) the passed unique_lock.
void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
				   std::unique_ptr<OpCompletion> fin,
				   std::unique_lock<ceph::shared_mutex>&& l)
{
  if (osdmap->get_epoch() >= newest) {
    ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
    l.unlock();
    ca::defer(std::move(fin), bs::error_code{});
  } else {
    ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
    _wait_for_new_map(std::move(fin), newest, bs::error_code{});
    l.unlock();
  }
}
// Public wrapper: subscribe for a newer osdmap if appropriate.
void Objecter::maybe_request_map()
{
  shared_lock rl(rwlock);
  _maybe_request_map();
}
// Subscribe to the next osdmap: continuously while the cluster is
// full/paused (we must notice when the condition clears), otherwise
// one-shot.
void Objecter::_maybe_request_map()
{
  // rwlock is locked
  int flag = 0;
  if (_osdmap_full_flag()
      || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
      || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
    ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
      "osd map (FULL flag is set)" << dendl;
  } else {
    ldout(cct, 10)
      << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
    flag = CEPH_SUBSCRIBE_ONETIME;
  }
  // epoch 0 means "any map"; otherwise ask for the next one
  epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
  if (monc->sub_want("osdmap", epoch, flag)) {
    monc->renew_subs();
  }
}
// Queue completion c to fire (with ec) once the osdmap reaches epoch,
// and make sure a newer map is on its way.
void Objecter::_wait_for_new_map(std::unique_ptr<OpCompletion> c, epoch_t epoch,
				 bs::error_code ec)
{
  // rwlock is locked unique
  waiting_for_map[epoch].emplace_back(std::move(c), ec);
  _maybe_request_map();
}
/**
 * Use this together with wait_for_map: this is a pre-check to avoid
 * allocating a Context for wait_for_map if we can see that we
 * definitely already have the epoch.
 *
 * This does *not* replace the need to handle the return value of
 * wait_for_map: just because we don't have it in this pre-check
 * doesn't mean we won't have it when calling back into wait_for_map,
 * since the objecter lock is dropped in between.
 */
bool Objecter::have_map(const epoch_t epoch)
{
  shared_lock rl(rwlock);
  if (osdmap->get_epoch() >= epoch) {
    return true;
  } else {
    return false;
  }
}
// Resend everything queued on a (re)opened session: ops in tid order,
// lingers collected into lresend for the caller, commands in order.
void Objecter::_kick_requests(OSDSession *session,
			      map<uint64_t, LingerOp *>& lresend)
{
  // rwlock is locked unique

  // clear backoffs: the new connection starts with a clean slate
  session->backoffs.clear();
  session->backoffs_by_id.clear();

  // resend ops
  map<ceph_tid_t,Op*> resend;  // resend in tid order
  for (auto p = session->ops.begin(); p != session->ops.end();) {
    Op *op = p->second;
    ++p;
    if (op->should_resend) {
      if (!op->target.paused)
	resend[op->tid] = op;
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
  }

  logger->inc(l_osdc_op_resend, resend.size());
  while (!resend.empty()) {
    _send_op(resend.begin()->second);
    resend.erase(resend.begin());
  }

  // resend lingers (collected; caller resends via _linger_ops_resend)
  logger->inc(l_osdc_linger_resend, session->linger_ops.size());
  for (auto j = session->linger_ops.begin();
       j != session->linger_ops.end(); ++j) {
    LingerOp *op = j->second;
    op->get();		// ref held by lresend until resent
    ceph_assert(lresend.count(j->first) == 0);
    lresend[j->first] = op;
  }

  // resend commands
  logger->inc(l_osdc_command_resend, session->command_ops.size());
  map<uint64_t,CommandOp*> cresend;  // resend in order
  for (auto k = session->command_ops.begin();
       k != session->command_ops.end(); ++k) {
    cresend[k->first] = k->second;
  }
  while (!cresend.empty()) {
    _send_command(cresend.begin()->second);
    cresend.erase(cresend.begin());
  }
}
// Resend the linger ops collected by _kick_requests, dropping the
// per-op refs it took.  Temporarily converts the caller's unique_lock
// into a shunique_lock (required by _send_linger) and restores it.
void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
				  unique_lock<ceph::shared_mutex>& ul)
{
  ceph_assert(ul.owns_lock());
  shunique_lock sul(std::move(ul));
  while (!lresend.empty()) {
    LingerOp *op = lresend.begin()->second;
    if (!op->canceled) {
      _send_linger(op, sul);
    }
    op->put();
    lresend.erase(lresend.begin());
  }
  ul = sul.release_to_unique();
}
// Schedule the first periodic tick() callback.
void Objecter::start_tick()
{
  ceph_assert(tick_event == 0);
  tick_event =
    timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
		    &Objecter::tick, this);
}
// Periodic housekeeping: detect laggy ops, ping OSDs with outstanding
// linger/command work, and reschedule ourselves.
void Objecter::tick()
{
  shared_lock rl(rwlock);

  ldout(cct, 10) << "tick" << dendl;

  // we are only called by C_Tick
  tick_event = 0;

  if (!initialized) {
    // we raced with shutdown
    ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
    return;
  }

  set<OSDSession*> toping;

  // look for laggy requests
  auto cutoff = ceph::coarse_mono_clock::now();
  cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout

  unsigned laggy_ops = 0;

  for (auto siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    auto s = siter->second;
    scoped_lock l(s->lock);
    bool found = false;
    for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
      auto op = p->second;
      ceph_assert(op->session);
      if (op->stamp < cutoff) {
	// op has been outstanding longer than objecter_timeout
	ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
		      << " is laggy" << dendl;
	found = true;
	++laggy_ops;
      }
    }
    for (auto p = s->linger_ops.begin();
	 p != s->linger_ops.end();
	 ++p) {
      auto op = p->second;
      std::unique_lock wl(op->watch_lock);
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
		     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
      // keep registered watches alive
      if (op->is_watch && op->registered && !op->last_error)
	_send_linger_ping(op);
    }
    for (auto p = s->command_ops.begin();
	 p != s->command_ops.end();
	 ++p) {
      auto op = p->second;
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves command tid " << p->first
		     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
    }
    if (found)
      toping.insert(s);
  }
  if (num_homeless_ops || !toping.empty()) {
    _maybe_request_map();
  }

  logger->set(l_osdc_op_laggy, laggy_ops);
  logger->set(l_osdc_osd_laggy, toping.size());

  if (!toping.empty()) {
    // send a ping to these osds, to ensure we detect any session resets
    // (osd reply message policy is lossy)
    for (auto i = toping.begin(); i != toping.end(); ++i) {
      (*i)->con->send_message(new MPing);
    }
  }

  // Make sure we don't reschedule if we wake up after shutdown
  if (initialized) {
    tick_event = timer.reschedule_me(ceph::make_timespan(
				       cct->_conf->objecter_tick_interval));
  }
}
// Re-submit all monitor-directed requests (pool stats, fs stats, pool
// ops) and re-issue any outstanding "latest osdmap" version queries.
// Called when the monitor session is re-established.
void Objecter::resend_mon_ops()
{
  unique_lock wl(rwlock);

  ldout(cct, 10) << "resend_mon_ops" << dendl;

  for (auto p = poolstat_ops.begin(); p != poolstat_ops.end(); ++p) {
    _poolstat_submit(p->second);
    logger->inc(l_osdc_poolstat_resend);
  }

  for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
    _fs_stats_submit(p->second);
    logger->inc(l_osdc_statfs_resend);
  }

  for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
    _pool_op_submit(p->second);
    logger->inc(l_osdc_poolop_resend);
  }

  for (auto p = check_latest_map_ops.begin();
       p != check_latest_map_ops.end();
       ++p) {
    monc->get_version("osdmap", CB_Op_Map_Latest(this, p->second->tid));
  }

  for (auto p = check_latest_map_lingers.begin();
       p != check_latest_map_lingers.end();
       ++p) {
    monc->get_version("osdmap", CB_Linger_Map_Latest(this, p->second->linger_id));
  }

  for (auto p = check_latest_map_commands.begin();
       p != check_latest_map_commands.end();
       ++p) {
    monc->get_version("osdmap", CB_Command_Map_Latest(this, p->second->tid));
  }
}
2267 // read | write ---------------------------
// Public entry point for submitting an OSD op; takes the rwlock shared
// and delegates to the budgeted submission path.
void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
{
  shunique_lock rl(rwlock, ceph::acquire_shared);
  ceph_assert(initialized);
  op->trace.event("op submit");
  _op_submit_with_budget(op, rl, ptid, ctx_budget);
}
// Take throttle budget for the op (possibly blocking), arm the optional
// per-op timeout, then submit.
void Objecter::_op_submit_with_budget(Op *op,
				      shunique_lock<ceph::shared_mutex>& sul,
				      ceph_tid_t *ptid,
				      int *ctx_budget)
{
  ceph_assert(initialized);

  ceph_assert(op->ops.size() == op->out_bl.size());
  ceph_assert(op->ops.size() == op->out_rval.size());
  ceph_assert(op->ops.size() == op->out_handler.size());

  // throttle. before we look at any state, because
  // _take_op_budget() may drop our lock while it blocks.
  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
    int op_budget = _take_op_budget(op, sul);
    // take and pass out the budget for the first OP
    // in the context session
    if (ctx_budget && (*ctx_budget == -1)) {
      *ctx_budget = op_budget;
    }
  }

  if (osd_timeout > timespan(0)) {
    // assign the tid now so the timeout callback can cancel by tid
    if (op->tid == 0)
      op->tid = ++last_tid;
    auto tid = op->tid;
    op->ontimeout = timer.add_event(osd_timeout,
				    [this, tid]() {
				      op_cancel(tid, -ETIMEDOUT); });
  }

  _op_submit(op, sul, ptid);
}
// Update in-flight accounting and perf counters for an op that is
// about to be sent (overall, read/write classification, and one
// counter per contained OSD sub-op).
void Objecter::_send_op_account(Op *op)
{
  inflight_ops++;

  // add to gather set(s)
  if (op->has_completion()) {
    num_in_flight++;
  } else {
    ldout(cct, 20) << " note: not requesting reply" << dendl;
  }

  logger->inc(l_osdc_op_active);
  logger->inc(l_osdc_op);
  logger->inc(l_osdc_oplen_avg, op->ops.size());

  if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
      (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
    logger->inc(l_osdc_op_rmw);
  else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
    logger->inc(l_osdc_op_w);
  else if (op->target.flags & CEPH_OSD_FLAG_READ)
    logger->inc(l_osdc_op_r);

  if (op->target.flags & CEPH_OSD_FLAG_PGOP)
    logger->inc(l_osdc_op_pg);

  // per-sub-op counters
  for (auto p = op->ops.begin(); p != op->ops.end(); ++p) {
    int code = l_osdc_osdop_other;
    switch (p->op.op) {
    case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
    case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
    case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
    case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
    case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
    case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
    case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
    case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
    case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
    case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
    case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
    case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
    case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
    case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
    case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
    case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
    case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;

    // OMAP read operations
    case CEPH_OSD_OP_OMAPGETVALS:
    case CEPH_OSD_OP_OMAPGETKEYS:
    case CEPH_OSD_OP_OMAPGETHEADER:
    case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;

    // OMAP write operations
    case CEPH_OSD_OP_OMAPSETVALS:
    case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;

    // OMAP del operations
    case CEPH_OSD_OP_OMAPCLEAR:
    case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;

    case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
    case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
    case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
    }
    if (code)
      logger->inc(code);
  }
}
// Core submission path: compute the op's target, obtain the matching
// OSD session (retrying across a lock promotion if needed), account
// for it, assign it to the session, and send it if possible.
void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid)
{
  // rwlock is locked

  ldout(cct, 10) << __func__ << " op " << op << dendl;

  // pick target
  ceph_assert(op->session == NULL);
  OSDSession *s = NULL;

  bool check_for_latest_map = false;
  int r = _calc_target(&op->target, nullptr);
  switch (r) {
  case RECALC_OP_TARGET_POOL_DNE:
    // pool may not exist; verify against the monitor's latest map
    check_for_latest_map = true;
    break;
  case RECALC_OP_TARGET_POOL_EIO:
    if (op->has_completion()) {
      op->complete(osdc_errc::pool_eio, -EIO);
    }
    return;
  }

  // Try to get a session, including a retry if we need to take write lock
  r = _get_session(op->target.osd, &s, sul);
  if (r == -EAGAIN ||
      (check_for_latest_map && sul.owns_lock_shared()) ||
      cct->_conf->objecter_debug_inject_relock_delay) {
    // drop and retake the lock in unique mode; the map may move
    epoch_t orig_epoch = osdmap->get_epoch();
    sul.unlock();
    if (cct->_conf->objecter_debug_inject_relock_delay) {
      sleep(1);
    }
    sul.lock();
    if (orig_epoch != osdmap->get_epoch()) {
      // map changed; recalculate mapping
      ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
		     << dendl;
      check_for_latest_map = _calc_target(&op->target, nullptr)
	== RECALC_OP_TARGET_POOL_DNE;
      if (s) {
	put_session(s);
	s = NULL;
	r = -EAGAIN;
      }
    }
  }
  if (r == -EAGAIN) {
    ceph_assert(s == NULL);
    r = _get_session(op->target.osd, &s, sul);
  }
  ceph_assert(r == 0);
  ceph_assert(s);  // may be homeless

  _send_op_account(op);

  // send?

  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

  bool need_send = false;
  if (op->target.paused) {
    ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
		   << dendl;
    _maybe_request_map();
  } else if (!s->is_homeless()) {
    need_send = true;
  } else {
    // homeless: wait for a map that gives us a target
    _maybe_request_map();
  }

  unique_lock sl(s->lock);
  if (op->tid == 0)
    op->tid = ++last_tid;

  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
		 << " '" << op->target.base_oloc << "' '"
		 << op->target.target_oloc << "' " << op->ops << " tid "
		 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
		 << dendl;

  _session_op_assign(s, op);

  if (need_send) {
    _send_op(op);
  }

  // Last chance to touch Op here, after giving up session lock it can
  // be freed at any time by response handler.
  ceph_tid_t tid = op->tid;
  if (check_for_latest_map) {
    _send_op_map_check(op);
  }
  if (ptid)
    *ptid = tid;
  op = NULL;

  sl.unlock();
  put_session(s);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}
2487 int Objecter::op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
)
2489 ceph_assert(initialized
);
2491 unique_lock
sl(s
->lock
);
2493 auto p
= s
->ops
.find(tid
);
2494 if (p
== s
->ops
.end()) {
2495 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne in session "
2502 ldout(cct
, 20) << " revoking rx ceph::buffer for " << tid
2503 << " on " << s
->con
<< dendl
;
2504 s
->con
->revoke_rx_buffer(tid
);
2508 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " in session " << s
->osd
2511 if (op
->has_completion()) {
2513 op
->complete(osdcode(r
), r
);
2515 _op_cancel_map_check(op
);
2522 int Objecter::op_cancel(ceph_tid_t tid
, int r
)
2526 unique_lock
wl(rwlock
);
2527 ret
= _op_cancel(tid
, r
);
2532 int Objecter::op_cancel(const vector
<ceph_tid_t
>& tids
, int r
)
2534 unique_lock
wl(rwlock
);
2535 ldout(cct
,10) << __func__
<< " " << tids
<< dendl
;
2536 for (auto tid
: tids
) {
2542 int Objecter::_op_cancel(ceph_tid_t tid
, int r
)
2546 ldout(cct
, 5) << __func__
<< ": cancelling tid " << tid
<< " r=" << r
2551 for (auto siter
= osd_sessions
.begin();
2552 siter
!= osd_sessions
.end(); ++siter
) {
2553 OSDSession
*s
= siter
->second
;
2554 shared_lock
sl(s
->lock
);
2555 if (s
->ops
.find(tid
) != s
->ops
.end()) {
2557 ret
= op_cancel(s
, tid
, r
);
2558 if (ret
== -ENOENT
) {
2559 /* oh no! raced, maybe tid moved to another session, restarting */
2566 ldout(cct
, 5) << __func__
<< ": tid " << tid
2567 << " not found in live sessions" << dendl
;
2569 // Handle case where the op is in homeless session
2570 shared_lock
sl(homeless_session
->lock
);
2571 if (homeless_session
->ops
.find(tid
) != homeless_session
->ops
.end()) {
2573 ret
= op_cancel(homeless_session
, tid
, r
);
2574 if (ret
== -ENOENT
) {
2575 /* oh no! raced, maybe tid moved to another session, restarting */
2584 ldout(cct
, 5) << __func__
<< ": tid " << tid
2585 << " not found in homeless session" << dendl
;
2591 epoch_t
Objecter::op_cancel_writes(int r
, int64_t pool
)
2593 unique_lock
wl(rwlock
);
2595 std::vector
<ceph_tid_t
> to_cancel
;
2598 for (auto siter
= osd_sessions
.begin();
2599 siter
!= osd_sessions
.end(); ++siter
) {
2600 OSDSession
*s
= siter
->second
;
2601 shared_lock
sl(s
->lock
);
2602 for (auto op_i
= s
->ops
.begin();
2603 op_i
!= s
->ops
.end(); ++op_i
) {
2604 if (op_i
->second
->target
.flags
& CEPH_OSD_FLAG_WRITE
2605 && (pool
== -1 || op_i
->second
->target
.target_oloc
.pool
== pool
)) {
2606 to_cancel
.push_back(op_i
->first
);
2611 for (auto titer
= to_cancel
.begin(); titer
!= to_cancel
.end(); ++titer
) {
2612 int cancel_result
= op_cancel(s
, *titer
, r
);
2613 // We hold rwlock across search and cancellation, so cancels
2614 // should always succeed
2615 ceph_assert(cancel_result
== 0);
2617 if (!found
&& to_cancel
.size())
2622 const epoch_t epoch
= osdmap
->get_epoch();
2633 bool Objecter::is_pg_changed(
2635 const vector
<int>& oldacting
,
2637 const vector
<int>& newacting
,
2640 if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
2646 if (any_change
&& oldacting
!= newacting
)
2648 return false; // same primary (tho replicas may have changed)
2651 bool Objecter::target_should_be_paused(op_target_t
*t
)
2653 const pg_pool_t
*pi
= osdmap
->get_pg_pool(t
->base_oloc
.pool
);
2654 bool pauserd
= osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
);
2655 bool pausewr
= osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
) ||
2656 (t
->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi
)));
2658 return (t
->flags
& CEPH_OSD_FLAG_READ
&& pauserd
) ||
2659 (t
->flags
& CEPH_OSD_FLAG_WRITE
&& pausewr
) ||
2660 (osdmap
->get_epoch() < epoch_barrier
);
2664 * Locking public accessor for _osdmap_full_flag
2666 bool Objecter::osdmap_full_flag() const
2668 shared_lock
rl(rwlock
);
2670 return _osdmap_full_flag();
2673 bool Objecter::osdmap_pool_full(const int64_t pool_id
) const
2675 shared_lock
rl(rwlock
);
2677 if (_osdmap_full_flag()) {
2681 return _osdmap_pool_full(pool_id
);
2684 bool Objecter::_osdmap_pool_full(const int64_t pool_id
) const
2686 const pg_pool_t
*pool
= osdmap
->get_pg_pool(pool_id
);
2688 ldout(cct
, 4) << __func__
<< ": DNE pool " << pool_id
<< dendl
;
2692 return _osdmap_pool_full(*pool
);
2695 bool Objecter::_osdmap_has_pool_full() const
2697 for (auto it
= osdmap
->get_pools().begin();
2698 it
!= osdmap
->get_pools().end(); ++it
) {
2699 if (_osdmap_pool_full(it
->second
))
2706 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2708 bool Objecter::_osdmap_full_flag() const
2710 // Ignore the FULL flag if the caller does not have honor_osdmap_full
2711 return osdmap
->test_flag(CEPH_OSDMAP_FULL
) && honor_pool_full
;
2714 void Objecter::update_pool_full_map(map
<int64_t, bool>& pool_full_map
)
2716 for (map
<int64_t, pg_pool_t
>::const_iterator it
2717 = osdmap
->get_pools().begin();
2718 it
!= osdmap
->get_pools().end(); ++it
) {
2719 if (pool_full_map
.find(it
->first
) == pool_full_map
.end()) {
2720 pool_full_map
[it
->first
] = _osdmap_pool_full(it
->second
);
2722 pool_full_map
[it
->first
] = _osdmap_pool_full(it
->second
) ||
2723 pool_full_map
[it
->first
];
2728 int64_t Objecter::get_object_hash_position(int64_t pool
, const string
& key
,
2731 shared_lock
rl(rwlock
);
2732 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
2735 return p
->hash_key(key
, ns
);
2738 int64_t Objecter::get_object_pg_hash_position(int64_t pool
, const string
& key
,
2741 shared_lock
rl(rwlock
);
2742 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
2745 return p
->raw_hash_to_pg(p
->hash_key(key
, ns
));
2748 void Objecter::_prune_snapc(
2749 const mempool::osdmap::map
<int64_t,
2750 snap_interval_set_t
>& new_removed_snaps
,
2754 auto i
= new_removed_snaps
.find(op
->target
.base_pgid
.pool());
2755 if (i
!= new_removed_snaps
.end()) {
2756 for (auto s
: op
->snapc
.snaps
) {
2757 if (i
->second
.contains(s
)) {
2763 vector
<snapid_t
> new_snaps
;
2764 for (auto s
: op
->snapc
.snaps
) {
2765 if (!i
->second
.contains(s
)) {
2766 new_snaps
.push_back(s
);
2769 op
->snapc
.snaps
.swap(new_snaps
);
2770 ldout(cct
,10) << __func__
<< " op " << op
->tid
<< " snapc " << op
->snapc
2771 << " (was " << new_snaps
<< ")" << dendl
;
2776 int Objecter::_calc_target(op_target_t
*t
, Connection
*con
, bool any_change
)
2779 bool is_read
= t
->flags
& CEPH_OSD_FLAG_READ
;
2780 bool is_write
= t
->flags
& CEPH_OSD_FLAG_WRITE
;
2781 t
->epoch
= osdmap
->get_epoch();
2782 ldout(cct
,20) << __func__
<< " epoch " << t
->epoch
2783 << " base " << t
->base_oid
<< " " << t
->base_oloc
2784 << " precalc_pgid " << (int)t
->precalc_pgid
2785 << " pgid " << t
->base_pgid
2786 << (is_read
? " is_read" : "")
2787 << (is_write
? " is_write" : "")
2790 const pg_pool_t
*pi
= osdmap
->get_pg_pool(t
->base_oloc
.pool
);
2793 return RECALC_OP_TARGET_POOL_DNE
;
2796 if (pi
->has_flag(pg_pool_t::FLAG_EIO
)) {
2797 return RECALC_OP_TARGET_POOL_EIO
;
2800 ldout(cct
,30) << __func__
<< " base pi " << pi
2801 << " pg_num " << pi
->get_pg_num() << dendl
;
2803 bool force_resend
= false;
2804 if (osdmap
->get_epoch() == pi
->last_force_op_resend
) {
2805 if (t
->last_force_resend
< pi
->last_force_op_resend
) {
2806 t
->last_force_resend
= pi
->last_force_op_resend
;
2807 force_resend
= true;
2808 } else if (t
->last_force_resend
== 0) {
2809 force_resend
= true;
2814 t
->target_oid
= t
->base_oid
;
2815 t
->target_oloc
= t
->base_oloc
;
2816 if ((t
->flags
& CEPH_OSD_FLAG_IGNORE_OVERLAY
) == 0) {
2817 if (is_read
&& pi
->has_read_tier())
2818 t
->target_oloc
.pool
= pi
->read_tier
;
2819 if (is_write
&& pi
->has_write_tier())
2820 t
->target_oloc
.pool
= pi
->write_tier
;
2821 pi
= osdmap
->get_pg_pool(t
->target_oloc
.pool
);
2824 return RECALC_OP_TARGET_POOL_DNE
;
2829 if (t
->precalc_pgid
) {
2830 ceph_assert(t
->flags
& CEPH_OSD_FLAG_IGNORE_OVERLAY
);
2831 ceph_assert(t
->base_oid
.name
.empty()); // make sure this is a pg op
2832 ceph_assert(t
->base_oloc
.pool
== (int64_t)t
->base_pgid
.pool());
2833 pgid
= t
->base_pgid
;
2835 int ret
= osdmap
->object_locator_to_pg(t
->target_oid
, t
->target_oloc
,
2837 if (ret
== -ENOENT
) {
2839 return RECALC_OP_TARGET_POOL_DNE
;
2842 ldout(cct
,20) << __func__
<< " target " << t
->target_oid
<< " "
2843 << t
->target_oloc
<< " -> pgid " << pgid
<< dendl
;
2844 ldout(cct
,30) << __func__
<< " target pi " << pi
2845 << " pg_num " << pi
->get_pg_num() << dendl
;
2846 t
->pool_ever_existed
= true;
2848 int size
= pi
->size
;
2849 int min_size
= pi
->min_size
;
2850 unsigned pg_num
= pi
->get_pg_num();
2851 unsigned pg_num_mask
= pi
->get_pg_num_mask();
2852 unsigned pg_num_pending
= pi
->get_pg_num_pending();
2853 int up_primary
, acting_primary
;
2854 vector
<int> up
, acting
;
2855 ps_t actual_ps
= ceph_stable_mod(pgid
.ps(), pg_num
, pg_num_mask
);
2856 pg_t
actual_pgid(actual_ps
, pgid
.pool());
2857 if (!lookup_pg_mapping(actual_pgid
, osdmap
->get_epoch(), &up
, &up_primary
,
2858 &acting
, &acting_primary
)) {
2859 osdmap
->pg_to_up_acting_osds(actual_pgid
, &up
, &up_primary
,
2860 &acting
, &acting_primary
);
2861 pg_mapping_t
pg_mapping(osdmap
->get_epoch(),
2862 up
, up_primary
, acting
, acting_primary
);
2863 update_pg_mapping(actual_pgid
, std::move(pg_mapping
));
2865 bool sort_bitwise
= osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
);
2866 bool recovery_deletes
= osdmap
->test_flag(CEPH_OSDMAP_RECOVERY_DELETES
);
2867 unsigned prev_seed
= ceph_stable_mod(pgid
.ps(), t
->pg_num
, t
->pg_num_mask
);
2868 pg_t
prev_pgid(prev_seed
, pgid
.pool());
2869 if (any_change
&& PastIntervals::is_new_interval(
2888 t
->recovery_deletes
,
2890 t
->peering_crush_bucket_count
,
2891 pi
->peering_crush_bucket_count
,
2892 t
->peering_crush_bucket_target
,
2893 pi
->peering_crush_bucket_target
,
2894 t
->peering_crush_bucket_barrier
,
2895 pi
->peering_crush_bucket_barrier
,
2896 t
->peering_crush_mandatory_member
,
2897 pi
->peering_crush_mandatory_member
,
2899 force_resend
= true;
2902 bool unpaused
= false;
2903 bool should_be_paused
= target_should_be_paused(t
);
2904 if (t
->paused
&& !should_be_paused
) {
2907 if (t
->paused
!= should_be_paused
) {
2908 ldout(cct
, 10) << __func__
<< " paused " << t
->paused
2909 << " -> " << should_be_paused
<< dendl
;
2910 t
->paused
= should_be_paused
;
2913 bool legacy_change
=
2916 t
->acting_primary
, t
->acting
, acting_primary
, acting
,
2917 t
->used_replica
|| any_change
);
2918 bool split_or_merge
= false;
2921 prev_pgid
.is_split(t
->pg_num
, pg_num
, nullptr) ||
2922 prev_pgid
.is_merge_source(t
->pg_num
, pg_num
, nullptr) ||
2923 prev_pgid
.is_merge_target(t
->pg_num
, pg_num
);
2926 if (legacy_change
|| split_or_merge
|| force_resend
) {
2928 t
->acting
= std::move(acting
);
2929 t
->acting_primary
= acting_primary
;
2930 t
->up_primary
= up_primary
;
2931 t
->up
= std::move(up
);
2933 t
->min_size
= min_size
;
2935 t
->pg_num_mask
= pg_num_mask
;
2936 t
->pg_num_pending
= pg_num_pending
;
2937 spg_t
spgid(actual_pgid
);
2938 if (pi
->is_erasure()) {
2939 for (uint8_t i
= 0; i
< t
->acting
.size(); ++i
) {
2940 if (t
->acting
[i
] == acting_primary
) {
2941 spgid
.reset_shard(shard_id_t(i
));
2946 t
->actual_pgid
= spgid
;
2947 t
->sort_bitwise
= sort_bitwise
;
2948 t
->recovery_deletes
= recovery_deletes
;
2949 t
->peering_crush_bucket_count
= pi
->peering_crush_bucket_count
;
2950 t
->peering_crush_bucket_target
= pi
->peering_crush_bucket_target
;
2951 t
->peering_crush_bucket_barrier
= pi
->peering_crush_bucket_barrier
;
2952 t
->peering_crush_mandatory_member
= pi
->peering_crush_mandatory_member
;
2953 ldout(cct
, 10) << __func__
<< " "
2954 << " raw pgid " << pgid
<< " -> actual " << t
->actual_pgid
2955 << " acting " << t
->acting
2956 << " primary " << acting_primary
<< dendl
;
2957 t
->used_replica
= false;
2958 if ((t
->flags
& (CEPH_OSD_FLAG_BALANCE_READS
|
2959 CEPH_OSD_FLAG_LOCALIZE_READS
)) &&
2960 !is_write
&& pi
->is_replicated() && t
->acting
.size() > 1) {
2962 ceph_assert(is_read
&& t
->acting
[0] == acting_primary
);
2963 if (t
->flags
& CEPH_OSD_FLAG_BALANCE_READS
) {
2964 int p
= rand() % t
->acting
.size();
2966 t
->used_replica
= true;
2968 ldout(cct
, 10) << " chose random osd." << osd
<< " of " << t
->acting
2971 // look for a local replica. prefer the primary if the
2972 // distance is the same.
2974 int best_locality
= 0;
2975 for (unsigned i
= 0; i
< t
->acting
.size(); ++i
) {
2976 int locality
= osdmap
->crush
->get_common_ancestor_distance(
2977 cct
, t
->acting
[i
], crush_location
);
2978 ldout(cct
, 20) << __func__
<< " localize: rank " << i
2979 << " osd." << t
->acting
[i
]
2980 << " locality " << locality
<< dendl
;
2982 (locality
>= 0 && best_locality
>= 0 &&
2983 locality
< best_locality
) ||
2984 (best_locality
< 0 && locality
>= 0)) {
2986 best_locality
= locality
;
2988 t
->used_replica
= true;
2991 ceph_assert(best
>= 0);
2992 osd
= t
->acting
[best
];
2996 t
->osd
= acting_primary
;
2999 if (legacy_change
|| unpaused
|| force_resend
) {
3000 return RECALC_OP_TARGET_NEED_RESEND
;
3002 if (split_or_merge
&&
3003 (osdmap
->require_osd_release
>= ceph_release_t::luminous
||
3004 HAVE_FEATURE(osdmap
->get_xinfo(acting_primary
).features
,
3005 RESEND_ON_SPLIT
))) {
3006 return RECALC_OP_TARGET_NEED_RESEND
;
3008 return RECALC_OP_TARGET_NO_ACTION
;
3011 int Objecter::_map_session(op_target_t
*target
, OSDSession
**s
,
3012 shunique_lock
<ceph::shared_mutex
>& sul
)
3014 _calc_target(target
, nullptr);
3015 return _get_session(target
->osd
, s
, sul
);
3018 void Objecter::_session_op_assign(OSDSession
*to
, Op
*op
)
3020 // to->lock is locked
3021 ceph_assert(op
->session
== NULL
);
3022 ceph_assert(op
->tid
);
3026 to
->ops
[op
->tid
] = op
;
3028 if (to
->is_homeless()) {
3032 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->tid
<< dendl
;
3035 void Objecter::_session_op_remove(OSDSession
*from
, Op
*op
)
3037 ceph_assert(op
->session
== from
);
3038 // from->lock is locked
3040 if (from
->is_homeless()) {
3044 from
->ops
.erase(op
->tid
);
3048 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->tid
<< dendl
;
3051 void Objecter::_session_linger_op_assign(OSDSession
*to
, LingerOp
*op
)
3053 // to lock is locked unique
3054 ceph_assert(op
->session
== NULL
);
3056 if (to
->is_homeless()) {
3062 to
->linger_ops
[op
->linger_id
] = op
;
3064 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->linger_id
3068 void Objecter::_session_linger_op_remove(OSDSession
*from
, LingerOp
*op
)
3070 ceph_assert(from
== op
->session
);
3071 // from->lock is locked unique
3073 if (from
->is_homeless()) {
3077 from
->linger_ops
.erase(op
->linger_id
);
3081 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->linger_id
3085 void Objecter::_session_command_op_remove(OSDSession
*from
, CommandOp
*op
)
3087 ceph_assert(from
== op
->session
);
3088 // from->lock is locked
3090 if (from
->is_homeless()) {
3094 from
->command_ops
.erase(op
->tid
);
3098 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->tid
<< dendl
;
3101 void Objecter::_session_command_op_assign(OSDSession
*to
, CommandOp
*op
)
3103 // to->lock is locked
3104 ceph_assert(op
->session
== NULL
);
3105 ceph_assert(op
->tid
);
3107 if (to
->is_homeless()) {
3113 to
->command_ops
[op
->tid
] = op
;
3115 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->tid
<< dendl
;
3118 int Objecter::_recalc_linger_op_target(LingerOp
*linger_op
,
3119 shunique_lock
<ceph::shared_mutex
>& sul
)
3121 // rwlock is locked unique
3123 int r
= _calc_target(&linger_op
->target
, nullptr, true);
3124 if (r
== RECALC_OP_TARGET_NEED_RESEND
) {
3125 ldout(cct
, 10) << "recalc_linger_op_target tid " << linger_op
->linger_id
3126 << " pgid " << linger_op
->target
.pgid
3127 << " acting " << linger_op
->target
.acting
<< dendl
;
3129 OSDSession
*s
= NULL
;
3130 r
= _get_session(linger_op
->target
.osd
, &s
, sul
);
3131 ceph_assert(r
== 0);
3133 if (linger_op
->session
!= s
) {
3134 // NB locking two sessions (s and linger_op->session) at the
3135 // same time here is only safe because we are the only one that
3136 // takes two, and we are holding rwlock for write. We use
3137 // std::shared_mutex in OSDSession because lockdep doesn't know
3139 unique_lock
sl(s
->lock
);
3140 _session_linger_op_remove(linger_op
->session
, linger_op
);
3141 _session_linger_op_assign(s
, linger_op
);
3145 return RECALC_OP_TARGET_NEED_RESEND
;
3150 void Objecter::_cancel_linger_op(Op
*op
)
3152 ldout(cct
, 15) << "cancel_op " << op
->tid
<< dendl
;
3154 ceph_assert(!op
->should_resend
);
3155 if (op
->has_completion()) {
3156 op
->onfinish
= nullptr;
3163 void Objecter::_finish_op(Op
*op
, int r
)
3165 ldout(cct
, 15) << __func__
<< " " << op
->tid
<< dendl
;
3167 // op->session->lock is locked unique or op->session is null
3169 if (!op
->ctx_budgeted
&& op
->budget
>= 0) {
3170 put_op_budget_bytes(op
->budget
);
3174 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
3175 timer
.cancel_event(op
->ontimeout
);
3178 _session_op_remove(op
->session
, op
);
3181 logger
->dec(l_osdc_op_active
);
3183 ceph_assert(check_latest_map_ops
.find(op
->tid
) == check_latest_map_ops
.end());
3190 Objecter::MOSDOp
*Objecter::_prepare_osd_op(Op
*op
)
3194 int flags
= op
->target
.flags
;
3195 flags
|= CEPH_OSD_FLAG_KNOWN_REDIR
;
3196 flags
|= CEPH_OSD_FLAG_SUPPORTSPOOLEIO
;
3198 // Nothing checks this any longer, but needed for compatibility with
3199 // pre-luminous osds
3200 flags
|= CEPH_OSD_FLAG_ONDISK
;
3202 if (!honor_pool_full
)
3203 flags
|= CEPH_OSD_FLAG_FULL_FORCE
;
3205 op
->target
.paused
= false;
3206 op
->stamp
= ceph::coarse_mono_clock::now();
3208 hobject_t hobj
= op
->target
.get_hobj();
3209 auto m
= new MOSDOp(client_inc
, op
->tid
,
3210 hobj
, op
->target
.actual_pgid
,
3211 osdmap
->get_epoch(),
3212 flags
, op
->features
);
3214 m
->set_snapid(op
->snapid
);
3215 m
->set_snap_seq(op
->snapc
.seq
);
3216 m
->set_snaps(op
->snapc
.snaps
);
3219 m
->set_mtime(op
->mtime
);
3220 m
->set_retry_attempt(op
->attempts
++);
3222 if (!op
->trace
.valid() && cct
->_conf
->osdc_blkin_trace_all
) {
3223 op
->trace
.init("op", &trace_endpoint
);
3227 m
->set_priority(op
->priority
);
3229 m
->set_priority(cct
->_conf
->osd_client_op_priority
);
3231 if (op
->reqid
!= osd_reqid_t()) {
3232 m
->set_reqid(op
->reqid
);
3235 logger
->inc(l_osdc_op_send
);
3237 for (unsigned i
= 0; i
< m
->ops
.size(); i
++) {
3238 sum
+= m
->ops
[i
].indata
.length();
3240 logger
->inc(l_osdc_op_send_bytes
, sum
);
3245 void Objecter::_send_op(Op
*op
)
3248 // op->session->lock is locked
3251 auto p
= op
->session
->backoffs
.find(op
->target
.actual_pgid
);
3252 if (p
!= op
->session
->backoffs
.end()) {
3253 hobject_t hoid
= op
->target
.get_hobj();
3254 auto q
= p
->second
.lower_bound(hoid
);
3255 if (q
!= p
->second
.begin()) {
3257 if (hoid
>= q
->second
.end
) {
3261 if (q
!= p
->second
.end()) {
3262 ldout(cct
, 20) << __func__
<< " ? " << q
->first
<< " [" << q
->second
.begin
3263 << "," << q
->second
.end
<< ")" << dendl
;
3264 int r
= cmp(hoid
, q
->second
.begin
);
3265 if (r
== 0 || (r
> 0 && hoid
< q
->second
.end
)) {
3266 ldout(cct
, 10) << __func__
<< " backoff " << op
->target
.actual_pgid
3267 << " id " << q
->second
.id
<< " on " << hoid
3268 << ", queuing " << op
<< " tid " << op
->tid
<< dendl
;
3274 ceph_assert(op
->tid
> 0);
3275 MOSDOp
*m
= _prepare_osd_op(op
);
3277 if (op
->target
.actual_pgid
!= m
->get_spg()) {
3278 ldout(cct
, 10) << __func__
<< " " << op
->tid
<< " pgid change from "
3279 << m
->get_spg() << " to " << op
->target
.actual_pgid
3280 << ", updating and reencoding" << dendl
;
3281 m
->set_spg(op
->target
.actual_pgid
);
3282 m
->clear_payload(); // reencode
3285 ldout(cct
, 15) << "_send_op " << op
->tid
<< " to "
3286 << op
->target
.actual_pgid
<< " on osd." << op
->session
->osd
3289 ConnectionRef con
= op
->session
->con
;
3293 // preallocated rx ceph::buffer?
3295 ldout(cct
, 20) << " revoking rx ceph::buffer for " << op
->tid
<< " on "
3296 << op
->con
<< dendl
;
3297 op
->con
->revoke_rx_buffer(op
->tid
);
3300 op
->ontimeout
== 0 && // only post rx_buffer if no timeout; see #9582
3301 op
->outbl
->length()) {
3302 op
->outbl
->invalidate_crc(); // messenger writes through c_str()
3303 ldout(cct
, 20) << " posting rx ceph::buffer for " << op
->tid
<< " on " << con
3306 op
->con
->post_rx_buffer(op
->tid
, *op
->outbl
);
3310 op
->incarnation
= op
->session
->incarnation
;
3312 if (op
->trace
.valid()) {
3313 m
->trace
.init("op msg", nullptr, &op
->trace
);
3315 op
->session
->con
->send_message(m
);
3318 int Objecter::calc_op_budget(const bc::small_vector_base
<OSDOp
>& ops
)
3321 for (auto i
= ops
.begin(); i
!= ops
.end(); ++i
) {
3322 if (i
->op
.op
& CEPH_OSD_OP_MODE_WR
) {
3323 op_budget
+= i
->indata
.length();
3324 } else if (ceph_osd_op_mode_read(i
->op
.op
)) {
3325 if (ceph_osd_op_uses_extent(i
->op
.op
)) {
3326 if ((int64_t)i
->op
.extent
.length
> 0)
3327 op_budget
+= (int64_t)i
->op
.extent
.length
;
3328 } else if (ceph_osd_op_type_attr(i
->op
.op
)) {
3329 op_budget
+= i
->op
.xattr
.name_len
+ i
->op
.xattr
.value_len
;
3336 void Objecter::_throttle_op(Op
*op
,
3337 shunique_lock
<ceph::shared_mutex
>& sul
,
3340 ceph_assert(sul
&& sul
.mutex() == &rwlock
);
3341 bool locked_for_write
= sul
.owns_lock();
3344 op_budget
= calc_op_budget(op
->ops
);
3345 if (!op_throttle_bytes
.get_or_fail(op_budget
)) { //couldn't take right now
3347 op_throttle_bytes
.get(op_budget
);
3348 if (locked_for_write
)
3353 if (!op_throttle_ops
.get_or_fail(1)) { //couldn't take right now
3355 op_throttle_ops
.get(1);
3356 if (locked_for_write
)
3363 int Objecter::take_linger_budget(LingerOp
*info
)
3368 /* This function DOES put the passed message before returning */
3369 void Objecter::handle_osd_op_reply(MOSDOpReply
*m
)
3371 ldout(cct
, 10) << "in handle_osd_op_reply" << dendl
;
3374 ceph_tid_t tid
= m
->get_tid();
3376 shunique_lock
sul(rwlock
, ceph::acquire_shared
);
3382 ConnectionRef con
= m
->get_connection();
3383 auto priv
= con
->get_priv();
3384 auto s
= static_cast<OSDSession
*>(priv
.get());
3385 if (!s
|| s
->con
!= con
) {
3386 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
3391 unique_lock
sl(s
->lock
);
3393 map
<ceph_tid_t
, Op
*>::iterator iter
= s
->ops
.find(tid
);
3394 if (iter
== s
->ops
.end()) {
3395 ldout(cct
, 7) << "handle_osd_op_reply " << tid
3396 << (m
->is_ondisk() ? " ondisk" : (m
->is_onnvram() ?
3397 " onnvram" : " ack"))
3398 << " ... stray" << dendl
;
3404 ldout(cct
, 7) << "handle_osd_op_reply " << tid
3405 << (m
->is_ondisk() ? " ondisk" :
3406 (m
->is_onnvram() ? " onnvram" : " ack"))
3407 << " uv " << m
->get_user_version()
3408 << " in " << m
->get_pg()
3409 << " attempt " << m
->get_retry_attempt()
3411 Op
*op
= iter
->second
;
3412 op
->trace
.event("osd op reply");
3414 if (retry_writes_after_first_reply
&& op
->attempts
== 1 &&
3415 (op
->target
.flags
& CEPH_OSD_FLAG_WRITE
)) {
3416 ldout(cct
, 7) << "retrying write after first reply: " << tid
<< dendl
;
3417 if (op
->has_completion()) {
3420 _session_op_remove(s
, op
);
3423 _op_submit(op
, sul
, NULL
);
3428 if (m
->get_retry_attempt() >= 0) {
3429 if (m
->get_retry_attempt() != (op
->attempts
- 1)) {
3430 ldout(cct
, 7) << " ignoring reply from attempt "
3431 << m
->get_retry_attempt()
3432 << " from " << m
->get_source_inst()
3433 << "; last attempt " << (op
->attempts
- 1) << " sent to "
3434 << op
->session
->con
->get_peer_addr() << dendl
;
3440 // we don't know the request attempt because the server is old, so
3441 // just accept this one. we may do ACK callbacks we shouldn't
3442 // have, but that is better than doing callbacks out of order.
3445 decltype(op
->onfinish
) onfinish
;
3447 int rc
= m
->get_result();
3449 if (m
->is_redirect_reply()) {
3450 ldout(cct
, 5) << " got redirect reply; redirecting" << dendl
;
3451 if (op
->has_completion())
3453 _session_op_remove(s
, op
);
3456 // FIXME: two redirects could race and reorder
3459 m
->get_redirect().combine_with_locator(op
->target
.target_oloc
,
3460 op
->target
.target_oid
.name
);
3461 op
->target
.flags
|= (CEPH_OSD_FLAG_REDIRECTED
|
3462 CEPH_OSD_FLAG_IGNORE_CACHE
|
3463 CEPH_OSD_FLAG_IGNORE_OVERLAY
);
3464 _op_submit(op
, sul
, NULL
);
3469 if (rc
== -EAGAIN
) {
3470 ldout(cct
, 7) << " got -EAGAIN, resubmitting" << dendl
;
3471 if (op
->has_completion())
3473 _session_op_remove(s
, op
);
3477 op
->target
.flags
&= ~(CEPH_OSD_FLAG_BALANCE_READS
|
3478 CEPH_OSD_FLAG_LOCALIZE_READS
);
3479 op
->target
.pgid
= pg_t();
3480 _op_submit(op
, sul
, NULL
);
3488 *op
->objver
= m
->get_user_version();
3489 if (op
->reply_epoch
)
3490 *op
->reply_epoch
= m
->get_map_epoch();
3491 if (op
->data_offset
)
3492 *op
->data_offset
= m
->get_header().data_off
;
3498 op
->con
->revoke_rx_buffer(op
->tid
);
3500 auto& bl
= m
->get_data();
3501 if (op
->outbl
->length() == bl
.length() &&
3502 bl
.get_num_buffers() <= 1) {
3503 // this is here to keep previous users to *relied* on getting data
3504 // read into existing buffers happy. Notably,
3505 // libradosstriper::RadosStriperImpl::aio_read().
3506 ldout(cct
,10) << __func__
<< " copying resulting " << bl
.length()
3507 << " into existing ceph::buffer of length " << op
->outbl
->length()
3510 t
= std::move(*op
->outbl
);
3511 t
.invalidate_crc(); // we're overwriting the raw buffers via c_str()
3512 bl
.begin().copy(bl
.length(), t
.c_str());
3513 op
->outbl
->substr_of(t
, 0, bl
.length());
3515 m
->claim_data(*op
->outbl
);
3520 // per-op result demuxing
3521 vector
<OSDOp
> out_ops
;
3522 m
->claim_ops(out_ops
);
3524 if (out_ops
.size() != op
->ops
.size())
3525 ldout(cct
, 0) << "WARNING: tid " << op
->tid
<< " reply ops " << out_ops
3526 << " != request ops " << op
->ops
3527 << " from " << m
->get_source_inst() << dendl
;
3529 ceph_assert(op
->ops
.size() == op
->out_bl
.size());
3530 ceph_assert(op
->ops
.size() == op
->out_rval
.size());
3531 ceph_assert(op
->ops
.size() == op
->out_ec
.size());
3532 ceph_assert(op
->ops
.size() == op
->out_handler
.size());
3533 auto pb
= op
->out_bl
.begin();
3534 auto pr
= op
->out_rval
.begin();
3535 auto pe
= op
->out_ec
.begin();
3536 auto ph
= op
->out_handler
.begin();
3537 ceph_assert(op
->out_bl
.size() == op
->out_rval
.size());
3538 ceph_assert(op
->out_bl
.size() == op
->out_handler
.size());
3539 auto p
= out_ops
.begin();
3540 for (unsigned i
= 0;
3541 p
!= out_ops
.end() && pb
!= op
->out_bl
.end();
3542 ++i
, ++p
, ++pb
, ++pr
, ++pe
, ++ph
) {
3543 ldout(cct
, 10) << " op " << i
<< " rval " << p
->rval
3544 << " len " << p
->outdata
.length() << dendl
;
3547 // set rval before running handlers so that handlers
3548 // can change it if e.g. decoding fails
3550 **pr
= ceph_to_hostos_errno(p
->rval
);
3552 **pe
= p
->rval
< 0 ? bs::error_code(-p
->rval
, osd_category()) :
3555 std::move((*ph
))(p
->rval
< 0 ?
3556 bs::error_code(-p
->rval
, osd_category()) :
3558 p
->rval
, p
->outdata
);
3562 // NOTE: we assume that since we only request ONDISK ever we will
3563 // only ever get back one (type of) ack ever.
3565 if (op
->has_completion()) {
3567 onfinish
= std::move(op
->onfinish
);
3568 op
->onfinish
= nullptr;
3570 logger
->inc(l_osdc_op_reply
);
3571 logger
->tinc(l_osdc_op_latency
, ceph::coarse_mono_time::clock::now() - op
->stamp
);
3572 logger
->set(l_osdc_op_inflight
, num_in_flight
);
3574 /* get it before we call _finish_op() */
3575 auto completion_lock
= s
->get_lock(op
->target
.base_oid
);
3577 ldout(cct
, 15) << "handle_osd_op_reply completed tid " << tid
<< dendl
;
3580 ldout(cct
, 5) << num_in_flight
<< " in flight" << dendl
;
3582 // serialize completions
3583 if (completion_lock
.mutex()) {
3584 completion_lock
.lock();
3589 if (Op::has_completion(onfinish
)) {
3590 Op::complete(std::move(onfinish
), osdcode(rc
), rc
);
3592 if (completion_lock
.mutex()) {
3593 completion_lock
.unlock();
3599 void Objecter::handle_osd_backoff(MOSDBackoff
*m
)
3601 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
3602 shunique_lock
sul(rwlock
, ceph::acquire_shared
);
3608 ConnectionRef con
= m
->get_connection();
3609 auto priv
= con
->get_priv();
3610 auto s
= static_cast<OSDSession
*>(priv
.get());
3611 if (!s
|| s
->con
!= con
) {
3612 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
3619 unique_lock
sl(s
->lock
);
3622 case CEPH_OSD_BACKOFF_OP_BLOCK
:
3625 OSDBackoff
& b
= s
->backoffs
[m
->pgid
][m
->begin
];
3626 s
->backoffs_by_id
.insert(make_pair(m
->id
, &b
));
3632 // ack with original backoff's epoch so that the osd can discard this if
3633 // there was a pg split.
3634 auto r
= new MOSDBackoff(m
->pgid
, m
->map_epoch
,
3635 CEPH_OSD_BACKOFF_OP_ACK_BLOCK
,
3636 m
->id
, m
->begin
, m
->end
);
3637 // this priority must match the MOSDOps from _prepare_osd_op
3638 r
->set_priority(cct
->_conf
->osd_client_op_priority
);
3639 con
->send_message(r
);
3643 case CEPH_OSD_BACKOFF_OP_UNBLOCK
:
3645 auto p
= s
->backoffs_by_id
.find(m
->id
);
3646 if (p
!= s
->backoffs_by_id
.end()) {
3647 OSDBackoff
*b
= p
->second
;
3648 if (b
->begin
!= m
->begin
&&
3650 lderr(cct
) << __func__
<< " got " << m
->pgid
<< " id " << m
->id
3652 << m
->begin
<< "," << m
->end
<< ") but backoff is ["
3653 << b
->begin
<< "," << b
->end
<< ")" << dendl
;
3654 // hrmpf, unblock it anyway.
3656 ldout(cct
, 10) << __func__
<< " unblock backoff " << b
->pgid
3658 << " [" << b
->begin
<< "," << b
->end
3660 auto spgp
= s
->backoffs
.find(b
->pgid
);
3661 ceph_assert(spgp
!= s
->backoffs
.end());
3662 spgp
->second
.erase(b
->begin
);
3663 if (spgp
->second
.empty()) {
3664 s
->backoffs
.erase(spgp
);
3666 s
->backoffs_by_id
.erase(p
);
3668 // check for any ops to resend
3669 for (auto& q
: s
->ops
) {
3670 if (q
.second
->target
.actual_pgid
== m
->pgid
) {
3671 int r
= q
.second
->target
.contained_by(m
->begin
, m
->end
);
3672 ldout(cct
, 20) << __func__
<< " contained_by " << r
<< " on "
3673 << q
.second
->target
.get_hobj() << dendl
;
3680 lderr(cct
) << __func__
<< " " << m
->pgid
<< " id " << m
->id
3682 << m
->begin
<< "," << m
->end
<< ") but backoff dne" << dendl
;
3688 ldout(cct
, 10) << __func__
<< " unrecognized op " << (int)m
->op
<< dendl
;
3698 uint32_t Objecter::list_nobjects_seek(NListContext
*list_context
,
3701 shared_lock
rl(rwlock
);
3702 list_context
->pos
= hobject_t(object_t(), string(), CEPH_NOSNAP
,
3703 pos
, list_context
->pool_id
, string());
3704 ldout(cct
, 10) << __func__
<< " " << list_context
3705 << " pos " << pos
<< " -> " << list_context
->pos
<< dendl
;
3706 pg_t actual
= osdmap
->raw_pg_to_pg(pg_t(pos
, list_context
->pool_id
));
3707 list_context
->current_pg
= actual
.ps();
3708 list_context
->at_end_of_pool
= false;
3712 uint32_t Objecter::list_nobjects_seek(NListContext
*list_context
,
3713 const hobject_t
& cursor
)
3715 shared_lock
rl(rwlock
);
3716 ldout(cct
, 10) << "list_nobjects_seek " << list_context
<< dendl
;
3717 list_context
->pos
= cursor
;
3718 list_context
->at_end_of_pool
= false;
3719 pg_t actual
= osdmap
->raw_pg_to_pg(pg_t(cursor
.get_hash(), list_context
->pool_id
));
3720 list_context
->current_pg
= actual
.ps();
3721 list_context
->sort_bitwise
= true;
3722 return list_context
->current_pg
;
3725 void Objecter::list_nobjects_get_cursor(NListContext
*list_context
,
3728 shared_lock
rl(rwlock
);
3729 if (list_context
->list
.empty()) {
3730 *cursor
= list_context
->pos
;
3732 const librados::ListObjectImpl
& entry
= list_context
->list
.front();
3733 const string
*key
= (entry
.locator
.empty() ? &entry
.oid
: &entry
.locator
);
3734 uint32_t h
= osdmap
->get_pg_pool(list_context
->pool_id
)->hash_key(*key
, entry
.nspace
);
3735 *cursor
= hobject_t(entry
.oid
, entry
.locator
, list_context
->pool_snap_seq
, h
, list_context
->pool_id
, entry
.nspace
);
3739 void Objecter::list_nobjects(NListContext
*list_context
, Context
*onfinish
)
3741 ldout(cct
, 10) << __func__
<< " pool_id " << list_context
->pool_id
3742 << " pool_snap_seq " << list_context
->pool_snap_seq
3743 << " max_entries " << list_context
->max_entries
3744 << " list_context " << list_context
3745 << " onfinish " << onfinish
3746 << " current_pg " << list_context
->current_pg
3747 << " pos " << list_context
->pos
<< dendl
;
3749 shared_lock
rl(rwlock
);
3750 const pg_pool_t
*pool
= osdmap
->get_pg_pool(list_context
->pool_id
);
3751 if (!pool
) { // pool is gone
3753 put_nlist_context_budget(list_context
);
3754 onfinish
->complete(-ENOENT
);
3757 int pg_num
= pool
->get_pg_num();
3758 bool sort_bitwise
= osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
);
3760 if (list_context
->pos
.is_min()) {
3761 list_context
->starting_pg_num
= 0;
3762 list_context
->sort_bitwise
= sort_bitwise
;
3763 list_context
->starting_pg_num
= pg_num
;
3765 if (list_context
->sort_bitwise
!= sort_bitwise
) {
3766 list_context
->pos
= hobject_t(
3767 object_t(), string(), CEPH_NOSNAP
,
3768 list_context
->current_pg
, list_context
->pool_id
, string());
3769 list_context
->sort_bitwise
= sort_bitwise
;
3770 ldout(cct
, 10) << " hobject sort order changed, restarting this pg at "
3771 << list_context
->pos
<< dendl
;
3773 if (list_context
->starting_pg_num
!= pg_num
) {
3774 if (!sort_bitwise
) {
3775 // start reading from the beginning; the pgs have changed
3776 ldout(cct
, 10) << " pg_num changed; restarting with " << pg_num
<< dendl
;
3777 list_context
->pos
= collection_list_handle_t();
3779 list_context
->starting_pg_num
= pg_num
;
3782 if (list_context
->pos
.is_max()) {
3783 ldout(cct
, 20) << __func__
<< " end of pool, list "
3784 << list_context
->list
<< dendl
;
3785 if (list_context
->list
.empty()) {
3786 list_context
->at_end_of_pool
= true;
3788 // release the listing context's budget once all
3789 // OPs (in the session) are finished
3790 put_nlist_context_budget(list_context
);
3791 onfinish
->complete(0);
3796 op
.pg_nls(list_context
->max_entries
, list_context
->filter
,
3797 list_context
->pos
, osdmap
->get_epoch());
3798 list_context
->bl
.clear();
3799 auto onack
= new C_NList(list_context
, onfinish
, this);
3800 object_locator_t
oloc(list_context
->pool_id
, list_context
->nspace
);
3802 // note current_pg in case we don't have (or lose) SORTBITWISE
3803 list_context
->current_pg
= pool
->raw_hash_to_pg(list_context
->pos
.get_hash());
3806 pg_read(list_context
->current_pg
, oloc
, op
,
3807 &list_context
->bl
, 0, onack
, &onack
->epoch
,
3808 &list_context
->ctx_budget
);
3811 void Objecter::_nlist_reply(NListContext
*list_context
, int r
,
3812 Context
*final_finish
, epoch_t reply_epoch
)
3814 ldout(cct
, 10) << __func__
<< " " << list_context
<< dendl
;
3816 auto iter
= list_context
->bl
.cbegin();
3817 pg_nls_response_t response
;
3818 decode(response
, iter
);
3820 // we do this as legacy.
3821 cb::list legacy_extra_info
;
3822 decode(legacy_extra_info
, iter
);
3825 // if the osd returns 1 (newer code), or handle MAX, it means we
3826 // hit the end of the pg.
3827 if ((response
.handle
.is_max() || r
== 1) &&
3828 !list_context
->sort_bitwise
) {
3829 // legacy OSD and !sortbitwise, figure out the next PG on our own
3830 ++list_context
->current_pg
;
3831 if (list_context
->current_pg
== list_context
->starting_pg_num
) {
3833 list_context
->pos
= hobject_t::get_max();
3836 list_context
->pos
= hobject_t(object_t(), string(), CEPH_NOSNAP
,
3837 list_context
->current_pg
,
3838 list_context
->pool_id
, string());
3841 list_context
->pos
= response
.handle
;
3844 int response_size
= response
.entries
.size();
3845 ldout(cct
, 20) << " response.entries.size " << response_size
3846 << ", response.entries " << response
.entries
3847 << ", handle " << response
.handle
3848 << ", tentative new pos " << list_context
->pos
<< dendl
;
3849 if (response_size
) {
3850 std::move(response
.entries
.begin(), response
.entries
.end(),
3851 std::back_inserter(list_context
->list
));
3852 response
.entries
.clear();
3855 if (list_context
->list
.size() >= list_context
->max_entries
) {
3856 ldout(cct
, 20) << " hit max, returning results so far, "
3857 << list_context
->list
<< dendl
;
3858 // release the listing context's budget once all
3859 // OPs (in the session) are finished
3860 put_nlist_context_budget(list_context
);
3861 final_finish
->complete(0);
3866 list_nobjects(list_context
, final_finish
);
3869 void Objecter::put_nlist_context_budget(NListContext
*list_context
)
3871 if (list_context
->ctx_budget
>= 0) {
3872 ldout(cct
, 10) << " release listing context's budget " <<
3873 list_context
->ctx_budget
<< dendl
;
3874 put_op_budget_bytes(list_context
->ctx_budget
);
3875 list_context
->ctx_budget
= -1;
3881 void Objecter::create_pool_snap(int64_t pool
, std::string_view snap_name
,
3882 decltype(PoolOp::onfinish
)&& onfinish
)
3884 unique_lock
wl(rwlock
);
3885 ldout(cct
, 10) << "create_pool_snap; pool: " << pool
<< "; snap: "
3886 << snap_name
<< dendl
;
3888 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
3890 onfinish
->defer(std::move(onfinish
), osdc_errc::pool_dne
, cb::list
{});
3893 if (p
->snap_exists(snap_name
)) {
3894 onfinish
->defer(std::move(onfinish
), osdc_errc::snapshot_exists
,
3899 auto op
= new PoolOp
;
3900 op
->tid
= ++last_tid
;
3902 op
->name
= snap_name
;
3903 op
->onfinish
= std::move(onfinish
);
3904 op
->pool_op
= POOL_OP_CREATE_SNAP
;
3905 pool_ops
[op
->tid
] = op
;
3910 struct CB_SelfmanagedSnap
{
3911 std::unique_ptr
<ca::Completion
<void(bs::error_code
, snapid_t
)>> fin
;
3912 CB_SelfmanagedSnap(decltype(fin
)&& fin
)
3913 : fin(std::move(fin
)) {}
3914 void operator()(bs::error_code ec
, const cb::list
& bl
) {
3915 snapid_t snapid
= 0;
3918 auto p
= bl
.cbegin();
3920 } catch (const cb::error
& e
) {
3924 fin
->defer(std::move(fin
), ec
, snapid
);
3928 void Objecter::allocate_selfmanaged_snap(
3930 std::unique_ptr
<ca::Completion
<void(bs::error_code
, snapid_t
)>> onfinish
)
3932 unique_lock
wl(rwlock
);
3933 ldout(cct
, 10) << "allocate_selfmanaged_snap; pool: " << pool
<< dendl
;
3934 auto op
= new PoolOp
;
3935 op
->tid
= ++last_tid
;
3937 op
->onfinish
= PoolOp::OpComp::create(
3938 service
.get_executor(),
3939 CB_SelfmanagedSnap(std::move(onfinish
)));
3940 op
->pool_op
= POOL_OP_CREATE_UNMANAGED_SNAP
;
3941 pool_ops
[op
->tid
] = op
;
3946 void Objecter::delete_pool_snap(
3947 int64_t pool
, std::string_view snap_name
,
3948 decltype(PoolOp::onfinish
)&& onfinish
)
3950 unique_lock
wl(rwlock
);
3951 ldout(cct
, 10) << "delete_pool_snap; pool: " << pool
<< "; snap: "
3952 << snap_name
<< dendl
;
3954 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
3956 onfinish
->defer(std::move(onfinish
), osdc_errc::pool_dne
, cb::list
{});
3960 if (!p
->snap_exists(snap_name
)) {
3961 onfinish
->defer(std::move(onfinish
), osdc_errc::snapshot_dne
, cb::list
{});
3965 auto op
= new PoolOp
;
3966 op
->tid
= ++last_tid
;
3968 op
->name
= snap_name
;
3969 op
->onfinish
= std::move(onfinish
);
3970 op
->pool_op
= POOL_OP_DELETE_SNAP
;
3971 pool_ops
[op
->tid
] = op
;
3976 void Objecter::delete_selfmanaged_snap(int64_t pool
, snapid_t snap
,
3977 decltype(PoolOp::onfinish
)&& onfinish
)
3979 unique_lock
wl(rwlock
);
3980 ldout(cct
, 10) << "delete_selfmanaged_snap; pool: " << pool
<< "; snap: "
3982 auto op
= new PoolOp
;
3983 op
->tid
= ++last_tid
;
3985 op
->onfinish
= std::move(onfinish
);
3986 op
->pool_op
= POOL_OP_DELETE_UNMANAGED_SNAP
;
3988 pool_ops
[op
->tid
] = op
;
3993 void Objecter::create_pool(std::string_view name
,
3994 decltype(PoolOp::onfinish
)&& onfinish
,
3997 unique_lock
wl(rwlock
);
3998 ldout(cct
, 10) << "create_pool name=" << name
<< dendl
;
4000 if (osdmap
->lookup_pg_pool_name(name
) >= 0) {
4001 onfinish
->defer(std::move(onfinish
), osdc_errc::pool_exists
, cb::list
{});
4005 auto op
= new PoolOp
;
4006 op
->tid
= ++last_tid
;
4009 op
->onfinish
= std::move(onfinish
);
4010 op
->pool_op
= POOL_OP_CREATE
;
4011 pool_ops
[op
->tid
] = op
;
4012 op
->crush_rule
= crush_rule
;
4017 void Objecter::delete_pool(int64_t pool
,
4018 decltype(PoolOp::onfinish
)&& onfinish
)
4020 unique_lock
wl(rwlock
);
4021 ldout(cct
, 10) << "delete_pool " << pool
<< dendl
;
4023 if (!osdmap
->have_pg_pool(pool
))
4024 onfinish
->defer(std::move(onfinish
), osdc_errc::pool_dne
, cb::list
{});
4026 _do_delete_pool(pool
, std::move(onfinish
));
4029 void Objecter::delete_pool(std::string_view pool_name
,
4030 decltype(PoolOp::onfinish
)&& onfinish
)
4032 unique_lock
wl(rwlock
);
4033 ldout(cct
, 10) << "delete_pool " << pool_name
<< dendl
;
4035 int64_t pool
= osdmap
->lookup_pg_pool_name(pool_name
);
4037 // This only returns one error: -ENOENT.
4038 onfinish
->defer(std::move(onfinish
), osdc_errc::pool_dne
, cb::list
{});
4040 _do_delete_pool(pool
, std::move(onfinish
));
4043 void Objecter::_do_delete_pool(int64_t pool
,
4044 decltype(PoolOp::onfinish
)&& onfinish
)
4047 auto op
= new PoolOp
;
4048 op
->tid
= ++last_tid
;
4050 op
->name
= "delete";
4051 op
->onfinish
= std::move(onfinish
);
4052 op
->pool_op
= POOL_OP_DELETE
;
4053 pool_ops
[op
->tid
] = op
;
4057 void Objecter::pool_op_submit(PoolOp
*op
)
4060 if (mon_timeout
> timespan(0)) {
4061 op
->ontimeout
= timer
.add_event(mon_timeout
,
4063 pool_op_cancel(op
->tid
, -ETIMEDOUT
); });
4065 _pool_op_submit(op
);
4068 void Objecter::_pool_op_submit(PoolOp
*op
)
4070 // rwlock is locked unique
4072 ldout(cct
, 10) << "pool_op_submit " << op
->tid
<< dendl
;
4073 auto m
= new MPoolOp(monc
->get_fsid(), op
->tid
, op
->pool
,
4074 op
->name
, op
->pool_op
,
4075 last_seen_osdmap_version
);
4076 if (op
->snapid
) m
->snapid
= op
->snapid
;
4077 if (op
->crush_rule
) m
->crush_rule
= op
->crush_rule
;
4078 monc
->send_mon_message(m
);
4079 op
->last_submit
= ceph::coarse_mono_clock::now();
4081 logger
->inc(l_osdc_poolop_send
);
4085 * Handle a reply to a PoolOp message. Check that we sent the message
4086 * and give the caller responsibility for the returned cb::list.
4087 * Then either call the finisher or stash the PoolOp, depending on if we
4088 * have a new enough map.
4089 * Lastly, clean up the message and PoolOp.
4091 void Objecter::handle_pool_op_reply(MPoolOpReply
*m
)
4093 int rc
= m
->replyCode
;
4094 auto ec
= rc
< 0 ? bs::error_code(-rc
, mon_category()) : bs::error_code();
4096 shunique_lock
sul(rwlock
, acquire_shared
);
4103 ldout(cct
, 10) << "handle_pool_op_reply " << *m
<< dendl
;
4104 ceph_tid_t tid
= m
->get_tid();
4105 auto iter
= pool_ops
.find(tid
);
4106 if (iter
!= pool_ops
.end()) {
4107 PoolOp
*op
= iter
->second
;
4108 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< " Op: "
4109 << ceph_pool_op_name(op
->pool_op
) << dendl
;
4110 cb::list bl
{std::move(m
->response_data
)};
4111 if (m
->version
> last_seen_osdmap_version
)
4112 last_seen_osdmap_version
= m
->version
;
4113 if (osdmap
->get_epoch() < m
->epoch
) {
4116 // recheck op existence since we have let go of rwlock
4117 // (for promotion) above.
4118 iter
= pool_ops
.find(tid
);
4119 if (iter
== pool_ops
.end())
4120 goto done
; // op is gone.
4121 if (osdmap
->get_epoch() < m
->epoch
) {
4122 ldout(cct
, 20) << "waiting for client to reach epoch " << m
->epoch
4123 << " before calling back" << dendl
;
4124 _wait_for_new_map(OpCompletion::create(
4125 service
.get_executor(),
4126 [o
= std::move(op
->onfinish
),
4127 bl
= std::move(bl
)](
4128 bs::error_code ec
) mutable {
4129 o
->defer(std::move(o
), ec
, bl
);
4134 // map epoch changed, probably because a MOSDMap message
4135 // sneaked in. Do caller-specified callback now or else
4136 // we lose it forever.
4137 ceph_assert(op
->onfinish
);
4138 op
->onfinish
->defer(std::move(op
->onfinish
), ec
, std::move(bl
));
4141 ceph_assert(op
->onfinish
);
4142 op
->onfinish
->defer(std::move(op
->onfinish
), ec
, std::move(bl
));
4144 op
->onfinish
= nullptr;
4145 if (!sul
.owns_lock()) {
4149 iter
= pool_ops
.find(tid
);
4150 if (iter
!= pool_ops
.end()) {
4151 _finish_pool_op(op
, 0);
4154 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4158 // Not strictly necessary, since we'll release it on return.
4161 ldout(cct
, 10) << "done" << dendl
;
4165 int Objecter::pool_op_cancel(ceph_tid_t tid
, int r
)
4167 ceph_assert(initialized
);
4169 unique_lock
wl(rwlock
);
4171 auto it
= pool_ops
.find(tid
);
4172 if (it
== pool_ops
.end()) {
4173 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4177 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4179 PoolOp
*op
= it
->second
;
4181 op
->onfinish
->defer(std::move(op
->onfinish
), osdcode(r
), cb::list
{});
4183 _finish_pool_op(op
, r
);
4187 void Objecter::_finish_pool_op(PoolOp
*op
, int r
)
4189 // rwlock is locked unique
4190 pool_ops
.erase(op
->tid
);
4191 logger
->set(l_osdc_poolop_active
, pool_ops
.size());
4193 if (op
->ontimeout
&& r
!= -ETIMEDOUT
) {
4194 timer
.cancel_event(op
->ontimeout
);
4202 void Objecter::get_pool_stats(
4203 const std::vector
<std::string
>& pools
,
4204 decltype(PoolStatOp::onfinish
)&& onfinish
)
4206 ldout(cct
, 10) << "get_pool_stats " << pools
<< dendl
;
4208 auto op
= new PoolStatOp
;
4209 op
->tid
= ++last_tid
;
4211 op
->onfinish
= std::move(onfinish
);
4212 if (mon_timeout
> timespan(0)) {
4213 op
->ontimeout
= timer
.add_event(mon_timeout
,
4215 pool_stat_op_cancel(op
->tid
,
4221 unique_lock
wl(rwlock
);
4223 poolstat_ops
[op
->tid
] = op
;
4225 logger
->set(l_osdc_poolstat_active
, poolstat_ops
.size());
4227 _poolstat_submit(op
);
4230 void Objecter::_poolstat_submit(PoolStatOp
*op
)
4232 ldout(cct
, 10) << "_poolstat_submit " << op
->tid
<< dendl
;
4233 monc
->send_mon_message(new MGetPoolStats(monc
->get_fsid(), op
->tid
,
4235 last_seen_pgmap_version
));
4236 op
->last_submit
= ceph::coarse_mono_clock::now();
4238 logger
->inc(l_osdc_poolstat_send
);
4241 void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply
*m
)
4243 ldout(cct
, 10) << "handle_get_pool_stats_reply " << *m
<< dendl
;
4244 ceph_tid_t tid
= m
->get_tid();
4246 unique_lock
wl(rwlock
);
4252 auto iter
= poolstat_ops
.find(tid
);
4253 if (iter
!= poolstat_ops
.end()) {
4254 PoolStatOp
*op
= poolstat_ops
[tid
];
4255 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< dendl
;
4256 if (m
->version
> last_seen_pgmap_version
) {
4257 last_seen_pgmap_version
= m
->version
;
4259 op
->onfinish
->defer(std::move(op
->onfinish
), bs::error_code
{},
4260 std::move(m
->pool_stats
), m
->per_pool
);
4261 _finish_pool_stat_op(op
, 0);
4263 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4265 ldout(cct
, 10) << "done" << dendl
;
4269 int Objecter::pool_stat_op_cancel(ceph_tid_t tid
, int r
)
4271 ceph_assert(initialized
);
4273 unique_lock
wl(rwlock
);
4275 auto it
= poolstat_ops
.find(tid
);
4276 if (it
== poolstat_ops
.end()) {
4277 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4281 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4283 auto op
= it
->second
;
4285 op
->onfinish
->defer(std::move(op
->onfinish
), osdcode(r
),
4286 bc::flat_map
<std::string
, pool_stat_t
>{}, false);
4287 _finish_pool_stat_op(op
, r
);
4291 void Objecter::_finish_pool_stat_op(PoolStatOp
*op
, int r
)
4293 // rwlock is locked unique
4295 poolstat_ops
.erase(op
->tid
);
4296 logger
->set(l_osdc_poolstat_active
, poolstat_ops
.size());
4298 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
4299 timer
.cancel_event(op
->ontimeout
);
4304 void Objecter::get_fs_stats(std::optional
<int64_t> poolid
,
4305 decltype(StatfsOp::onfinish
)&& onfinish
)
4307 ldout(cct
, 10) << "get_fs_stats" << dendl
;
4308 unique_lock
l(rwlock
);
4310 auto op
= new StatfsOp
;
4311 op
->tid
= ++last_tid
;
4312 op
->data_pool
= poolid
;
4313 op
->onfinish
= std::move(onfinish
);
4314 if (mon_timeout
> timespan(0)) {
4315 op
->ontimeout
= timer
.add_event(mon_timeout
,
4317 statfs_op_cancel(op
->tid
,
4322 statfs_ops
[op
->tid
] = op
;
4324 logger
->set(l_osdc_statfs_active
, statfs_ops
.size());
4326 _fs_stats_submit(op
);
4329 void Objecter::_fs_stats_submit(StatfsOp
*op
)
4331 // rwlock is locked unique
4333 ldout(cct
, 10) << "fs_stats_submit" << op
->tid
<< dendl
;
4334 monc
->send_mon_message(new MStatfs(monc
->get_fsid(), op
->tid
,
4336 last_seen_pgmap_version
));
4337 op
->last_submit
= ceph::coarse_mono_clock::now();
4339 logger
->inc(l_osdc_statfs_send
);
4342 void Objecter::handle_fs_stats_reply(MStatfsReply
*m
)
4344 unique_lock
wl(rwlock
);
4350 ldout(cct
, 10) << "handle_fs_stats_reply " << *m
<< dendl
;
4351 ceph_tid_t tid
= m
->get_tid();
4353 if (statfs_ops
.count(tid
)) {
4354 StatfsOp
*op
= statfs_ops
[tid
];
4355 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< dendl
;
4356 if (m
->h
.version
> last_seen_pgmap_version
)
4357 last_seen_pgmap_version
= m
->h
.version
;
4358 op
->onfinish
->defer(std::move(op
->onfinish
), bs::error_code
{}, m
->h
.st
);
4359 _finish_statfs_op(op
, 0);
4361 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4364 ldout(cct
, 10) << "done" << dendl
;
4367 int Objecter::statfs_op_cancel(ceph_tid_t tid
, int r
)
4369 ceph_assert(initialized
);
4371 unique_lock
wl(rwlock
);
4373 auto it
= statfs_ops
.find(tid
);
4374 if (it
== statfs_ops
.end()) {
4375 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4379 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4381 auto op
= it
->second
;
4383 op
->onfinish
->defer(std::move(op
->onfinish
), osdcode(r
), ceph_statfs
{});
4384 _finish_statfs_op(op
, r
);
4388 void Objecter::_finish_statfs_op(StatfsOp
*op
, int r
)
4390 // rwlock is locked unique
4392 statfs_ops
.erase(op
->tid
);
4393 logger
->set(l_osdc_statfs_active
, statfs_ops
.size());
4395 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
4396 timer
.cancel_event(op
->ontimeout
);
4403 void Objecter::_sg_read_finish(vector
<ObjectExtent
>& extents
,
4404 vector
<cb::list
>& resultbl
,
4405 cb::list
*bl
, Context
*onfinish
)
4408 ldout(cct
, 15) << "_sg_read_finish" << dendl
;
4410 if (extents
.size() > 1) {
4411 Striper::StripedReadResult r
;
4412 auto bit
= resultbl
.begin();
4413 for (auto eit
= extents
.begin();
4414 eit
!= extents
.end();
4416 r
.add_partial_result(cct
, *bit
, eit
->buffer_extents
);
4419 r
.assemble_result(cct
, *bl
, false);
4421 ldout(cct
, 15) << " only one frag" << dendl
;
4422 *bl
= std::move(resultbl
[0]);
4426 uint64_t bytes_read
= bl
->length();
4427 ldout(cct
, 7) << "_sg_read_finish " << bytes_read
<< " bytes" << dendl
;
4430 onfinish
->complete(bytes_read
);// > 0 ? bytes_read:m->get_result());
4435 void Objecter::ms_handle_connect(Connection
*con
)
4437 ldout(cct
, 10) << "ms_handle_connect " << con
<< dendl
;
4441 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
4445 bool Objecter::ms_handle_reset(Connection
*con
)
4449 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
4450 unique_lock
wl(rwlock
);
4452 auto priv
= con
->get_priv();
4453 auto session
= static_cast<OSDSession
*>(priv
.get());
4455 ldout(cct
, 1) << "ms_handle_reset " << con
<< " session " << session
4456 << " osd." << session
->osd
<< dendl
;
4457 // the session maybe had been closed if new osdmap just handled
4458 // says the osd down
4459 if (!(initialized
&& osdmap
->is_up(session
->osd
))) {
4460 ldout(cct
, 1) << "ms_handle_reset aborted,initialized=" << initialized
<< dendl
;
4464 map
<uint64_t, LingerOp
*> lresend
;
4465 unique_lock
sl(session
->lock
);
4466 _reopen_session(session
);
4467 _kick_requests(session
, lresend
);
4469 _linger_ops_resend(lresend
, wl
);
4471 maybe_request_map();
4478 void Objecter::ms_handle_remote_reset(Connection
*con
)
4481 * treat these the same.
4483 ms_handle_reset(con
);
4486 bool Objecter::ms_handle_refused(Connection
*con
)
4489 if (osdmap
&& (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
)) {
4490 int osd
= osdmap
->identify_osd(con
->get_peer_addr());
4492 ldout(cct
, 1) << "ms_handle_refused on osd." << osd
<< dendl
;
4498 void Objecter::op_target_t::dump(Formatter
*f
) const
4500 f
->dump_stream("pg") << pgid
;
4501 f
->dump_int("osd", osd
);
4502 f
->dump_stream("object_id") << base_oid
;
4503 f
->dump_stream("object_locator") << base_oloc
;
4504 f
->dump_stream("target_object_id") << target_oid
;
4505 f
->dump_stream("target_object_locator") << target_oloc
;
4506 f
->dump_int("paused", (int)paused
);
4507 f
->dump_int("used_replica", (int)used_replica
);
4508 f
->dump_int("precalc_pgid", (int)precalc_pgid
);
4511 void Objecter::_dump_active(OSDSession
*s
)
4513 for (auto p
= s
->ops
.begin(); p
!= s
->ops
.end(); ++p
) {
4515 ldout(cct
, 20) << op
->tid
<< "\t" << op
->target
.pgid
4516 << "\tosd." << (op
->session
? op
->session
->osd
: -1)
4517 << "\t" << op
->target
.base_oid
4518 << "\t" << op
->ops
<< dendl
;
4522 void Objecter::_dump_active()
4524 ldout(cct
, 20) << "dump_active .. " << num_homeless_ops
<< " homeless"
4526 for (auto siter
= osd_sessions
.begin();
4527 siter
!= osd_sessions
.end(); ++siter
) {
4528 auto s
= siter
->second
;
4529 shared_lock
sl(s
->lock
);
4533 _dump_active(homeless_session
);
4536 void Objecter::dump_active()
4538 shared_lock
rl(rwlock
);
4543 void Objecter::dump_requests(Formatter
*fmt
)
4545 // Read-lock on Objecter held here
4546 fmt
->open_object_section("requests");
4548 dump_linger_ops(fmt
);
4550 dump_pool_stat_ops(fmt
);
4551 dump_statfs_ops(fmt
);
4552 dump_command_ops(fmt
);
4553 fmt
->close_section(); // requests object
4556 void Objecter::_dump_ops(const OSDSession
*s
, Formatter
*fmt
)
4558 for (auto p
= s
->ops
.begin(); p
!= s
->ops
.end(); ++p
) {
4560 auto age
= std::chrono::duration
<double>(ceph::coarse_mono_clock::now() - op
->stamp
);
4561 fmt
->open_object_section("op");
4562 fmt
->dump_unsigned("tid", op
->tid
);
4563 op
->target
.dump(fmt
);
4564 fmt
->dump_stream("last_sent") << op
->stamp
;
4565 fmt
->dump_float("age", age
.count());
4566 fmt
->dump_int("attempts", op
->attempts
);
4567 fmt
->dump_stream("snapid") << op
->snapid
;
4568 fmt
->dump_stream("snap_context") << op
->snapc
;
4569 fmt
->dump_stream("mtime") << op
->mtime
;
4571 fmt
->open_array_section("osd_ops");
4572 for (auto it
= op
->ops
.begin(); it
!= op
->ops
.end(); ++it
) {
4573 fmt
->dump_stream("osd_op") << *it
;
4575 fmt
->close_section(); // osd_ops array
4577 fmt
->close_section(); // op object
4581 void Objecter::dump_ops(Formatter
*fmt
)
4583 // Read-lock on Objecter held
4584 fmt
->open_array_section("ops");
4585 for (auto siter
= osd_sessions
.begin();
4586 siter
!= osd_sessions
.end(); ++siter
) {
4587 OSDSession
*s
= siter
->second
;
4588 shared_lock
sl(s
->lock
);
4592 _dump_ops(homeless_session
, fmt
);
4593 fmt
->close_section(); // ops array
4596 void Objecter::_dump_linger_ops(const OSDSession
*s
, Formatter
*fmt
)
4598 for (auto p
= s
->linger_ops
.begin(); p
!= s
->linger_ops
.end(); ++p
) {
4599 auto op
= p
->second
;
4600 fmt
->open_object_section("linger_op");
4601 fmt
->dump_unsigned("linger_id", op
->linger_id
);
4602 op
->target
.dump(fmt
);
4603 fmt
->dump_stream("snapid") << op
->snap
;
4604 fmt
->dump_stream("registered") << op
->registered
;
4605 fmt
->close_section(); // linger_op object
4609 void Objecter::dump_linger_ops(Formatter
*fmt
)
4611 // We have a read-lock on the objecter
4612 fmt
->open_array_section("linger_ops");
4613 for (auto siter
= osd_sessions
.begin();
4614 siter
!= osd_sessions
.end(); ++siter
) {
4615 auto s
= siter
->second
;
4616 shared_lock
sl(s
->lock
);
4617 _dump_linger_ops(s
, fmt
);
4620 _dump_linger_ops(homeless_session
, fmt
);
4621 fmt
->close_section(); // linger_ops array
4624 void Objecter::_dump_command_ops(const OSDSession
*s
, Formatter
*fmt
)
4626 for (auto p
= s
->command_ops
.begin(); p
!= s
->command_ops
.end(); ++p
) {
4627 auto op
= p
->second
;
4628 fmt
->open_object_section("command_op");
4629 fmt
->dump_unsigned("command_id", op
->tid
);
4630 fmt
->dump_int("osd", op
->session
? op
->session
->osd
: -1);
4631 fmt
->open_array_section("command");
4632 for (auto q
= op
->cmd
.begin(); q
!= op
->cmd
.end(); ++q
)
4633 fmt
->dump_string("word", *q
);
4634 fmt
->close_section();
4635 if (op
->target_osd
>= 0)
4636 fmt
->dump_int("target_osd", op
->target_osd
);
4638 fmt
->dump_stream("target_pg") << op
->target_pg
;
4639 fmt
->close_section(); // command_op object
4643 void Objecter::dump_command_ops(Formatter
*fmt
)
4645 // We have a read-lock on the Objecter here
4646 fmt
->open_array_section("command_ops");
4647 for (auto siter
= osd_sessions
.begin();
4648 siter
!= osd_sessions
.end(); ++siter
) {
4649 auto s
= siter
->second
;
4650 shared_lock
sl(s
->lock
);
4651 _dump_command_ops(s
, fmt
);
4654 _dump_command_ops(homeless_session
, fmt
);
4655 fmt
->close_section(); // command_ops array
4658 void Objecter::dump_pool_ops(Formatter
*fmt
) const
4660 fmt
->open_array_section("pool_ops");
4661 for (auto p
= pool_ops
.begin(); p
!= pool_ops
.end(); ++p
) {
4662 auto op
= p
->second
;
4663 fmt
->open_object_section("pool_op");
4664 fmt
->dump_unsigned("tid", op
->tid
);
4665 fmt
->dump_int("pool", op
->pool
);
4666 fmt
->dump_string("name", op
->name
);
4667 fmt
->dump_int("operation_type", op
->pool_op
);
4668 fmt
->dump_unsigned("crush_rule", op
->crush_rule
);
4669 fmt
->dump_stream("snapid") << op
->snapid
;
4670 fmt
->dump_stream("last_sent") << op
->last_submit
;
4671 fmt
->close_section(); // pool_op object
4673 fmt
->close_section(); // pool_ops array
4676 void Objecter::dump_pool_stat_ops(Formatter
*fmt
) const
4678 fmt
->open_array_section("pool_stat_ops");
4679 for (auto p
= poolstat_ops
.begin();
4680 p
!= poolstat_ops
.end();
4682 PoolStatOp
*op
= p
->second
;
4683 fmt
->open_object_section("pool_stat_op");
4684 fmt
->dump_unsigned("tid", op
->tid
);
4685 fmt
->dump_stream("last_sent") << op
->last_submit
;
4687 fmt
->open_array_section("pools");
4688 for (const auto& it
: op
->pools
) {
4689 fmt
->dump_string("pool", it
);
4691 fmt
->close_section(); // pools array
4693 fmt
->close_section(); // pool_stat_op object
4695 fmt
->close_section(); // pool_stat_ops array
4698 void Objecter::dump_statfs_ops(Formatter
*fmt
) const
4700 fmt
->open_array_section("statfs_ops");
4701 for (auto p
= statfs_ops
.begin(); p
!= statfs_ops
.end(); ++p
) {
4702 auto op
= p
->second
;
4703 fmt
->open_object_section("statfs_op");
4704 fmt
->dump_unsigned("tid", op
->tid
);
4705 fmt
->dump_stream("last_sent") << op
->last_submit
;
4706 fmt
->close_section(); // statfs_op object
4708 fmt
->close_section(); // statfs_ops array
4711 Objecter::RequestStateHook::RequestStateHook(Objecter
*objecter
) :
4712 m_objecter(objecter
)
4716 int Objecter::RequestStateHook::call(std::string_view command
,
4717 const cmdmap_t
& cmdmap
,
4723 shared_lock
rl(m_objecter
->rwlock
);
4724 m_objecter
->dump_requests(f
);
4728 void Objecter::blocklist_self(bool set
)
4730 ldout(cct
, 10) << "blocklist_self " << (set
? "add" : "rm") << dendl
;
4733 cmd
.push_back("{\"prefix\":\"osd blocklist\", ");
4735 cmd
.push_back("\"blocklistop\":\"add\",");
4737 cmd
.push_back("\"blocklistop\":\"rm\",");
4739 // this is somewhat imprecise in that we are blocklisting our first addr only
4740 ss
<< messenger
->get_myaddrs().front().get_legacy_str();
4741 cmd
.push_back("\"addr\":\"" + ss
.str() + "\"");
4743 auto m
= new MMonCommand(monc
->get_fsid());
4746 // NOTE: no fallback to legacy blacklist command implemented here
4747 // since this is only used for test code.
4749 monc
->send_mon_message(m
);
4754 void Objecter::handle_command_reply(MCommandReply
*m
)
4756 unique_lock
wl(rwlock
);
4762 ConnectionRef con
= m
->get_connection();
4763 auto priv
= con
->get_priv();
4764 auto s
= static_cast<OSDSession
*>(priv
.get());
4765 if (!s
|| s
->con
!= con
) {
4766 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
4771 shared_lock
sl(s
->lock
);
4772 auto p
= s
->command_ops
.find(m
->get_tid());
4773 if (p
== s
->command_ops
.end()) {
4774 ldout(cct
, 10) << "handle_command_reply tid " << m
->get_tid()
4775 << " not found" << dendl
;
4781 CommandOp
*c
= p
->second
;
4783 m
->get_connection() != c
->session
->con
) {
4784 ldout(cct
, 10) << "handle_command_reply tid " << m
->get_tid()
4785 << " got reply from wrong connection "
4786 << m
->get_connection() << " " << m
->get_source_inst()
4793 if (m
->r
== -EAGAIN
) {
4794 ldout(cct
,10) << __func__
<< " tid " << m
->get_tid()
4795 << " got EAGAIN, requesting map and resending" << dendl
;
4796 // NOTE: This might resend twice... once now, and once again when
4797 // we get an updated osdmap and the PG is found to have moved.
4798 _maybe_request_map();
4807 unique_lock
sul(s
->lock
);
4808 _finish_command(c
, m
->r
< 0 ? bs::error_code(-m
->r
, osd_category()) :
4809 bs::error_code(), std::move(m
->rs
),
4810 std::move(m
->get_data()));
4816 Objecter::LingerOp::LingerOp(Objecter
*o
, uint64_t linger_id
)
4818 linger_id(linger_id
),
4819 watch_lock(ceph::make_shared_mutex(
4820 fmt::format("LingerOp::watch_lock #{}", linger_id
)))
4823 void Objecter::submit_command(CommandOp
*c
, ceph_tid_t
*ptid
)
4825 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
4827 ceph_tid_t tid
= ++last_tid
;
4828 ldout(cct
, 10) << "_submit_command " << tid
<< " " << c
->cmd
<< dendl
;
4832 unique_lock
hs_wl(homeless_session
->lock
);
4833 _session_command_op_assign(homeless_session
, c
);
4836 _calc_command_target(c
, sul
);
4837 _assign_command_session(c
, sul
);
4838 if (osd_timeout
> timespan(0)) {
4839 c
->ontimeout
= timer
.add_event(osd_timeout
,
4843 osdc_errc::timed_out
); });
4846 if (!c
->session
->is_homeless()) {
4849 _maybe_request_map();
4851 if (c
->map_check_error
)
4852 _send_command_map_check(c
);
4856 logger
->inc(l_osdc_command_active
);
4859 int Objecter::_calc_command_target(CommandOp
*c
,
4860 shunique_lock
<ceph::shared_mutex
>& sul
)
4862 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
4864 c
->map_check_error
= 0;
4866 // ignore overlays, just like we do with pg ops
4867 c
->target
.flags
|= CEPH_OSD_FLAG_IGNORE_OVERLAY
;
4869 if (c
->target_osd
>= 0) {
4870 if (!osdmap
->exists(c
->target_osd
)) {
4871 c
->map_check_error
= -ENOENT
;
4872 c
->map_check_error_str
= "osd dne";
4874 return RECALC_OP_TARGET_OSD_DNE
;
4876 if (osdmap
->is_down(c
->target_osd
)) {
4877 c
->map_check_error
= -ENXIO
;
4878 c
->map_check_error_str
= "osd down";
4880 return RECALC_OP_TARGET_OSD_DOWN
;
4882 c
->target
.osd
= c
->target_osd
;
4884 int ret
= _calc_target(&(c
->target
), nullptr, true);
4885 if (ret
== RECALC_OP_TARGET_POOL_DNE
) {
4886 c
->map_check_error
= -ENOENT
;
4887 c
->map_check_error_str
= "pool dne";
4890 } else if (ret
== RECALC_OP_TARGET_OSD_DOWN
) {
4891 c
->map_check_error
= -ENXIO
;
4892 c
->map_check_error_str
= "osd down";
4899 int r
= _get_session(c
->target
.osd
, &s
, sul
);
4900 ceph_assert(r
!= -EAGAIN
); /* shouldn't happen as we're holding the write lock */
4902 if (c
->session
!= s
) {
4904 return RECALC_OP_TARGET_NEED_RESEND
;
4909 ldout(cct
, 20) << "_recalc_command_target " << c
->tid
<< " no change, "
4910 << c
->session
<< dendl
;
4912 return RECALC_OP_TARGET_NO_ACTION
;
4915 void Objecter::_assign_command_session(CommandOp
*c
,
4916 shunique_lock
<ceph::shared_mutex
>& sul
)
4918 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
4921 int r
= _get_session(c
->target
.osd
, &s
, sul
);
4922 ceph_assert(r
!= -EAGAIN
); /* shouldn't happen as we're holding the write lock */
4924 if (c
->session
!= s
) {
4926 OSDSession
*cs
= c
->session
;
4927 unique_lock
csl(cs
->lock
);
4928 _session_command_op_remove(c
->session
, c
);
4931 unique_lock
sl(s
->lock
);
4932 _session_command_op_assign(s
, c
);
4938 void Objecter::_send_command(CommandOp
*c
)
4940 ldout(cct
, 10) << "_send_command " << c
->tid
<< dendl
;
4941 ceph_assert(c
->session
);
4942 ceph_assert(c
->session
->con
);
4943 auto m
= new MCommand(monc
->monmap
.fsid
);
4945 m
->set_data(c
->inbl
);
4947 c
->session
->con
->send_message(m
);
4948 logger
->inc(l_osdc_command_send
);
// Cancel an in-flight CommandOp identified by (session, tid), completing
// it with the supplied error code via _finish_command.
// NOTE(review): garbled extraction; original lines 4952-4953 (the `ec`
// parameter and opening brace), 4961-4963 (the not-found return path),
// and 4970+ (the final return / closing brace) are missing — confirm
// the not-found branch returns an error before relying on this.
4951 int Objecter::command_op_cancel(OSDSession
*s
, ceph_tid_t tid
,
4954 ceph_assert(initialized
);
// Exclusive rwlock: we are mutating session op tracking.
4956 unique_lock
wl(rwlock
);
4958 auto it
= s
->command_ops
.find(tid
);
4959 if (it
== s
->command_ops
.end()) {
4960 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4964 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4966 CommandOp
*op
= it
->second
;
// Drop any pending map-check registered for this command.
4967 _command_cancel_map_check(op
);
// Session lock required by _finish_command (see its preconditions).
4968 unique_lock
sl(op
->session
->lock
);
4969 _finish_command(op
, ec
, {}, {});
// Complete a CommandOp: deliver (ec, rs, bl) to its onfinish completion,
// cancel its timeout (unless it timed out), and unlink it from its session.
// Preconditions (per the original inline comments): rwlock held unique,
// session lock held.
// NOTE(review): garbled extraction; original lines 4976, 4979, 4981-4983,
// 4985, 4988, 4990-4992, 4994-4995 (incl. braces and the `c->put()` /
// refcount teardown, if any) are missing between fragments.
4974 void Objecter::_finish_command(CommandOp
*c
, bs::error_code ec
,
4975 string
&& rs
, cb::list
&& bl
)
4977 // rwlock is locked unique
4978 // session lock is locked
4980 ldout(cct
, 10) << "_finish_command " << c
->tid
<< " = " << ec
<< " "
// Hand the result to the caller's completion (moved out of the op).
4984 c
->onfinish
->defer(std::move(c
->onfinish
), ec
, std::move(rs
), std::move(bl
));
// A timed-out op's timeout event already fired; only cancel otherwise.
4986 if (c
->ontimeout
&& ec
!= bs::errc::timed_out
)
4987 timer
.cancel_event(c
->ontimeout
);
4989 _session_command_op_remove(c
->session
, c
);
4993 logger
->dec(l_osdc_command_active
);
// Session destructor: by contract the caller has already drained or
// re-homed every op; we only assert that nothing is still attached.
4996 Objecter::OSDSession::~OSDSession()
4998 // Caller is responsible for re-assigning or
4999 // destroying any ops that were assigned to us
5000 ceph_assert(ops
.empty());
5001 ceph_assert(linger_ops
.empty());
5002 ceph_assert(command_ops
.empty());
// Construct an Objecter bound to a messenger, mon client, and the asio
// io_context used for completions. Reads the mon/osd op timeouts from
// the config store.
// NOTE(review): garbled extraction; original lines 5009 (opening brace)
// and 5012 (closing brace) are missing between fragments.
5005 Objecter::Objecter(CephContext
*cct
,
5006 Messenger
*m
, MonClient
*mc
,
5007 boost::asio::io_context
& service
) :
5008 Dispatcher(cct
), messenger(m
), monc(mc
), service(service
)
5010 mon_timeout
= cct
->_conf
.get_val
<std::chrono::seconds
>("rados_mon_op_timeout");
5011 osd_timeout
= cct
->_conf
.get_val
<std::chrono::seconds
>("rados_osd_op_timeout");
// Destructor: verifies shutdown() already ran — every tracked op map must
// be empty and the perf counters / admin-socket hook already torn down.
// The homeless session must hold the last reference before we drop it.
5014 Objecter::~Objecter()
5016 ceph_assert(homeless_session
->get_nref() == 1);
5017 ceph_assert(num_homeless_ops
== 0);
5018 homeless_session
->put();
5020 ceph_assert(osd_sessions
.empty());
5021 ceph_assert(poolstat_ops
.empty());
5022 ceph_assert(statfs_ops
.empty());
5023 ceph_assert(pool_ops
.empty());
5024 ceph_assert(waiting_for_map
.empty());
5025 ceph_assert(linger_ops
.empty());
5026 ceph_assert(check_latest_map_lingers
.empty());
5027 ceph_assert(check_latest_map_ops
.empty());
5028 ceph_assert(check_latest_map_commands
.empty());
5030 ceph_assert(!m_request_state_hook
);
5031 ceph_assert(!logger
);
5035 * Wait until this OSD map epoch is received before
5036 * sending any more operations to OSDs. Use this
5037 * when it is known that the client can't trust
5038 * anything from before this epoch (e.g. due to
5039 * client blocklist at this epoch).
// Raise the epoch barrier (see the block comment above): ops will not be
// sent until an osdmap at least this new has been received. Only moves
// the barrier forward; a lower epoch is ignored.
5041 void Objecter::set_epoch_barrier(epoch_t epoch
)
5043 unique_lock
wl(rwlock
);
5045 ldout(cct
, 7) << __func__
<< ": barrier " << epoch
<< " (was "
5046 << epoch_barrier
<< ") current epoch " << osdmap
->get_epoch()
5048 if (epoch
> epoch_barrier
) {
5049 epoch_barrier
= epoch
;
// Ask the monitors for newer maps so the barrier can be satisfied.
5050 _maybe_request_map();
// Sentinel cursors for object enumeration. begin()'s body (orig lines
// 5057-5059) is missing from this extraction; end() returns the maximal
// hobject, i.e. "enumerate everything up to the end of the pool".
5056 hobject_t
Objecter::enumerate_objects_begin()
5061 hobject_t
Objecter::enumerate_objects_end()
5063 return hobject_t::get_max();
// Per-enumeration state threaded through _issue_enumerate/_enumerate_reply:
// the stop cursor (`end`), optional filter, locator, remaining `max`, the
// accumulated results, and the user's completion. operator() releases the
// op budget and invokes on_finish exactly once with (error, results, cursor).
// NOTE(review): garbled extraction; several members (objecter*, max, ls,
// budget, epoch — referenced below and by _issue_enumerate) fall on
// missing original lines 5068, 5071, 5073-5081, 5090-5096, 5098-5099.
5066 template<typename T
>
5067 struct EnumerationContext
{
5069 const hobject_t end
;
5070 const cb::list filter
;
5072 const object_locator_t oloc
;
5075 fu2::unique_function
<void(bs::error_code
,
5077 hobject_t
) &&> on_finish
;
5082 EnumerationContext(Objecter
* objecter
,
5083 hobject_t end
, cb::list filter
,
5084 uint32_t max
, object_locator_t oloc
,
5085 decltype(on_finish
) on_finish
)
5086 : objecter(objecter
), end(std::move(end
)), filter(std::move(filter
)),
5087 max(max
), oloc(std::move(oloc
)), on_finish(std::move(on_finish
)) {}
5089 void operator()(bs::error_code ec
,
// Return the reserved throttle budget before completing.
5093 objecter
->put_op_budget_bytes(budget
);
5097 std::move(on_finish
)(ec
, std::move(v
), std::move(h
));
// Completion adapter for one pg_nls round-trip: owns the reply bufferlist
// (`bl`, declared on a missing original line) plus the EnumerationContext,
// and forwards both to Objecter::_enumerate_reply when invoked.
5101 template<typename T
>
5102 struct CB_EnumerateReply
{
5106 std::unique_ptr
<EnumerationContext
<T
>> ctx
;
5108 CB_EnumerateReply(Objecter
* objecter
,
5109 std::unique_ptr
<EnumerationContext
<T
>>&& ctx
) :
5110 objecter(objecter
), ctx(std::move(ctx
)) {}
5112 void operator()(bs::error_code ec
) {
5113 objecter
->_enumerate_reply(std::move(bl
), ec
, std::move(ctx
));
// Public entry point for paged object enumeration. Validates arguments
// (cursor ordering, non-zero result size, SORTBITWISE flag, pool exists),
// then kicks off the first pg_nls via _issue_enumerate. on_finish is
// invoked exactly once per call — either immediately with an error /
// empty result, or later from _enumerate_reply.
// NOTE(review): garbled extraction; parameter lines 5119, 5121-5123, 5126
// (pool_id, start, end, max, the result-vector type) and the `return`
// statements after each error completion (e.g. 5131, 5137, 5142, 5151,
// 5159) are on missing original lines.
5117 template<typename T
>
5118 void Objecter::enumerate_objects(
5120 std::string_view ns
,
5124 const cb::list
& filter_bl
,
5125 fu2::unique_function
<void(bs::error_code
,
5127 hobject_t
) &&> on_finish
) {
// Reject an inverted cursor range.
5128 if (!end
.is_max() && start
> end
) {
5129 lderr(cct
) << __func__
<< ": start " << start
<< " > end " << end
<< dendl
;
5130 std::move(on_finish
)(osdc_errc::precondition_violated
, {}, {});
5135 lderr(cct
) << __func__
<< ": result size may not be zero" << dendl
;
5136 std::move(on_finish
)(osdc_errc::precondition_violated
, {}, {});
// start == max means the previous page already hit the end: done.
5140 if (start
.is_max()) {
5141 std::move(on_finish
)({}, {}, {});
5145 shared_lock
rl(rwlock
);
5146 ceph_assert(osdmap
->get_epoch());
// Enumeration relies on bitwise hobject ordering cluster-wide.
5147 if (!osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
)) {
5149 lderr(cct
) << __func__
<< ": SORTBITWISE cluster flag not set" << dendl
;
5150 std::move(on_finish
)(osdc_errc::not_supported
, {}, {});
5153 const pg_pool_t
* p
= osdmap
->get_pg_pool(pool_id
);
5155 lderr(cct
) << __func__
<< ": pool " << pool_id
<< " DNE in osd epoch "
5156 << osdmap
->get_epoch() << dendl
;
5158 std::move(on_finish
)(osdc_errc::pool_dne
, {}, {});
// All checks passed: issue the first page.
5164 _issue_enumerate(start
,
5165 std::make_unique
<EnumerationContext
<T
>>(
5166 this, std::move(end
), filter_bl
,
5167 max
, object_locator_t
{pool_id
, ns
},
5168 std::move(on_finish
)));
// Explicit instantiations of enumerate_objects for the two result types
// used by callers: librados list entries and neorados entries. (The
// leading `template` keyword and some parameter lines fall on missing
// original lines of this extraction.)
5172 void Objecter::enumerate_objects
<librados::ListObjectImpl
>(
5174 std::string_view ns
,
5178 const cb::list
& filter_bl
,
5179 fu2::unique_function
<void(bs::error_code
,
5180 std::vector
<librados::ListObjectImpl
>,
5181 hobject_t
) &&> on_finish
);
5184 void Objecter::enumerate_objects
<neorados::Entry
>(
5186 std::string_view ns
,
5190 const cb::list
& filter_bl
,
5191 fu2::unique_function
<void(bs::error_code
,
5192 std::vector
<neorados::Entry
>,
5193 hobject_t
) &&> on_finish
);
// Issue one pg_nls read for the current enumeration page, starting at
// `start`. The CB_EnumerateReply (which owns the context) is moved into
// the completion lambda; raw pointers into it (`epoch`, `budget`, `pbl`)
// are taken first because pg_read needs them by address while the owner
// is being moved — hence the original author's "I hate having to do this".
// NOTE(review): garbled extraction; lines 5200-5201 (the ObjectOperation
// `op` and the `c` alias for ctx), 5205, 5209, and 5216+ (the lambda body
// invoking the callback, and the closing of the pg_read call) are on
// missing original lines.
5197 template<typename T
>
5198 void Objecter::_issue_enumerate(hobject_t start
,
5199 std::unique_ptr
<EnumerationContext
<T
>> ctx
) {
5202 op
.pg_nls(c
->max
, c
->filter
, start
, osdmap
->get_epoch());
5203 auto on_ack
= std::make_unique
<CB_EnumerateReply
<T
>>(this, std::move(ctx
));
5204 // I hate having to do this. Try to find a cleaner way
5206 auto epoch
= &c
->epoch
;
5207 auto budget
= &c
->budget
;
5208 auto pbl
= &on_ack
->bl
;
5210 // Issue. See you later in _enumerate_reply
5211 pg_read(start
.get_hash(),
5212 c
->oloc
, op
, pbl
, 0,
5213 Op::OpComp::create(service
.get_executor(),
5214 [c
= std::move(on_ack
)]
5215 (bs::error_code ec
) mutable {
// Explicit instantiations of _issue_enumerate for both entry types.
// (The `template` keyword and the librados overload's `hobject_t start`
// parameter fall on missing original lines of this extraction.)
5221 void Objecter::_issue_enumerate
<librados::ListObjectImpl
>(
5223 std::unique_ptr
<EnumerationContext
<librados::ListObjectImpl
>> ctx
);
5225 void Objecter::_issue_enumerate
<neorados::Entry
>(
5226 hobject_t start
, std::unique_ptr
<EnumerationContext
<neorados::Entry
>> ctx
);
// Handle one pg_nls reply: decode it, trim entries past the caller's
// `end` cursor, accumulate up to ctx->max entries into ctx->ls, then
// either complete the enumeration (end reached or max exhausted) or
// issue the next page with the updated cursor.
// NOTE(review): garbled extraction. Missing original lines include the
// `bl`/`ec` parameters (5230-5231), the early-error and empty-decode
// guards (5233-5238, 5242-5243, 5250, 5253-5255), the pool-gone check
// around 5258-5259, the `next` declaration (~5262-5265), the
// else-branch/brace lines (5268-5270, 5280-5281, 5285, 5287-5289,
// 5294), the loop-advance and hash lines near 5298-5301, 5305,
// 5307-5312, and the epilogue 5315, 5317-5318. Control flow below is a
// partial view — reconstruct from upstream before editing logic.
5228 template<typename T
>
5229 void Objecter::_enumerate_reply(
5232 std::unique_ptr
<EnumerationContext
<T
>>&& ctx
)
// Error path: complete immediately with no results.
5235 std::move(*ctx
)(ec
, {}, {});
5239 // Decode the results
5240 auto iter
= bl
.cbegin();
5241 pg_nls_response_template
<T
> response
;
5244 response
.decode(iter
);
5246 // extra_info isn't used anywhere. We do this solely to preserve
5247 // backward compatibility
5248 cb::list legacy_extra_info
;
5249 decode(legacy_extra_info
, iter
);
// Decode failure: surface the decode error to the caller.
5251 } catch (const bs::system_error
& e
) {
5252 std::move(*ctx
)(e
.code(), {}, {});
5256 shared_lock
rl(rwlock
);
5257 auto pool
= osdmap
->get_pg_pool(ctx
->oloc
.get_pool());
5260 // pool is gone, drop any results which are now meaningless.
5261 std::move(*ctx
)(osdc_errc::pool_dne
, {}, {});
// Advance the cursor to the PG's returned handle if it is within range.
5266 if ((response
.handle
<= ctx
->end
)) {
5267 next
= response
.handle
;
5271 // drop anything after 'end'
5272 while (!response
.entries
.empty()) {
// Hash on the locator key when present, else on the object name.
5273 uint32_t hash
= response
.entries
.back().locator
.empty() ?
5274 pool
->hash_key(response
.entries
.back().oid
,
5275 response
.entries
.back().nspace
) :
5276 pool
->hash_key(response
.entries
.back().locator
,
5277 response
.entries
.back().nspace
);
5278 hobject_t
last(response
.entries
.back().oid
,
5279 response
.entries
.back().locator
,
5282 ctx
->oloc
.get_pool(),
5283 response
.entries
.back().nspace
);
// Entries are sorted; stop trimming once we are back inside [.., end).
5284 if (last
< ctx
->end
)
5286 response
.entries
.pop_back();
// Fast path: everything fits under the remaining max — bulk-move it.
5290 if (response
.entries
.size() <= ctx
->max
) {
5291 ctx
->max
-= response
.entries
.size();
5292 std::move(response
.entries
.begin(), response
.entries
.end(),
5293 std::back_inserter(ctx
->ls
));
// Slow path: take entries one at a time until max is exhausted, then
// recompute the resume cursor from the first entry NOT taken.
5295 auto i
= response
.entries
.begin();
5296 while (ctx
->max
> 0) {
5297 ctx
->ls
.push_back(std::move(*i
));
5302 i
->locator
.empty() ?
5303 pool
->hash_key(i
->oid
, i
->nspace
) :
5304 pool
->hash_key(i
->locator
, i
->nspace
);
5306 next
= hobject_t
{i
->oid
, i
->locator
,
5309 ctx
->oloc
.get_pool(),
// Done: either we reached the end cursor or filled the caller's quota.
5313 if (next
== ctx
->end
|| ctx
->max
== 0) {
5314 std::move(*ctx
)(ec
, std::move(ctx
->ls
), std::move(next
));
// Otherwise fetch the next page.
5316 _issue_enumerate(next
, std::move(ctx
));
// Explicit instantiations of _enumerate_reply for both entry types.
// (The `template` keyword and the bl/ec parameter lines fall on missing
// original lines of this extraction.)
5321 void Objecter::_enumerate_reply
<librados::ListObjectImpl
>(
5324 std::unique_ptr
<EnumerationContext
<librados::ListObjectImpl
>>&& ctx
);
5327 void Objecter::_enumerate_reply
<neorados::Entry
>(
5330 std::unique_ptr
<EnumerationContext
<neorados::Entry
>>&& ctx
);
5333 using namespace librados
;
// Decode one T from each bufferlist in `bls`, appending into `items`.
// NOTE(review): garbled extraction; the loop body past the cbegin() call
// (orig lines 5340-5341 — the decode + push) is missing. Also note `bl`
// is taken by value in the range-for; harmless here since cbegin() is
// const, but a const-ref would avoid the copy.
5335 template <typename T
>
5336 void do_decode(std::vector
<T
>& items
, std::vector
<cb::list
>& bls
)
5338 for (auto bl
: bls
) {
5339 auto p
= bl
.cbegin();
// Completion for a CEPH_OSD_OP_SCRUBLS op: decodes the reply buffer into
// a scrub_ls_result_t, publishes the scrub interval through *interval,
// and routes the payload into exactly one of *objects / *snapsets
// (whichever constructor was used; the other stays nullptr).
// NOTE(review): garbled extraction; missing original lines include the
// `bl`/`interval`/`rval` members (~5347-5348, 5351-5352), each ctor's
// rval parameter (5355, 5359), the early-return and try/catch plumbing
// in finish() (5363-5372, 5374-5379, 5382), and the branch selecting
// objects vs snapsets (5384, 5386, 5388-5390).
5346 struct C_ObjectOperation_scrub_ls
: public Context
{
5349 std::vector
<inconsistent_obj_t
> *objects
= nullptr;
5350 std::vector
<inconsistent_snapset_t
> *snapsets
= nullptr;
5353 C_ObjectOperation_scrub_ls(uint32_t* interval
,
5354 std::vector
<inconsistent_obj_t
>* objects
,
5356 : interval(interval
), objects(objects
), rval(rval
) {}
5357 C_ObjectOperation_scrub_ls(uint32_t* interval
,
5358 std::vector
<inconsistent_snapset_t
>* snapsets
,
5360 : interval(interval
), snapsets(snapsets
), rval(rval
) {}
5361 void finish(int r
) override
{
// -EAGAIN is expected (scrub interval changed); other errors bail out.
5362 if (r
< 0 && r
!= -EAGAIN
) {
5373 } catch (cb::error
&) {
5380 scrub_ls_result_t result
;
5381 auto p
= bl
.cbegin();
5383 *interval
= result
.interval
;
5385 do_decode(*objects
, result
.vals
);
5387 do_decode(*snapsets
, result
.vals
);
// Append a SCRUBLS op to `op`: encodes `arg` into the op's indata, marks
// the op as a PG op, and wires a C_ObjectOperation_scrub_ls completion so
// the reply is decoded into *items and *interval. `h` is owned by the op
// machinery via the out-handler slot (a missing original line ~5405 sets
// op->out_handler[p] = h — confirm against upstream).
// NOTE(review): garbled extraction; parameter lines 5396-5398 (interval,
// rval) and braces are on missing original lines.
5392 template <typename T
>
5393 void do_scrub_ls(::ObjectOperation
* op
,
5394 const scrub_ls_arg_t
& arg
,
5395 std::vector
<T
> *items
,
5399 OSDOp
& osd_op
= op
->add_op(CEPH_OSD_OP_SCRUBLS
);
// SCRUBLS targets a PG, not an object.
5400 op
->flags
|= CEPH_OSD_FLAG_PGOP
;
5401 ceph_assert(interval
);
5402 arg
.encode(osd_op
.indata
);
// Index of the op we just appended.
5403 unsigned p
= op
->ops
.size() - 1;
5404 auto h
= new C_ObjectOperation_scrub_ls
{interval
, items
, rval
};
// Route the reply buffer and return code to the handler's slots.
5406 op
->out_bl
[p
] = &h
->bl
;
5407 op
->out_rval
[p
] = rval
;
// scrub_ls overload for inconsistent *objects*: builds a scrub_ls_arg_t
// (type tag 0 = objects) from the current *interval and delegates to
// do_scrub_ls. NOTE(review): garbled extraction; parameter lines
// 5414-5416 (interval, rval) and braces are on missing original lines.
5411 void ::ObjectOperation::scrub_ls(const librados::object_id_t
& start_after
,
5412 uint64_t max_to_get
,
5413 std::vector
<librados::inconsistent_obj_t
>* objects
,
5417 scrub_ls_arg_t arg
= {*interval
, 0, start_after
, max_to_get
};
5418 do_scrub_ls(this, arg
, objects
, interval
, rval
);
5421 void ::ObjectOperation::scrub_ls(const librados::object_id_t
& start_after
,
5422 uint64_t max_to_get
,
5423 std::vector
<librados::inconsistent_snapset_t
> *snapsets
,
5427 scrub_ls_arg_t arg
= {*interval
, 1, start_after
, max_to_get
};
5428 do_scrub_ls(this, arg
, snapsets
, interval
, rval
);