1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include "osd/OSDMap.h"
21 #include "mon/MonClient.h"
23 #include "msg/Messenger.h"
24 #include "msg/Message.h"
26 #include "messages/MPing.h"
27 #include "messages/MOSDOp.h"
28 #include "messages/MOSDOpReply.h"
29 #include "messages/MOSDBackoff.h"
30 #include "messages/MOSDMap.h"
32 #include "messages/MPoolOp.h"
33 #include "messages/MPoolOpReply.h"
35 #include "messages/MGetPoolStats.h"
36 #include "messages/MGetPoolStatsReply.h"
37 #include "messages/MStatfs.h"
38 #include "messages/MStatfsReply.h"
40 #include "messages/MMonCommand.h"
42 #include "messages/MCommand.h"
43 #include "messages/MCommandReply.h"
45 #include "messages/MWatchNotify.h"
48 #include "common/config.h"
49 #include "common/perf_counters.h"
50 #include "common/scrub_types.h"
51 #include "include/str_list.h"
52 #include "common/errno.h"
53 #include "common/EventTrace.h"
59 using std::ostringstream
;
63 using std::stringstream
;
68 using ceph::Formatter
;
70 using std::defer_lock
;
72 using ceph::real_time
;
73 using ceph::real_clock
;
75 using ceph::mono_clock
;
76 using ceph::mono_time
;
80 using ceph::shunique_lock
;
81 using ceph::acquire_shared
;
82 using ceph::acquire_unique
;
84 #define dout_subsys ceph_subsys_objecter
86 #define dout_prefix *_dout << messenger->get_myname() << ".objecter "
90 l_osdc_first
= 123200,
108 l_osdc_osdop_writefull
,
109 l_osdc_osdop_writesame
,
112 l_osdc_osdop_truncate
,
115 l_osdc_osdop_sparse_read
,
116 l_osdc_osdop_clonerange
,
117 l_osdc_osdop_getxattr
,
118 l_osdc_osdop_setxattr
,
119 l_osdc_osdop_cmpxattr
,
120 l_osdc_osdop_rmxattr
,
121 l_osdc_osdop_resetxattrs
,
125 l_osdc_osdop_src_cmpxattr
,
127 l_osdc_osdop_pgls_filter
,
130 l_osdc_linger_active
,
132 l_osdc_linger_resend
,
135 l_osdc_poolop_active
,
137 l_osdc_poolop_resend
,
139 l_osdc_poolstat_active
,
140 l_osdc_poolstat_send
,
141 l_osdc_poolstat_resend
,
143 l_osdc_statfs_active
,
145 l_osdc_statfs_resend
,
147 l_osdc_command_active
,
149 l_osdc_command_resend
,
156 l_osdc_osd_session_open
,
157 l_osdc_osd_session_close
,
160 l_osdc_osdop_omap_wr
,
161 l_osdc_osdop_omap_rd
,
162 l_osdc_osdop_omap_del
,
168 // config obs ----------------------------
170 static const char *config_keys
[] = {
175 class Objecter::RequestStateHook
: public AdminSocketHook
{
176 Objecter
*m_objecter
;
178 explicit RequestStateHook(Objecter
*objecter
);
179 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
182 ceph::buffer::list
& out
) override
;
186 * This is a more limited form of C_Contexts, but that requires
187 * a ceph_context which we don't have here.
189 class ObjectOperation::C_TwoContexts
: public Context
{
193 C_TwoContexts(Context
*first
, Context
*second
) :
194 first(first
), second(second
) {}
195 void finish(int r
) override
{
202 ~C_TwoContexts() override
{
208 void ObjectOperation::add_handler(Context
*extra
) {
209 size_t last
= out_handler
.size() - 1;
210 Context
*orig
= out_handler
[last
];
212 Context
*wrapper
= new C_TwoContexts(orig
, extra
);
213 out_handler
[last
] = wrapper
;
215 out_handler
[last
] = extra
;
219 Objecter::OSDSession::unique_completion_lock
Objecter::OSDSession::get_lock(
222 if (oid
.name
.empty())
223 return unique_completion_lock();
225 static constexpr uint32_t HASH_PRIME
= 1021;
226 uint32_t h
= ceph_str_hash_linux(oid
.name
.c_str(), oid
.name
.size())
229 return unique_completion_lock(completion_locks
[h
% num_locks
],
233 const char** Objecter::get_tracked_conf_keys() const
239 void Objecter::handle_conf_change(const ConfigProxy
& conf
,
240 const std::set
<std::string
> &changed
)
242 if (changed
.count("crush_location")) {
243 update_crush_location();
247 void Objecter::update_crush_location()
249 unique_lock
wl(rwlock
);
250 crush_location
= cct
->crush_location
.get_location();
253 // messages ------------------------------
256 * initialize only internal data structures, don't initiate cluster interaction
258 void Objecter::init()
260 ceph_assert(!initialized
);
263 PerfCountersBuilder
pcb(cct
, "objecter", l_osdc_first
, l_osdc_last
);
265 pcb
.add_u64(l_osdc_op_active
, "op_active", "Operations active", "actv",
266 PerfCountersBuilder::PRIO_CRITICAL
);
267 pcb
.add_u64(l_osdc_op_laggy
, "op_laggy", "Laggy operations");
268 pcb
.add_u64_counter(l_osdc_op_send
, "op_send", "Sent operations");
269 pcb
.add_u64_counter(l_osdc_op_send_bytes
, "op_send_bytes", "Sent data", NULL
, 0, unit_t(UNIT_BYTES
));
270 pcb
.add_u64_counter(l_osdc_op_resend
, "op_resend", "Resent operations");
271 pcb
.add_u64_counter(l_osdc_op_reply
, "op_reply", "Operation reply");
273 pcb
.add_u64_counter(l_osdc_op
, "op", "Operations");
274 pcb
.add_u64_counter(l_osdc_op_r
, "op_r", "Read operations", "rd",
275 PerfCountersBuilder::PRIO_CRITICAL
);
276 pcb
.add_u64_counter(l_osdc_op_w
, "op_w", "Write operations", "wr",
277 PerfCountersBuilder::PRIO_CRITICAL
);
278 pcb
.add_u64_counter(l_osdc_op_rmw
, "op_rmw", "Read-modify-write operations",
279 "rdwr", PerfCountersBuilder::PRIO_INTERESTING
);
280 pcb
.add_u64_counter(l_osdc_op_pg
, "op_pg", "PG operation");
282 pcb
.add_u64_counter(l_osdc_osdop_stat
, "osdop_stat", "Stat operations");
283 pcb
.add_u64_counter(l_osdc_osdop_create
, "osdop_create",
284 "Create object operations");
285 pcb
.add_u64_counter(l_osdc_osdop_read
, "osdop_read", "Read operations");
286 pcb
.add_u64_counter(l_osdc_osdop_write
, "osdop_write", "Write operations");
287 pcb
.add_u64_counter(l_osdc_osdop_writefull
, "osdop_writefull",
288 "Write full object operations");
289 pcb
.add_u64_counter(l_osdc_osdop_writesame
, "osdop_writesame",
290 "Write same operations");
291 pcb
.add_u64_counter(l_osdc_osdop_append
, "osdop_append",
293 pcb
.add_u64_counter(l_osdc_osdop_zero
, "osdop_zero",
294 "Set object to zero operations");
295 pcb
.add_u64_counter(l_osdc_osdop_truncate
, "osdop_truncate",
296 "Truncate object operations");
297 pcb
.add_u64_counter(l_osdc_osdop_delete
, "osdop_delete",
298 "Delete object operations");
299 pcb
.add_u64_counter(l_osdc_osdop_mapext
, "osdop_mapext",
300 "Map extent operations");
301 pcb
.add_u64_counter(l_osdc_osdop_sparse_read
, "osdop_sparse_read",
302 "Sparse read operations");
303 pcb
.add_u64_counter(l_osdc_osdop_clonerange
, "osdop_clonerange",
304 "Clone range operations");
305 pcb
.add_u64_counter(l_osdc_osdop_getxattr
, "osdop_getxattr",
306 "Get xattr operations");
307 pcb
.add_u64_counter(l_osdc_osdop_setxattr
, "osdop_setxattr",
308 "Set xattr operations");
309 pcb
.add_u64_counter(l_osdc_osdop_cmpxattr
, "osdop_cmpxattr",
310 "Xattr comparison operations");
311 pcb
.add_u64_counter(l_osdc_osdop_rmxattr
, "osdop_rmxattr",
312 "Remove xattr operations");
313 pcb
.add_u64_counter(l_osdc_osdop_resetxattrs
, "osdop_resetxattrs",
314 "Reset xattr operations");
315 pcb
.add_u64_counter(l_osdc_osdop_call
, "osdop_call",
316 "Call (execute) operations");
317 pcb
.add_u64_counter(l_osdc_osdop_watch
, "osdop_watch",
318 "Watch by object operations");
319 pcb
.add_u64_counter(l_osdc_osdop_notify
, "osdop_notify",
320 "Notify about object operations");
321 pcb
.add_u64_counter(l_osdc_osdop_src_cmpxattr
, "osdop_src_cmpxattr",
322 "Extended attribute comparison in multi operations");
323 pcb
.add_u64_counter(l_osdc_osdop_pgls
, "osdop_pgls");
324 pcb
.add_u64_counter(l_osdc_osdop_pgls_filter
, "osdop_pgls_filter");
325 pcb
.add_u64_counter(l_osdc_osdop_other
, "osdop_other", "Other operations");
327 pcb
.add_u64(l_osdc_linger_active
, "linger_active",
328 "Active lingering operations");
329 pcb
.add_u64_counter(l_osdc_linger_send
, "linger_send",
330 "Sent lingering operations");
331 pcb
.add_u64_counter(l_osdc_linger_resend
, "linger_resend",
332 "Resent lingering operations");
333 pcb
.add_u64_counter(l_osdc_linger_ping
, "linger_ping",
334 "Sent pings to lingering operations");
336 pcb
.add_u64(l_osdc_poolop_active
, "poolop_active",
337 "Active pool operations");
338 pcb
.add_u64_counter(l_osdc_poolop_send
, "poolop_send",
339 "Sent pool operations");
340 pcb
.add_u64_counter(l_osdc_poolop_resend
, "poolop_resend",
341 "Resent pool operations");
343 pcb
.add_u64(l_osdc_poolstat_active
, "poolstat_active",
344 "Active get pool stat operations");
345 pcb
.add_u64_counter(l_osdc_poolstat_send
, "poolstat_send",
346 "Pool stat operations sent");
347 pcb
.add_u64_counter(l_osdc_poolstat_resend
, "poolstat_resend",
348 "Resent pool stats");
350 pcb
.add_u64(l_osdc_statfs_active
, "statfs_active", "Statfs operations");
351 pcb
.add_u64_counter(l_osdc_statfs_send
, "statfs_send", "Sent FS stats");
352 pcb
.add_u64_counter(l_osdc_statfs_resend
, "statfs_resend",
355 pcb
.add_u64(l_osdc_command_active
, "command_active", "Active commands");
356 pcb
.add_u64_counter(l_osdc_command_send
, "command_send",
358 pcb
.add_u64_counter(l_osdc_command_resend
, "command_resend",
361 pcb
.add_u64(l_osdc_map_epoch
, "map_epoch", "OSD map epoch");
362 pcb
.add_u64_counter(l_osdc_map_full
, "map_full",
363 "Full OSD maps received");
364 pcb
.add_u64_counter(l_osdc_map_inc
, "map_inc",
365 "Incremental OSD maps received");
367 pcb
.add_u64(l_osdc_osd_sessions
, "osd_sessions",
368 "Open sessions"); // open sessions
369 pcb
.add_u64_counter(l_osdc_osd_session_open
, "osd_session_open",
371 pcb
.add_u64_counter(l_osdc_osd_session_close
, "osd_session_close",
373 pcb
.add_u64(l_osdc_osd_laggy
, "osd_laggy", "Laggy OSD sessions");
375 pcb
.add_u64_counter(l_osdc_osdop_omap_wr
, "omap_wr",
376 "OSD OMAP write operations");
377 pcb
.add_u64_counter(l_osdc_osdop_omap_rd
, "omap_rd",
378 "OSD OMAP read operations");
379 pcb
.add_u64_counter(l_osdc_osdop_omap_del
, "omap_del",
380 "OSD OMAP delete operations");
382 logger
= pcb
.create_perf_counters();
383 cct
->get_perfcounters_collection()->add(logger
);
386 m_request_state_hook
= new RequestStateHook(this);
387 AdminSocket
* admin_socket
= cct
->get_admin_socket();
388 int ret
= admin_socket
->register_command("objecter_requests",
389 m_request_state_hook
,
390 "show in-progress osd requests");
392 /* Don't warn on EEXIST, happens if multiple ceph clients
393 * are instantiated from one process */
394 if (ret
< 0 && ret
!= -EEXIST
) {
395 lderr(cct
) << "error registering admin socket command: "
396 << cpp_strerror(ret
) << dendl
;
399 update_crush_location();
401 cct
->_conf
.add_observer(this);
407 * ok, cluster interaction can happen
409 void Objecter::start(const OSDMap
* o
)
411 shared_lock
rl(rwlock
);
415 osdmap
->deepish_copy_from(*o
);
416 prune_pg_mapping(osdmap
->get_pools());
417 } else if (osdmap
->get_epoch() == 0) {
418 _maybe_request_map();
422 void Objecter::shutdown()
424 ceph_assert(initialized
);
426 unique_lock
wl(rwlock
);
431 cct
->_conf
.remove_observer(this);
434 map
<int,OSDSession
*>::iterator p
;
435 while (!osd_sessions
.empty()) {
436 p
= osd_sessions
.begin();
437 close_session(p
->second
);
440 while(!check_latest_map_lingers
.empty()) {
441 map
<uint64_t, LingerOp
*>::iterator i
= check_latest_map_lingers
.begin();
443 check_latest_map_lingers
.erase(i
->first
);
446 while(!check_latest_map_ops
.empty()) {
447 map
<ceph_tid_t
, Op
*>::iterator i
= check_latest_map_ops
.begin();
449 check_latest_map_ops
.erase(i
->first
);
452 while(!check_latest_map_commands
.empty()) {
453 map
<ceph_tid_t
, CommandOp
*>::iterator i
454 = check_latest_map_commands
.begin();
456 check_latest_map_commands
.erase(i
->first
);
459 while(!poolstat_ops
.empty()) {
460 map
<ceph_tid_t
,PoolStatOp
*>::iterator i
= poolstat_ops
.begin();
462 poolstat_ops
.erase(i
->first
);
465 while(!statfs_ops
.empty()) {
466 map
<ceph_tid_t
, StatfsOp
*>::iterator i
= statfs_ops
.begin();
468 statfs_ops
.erase(i
->first
);
471 while(!pool_ops
.empty()) {
472 map
<ceph_tid_t
, PoolOp
*>::iterator i
= pool_ops
.begin();
474 pool_ops
.erase(i
->first
);
477 ldout(cct
, 20) << __func__
<< " clearing up homeless session..." << dendl
;
478 while(!homeless_session
->linger_ops
.empty()) {
479 std::map
<uint64_t, LingerOp
*>::iterator i
480 = homeless_session
->linger_ops
.begin();
481 ldout(cct
, 10) << " linger_op " << i
->first
<< dendl
;
482 LingerOp
*lop
= i
->second
;
484 OSDSession::unique_lock
swl(homeless_session
->lock
);
485 _session_linger_op_remove(homeless_session
, lop
);
487 linger_ops
.erase(lop
->linger_id
);
488 linger_ops_set
.erase(lop
);
492 while(!homeless_session
->ops
.empty()) {
493 std::map
<ceph_tid_t
, Op
*>::iterator i
= homeless_session
->ops
.begin();
494 ldout(cct
, 10) << " op " << i
->first
<< dendl
;
497 OSDSession::unique_lock
swl(homeless_session
->lock
);
498 _session_op_remove(homeless_session
, op
);
503 while(!homeless_session
->command_ops
.empty()) {
504 std::map
<ceph_tid_t
, CommandOp
*>::iterator i
505 = homeless_session
->command_ops
.begin();
506 ldout(cct
, 10) << " command_op " << i
->first
<< dendl
;
507 CommandOp
*cop
= i
->second
;
509 OSDSession::unique_lock
swl(homeless_session
->lock
);
510 _session_command_op_remove(homeless_session
, cop
);
516 if (timer
.cancel_event(tick_event
)) {
517 ldout(cct
, 10) << " successfully canceled tick" << dendl
;
523 cct
->get_perfcounters_collection()->remove(logger
);
528 // Let go of Objecter write lock so timer thread can shutdown
531 // Outside of lock to avoid cycle WRT calls to RequestStateHook
532 // This is safe because we guarantee no concurrent calls to
533 // shutdown() with the ::initialized check at start.
534 if (m_request_state_hook
) {
535 AdminSocket
* admin_socket
= cct
->get_admin_socket();
536 admin_socket
->unregister_commands(m_request_state_hook
);
537 delete m_request_state_hook
;
538 m_request_state_hook
= NULL
;
542 void Objecter::_send_linger(LingerOp
*info
,
545 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
548 Context
*oncommit
= NULL
;
549 LingerOp::shared_lock
watchl(info
->watch_lock
);
550 ceph::buffer::list
*poutbl
= NULL
;
551 if (info
->registered
&& info
->is_watch
) {
552 ldout(cct
, 15) << "send_linger " << info
->linger_id
<< " reconnect"
554 opv
.push_back(OSDOp());
555 opv
.back().op
.op
= CEPH_OSD_OP_WATCH
;
556 opv
.back().op
.watch
.cookie
= info
->get_cookie();
557 opv
.back().op
.watch
.op
= CEPH_OSD_WATCH_OP_RECONNECT
;
558 opv
.back().op
.watch
.gen
= ++info
->register_gen
;
559 oncommit
= new C_Linger_Reconnect(this, info
);
561 ldout(cct
, 15) << "send_linger " << info
->linger_id
<< " register"
564 C_Linger_Commit
*c
= new C_Linger_Commit(this, info
);
565 if (!info
->is_watch
) {
572 Op
*o
= new Op(info
->target
.base_oid
, info
->target
.base_oloc
,
573 opv
, info
->target
.flags
| CEPH_OSD_FLAG_READ
,
574 oncommit
, info
->pobjver
);
576 o
->snapid
= info
->snap
;
577 o
->snapc
= info
->snapc
;
578 o
->mtime
= info
->mtime
;
580 o
->target
= info
->target
;
583 // do not resend this; we will send a new op to reregister
584 o
->should_resend
= false;
585 o
->ctx_budgeted
= true;
587 if (info
->register_tid
) {
588 // repeat send. cancel old registration op, if any.
589 OSDSession::unique_lock
sl(info
->session
->lock
);
590 if (info
->session
->ops
.count(info
->register_tid
)) {
591 Op
*o
= info
->session
->ops
[info
->register_tid
];
592 _op_cancel_map_check(o
);
593 _cancel_linger_op(o
);
598 _op_submit_with_budget(o
, sul
, &info
->register_tid
, &info
->ctx_budget
);
600 logger
->inc(l_osdc_linger_send
);
603 void Objecter::_linger_commit(LingerOp
*info
, int r
, ceph::buffer::list
& outbl
)
605 LingerOp::unique_lock
wl(info
->watch_lock
);
606 ldout(cct
, 10) << "_linger_commit " << info
->linger_id
<< dendl
;
607 if (info
->on_reg_commit
) {
608 info
->on_reg_commit
->complete(r
);
609 info
->on_reg_commit
= NULL
;
611 if (r
< 0 && info
->on_notify_finish
) {
612 info
->on_notify_finish
->complete(r
);
613 info
->on_notify_finish
= nullptr;
616 // only tell the user the first time we do this
617 info
->registered
= true;
618 info
->pobjver
= NULL
;
620 if (!info
->is_watch
) {
621 // make note of the notify_id
622 auto p
= outbl
.cbegin();
624 decode(info
->notify_id
, p
);
625 ldout(cct
, 10) << "_linger_commit notify_id=" << info
->notify_id
628 catch (ceph::buffer::error
& e
) {
633 struct C_DoWatchError
: public Context
{
635 Objecter::LingerOp
*info
;
637 C_DoWatchError(Objecter
*o
, Objecter::LingerOp
*i
, int r
)
638 : objecter(o
), info(i
), err(r
) {
640 info
->_queued_async();
642 void finish(int r
) override
{
643 Objecter::unique_lock
wl(objecter
->rwlock
);
644 bool canceled
= info
->canceled
;
648 info
->watch_context
->handle_error(info
->get_cookie(), err
);
651 info
->finished_async();
656 int Objecter::_normalize_watch_error(int r
)
658 // translate ENOENT -> ENOTCONN so that a delete->disconnection
659 // notification and a failure to reconnect because we raced with
660 // the delete appear the same to the user.
666 void Objecter::_linger_reconnect(LingerOp
*info
, int r
)
668 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " = " << r
669 << " (last_error " << info
->last_error
<< ")" << dendl
;
671 LingerOp::unique_lock
wl(info
->watch_lock
);
672 if (!info
->last_error
) {
673 r
= _normalize_watch_error(r
);
674 info
->last_error
= r
;
675 if (info
->watch_context
) {
676 finisher
->queue(new C_DoWatchError(this, info
, r
));
683 void Objecter::_send_linger_ping(LingerOp
*info
)
685 // rwlock is locked unique
686 // info->session->lock is locked
688 if (cct
->_conf
->objecter_inject_no_watch_ping
) {
689 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " SKIPPING"
693 if (osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
)) {
694 ldout(cct
, 10) << __func__
<< " PAUSERD" << dendl
;
698 ceph::coarse_mono_time now
= ceph::coarse_mono_clock::now();
699 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " now " << now
702 vector
<OSDOp
> opv(1);
703 opv
[0].op
.op
= CEPH_OSD_OP_WATCH
;
704 opv
[0].op
.watch
.cookie
= info
->get_cookie();
705 opv
[0].op
.watch
.op
= CEPH_OSD_WATCH_OP_PING
;
706 opv
[0].op
.watch
.gen
= info
->register_gen
;
707 C_Linger_Ping
*onack
= new C_Linger_Ping(this, info
);
708 Op
*o
= new Op(info
->target
.base_oid
, info
->target
.base_oloc
,
709 opv
, info
->target
.flags
| CEPH_OSD_FLAG_READ
,
711 o
->target
= info
->target
;
712 o
->should_resend
= false;
715 _session_op_assign(info
->session
, o
);
717 info
->ping_tid
= o
->tid
;
720 logger
->inc(l_osdc_linger_ping
);
723 void Objecter::_linger_ping(LingerOp
*info
, int r
, ceph::coarse_mono_time sent
,
724 uint32_t register_gen
)
726 LingerOp::unique_lock
l(info
->watch_lock
);
727 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
728 << " sent " << sent
<< " gen " << register_gen
<< " = " << r
729 << " (last_error " << info
->last_error
730 << " register_gen " << info
->register_gen
<< ")" << dendl
;
731 if (info
->register_gen
== register_gen
) {
733 info
->watch_valid_thru
= sent
;
734 } else if (r
< 0 && !info
->last_error
) {
735 r
= _normalize_watch_error(r
);
736 info
->last_error
= r
;
737 if (info
->watch_context
) {
738 finisher
->queue(new C_DoWatchError(this, info
, r
));
742 ldout(cct
, 20) << " ignoring old gen" << dendl
;
746 int Objecter::linger_check(LingerOp
*info
)
748 LingerOp::shared_lock
l(info
->watch_lock
);
750 ceph::coarse_mono_time stamp
= info
->watch_valid_thru
;
751 if (!info
->watch_pending_async
.empty())
752 stamp
= std::min(info
->watch_valid_thru
, info
->watch_pending_async
.front());
753 auto age
= ceph::coarse_mono_clock::now() - stamp
;
755 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
756 << " err " << info
->last_error
757 << " age " << age
<< dendl
;
758 if (info
->last_error
)
759 return info
->last_error
;
760 // return a safe upper bound (we are truncating to ms)
762 1 + std::chrono::duration_cast
<std::chrono::milliseconds
>(age
).count();
765 void Objecter::linger_cancel(LingerOp
*info
)
767 unique_lock
wl(rwlock
);
768 _linger_cancel(info
);
772 void Objecter::_linger_cancel(LingerOp
*info
)
774 // rwlock is locked unique
775 ldout(cct
, 20) << __func__
<< " linger_id=" << info
->linger_id
<< dendl
;
776 if (!info
->canceled
) {
777 OSDSession
*s
= info
->session
;
778 OSDSession::unique_lock
sl(s
->lock
);
779 _session_linger_op_remove(s
, info
);
782 linger_ops
.erase(info
->linger_id
);
783 linger_ops_set
.erase(info
);
784 ceph_assert(linger_ops
.size() == linger_ops_set
.size());
786 info
->canceled
= true;
789 logger
->dec(l_osdc_linger_active
);
795 Objecter::LingerOp
*Objecter::linger_register(const object_t
& oid
,
796 const object_locator_t
& oloc
,
799 LingerOp
*info
= new LingerOp(this);
800 info
->target
.base_oid
= oid
;
801 info
->target
.base_oloc
= oloc
;
802 if (info
->target
.base_oloc
.key
== oid
)
803 info
->target
.base_oloc
.key
.clear();
804 info
->target
.flags
= flags
;
805 info
->watch_valid_thru
= ceph::coarse_mono_clock::now();
807 unique_lock
l(rwlock
);
810 info
->linger_id
= ++max_linger_id
;
811 ldout(cct
, 10) << __func__
<< " info " << info
812 << " linger_id " << info
->linger_id
813 << " cookie " << info
->get_cookie()
815 linger_ops
[info
->linger_id
] = info
;
816 linger_ops_set
.insert(info
);
817 ceph_assert(linger_ops
.size() == linger_ops_set
.size());
819 info
->get(); // for the caller
823 ceph_tid_t
Objecter::linger_watch(LingerOp
*info
,
825 const SnapContext
& snapc
,
827 ceph::buffer::list
& inbl
,
831 info
->is_watch
= true;
834 info
->target
.flags
|= CEPH_OSD_FLAG_WRITE
;
838 info
->pobjver
= objver
;
839 info
->on_reg_commit
= oncommit
;
841 info
->ctx_budget
= take_linger_budget(info
);
843 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
844 _linger_submit(info
, sul
);
845 logger
->inc(l_osdc_linger_active
);
847 return info
->linger_id
;
850 ceph_tid_t
Objecter::linger_notify(LingerOp
*info
,
852 snapid_t snap
, ceph::buffer::list
& inbl
,
853 ceph::buffer::list
*poutbl
,
858 info
->target
.flags
|= CEPH_OSD_FLAG_READ
;
861 info
->poutbl
= poutbl
;
862 info
->pobjver
= objver
;
863 info
->on_reg_commit
= onfinish
;
865 info
->ctx_budget
= take_linger_budget(info
);
867 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
868 _linger_submit(info
, sul
);
869 logger
->inc(l_osdc_linger_active
);
871 return info
->linger_id
;
874 void Objecter::_linger_submit(LingerOp
*info
, shunique_lock
& sul
)
876 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
877 ceph_assert(info
->linger_id
);
878 ceph_assert(info
->ctx_budget
!= -1); // caller needs to have taken budget already!
880 // Populate Op::target
881 OSDSession
*s
= NULL
;
882 _calc_target(&info
->target
, nullptr);
884 // Create LingerOp<->OSDSession relation
885 int r
= _get_session(info
->target
.osd
, &s
, sul
);
887 OSDSession::unique_lock
sl(s
->lock
);
888 _session_linger_op_assign(s
, info
);
892 _send_linger(info
, sul
);
895 struct C_DoWatchNotify
: public Context
{
897 Objecter::LingerOp
*info
;
899 C_DoWatchNotify(Objecter
*o
, Objecter::LingerOp
*i
, MWatchNotify
*m
)
900 : objecter(o
), info(i
), msg(m
) {
902 info
->_queued_async();
905 void finish(int r
) override
{
906 objecter
->_do_watch_notify(info
, msg
);
910 void Objecter::handle_watch_notify(MWatchNotify
*m
)
912 shared_lock
l(rwlock
);
917 LingerOp
*info
= reinterpret_cast<LingerOp
*>(m
->cookie
);
918 if (linger_ops_set
.count(info
) == 0) {
919 ldout(cct
, 7) << __func__
<< " cookie " << m
->cookie
<< " dne" << dendl
;
922 LingerOp::unique_lock
wl(info
->watch_lock
);
923 if (m
->opcode
== CEPH_WATCH_EVENT_DISCONNECT
) {
924 if (!info
->last_error
) {
925 info
->last_error
= -ENOTCONN
;
926 if (info
->watch_context
) {
927 finisher
->queue(new C_DoWatchError(this, info
, -ENOTCONN
));
930 } else if (!info
->is_watch
) {
931 // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
932 // since we know the only user (librados) is safe to call in
933 // fast-dispatch context
934 if (info
->notify_id
&&
935 info
->notify_id
!= m
->notify_id
) {
936 ldout(cct
, 10) << __func__
<< " reply notify " << m
->notify_id
937 << " != " << info
->notify_id
<< ", ignoring" << dendl
;
938 } else if (info
->on_notify_finish
) {
939 info
->notify_result_bl
->claim(m
->get_data());
940 info
->on_notify_finish
->complete(m
->return_code
);
942 // if we race with reconnect we might get a second notify; only
943 // notify the caller once!
944 info
->on_notify_finish
= NULL
;
947 finisher
->queue(new C_DoWatchNotify(this, info
, m
));
951 void Objecter::_do_watch_notify(LingerOp
*info
, MWatchNotify
*m
)
953 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
955 shared_lock
l(rwlock
);
956 ceph_assert(initialized
);
958 if (info
->canceled
) {
963 // notify completion?
964 ceph_assert(info
->is_watch
);
965 ceph_assert(info
->watch_context
);
966 ceph_assert(m
->opcode
!= CEPH_WATCH_EVENT_DISCONNECT
);
971 case CEPH_WATCH_EVENT_NOTIFY
:
972 info
->watch_context
->handle_notify(m
->notify_id
, m
->cookie
,
973 m
->notifier_gid
, m
->bl
);
978 info
->finished_async();
983 bool Objecter::ms_dispatch(Message
*m
)
985 ldout(cct
, 10) << __func__
<< " " << cct
<< " " << *m
<< dendl
;
986 switch (m
->get_type()) {
987 // these we exlusively handle
988 case CEPH_MSG_OSD_OPREPLY
:
989 handle_osd_op_reply(static_cast<MOSDOpReply
*>(m
));
992 case CEPH_MSG_OSD_BACKOFF
:
993 handle_osd_backoff(static_cast<MOSDBackoff
*>(m
));
996 case CEPH_MSG_WATCH_NOTIFY
:
997 handle_watch_notify(static_cast<MWatchNotify
*>(m
));
1001 case MSG_COMMAND_REPLY
:
1002 if (m
->get_source().type() == CEPH_ENTITY_TYPE_OSD
) {
1003 handle_command_reply(static_cast<MCommandReply
*>(m
));
1009 case MSG_GETPOOLSTATSREPLY
:
1010 handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply
*>(m
));
1013 case CEPH_MSG_POOLOP_REPLY
:
1014 handle_pool_op_reply(static_cast<MPoolOpReply
*>(m
));
1017 case CEPH_MSG_STATFS_REPLY
:
1018 handle_fs_stats_reply(static_cast<MStatfsReply
*>(m
));
1021 // these we give others a chance to inspect
1024 case CEPH_MSG_OSD_MAP
:
1025 handle_osd_map(static_cast<MOSDMap
*>(m
));
1031 void Objecter::_scan_requests(
1035 map
<int64_t, bool> *pool_full_map
,
1036 map
<ceph_tid_t
, Op
*>& need_resend
,
1037 list
<LingerOp
*>& need_resend_linger
,
1038 map
<ceph_tid_t
, CommandOp
*>& need_resend_command
,
1041 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
1043 list
<LingerOp
*> unregister_lingers
;
1045 OSDSession::unique_lock
sl(s
->lock
);
1047 // check for changed linger mappings (_before_ regular ops)
1048 map
<ceph_tid_t
,LingerOp
*>::iterator lp
= s
->linger_ops
.begin();
1049 while (lp
!= s
->linger_ops
.end()) {
1050 LingerOp
*op
= lp
->second
;
1051 ceph_assert(op
->session
== s
);
1052 // check_linger_pool_dne() may touch linger_ops; prevent iterator
1055 ldout(cct
, 10) << " checking linger op " << op
->linger_id
<< dendl
;
1056 bool unregister
, force_resend_writes
= cluster_full
;
1057 int r
= _recalc_linger_op_target(op
, sul
);
1059 force_resend_writes
= force_resend_writes
||
1060 (*pool_full_map
)[op
->target
.base_oloc
.pool
];
1062 case RECALC_OP_TARGET_NO_ACTION
:
1063 if (!skipped_map
&& !force_resend_writes
)
1066 case RECALC_OP_TARGET_NEED_RESEND
:
1067 need_resend_linger
.push_back(op
);
1068 _linger_cancel_map_check(op
);
1070 case RECALC_OP_TARGET_POOL_DNE
:
1071 _check_linger_pool_dne(op
, &unregister
);
1073 ldout(cct
, 10) << " need to unregister linger op "
1074 << op
->linger_id
<< dendl
;
1076 unregister_lingers
.push_back(op
);
1082 // check for changed request mappings
1083 map
<ceph_tid_t
,Op
*>::iterator p
= s
->ops
.begin();
1084 while (p
!= s
->ops
.end()) {
1086 ++p
; // check_op_pool_dne() may touch ops; prevent iterator invalidation
1087 ldout(cct
, 10) << " checking op " << op
->tid
<< dendl
;
1088 _prune_snapc(osdmap
->get_new_removed_snaps(), op
);
1089 bool force_resend_writes
= cluster_full
;
1091 force_resend_writes
= force_resend_writes
||
1092 (*pool_full_map
)[op
->target
.base_oloc
.pool
];
1093 int r
= _calc_target(&op
->target
,
1094 op
->session
? op
->session
->con
.get() : nullptr);
1096 case RECALC_OP_TARGET_NO_ACTION
:
1097 if (!skipped_map
&& !(force_resend_writes
&& op
->target
.respects_full()))
1100 case RECALC_OP_TARGET_NEED_RESEND
:
1101 _session_op_remove(op
->session
, op
);
1102 need_resend
[op
->tid
] = op
;
1103 _op_cancel_map_check(op
);
1105 case RECALC_OP_TARGET_POOL_DNE
:
1106 _check_op_pool_dne(op
, &sl
);
1112 map
<ceph_tid_t
,CommandOp
*>::iterator cp
= s
->command_ops
.begin();
1113 while (cp
!= s
->command_ops
.end()) {
1114 CommandOp
*c
= cp
->second
;
1116 ldout(cct
, 10) << " checking command " << c
->tid
<< dendl
;
1117 bool force_resend_writes
= cluster_full
;
1119 force_resend_writes
= force_resend_writes
||
1120 (*pool_full_map
)[c
->target_pg
.pool()];
1121 int r
= _calc_command_target(c
, sul
);
1123 case RECALC_OP_TARGET_NO_ACTION
:
1124 // resend if skipped map; otherwise do nothing.
1125 if (!skipped_map
&& !force_resend_writes
)
1128 case RECALC_OP_TARGET_NEED_RESEND
:
1129 need_resend_command
[c
->tid
] = c
;
1130 _session_command_op_remove(c
->session
, c
);
1131 _command_cancel_map_check(c
);
1133 case RECALC_OP_TARGET_POOL_DNE
:
1134 case RECALC_OP_TARGET_OSD_DNE
:
1135 case RECALC_OP_TARGET_OSD_DOWN
:
1136 _check_command_map_dne(c
);
1143 for (list
<LingerOp
*>::iterator iter
= unregister_lingers
.begin();
1144 iter
!= unregister_lingers
.end();
1146 _linger_cancel(*iter
);
1151 void Objecter::handle_osd_map(MOSDMap
*m
)
1153 shunique_lock
sul(rwlock
, acquire_unique
);
1157 ceph_assert(osdmap
);
1159 if (m
->fsid
!= monc
->get_fsid()) {
1160 ldout(cct
, 0) << "handle_osd_map fsid " << m
->fsid
1161 << " != " << monc
->get_fsid() << dendl
;
1165 bool was_pauserd
= osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
);
1166 bool cluster_full
= _osdmap_full_flag();
1167 bool was_pausewr
= osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
) || cluster_full
||
1168 _osdmap_has_pool_full();
1169 map
<int64_t, bool> pool_full_map
;
1170 for (map
<int64_t, pg_pool_t
>::const_iterator it
1171 = osdmap
->get_pools().begin();
1172 it
!= osdmap
->get_pools().end(); ++it
)
1173 pool_full_map
[it
->first
] = _osdmap_pool_full(it
->second
);
1176 list
<LingerOp
*> need_resend_linger
;
1177 map
<ceph_tid_t
, Op
*> need_resend
;
1178 map
<ceph_tid_t
, CommandOp
*> need_resend_command
;
1180 if (m
->get_last() <= osdmap
->get_epoch()) {
1181 ldout(cct
, 3) << "handle_osd_map ignoring epochs ["
1182 << m
->get_first() << "," << m
->get_last()
1183 << "] <= " << osdmap
->get_epoch() << dendl
;
1185 ldout(cct
, 3) << "handle_osd_map got epochs ["
1186 << m
->get_first() << "," << m
->get_last()
1187 << "] > " << osdmap
->get_epoch() << dendl
;
1189 if (osdmap
->get_epoch()) {
1190 bool skipped_map
= false;
1191 // we want incrementals
1192 for (epoch_t e
= osdmap
->get_epoch() + 1;
1196 if (osdmap
->get_epoch() == e
-1 &&
1197 m
->incremental_maps
.count(e
)) {
1198 ldout(cct
, 3) << "handle_osd_map decoding incremental epoch " << e
1200 OSDMap::Incremental
inc(m
->incremental_maps
[e
]);
1201 osdmap
->apply_incremental(inc
);
1203 emit_blacklist_events(inc
);
1205 logger
->inc(l_osdc_map_inc
);
1207 else if (m
->maps
.count(e
)) {
1208 ldout(cct
, 3) << "handle_osd_map decoding full epoch " << e
<< dendl
;
1209 auto new_osdmap
= std::make_unique
<OSDMap
>();
1210 new_osdmap
->decode(m
->maps
[e
]);
1212 emit_blacklist_events(*osdmap
, *new_osdmap
);
1213 osdmap
= std::move(new_osdmap
);
1215 logger
->inc(l_osdc_map_full
);
1218 if (e
>= m
->get_oldest()) {
1219 ldout(cct
, 3) << "handle_osd_map requesting missing epoch "
1220 << osdmap
->get_epoch()+1 << dendl
;
1221 _maybe_request_map();
1224 ldout(cct
, 3) << "handle_osd_map missing epoch "
1225 << osdmap
->get_epoch()+1
1226 << ", jumping to " << m
->get_oldest() << dendl
;
1227 e
= m
->get_oldest() - 1;
1231 logger
->set(l_osdc_map_epoch
, osdmap
->get_epoch());
1233 prune_pg_mapping(osdmap
->get_pools());
1234 cluster_full
= cluster_full
|| _osdmap_full_flag();
1235 update_pool_full_map(pool_full_map
);
1237 // check all outstanding requests on every epoch
1238 for (auto& i
: need_resend
) {
1239 _prune_snapc(osdmap
->get_new_removed_snaps(), i
.second
);
1241 _scan_requests(homeless_session
, skipped_map
, cluster_full
,
1242 &pool_full_map
, need_resend
,
1243 need_resend_linger
, need_resend_command
, sul
);
1244 for (map
<int,OSDSession
*>::iterator p
= osd_sessions
.begin();
1245 p
!= osd_sessions
.end(); ) {
1246 OSDSession
*s
= p
->second
;
1247 _scan_requests(s
, skipped_map
, cluster_full
,
1248 &pool_full_map
, need_resend
,
1249 need_resend_linger
, need_resend_command
, sul
);
1251 // osd down or addr change?
1252 if (!osdmap
->is_up(s
->osd
) ||
1254 s
->con
->get_peer_addrs() != osdmap
->get_addrs(s
->osd
))) {
1259 ceph_assert(e
== osdmap
->get_epoch());
1263 // first map. we want the full thing.
1264 if (m
->maps
.count(m
->get_last())) {
1265 for (map
<int,OSDSession
*>::iterator p
= osd_sessions
.begin();
1266 p
!= osd_sessions
.end(); ++p
) {
1267 OSDSession
*s
= p
->second
;
1268 _scan_requests(s
, false, false, NULL
, need_resend
,
1269 need_resend_linger
, need_resend_command
, sul
);
1271 ldout(cct
, 3) << "handle_osd_map decoding full epoch "
1272 << m
->get_last() << dendl
;
1273 osdmap
->decode(m
->maps
[m
->get_last()]);
1274 prune_pg_mapping(osdmap
->get_pools());
1276 _scan_requests(homeless_session
, false, false, NULL
,
1277 need_resend
, need_resend_linger
,
1278 need_resend_command
, sul
);
1280 ldout(cct
, 3) << "handle_osd_map hmm, i want a full map, requesting"
1282 monc
->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME
);
1288 // make sure need_resend targets reflect latest map
1289 for (auto p
= need_resend
.begin(); p
!= need_resend
.end(); ) {
1291 if (op
->target
.epoch
< osdmap
->get_epoch()) {
1292 ldout(cct
, 10) << __func__
<< " checking op " << p
->first
<< dendl
;
1293 int r
= _calc_target(&op
->target
, nullptr);
1294 if (r
== RECALC_OP_TARGET_POOL_DNE
) {
1295 p
= need_resend
.erase(p
);
1296 _check_op_pool_dne(op
, nullptr);
1305 bool pauserd
= osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
);
1306 bool pausewr
= osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
) || _osdmap_full_flag()
1307 || _osdmap_has_pool_full();
1310 if (was_pauserd
|| was_pausewr
|| pauserd
|| pausewr
||
1311 osdmap
->get_epoch() < epoch_barrier
) {
1312 _maybe_request_map();
1316 for (map
<ceph_tid_t
, Op
*>::iterator p
= need_resend
.begin();
1317 p
!= need_resend
.end(); ++p
) {
1319 OSDSession
*s
= op
->session
;
1320 bool mapped_session
= false;
1322 int r
= _map_session(&op
->target
, &s
, sul
);
1323 ceph_assert(r
== 0);
1324 mapped_session
= true;
1328 OSDSession::unique_lock
sl(s
->lock
);
1329 if (mapped_session
) {
1330 _session_op_assign(s
, op
);
1332 if (op
->should_resend
) {
1333 if (!op
->session
->is_homeless() && !op
->target
.paused
) {
1334 logger
->inc(l_osdc_op_resend
);
1338 _op_cancel_map_check(op
);
1339 _cancel_linger_op(op
);
1344 for (list
<LingerOp
*>::iterator p
= need_resend_linger
.begin();
1345 p
!= need_resend_linger
.end(); ++p
) {
1347 ceph_assert(op
->session
);
1348 if (!op
->session
->is_homeless()) {
1349 logger
->inc(l_osdc_linger_resend
);
1350 _send_linger(op
, sul
);
1353 for (map
<ceph_tid_t
,CommandOp
*>::iterator p
= need_resend_command
.begin();
1354 p
!= need_resend_command
.end(); ++p
) {
1355 CommandOp
*c
= p
->second
;
1356 if (c
->target
.osd
>= 0) {
1357 _assign_command_session(c
, sul
);
1358 if (c
->session
&& !c
->session
->is_homeless()) {
1366 // finish any Contexts that were waiting on a map update
1367 map
<epoch_t
,list
< pair
< Context
*, int > > >::iterator p
=
1368 waiting_for_map
.begin();
1369 while (p
!= waiting_for_map
.end() &&
1370 p
->first
<= osdmap
->get_epoch()) {
1371 //go through the list and call the onfinish methods
1372 for (list
<pair
<Context
*, int> >::iterator i
= p
->second
.begin();
1373 i
!= p
->second
.end(); ++i
) {
1374 i
->first
->complete(i
->second
);
1376 waiting_for_map
.erase(p
++);
1379 monc
->sub_got("osdmap", osdmap
->get_epoch());
1381 if (!waiting_for_map
.empty()) {
1382 _maybe_request_map();
1386 void Objecter::enable_blacklist_events()
1388 unique_lock
wl(rwlock
);
1390 blacklist_events_enabled
= true;
1393 void Objecter::consume_blacklist_events(std::set
<entity_addr_t
> *events
)
1395 unique_lock
wl(rwlock
);
1397 if (events
->empty()) {
1398 events
->swap(blacklist_events
);
1400 for (const auto &i
: blacklist_events
) {
1403 blacklist_events
.clear();
1407 void Objecter::emit_blacklist_events(const OSDMap::Incremental
&inc
)
1409 if (!blacklist_events_enabled
) {
1413 for (const auto &i
: inc
.new_blacklist
) {
1414 blacklist_events
.insert(i
.first
);
1418 void Objecter::emit_blacklist_events(const OSDMap
&old_osd_map
,
1419 const OSDMap
&new_osd_map
)
1421 if (!blacklist_events_enabled
) {
1425 std::set
<entity_addr_t
> old_set
;
1426 std::set
<entity_addr_t
> new_set
;
1428 old_osd_map
.get_blacklist(&old_set
);
1429 new_osd_map
.get_blacklist(&new_set
);
1431 std::set
<entity_addr_t
> delta_set
;
1432 std::set_difference(
1433 new_set
.begin(), new_set
.end(), old_set
.begin(), old_set
.end(),
1434 std::inserter(delta_set
, delta_set
.begin()));
1435 blacklist_events
.insert(delta_set
.begin(), delta_set
.end());
1440 void Objecter::C_Op_Map_Latest::finish(int r
)
1442 if (r
== -EAGAIN
|| r
== -ECANCELED
)
1445 lgeneric_subdout(objecter
->cct
, objecter
, 10)
1446 << "op_map_latest r=" << r
<< " tid=" << tid
1447 << " latest " << latest
<< dendl
;
1449 Objecter::unique_lock
wl(objecter
->rwlock
);
1451 map
<ceph_tid_t
, Op
*>::iterator iter
=
1452 objecter
->check_latest_map_ops
.find(tid
);
1453 if (iter
== objecter
->check_latest_map_ops
.end()) {
1454 lgeneric_subdout(objecter
->cct
, objecter
, 10)
1455 << "op_map_latest op "<< tid
<< " not found" << dendl
;
1459 Op
*op
= iter
->second
;
1460 objecter
->check_latest_map_ops
.erase(iter
);
1462 lgeneric_subdout(objecter
->cct
, objecter
, 20)
1463 << "op_map_latest op "<< op
<< dendl
;
1465 if (op
->map_dne_bound
== 0)
1466 op
->map_dne_bound
= latest
;
1468 OSDSession::unique_lock
sl(op
->session
->lock
, defer_lock
);
1469 objecter
->_check_op_pool_dne(op
, &sl
);
1474 int Objecter::pool_snap_by_name(int64_t poolid
, const char *snap_name
,
1475 snapid_t
*snap
) const
1477 shared_lock
rl(rwlock
);
1479 auto& pools
= osdmap
->get_pools();
1480 auto iter
= pools
.find(poolid
);
1481 if (iter
== pools
.end()) {
1484 const pg_pool_t
& pg_pool
= iter
->second
;
1485 for (auto p
= pg_pool
.snaps
.begin();
1486 p
!= pg_pool
.snaps
.end();
1488 if (p
->second
.name
== snap_name
) {
1496 int Objecter::pool_snap_get_info(int64_t poolid
, snapid_t snap
,
1497 pool_snap_info_t
*info
) const
1499 shared_lock
rl(rwlock
);
1501 auto& pools
= osdmap
->get_pools();
1502 auto iter
= pools
.find(poolid
);
1503 if (iter
== pools
.end()) {
1506 const pg_pool_t
& pg_pool
= iter
->second
;
1507 auto p
= pg_pool
.snaps
.find(snap
);
1508 if (p
== pg_pool
.snaps
.end())
1515 int Objecter::pool_snap_list(int64_t poolid
, vector
<uint64_t> *snaps
)
1517 shared_lock
rl(rwlock
);
1519 const pg_pool_t
*pi
= osdmap
->get_pg_pool(poolid
);
1522 for (map
<snapid_t
,pool_snap_info_t
>::const_iterator p
= pi
->snaps
.begin();
1523 p
!= pi
->snaps
.end();
1525 snaps
->push_back(p
->first
);
1530 // sl may be unlocked.
1531 void Objecter::_check_op_pool_dne(Op
*op
, unique_lock
*sl
)
1533 // rwlock is locked unique
1535 if (op
->target
.pool_ever_existed
) {
1536 // the pool previously existed and now it does not, which means it
1538 op
->map_dne_bound
= osdmap
->get_epoch();
1539 ldout(cct
, 10) << "check_op_pool_dne tid " << op
->tid
1540 << " pool previously exists but now does not"
1543 ldout(cct
, 10) << "check_op_pool_dne tid " << op
->tid
1544 << " current " << osdmap
->get_epoch()
1545 << " map_dne_bound " << op
->map_dne_bound
1548 if (op
->map_dne_bound
> 0) {
1549 if (osdmap
->get_epoch() >= op
->map_dne_bound
) {
1550 // we had a new enough map
1551 ldout(cct
, 10) << "check_op_pool_dne tid " << op
->tid
1552 << " concluding pool " << op
->target
.base_pgid
.pool()
1556 op
->onfinish
->complete(-ENOENT
);
1559 OSDSession
*s
= op
->session
;
1561 ceph_assert(s
!= NULL
);
1562 ceph_assert(sl
->mutex() == &s
->lock
);
1563 bool session_locked
= sl
->owns_lock();
1564 if (!session_locked
) {
1568 if (!session_locked
) {
1572 _finish_op(op
, 0); // no session
1576 _send_op_map_check(op
);
1580 void Objecter::_send_op_map_check(Op
*op
)
1582 // rwlock is locked unique
1584 if (check_latest_map_ops
.count(op
->tid
) == 0) {
1586 check_latest_map_ops
[op
->tid
] = op
;
1587 C_Op_Map_Latest
*c
= new C_Op_Map_Latest(this, op
->tid
);
1588 monc
->get_version("osdmap", &c
->latest
, NULL
, c
);
1592 void Objecter::_op_cancel_map_check(Op
*op
)
1594 // rwlock is locked unique
1595 map
<ceph_tid_t
, Op
*>::iterator iter
=
1596 check_latest_map_ops
.find(op
->tid
);
1597 if (iter
!= check_latest_map_ops
.end()) {
1598 Op
*op
= iter
->second
;
1600 check_latest_map_ops
.erase(iter
);
1604 // linger pool check
1606 void Objecter::C_Linger_Map_Latest::finish(int r
)
1608 if (r
== -EAGAIN
|| r
== -ECANCELED
) {
1609 // ignore callback; we will retry in resend_mon_ops()
1613 unique_lock
wl(objecter
->rwlock
);
1615 map
<uint64_t, LingerOp
*>::iterator iter
=
1616 objecter
->check_latest_map_lingers
.find(linger_id
);
1617 if (iter
== objecter
->check_latest_map_lingers
.end()) {
1621 LingerOp
*op
= iter
->second
;
1622 objecter
->check_latest_map_lingers
.erase(iter
);
1624 if (op
->map_dne_bound
== 0)
1625 op
->map_dne_bound
= latest
;
1628 objecter
->_check_linger_pool_dne(op
, &unregister
);
1631 objecter
->_linger_cancel(op
);
1637 void Objecter::_check_linger_pool_dne(LingerOp
*op
, bool *need_unregister
)
1639 // rwlock is locked unique
1641 *need_unregister
= false;
1643 if (op
->register_gen
> 0) {
1644 ldout(cct
, 10) << "_check_linger_pool_dne linger_id " << op
->linger_id
1645 << " pool previously existed but now does not"
1647 op
->map_dne_bound
= osdmap
->get_epoch();
1649 ldout(cct
, 10) << "_check_linger_pool_dne linger_id " << op
->linger_id
1650 << " current " << osdmap
->get_epoch()
1651 << " map_dne_bound " << op
->map_dne_bound
1654 if (op
->map_dne_bound
> 0) {
1655 if (osdmap
->get_epoch() >= op
->map_dne_bound
) {
1656 LingerOp::unique_lock wl
{op
->watch_lock
};
1657 if (op
->on_reg_commit
) {
1658 op
->on_reg_commit
->complete(-ENOENT
);
1659 op
->on_reg_commit
= nullptr;
1661 if (op
->on_notify_finish
) {
1662 op
->on_notify_finish
->complete(-ENOENT
);
1663 op
->on_notify_finish
= nullptr;
1665 *need_unregister
= true;
1668 _send_linger_map_check(op
);
1672 void Objecter::_send_linger_map_check(LingerOp
*op
)
1675 if (check_latest_map_lingers
.count(op
->linger_id
) == 0) {
1677 check_latest_map_lingers
[op
->linger_id
] = op
;
1678 C_Linger_Map_Latest
*c
= new C_Linger_Map_Latest(this, op
->linger_id
);
1679 monc
->get_version("osdmap", &c
->latest
, NULL
, c
);
1683 void Objecter::_linger_cancel_map_check(LingerOp
*op
)
1685 // rwlock is locked unique
1687 map
<uint64_t, LingerOp
*>::iterator iter
=
1688 check_latest_map_lingers
.find(op
->linger_id
);
1689 if (iter
!= check_latest_map_lingers
.end()) {
1690 LingerOp
*op
= iter
->second
;
1692 check_latest_map_lingers
.erase(iter
);
1696 // command pool check
1698 void Objecter::C_Command_Map_Latest::finish(int r
)
1700 if (r
== -EAGAIN
|| r
== -ECANCELED
) {
1701 // ignore callback; we will retry in resend_mon_ops()
1705 unique_lock
wl(objecter
->rwlock
);
1707 map
<uint64_t, CommandOp
*>::iterator iter
=
1708 objecter
->check_latest_map_commands
.find(tid
);
1709 if (iter
== objecter
->check_latest_map_commands
.end()) {
1713 CommandOp
*c
= iter
->second
;
1714 objecter
->check_latest_map_commands
.erase(iter
);
1716 if (c
->map_dne_bound
== 0)
1717 c
->map_dne_bound
= latest
;
1719 OSDSession::unique_lock
sul(c
->session
->lock
);
1720 objecter
->_check_command_map_dne(c
);
1726 void Objecter::_check_command_map_dne(CommandOp
*c
)
1728 // rwlock is locked unique
1729 // session is locked unique
1731 ldout(cct
, 10) << "_check_command_map_dne tid " << c
->tid
1732 << " current " << osdmap
->get_epoch()
1733 << " map_dne_bound " << c
->map_dne_bound
1735 if (c
->map_dne_bound
> 0) {
1736 if (osdmap
->get_epoch() >= c
->map_dne_bound
) {
1737 _finish_command(c
, c
->map_check_error
, c
->map_check_error_str
);
1740 _send_command_map_check(c
);
1744 void Objecter::_send_command_map_check(CommandOp
*c
)
1746 // rwlock is locked unique
1747 // session is locked unique
1750 if (check_latest_map_commands
.count(c
->tid
) == 0) {
1752 check_latest_map_commands
[c
->tid
] = c
;
1753 C_Command_Map_Latest
*f
= new C_Command_Map_Latest(this, c
->tid
);
1754 monc
->get_version("osdmap", &f
->latest
, NULL
, f
);
1758 void Objecter::_command_cancel_map_check(CommandOp
*c
)
1760 // rwlock is locked uniqe
1762 map
<uint64_t, CommandOp
*>::iterator iter
=
1763 check_latest_map_commands
.find(c
->tid
);
1764 if (iter
!= check_latest_map_commands
.end()) {
1765 CommandOp
*c
= iter
->second
;
1767 check_latest_map_commands
.erase(iter
);
1773 * Look up OSDSession by OSD id.
1775 * @returns 0 on success, or -EAGAIN if the lock context requires
1776 * promotion to write.
1778 int Objecter::_get_session(int osd
, OSDSession
**session
, shunique_lock
& sul
)
1780 ceph_assert(sul
&& sul
.mutex() == &rwlock
);
1783 *session
= homeless_session
;
1784 ldout(cct
, 20) << __func__
<< " osd=" << osd
<< " returning homeless"
1789 map
<int,OSDSession
*>::iterator p
= osd_sessions
.find(osd
);
1790 if (p
!= osd_sessions
.end()) {
1791 OSDSession
*s
= p
->second
;
1794 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << osd
<< " "
1795 << s
->get_nref() << dendl
;
1798 if (!sul
.owns_lock()) {
1801 OSDSession
*s
= new OSDSession(cct
, osd
);
1802 osd_sessions
[osd
] = s
;
1803 s
->con
= messenger
->connect_to_osd(osdmap
->get_addrs(osd
));
1804 s
->con
->set_priv(RefCountedPtr
{s
});
1805 logger
->inc(l_osdc_osd_session_open
);
1806 logger
->set(l_osdc_osd_sessions
, osd_sessions
.size());
1809 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << osd
<< " "
1810 << s
->get_nref() << dendl
;
1814 void Objecter::put_session(Objecter::OSDSession
*s
)
1816 if (s
&& !s
->is_homeless()) {
1817 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << s
->osd
<< " "
1818 << s
->get_nref() << dendl
;
1823 void Objecter::get_session(Objecter::OSDSession
*s
)
1825 ceph_assert(s
!= NULL
);
1827 if (!s
->is_homeless()) {
1828 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << s
->osd
<< " "
1829 << s
->get_nref() << dendl
;
1834 void Objecter::_reopen_session(OSDSession
*s
)
1836 // rwlock is locked unique
1837 // s->lock is locked
1839 auto addrs
= osdmap
->get_addrs(s
->osd
);
1840 ldout(cct
, 10) << "reopen_session osd." << s
->osd
<< " session, addr now "
1843 s
->con
->set_priv(NULL
);
1844 s
->con
->mark_down();
1845 logger
->inc(l_osdc_osd_session_close
);
1847 s
->con
= messenger
->connect_to_osd(addrs
);
1848 s
->con
->set_priv(RefCountedPtr
{s
});
1850 logger
->inc(l_osdc_osd_session_open
);
1853 void Objecter::close_session(OSDSession
*s
)
1855 // rwlock is locked unique
1857 ldout(cct
, 10) << "close_session for osd." << s
->osd
<< dendl
;
1859 s
->con
->set_priv(NULL
);
1860 s
->con
->mark_down();
1861 logger
->inc(l_osdc_osd_session_close
);
1863 OSDSession::unique_lock
sl(s
->lock
);
1865 std::list
<LingerOp
*> homeless_lingers
;
1866 std::list
<CommandOp
*> homeless_commands
;
1867 std::list
<Op
*> homeless_ops
;
1869 while (!s
->linger_ops
.empty()) {
1870 std::map
<uint64_t, LingerOp
*>::iterator i
= s
->linger_ops
.begin();
1871 ldout(cct
, 10) << " linger_op " << i
->first
<< dendl
;
1872 homeless_lingers
.push_back(i
->second
);
1873 _session_linger_op_remove(s
, i
->second
);
1876 while (!s
->ops
.empty()) {
1877 std::map
<ceph_tid_t
, Op
*>::iterator i
= s
->ops
.begin();
1878 ldout(cct
, 10) << " op " << i
->first
<< dendl
;
1879 homeless_ops
.push_back(i
->second
);
1880 _session_op_remove(s
, i
->second
);
1883 while (!s
->command_ops
.empty()) {
1884 std::map
<ceph_tid_t
, CommandOp
*>::iterator i
= s
->command_ops
.begin();
1885 ldout(cct
, 10) << " command_op " << i
->first
<< dendl
;
1886 homeless_commands
.push_back(i
->second
);
1887 _session_command_op_remove(s
, i
->second
);
1890 osd_sessions
.erase(s
->osd
);
1894 // Assign any leftover ops to the homeless session
1896 OSDSession::unique_lock
hsl(homeless_session
->lock
);
1897 for (std::list
<LingerOp
*>::iterator i
= homeless_lingers
.begin();
1898 i
!= homeless_lingers
.end(); ++i
) {
1899 _session_linger_op_assign(homeless_session
, *i
);
1901 for (std::list
<Op
*>::iterator i
= homeless_ops
.begin();
1902 i
!= homeless_ops
.end(); ++i
) {
1903 _session_op_assign(homeless_session
, *i
);
1905 for (std::list
<CommandOp
*>::iterator i
= homeless_commands
.begin();
1906 i
!= homeless_commands
.end(); ++i
) {
1907 _session_command_op_assign(homeless_session
, *i
);
1911 logger
->set(l_osdc_osd_sessions
, osd_sessions
.size());
1914 void Objecter::wait_for_osd_map(epoch_t e
)
1916 unique_lock
l(rwlock
);
1917 if (osdmap
->get_epoch() >= e
) {
1922 // Leave this since it goes with C_SafeCond
1923 ceph::mutex lock
= ceph::make_mutex("");
1924 ceph::condition_variable cond
;
1926 std::unique_lock mlock
{lock
};
1927 C_SafeCond
*context
= new C_SafeCond(lock
, cond
, &done
, NULL
);
1928 waiting_for_map
[e
].push_back(pair
<Context
*, int>(context
, 0));
1930 cond
.wait(mlock
, [&done
] { return done
; });
1933 struct C_Objecter_GetVersion
: public Context
{
1935 uint64_t oldest
, newest
;
1937 C_Objecter_GetVersion(Objecter
*o
, Context
*c
)
1938 : objecter(o
), oldest(0), newest(0), fin(c
) {}
1939 void finish(int r
) override
{
1941 objecter
->get_latest_version(oldest
, newest
, fin
);
1942 } else if (r
== -EAGAIN
) { // try again as instructed
1943 objecter
->wait_for_latest_osdmap(fin
);
1945 // it doesn't return any other error codes!
1951 void Objecter::wait_for_latest_osdmap(Context
*fin
)
1953 ldout(cct
, 10) << __func__
<< dendl
;
1954 C_Objecter_GetVersion
*c
= new C_Objecter_GetVersion(this, fin
);
1955 monc
->get_version("osdmap", &c
->newest
, &c
->oldest
, c
);
1958 void Objecter::get_latest_version(epoch_t oldest
, epoch_t newest
, Context
*fin
)
1960 unique_lock
wl(rwlock
);
1961 if (osdmap
->get_epoch() >= newest
) {
1962 ldout(cct
, 10) << __func__
<< " latest " << newest
<< ", have it" << dendl
;
1969 ldout(cct
, 10) << __func__
<< " latest " << newest
<< ", waiting" << dendl
;
1970 _wait_for_new_map(fin
, newest
, 0);
1973 void Objecter::maybe_request_map()
1975 shared_lock
rl(rwlock
);
1976 _maybe_request_map();
1979 void Objecter::_maybe_request_map()
1983 if (_osdmap_full_flag()
1984 || osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
)
1985 || osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
)) {
1986 ldout(cct
, 10) << "_maybe_request_map subscribing (continuous) to next "
1987 "osd map (FULL flag is set)" << dendl
;
1990 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl
;
1991 flag
= CEPH_SUBSCRIBE_ONETIME
;
1993 epoch_t epoch
= osdmap
->get_epoch() ? osdmap
->get_epoch()+1 : 0;
1994 if (monc
->sub_want("osdmap", epoch
, flag
)) {
1999 void Objecter::_wait_for_new_map(Context
*c
, epoch_t epoch
, int err
)
2001 // rwlock is locked unique
2002 waiting_for_map
[epoch
].push_back(pair
<Context
*, int>(c
, err
));
2003 _maybe_request_map();
2008 * Use this together with wait_for_map: this is a pre-check to avoid
2009 * allocating a Context for wait_for_map if we can see that we
2010 * definitely already have the epoch.
2012 * This does *not* replace the need to handle the return value of
2013 * wait_for_map: just because we don't have it in this pre-check
2014 * doesn't mean we won't have it when calling back into wait_for_map,
2015 * since the objecter lock is dropped in between.
2017 bool Objecter::have_map(const epoch_t epoch
)
2019 shared_lock
rl(rwlock
);
2020 if (osdmap
->get_epoch() >= epoch
) {
2027 bool Objecter::wait_for_map(epoch_t epoch
, Context
*c
, int err
)
2029 unique_lock
wl(rwlock
);
2030 if (osdmap
->get_epoch() >= epoch
) {
2033 _wait_for_new_map(c
, epoch
, err
);
2037 void Objecter::_kick_requests(OSDSession
*session
,
2038 map
<uint64_t, LingerOp
*>& lresend
)
2040 // rwlock is locked unique
2043 session
->backoffs
.clear();
2044 session
->backoffs_by_id
.clear();
2047 map
<ceph_tid_t
,Op
*> resend
; // resend in tid order
2048 for (map
<ceph_tid_t
, Op
*>::iterator p
= session
->ops
.begin();
2049 p
!= session
->ops
.end();) {
2052 if (op
->should_resend
) {
2053 if (!op
->target
.paused
)
2054 resend
[op
->tid
] = op
;
2056 _op_cancel_map_check(op
);
2057 _cancel_linger_op(op
);
2061 logger
->inc(l_osdc_op_resend
, resend
.size());
2062 while (!resend
.empty()) {
2063 _send_op(resend
.begin()->second
);
2064 resend
.erase(resend
.begin());
2068 logger
->inc(l_osdc_linger_resend
, session
->linger_ops
.size());
2069 for (map
<ceph_tid_t
, LingerOp
*>::iterator j
= session
->linger_ops
.begin();
2070 j
!= session
->linger_ops
.end(); ++j
) {
2071 LingerOp
*op
= j
->second
;
2073 ceph_assert(lresend
.count(j
->first
) == 0);
2074 lresend
[j
->first
] = op
;
2078 logger
->inc(l_osdc_command_resend
, session
->command_ops
.size());
2079 map
<uint64_t,CommandOp
*> cresend
; // resend in order
2080 for (map
<ceph_tid_t
, CommandOp
*>::iterator k
= session
->command_ops
.begin();
2081 k
!= session
->command_ops
.end(); ++k
) {
2082 cresend
[k
->first
] = k
->second
;
2084 while (!cresend
.empty()) {
2085 _send_command(cresend
.begin()->second
);
2086 cresend
.erase(cresend
.begin());
2090 void Objecter::_linger_ops_resend(map
<uint64_t, LingerOp
*>& lresend
,
2093 ceph_assert(ul
.owns_lock());
2094 shunique_lock
sul(std::move(ul
));
2095 while (!lresend
.empty()) {
2096 LingerOp
*op
= lresend
.begin()->second
;
2097 if (!op
->canceled
) {
2098 _send_linger(op
, sul
);
2101 lresend
.erase(lresend
.begin());
2103 ul
= sul
.release_to_unique();
2106 void Objecter::start_tick()
2108 ceph_assert(tick_event
== 0);
2110 timer
.add_event(ceph::make_timespan(cct
->_conf
->objecter_tick_interval
),
2111 &Objecter::tick
, this);
2114 void Objecter::tick()
2116 shared_lock
rl(rwlock
);
2118 ldout(cct
, 10) << "tick" << dendl
;
2120 // we are only called by C_Tick
2124 // we raced with shutdown
2125 ldout(cct
, 10) << __func__
<< " raced with shutdown" << dendl
;
2129 set
<OSDSession
*> toping
;
2132 // look for laggy requests
2133 auto cutoff
= ceph::coarse_mono_clock::now();
2134 cutoff
-= ceph::make_timespan(cct
->_conf
->objecter_timeout
); // timeout
2136 unsigned laggy_ops
= 0;
2138 for (map
<int,OSDSession
*>::iterator siter
= osd_sessions
.begin();
2139 siter
!= osd_sessions
.end(); ++siter
) {
2140 OSDSession
*s
= siter
->second
;
2141 OSDSession::lock_guard
l(s
->lock
);
2143 for (map
<ceph_tid_t
,Op
*>::iterator p
= s
->ops
.begin();
2147 ceph_assert(op
->session
);
2148 if (op
->stamp
< cutoff
) {
2149 ldout(cct
, 2) << " tid " << p
->first
<< " on osd." << op
->session
->osd
2150 << " is laggy" << dendl
;
2155 for (map
<uint64_t,LingerOp
*>::iterator p
= s
->linger_ops
.begin();
2156 p
!= s
->linger_ops
.end();
2158 LingerOp
*op
= p
->second
;
2159 LingerOp::unique_lock
wl(op
->watch_lock
);
2160 ceph_assert(op
->session
);
2161 ldout(cct
, 10) << " pinging osd that serves lingering tid " << p
->first
2162 << " (osd." << op
->session
->osd
<< ")" << dendl
;
2164 if (op
->is_watch
&& op
->registered
&& !op
->last_error
)
2165 _send_linger_ping(op
);
2167 for (map
<uint64_t,CommandOp
*>::iterator p
= s
->command_ops
.begin();
2168 p
!= s
->command_ops
.end();
2170 CommandOp
*op
= p
->second
;
2171 ceph_assert(op
->session
);
2172 ldout(cct
, 10) << " pinging osd that serves command tid " << p
->first
2173 << " (osd." << op
->session
->osd
<< ")" << dendl
;
2179 if (num_homeless_ops
|| !toping
.empty()) {
2180 _maybe_request_map();
2183 logger
->set(l_osdc_op_laggy
, laggy_ops
);
2184 logger
->set(l_osdc_osd_laggy
, toping
.size());
2186 if (!toping
.empty()) {
2187 // send a ping to these osds, to ensure we detect any session resets
2188 // (osd reply message policy is lossy)
2189 for (set
<OSDSession
*>::const_iterator i
= toping
.begin();
2192 (*i
)->con
->send_message(new MPing
);
2196 // Make sure we don't reschedule if we wake up after shutdown
2198 tick_event
= timer
.reschedule_me(ceph::make_timespan(
2199 cct
->_conf
->objecter_tick_interval
));
2203 void Objecter::resend_mon_ops()
2205 unique_lock
wl(rwlock
);
2207 ldout(cct
, 10) << "resend_mon_ops" << dendl
;
2209 for (map
<ceph_tid_t
,PoolStatOp
*>::iterator p
= poolstat_ops
.begin();
2210 p
!= poolstat_ops
.end();
2212 _poolstat_submit(p
->second
);
2213 logger
->inc(l_osdc_poolstat_resend
);
2216 for (map
<ceph_tid_t
,StatfsOp
*>::iterator p
= statfs_ops
.begin();
2217 p
!= statfs_ops
.end();
2219 _fs_stats_submit(p
->second
);
2220 logger
->inc(l_osdc_statfs_resend
);
2223 for (map
<ceph_tid_t
,PoolOp
*>::iterator p
= pool_ops
.begin();
2224 p
!= pool_ops
.end();
2226 _pool_op_submit(p
->second
);
2227 logger
->inc(l_osdc_poolop_resend
);
2230 for (map
<ceph_tid_t
, Op
*>::iterator p
= check_latest_map_ops
.begin();
2231 p
!= check_latest_map_ops
.end();
2233 C_Op_Map_Latest
*c
= new C_Op_Map_Latest(this, p
->second
->tid
);
2234 monc
->get_version("osdmap", &c
->latest
, NULL
, c
);
2237 for (map
<uint64_t, LingerOp
*>::iterator p
= check_latest_map_lingers
.begin();
2238 p
!= check_latest_map_lingers
.end();
2240 C_Linger_Map_Latest
*c
2241 = new C_Linger_Map_Latest(this, p
->second
->linger_id
);
2242 monc
->get_version("osdmap", &c
->latest
, NULL
, c
);
2245 for (map
<uint64_t, CommandOp
*>::iterator p
2246 = check_latest_map_commands
.begin();
2247 p
!= check_latest_map_commands
.end();
2249 C_Command_Map_Latest
*c
= new C_Command_Map_Latest(this, p
->second
->tid
);
2250 monc
->get_version("osdmap", &c
->latest
, NULL
, c
);
2254 // read | write ---------------------------
2256 void Objecter::op_submit(Op
*op
, ceph_tid_t
*ptid
, int *ctx_budget
)
2258 shunique_lock
rl(rwlock
, ceph::acquire_shared
);
2262 op
->trace
.event("op submit");
2263 _op_submit_with_budget(op
, rl
, ptid
, ctx_budget
);
2266 void Objecter::_op_submit_with_budget(Op
*op
, shunique_lock
& sul
,
2270 ceph_assert(initialized
);
2272 ceph_assert(op
->ops
.size() == op
->out_bl
.size());
2273 ceph_assert(op
->ops
.size() == op
->out_rval
.size());
2274 ceph_assert(op
->ops
.size() == op
->out_handler
.size());
2276 // throttle. before we look at any state, because
2277 // _take_op_budget() may drop our lock while it blocks.
2278 if (!op
->ctx_budgeted
|| (ctx_budget
&& (*ctx_budget
== -1))) {
2279 int op_budget
= _take_op_budget(op
, sul
);
2280 // take and pass out the budget for the first OP
2281 // in the context session
2282 if (ctx_budget
&& (*ctx_budget
== -1)) {
2283 *ctx_budget
= op_budget
;
2287 if (osd_timeout
> timespan(0)) {
2289 op
->tid
= ++last_tid
;
2291 op
->ontimeout
= timer
.add_event(osd_timeout
,
2293 op_cancel(tid
, -ETIMEDOUT
); });
2296 _op_submit(op
, sul
, ptid
);
2299 void Objecter::_send_op_account(Op
*op
)
2303 // add to gather set(s)
2307 ldout(cct
, 20) << " note: not requesting reply" << dendl
;
2310 logger
->inc(l_osdc_op_active
);
2311 logger
->inc(l_osdc_op
);
2313 if ((op
->target
.flags
& (CEPH_OSD_FLAG_READ
| CEPH_OSD_FLAG_WRITE
)) ==
2314 (CEPH_OSD_FLAG_READ
|CEPH_OSD_FLAG_WRITE
))
2315 logger
->inc(l_osdc_op_rmw
);
2316 else if (op
->target
.flags
& CEPH_OSD_FLAG_WRITE
)
2317 logger
->inc(l_osdc_op_w
);
2318 else if (op
->target
.flags
& CEPH_OSD_FLAG_READ
)
2319 logger
->inc(l_osdc_op_r
);
2321 if (op
->target
.flags
& CEPH_OSD_FLAG_PGOP
)
2322 logger
->inc(l_osdc_op_pg
);
2324 for (vector
<OSDOp
>::iterator p
= op
->ops
.begin(); p
!= op
->ops
.end(); ++p
) {
2325 int code
= l_osdc_osdop_other
;
2327 case CEPH_OSD_OP_STAT
: code
= l_osdc_osdop_stat
; break;
2328 case CEPH_OSD_OP_CREATE
: code
= l_osdc_osdop_create
; break;
2329 case CEPH_OSD_OP_READ
: code
= l_osdc_osdop_read
; break;
2330 case CEPH_OSD_OP_WRITE
: code
= l_osdc_osdop_write
; break;
2331 case CEPH_OSD_OP_WRITEFULL
: code
= l_osdc_osdop_writefull
; break;
2332 case CEPH_OSD_OP_WRITESAME
: code
= l_osdc_osdop_writesame
; break;
2333 case CEPH_OSD_OP_APPEND
: code
= l_osdc_osdop_append
; break;
2334 case CEPH_OSD_OP_ZERO
: code
= l_osdc_osdop_zero
; break;
2335 case CEPH_OSD_OP_TRUNCATE
: code
= l_osdc_osdop_truncate
; break;
2336 case CEPH_OSD_OP_DELETE
: code
= l_osdc_osdop_delete
; break;
2337 case CEPH_OSD_OP_MAPEXT
: code
= l_osdc_osdop_mapext
; break;
2338 case CEPH_OSD_OP_SPARSE_READ
: code
= l_osdc_osdop_sparse_read
; break;
2339 case CEPH_OSD_OP_GETXATTR
: code
= l_osdc_osdop_getxattr
; break;
2340 case CEPH_OSD_OP_SETXATTR
: code
= l_osdc_osdop_setxattr
; break;
2341 case CEPH_OSD_OP_CMPXATTR
: code
= l_osdc_osdop_cmpxattr
; break;
2342 case CEPH_OSD_OP_RMXATTR
: code
= l_osdc_osdop_rmxattr
; break;
2343 case CEPH_OSD_OP_RESETXATTRS
: code
= l_osdc_osdop_resetxattrs
; break;
2345 // OMAP read operations
2346 case CEPH_OSD_OP_OMAPGETVALS
:
2347 case CEPH_OSD_OP_OMAPGETKEYS
:
2348 case CEPH_OSD_OP_OMAPGETHEADER
:
2349 case CEPH_OSD_OP_OMAPGETVALSBYKEYS
:
2350 case CEPH_OSD_OP_OMAP_CMP
: code
= l_osdc_osdop_omap_rd
; break;
2352 // OMAP write operations
2353 case CEPH_OSD_OP_OMAPSETVALS
:
2354 case CEPH_OSD_OP_OMAPSETHEADER
: code
= l_osdc_osdop_omap_wr
; break;
2356 // OMAP del operations
2357 case CEPH_OSD_OP_OMAPCLEAR
:
2358 case CEPH_OSD_OP_OMAPRMKEYS
: code
= l_osdc_osdop_omap_del
; break;
2360 case CEPH_OSD_OP_CALL
: code
= l_osdc_osdop_call
; break;
2361 case CEPH_OSD_OP_WATCH
: code
= l_osdc_osdop_watch
; break;
2362 case CEPH_OSD_OP_NOTIFY
: code
= l_osdc_osdop_notify
; break;
2369 void Objecter::_op_submit(Op
*op
, shunique_lock
& sul
, ceph_tid_t
*ptid
)
2373 ldout(cct
, 10) << __func__
<< " op " << op
<< dendl
;
2376 ceph_assert(op
->session
== NULL
);
2377 OSDSession
*s
= NULL
;
2379 bool check_for_latest_map
= _calc_target(&op
->target
, nullptr)
2380 == RECALC_OP_TARGET_POOL_DNE
;
2382 // Try to get a session, including a retry if we need to take write lock
2383 int r
= _get_session(op
->target
.osd
, &s
, sul
);
2385 (check_for_latest_map
&& sul
.owns_lock_shared()) ||
2386 cct
->_conf
->objecter_debug_inject_relock_delay
) {
2387 epoch_t orig_epoch
= osdmap
->get_epoch();
2389 if (cct
->_conf
->objecter_debug_inject_relock_delay
) {
2393 if (orig_epoch
!= osdmap
->get_epoch()) {
2394 // map changed; recalculate mapping
2395 ldout(cct
, 10) << __func__
<< " relock raced with osdmap, recalc target"
2397 check_for_latest_map
= _calc_target(&op
->target
, nullptr)
2398 == RECALC_OP_TARGET_POOL_DNE
;
2407 ceph_assert(s
== NULL
);
2408 r
= _get_session(op
->target
.osd
, &s
, sul
);
2410 ceph_assert(r
== 0);
2411 ceph_assert(s
); // may be homeless
2413 _send_op_account(op
);
2417 ceph_assert(op
->target
.flags
& (CEPH_OSD_FLAG_READ
|CEPH_OSD_FLAG_WRITE
));
2419 if (pool_full_try
) {
2420 op
->target
.flags
|= CEPH_OSD_FLAG_FULL_TRY
;
2423 bool need_send
= false;
2424 if (op
->target
.paused
) {
2425 ldout(cct
, 10) << " tid " << op
->tid
<< " op " << op
<< " is paused"
2427 _maybe_request_map();
2428 } else if (!s
->is_homeless()) {
2431 _maybe_request_map();
2434 OSDSession::unique_lock
sl(s
->lock
);
2436 op
->tid
= ++last_tid
;
2438 ldout(cct
, 10) << "_op_submit oid " << op
->target
.base_oid
2439 << " '" << op
->target
.base_oloc
<< "' '"
2440 << op
->target
.target_oloc
<< "' " << op
->ops
<< " tid "
2441 << op
->tid
<< " osd." << (!s
->is_homeless() ? s
->osd
: -1)
2444 _session_op_assign(s
, op
);
2450 // Last chance to touch Op here, after giving up session lock it can
2451 // be freed at any time by response handler.
2452 ceph_tid_t tid
= op
->tid
;
2453 if (check_for_latest_map
) {
2454 _send_op_map_check(op
);
2463 ldout(cct
, 5) << num_in_flight
<< " in flight" << dendl
;
// Cancel the in-flight op with the given tid on a specific session,
// completing its callback with error code r.
// NOTE(review): interior lines of this function are missing from this view
// (error-return paths, op lookup result handling) — do not modify without
// consulting the full file.
2466 int Objecter::op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
)
// Objecter must still be initialized; cancelling after shutdown is a bug.
2468 ceph_assert(initialized
);
// Exclusive session lock: we mutate the session's op map below.
2470 OSDSession::unique_lock
sl(s
->lock
);
// Look the op up by tid in this session.
2472 map
<ceph_tid_t
, Op
*>::iterator p
= s
->ops
.find(tid
);
2473 if (p
== s
->ops
.end()) {
// Not in this session — presumably returns a not-found error (ENOENT);
// the return statement is outside this view.
2474 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne in session "
// Revoke any preallocated rx buffer registered on the connection so the
// messenger does not write a late reply into caller-owned memory.
2481 ldout(cct
, 20) << " revoking rx ceph::buffer for " << tid
2482 << " on " << s
->con
<< dendl
;
2483 s
->con
->revoke_rx_buffer(tid
);
2487 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " in session " << s
->osd
// Fire the caller's completion with the cancellation error, then clear it
// so it cannot be invoked again.
2492 op
->onfinish
->complete(r
);
2493 op
->onfinish
= NULL
;
// Drop any pending "check for latest map" state associated with this op.
2495 _op_cancel_map_check(op
);
// Convenience overload: cancel a tid without knowing its session.
// Takes the Objecter rwlock exclusively and delegates to _op_cancel,
// which searches all sessions. (Return path is outside this view.)
2502 int Objecter::op_cancel(ceph_tid_t tid
, int r
)
2506 unique_lock
wl(rwlock
);
2507 ret
= _op_cancel(tid
, r
);
// Batch overload: cancel every tid in the list with error r under a single
// exclusive rwlock acquisition. (Loop body is outside this view.)
2512 int Objecter::op_cancel(const vector
<ceph_tid_t
>& tids
, int r
)
2514 unique_lock
wl(rwlock
);
2515 ldout(cct
,10) << __func__
<< " " << tids
<< dendl
;
2516 for (auto tid
: tids
) {
2522 int Objecter::_op_cancel(ceph_tid_t tid
, int r
)
2526 ldout(cct
, 5) << __func__
<< ": cancelling tid " << tid
<< " r=" << r
2531 for (map
<int, OSDSession
*>::iterator siter
= osd_sessions
.begin();
2532 siter
!= osd_sessions
.end(); ++siter
) {
2533 OSDSession
*s
= siter
->second
;
2534 OSDSession::shared_lock
sl(s
->lock
);
2535 if (s
->ops
.find(tid
) != s
->ops
.end()) {
2537 ret
= op_cancel(s
, tid
, r
);
2538 if (ret
== -ENOENT
) {
2539 /* oh no! raced, maybe tid moved to another session, restarting */
2546 ldout(cct
, 5) << __func__
<< ": tid " << tid
2547 << " not found in live sessions" << dendl
;
2549 // Handle case where the op is in homeless session
2550 OSDSession::shared_lock
sl(homeless_session
->lock
);
2551 if (homeless_session
->ops
.find(tid
) != homeless_session
->ops
.end()) {
2553 ret
= op_cancel(homeless_session
, tid
, r
);
2554 if (ret
== -ENOENT
) {
2555 /* oh no! raced, maybe tid moved to another session, restarting */
2564 ldout(cct
, 5) << __func__
<< ": tid " << tid
2565 << " not found in homeless session" << dendl
;
2571 epoch_t
Objecter::op_cancel_writes(int r
, int64_t pool
)
2573 unique_lock
wl(rwlock
);
2575 std::vector
<ceph_tid_t
> to_cancel
;
2578 for (map
<int, OSDSession
*>::iterator siter
= osd_sessions
.begin();
2579 siter
!= osd_sessions
.end(); ++siter
) {
2580 OSDSession
*s
= siter
->second
;
2581 OSDSession::shared_lock
sl(s
->lock
);
2582 for (map
<ceph_tid_t
, Op
*>::iterator op_i
= s
->ops
.begin();
2583 op_i
!= s
->ops
.end(); ++op_i
) {
2584 if (op_i
->second
->target
.flags
& CEPH_OSD_FLAG_WRITE
2585 && (pool
== -1 || op_i
->second
->target
.target_oloc
.pool
== pool
)) {
2586 to_cancel
.push_back(op_i
->first
);
2591 for (std::vector
<ceph_tid_t
>::iterator titer
= to_cancel
.begin();
2592 titer
!= to_cancel
.end();
2594 int cancel_result
= op_cancel(s
, *titer
, r
);
2595 // We hold rwlock across search and cancellation, so cancels
2596 // should always succeed
2597 ceph_assert(cancel_result
== 0);
2599 if (!found
&& to_cancel
.size())
2604 const epoch_t epoch
= osdmap
->get_epoch();
2615 bool Objecter::is_pg_changed(
2617 const vector
<int>& oldacting
,
2619 const vector
<int>& newacting
,
2622 if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
2628 if (any_change
&& oldacting
!= newacting
)
2630 return false; // same primary (tho replicas may have changed)
2633 bool Objecter::target_should_be_paused(op_target_t
*t
)
2635 const pg_pool_t
*pi
= osdmap
->get_pg_pool(t
->base_oloc
.pool
);
2636 bool pauserd
= osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
);
2637 bool pausewr
= osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
) ||
2638 (t
->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi
)));
2640 return (t
->flags
& CEPH_OSD_FLAG_READ
&& pauserd
) ||
2641 (t
->flags
& CEPH_OSD_FLAG_WRITE
&& pausewr
) ||
2642 (osdmap
->get_epoch() < epoch_barrier
);
2646 * Locking public accessor for _osdmap_full_flag
2648 bool Objecter::osdmap_full_flag() const
2650 shared_lock
rl(rwlock
);
2652 return _osdmap_full_flag();
2655 bool Objecter::osdmap_pool_full(const int64_t pool_id
) const
2657 shared_lock
rl(rwlock
);
2659 if (_osdmap_full_flag()) {
2663 return _osdmap_pool_full(pool_id
);
2666 bool Objecter::_osdmap_pool_full(const int64_t pool_id
) const
2668 const pg_pool_t
*pool
= osdmap
->get_pg_pool(pool_id
);
2670 ldout(cct
, 4) << __func__
<< ": DNE pool " << pool_id
<< dendl
;
2674 return _osdmap_pool_full(*pool
);
2677 bool Objecter::_osdmap_has_pool_full() const
2679 for (map
<int64_t, pg_pool_t
>::const_iterator it
2680 = osdmap
->get_pools().begin();
2681 it
!= osdmap
->get_pools().end(); ++it
) {
2682 if (_osdmap_pool_full(it
->second
))
2689 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2691 bool Objecter::_osdmap_full_flag() const
2693 // Ignore the FULL flag if the caller does not have honor_osdmap_full
2694 return osdmap
->test_flag(CEPH_OSDMAP_FULL
) && honor_pool_full
;
2697 void Objecter::update_pool_full_map(map
<int64_t, bool>& pool_full_map
)
2699 for (map
<int64_t, pg_pool_t
>::const_iterator it
2700 = osdmap
->get_pools().begin();
2701 it
!= osdmap
->get_pools().end(); ++it
) {
2702 if (pool_full_map
.find(it
->first
) == pool_full_map
.end()) {
2703 pool_full_map
[it
->first
] = _osdmap_pool_full(it
->second
);
2705 pool_full_map
[it
->first
] = _osdmap_pool_full(it
->second
) ||
2706 pool_full_map
[it
->first
];
2711 int64_t Objecter::get_object_hash_position(int64_t pool
, const string
& key
,
2714 shared_lock
rl(rwlock
);
2715 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
2718 return p
->hash_key(key
, ns
);
2721 int64_t Objecter::get_object_pg_hash_position(int64_t pool
, const string
& key
,
2724 shared_lock
rl(rwlock
);
2725 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
2728 return p
->raw_hash_to_pg(p
->hash_key(key
, ns
));
2731 void Objecter::_prune_snapc(
2732 const mempool::osdmap::map
<int64_t,
2733 snap_interval_set_t
>& new_removed_snaps
,
2737 auto i
= new_removed_snaps
.find(op
->target
.base_pgid
.pool());
2738 if (i
!= new_removed_snaps
.end()) {
2739 for (auto s
: op
->snapc
.snaps
) {
2740 if (i
->second
.contains(s
)) {
2746 vector
<snapid_t
> new_snaps
;
2747 for (auto s
: op
->snapc
.snaps
) {
2748 if (!i
->second
.contains(s
)) {
2749 new_snaps
.push_back(s
);
2752 op
->snapc
.snaps
.swap(new_snaps
);
2753 ldout(cct
,10) << __func__
<< " op " << op
->tid
<< " snapc " << op
->snapc
2754 << " (was " << new_snaps
<< ")" << dendl
;
2759 int Objecter::_calc_target(op_target_t
*t
, Connection
*con
, bool any_change
)
2762 bool is_read
= t
->flags
& CEPH_OSD_FLAG_READ
;
2763 bool is_write
= t
->flags
& CEPH_OSD_FLAG_WRITE
;
2764 t
->epoch
= osdmap
->get_epoch();
2765 ldout(cct
,20) << __func__
<< " epoch " << t
->epoch
2766 << " base " << t
->base_oid
<< " " << t
->base_oloc
2767 << " precalc_pgid " << (int)t
->precalc_pgid
2768 << " pgid " << t
->base_pgid
2769 << (is_read
? " is_read" : "")
2770 << (is_write
? " is_write" : "")
2773 const pg_pool_t
*pi
= osdmap
->get_pg_pool(t
->base_oloc
.pool
);
2776 return RECALC_OP_TARGET_POOL_DNE
;
2778 ldout(cct
,30) << __func__
<< " base pi " << pi
2779 << " pg_num " << pi
->get_pg_num() << dendl
;
2781 bool force_resend
= false;
2782 if (osdmap
->get_epoch() == pi
->last_force_op_resend
) {
2783 if (t
->last_force_resend
< pi
->last_force_op_resend
) {
2784 t
->last_force_resend
= pi
->last_force_op_resend
;
2785 force_resend
= true;
2786 } else if (t
->last_force_resend
== 0) {
2787 force_resend
= true;
2792 t
->target_oid
= t
->base_oid
;
2793 t
->target_oloc
= t
->base_oloc
;
2794 if ((t
->flags
& CEPH_OSD_FLAG_IGNORE_OVERLAY
) == 0) {
2795 if (is_read
&& pi
->has_read_tier())
2796 t
->target_oloc
.pool
= pi
->read_tier
;
2797 if (is_write
&& pi
->has_write_tier())
2798 t
->target_oloc
.pool
= pi
->write_tier
;
2799 pi
= osdmap
->get_pg_pool(t
->target_oloc
.pool
);
2802 return RECALC_OP_TARGET_POOL_DNE
;
2807 if (t
->precalc_pgid
) {
2808 ceph_assert(t
->flags
& CEPH_OSD_FLAG_IGNORE_OVERLAY
);
2809 ceph_assert(t
->base_oid
.name
.empty()); // make sure this is a pg op
2810 ceph_assert(t
->base_oloc
.pool
== (int64_t)t
->base_pgid
.pool());
2811 pgid
= t
->base_pgid
;
2813 int ret
= osdmap
->object_locator_to_pg(t
->target_oid
, t
->target_oloc
,
2815 if (ret
== -ENOENT
) {
2817 return RECALC_OP_TARGET_POOL_DNE
;
2820 ldout(cct
,20) << __func__
<< " target " << t
->target_oid
<< " "
2821 << t
->target_oloc
<< " -> pgid " << pgid
<< dendl
;
2822 ldout(cct
,30) << __func__
<< " target pi " << pi
2823 << " pg_num " << pi
->get_pg_num() << dendl
;
2824 t
->pool_ever_existed
= true;
2826 int size
= pi
->size
;
2827 int min_size
= pi
->min_size
;
2828 unsigned pg_num
= pi
->get_pg_num();
2829 unsigned pg_num_mask
= pi
->get_pg_num_mask();
2830 unsigned pg_num_pending
= pi
->get_pg_num_pending();
2831 int up_primary
, acting_primary
;
2832 vector
<int> up
, acting
;
2833 ps_t actual_ps
= ceph_stable_mod(pgid
.ps(), pg_num
, pg_num_mask
);
2834 pg_t
actual_pgid(actual_ps
, pgid
.pool());
2835 pg_mapping_t pg_mapping
;
2836 pg_mapping
.epoch
= osdmap
->get_epoch();
2837 if (lookup_pg_mapping(actual_pgid
, &pg_mapping
)) {
2839 up_primary
= pg_mapping
.up_primary
;
2840 acting
= pg_mapping
.acting
;
2841 acting_primary
= pg_mapping
.acting_primary
;
2843 osdmap
->pg_to_up_acting_osds(actual_pgid
, &up
, &up_primary
,
2844 &acting
, &acting_primary
);
2845 pg_mapping_t
pg_mapping(osdmap
->get_epoch(),
2846 up
, up_primary
, acting
, acting_primary
);
2847 update_pg_mapping(actual_pgid
, std::move(pg_mapping
));
2849 bool sort_bitwise
= osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
);
2850 bool recovery_deletes
= osdmap
->test_flag(CEPH_OSDMAP_RECOVERY_DELETES
);
2851 unsigned prev_seed
= ceph_stable_mod(pgid
.ps(), t
->pg_num
, t
->pg_num_mask
);
2852 pg_t
prev_pgid(prev_seed
, pgid
.pool());
2853 if (any_change
&& PastIntervals::is_new_interval(
2872 t
->recovery_deletes
,
2875 force_resend
= true;
2878 bool unpaused
= false;
2879 bool should_be_paused
= target_should_be_paused(t
);
2880 if (t
->paused
&& !should_be_paused
) {
2883 if (t
->paused
!= should_be_paused
) {
2884 ldout(cct
, 10) << __func__
<< " paused " << t
->paused
2885 << " -> " << should_be_paused
<< dendl
;
2886 t
->paused
= should_be_paused
;
2889 bool legacy_change
=
2892 t
->acting_primary
, t
->acting
, acting_primary
, acting
,
2893 t
->used_replica
|| any_change
);
2894 bool split_or_merge
= false;
2897 prev_pgid
.is_split(t
->pg_num
, pg_num
, nullptr) ||
2898 prev_pgid
.is_merge_source(t
->pg_num
, pg_num
, nullptr) ||
2899 prev_pgid
.is_merge_target(t
->pg_num
, pg_num
);
2902 if (legacy_change
|| split_or_merge
|| force_resend
) {
2905 t
->acting_primary
= acting_primary
;
2906 t
->up_primary
= up_primary
;
2909 t
->min_size
= min_size
;
2911 t
->pg_num_mask
= pg_num_mask
;
2912 t
->pg_num_pending
= pg_num_pending
;
2913 spg_t
spgid(actual_pgid
);
2914 if (pi
->is_erasure()) {
2915 for (uint8_t i
= 0; i
< acting
.size(); ++i
) {
2916 if (acting
[i
] == acting_primary
) {
2917 spgid
.reset_shard(shard_id_t(i
));
2922 t
->actual_pgid
= spgid
;
2923 t
->sort_bitwise
= sort_bitwise
;
2924 t
->recovery_deletes
= recovery_deletes
;
2925 ldout(cct
, 10) << __func__
<< " "
2926 << " raw pgid " << pgid
<< " -> actual " << t
->actual_pgid
2927 << " acting " << acting
2928 << " primary " << acting_primary
<< dendl
;
2929 t
->used_replica
= false;
2930 if (acting_primary
== -1) {
2934 bool read
= is_read
&& !is_write
;
2935 if (read
&& (t
->flags
& CEPH_OSD_FLAG_BALANCE_READS
)) {
2936 int p
= rand() % acting
.size();
2938 t
->used_replica
= true;
2940 ldout(cct
, 10) << " chose random osd." << osd
<< " of " << acting
2942 } else if (read
&& (t
->flags
& CEPH_OSD_FLAG_LOCALIZE_READS
) &&
2943 acting
.size() > 1) {
2944 // look for a local replica. prefer the primary if the
2945 // distance is the same.
2947 int best_locality
= 0;
2948 for (unsigned i
= 0; i
< acting
.size(); ++i
) {
2949 int locality
= osdmap
->crush
->get_common_ancestor_distance(
2950 cct
, acting
[i
], crush_location
);
2951 ldout(cct
, 20) << __func__
<< " localize: rank " << i
2952 << " osd." << acting
[i
]
2953 << " locality " << locality
<< dendl
;
2955 (locality
>= 0 && best_locality
>= 0 &&
2956 locality
< best_locality
) ||
2957 (best_locality
< 0 && locality
>= 0)) {
2959 best_locality
= locality
;
2961 t
->used_replica
= true;
2964 ceph_assert(best
>= 0);
2967 osd
= acting_primary
;
2972 if (legacy_change
|| unpaused
|| force_resend
) {
2973 return RECALC_OP_TARGET_NEED_RESEND
;
2975 if (split_or_merge
&&
2976 (osdmap
->require_osd_release
>= ceph_release_t::luminous
||
2977 HAVE_FEATURE(osdmap
->get_xinfo(acting_primary
).features
,
2978 RESEND_ON_SPLIT
))) {
2979 return RECALC_OP_TARGET_NEED_RESEND
;
2981 return RECALC_OP_TARGET_NO_ACTION
;
2984 int Objecter::_map_session(op_target_t
*target
, OSDSession
**s
,
2987 _calc_target(target
, nullptr);
2988 return _get_session(target
->osd
, s
, sul
);
// Attach an op to a session's op map, keyed by tid.
// NOTE(review): interior lines (session refcount / op->session assignment /
// homeless-op accounting) are missing from this view; only the visible
// statements are documented.
2991 void Objecter::_session_op_assign(OSDSession
*to
, Op
*op
)
2993 // to->lock is locked
// The op must not already belong to a session, and must have a tid.
2994 ceph_assert(op
->session
== NULL
);
2995 ceph_assert(op
->tid
);
// Register the op in the destination session's map.
2999 to
->ops
[op
->tid
] = op
;
// Homeless (no-OSD) sessions get extra accounting; body outside this view.
3001 if (to
->is_homeless()) {
3005 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->tid
<< dendl
;
// Detach an op from the session it currently belongs to.
// NOTE(review): homeless-op accounting and session refcount lines are
// missing from this view.
3008 void Objecter::_session_op_remove(OSDSession
*from
, Op
*op
)
// The op must actually belong to this session.
3010 ceph_assert(op
->session
== from
);
3011 // from->lock is locked
3013 if (from
->is_homeless()) {
// Remove the op from the session's tid-keyed map.
3017 from
->ops
.erase(op
->tid
);
3021 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->tid
<< dendl
;
// Attach a linger (watch/notify) op to a session, keyed by linger_id.
// NOTE(review): refcount / op->session assignment lines are missing from
// this view.
3024 void Objecter::_session_linger_op_assign(OSDSession
*to
, LingerOp
*op
)
3026 // to lock is locked unique
// Linger op must not already be assigned to a session.
3027 ceph_assert(op
->session
== NULL
);
3029 if (to
->is_homeless()) {
// Register under its linger id.
3035 to
->linger_ops
[op
->linger_id
] = op
;
3037 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->linger_id
// Detach a linger op from its current session.
// NOTE(review): homeless accounting and refcount lines are missing from
// this view.
3041 void Objecter::_session_linger_op_remove(OSDSession
*from
, LingerOp
*op
)
// The linger op must belong to the session it is being removed from.
3043 ceph_assert(from
== op
->session
);
3044 // from->lock is locked unique
3046 if (from
->is_homeless()) {
// Erase by linger id.
3050 from
->linger_ops
.erase(op
->linger_id
);
3054 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->linger_id
// Detach a CommandOp from its current session.
// NOTE(review): homeless accounting and refcount lines are missing from
// this view.
3058 void Objecter::_session_command_op_remove(OSDSession
*from
, CommandOp
*op
)
// Must belong to the session it is being removed from.
3060 ceph_assert(from
== op
->session
);
3061 // from->lock is locked
3063 if (from
->is_homeless()) {
// Erase by command tid.
3067 from
->command_ops
.erase(op
->tid
);
3071 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->tid
<< dendl
;
// Attach a CommandOp to a session, keyed by tid.
// NOTE(review): refcount / op->session assignment lines are missing from
// this view.
3074 void Objecter::_session_command_op_assign(OSDSession
*to
, CommandOp
*op
)
3076 // to->lock is locked
// Must not already be assigned, and must have a tid.
3077 ceph_assert(op
->session
== NULL
);
3078 ceph_assert(op
->tid
);
3080 if (to
->is_homeless()) {
// Register in the session's command map.
3086 to
->command_ops
[op
->tid
] = op
;
3088 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->tid
<< dendl
;
3091 int Objecter::_recalc_linger_op_target(LingerOp
*linger_op
,
3094 // rwlock is locked unique
3096 int r
= _calc_target(&linger_op
->target
, nullptr, true);
3097 if (r
== RECALC_OP_TARGET_NEED_RESEND
) {
3098 ldout(cct
, 10) << "recalc_linger_op_target tid " << linger_op
->linger_id
3099 << " pgid " << linger_op
->target
.pgid
3100 << " acting " << linger_op
->target
.acting
<< dendl
;
3102 OSDSession
*s
= NULL
;
3103 r
= _get_session(linger_op
->target
.osd
, &s
, sul
);
3104 ceph_assert(r
== 0);
3106 if (linger_op
->session
!= s
) {
3107 // NB locking two sessions (s and linger_op->session) at the
3108 // same time here is only safe because we are the only one that
3109 // takes two, and we are holding rwlock for write. Disable
3110 // lockdep because it doesn't know that.
3111 OSDSession::unique_lock
sl(s
->lock
);
3112 _session_linger_op_remove(linger_op
->session
, linger_op
);
3113 _session_linger_op_assign(s
, linger_op
);
3117 return RECALC_OP_TARGET_NEED_RESEND
;
// Tear down an op that was issued on behalf of a linger registration.
// NOTE(review): the in-flight accounting / _finish_op call that presumably
// follows is outside this view.
3122 void Objecter::_cancel_linger_op(Op
*op
)
3124 ldout(cct
, 15) << "cancel_op " << op
->tid
<< dendl
;
// Only ops that will not be resent may be cancelled this way.
3126 ceph_assert(!op
->should_resend
);
// The completion callback is owned by the op here; destroy it unfired.
3128 delete op
->onfinish
;
3135 void Objecter::_finish_op(Op
*op
, int r
)
3137 ldout(cct
, 15) << __func__
<< " " << op
->tid
<< dendl
;
3139 // op->session->lock is locked unique or op->session is null
3141 if (!op
->ctx_budgeted
&& op
->budget
>= 0) {
3142 put_op_budget_bytes(op
->budget
);
3146 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
3147 timer
.cancel_event(op
->ontimeout
);
3150 _session_op_remove(op
->session
, op
);
3153 logger
->dec(l_osdc_op_active
);
3155 ceph_assert(check_latest_map_ops
.find(op
->tid
) == check_latest_map_ops
.end());
3162 MOSDOp
*Objecter::_prepare_osd_op(Op
*op
)
3166 int flags
= op
->target
.flags
;
3167 flags
|= CEPH_OSD_FLAG_KNOWN_REDIR
;
3169 // Nothing checks this any longer, but needed for compatibility with
3170 // pre-luminous osds
3171 flags
|= CEPH_OSD_FLAG_ONDISK
;
3173 if (!honor_pool_full
)
3174 flags
|= CEPH_OSD_FLAG_FULL_FORCE
;
3176 op
->target
.paused
= false;
3177 op
->stamp
= ceph::coarse_mono_clock::now();
3179 hobject_t hobj
= op
->target
.get_hobj();
3180 MOSDOp
*m
= new MOSDOp(client_inc
, op
->tid
,
3181 hobj
, op
->target
.actual_pgid
,
3182 osdmap
->get_epoch(),
3183 flags
, op
->features
);
3185 m
->set_snapid(op
->snapid
);
3186 m
->set_snap_seq(op
->snapc
.seq
);
3187 m
->set_snaps(op
->snapc
.snaps
);
3190 m
->set_mtime(op
->mtime
);
3191 m
->set_retry_attempt(op
->attempts
++);
3193 if (!op
->trace
.valid() && cct
->_conf
->osdc_blkin_trace_all
) {
3194 op
->trace
.init("op", &trace_endpoint
);
3198 m
->set_priority(op
->priority
);
3200 m
->set_priority(cct
->_conf
->osd_client_op_priority
);
3202 if (op
->reqid
!= osd_reqid_t()) {
3203 m
->set_reqid(op
->reqid
);
3206 logger
->inc(l_osdc_op_send
);
3208 for (unsigned i
= 0; i
< m
->ops
.size(); i
++) {
3209 sum
+= m
->ops
[i
].indata
.length();
3211 logger
->inc(l_osdc_op_send_bytes
, sum
);
3216 void Objecter::_send_op(Op
*op
)
3219 // op->session->lock is locked
3222 auto p
= op
->session
->backoffs
.find(op
->target
.actual_pgid
);
3223 if (p
!= op
->session
->backoffs
.end()) {
3224 hobject_t hoid
= op
->target
.get_hobj();
3225 auto q
= p
->second
.lower_bound(hoid
);
3226 if (q
!= p
->second
.begin()) {
3228 if (hoid
>= q
->second
.end
) {
3232 if (q
!= p
->second
.end()) {
3233 ldout(cct
, 20) << __func__
<< " ? " << q
->first
<< " [" << q
->second
.begin
3234 << "," << q
->second
.end
<< ")" << dendl
;
3235 int r
= cmp(hoid
, q
->second
.begin
);
3236 if (r
== 0 || (r
> 0 && hoid
< q
->second
.end
)) {
3237 ldout(cct
, 10) << __func__
<< " backoff " << op
->target
.actual_pgid
3238 << " id " << q
->second
.id
<< " on " << hoid
3239 << ", queuing " << op
<< " tid " << op
->tid
<< dendl
;
3245 ceph_assert(op
->tid
> 0);
3246 MOSDOp
*m
= _prepare_osd_op(op
);
3248 if (op
->target
.actual_pgid
!= m
->get_spg()) {
3249 ldout(cct
, 10) << __func__
<< " " << op
->tid
<< " pgid change from "
3250 << m
->get_spg() << " to " << op
->target
.actual_pgid
3251 << ", updating and reencoding" << dendl
;
3252 m
->set_spg(op
->target
.actual_pgid
);
3253 m
->clear_payload(); // reencode
3256 ldout(cct
, 15) << "_send_op " << op
->tid
<< " to "
3257 << op
->target
.actual_pgid
<< " on osd." << op
->session
->osd
3260 ConnectionRef con
= op
->session
->con
;
3264 // preallocated rx ceph::buffer?
3266 ldout(cct
, 20) << " revoking rx ceph::buffer for " << op
->tid
<< " on "
3267 << op
->con
<< dendl
;
3268 op
->con
->revoke_rx_buffer(op
->tid
);
3271 op
->ontimeout
== 0 && // only post rx_buffer if no timeout; see #9582
3272 op
->outbl
->length()) {
3273 op
->outbl
->invalidate_crc(); // messenger writes through c_str()
3274 ldout(cct
, 20) << " posting rx ceph::buffer for " << op
->tid
<< " on " << con
3277 op
->con
->post_rx_buffer(op
->tid
, *op
->outbl
);
3281 op
->incarnation
= op
->session
->incarnation
;
3283 if (op
->trace
.valid()) {
3284 m
->trace
.init("op msg", nullptr, &op
->trace
);
3286 op
->session
->con
->send_message(m
);
// Estimate the throttle budget (in bytes) an op vector will consume:
// writes are charged their input data length; extent reads are charged the
// requested extent length; attr reads are charged name + value lengths.
// NOTE(review): the accumulator declaration and return statement are
// outside this view.
3289 int Objecter::calc_op_budget(const vector
<OSDOp
>& ops
)
3292 for (vector
<OSDOp
>::const_iterator i
= ops
.begin();
// Write ops: budget is the outgoing payload size.
3295 if (i
->op
.op
& CEPH_OSD_OP_MODE_WR
) {
3296 op_budget
+= i
->indata
.length();
3297 } else if (ceph_osd_op_mode_read(i
->op
.op
)) {
// Extent-style reads: budget the number of bytes requested (when positive).
3298 if (ceph_osd_op_uses_extent(i
->op
.op
)) {
3299 if ((int64_t)i
->op
.extent
.length
> 0)
3300 op_budget
+= (int64_t)i
->op
.extent
.length
;
// Xattr reads: budget the key and value sizes.
3301 } else if (ceph_osd_op_type_attr(i
->op
.op
)) {
3302 op_budget
+= i
->op
.xattr
.name_len
+ i
->op
.xattr
.value_len
;
3309 void Objecter::_throttle_op(Op
*op
,
3313 ceph_assert(sul
&& sul
.mutex() == &rwlock
);
3314 bool locked_for_write
= sul
.owns_lock();
3317 op_budget
= calc_op_budget(op
->ops
);
3318 if (!op_throttle_bytes
.get_or_fail(op_budget
)) { //couldn't take right now
3320 op_throttle_bytes
.get(op_budget
);
3321 if (locked_for_write
)
3326 if (!op_throttle_ops
.get_or_fail(1)) { //couldn't take right now
3328 op_throttle_ops
.get(1);
3329 if (locked_for_write
)
3336 int Objecter::take_linger_budget(LingerOp
*info
)
3341 /* This function DOES put the passed message before returning */
3342 void Objecter::handle_osd_op_reply(MOSDOpReply
*m
)
3344 ldout(cct
, 10) << "in handle_osd_op_reply" << dendl
;
3347 ceph_tid_t tid
= m
->get_tid();
3349 shunique_lock
sul(rwlock
, ceph::acquire_shared
);
3355 ConnectionRef con
= m
->get_connection();
3356 auto priv
= con
->get_priv();
3357 auto s
= static_cast<OSDSession
*>(priv
.get());
3358 if (!s
|| s
->con
!= con
) {
3359 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
3364 OSDSession::unique_lock
sl(s
->lock
);
3366 map
<ceph_tid_t
, Op
*>::iterator iter
= s
->ops
.find(tid
);
3367 if (iter
== s
->ops
.end()) {
3368 ldout(cct
, 7) << "handle_osd_op_reply " << tid
3369 << (m
->is_ondisk() ? " ondisk" : (m
->is_onnvram() ?
3370 " onnvram" : " ack"))
3371 << " ... stray" << dendl
;
3377 ldout(cct
, 7) << "handle_osd_op_reply " << tid
3378 << (m
->is_ondisk() ? " ondisk" :
3379 (m
->is_onnvram() ? " onnvram" : " ack"))
3380 << " uv " << m
->get_user_version()
3381 << " in " << m
->get_pg()
3382 << " attempt " << m
->get_retry_attempt()
3384 Op
*op
= iter
->second
;
3385 op
->trace
.event("osd op reply");
3387 if (retry_writes_after_first_reply
&& op
->attempts
== 1 &&
3388 (op
->target
.flags
& CEPH_OSD_FLAG_WRITE
)) {
3389 ldout(cct
, 7) << "retrying write after first reply: " << tid
<< dendl
;
3393 _session_op_remove(s
, op
);
3396 _op_submit(op
, sul
, NULL
);
3401 if (m
->get_retry_attempt() >= 0) {
3402 if (m
->get_retry_attempt() != (op
->attempts
- 1)) {
3403 ldout(cct
, 7) << " ignoring reply from attempt "
3404 << m
->get_retry_attempt()
3405 << " from " << m
->get_source_inst()
3406 << "; last attempt " << (op
->attempts
- 1) << " sent to "
3407 << op
->session
->con
->get_peer_addr() << dendl
;
3413 // we don't know the request attempt because the server is old, so
3414 // just accept this one. we may do ACK callbacks we shouldn't
3415 // have, but that is better than doing callbacks out of order.
3418 Context
*onfinish
= 0;
3420 int rc
= m
->get_result();
3422 if (m
->is_redirect_reply()) {
3423 ldout(cct
, 5) << " got redirect reply; redirecting" << dendl
;
3426 _session_op_remove(s
, op
);
3429 // FIXME: two redirects could race and reorder
3432 m
->get_redirect().combine_with_locator(op
->target
.target_oloc
,
3433 op
->target
.target_oid
.name
);
3434 op
->target
.flags
|= (CEPH_OSD_FLAG_REDIRECTED
|
3435 CEPH_OSD_FLAG_IGNORE_CACHE
|
3436 CEPH_OSD_FLAG_IGNORE_OVERLAY
);
3437 _op_submit(op
, sul
, NULL
);
3442 if (rc
== -EAGAIN
) {
3443 ldout(cct
, 7) << " got -EAGAIN, resubmitting" << dendl
;
3446 _session_op_remove(s
, op
);
3450 op
->target
.flags
&= ~(CEPH_OSD_FLAG_BALANCE_READS
|
3451 CEPH_OSD_FLAG_LOCALIZE_READS
);
3452 op
->target
.pgid
= pg_t();
3453 _op_submit(op
, sul
, NULL
);
3461 *op
->objver
= m
->get_user_version();
3462 if (op
->reply_epoch
)
3463 *op
->reply_epoch
= m
->get_map_epoch();
3464 if (op
->data_offset
)
3465 *op
->data_offset
= m
->get_header().data_off
;
3471 op
->con
->revoke_rx_buffer(op
->tid
);
3473 auto& bl
= m
->get_data();
3474 if (op
->outbl
->length() == bl
.length() &&
3475 bl
.get_num_buffers() <= 1) {
3476 // this is here to keep previous users to *relied* on getting data
3477 // read into existing buffers happy. Notably,
3478 // libradosstriper::RadosStriperImpl::aio_read().
3479 ldout(cct
,10) << __func__
<< " copying resulting " << bl
.length()
3480 << " into existing ceph::buffer of length " << op
->outbl
->length()
3482 ceph::buffer::list t
;
3483 t
.claim(*op
->outbl
);
3484 t
.invalidate_crc(); // we're overwriting the raw buffers via c_str()
3485 bl
.begin().copy(bl
.length(), t
.c_str());
3486 op
->outbl
->substr_of(t
, 0, bl
.length());
3488 m
->claim_data(*op
->outbl
);
3493 // per-op result demuxing
3494 vector
<OSDOp
> out_ops
;
3495 m
->claim_ops(out_ops
);
3497 if (out_ops
.size() != op
->ops
.size())
3498 ldout(cct
, 0) << "WARNING: tid " << op
->tid
<< " reply ops " << out_ops
3499 << " != request ops " << op
->ops
3500 << " from " << m
->get_source_inst() << dendl
;
3502 vector
<ceph::buffer::list
*>::iterator pb
= op
->out_bl
.begin();
3503 vector
<int*>::iterator pr
= op
->out_rval
.begin();
3504 vector
<Context
*>::iterator ph
= op
->out_handler
.begin();
3505 ceph_assert(op
->out_bl
.size() == op
->out_rval
.size());
3506 ceph_assert(op
->out_bl
.size() == op
->out_handler
.size());
3507 vector
<OSDOp
>::iterator p
= out_ops
.begin();
3508 for (unsigned i
= 0;
3509 p
!= out_ops
.end() && pb
!= op
->out_bl
.end();
3510 ++i
, ++p
, ++pb
, ++pr
, ++ph
) {
3511 ldout(cct
, 10) << " op " << i
<< " rval " << p
->rval
3512 << " len " << p
->outdata
.length() << dendl
;
3515 // set rval before running handlers so that handlers
3516 // can change it if e.g. decoding fails
3518 **pr
= ceph_to_hostos_errno(p
->rval
);
3520 ldout(cct
, 10) << " op " << i
<< " handler " << *ph
<< dendl
;
3521 (*ph
)->complete(ceph_to_hostos_errno(p
->rval
));
3526 // NOTE: we assume that since we only request ONDISK ever we will
3527 // only ever get back one (type of) ack ever.
3531 onfinish
= op
->onfinish
;
3532 op
->onfinish
= NULL
;
3534 logger
->inc(l_osdc_op_reply
);
3536 /* get it before we call _finish_op() */
3537 auto completion_lock
= s
->get_lock(op
->target
.base_oid
);
3539 ldout(cct
, 15) << "handle_osd_op_reply completed tid " << tid
<< dendl
;
3542 ldout(cct
, 5) << num_in_flight
<< " in flight" << dendl
;
3544 // serialize completions
3545 if (completion_lock
.mutex()) {
3546 completion_lock
.lock();
3552 onfinish
->complete(rc
);
3554 if (completion_lock
.mutex()) {
3555 completion_lock
.unlock();
3561 void Objecter::handle_osd_backoff(MOSDBackoff
*m
)
3563 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
3564 shunique_lock
sul(rwlock
, ceph::acquire_shared
);
3570 ConnectionRef con
= m
->get_connection();
3571 auto priv
= con
->get_priv();
3572 auto s
= static_cast<OSDSession
*>(priv
.get());
3573 if (!s
|| s
->con
!= con
) {
3574 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
3581 OSDSession::unique_lock
sl(s
->lock
);
3584 case CEPH_OSD_BACKOFF_OP_BLOCK
:
3587 OSDBackoff
& b
= s
->backoffs
[m
->pgid
][m
->begin
];
3588 s
->backoffs_by_id
.insert(make_pair(m
->id
, &b
));
3594 // ack with original backoff's epoch so that the osd can discard this if
3595 // there was a pg split.
3596 Message
*r
= new MOSDBackoff(m
->pgid
,
3598 CEPH_OSD_BACKOFF_OP_ACK_BLOCK
,
3599 m
->id
, m
->begin
, m
->end
);
3600 // this priority must match the MOSDOps from _prepare_osd_op
3601 r
->set_priority(cct
->_conf
->osd_client_op_priority
);
3602 con
->send_message(r
);
3606 case CEPH_OSD_BACKOFF_OP_UNBLOCK
:
3608 auto p
= s
->backoffs_by_id
.find(m
->id
);
3609 if (p
!= s
->backoffs_by_id
.end()) {
3610 OSDBackoff
*b
= p
->second
;
3611 if (b
->begin
!= m
->begin
&&
3613 lderr(cct
) << __func__
<< " got " << m
->pgid
<< " id " << m
->id
3615 << m
->begin
<< "," << m
->end
<< ") but backoff is ["
3616 << b
->begin
<< "," << b
->end
<< ")" << dendl
;
3617 // hrmpf, unblock it anyway.
3619 ldout(cct
, 10) << __func__
<< " unblock backoff " << b
->pgid
3621 << " [" << b
->begin
<< "," << b
->end
3623 auto spgp
= s
->backoffs
.find(b
->pgid
);
3624 ceph_assert(spgp
!= s
->backoffs
.end());
3625 spgp
->second
.erase(b
->begin
);
3626 if (spgp
->second
.empty()) {
3627 s
->backoffs
.erase(spgp
);
3629 s
->backoffs_by_id
.erase(p
);
3631 // check for any ops to resend
3632 for (auto& q
: s
->ops
) {
3633 if (q
.second
->target
.actual_pgid
== m
->pgid
) {
3634 int r
= q
.second
->target
.contained_by(m
->begin
, m
->end
);
3635 ldout(cct
, 20) << __func__
<< " contained_by " << r
<< " on "
3636 << q
.second
->target
.get_hobj() << dendl
;
3643 lderr(cct
) << __func__
<< " " << m
->pgid
<< " id " << m
->id
3645 << m
->begin
<< "," << m
->end
<< ") but backoff dne" << dendl
;
3651 ldout(cct
, 10) << __func__
<< " unrecognized op " << (int)m
->op
<< dendl
;
3661 uint32_t Objecter::list_nobjects_seek(NListContext
*list_context
,
3664 shared_lock
rl(rwlock
);
3665 list_context
->pos
= hobject_t(object_t(), string(), CEPH_NOSNAP
,
3666 pos
, list_context
->pool_id
, string());
3667 ldout(cct
, 10) << __func__
<< " " << list_context
3668 << " pos " << pos
<< " -> " << list_context
->pos
<< dendl
;
3669 pg_t actual
= osdmap
->raw_pg_to_pg(pg_t(pos
, list_context
->pool_id
));
3670 list_context
->current_pg
= actual
.ps();
3671 list_context
->at_end_of_pool
= false;
3675 uint32_t Objecter::list_nobjects_seek(NListContext
*list_context
,
3676 const hobject_t
& cursor
)
3678 shared_lock
rl(rwlock
);
3679 ldout(cct
, 10) << "list_nobjects_seek " << list_context
<< dendl
;
3680 list_context
->pos
= cursor
;
3681 list_context
->at_end_of_pool
= false;
3682 pg_t actual
= osdmap
->raw_pg_to_pg(pg_t(cursor
.get_hash(), list_context
->pool_id
));
3683 list_context
->current_pg
= actual
.ps();
3684 list_context
->sort_bitwise
= true;
3685 return list_context
->current_pg
;
3688 void Objecter::list_nobjects_get_cursor(NListContext
*list_context
,
3691 shared_lock
rl(rwlock
);
3692 if (list_context
->list
.empty()) {
3693 *cursor
= list_context
->pos
;
3695 const librados::ListObjectImpl
& entry
= list_context
->list
.front();
3696 const string
*key
= (entry
.locator
.empty() ? &entry
.oid
: &entry
.locator
);
3697 uint32_t h
= osdmap
->get_pg_pool(list_context
->pool_id
)->hash_key(*key
, entry
.nspace
);
3698 *cursor
= hobject_t(entry
.oid
, entry
.locator
, list_context
->pool_snap_seq
, h
, list_context
->pool_id
, entry
.nspace
);
3702 void Objecter::list_nobjects(NListContext
*list_context
, Context
*onfinish
)
3704 ldout(cct
, 10) << __func__
<< " pool_id " << list_context
->pool_id
3705 << " pool_snap_seq " << list_context
->pool_snap_seq
3706 << " max_entries " << list_context
->max_entries
3707 << " list_context " << list_context
3708 << " onfinish " << onfinish
3709 << " current_pg " << list_context
->current_pg
3710 << " pos " << list_context
->pos
<< dendl
;
3712 shared_lock
rl(rwlock
);
3713 const pg_pool_t
*pool
= osdmap
->get_pg_pool(list_context
->pool_id
);
3714 if (!pool
) { // pool is gone
3716 put_nlist_context_budget(list_context
);
3717 onfinish
->complete(-ENOENT
);
3720 int pg_num
= pool
->get_pg_num();
3721 bool sort_bitwise
= osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
);
3723 if (list_context
->pos
.is_min()) {
3724 list_context
->starting_pg_num
= 0;
3725 list_context
->sort_bitwise
= sort_bitwise
;
3726 list_context
->starting_pg_num
= pg_num
;
3728 if (list_context
->sort_bitwise
!= sort_bitwise
) {
3729 list_context
->pos
= hobject_t(
3730 object_t(), string(), CEPH_NOSNAP
,
3731 list_context
->current_pg
, list_context
->pool_id
, string());
3732 list_context
->sort_bitwise
= sort_bitwise
;
3733 ldout(cct
, 10) << " hobject sort order changed, restarting this pg at "
3734 << list_context
->pos
<< dendl
;
3736 if (list_context
->starting_pg_num
!= pg_num
) {
3737 if (!sort_bitwise
) {
3738 // start reading from the beginning; the pgs have changed
3739 ldout(cct
, 10) << " pg_num changed; restarting with " << pg_num
<< dendl
;
3740 list_context
->pos
= collection_list_handle_t();
3742 list_context
->starting_pg_num
= pg_num
;
3745 if (list_context
->pos
.is_max()) {
3746 ldout(cct
, 20) << __func__
<< " end of pool, list "
3747 << list_context
->list
<< dendl
;
3748 if (list_context
->list
.empty()) {
3749 list_context
->at_end_of_pool
= true;
3751 // release the listing context's budget once all
3752 // OPs (in the session) are finished
3753 put_nlist_context_budget(list_context
);
3754 onfinish
->complete(0);
3759 op
.pg_nls(list_context
->max_entries
, list_context
->filter
,
3760 list_context
->pos
, osdmap
->get_epoch());
3761 list_context
->bl
.clear();
3762 C_NList
*onack
= new C_NList(list_context
, onfinish
, this);
3763 object_locator_t
oloc(list_context
->pool_id
, list_context
->nspace
);
3765 // note current_pg in case we don't have (or lose) SORTBITWISE
3766 list_context
->current_pg
= pool
->raw_hash_to_pg(list_context
->pos
.get_hash());
3769 pg_read(list_context
->current_pg
, oloc
, op
,
3770 &list_context
->bl
, 0, onack
, &onack
->epoch
,
3771 &list_context
->ctx_budget
);
3774 void Objecter::_nlist_reply(NListContext
*list_context
, int r
,
3775 Context
*final_finish
, epoch_t reply_epoch
)
3777 ldout(cct
, 10) << __func__
<< " " << list_context
<< dendl
;
3779 auto iter
= list_context
->bl
.cbegin();
3780 pg_nls_response_t response
;
3781 decode(response
, iter
);
3783 // we do this as legacy.
3784 ceph::buffer::list legacy_extra_info
;
3785 decode(legacy_extra_info
, iter
);
3788 // if the osd returns 1 (newer code), or handle MAX, it means we
3789 // hit the end of the pg.
3790 if ((response
.handle
.is_max() || r
== 1) &&
3791 !list_context
->sort_bitwise
) {
3792 // legacy OSD and !sortbitwise, figure out the next PG on our own
3793 ++list_context
->current_pg
;
3794 if (list_context
->current_pg
== list_context
->starting_pg_num
) {
3796 list_context
->pos
= hobject_t::get_max();
3799 list_context
->pos
= hobject_t(object_t(), string(), CEPH_NOSNAP
,
3800 list_context
->current_pg
,
3801 list_context
->pool_id
, string());
3804 list_context
->pos
= response
.handle
;
3807 int response_size
= response
.entries
.size();
3808 ldout(cct
, 20) << " response.entries.size " << response_size
3809 << ", response.entries " << response
.entries
3810 << ", handle " << response
.handle
3811 << ", tentative new pos " << list_context
->pos
<< dendl
;
3812 if (response_size
) {
3813 list_context
->list
.splice(list_context
->list
.end(), response
.entries
);
3816 if (list_context
->list
.size() >= list_context
->max_entries
) {
3817 ldout(cct
, 20) << " hit max, returning results so far, "
3818 << list_context
->list
<< dendl
;
3819 // release the listing context's budget once all
3820 // OPs (in the session) are finished
3821 put_nlist_context_budget(list_context
);
3822 final_finish
->complete(0);
3827 list_nobjects(list_context
, final_finish
);
3830 void Objecter::put_nlist_context_budget(NListContext
*list_context
)
3832 if (list_context
->ctx_budget
>= 0) {
3833 ldout(cct
, 10) << " release listing context's budget " <<
3834 list_context
->ctx_budget
<< dendl
;
3835 put_op_budget_bytes(list_context
->ctx_budget
);
3836 list_context
->ctx_budget
= -1;
3842 int Objecter::create_pool_snap(int64_t pool
, string
& snap_name
,
3845 unique_lock
wl(rwlock
);
3846 ldout(cct
, 10) << "create_pool_snap; pool: " << pool
<< "; snap: "
3847 << snap_name
<< dendl
;
3849 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
3852 if (p
->snap_exists(snap_name
.c_str()))
3855 PoolOp
*op
= new PoolOp
;
3858 op
->tid
= ++last_tid
;
3860 op
->name
= snap_name
;
3861 op
->onfinish
= onfinish
;
3862 op
->pool_op
= POOL_OP_CREATE_SNAP
;
3863 pool_ops
[op
->tid
] = op
;
3870 struct C_SelfmanagedSnap
: public Context
{
3871 ceph::buffer::list bl
;
3874 C_SelfmanagedSnap(snapid_t
*ps
, Context
*f
) : psnapid(ps
), fin(f
) {}
3875 void finish(int r
) override
{
3878 auto p
= bl
.cbegin();
3879 decode(*psnapid
, p
);
3880 } catch (ceph::buffer::error
&) {
3888 int Objecter::allocate_selfmanaged_snap(int64_t pool
, snapid_t
*psnapid
,
3891 unique_lock
wl(rwlock
);
3892 ldout(cct
, 10) << "allocate_selfmanaged_snap; pool: " << pool
<< dendl
;
3893 PoolOp
*op
= new PoolOp
;
3894 if (!op
) return -ENOMEM
;
3895 op
->tid
= ++last_tid
;
3897 C_SelfmanagedSnap
*fin
= new C_SelfmanagedSnap(psnapid
, onfinish
);
3900 op
->pool_op
= POOL_OP_CREATE_UNMANAGED_SNAP
;
3901 pool_ops
[op
->tid
] = op
;
3907 int Objecter::delete_pool_snap(int64_t pool
, string
& snap_name
,
3910 unique_lock
wl(rwlock
);
3911 ldout(cct
, 10) << "delete_pool_snap; pool: " << pool
<< "; snap: "
3912 << snap_name
<< dendl
;
3914 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
3917 if (!p
->snap_exists(snap_name
.c_str()))
3920 PoolOp
*op
= new PoolOp
;
3923 op
->tid
= ++last_tid
;
3925 op
->name
= snap_name
;
3926 op
->onfinish
= onfinish
;
3927 op
->pool_op
= POOL_OP_DELETE_SNAP
;
3928 pool_ops
[op
->tid
] = op
;
3935 int Objecter::delete_selfmanaged_snap(int64_t pool
, snapid_t snap
,
3938 unique_lock
wl(rwlock
);
3939 ldout(cct
, 10) << "delete_selfmanaged_snap; pool: " << pool
<< "; snap: "
3941 PoolOp
*op
= new PoolOp
;
3942 if (!op
) return -ENOMEM
;
3943 op
->tid
= ++last_tid
;
3945 op
->onfinish
= onfinish
;
3946 op
->pool_op
= POOL_OP_DELETE_UNMANAGED_SNAP
;
3948 pool_ops
[op
->tid
] = op
;
3955 int Objecter::create_pool(string
& name
, Context
*onfinish
,
3958 unique_lock
wl(rwlock
);
3959 ldout(cct
, 10) << "create_pool name=" << name
<< dendl
;
3961 if (osdmap
->lookup_pg_pool_name(name
) >= 0)
3964 PoolOp
*op
= new PoolOp
;
3967 op
->tid
= ++last_tid
;
3970 op
->onfinish
= onfinish
;
3971 op
->pool_op
= POOL_OP_CREATE
;
3972 pool_ops
[op
->tid
] = op
;
3973 op
->crush_rule
= crush_rule
;
3980 int Objecter::delete_pool(int64_t pool
, Context
*onfinish
)
3982 unique_lock
wl(rwlock
);
3983 ldout(cct
, 10) << "delete_pool " << pool
<< dendl
;
3985 if (!osdmap
->have_pg_pool(pool
))
3988 _do_delete_pool(pool
, onfinish
);
3992 int Objecter::delete_pool(const string
&pool_name
, Context
*onfinish
)
3994 unique_lock
wl(rwlock
);
3995 ldout(cct
, 10) << "delete_pool " << pool_name
<< dendl
;
3997 int64_t pool
= osdmap
->lookup_pg_pool_name(pool_name
);
4001 _do_delete_pool(pool
, onfinish
);
4005 void Objecter::_do_delete_pool(int64_t pool
, Context
*onfinish
)
4007 PoolOp
*op
= new PoolOp
;
4008 op
->tid
= ++last_tid
;
4010 op
->name
= "delete";
4011 op
->onfinish
= onfinish
;
4012 op
->pool_op
= POOL_OP_DELETE
;
4013 pool_ops
[op
->tid
] = op
;
4017 void Objecter::pool_op_submit(PoolOp
*op
)
4020 if (mon_timeout
> timespan(0)) {
4021 op
->ontimeout
= timer
.add_event(mon_timeout
,
4023 pool_op_cancel(op
->tid
, -ETIMEDOUT
); });
4025 _pool_op_submit(op
);
4028 void Objecter::_pool_op_submit(PoolOp
*op
)
4030 // rwlock is locked unique
4032 ldout(cct
, 10) << "pool_op_submit " << op
->tid
<< dendl
;
4033 MPoolOp
*m
= new MPoolOp(monc
->get_fsid(), op
->tid
, op
->pool
,
4034 op
->name
, op
->pool_op
,
4035 last_seen_osdmap_version
);
4036 if (op
->snapid
) m
->snapid
= op
->snapid
;
4037 if (op
->crush_rule
) m
->crush_rule
= op
->crush_rule
;
4038 monc
->send_mon_message(m
);
4039 op
->last_submit
= ceph::coarse_mono_clock::now();
4041 logger
->inc(l_osdc_poolop_send
);
4045 * Handle a reply to a PoolOp message. Check that we sent the message
4046 * and give the caller responsibility for the returned ceph::buffer::list.
4047 * Then either call the finisher or stash the PoolOp, depending on if we
4048 * have a new enough map.
4049 * Lastly, clean up the message and PoolOp.
4051 void Objecter::handle_pool_op_reply(MPoolOpReply
*m
)
4054 shunique_lock
sul(rwlock
, acquire_shared
);
4061 ldout(cct
, 10) << "handle_pool_op_reply " << *m
<< dendl
;
4062 ceph_tid_t tid
= m
->get_tid();
4063 map
<ceph_tid_t
, PoolOp
*>::iterator iter
= pool_ops
.find(tid
);
4064 if (iter
!= pool_ops
.end()) {
4065 PoolOp
*op
= iter
->second
;
4066 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< " Op: "
4067 << ceph_pool_op_name(op
->pool_op
) << dendl
;
4069 op
->blp
->claim(m
->response_data
);
4070 if (m
->version
> last_seen_osdmap_version
)
4071 last_seen_osdmap_version
= m
->version
;
4072 if (osdmap
->get_epoch() < m
->epoch
) {
4075 // recheck op existence since we have let go of rwlock
4076 // (for promotion) above.
4077 iter
= pool_ops
.find(tid
);
4078 if (iter
== pool_ops
.end())
4079 goto done
; // op is gone.
4080 if (osdmap
->get_epoch() < m
->epoch
) {
4081 ldout(cct
, 20) << "waiting for client to reach epoch " << m
->epoch
4082 << " before calling back" << dendl
;
4083 _wait_for_new_map(op
->onfinish
, m
->epoch
, m
->replyCode
);
4085 // map epoch changed, probably because a MOSDMap message
4086 // sneaked in. Do caller-specified callback now or else
4087 // we lose it forever.
4088 ceph_assert(op
->onfinish
);
4089 op
->onfinish
->complete(m
->replyCode
);
4092 ceph_assert(op
->onfinish
);
4093 op
->onfinish
->complete(m
->replyCode
);
4095 op
->onfinish
= NULL
;
4096 if (!sul
.owns_lock()) {
4100 iter
= pool_ops
.find(tid
);
4101 if (iter
!= pool_ops
.end()) {
4102 _finish_pool_op(op
, 0);
4105 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4109 // Not strictly necessary, since we'll release it on return.
4112 ldout(cct
, 10) << "done" << dendl
;
4116 int Objecter::pool_op_cancel(ceph_tid_t tid
, int r
)
4118 ceph_assert(initialized
);
4120 unique_lock
wl(rwlock
);
4122 map
<ceph_tid_t
, PoolOp
*>::iterator it
= pool_ops
.find(tid
);
4123 if (it
== pool_ops
.end()) {
4124 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4128 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4130 PoolOp
*op
= it
->second
;
4132 op
->onfinish
->complete(r
);
4134 _finish_pool_op(op
, r
);
4138 void Objecter::_finish_pool_op(PoolOp
*op
, int r
)
4140 // rwlock is locked unique
4141 pool_ops
.erase(op
->tid
);
4142 logger
->set(l_osdc_poolop_active
, pool_ops
.size());
4144 if (op
->ontimeout
&& r
!= -ETIMEDOUT
) {
4145 timer
.cancel_event(op
->ontimeout
);
4153 void Objecter::get_pool_stats(list
<string
>& pools
,
4154 map
<string
,pool_stat_t
> *result
,
4158 ldout(cct
, 10) << "get_pool_stats " << pools
<< dendl
;
4160 PoolStatOp
*op
= new PoolStatOp
;
4161 op
->tid
= ++last_tid
;
4163 op
->pool_stats
= result
;
4164 op
->per_pool
= per_pool
;
4165 op
->onfinish
= onfinish
;
4166 if (mon_timeout
> timespan(0)) {
4167 op
->ontimeout
= timer
.add_event(mon_timeout
,
4169 pool_stat_op_cancel(op
->tid
,
4175 unique_lock
wl(rwlock
);
4177 poolstat_ops
[op
->tid
] = op
;
4179 logger
->set(l_osdc_poolstat_active
, poolstat_ops
.size());
4181 _poolstat_submit(op
);
4184 void Objecter::_poolstat_submit(PoolStatOp
*op
)
4186 ldout(cct
, 10) << "_poolstat_submit " << op
->tid
<< dendl
;
4187 monc
->send_mon_message(new MGetPoolStats(monc
->get_fsid(), op
->tid
,
4189 last_seen_pgmap_version
));
4190 op
->last_submit
= ceph::coarse_mono_clock::now();
4192 logger
->inc(l_osdc_poolstat_send
);
4195 void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply
*m
)
4197 ldout(cct
, 10) << "handle_get_pool_stats_reply " << *m
<< dendl
;
4198 ceph_tid_t tid
= m
->get_tid();
4200 unique_lock
wl(rwlock
);
4206 map
<ceph_tid_t
, PoolStatOp
*>::iterator iter
= poolstat_ops
.find(tid
);
4207 if (iter
!= poolstat_ops
.end()) {
4208 PoolStatOp
*op
= poolstat_ops
[tid
];
4209 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< dendl
;
4210 *op
->pool_stats
= m
->pool_stats
;
4211 *op
->per_pool
= m
->per_pool
;
4212 if (m
->version
> last_seen_pgmap_version
) {
4213 last_seen_pgmap_version
= m
->version
;
4215 op
->onfinish
->complete(0);
4216 _finish_pool_stat_op(op
, 0);
4218 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4220 ldout(cct
, 10) << "done" << dendl
;
4224 int Objecter::pool_stat_op_cancel(ceph_tid_t tid
, int r
)
4226 ceph_assert(initialized
);
4228 unique_lock
wl(rwlock
);
4230 map
<ceph_tid_t
, PoolStatOp
*>::iterator it
= poolstat_ops
.find(tid
);
4231 if (it
== poolstat_ops
.end()) {
4232 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4236 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4238 PoolStatOp
*op
= it
->second
;
4240 op
->onfinish
->complete(r
);
4241 _finish_pool_stat_op(op
, r
);
4245 void Objecter::_finish_pool_stat_op(PoolStatOp
*op
, int r
)
4247 // rwlock is locked unique
4249 poolstat_ops
.erase(op
->tid
);
4250 logger
->set(l_osdc_poolstat_active
, poolstat_ops
.size());
4252 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
4253 timer
.cancel_event(op
->ontimeout
);
4258 void Objecter::get_fs_stats(ceph_statfs
& result
,
4259 boost::optional
<int64_t> data_pool
,
4262 ldout(cct
, 10) << "get_fs_stats" << dendl
;
4263 unique_lock
l(rwlock
);
4265 StatfsOp
*op
= new StatfsOp
;
4266 op
->tid
= ++last_tid
;
4267 op
->stats
= &result
;
4268 op
->data_pool
= data_pool
;
4269 op
->onfinish
= onfinish
;
4270 if (mon_timeout
> timespan(0)) {
4271 op
->ontimeout
= timer
.add_event(mon_timeout
,
4273 statfs_op_cancel(op
->tid
,
4278 statfs_ops
[op
->tid
] = op
;
4280 logger
->set(l_osdc_statfs_active
, statfs_ops
.size());
4282 _fs_stats_submit(op
);
4285 void Objecter::_fs_stats_submit(StatfsOp
*op
)
4287 // rwlock is locked unique
4289 ldout(cct
, 10) << "fs_stats_submit" << op
->tid
<< dendl
;
4290 monc
->send_mon_message(new MStatfs(monc
->get_fsid(), op
->tid
,
4292 last_seen_pgmap_version
));
4293 op
->last_submit
= ceph::coarse_mono_clock::now();
4295 logger
->inc(l_osdc_statfs_send
);
4298 void Objecter::handle_fs_stats_reply(MStatfsReply
*m
)
4300 unique_lock
wl(rwlock
);
4306 ldout(cct
, 10) << "handle_fs_stats_reply " << *m
<< dendl
;
4307 ceph_tid_t tid
= m
->get_tid();
4309 if (statfs_ops
.count(tid
)) {
4310 StatfsOp
*op
= statfs_ops
[tid
];
4311 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< dendl
;
4312 *(op
->stats
) = m
->h
.st
;
4313 if (m
->h
.version
> last_seen_pgmap_version
)
4314 last_seen_pgmap_version
= m
->h
.version
;
4315 op
->onfinish
->complete(0);
4316 _finish_statfs_op(op
, 0);
4318 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4321 ldout(cct
, 10) << "done" << dendl
;
4324 int Objecter::statfs_op_cancel(ceph_tid_t tid
, int r
)
4326 ceph_assert(initialized
);
4328 unique_lock
wl(rwlock
);
4330 map
<ceph_tid_t
, StatfsOp
*>::iterator it
= statfs_ops
.find(tid
);
4331 if (it
== statfs_ops
.end()) {
4332 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4336 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4338 StatfsOp
*op
= it
->second
;
4340 op
->onfinish
->complete(r
);
4341 _finish_statfs_op(op
, r
);
4345 void Objecter::_finish_statfs_op(StatfsOp
*op
, int r
)
4347 // rwlock is locked unique
4349 statfs_ops
.erase(op
->tid
);
4350 logger
->set(l_osdc_statfs_active
, statfs_ops
.size());
4352 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
4353 timer
.cancel_event(op
->ontimeout
);
4360 void Objecter::_sg_read_finish(vector
<ObjectExtent
>& extents
,
4361 vector
<ceph::buffer::list
>& resultbl
,
4362 ceph::buffer::list
*bl
, Context
*onfinish
)
4365 ldout(cct
, 15) << "_sg_read_finish" << dendl
;
4367 if (extents
.size() > 1) {
4368 Striper::StripedReadResult r
;
4369 vector
<ceph::buffer::list
>::iterator bit
= resultbl
.begin();
4370 for (vector
<ObjectExtent
>::iterator eit
= extents
.begin();
4371 eit
!= extents
.end();
4373 r
.add_partial_result(cct
, *bit
, eit
->buffer_extents
);
4376 r
.assemble_result(cct
, *bl
, false);
4378 ldout(cct
, 15) << " only one frag" << dendl
;
4379 bl
->claim(resultbl
[0]);
4383 uint64_t bytes_read
= bl
->length();
4384 ldout(cct
, 7) << "_sg_read_finish " << bytes_read
<< " bytes" << dendl
;
4387 onfinish
->complete(bytes_read
);// > 0 ? bytes_read:m->get_result());
4392 void Objecter::ms_handle_connect(Connection
*con
)
4394 ldout(cct
, 10) << "ms_handle_connect " << con
<< dendl
;
4398 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
4402 bool Objecter::ms_handle_reset(Connection
*con
)
4406 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
4407 unique_lock
wl(rwlock
);
4409 auto priv
= con
->get_priv();
4410 auto session
= static_cast<OSDSession
*>(priv
.get());
4412 ldout(cct
, 1) << "ms_handle_reset " << con
<< " session " << session
4413 << " osd." << session
->osd
<< dendl
;
4414 // the session maybe had been closed if new osdmap just handled
4415 // says the osd down
4416 if (!(initialized
&& osdmap
->is_up(session
->osd
))) {
4417 ldout(cct
, 1) << "ms_handle_reset aborted,initialized=" << initialized
<< dendl
;
4421 map
<uint64_t, LingerOp
*> lresend
;
4422 OSDSession::unique_lock
sl(session
->lock
);
4423 _reopen_session(session
);
4424 _kick_requests(session
, lresend
);
4426 _linger_ops_resend(lresend
, wl
);
4428 maybe_request_map();
4435 void Objecter::ms_handle_remote_reset(Connection
*con
)
4438 * treat these the same.
4440 ms_handle_reset(con
);
4443 bool Objecter::ms_handle_refused(Connection
*con
)
4446 if (osdmap
&& (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
)) {
4447 int osd
= osdmap
->identify_osd(con
->get_peer_addr());
4449 ldout(cct
, 1) << "ms_handle_refused on osd." << osd
<< dendl
;
4455 void Objecter::op_target_t::dump(Formatter
*f
) const
4457 f
->dump_stream("pg") << pgid
;
4458 f
->dump_int("osd", osd
);
4459 f
->dump_stream("object_id") << base_oid
;
4460 f
->dump_stream("object_locator") << base_oloc
;
4461 f
->dump_stream("target_object_id") << target_oid
;
4462 f
->dump_stream("target_object_locator") << target_oloc
;
4463 f
->dump_int("paused", (int)paused
);
4464 f
->dump_int("used_replica", (int)used_replica
);
4465 f
->dump_int("precalc_pgid", (int)precalc_pgid
);
4468 void Objecter::_dump_active(OSDSession
*s
)
4470 for (map
<ceph_tid_t
,Op
*>::iterator p
= s
->ops
.begin();
4474 ldout(cct
, 20) << op
->tid
<< "\t" << op
->target
.pgid
4475 << "\tosd." << (op
->session
? op
->session
->osd
: -1)
4476 << "\t" << op
->target
.base_oid
4477 << "\t" << op
->ops
<< dendl
;
4481 void Objecter::_dump_active()
4483 ldout(cct
, 20) << "dump_active .. " << num_homeless_ops
<< " homeless"
4485 for (map
<int, OSDSession
*>::iterator siter
= osd_sessions
.begin();
4486 siter
!= osd_sessions
.end(); ++siter
) {
4487 OSDSession
*s
= siter
->second
;
4488 OSDSession::shared_lock
sl(s
->lock
);
4492 _dump_active(homeless_session
);
4495 void Objecter::dump_active()
4497 shared_lock
rl(rwlock
);
4502 void Objecter::dump_requests(Formatter
*fmt
)
4504 // Read-lock on Objecter held here
4505 fmt
->open_object_section("requests");
4507 dump_linger_ops(fmt
);
4509 dump_pool_stat_ops(fmt
);
4510 dump_statfs_ops(fmt
);
4511 dump_command_ops(fmt
);
4512 fmt
->close_section(); // requests object
4515 void Objecter::_dump_ops(const OSDSession
*s
, Formatter
*fmt
)
4517 for (map
<ceph_tid_t
,Op
*>::const_iterator p
= s
->ops
.begin();
4521 auto age
= std::chrono::duration
<double>(coarse_mono_clock::now() - op
->stamp
);
4522 fmt
->open_object_section("op");
4523 fmt
->dump_unsigned("tid", op
->tid
);
4524 op
->target
.dump(fmt
);
4525 fmt
->dump_stream("last_sent") << op
->stamp
;
4526 fmt
->dump_float("age", age
.count());
4527 fmt
->dump_int("attempts", op
->attempts
);
4528 fmt
->dump_stream("snapid") << op
->snapid
;
4529 fmt
->dump_stream("snap_context") << op
->snapc
;
4530 fmt
->dump_stream("mtime") << op
->mtime
;
4532 fmt
->open_array_section("osd_ops");
4533 for (vector
<OSDOp
>::const_iterator it
= op
->ops
.begin();
4534 it
!= op
->ops
.end();
4536 fmt
->dump_stream("osd_op") << *it
;
4538 fmt
->close_section(); // osd_ops array
4540 fmt
->close_section(); // op object
4544 void Objecter::dump_ops(Formatter
*fmt
)
4546 // Read-lock on Objecter held
4547 fmt
->open_array_section("ops");
4548 for (map
<int, OSDSession
*>::const_iterator siter
= osd_sessions
.begin();
4549 siter
!= osd_sessions
.end(); ++siter
) {
4550 OSDSession
*s
= siter
->second
;
4551 OSDSession::shared_lock
sl(s
->lock
);
4555 _dump_ops(homeless_session
, fmt
);
4556 fmt
->close_section(); // ops array
4559 void Objecter::_dump_linger_ops(const OSDSession
*s
, Formatter
*fmt
)
4561 for (map
<uint64_t, LingerOp
*>::const_iterator p
= s
->linger_ops
.begin();
4562 p
!= s
->linger_ops
.end();
4564 LingerOp
*op
= p
->second
;
4565 fmt
->open_object_section("linger_op");
4566 fmt
->dump_unsigned("linger_id", op
->linger_id
);
4567 op
->target
.dump(fmt
);
4568 fmt
->dump_stream("snapid") << op
->snap
;
4569 fmt
->dump_stream("registered") << op
->registered
;
4570 fmt
->close_section(); // linger_op object
4574 void Objecter::dump_linger_ops(Formatter
*fmt
)
4576 // We have a read-lock on the objecter
4577 fmt
->open_array_section("linger_ops");
4578 for (map
<int, OSDSession
*>::const_iterator siter
= osd_sessions
.begin();
4579 siter
!= osd_sessions
.end(); ++siter
) {
4580 OSDSession
*s
= siter
->second
;
4581 OSDSession::shared_lock
sl(s
->lock
);
4582 _dump_linger_ops(s
, fmt
);
4585 _dump_linger_ops(homeless_session
, fmt
);
4586 fmt
->close_section(); // linger_ops array
4589 void Objecter::_dump_command_ops(const OSDSession
*s
, Formatter
*fmt
)
4591 for (map
<uint64_t, CommandOp
*>::const_iterator p
= s
->command_ops
.begin();
4592 p
!= s
->command_ops
.end();
4594 CommandOp
*op
= p
->second
;
4595 fmt
->open_object_section("command_op");
4596 fmt
->dump_unsigned("command_id", op
->tid
);
4597 fmt
->dump_int("osd", op
->session
? op
->session
->osd
: -1);
4598 fmt
->open_array_section("command");
4599 for (vector
<string
>::const_iterator q
= op
->cmd
.begin();
4600 q
!= op
->cmd
.end(); ++q
)
4601 fmt
->dump_string("word", *q
);
4602 fmt
->close_section();
4603 if (op
->target_osd
>= 0)
4604 fmt
->dump_int("target_osd", op
->target_osd
);
4606 fmt
->dump_stream("target_pg") << op
->target_pg
;
4607 fmt
->close_section(); // command_op object
4611 void Objecter::dump_command_ops(Formatter
*fmt
)
4613 // We have a read-lock on the Objecter here
4614 fmt
->open_array_section("command_ops");
4615 for (map
<int, OSDSession
*>::const_iterator siter
= osd_sessions
.begin();
4616 siter
!= osd_sessions
.end(); ++siter
) {
4617 OSDSession
*s
= siter
->second
;
4618 OSDSession::shared_lock
sl(s
->lock
);
4619 _dump_command_ops(s
, fmt
);
4622 _dump_command_ops(homeless_session
, fmt
);
4623 fmt
->close_section(); // command_ops array
4626 void Objecter::dump_pool_ops(Formatter
*fmt
) const
4628 fmt
->open_array_section("pool_ops");
4629 for (map
<ceph_tid_t
, PoolOp
*>::const_iterator p
= pool_ops
.begin();
4630 p
!= pool_ops
.end();
4632 PoolOp
*op
= p
->second
;
4633 fmt
->open_object_section("pool_op");
4634 fmt
->dump_unsigned("tid", op
->tid
);
4635 fmt
->dump_int("pool", op
->pool
);
4636 fmt
->dump_string("name", op
->name
);
4637 fmt
->dump_int("operation_type", op
->pool_op
);
4638 fmt
->dump_unsigned("crush_rule", op
->crush_rule
);
4639 fmt
->dump_stream("snapid") << op
->snapid
;
4640 fmt
->dump_stream("last_sent") << op
->last_submit
;
4641 fmt
->close_section(); // pool_op object
4643 fmt
->close_section(); // pool_ops array
4646 void Objecter::dump_pool_stat_ops(Formatter
*fmt
) const
4648 fmt
->open_array_section("pool_stat_ops");
4649 for (map
<ceph_tid_t
, PoolStatOp
*>::const_iterator p
= poolstat_ops
.begin();
4650 p
!= poolstat_ops
.end();
4652 PoolStatOp
*op
= p
->second
;
4653 fmt
->open_object_section("pool_stat_op");
4654 fmt
->dump_unsigned("tid", op
->tid
);
4655 fmt
->dump_stream("last_sent") << op
->last_submit
;
4657 fmt
->open_array_section("pools");
4658 for (list
<string
>::const_iterator it
= op
->pools
.begin();
4659 it
!= op
->pools
.end();
4661 fmt
->dump_string("pool", *it
);
4663 fmt
->close_section(); // pools array
4665 fmt
->close_section(); // pool_stat_op object
4667 fmt
->close_section(); // pool_stat_ops array
4670 void Objecter::dump_statfs_ops(Formatter
*fmt
) const
4672 fmt
->open_array_section("statfs_ops");
4673 for (map
<ceph_tid_t
, StatfsOp
*>::const_iterator p
= statfs_ops
.begin();
4674 p
!= statfs_ops
.end();
4676 StatfsOp
*op
= p
->second
;
4677 fmt
->open_object_section("statfs_op");
4678 fmt
->dump_unsigned("tid", op
->tid
);
4679 fmt
->dump_stream("last_sent") << op
->last_submit
;
4680 fmt
->close_section(); // statfs_op object
4682 fmt
->close_section(); // statfs_ops array
4685 Objecter::RequestStateHook::RequestStateHook(Objecter
*objecter
) :
4686 m_objecter(objecter
)
4690 int Objecter::RequestStateHook::call(std::string_view command
,
4691 const cmdmap_t
& cmdmap
,
4694 ceph::buffer::list
& out
)
4696 shared_lock
rl(m_objecter
->rwlock
);
4697 m_objecter
->dump_requests(f
);
4701 void Objecter::blacklist_self(bool set
)
4703 ldout(cct
, 10) << "blacklist_self " << (set
? "add" : "rm") << dendl
;
4706 cmd
.push_back("{\"prefix\":\"osd blacklist\", ");
4708 cmd
.push_back("\"blacklistop\":\"add\",");
4710 cmd
.push_back("\"blacklistop\":\"rm\",");
4712 // this is somewhat imprecise in that we are blacklisting our first addr only
4713 ss
<< messenger
->get_myaddrs().front().get_legacy_str();
4714 cmd
.push_back("\"addr\":\"" + ss
.str() + "\"");
4716 MMonCommand
*m
= new MMonCommand(monc
->get_fsid());
4719 monc
->send_mon_message(m
);
4724 void Objecter::handle_command_reply(MCommandReply
*m
)
4726 unique_lock
wl(rwlock
);
4732 ConnectionRef con
= m
->get_connection();
4733 auto priv
= con
->get_priv();
4734 auto s
= static_cast<OSDSession
*>(priv
.get());
4735 if (!s
|| s
->con
!= con
) {
4736 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
4741 OSDSession::shared_lock
sl(s
->lock
);
4742 map
<ceph_tid_t
,CommandOp
*>::iterator p
= s
->command_ops
.find(m
->get_tid());
4743 if (p
== s
->command_ops
.end()) {
4744 ldout(cct
, 10) << "handle_command_reply tid " << m
->get_tid()
4745 << " not found" << dendl
;
4751 CommandOp
*c
= p
->second
;
4753 m
->get_connection() != c
->session
->con
) {
4754 ldout(cct
, 10) << "handle_command_reply tid " << m
->get_tid()
4755 << " got reply from wrong connection "
4756 << m
->get_connection() << " " << m
->get_source_inst()
4762 if (m
->r
== -EAGAIN
) {
4763 ldout(cct
,10) << __func__
<< " tid " << m
->get_tid()
4764 << " got EAGAIN, requesting map and resending" << dendl
;
4765 // NOTE: This might resend twice... once now, and once again when
4766 // we get an updated osdmap and the PG is found to have moved.
4767 _maybe_request_map();
4775 c
->poutbl
->claim(m
->get_data());
4780 OSDSession::unique_lock
sul(s
->lock
);
4781 _finish_command(c
, m
->r
, m
->rs
);
4787 void Objecter::submit_command(CommandOp
*c
, ceph_tid_t
*ptid
)
4789 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
4791 ceph_tid_t tid
= ++last_tid
;
4792 ldout(cct
, 10) << "_submit_command " << tid
<< " " << c
->cmd
<< dendl
;
4796 OSDSession::unique_lock
hs_wl(homeless_session
->lock
);
4797 _session_command_op_assign(homeless_session
, c
);
4800 _calc_command_target(c
, sul
);
4801 _assign_command_session(c
, sul
);
4802 if (osd_timeout
> timespan(0)) {
4803 c
->ontimeout
= timer
.add_event(osd_timeout
,
4805 command_op_cancel(c
->session
, tid
,
4809 if (!c
->session
->is_homeless()) {
4812 _maybe_request_map();
4814 if (c
->map_check_error
)
4815 _send_command_map_check(c
);
4818 logger
->inc(l_osdc_command_active
);
4821 int Objecter::_calc_command_target(CommandOp
*c
, shunique_lock
& sul
)
4823 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
4825 c
->map_check_error
= 0;
4827 // ignore overlays, just like we do with pg ops
4828 c
->target
.flags
|= CEPH_OSD_FLAG_IGNORE_OVERLAY
;
4830 if (c
->target_osd
>= 0) {
4831 if (!osdmap
->exists(c
->target_osd
)) {
4832 c
->map_check_error
= -ENOENT
;
4833 c
->map_check_error_str
= "osd dne";
4835 return RECALC_OP_TARGET_OSD_DNE
;
4837 if (osdmap
->is_down(c
->target_osd
)) {
4838 c
->map_check_error
= -ENXIO
;
4839 c
->map_check_error_str
= "osd down";
4841 return RECALC_OP_TARGET_OSD_DOWN
;
4843 c
->target
.osd
= c
->target_osd
;
4845 int ret
= _calc_target(&(c
->target
), nullptr, true);
4846 if (ret
== RECALC_OP_TARGET_POOL_DNE
) {
4847 c
->map_check_error
= -ENOENT
;
4848 c
->map_check_error_str
= "pool dne";
4851 } else if (ret
== RECALC_OP_TARGET_OSD_DOWN
) {
4852 c
->map_check_error
= -ENXIO
;
4853 c
->map_check_error_str
= "osd down";
4860 int r
= _get_session(c
->target
.osd
, &s
, sul
);
4861 ceph_assert(r
!= -EAGAIN
); /* shouldn't happen as we're holding the write lock */
4863 if (c
->session
!= s
) {
4865 return RECALC_OP_TARGET_NEED_RESEND
;
4870 ldout(cct
, 20) << "_recalc_command_target " << c
->tid
<< " no change, "
4871 << c
->session
<< dendl
;
4873 return RECALC_OP_TARGET_NO_ACTION
;
4876 void Objecter::_assign_command_session(CommandOp
*c
,
4879 ceph_assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
4882 int r
= _get_session(c
->target
.osd
, &s
, sul
);
4883 ceph_assert(r
!= -EAGAIN
); /* shouldn't happen as we're holding the write lock */
4885 if (c
->session
!= s
) {
4887 OSDSession
*cs
= c
->session
;
4888 OSDSession::unique_lock
csl(cs
->lock
);
4889 _session_command_op_remove(c
->session
, c
);
4892 OSDSession::unique_lock
sl(s
->lock
);
4893 _session_command_op_assign(s
, c
);
4899 void Objecter::_send_command(CommandOp
*c
)
4901 ldout(cct
, 10) << "_send_command " << c
->tid
<< dendl
;
4902 ceph_assert(c
->session
);
4903 ceph_assert(c
->session
->con
);
4904 MCommand
*m
= new MCommand(monc
->monmap
.fsid
);
4906 m
->set_data(c
->inbl
);
4908 c
->session
->con
->send_message(m
);
4909 logger
->inc(l_osdc_command_send
);
4912 int Objecter::command_op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
)
4914 ceph_assert(initialized
);
4916 unique_lock
wl(rwlock
);
4918 map
<ceph_tid_t
, CommandOp
*>::iterator it
= s
->command_ops
.find(tid
);
4919 if (it
== s
->command_ops
.end()) {
4920 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4924 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4926 CommandOp
*op
= it
->second
;
4927 _command_cancel_map_check(op
);
4928 OSDSession::unique_lock
sl(op
->session
->lock
);
4929 _finish_command(op
, r
, "");
4934 void Objecter::_finish_command(CommandOp
*c
, int r
, string rs
)
4936 // rwlock is locked unique
4937 // session lock is locked
4939 ldout(cct
, 10) << "_finish_command " << c
->tid
<< " = " << r
<< " "
4944 c
->onfinish
->complete(r
);
4946 if (c
->ontimeout
&& r
!= -ETIMEDOUT
)
4947 timer
.cancel_event(c
->ontimeout
);
4949 _session_command_op_remove(c
->session
, c
);
4953 logger
->dec(l_osdc_command_active
);
4956 Objecter::OSDSession::~OSDSession()
4958 // Caller is responsible for re-assigning or
4959 // destroying any ops that were assigned to us
4960 ceph_assert(ops
.empty());
4961 ceph_assert(linger_ops
.empty());
4962 ceph_assert(command_ops
.empty());
4965 Objecter::Objecter(CephContext
*cct_
, Messenger
*m
, MonClient
*mc
,
4968 double osd_timeout
) :
4969 Dispatcher(cct_
), messenger(m
), monc(mc
), finisher(fin
),
4970 trace_endpoint("0.0.0.0", 0, "Objecter"),
4971 osdmap
{std::make_unique
<OSDMap
>()},
4972 homeless_session(new OSDSession(cct
, -1)),
4973 mon_timeout(ceph::make_timespan(mon_timeout
)),
4974 osd_timeout(ceph::make_timespan(osd_timeout
)),
4975 op_throttle_bytes(cct
, "objecter_bytes",
4976 cct
->_conf
->objecter_inflight_op_bytes
),
4977 op_throttle_ops(cct
, "objecter_ops", cct
->_conf
->objecter_inflight_ops
),
4978 retry_writes_after_first_reply(cct
->_conf
->objecter_retry_writes_after_first_reply
)
4981 Objecter::~Objecter()
4983 ceph_assert(homeless_session
->get_nref() == 1);
4984 ceph_assert(num_homeless_ops
== 0);
4985 homeless_session
->put();
4987 ceph_assert(osd_sessions
.empty());
4988 ceph_assert(poolstat_ops
.empty());
4989 ceph_assert(statfs_ops
.empty());
4990 ceph_assert(pool_ops
.empty());
4991 ceph_assert(waiting_for_map
.empty());
4992 ceph_assert(linger_ops
.empty());
4993 ceph_assert(check_latest_map_lingers
.empty());
4994 ceph_assert(check_latest_map_ops
.empty());
4995 ceph_assert(check_latest_map_commands
.empty());
4997 ceph_assert(!m_request_state_hook
);
4998 ceph_assert(!logger
);
5002 * Wait until this OSD map epoch is received before
5003 * sending any more operations to OSDs. Use this
5004 * when it is known that the client can't trust
5005 * anything from before this epoch (e.g. due to
5006 * client blacklist at this epoch).
5008 void Objecter::set_epoch_barrier(epoch_t epoch
)
5010 unique_lock
wl(rwlock
);
5012 ldout(cct
, 7) << __func__
<< ": barrier " << epoch
<< " (was "
5013 << epoch_barrier
<< ") current epoch " << osdmap
->get_epoch()
5015 if (epoch
> epoch_barrier
) {
5016 epoch_barrier
= epoch
;
5017 _maybe_request_map();
5023 hobject_t
Objecter::enumerate_objects_begin()
// Cursor marking the end position for object enumeration: the
// maximum hobject_t sorts after every real object.
hobject_t Objecter::enumerate_objects_end()
{
  return hobject_t::get_max();
}
5033 struct C_EnumerateReply
: public Context
{
5034 ceph::buffer::list bl
;
5038 std::list
<librados::ListObjectImpl
> *result
;
5039 const hobject_t end
;
5040 const int64_t pool_id
;
5046 C_EnumerateReply(Objecter
*objecter_
, hobject_t
*next_
,
5047 std::list
<librados::ListObjectImpl
> *result_
,
5048 const hobject_t end_
, const int64_t pool_id_
, Context
*on_finish_
) :
5049 objecter(objecter_
), next(next_
), result(result_
),
5050 end(end_
), pool_id(pool_id_
), on_finish(on_finish_
),
5051 epoch(0), budget(-1)
5054 void finish(int r
) override
{
5055 objecter
->_enumerate_reply(
5056 bl
, r
, end
, pool_id
, budget
, epoch
, result
, next
, on_finish
);
5060 void Objecter::enumerate_objects(
5062 const std::string
&ns
,
5063 const hobject_t
&start
,
5064 const hobject_t
&end
,
5066 const ceph::buffer::list
&filter_bl
,
5067 std::list
<librados::ListObjectImpl
> *result
,
5071 ceph_assert(result
);
5073 if (!end
.is_max() && start
> end
) {
5074 lderr(cct
) << __func__
<< ": start " << start
<< " > end " << end
<< dendl
;
5075 on_finish
->complete(-EINVAL
);
5080 lderr(cct
) << __func__
<< ": result size may not be zero" << dendl
;
5081 on_finish
->complete(-EINVAL
);
5085 if (start
.is_max()) {
5086 on_finish
->complete(0);
5090 shared_lock
rl(rwlock
);
5091 ceph_assert(osdmap
->get_epoch());
5092 if (!osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
)) {
5094 lderr(cct
) << __func__
<< ": SORTBITWISE cluster flag not set" << dendl
;
5095 on_finish
->complete(-EOPNOTSUPP
);
5098 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool_id
);
5100 lderr(cct
) << __func__
<< ": pool " << pool_id
<< " DNE in osd epoch "
5101 << osdmap
->get_epoch() << dendl
;
5103 on_finish
->complete(-ENOENT
);
5109 ldout(cct
, 20) << __func__
<< ": start=" << start
<< " end=" << end
<< dendl
;
5111 // Stash completion state
5112 C_EnumerateReply
*on_ack
= new C_EnumerateReply(
5113 this, next
, result
, end
, pool_id
, on_finish
);
5116 op
.pg_nls(max
, filter_bl
, start
, 0);
5118 // Issue. See you later in _enumerate_reply
5119 object_locator_t
oloc(pool_id
, ns
);
5120 pg_read(start
.get_hash(), oloc
, op
,
5121 &on_ack
->bl
, 0, on_ack
, &on_ack
->epoch
, &on_ack
->budget
);
5124 void Objecter::_enumerate_reply(
5125 ceph::buffer::list
&bl
,
5127 const hobject_t
&end
,
5128 const int64_t pool_id
,
5130 epoch_t reply_epoch
,
5131 std::list
<librados::ListObjectImpl
> *result
,
5136 put_op_budget_bytes(budget
);
5140 ldout(cct
, 4) << __func__
<< ": remote error " << r
<< dendl
;
5141 on_finish
->complete(r
);
5145 ceph_assert(next
!= NULL
);
5147 // Decode the results
5148 auto iter
= bl
.cbegin();
5149 pg_nls_response_t response
;
5151 decode(response
, iter
);
5153 // extra_info isn't used anywhere. We do this solely to preserve
5154 // backward compatibility
5155 ceph::buffer::list legacy_extra_info
;
5156 decode(legacy_extra_info
, iter
);
5159 ldout(cct
, 10) << __func__
<< ": got " << response
.entries
.size()
5160 << " handle " << response
.handle
5161 << " reply_epoch " << reply_epoch
<< dendl
;
5162 ldout(cct
, 20) << __func__
<< ": response.entries.size "
5163 << response
.entries
.size() << ", response.entries "
5164 << response
.entries
<< dendl
;
5165 if (response
.handle
<= end
) {
5166 *next
= response
.handle
;
5168 ldout(cct
, 10) << __func__
<< ": adjusted next down to end " << end
5172 // drop anything after 'end'
5173 shared_lock
rl(rwlock
);
5174 const pg_pool_t
*pool
= osdmap
->get_pg_pool(pool_id
);
5176 // pool is gone, drop any results which are now meaningless.
5178 on_finish
->complete(-ENOENT
);
5181 while (!response
.entries
.empty()) {
5182 uint32_t hash
= response
.entries
.back().locator
.empty() ?
5183 pool
->hash_key(response
.entries
.back().oid
,
5184 response
.entries
.back().nspace
) :
5185 pool
->hash_key(response
.entries
.back().locator
,
5186 response
.entries
.back().nspace
);
5187 hobject_t
last(response
.entries
.back().oid
,
5188 response
.entries
.back().locator
,
5192 response
.entries
.back().nspace
);
5195 ldout(cct
, 20) << __func__
<< " dropping item " << last
5196 << " >= end " << end
<< dendl
;
5197 response
.entries
.pop_back();
5201 if (!response
.entries
.empty()) {
5202 result
->merge(response
.entries
);
5205 // release the listing context's budget once all
5206 // OPs (in the session) are finished
5208 put_nlist_context_budget(list_context
);
5210 on_finish
->complete(r
);
5215 using namespace librados
;
5217 template <typename T
>
5218 void do_decode(std::vector
<T
>& items
, std::vector
<ceph::buffer::list
>& bls
)
5220 for (auto bl
: bls
) {
5221 auto p
= bl
.cbegin();
5228 struct C_ObjectOperation_scrub_ls
: public Context
{
5229 ceph::buffer::list bl
;
5231 std::vector
<inconsistent_obj_t
> *objects
= nullptr;
5232 std::vector
<inconsistent_snapset_t
> *snapsets
= nullptr;
5235 C_ObjectOperation_scrub_ls(uint32_t *interval
,
5236 std::vector
<inconsistent_obj_t
> *objects
,
5238 : interval(interval
), objects(objects
), rval(rval
) {}
5239 C_ObjectOperation_scrub_ls(uint32_t *interval
,
5240 std::vector
<inconsistent_snapset_t
> *snapsets
,
5242 : interval(interval
), snapsets(snapsets
), rval(rval
) {}
5243 void finish(int r
) override
{
5244 if (r
< 0 && r
!= -EAGAIN
) {
5255 } catch (ceph::buffer::error
&) {
5262 scrub_ls_result_t result
;
5263 auto p
= bl
.cbegin();
5265 *interval
= result
.interval
;
5267 do_decode(*objects
, result
.vals
);
5269 do_decode(*snapsets
, result
.vals
);
5274 template <typename T
>
5275 void do_scrub_ls(::ObjectOperation
*op
,
5276 const scrub_ls_arg_t
& arg
,
5277 std::vector
<T
> *items
,
5281 OSDOp
& osd_op
= op
->add_op(CEPH_OSD_OP_SCRUBLS
);
5282 op
->flags
|= CEPH_OSD_FLAG_PGOP
;
5283 ceph_assert(interval
);
5284 arg
.encode(osd_op
.indata
);
5285 unsigned p
= op
->ops
.size() - 1;
5286 auto *h
= new C_ObjectOperation_scrub_ls
{interval
, items
, rval
};
5287 op
->out_handler
[p
] = h
;
5288 op
->out_bl
[p
] = &h
->bl
;
5289 op
->out_rval
[p
] = rval
;
5293 void ::ObjectOperation::scrub_ls(const librados::object_id_t
& start_after
,
5294 uint64_t max_to_get
,
5295 std::vector
<librados::inconsistent_obj_t
> *objects
,
5299 scrub_ls_arg_t arg
= {*interval
, 0, start_after
, max_to_get
};
5300 do_scrub_ls(this, arg
, objects
, interval
, rval
);
5303 void ::ObjectOperation::scrub_ls(const librados::object_id_t
& start_after
,
5304 uint64_t max_to_get
,
5305 std::vector
<librados::inconsistent_snapset_t
> *snapsets
,
5309 scrub_ls_arg_t arg
= {*interval
, 1, start_after
, max_to_get
};
5310 do_scrub_ls(this, arg
, snapsets
, interval
, rval
);