1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
16 #include "osd/OSDMap.h"
19 #include "mon/MonClient.h"
21 #include "msg/Messenger.h"
22 #include "msg/Message.h"
24 #include "messages/MPing.h"
25 #include "messages/MOSDOp.h"
26 #include "messages/MOSDOpReply.h"
27 #include "messages/MOSDBackoff.h"
28 #include "messages/MOSDMap.h"
30 #include "messages/MPoolOp.h"
31 #include "messages/MPoolOpReply.h"
33 #include "messages/MGetPoolStats.h"
34 #include "messages/MGetPoolStatsReply.h"
35 #include "messages/MStatfs.h"
36 #include "messages/MStatfsReply.h"
38 #include "messages/MMonCommand.h"
40 #include "messages/MCommand.h"
41 #include "messages/MCommandReply.h"
43 #include "messages/MWatchNotify.h"
47 #include "common/config.h"
48 #include "common/perf_counters.h"
49 #include "common/scrub_types.h"
50 #include "include/str_list.h"
51 #include "common/errno.h"
52 #include "common/EventTrace.h"
54 using ceph::real_time
;
55 using ceph::real_clock
;
57 using ceph::mono_clock
;
58 using ceph::mono_time
;
63 #define dout_subsys ceph_subsys_objecter
65 #define dout_prefix *_dout << messenger->get_myname() << ".objecter "
69 l_osdc_first
= 123200,
87 l_osdc_osdop_writefull
,
88 l_osdc_osdop_writesame
,
91 l_osdc_osdop_truncate
,
94 l_osdc_osdop_sparse_read
,
95 l_osdc_osdop_clonerange
,
96 l_osdc_osdop_getxattr
,
97 l_osdc_osdop_setxattr
,
98 l_osdc_osdop_cmpxattr
,
100 l_osdc_osdop_resetxattrs
,
101 l_osdc_osdop_tmap_up
,
102 l_osdc_osdop_tmap_put
,
103 l_osdc_osdop_tmap_get
,
107 l_osdc_osdop_src_cmpxattr
,
109 l_osdc_osdop_pgls_filter
,
112 l_osdc_linger_active
,
114 l_osdc_linger_resend
,
117 l_osdc_poolop_active
,
119 l_osdc_poolop_resend
,
121 l_osdc_poolstat_active
,
122 l_osdc_poolstat_send
,
123 l_osdc_poolstat_resend
,
125 l_osdc_statfs_active
,
127 l_osdc_statfs_resend
,
129 l_osdc_command_active
,
131 l_osdc_command_resend
,
138 l_osdc_osd_session_open
,
139 l_osdc_osd_session_close
,
142 l_osdc_osdop_omap_wr
,
143 l_osdc_osdop_omap_rd
,
144 l_osdc_osdop_omap_del
,
150 // config obs ----------------------------
152 static const char *config_keys
[] = {
157 class Objecter::RequestStateHook
: public AdminSocketHook
{
158 Objecter
*m_objecter
;
160 explicit RequestStateHook(Objecter
*objecter
);
161 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
162 bufferlist
& out
) override
;
166 * This is a more limited form of C_Contexts, but that requires
167 * a ceph_context which we don't have here.
169 class ObjectOperation::C_TwoContexts
: public Context
{
173 C_TwoContexts(Context
*first
, Context
*second
) :
174 first(first
), second(second
) {}
175 void finish(int r
) override
{
182 ~C_TwoContexts() override
{
188 void ObjectOperation::add_handler(Context
*extra
) {
189 size_t last
= out_handler
.size() - 1;
190 Context
*orig
= out_handler
[last
];
192 Context
*wrapper
= new C_TwoContexts(orig
, extra
);
193 out_handler
[last
] = wrapper
;
195 out_handler
[last
] = extra
;
199 Objecter::OSDSession::unique_completion_lock
Objecter::OSDSession::get_lock(
202 if (oid
.name
.empty())
203 return unique_completion_lock();
205 static constexpr uint32_t HASH_PRIME
= 1021;
206 uint32_t h
= ceph_str_hash_linux(oid
.name
.c_str(), oid
.name
.size())
209 return unique_completion_lock(completion_locks
[h
% num_locks
],
213 const char** Objecter::get_tracked_conf_keys() const
219 void Objecter::handle_conf_change(const struct md_config_t
*conf
,
220 const std::set
<std::string
> &changed
)
222 if (changed
.count("crush_location")) {
223 update_crush_location();
227 void Objecter::update_crush_location()
229 unique_lock
wl(rwlock
);
230 crush_location
= cct
->crush_location
.get_location();
233 // messages ------------------------------
236 * initialize only internal data structures, don't initiate cluster interaction
238 void Objecter::init()
240 assert(!initialized
.read());
243 PerfCountersBuilder
pcb(cct
, "objecter", l_osdc_first
, l_osdc_last
);
245 pcb
.add_u64(l_osdc_op_active
, "op_active", "Operations active", "actv",
246 PerfCountersBuilder::PRIO_CRITICAL
);
247 pcb
.add_u64(l_osdc_op_laggy
, "op_laggy", "Laggy operations");
248 pcb
.add_u64_counter(l_osdc_op_send
, "op_send", "Sent operations");
249 pcb
.add_u64_counter(l_osdc_op_send_bytes
, "op_send_bytes", "Sent data");
250 pcb
.add_u64_counter(l_osdc_op_resend
, "op_resend", "Resent operations");
251 pcb
.add_u64_counter(l_osdc_op_reply
, "op_reply", "Operation reply");
253 pcb
.add_u64_counter(l_osdc_op
, "op", "Operations");
254 pcb
.add_u64_counter(l_osdc_op_r
, "op_r", "Read operations", "rd",
255 PerfCountersBuilder::PRIO_CRITICAL
);
256 pcb
.add_u64_counter(l_osdc_op_w
, "op_w", "Write operations", "wr",
257 PerfCountersBuilder::PRIO_CRITICAL
);
258 pcb
.add_u64_counter(l_osdc_op_rmw
, "op_rmw", "Read-modify-write operations",
259 "rdwr", PerfCountersBuilder::PRIO_INTERESTING
);
260 pcb
.add_u64_counter(l_osdc_op_pg
, "op_pg", "PG operation");
262 pcb
.add_u64_counter(l_osdc_osdop_stat
, "osdop_stat", "Stat operations");
263 pcb
.add_u64_counter(l_osdc_osdop_create
, "osdop_create",
264 "Create object operations");
265 pcb
.add_u64_counter(l_osdc_osdop_read
, "osdop_read", "Read operations");
266 pcb
.add_u64_counter(l_osdc_osdop_write
, "osdop_write", "Write operations");
267 pcb
.add_u64_counter(l_osdc_osdop_writefull
, "osdop_writefull",
268 "Write full object operations");
269 pcb
.add_u64_counter(l_osdc_osdop_writesame
, "osdop_writesame",
270 "Write same operations");
271 pcb
.add_u64_counter(l_osdc_osdop_append
, "osdop_append",
273 pcb
.add_u64_counter(l_osdc_osdop_zero
, "osdop_zero",
274 "Set object to zero operations");
275 pcb
.add_u64_counter(l_osdc_osdop_truncate
, "osdop_truncate",
276 "Truncate object operations");
277 pcb
.add_u64_counter(l_osdc_osdop_delete
, "osdop_delete",
278 "Delete object operations");
279 pcb
.add_u64_counter(l_osdc_osdop_mapext
, "osdop_mapext",
280 "Map extent operations");
281 pcb
.add_u64_counter(l_osdc_osdop_sparse_read
, "osdop_sparse_read",
282 "Sparse read operations");
283 pcb
.add_u64_counter(l_osdc_osdop_clonerange
, "osdop_clonerange",
284 "Clone range operations");
285 pcb
.add_u64_counter(l_osdc_osdop_getxattr
, "osdop_getxattr",
286 "Get xattr operations");
287 pcb
.add_u64_counter(l_osdc_osdop_setxattr
, "osdop_setxattr",
288 "Set xattr operations");
289 pcb
.add_u64_counter(l_osdc_osdop_cmpxattr
, "osdop_cmpxattr",
290 "Xattr comparison operations");
291 pcb
.add_u64_counter(l_osdc_osdop_rmxattr
, "osdop_rmxattr",
292 "Remove xattr operations");
293 pcb
.add_u64_counter(l_osdc_osdop_resetxattrs
, "osdop_resetxattrs",
294 "Reset xattr operations");
295 pcb
.add_u64_counter(l_osdc_osdop_tmap_up
, "osdop_tmap_up",
296 "TMAP update operations");
297 pcb
.add_u64_counter(l_osdc_osdop_tmap_put
, "osdop_tmap_put",
298 "TMAP put operations");
299 pcb
.add_u64_counter(l_osdc_osdop_tmap_get
, "osdop_tmap_get",
300 "TMAP get operations");
301 pcb
.add_u64_counter(l_osdc_osdop_call
, "osdop_call",
302 "Call (execute) operations");
303 pcb
.add_u64_counter(l_osdc_osdop_watch
, "osdop_watch",
304 "Watch by object operations");
305 pcb
.add_u64_counter(l_osdc_osdop_notify
, "osdop_notify",
306 "Notify about object operations");
307 pcb
.add_u64_counter(l_osdc_osdop_src_cmpxattr
, "osdop_src_cmpxattr",
308 "Extended attribute comparison in multi operations");
309 pcb
.add_u64_counter(l_osdc_osdop_pgls
, "osdop_pgls");
310 pcb
.add_u64_counter(l_osdc_osdop_pgls_filter
, "osdop_pgls_filter");
311 pcb
.add_u64_counter(l_osdc_osdop_other
, "osdop_other", "Other operations");
313 pcb
.add_u64(l_osdc_linger_active
, "linger_active",
314 "Active lingering operations");
315 pcb
.add_u64_counter(l_osdc_linger_send
, "linger_send",
316 "Sent lingering operations");
317 pcb
.add_u64_counter(l_osdc_linger_resend
, "linger_resend",
318 "Resent lingering operations");
319 pcb
.add_u64_counter(l_osdc_linger_ping
, "linger_ping",
320 "Sent pings to lingering operations");
322 pcb
.add_u64(l_osdc_poolop_active
, "poolop_active",
323 "Active pool operations");
324 pcb
.add_u64_counter(l_osdc_poolop_send
, "poolop_send",
325 "Sent pool operations");
326 pcb
.add_u64_counter(l_osdc_poolop_resend
, "poolop_resend",
327 "Resent pool operations");
329 pcb
.add_u64(l_osdc_poolstat_active
, "poolstat_active",
330 "Active get pool stat operations");
331 pcb
.add_u64_counter(l_osdc_poolstat_send
, "poolstat_send",
332 "Pool stat operations sent");
333 pcb
.add_u64_counter(l_osdc_poolstat_resend
, "poolstat_resend",
334 "Resent pool stats");
336 pcb
.add_u64(l_osdc_statfs_active
, "statfs_active", "Statfs operations");
337 pcb
.add_u64_counter(l_osdc_statfs_send
, "statfs_send", "Sent FS stats");
338 pcb
.add_u64_counter(l_osdc_statfs_resend
, "statfs_resend",
341 pcb
.add_u64(l_osdc_command_active
, "command_active", "Active commands");
342 pcb
.add_u64_counter(l_osdc_command_send
, "command_send",
344 pcb
.add_u64_counter(l_osdc_command_resend
, "command_resend",
347 pcb
.add_u64(l_osdc_map_epoch
, "map_epoch", "OSD map epoch");
348 pcb
.add_u64_counter(l_osdc_map_full
, "map_full",
349 "Full OSD maps received");
350 pcb
.add_u64_counter(l_osdc_map_inc
, "map_inc",
351 "Incremental OSD maps received");
353 pcb
.add_u64(l_osdc_osd_sessions
, "osd_sessions",
354 "Open sessions"); // open sessions
355 pcb
.add_u64_counter(l_osdc_osd_session_open
, "osd_session_open",
357 pcb
.add_u64_counter(l_osdc_osd_session_close
, "osd_session_close",
359 pcb
.add_u64(l_osdc_osd_laggy
, "osd_laggy", "Laggy OSD sessions");
361 pcb
.add_u64_counter(l_osdc_osdop_omap_wr
, "omap_wr",
362 "OSD OMAP write operations");
363 pcb
.add_u64_counter(l_osdc_osdop_omap_rd
, "omap_rd",
364 "OSD OMAP read operations");
365 pcb
.add_u64_counter(l_osdc_osdop_omap_del
, "omap_del",
366 "OSD OMAP delete operations");
368 logger
= pcb
.create_perf_counters();
369 cct
->get_perfcounters_collection()->add(logger
);
372 m_request_state_hook
= new RequestStateHook(this);
373 AdminSocket
* admin_socket
= cct
->get_admin_socket();
374 int ret
= admin_socket
->register_command("objecter_requests",
376 m_request_state_hook
,
377 "show in-progress osd requests");
379 /* Don't warn on EEXIST, happens if multiple ceph clients
380 * are instantiated from one process */
381 if (ret
< 0 && ret
!= -EEXIST
) {
382 lderr(cct
) << "error registering admin socket command: "
383 << cpp_strerror(ret
) << dendl
;
386 update_crush_location();
388 cct
->_conf
->add_observer(this);
394 * ok, cluster interaction can happen
396 void Objecter::start(const OSDMap
* o
)
398 shared_lock
rl(rwlock
);
402 osdmap
->deepish_copy_from(*o
);
403 } else if (osdmap
->get_epoch() == 0) {
404 _maybe_request_map();
408 void Objecter::shutdown()
410 assert(initialized
.read());
412 unique_lock
wl(rwlock
);
416 cct
->_conf
->remove_observer(this);
418 map
<int,OSDSession
*>::iterator p
;
419 while (!osd_sessions
.empty()) {
420 p
= osd_sessions
.begin();
421 close_session(p
->second
);
424 while(!check_latest_map_lingers
.empty()) {
425 map
<uint64_t, LingerOp
*>::iterator i
= check_latest_map_lingers
.begin();
427 check_latest_map_lingers
.erase(i
->first
);
430 while(!check_latest_map_ops
.empty()) {
431 map
<ceph_tid_t
, Op
*>::iterator i
= check_latest_map_ops
.begin();
433 check_latest_map_ops
.erase(i
->first
);
436 while(!check_latest_map_commands
.empty()) {
437 map
<ceph_tid_t
, CommandOp
*>::iterator i
438 = check_latest_map_commands
.begin();
440 check_latest_map_commands
.erase(i
->first
);
443 while(!poolstat_ops
.empty()) {
444 map
<ceph_tid_t
,PoolStatOp
*>::iterator i
= poolstat_ops
.begin();
446 poolstat_ops
.erase(i
->first
);
449 while(!statfs_ops
.empty()) {
450 map
<ceph_tid_t
, StatfsOp
*>::iterator i
= statfs_ops
.begin();
452 statfs_ops
.erase(i
->first
);
455 while(!pool_ops
.empty()) {
456 map
<ceph_tid_t
, PoolOp
*>::iterator i
= pool_ops
.begin();
458 pool_ops
.erase(i
->first
);
461 ldout(cct
, 20) << __func__
<< " clearing up homeless session..." << dendl
;
462 while(!homeless_session
->linger_ops
.empty()) {
463 std::map
<uint64_t, LingerOp
*>::iterator i
464 = homeless_session
->linger_ops
.begin();
465 ldout(cct
, 10) << " linger_op " << i
->first
<< dendl
;
466 LingerOp
*lop
= i
->second
;
468 OSDSession::unique_lock
swl(homeless_session
->lock
);
469 _session_linger_op_remove(homeless_session
, lop
);
471 linger_ops
.erase(lop
->linger_id
);
472 linger_ops_set
.erase(lop
);
476 while(!homeless_session
->ops
.empty()) {
477 std::map
<ceph_tid_t
, Op
*>::iterator i
= homeless_session
->ops
.begin();
478 ldout(cct
, 10) << " op " << i
->first
<< dendl
;
481 OSDSession::unique_lock
swl(homeless_session
->lock
);
482 _session_op_remove(homeless_session
, op
);
487 while(!homeless_session
->command_ops
.empty()) {
488 std::map
<ceph_tid_t
, CommandOp
*>::iterator i
489 = homeless_session
->command_ops
.begin();
490 ldout(cct
, 10) << " command_op " << i
->first
<< dendl
;
491 CommandOp
*cop
= i
->second
;
493 OSDSession::unique_lock
swl(homeless_session
->lock
);
494 _session_command_op_remove(homeless_session
, cop
);
500 if (timer
.cancel_event(tick_event
)) {
501 ldout(cct
, 10) << " successfully canceled tick" << dendl
;
507 cct
->get_perfcounters_collection()->remove(logger
);
512 // Let go of Objecter write lock so timer thread can shutdown
515 // Outside of lock to avoid cycle WRT calls to RequestStateHook
516 // This is safe because we guarantee no concurrent calls to
517 // shutdown() with the ::initialized check at start.
518 if (m_request_state_hook
) {
519 AdminSocket
* admin_socket
= cct
->get_admin_socket();
520 admin_socket
->unregister_command("objecter_requests");
521 delete m_request_state_hook
;
522 m_request_state_hook
= NULL
;
526 void Objecter::_send_linger(LingerOp
*info
,
529 assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
532 Context
*oncommit
= NULL
;
533 LingerOp::shared_lock
watchl(info
->watch_lock
);
534 bufferlist
*poutbl
= NULL
;
535 if (info
->registered
&& info
->is_watch
) {
536 ldout(cct
, 15) << "send_linger " << info
->linger_id
<< " reconnect"
538 opv
.push_back(OSDOp());
539 opv
.back().op
.op
= CEPH_OSD_OP_WATCH
;
540 opv
.back().op
.watch
.cookie
= info
->get_cookie();
541 opv
.back().op
.watch
.op
= CEPH_OSD_WATCH_OP_RECONNECT
;
542 opv
.back().op
.watch
.gen
= ++info
->register_gen
;
543 oncommit
= new C_Linger_Reconnect(this, info
);
545 ldout(cct
, 15) << "send_linger " << info
->linger_id
<< " register"
548 C_Linger_Commit
*c
= new C_Linger_Commit(this, info
);
549 if (!info
->is_watch
) {
556 Op
*o
= new Op(info
->target
.base_oid
, info
->target
.base_oloc
,
557 opv
, info
->target
.flags
| CEPH_OSD_FLAG_READ
,
558 oncommit
, info
->pobjver
);
560 o
->snapid
= info
->snap
;
561 o
->snapc
= info
->snapc
;
562 o
->mtime
= info
->mtime
;
564 o
->target
= info
->target
;
565 o
->tid
= last_tid
.inc();
567 // do not resend this; we will send a new op to reregister
568 o
->should_resend
= false;
570 if (info
->register_tid
) {
571 // repeat send. cancel old registeration op, if any.
572 OSDSession::unique_lock
sl(info
->session
->lock
);
573 if (info
->session
->ops
.count(info
->register_tid
)) {
574 Op
*o
= info
->session
->ops
[info
->register_tid
];
575 _op_cancel_map_check(o
);
576 _cancel_linger_op(o
);
580 _op_submit(o
, sul
, &info
->register_tid
);
583 _op_submit_with_budget(o
, sul
, &info
->register_tid
);
586 logger
->inc(l_osdc_linger_send
);
589 void Objecter::_linger_commit(LingerOp
*info
, int r
, bufferlist
& outbl
)
591 LingerOp::unique_lock
wl(info
->watch_lock
);
592 ldout(cct
, 10) << "_linger_commit " << info
->linger_id
<< dendl
;
593 if (info
->on_reg_commit
) {
594 info
->on_reg_commit
->complete(r
);
595 info
->on_reg_commit
= NULL
;
598 // only tell the user the first time we do this
599 info
->registered
= true;
600 info
->pobjver
= NULL
;
602 if (!info
->is_watch
) {
603 // make note of the notify_id
604 bufferlist::iterator p
= outbl
.begin();
606 ::decode(info
->notify_id
, p
);
607 ldout(cct
, 10) << "_linger_commit notify_id=" << info
->notify_id
610 catch (buffer::error
& e
) {
615 struct C_DoWatchError
: public Context
{
617 Objecter::LingerOp
*info
;
619 C_DoWatchError(Objecter
*o
, Objecter::LingerOp
*i
, int r
)
620 : objecter(o
), info(i
), err(r
) {
622 info
->_queued_async();
624 void finish(int r
) override
{
625 Objecter::unique_lock
wl(objecter
->rwlock
);
626 bool canceled
= info
->canceled
;
630 info
->watch_context
->handle_error(info
->get_cookie(), err
);
633 info
->finished_async();
638 int Objecter::_normalize_watch_error(int r
)
640 // translate ENOENT -> ENOTCONN so that a delete->disconnection
641 // notification and a failure to reconnect becuase we raced with
642 // the delete appear the same to the user.
648 void Objecter::_linger_reconnect(LingerOp
*info
, int r
)
650 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " = " << r
651 << " (last_error " << info
->last_error
<< ")" << dendl
;
653 LingerOp::unique_lock
wl(info
->watch_lock
);
654 if (!info
->last_error
) {
655 r
= _normalize_watch_error(r
);
656 info
->last_error
= r
;
657 if (info
->watch_context
) {
658 finisher
->queue(new C_DoWatchError(this, info
, r
));
665 void Objecter::_send_linger_ping(LingerOp
*info
)
667 // rwlock is locked unique
668 // info->session->lock is locked
670 if (cct
->_conf
->objecter_inject_no_watch_ping
) {
671 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " SKIPPING"
675 if (osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
)) {
676 ldout(cct
, 10) << __func__
<< " PAUSERD" << dendl
;
680 ceph::mono_time now
= ceph::mono_clock::now();
681 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
<< " now " << now
684 vector
<OSDOp
> opv(1);
685 opv
[0].op
.op
= CEPH_OSD_OP_WATCH
;
686 opv
[0].op
.watch
.cookie
= info
->get_cookie();
687 opv
[0].op
.watch
.op
= CEPH_OSD_WATCH_OP_PING
;
688 opv
[0].op
.watch
.gen
= info
->register_gen
;
689 C_Linger_Ping
*onack
= new C_Linger_Ping(this, info
);
690 Op
*o
= new Op(info
->target
.base_oid
, info
->target
.base_oloc
,
691 opv
, info
->target
.flags
| CEPH_OSD_FLAG_READ
,
693 o
->target
= info
->target
;
694 o
->should_resend
= false;
696 MOSDOp
*m
= _prepare_osd_op(o
);
697 o
->tid
= last_tid
.inc();
698 _session_op_assign(info
->session
, o
);
700 info
->ping_tid
= o
->tid
;
703 logger
->inc(l_osdc_linger_ping
);
706 void Objecter::_linger_ping(LingerOp
*info
, int r
, mono_time sent
,
707 uint32_t register_gen
)
709 LingerOp::unique_lock
l(info
->watch_lock
);
710 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
711 << " sent " << sent
<< " gen " << register_gen
<< " = " << r
712 << " (last_error " << info
->last_error
713 << " register_gen " << info
->register_gen
<< ")" << dendl
;
714 if (info
->register_gen
== register_gen
) {
716 info
->watch_valid_thru
= sent
;
717 } else if (r
< 0 && !info
->last_error
) {
718 r
= _normalize_watch_error(r
);
719 info
->last_error
= r
;
720 if (info
->watch_context
) {
721 finisher
->queue(new C_DoWatchError(this, info
, r
));
725 ldout(cct
, 20) << " ignoring old gen" << dendl
;
729 int Objecter::linger_check(LingerOp
*info
)
731 LingerOp::shared_lock
l(info
->watch_lock
);
733 mono_time stamp
= info
->watch_valid_thru
;
734 if (!info
->watch_pending_async
.empty())
735 stamp
= MIN(info
->watch_valid_thru
, info
->watch_pending_async
.front());
736 auto age
= mono_clock::now() - stamp
;
738 ldout(cct
, 10) << __func__
<< " " << info
->linger_id
739 << " err " << info
->last_error
740 << " age " << age
<< dendl
;
741 if (info
->last_error
)
742 return info
->last_error
;
743 // return a safe upper bound (we are truncating to ms)
745 1 + std::chrono::duration_cast
<std::chrono::milliseconds
>(age
).count();
748 void Objecter::linger_cancel(LingerOp
*info
)
750 unique_lock
wl(rwlock
);
751 _linger_cancel(info
);
755 void Objecter::_linger_cancel(LingerOp
*info
)
757 // rwlock is locked unique
758 ldout(cct
, 20) << __func__
<< " linger_id=" << info
->linger_id
<< dendl
;
759 if (!info
->canceled
) {
760 OSDSession
*s
= info
->session
;
761 OSDSession::unique_lock
sl(s
->lock
);
762 _session_linger_op_remove(s
, info
);
765 linger_ops
.erase(info
->linger_id
);
766 linger_ops_set
.erase(info
);
767 assert(linger_ops
.size() == linger_ops_set
.size());
769 info
->canceled
= true;
772 logger
->dec(l_osdc_linger_active
);
778 Objecter::LingerOp
*Objecter::linger_register(const object_t
& oid
,
779 const object_locator_t
& oloc
,
782 LingerOp
*info
= new LingerOp
;
783 info
->target
.base_oid
= oid
;
784 info
->target
.base_oloc
= oloc
;
785 if (info
->target
.base_oloc
.key
== oid
)
786 info
->target
.base_oloc
.key
.clear();
787 info
->target
.flags
= flags
;
788 info
->watch_valid_thru
= mono_clock::now();
790 unique_lock
l(rwlock
);
793 info
->linger_id
= ++max_linger_id
;
794 ldout(cct
, 10) << __func__
<< " info " << info
795 << " linger_id " << info
->linger_id
796 << " cookie " << info
->get_cookie()
798 linger_ops
[info
->linger_id
] = info
;
799 linger_ops_set
.insert(info
);
800 assert(linger_ops
.size() == linger_ops_set
.size());
802 info
->get(); // for the caller
806 ceph_tid_t
Objecter::linger_watch(LingerOp
*info
,
808 const SnapContext
& snapc
,
814 info
->is_watch
= true;
817 info
->target
.flags
|= CEPH_OSD_FLAG_WRITE
;
821 info
->pobjver
= objver
;
822 info
->on_reg_commit
= oncommit
;
824 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
825 _linger_submit(info
, sul
);
826 logger
->inc(l_osdc_linger_active
);
828 return info
->linger_id
;
831 ceph_tid_t
Objecter::linger_notify(LingerOp
*info
,
833 snapid_t snap
, bufferlist
& inbl
,
839 info
->target
.flags
|= CEPH_OSD_FLAG_READ
;
842 info
->poutbl
= poutbl
;
843 info
->pobjver
= objver
;
844 info
->on_reg_commit
= onfinish
;
846 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
847 _linger_submit(info
, sul
);
848 logger
->inc(l_osdc_linger_active
);
850 return info
->linger_id
;
853 void Objecter::_linger_submit(LingerOp
*info
, shunique_lock
& sul
)
855 assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
856 assert(info
->linger_id
);
858 // Populate Op::target
859 OSDSession
*s
= NULL
;
860 _calc_target(&info
->target
, nullptr);
862 // Create LingerOp<->OSDSession relation
863 int r
= _get_session(info
->target
.osd
, &s
, sul
);
865 OSDSession::unique_lock
sl(s
->lock
);
866 _session_linger_op_assign(s
, info
);
870 _send_linger(info
, sul
);
873 struct C_DoWatchNotify
: public Context
{
875 Objecter::LingerOp
*info
;
877 C_DoWatchNotify(Objecter
*o
, Objecter::LingerOp
*i
, MWatchNotify
*m
)
878 : objecter(o
), info(i
), msg(m
) {
880 info
->_queued_async();
883 void finish(int r
) override
{
884 objecter
->_do_watch_notify(info
, msg
);
888 void Objecter::handle_watch_notify(MWatchNotify
*m
)
890 shared_lock
l(rwlock
);
891 if (!initialized
.read()) {
895 LingerOp
*info
= reinterpret_cast<LingerOp
*>(m
->cookie
);
896 if (linger_ops_set
.count(info
) == 0) {
897 ldout(cct
, 7) << __func__
<< " cookie " << m
->cookie
<< " dne" << dendl
;
900 LingerOp::unique_lock
wl(info
->watch_lock
);
901 if (m
->opcode
== CEPH_WATCH_EVENT_DISCONNECT
) {
902 if (!info
->last_error
) {
903 info
->last_error
= -ENOTCONN
;
904 if (info
->watch_context
) {
905 finisher
->queue(new C_DoWatchError(this, info
, -ENOTCONN
));
908 } else if (!info
->is_watch
) {
909 // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
910 // since we know the only user (librados) is safe to call in
911 // fast-dispatch context
912 if (info
->notify_id
&&
913 info
->notify_id
!= m
->notify_id
) {
914 ldout(cct
, 10) << __func__
<< " reply notify " << m
->notify_id
915 << " != " << info
->notify_id
<< ", ignoring" << dendl
;
916 } else if (info
->on_notify_finish
) {
917 info
->notify_result_bl
->claim(m
->get_data());
918 info
->on_notify_finish
->complete(m
->return_code
);
920 // if we race with reconnect we might get a second notify; only
921 // notify the caller once!
922 info
->on_notify_finish
= NULL
;
925 finisher
->queue(new C_DoWatchNotify(this, info
, m
));
929 void Objecter::_do_watch_notify(LingerOp
*info
, MWatchNotify
*m
)
931 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
933 shared_lock
l(rwlock
);
934 assert(initialized
.read());
936 if (info
->canceled
) {
941 // notify completion?
942 assert(info
->is_watch
);
943 assert(info
->watch_context
);
944 assert(m
->opcode
!= CEPH_WATCH_EVENT_DISCONNECT
);
949 case CEPH_WATCH_EVENT_NOTIFY
:
950 info
->watch_context
->handle_notify(m
->notify_id
, m
->cookie
,
951 m
->notifier_gid
, m
->bl
);
956 info
->finished_async();
961 bool Objecter::ms_dispatch(Message
*m
)
963 ldout(cct
, 10) << __func__
<< " " << cct
<< " " << *m
<< dendl
;
964 if (!initialized
.read())
967 switch (m
->get_type()) {
968 // these we exlusively handle
969 case CEPH_MSG_OSD_OPREPLY
:
970 handle_osd_op_reply(static_cast<MOSDOpReply
*>(m
));
973 case CEPH_MSG_OSD_BACKOFF
:
974 handle_osd_backoff(static_cast<MOSDBackoff
*>(m
));
977 case CEPH_MSG_WATCH_NOTIFY
:
978 handle_watch_notify(static_cast<MWatchNotify
*>(m
));
982 case MSG_COMMAND_REPLY
:
983 if (m
->get_source().type() == CEPH_ENTITY_TYPE_OSD
) {
984 handle_command_reply(static_cast<MCommandReply
*>(m
));
990 case MSG_GETPOOLSTATSREPLY
:
991 handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply
*>(m
));
994 case CEPH_MSG_POOLOP_REPLY
:
995 handle_pool_op_reply(static_cast<MPoolOpReply
*>(m
));
998 case CEPH_MSG_STATFS_REPLY
:
999 handle_fs_stats_reply(static_cast<MStatfsReply
*>(m
));
1002 // these we give others a chance to inspect
1005 case CEPH_MSG_OSD_MAP
:
1006 handle_osd_map(static_cast<MOSDMap
*>(m
));
1012 void Objecter::_scan_requests(OSDSession
*s
,
1015 map
<int64_t, bool> *pool_full_map
,
1016 map
<ceph_tid_t
, Op
*>& need_resend
,
1017 list
<LingerOp
*>& need_resend_linger
,
1018 map
<ceph_tid_t
, CommandOp
*>& need_resend_command
,
1021 assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
1023 list
<LingerOp
*> unregister_lingers
;
1025 OSDSession::unique_lock
sl(s
->lock
);
1027 // check for changed linger mappings (_before_ regular ops)
1028 map
<ceph_tid_t
,LingerOp
*>::iterator lp
= s
->linger_ops
.begin();
1029 while (lp
!= s
->linger_ops
.end()) {
1030 LingerOp
*op
= lp
->second
;
1031 assert(op
->session
== s
);
1032 // check_linger_pool_dne() may touch linger_ops; prevent iterator
1035 ldout(cct
, 10) << " checking linger op " << op
->linger_id
<< dendl
;
1036 bool unregister
, force_resend_writes
= cluster_full
;
1037 int r
= _recalc_linger_op_target(op
, sul
);
1039 force_resend_writes
= force_resend_writes
||
1040 (*pool_full_map
)[op
->target
.base_oloc
.pool
];
1042 case RECALC_OP_TARGET_NO_ACTION
:
1043 if (!force_resend
&& !force_resend_writes
)
1046 case RECALC_OP_TARGET_NEED_RESEND
:
1047 need_resend_linger
.push_back(op
);
1048 _linger_cancel_map_check(op
);
1050 case RECALC_OP_TARGET_POOL_DNE
:
1051 _check_linger_pool_dne(op
, &unregister
);
1053 ldout(cct
, 10) << " need to unregister linger op "
1054 << op
->linger_id
<< dendl
;
1056 unregister_lingers
.push_back(op
);
1062 // check for changed request mappings
1063 map
<ceph_tid_t
,Op
*>::iterator p
= s
->ops
.begin();
1064 while (p
!= s
->ops
.end()) {
1066 ++p
; // check_op_pool_dne() may touch ops; prevent iterator invalidation
1067 ldout(cct
, 10) << " checking op " << op
->tid
<< dendl
;
1068 bool force_resend_writes
= cluster_full
;
1070 force_resend_writes
= force_resend_writes
||
1071 (*pool_full_map
)[op
->target
.base_oloc
.pool
];
1072 int r
= _calc_target(&op
->target
,
1073 op
->session
? op
->session
->con
.get() : nullptr);
1075 case RECALC_OP_TARGET_NO_ACTION
:
1076 if (!force_resend
&& !(force_resend_writes
&& op
->respects_full()))
1079 case RECALC_OP_TARGET_NEED_RESEND
:
1081 _session_op_remove(op
->session
, op
);
1083 need_resend
[op
->tid
] = op
;
1084 _op_cancel_map_check(op
);
1086 case RECALC_OP_TARGET_POOL_DNE
:
1087 _check_op_pool_dne(op
, &sl
);
1093 map
<ceph_tid_t
,CommandOp
*>::iterator cp
= s
->command_ops
.begin();
1094 while (cp
!= s
->command_ops
.end()) {
1095 CommandOp
*c
= cp
->second
;
1097 ldout(cct
, 10) << " checking command " << c
->tid
<< dendl
;
1098 bool force_resend_writes
= cluster_full
;
1100 force_resend_writes
= force_resend_writes
||
1101 (*pool_full_map
)[c
->target_pg
.pool()];
1102 int r
= _calc_command_target(c
, sul
);
1104 case RECALC_OP_TARGET_NO_ACTION
:
1105 // resend if skipped map; otherwise do nothing.
1106 if (!force_resend
&& !force_resend_writes
)
1109 case RECALC_OP_TARGET_NEED_RESEND
:
1110 need_resend_command
[c
->tid
] = c
;
1112 _session_command_op_remove(c
->session
, c
);
1114 _command_cancel_map_check(c
);
1116 case RECALC_OP_TARGET_POOL_DNE
:
1117 case RECALC_OP_TARGET_OSD_DNE
:
1118 case RECALC_OP_TARGET_OSD_DOWN
:
1119 _check_command_map_dne(c
);
1126 for (list
<LingerOp
*>::iterator iter
= unregister_lingers
.begin();
1127 iter
!= unregister_lingers
.end();
1129 _linger_cancel(*iter
);
1134 void Objecter::handle_osd_map(MOSDMap
*m
)
1136 shunique_lock
sul(rwlock
, acquire_unique
);
1137 if (!initialized
.read())
1142 if (m
->fsid
!= monc
->get_fsid()) {
1143 ldout(cct
, 0) << "handle_osd_map fsid " << m
->fsid
1144 << " != " << monc
->get_fsid() << dendl
;
1148 bool was_pauserd
= osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
);
1149 bool cluster_full
= _osdmap_full_flag();
1150 bool was_pausewr
= osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
) || cluster_full
||
1151 _osdmap_has_pool_full();
1152 map
<int64_t, bool> pool_full_map
;
1153 for (map
<int64_t, pg_pool_t
>::const_iterator it
1154 = osdmap
->get_pools().begin();
1155 it
!= osdmap
->get_pools().end(); ++it
)
1156 pool_full_map
[it
->first
] = _osdmap_pool_full(it
->second
);
1159 list
<LingerOp
*> need_resend_linger
;
1160 map
<ceph_tid_t
, Op
*> need_resend
;
1161 map
<ceph_tid_t
, CommandOp
*> need_resend_command
;
1163 if (m
->get_last() <= osdmap
->get_epoch()) {
1164 ldout(cct
, 3) << "handle_osd_map ignoring epochs ["
1165 << m
->get_first() << "," << m
->get_last()
1166 << "] <= " << osdmap
->get_epoch() << dendl
;
1168 ldout(cct
, 3) << "handle_osd_map got epochs ["
1169 << m
->get_first() << "," << m
->get_last()
1170 << "] > " << osdmap
->get_epoch() << dendl
;
1172 if (osdmap
->get_epoch()) {
1173 bool skipped_map
= false;
1174 // we want incrementals
1175 for (epoch_t e
= osdmap
->get_epoch() + 1;
1179 if (osdmap
->get_epoch() == e
-1 &&
1180 m
->incremental_maps
.count(e
)) {
1181 ldout(cct
, 3) << "handle_osd_map decoding incremental epoch " << e
1183 OSDMap::Incremental
inc(m
->incremental_maps
[e
]);
1184 osdmap
->apply_incremental(inc
);
1185 logger
->inc(l_osdc_map_inc
);
1187 else if (m
->maps
.count(e
)) {
1188 ldout(cct
, 3) << "handle_osd_map decoding full epoch " << e
<< dendl
;
1189 osdmap
->decode(m
->maps
[e
]);
1190 logger
->inc(l_osdc_map_full
);
1193 if (e
>= m
->get_oldest()) {
1194 ldout(cct
, 3) << "handle_osd_map requesting missing epoch "
1195 << osdmap
->get_epoch()+1 << dendl
;
1196 _maybe_request_map();
1199 ldout(cct
, 3) << "handle_osd_map missing epoch "
1200 << osdmap
->get_epoch()+1
1201 << ", jumping to " << m
->get_oldest() << dendl
;
1202 e
= m
->get_oldest() - 1;
1206 logger
->set(l_osdc_map_epoch
, osdmap
->get_epoch());
1208 cluster_full
= cluster_full
|| _osdmap_full_flag();
1209 update_pool_full_map(pool_full_map
);
1211 // check all outstanding requests on every epoch
1212 _scan_requests(homeless_session
, skipped_map
, cluster_full
,
1213 &pool_full_map
, need_resend
,
1214 need_resend_linger
, need_resend_command
, sul
);
1215 for (map
<int,OSDSession
*>::iterator p
= osd_sessions
.begin();
1216 p
!= osd_sessions
.end(); ) {
1217 OSDSession
*s
= p
->second
;
1218 _scan_requests(s
, skipped_map
, cluster_full
,
1219 &pool_full_map
, need_resend
,
1220 need_resend_linger
, need_resend_command
, sul
);
1222 // osd down or addr change?
1223 if (!osdmap
->is_up(s
->osd
) ||
1225 s
->con
->get_peer_addr() != osdmap
->get_inst(s
->osd
).addr
)) {
1230 assert(e
== osdmap
->get_epoch());
1234 // first map. we want the full thing.
1235 if (m
->maps
.count(m
->get_last())) {
1236 for (map
<int,OSDSession
*>::iterator p
= osd_sessions
.begin();
1237 p
!= osd_sessions
.end(); ++p
) {
1238 OSDSession
*s
= p
->second
;
1239 _scan_requests(s
, false, false, NULL
, need_resend
,
1240 need_resend_linger
, need_resend_command
, sul
);
1242 ldout(cct
, 3) << "handle_osd_map decoding full epoch "
1243 << m
->get_last() << dendl
;
1244 osdmap
->decode(m
->maps
[m
->get_last()]);
1246 _scan_requests(homeless_session
, false, false, NULL
,
1247 need_resend
, need_resend_linger
,
1248 need_resend_command
, sul
);
1250 ldout(cct
, 3) << "handle_osd_map hmm, i want a full map, requesting"
1252 monc
->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME
);
1258 // make sure need_resend targets reflect latest map
1259 for (auto p
= need_resend
.begin(); p
!= need_resend
.end(); ) {
1261 if (op
->target
.epoch
< osdmap
->get_epoch()) {
1262 ldout(cct
, 10) << __func__
<< " checking op " << p
->first
<< dendl
;
1263 int r
= _calc_target(&op
->target
, nullptr);
1264 if (r
== RECALC_OP_TARGET_POOL_DNE
) {
1265 p
= need_resend
.erase(p
);
1266 _check_op_pool_dne(op
, nullptr);
1275 bool pauserd
= osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
);
1276 bool pausewr
= osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
) || _osdmap_full_flag()
1277 || _osdmap_has_pool_full();
1280 if (was_pauserd
|| was_pausewr
|| pauserd
|| pausewr
||
1281 osdmap
->get_epoch() < epoch_barrier
) {
1282 _maybe_request_map();
1286 for (map
<ceph_tid_t
, Op
*>::iterator p
= need_resend
.begin();
1287 p
!= need_resend
.end(); ++p
) {
1289 OSDSession
*s
= op
->session
;
1290 bool mapped_session
= false;
1292 int r
= _map_session(&op
->target
, &s
, sul
);
1294 mapped_session
= true;
1298 OSDSession::unique_lock
sl(s
->lock
);
1299 if (mapped_session
) {
1300 _session_op_assign(s
, op
);
1302 if (op
->should_resend
) {
1303 if (!op
->session
->is_homeless() && !op
->target
.paused
) {
1304 logger
->inc(l_osdc_op_resend
);
1308 _op_cancel_map_check(op
);
1309 _cancel_linger_op(op
);
1314 for (list
<LingerOp
*>::iterator p
= need_resend_linger
.begin();
1315 p
!= need_resend_linger
.end(); ++p
) {
1318 _calc_target(&op
->target
, nullptr);
1319 OSDSession
*s
= NULL
;
1320 int const r
= _get_session(op
->target
.osd
, &s
, sul
);
1326 if (!op
->session
->is_homeless()) {
1327 logger
->inc(l_osdc_linger_resend
);
1328 _send_linger(op
, sul
);
1331 for (map
<ceph_tid_t
,CommandOp
*>::iterator p
= need_resend_command
.begin();
1332 p
!= need_resend_command
.end(); ++p
) {
1333 CommandOp
*c
= p
->second
;
1334 if (c
->target
.osd
>= 0) {
1335 _assign_command_session(c
, sul
);
1336 if (c
->session
&& !c
->session
->is_homeless()) {
1344 // finish any Contexts that were waiting on a map update
1345 map
<epoch_t
,list
< pair
< Context
*, int > > >::iterator p
=
1346 waiting_for_map
.begin();
1347 while (p
!= waiting_for_map
.end() &&
1348 p
->first
<= osdmap
->get_epoch()) {
1349 //go through the list and call the onfinish methods
1350 for (list
<pair
<Context
*, int> >::iterator i
= p
->second
.begin();
1351 i
!= p
->second
.end(); ++i
) {
1352 i
->first
->complete(i
->second
);
1354 waiting_for_map
.erase(p
++);
1357 monc
->sub_got("osdmap", osdmap
->get_epoch());
1359 if (!waiting_for_map
.empty()) {
1360 _maybe_request_map();
1366 void Objecter::C_Op_Map_Latest::finish(int r
)
1368 if (r
== -EAGAIN
|| r
== -ECANCELED
)
1371 lgeneric_subdout(objecter
->cct
, objecter
, 10)
1372 << "op_map_latest r=" << r
<< " tid=" << tid
1373 << " latest " << latest
<< dendl
;
1375 Objecter::unique_lock
wl(objecter
->rwlock
);
1377 map
<ceph_tid_t
, Op
*>::iterator iter
=
1378 objecter
->check_latest_map_ops
.find(tid
);
1379 if (iter
== objecter
->check_latest_map_ops
.end()) {
1380 lgeneric_subdout(objecter
->cct
, objecter
, 10)
1381 << "op_map_latest op "<< tid
<< " not found" << dendl
;
1385 Op
*op
= iter
->second
;
1386 objecter
->check_latest_map_ops
.erase(iter
);
1388 lgeneric_subdout(objecter
->cct
, objecter
, 20)
1389 << "op_map_latest op "<< op
<< dendl
;
1391 if (op
->map_dne_bound
== 0)
1392 op
->map_dne_bound
= latest
;
1394 OSDSession::unique_lock
sl(op
->session
->lock
, defer_lock
);
1395 objecter
->_check_op_pool_dne(op
, &sl
);
1400 int Objecter::pool_snap_by_name(int64_t poolid
, const char *snap_name
,
1401 snapid_t
*snap
) const
1403 shared_lock
rl(rwlock
);
1405 auto& pools
= osdmap
->get_pools();
1406 auto iter
= pools
.find(poolid
);
1407 if (iter
== pools
.end()) {
1410 const pg_pool_t
& pg_pool
= iter
->second
;
1411 for (auto p
= pg_pool
.snaps
.begin();
1412 p
!= pg_pool
.snaps
.end();
1414 if (p
->second
.name
== snap_name
) {
1422 int Objecter::pool_snap_get_info(int64_t poolid
, snapid_t snap
,
1423 pool_snap_info_t
*info
) const
1425 shared_lock
rl(rwlock
);
1427 auto& pools
= osdmap
->get_pools();
1428 auto iter
= pools
.find(poolid
);
1429 if (iter
== pools
.end()) {
1432 const pg_pool_t
& pg_pool
= iter
->second
;
1433 auto p
= pg_pool
.snaps
.find(snap
);
1434 if (p
== pg_pool
.snaps
.end())
1441 int Objecter::pool_snap_list(int64_t poolid
, vector
<uint64_t> *snaps
)
1443 shared_lock
rl(rwlock
);
1445 const pg_pool_t
*pi
= osdmap
->get_pg_pool(poolid
);
1448 for (map
<snapid_t
,pool_snap_info_t
>::const_iterator p
= pi
->snaps
.begin();
1449 p
!= pi
->snaps
.end();
1451 snaps
->push_back(p
->first
);
1456 // sl may be unlocked.
1457 void Objecter::_check_op_pool_dne(Op
*op
, unique_lock
*sl
)
1459 // rwlock is locked unique
1461 if (op
->target
.pool_ever_existed
) {
1462 // the pool previously existed and now it does not, which means it
1464 op
->map_dne_bound
= osdmap
->get_epoch();
1465 ldout(cct
, 10) << "check_op_pool_dne tid " << op
->tid
1466 << " pool previously exists but now does not"
1469 ldout(cct
, 10) << "check_op_pool_dne tid " << op
->tid
1470 << " current " << osdmap
->get_epoch()
1471 << " map_dne_bound " << op
->map_dne_bound
1474 if (op
->map_dne_bound
> 0) {
1475 if (osdmap
->get_epoch() >= op
->map_dne_bound
) {
1476 // we had a new enough map
1477 ldout(cct
, 10) << "check_op_pool_dne tid " << op
->tid
1478 << " concluding pool " << op
->target
.base_pgid
.pool()
1481 op
->onfinish
->complete(-ENOENT
);
1484 OSDSession
*s
= op
->session
;
1487 assert(sl
->mutex() == &s
->lock
);
1488 bool session_locked
= sl
->owns_lock();
1489 if (!session_locked
) {
1493 if (!session_locked
) {
1497 _finish_op(op
, 0); // no session
1501 _send_op_map_check(op
);
1505 void Objecter::_send_op_map_check(Op
*op
)
1507 // rwlock is locked unique
1509 if (check_latest_map_ops
.count(op
->tid
) == 0) {
1511 check_latest_map_ops
[op
->tid
] = op
;
1512 C_Op_Map_Latest
*c
= new C_Op_Map_Latest(this, op
->tid
);
1513 monc
->get_version("osdmap", &c
->latest
, NULL
, c
);
1517 void Objecter::_op_cancel_map_check(Op
*op
)
1519 // rwlock is locked unique
1520 map
<ceph_tid_t
, Op
*>::iterator iter
=
1521 check_latest_map_ops
.find(op
->tid
);
1522 if (iter
!= check_latest_map_ops
.end()) {
1523 Op
*op
= iter
->second
;
1525 check_latest_map_ops
.erase(iter
);
1529 // linger pool check
1531 void Objecter::C_Linger_Map_Latest::finish(int r
)
1533 if (r
== -EAGAIN
|| r
== -ECANCELED
) {
1534 // ignore callback; we will retry in resend_mon_ops()
1538 unique_lock
wl(objecter
->rwlock
);
1540 map
<uint64_t, LingerOp
*>::iterator iter
=
1541 objecter
->check_latest_map_lingers
.find(linger_id
);
1542 if (iter
== objecter
->check_latest_map_lingers
.end()) {
1546 LingerOp
*op
= iter
->second
;
1547 objecter
->check_latest_map_lingers
.erase(iter
);
1549 if (op
->map_dne_bound
== 0)
1550 op
->map_dne_bound
= latest
;
1553 objecter
->_check_linger_pool_dne(op
, &unregister
);
1556 objecter
->_linger_cancel(op
);
1562 void Objecter::_check_linger_pool_dne(LingerOp
*op
, bool *need_unregister
)
1564 // rwlock is locked unique
1566 *need_unregister
= false;
1568 if (op
->register_gen
> 0) {
1569 ldout(cct
, 10) << "_check_linger_pool_dne linger_id " << op
->linger_id
1570 << " pool previously existed but now does not"
1572 op
->map_dne_bound
= osdmap
->get_epoch();
1574 ldout(cct
, 10) << "_check_linger_pool_dne linger_id " << op
->linger_id
1575 << " current " << osdmap
->get_epoch()
1576 << " map_dne_bound " << op
->map_dne_bound
1579 if (op
->map_dne_bound
> 0) {
1580 if (osdmap
->get_epoch() >= op
->map_dne_bound
) {
1581 if (op
->on_reg_commit
) {
1582 op
->on_reg_commit
->complete(-ENOENT
);
1584 *need_unregister
= true;
1587 _send_linger_map_check(op
);
1591 void Objecter::_send_linger_map_check(LingerOp
*op
)
1594 if (check_latest_map_lingers
.count(op
->linger_id
) == 0) {
1596 check_latest_map_lingers
[op
->linger_id
] = op
;
1597 C_Linger_Map_Latest
*c
= new C_Linger_Map_Latest(this, op
->linger_id
);
1598 monc
->get_version("osdmap", &c
->latest
, NULL
, c
);
1602 void Objecter::_linger_cancel_map_check(LingerOp
*op
)
1604 // rwlock is locked unique
1606 map
<uint64_t, LingerOp
*>::iterator iter
=
1607 check_latest_map_lingers
.find(op
->linger_id
);
1608 if (iter
!= check_latest_map_lingers
.end()) {
1609 LingerOp
*op
= iter
->second
;
1611 check_latest_map_lingers
.erase(iter
);
1615 // command pool check
1617 void Objecter::C_Command_Map_Latest::finish(int r
)
1619 if (r
== -EAGAIN
|| r
== -ECANCELED
) {
1620 // ignore callback; we will retry in resend_mon_ops()
1624 unique_lock
wl(objecter
->rwlock
);
1626 map
<uint64_t, CommandOp
*>::iterator iter
=
1627 objecter
->check_latest_map_commands
.find(tid
);
1628 if (iter
== objecter
->check_latest_map_commands
.end()) {
1632 CommandOp
*c
= iter
->second
;
1633 objecter
->check_latest_map_commands
.erase(iter
);
1635 if (c
->map_dne_bound
== 0)
1636 c
->map_dne_bound
= latest
;
1638 objecter
->_check_command_map_dne(c
);
1643 void Objecter::_check_command_map_dne(CommandOp
*c
)
1645 // rwlock is locked unique
1647 ldout(cct
, 10) << "_check_command_map_dne tid " << c
->tid
1648 << " current " << osdmap
->get_epoch()
1649 << " map_dne_bound " << c
->map_dne_bound
1651 if (c
->map_dne_bound
> 0) {
1652 if (osdmap
->get_epoch() >= c
->map_dne_bound
) {
1653 _finish_command(c
, c
->map_check_error
, c
->map_check_error_str
);
1656 _send_command_map_check(c
);
1660 void Objecter::_send_command_map_check(CommandOp
*c
)
1662 // rwlock is locked unique
1665 if (check_latest_map_commands
.count(c
->tid
) == 0) {
1667 check_latest_map_commands
[c
->tid
] = c
;
1668 C_Command_Map_Latest
*f
= new C_Command_Map_Latest(this, c
->tid
);
1669 monc
->get_version("osdmap", &f
->latest
, NULL
, f
);
1673 void Objecter::_command_cancel_map_check(CommandOp
*c
)
1675 // rwlock is locked uniqe
1677 map
<uint64_t, CommandOp
*>::iterator iter
=
1678 check_latest_map_commands
.find(c
->tid
);
1679 if (iter
!= check_latest_map_commands
.end()) {
1680 CommandOp
*c
= iter
->second
;
1682 check_latest_map_commands
.erase(iter
);
1688 * Look up OSDSession by OSD id.
1690 * @returns 0 on success, or -EAGAIN if the lock context requires
1691 * promotion to write.
1693 int Objecter::_get_session(int osd
, OSDSession
**session
, shunique_lock
& sul
)
1695 assert(sul
&& sul
.mutex() == &rwlock
);
1698 *session
= homeless_session
;
1699 ldout(cct
, 20) << __func__
<< " osd=" << osd
<< " returning homeless"
1704 map
<int,OSDSession
*>::iterator p
= osd_sessions
.find(osd
);
1705 if (p
!= osd_sessions
.end()) {
1706 OSDSession
*s
= p
->second
;
1709 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << osd
<< " "
1710 << s
->get_nref() << dendl
;
1713 if (!sul
.owns_lock()) {
1716 OSDSession
*s
= new OSDSession(cct
, osd
);
1717 osd_sessions
[osd
] = s
;
1718 s
->con
= messenger
->get_connection(osdmap
->get_inst(osd
));
1719 s
->con
->set_priv(s
->get());
1720 logger
->inc(l_osdc_osd_session_open
);
1721 logger
->set(l_osdc_osd_sessions
, osd_sessions
.size());
1724 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << osd
<< " "
1725 << s
->get_nref() << dendl
;
1729 void Objecter::put_session(Objecter::OSDSession
*s
)
1731 if (s
&& !s
->is_homeless()) {
1732 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << s
->osd
<< " "
1733 << s
->get_nref() << dendl
;
1738 void Objecter::get_session(Objecter::OSDSession
*s
)
1742 if (!s
->is_homeless()) {
1743 ldout(cct
, 20) << __func__
<< " s=" << s
<< " osd=" << s
->osd
<< " "
1744 << s
->get_nref() << dendl
;
1749 void Objecter::_reopen_session(OSDSession
*s
)
1751 // s->lock is locked
1753 entity_inst_t inst
= osdmap
->get_inst(s
->osd
);
1754 ldout(cct
, 10) << "reopen_session osd." << s
->osd
<< " session, addr now "
1757 s
->con
->set_priv(NULL
);
1758 s
->con
->mark_down();
1759 logger
->inc(l_osdc_osd_session_close
);
1761 s
->con
= messenger
->get_connection(inst
);
1762 s
->con
->set_priv(s
->get());
1764 logger
->inc(l_osdc_osd_session_open
);
1767 void Objecter::close_session(OSDSession
*s
)
1769 // rwlock is locked unique
1771 ldout(cct
, 10) << "close_session for osd." << s
->osd
<< dendl
;
1773 s
->con
->set_priv(NULL
);
1774 s
->con
->mark_down();
1775 logger
->inc(l_osdc_osd_session_close
);
1777 OSDSession::unique_lock
sl(s
->lock
);
1779 std::list
<LingerOp
*> homeless_lingers
;
1780 std::list
<CommandOp
*> homeless_commands
;
1781 std::list
<Op
*> homeless_ops
;
1783 while (!s
->linger_ops
.empty()) {
1784 std::map
<uint64_t, LingerOp
*>::iterator i
= s
->linger_ops
.begin();
1785 ldout(cct
, 10) << " linger_op " << i
->first
<< dendl
;
1786 homeless_lingers
.push_back(i
->second
);
1787 _session_linger_op_remove(s
, i
->second
);
1790 while (!s
->ops
.empty()) {
1791 std::map
<ceph_tid_t
, Op
*>::iterator i
= s
->ops
.begin();
1792 ldout(cct
, 10) << " op " << i
->first
<< dendl
;
1793 homeless_ops
.push_back(i
->second
);
1794 _session_op_remove(s
, i
->second
);
1797 while (!s
->command_ops
.empty()) {
1798 std::map
<ceph_tid_t
, CommandOp
*>::iterator i
= s
->command_ops
.begin();
1799 ldout(cct
, 10) << " command_op " << i
->first
<< dendl
;
1800 homeless_commands
.push_back(i
->second
);
1801 _session_command_op_remove(s
, i
->second
);
1804 osd_sessions
.erase(s
->osd
);
1808 // Assign any leftover ops to the homeless session
1810 OSDSession::unique_lock
hsl(homeless_session
->lock
);
1811 for (std::list
<LingerOp
*>::iterator i
= homeless_lingers
.begin();
1812 i
!= homeless_lingers
.end(); ++i
) {
1813 _session_linger_op_assign(homeless_session
, *i
);
1815 for (std::list
<Op
*>::iterator i
= homeless_ops
.begin();
1816 i
!= homeless_ops
.end(); ++i
) {
1817 _session_op_assign(homeless_session
, *i
);
1819 for (std::list
<CommandOp
*>::iterator i
= homeless_commands
.begin();
1820 i
!= homeless_commands
.end(); ++i
) {
1821 _session_command_op_assign(homeless_session
, *i
);
1825 logger
->set(l_osdc_osd_sessions
, osd_sessions
.size());
1828 void Objecter::wait_for_osd_map()
1830 unique_lock
l(rwlock
);
1831 if (osdmap
->get_epoch()) {
1836 // Leave this since it goes with C_SafeCond
1841 C_SafeCond
*context
= new C_SafeCond(&lock
, &cond
, &done
, NULL
);
1842 waiting_for_map
[0].push_back(pair
<Context
*, int>(context
, 0));
1849 struct C_Objecter_GetVersion
: public Context
{
1851 uint64_t oldest
, newest
;
1853 C_Objecter_GetVersion(Objecter
*o
, Context
*c
)
1854 : objecter(o
), oldest(0), newest(0), fin(c
) {}
1855 void finish(int r
) override
{
1857 objecter
->get_latest_version(oldest
, newest
, fin
);
1858 } else if (r
== -EAGAIN
) { // try again as instructed
1859 objecter
->wait_for_latest_osdmap(fin
);
1861 // it doesn't return any other error codes!
1867 void Objecter::wait_for_latest_osdmap(Context
*fin
)
1869 ldout(cct
, 10) << __func__
<< dendl
;
1870 C_Objecter_GetVersion
*c
= new C_Objecter_GetVersion(this, fin
);
1871 monc
->get_version("osdmap", &c
->newest
, &c
->oldest
, c
);
1874 void Objecter::get_latest_version(epoch_t oldest
, epoch_t newest
, Context
*fin
)
1876 unique_lock
wl(rwlock
);
1877 _get_latest_version(oldest
, newest
, fin
);
1880 void Objecter::_get_latest_version(epoch_t oldest
, epoch_t newest
,
1883 // rwlock is locked unique
1884 if (osdmap
->get_epoch() >= newest
) {
1885 ldout(cct
, 10) << __func__
<< " latest " << newest
<< ", have it" << dendl
;
1891 ldout(cct
, 10) << __func__
<< " latest " << newest
<< ", waiting" << dendl
;
1892 _wait_for_new_map(fin
, newest
, 0);
1895 void Objecter::maybe_request_map()
1897 shared_lock
rl(rwlock
);
1898 _maybe_request_map();
1901 void Objecter::_maybe_request_map()
1905 if (_osdmap_full_flag()
1906 || osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
)
1907 || osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
)) {
1908 ldout(cct
, 10) << "_maybe_request_map subscribing (continuous) to next "
1909 "osd map (FULL flag is set)" << dendl
;
1912 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl
;
1913 flag
= CEPH_SUBSCRIBE_ONETIME
;
1915 epoch_t epoch
= osdmap
->get_epoch() ? osdmap
->get_epoch()+1 : 0;
1916 if (monc
->sub_want("osdmap", epoch
, flag
)) {
1921 void Objecter::_wait_for_new_map(Context
*c
, epoch_t epoch
, int err
)
1923 // rwlock is locked unique
1924 waiting_for_map
[epoch
].push_back(pair
<Context
*, int>(c
, err
));
1925 _maybe_request_map();
1930 * Use this together with wait_for_map: this is a pre-check to avoid
1931 * allocating a Context for wait_for_map if we can see that we
1932 * definitely already have the epoch.
1934 * This does *not* replace the need to handle the return value of
1935 * wait_for_map: just because we don't have it in this pre-check
1936 * doesn't mean we won't have it when calling back into wait_for_map,
1937 * since the objecter lock is dropped in between.
1939 bool Objecter::have_map(const epoch_t epoch
)
1941 shared_lock
rl(rwlock
);
1942 if (osdmap
->get_epoch() >= epoch
) {
1949 bool Objecter::wait_for_map(epoch_t epoch
, Context
*c
, int err
)
1951 unique_lock
wl(rwlock
);
1952 if (osdmap
->get_epoch() >= epoch
) {
1955 _wait_for_new_map(c
, epoch
, err
);
1959 void Objecter::kick_requests(OSDSession
*session
)
1961 ldout(cct
, 10) << "kick_requests for osd." << session
->osd
<< dendl
;
1963 map
<uint64_t, LingerOp
*> lresend
;
1964 unique_lock
wl(rwlock
);
1966 OSDSession::unique_lock
sl(session
->lock
);
1967 _kick_requests(session
, lresend
);
1970 _linger_ops_resend(lresend
, wl
);
1973 void Objecter::_kick_requests(OSDSession
*session
,
1974 map
<uint64_t, LingerOp
*>& lresend
)
1976 // rwlock is locked unique
1979 session
->backoffs
.clear();
1980 session
->backoffs_by_id
.clear();
1983 map
<ceph_tid_t
,Op
*> resend
; // resend in tid order
1984 for (map
<ceph_tid_t
, Op
*>::iterator p
= session
->ops
.begin();
1985 p
!= session
->ops
.end();) {
1988 logger
->inc(l_osdc_op_resend
);
1989 if (op
->should_resend
) {
1990 if (!op
->target
.paused
)
1991 resend
[op
->tid
] = op
;
1993 _op_cancel_map_check(op
);
1994 _cancel_linger_op(op
);
1998 while (!resend
.empty()) {
1999 _send_op(resend
.begin()->second
);
2000 resend
.erase(resend
.begin());
2004 for (map
<ceph_tid_t
, LingerOp
*>::iterator j
= session
->linger_ops
.begin();
2005 j
!= session
->linger_ops
.end(); ++j
) {
2006 LingerOp
*op
= j
->second
;
2008 logger
->inc(l_osdc_linger_resend
);
2009 assert(lresend
.count(j
->first
) == 0);
2010 lresend
[j
->first
] = op
;
2014 map
<uint64_t,CommandOp
*> cresend
; // resend in order
2015 for (map
<ceph_tid_t
, CommandOp
*>::iterator k
= session
->command_ops
.begin();
2016 k
!= session
->command_ops
.end(); ++k
) {
2017 logger
->inc(l_osdc_command_resend
);
2018 cresend
[k
->first
] = k
->second
;
2020 while (!cresend
.empty()) {
2021 _send_command(cresend
.begin()->second
);
2022 cresend
.erase(cresend
.begin());
2026 void Objecter::_linger_ops_resend(map
<uint64_t, LingerOp
*>& lresend
,
2029 assert(ul
.owns_lock());
2030 shunique_lock
sul(std::move(ul
));
2031 while (!lresend
.empty()) {
2032 LingerOp
*op
= lresend
.begin()->second
;
2033 if (!op
->canceled
) {
2034 _send_linger(op
, sul
);
2037 lresend
.erase(lresend
.begin());
2039 ul
= unique_lock(sul
.release_to_unique());
2042 void Objecter::start_tick()
2044 assert(tick_event
== 0);
2046 timer
.add_event(ceph::make_timespan(cct
->_conf
->objecter_tick_interval
),
2047 &Objecter::tick
, this);
2050 void Objecter::tick()
2052 shared_lock
rl(rwlock
);
2054 ldout(cct
, 10) << "tick" << dendl
;
2056 // we are only called by C_Tick
2059 if (!initialized
.read()) {
2060 // we raced with shutdown
2061 ldout(cct
, 10) << __func__
<< " raced with shutdown" << dendl
;
2065 set
<OSDSession
*> toping
;
2068 // look for laggy requests
2069 auto cutoff
= ceph::mono_clock::now();
2070 cutoff
-= ceph::make_timespan(cct
->_conf
->objecter_timeout
); // timeout
2072 unsigned laggy_ops
= 0;
2074 for (map
<int,OSDSession
*>::iterator siter
= osd_sessions
.begin();
2075 siter
!= osd_sessions
.end(); ++siter
) {
2076 OSDSession
*s
= siter
->second
;
2077 OSDSession::lock_guard
l(s
->lock
);
2079 for (map
<ceph_tid_t
,Op
*>::iterator p
= s
->ops
.begin();
2083 assert(op
->session
);
2084 if (op
->stamp
< cutoff
) {
2085 ldout(cct
, 2) << " tid " << p
->first
<< " on osd." << op
->session
->osd
2086 << " is laggy" << dendl
;
2091 for (map
<uint64_t,LingerOp
*>::iterator p
= s
->linger_ops
.begin();
2092 p
!= s
->linger_ops
.end();
2094 LingerOp
*op
= p
->second
;
2095 LingerOp::unique_lock
wl(op
->watch_lock
);
2096 assert(op
->session
);
2097 ldout(cct
, 10) << " pinging osd that serves lingering tid " << p
->first
2098 << " (osd." << op
->session
->osd
<< ")" << dendl
;
2100 if (op
->is_watch
&& op
->registered
&& !op
->last_error
)
2101 _send_linger_ping(op
);
2103 for (map
<uint64_t,CommandOp
*>::iterator p
= s
->command_ops
.begin();
2104 p
!= s
->command_ops
.end();
2106 CommandOp
*op
= p
->second
;
2107 assert(op
->session
);
2108 ldout(cct
, 10) << " pinging osd that serves command tid " << p
->first
2109 << " (osd." << op
->session
->osd
<< ")" << dendl
;
2115 if (num_homeless_ops
.read() || !toping
.empty()) {
2116 _maybe_request_map();
2119 logger
->set(l_osdc_op_laggy
, laggy_ops
);
2120 logger
->set(l_osdc_osd_laggy
, toping
.size());
2122 if (!toping
.empty()) {
2123 // send a ping to these osds, to ensure we detect any session resets
2124 // (osd reply message policy is lossy)
2125 for (set
<OSDSession
*>::const_iterator i
= toping
.begin();
2128 (*i
)->con
->send_message(new MPing
);
2132 // Make sure we don't resechedule if we wake up after shutdown
2133 if (initialized
.read()) {
2134 tick_event
= timer
.reschedule_me(ceph::make_timespan(
2135 cct
->_conf
->objecter_tick_interval
));
2139 void Objecter::resend_mon_ops()
2141 unique_lock
wl(rwlock
);
2143 ldout(cct
, 10) << "resend_mon_ops" << dendl
;
2145 for (map
<ceph_tid_t
,PoolStatOp
*>::iterator p
= poolstat_ops
.begin();
2146 p
!= poolstat_ops
.end();
2148 _poolstat_submit(p
->second
);
2149 logger
->inc(l_osdc_poolstat_resend
);
2152 for (map
<ceph_tid_t
,StatfsOp
*>::iterator p
= statfs_ops
.begin();
2153 p
!= statfs_ops
.end();
2155 _fs_stats_submit(p
->second
);
2156 logger
->inc(l_osdc_statfs_resend
);
2159 for (map
<ceph_tid_t
,PoolOp
*>::iterator p
= pool_ops
.begin();
2160 p
!= pool_ops
.end();
2162 _pool_op_submit(p
->second
);
2163 logger
->inc(l_osdc_poolop_resend
);
2166 for (map
<ceph_tid_t
, Op
*>::iterator p
= check_latest_map_ops
.begin();
2167 p
!= check_latest_map_ops
.end();
2169 C_Op_Map_Latest
*c
= new C_Op_Map_Latest(this, p
->second
->tid
);
2170 monc
->get_version("osdmap", &c
->latest
, NULL
, c
);
2173 for (map
<uint64_t, LingerOp
*>::iterator p
= check_latest_map_lingers
.begin();
2174 p
!= check_latest_map_lingers
.end();
2176 C_Linger_Map_Latest
*c
2177 = new C_Linger_Map_Latest(this, p
->second
->linger_id
);
2178 monc
->get_version("osdmap", &c
->latest
, NULL
, c
);
2181 for (map
<uint64_t, CommandOp
*>::iterator p
2182 = check_latest_map_commands
.begin();
2183 p
!= check_latest_map_commands
.end();
2185 C_Command_Map_Latest
*c
= new C_Command_Map_Latest(this, p
->second
->tid
);
2186 monc
->get_version("osdmap", &c
->latest
, NULL
, c
);
2190 // read | write ---------------------------
2192 void Objecter::op_submit(Op
*op
, ceph_tid_t
*ptid
, int *ctx_budget
)
2194 shunique_lock
rl(rwlock
, ceph::acquire_shared
);
2198 _op_submit_with_budget(op
, rl
, ptid
, ctx_budget
);
2201 void Objecter::_op_submit_with_budget(Op
*op
, shunique_lock
& sul
,
2205 assert(initialized
.read());
2207 assert(op
->ops
.size() == op
->out_bl
.size());
2208 assert(op
->ops
.size() == op
->out_rval
.size());
2209 assert(op
->ops
.size() == op
->out_handler
.size());
2211 // throttle. before we look at any state, because
2212 // _take_op_budget() may drop our lock while it blocks.
2213 if (!op
->ctx_budgeted
|| (ctx_budget
&& (*ctx_budget
== -1))) {
2214 int op_budget
= _take_op_budget(op
, sul
);
2215 // take and pass out the budget for the first OP
2216 // in the context session
2217 if (ctx_budget
&& (*ctx_budget
== -1)) {
2218 *ctx_budget
= op_budget
;
2222 if (osd_timeout
> timespan(0)) {
2224 op
->tid
= last_tid
.inc();
2226 op
->ontimeout
= timer
.add_event(osd_timeout
,
2228 op_cancel(tid
, -ETIMEDOUT
); });
2231 _op_submit(op
, sul
, ptid
);
2234 void Objecter::_send_op_account(Op
*op
)
2238 // add to gather set(s)
2240 num_in_flight
.inc();
2242 ldout(cct
, 20) << " note: not requesting reply" << dendl
;
2245 logger
->inc(l_osdc_op_active
);
2246 logger
->inc(l_osdc_op
);
2248 if ((op
->target
.flags
& (CEPH_OSD_FLAG_READ
| CEPH_OSD_FLAG_WRITE
)) ==
2249 (CEPH_OSD_FLAG_READ
|CEPH_OSD_FLAG_WRITE
))
2250 logger
->inc(l_osdc_op_rmw
);
2251 else if (op
->target
.flags
& CEPH_OSD_FLAG_WRITE
)
2252 logger
->inc(l_osdc_op_w
);
2253 else if (op
->target
.flags
& CEPH_OSD_FLAG_READ
)
2254 logger
->inc(l_osdc_op_r
);
2256 if (op
->target
.flags
& CEPH_OSD_FLAG_PGOP
)
2257 logger
->inc(l_osdc_op_pg
);
2259 for (vector
<OSDOp
>::iterator p
= op
->ops
.begin(); p
!= op
->ops
.end(); ++p
) {
2260 int code
= l_osdc_osdop_other
;
2262 case CEPH_OSD_OP_STAT
: code
= l_osdc_osdop_stat
; break;
2263 case CEPH_OSD_OP_CREATE
: code
= l_osdc_osdop_create
; break;
2264 case CEPH_OSD_OP_READ
: code
= l_osdc_osdop_read
; break;
2265 case CEPH_OSD_OP_WRITE
: code
= l_osdc_osdop_write
; break;
2266 case CEPH_OSD_OP_WRITEFULL
: code
= l_osdc_osdop_writefull
; break;
2267 case CEPH_OSD_OP_WRITESAME
: code
= l_osdc_osdop_writesame
; break;
2268 case CEPH_OSD_OP_APPEND
: code
= l_osdc_osdop_append
; break;
2269 case CEPH_OSD_OP_ZERO
: code
= l_osdc_osdop_zero
; break;
2270 case CEPH_OSD_OP_TRUNCATE
: code
= l_osdc_osdop_truncate
; break;
2271 case CEPH_OSD_OP_DELETE
: code
= l_osdc_osdop_delete
; break;
2272 case CEPH_OSD_OP_MAPEXT
: code
= l_osdc_osdop_mapext
; break;
2273 case CEPH_OSD_OP_SPARSE_READ
: code
= l_osdc_osdop_sparse_read
; break;
2274 case CEPH_OSD_OP_GETXATTR
: code
= l_osdc_osdop_getxattr
; break;
2275 case CEPH_OSD_OP_SETXATTR
: code
= l_osdc_osdop_setxattr
; break;
2276 case CEPH_OSD_OP_CMPXATTR
: code
= l_osdc_osdop_cmpxattr
; break;
2277 case CEPH_OSD_OP_RMXATTR
: code
= l_osdc_osdop_rmxattr
; break;
2278 case CEPH_OSD_OP_RESETXATTRS
: code
= l_osdc_osdop_resetxattrs
; break;
2279 case CEPH_OSD_OP_TMAPUP
: code
= l_osdc_osdop_tmap_up
; break;
2280 case CEPH_OSD_OP_TMAPPUT
: code
= l_osdc_osdop_tmap_put
; break;
2281 case CEPH_OSD_OP_TMAPGET
: code
= l_osdc_osdop_tmap_get
; break;
2283 // OMAP read operations
2284 case CEPH_OSD_OP_OMAPGETVALS
:
2285 case CEPH_OSD_OP_OMAPGETKEYS
:
2286 case CEPH_OSD_OP_OMAPGETHEADER
:
2287 case CEPH_OSD_OP_OMAPGETVALSBYKEYS
:
2288 case CEPH_OSD_OP_OMAP_CMP
: code
= l_osdc_osdop_omap_rd
; break;
2290 // OMAP write operations
2291 case CEPH_OSD_OP_OMAPSETVALS
:
2292 case CEPH_OSD_OP_OMAPSETHEADER
: code
= l_osdc_osdop_omap_wr
; break;
2294 // OMAP del operations
2295 case CEPH_OSD_OP_OMAPCLEAR
:
2296 case CEPH_OSD_OP_OMAPRMKEYS
: code
= l_osdc_osdop_omap_del
; break;
2298 case CEPH_OSD_OP_CALL
: code
= l_osdc_osdop_call
; break;
2299 case CEPH_OSD_OP_WATCH
: code
= l_osdc_osdop_watch
; break;
2300 case CEPH_OSD_OP_NOTIFY
: code
= l_osdc_osdop_notify
; break;
2307 void Objecter::_op_submit(Op
*op
, shunique_lock
& sul
, ceph_tid_t
*ptid
)
2311 ldout(cct
, 10) << __func__
<< " op " << op
<< dendl
;
2314 assert(op
->session
== NULL
);
2315 OSDSession
*s
= NULL
;
2317 bool check_for_latest_map
= _calc_target(&op
->target
, nullptr)
2318 == RECALC_OP_TARGET_POOL_DNE
;
2320 // Try to get a session, including a retry if we need to take write lock
2321 int r
= _get_session(op
->target
.osd
, &s
, sul
);
2323 (check_for_latest_map
&& sul
.owns_lock_shared())) {
2324 epoch_t orig_epoch
= osdmap
->get_epoch();
2326 if (cct
->_conf
->objecter_debug_inject_relock_delay
) {
2330 if (orig_epoch
!= osdmap
->get_epoch()) {
2331 // map changed; recalculate mapping
2332 ldout(cct
, 10) << __func__
<< " relock raced with osdmap, recalc target"
2334 check_for_latest_map
= _calc_target(&op
->target
, nullptr)
2335 == RECALC_OP_TARGET_POOL_DNE
;
2345 r
= _get_session(op
->target
.osd
, &s
, sul
);
2348 assert(s
); // may be homeless
2350 _send_op_account(op
);
2354 assert(op
->target
.flags
& (CEPH_OSD_FLAG_READ
|CEPH_OSD_FLAG_WRITE
));
2356 if (osdmap_full_try
) {
2357 op
->target
.flags
|= CEPH_OSD_FLAG_FULL_TRY
;
2360 bool need_send
= false;
2362 if (osdmap
->get_epoch() < epoch_barrier
) {
2363 ldout(cct
, 10) << " barrier, paused " << op
<< " tid " << op
->tid
2365 op
->target
.paused
= true;
2366 _maybe_request_map();
2367 } else if ((op
->target
.flags
& CEPH_OSD_FLAG_WRITE
) &&
2368 osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
)) {
2369 ldout(cct
, 10) << " paused modify " << op
<< " tid " << op
->tid
2371 op
->target
.paused
= true;
2372 _maybe_request_map();
2373 } else if ((op
->target
.flags
& CEPH_OSD_FLAG_READ
) &&
2374 osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
)) {
2375 ldout(cct
, 10) << " paused read " << op
<< " tid " << op
->tid
2377 op
->target
.paused
= true;
2378 _maybe_request_map();
2379 } else if (op
->respects_full() &&
2380 (_osdmap_full_flag() ||
2381 _osdmap_pool_full(op
->target
.base_oloc
.pool
))) {
2382 ldout(cct
, 0) << " FULL, paused modify " << op
<< " tid "
2383 << op
->tid
<< dendl
;
2384 op
->target
.paused
= true;
2385 _maybe_request_map();
2386 } else if (!s
->is_homeless()) {
2389 _maybe_request_map();
2394 m
= _prepare_osd_op(op
);
2397 OSDSession::unique_lock
sl(s
->lock
);
2399 op
->tid
= last_tid
.inc();
2401 ldout(cct
, 10) << "_op_submit oid " << op
->target
.base_oid
2402 << " '" << op
->target
.base_oloc
<< "' '"
2403 << op
->target
.target_oloc
<< "' " << op
->ops
<< " tid "
2404 << op
->tid
<< " osd." << (!s
->is_homeless() ? s
->osd
: -1)
2407 _session_op_assign(s
, op
);
2413 // Last chance to touch Op here, after giving up session lock it can
2414 // be freed at any time by response handler.
2415 ceph_tid_t tid
= op
->tid
;
2416 if (check_for_latest_map
) {
2417 _send_op_map_check(op
);
2426 ldout(cct
, 5) << num_in_flight
.read() << " in flight" << dendl
;
2429 int Objecter::op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
)
2431 assert(initialized
.read());
2433 OSDSession::unique_lock
sl(s
->lock
);
2435 map
<ceph_tid_t
, Op
*>::iterator p
= s
->ops
.find(tid
);
2436 if (p
== s
->ops
.end()) {
2437 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne in session "
2443 ldout(cct
, 20) << " revoking rx buffer for " << tid
2444 << " on " << s
->con
<< dendl
;
2445 s
->con
->revoke_rx_buffer(tid
);
2448 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " in session " << s
->osd
2452 num_in_flight
.dec();
2453 op
->onfinish
->complete(r
);
2454 op
->onfinish
= NULL
;
2456 _op_cancel_map_check(op
);
2463 int Objecter::op_cancel(ceph_tid_t tid
, int r
)
2467 unique_lock
wl(rwlock
);
2468 ret
= _op_cancel(tid
, r
);
2473 int Objecter::_op_cancel(ceph_tid_t tid
, int r
)
2477 ldout(cct
, 5) << __func__
<< ": cancelling tid " << tid
<< " r=" << r
2482 for (map
<int, OSDSession
*>::iterator siter
= osd_sessions
.begin();
2483 siter
!= osd_sessions
.end(); ++siter
) {
2484 OSDSession
*s
= siter
->second
;
2485 OSDSession::shared_lock
sl(s
->lock
);
2486 if (s
->ops
.find(tid
) != s
->ops
.end()) {
2488 ret
= op_cancel(s
, tid
, r
);
2489 if (ret
== -ENOENT
) {
2490 /* oh no! raced, maybe tid moved to another session, restarting */
2497 ldout(cct
, 5) << __func__
<< ": tid " << tid
2498 << " not found in live sessions" << dendl
;
2500 // Handle case where the op is in homeless session
2501 OSDSession::shared_lock
sl(homeless_session
->lock
);
2502 if (homeless_session
->ops
.find(tid
) != homeless_session
->ops
.end()) {
2504 ret
= op_cancel(homeless_session
, tid
, r
);
2505 if (ret
== -ENOENT
) {
2506 /* oh no! raced, maybe tid moved to another session, restarting */
2515 ldout(cct
, 5) << __func__
<< ": tid " << tid
2516 << " not found in homeless session" << dendl
;
2522 epoch_t
Objecter::op_cancel_writes(int r
, int64_t pool
)
2524 unique_lock
wl(rwlock
);
2526 std::vector
<ceph_tid_t
> to_cancel
;
2529 for (map
<int, OSDSession
*>::iterator siter
= osd_sessions
.begin();
2530 siter
!= osd_sessions
.end(); ++siter
) {
2531 OSDSession
*s
= siter
->second
;
2532 OSDSession::shared_lock
sl(s
->lock
);
2533 for (map
<ceph_tid_t
, Op
*>::iterator op_i
= s
->ops
.begin();
2534 op_i
!= s
->ops
.end(); ++op_i
) {
2535 if (op_i
->second
->target
.flags
& CEPH_OSD_FLAG_WRITE
2536 && (pool
== -1 || op_i
->second
->target
.target_oloc
.pool
== pool
)) {
2537 to_cancel
.push_back(op_i
->first
);
2542 for (std::vector
<ceph_tid_t
>::iterator titer
= to_cancel
.begin();
2543 titer
!= to_cancel
.end();
2545 int cancel_result
= op_cancel(s
, *titer
, r
);
2546 // We hold rwlock across search and cancellation, so cancels
2547 // should always succeed
2548 assert(cancel_result
== 0);
2550 if (!found
&& to_cancel
.size())
2555 const epoch_t epoch
= osdmap
->get_epoch();
2566 bool Objecter::is_pg_changed(
2568 const vector
<int>& oldacting
,
2570 const vector
<int>& newacting
,
2573 if (OSDMap::primary_changed(
2579 if (any_change
&& oldacting
!= newacting
)
2581 return false; // same primary (tho replicas may have changed)
2584 bool Objecter::target_should_be_paused(op_target_t
*t
)
2586 const pg_pool_t
*pi
= osdmap
->get_pg_pool(t
->base_oloc
.pool
);
2587 bool pauserd
= osdmap
->test_flag(CEPH_OSDMAP_PAUSERD
);
2588 bool pausewr
= osdmap
->test_flag(CEPH_OSDMAP_PAUSEWR
) ||
2589 _osdmap_full_flag() || _osdmap_pool_full(*pi
);
2591 return (t
->flags
& CEPH_OSD_FLAG_READ
&& pauserd
) ||
2592 (t
->flags
& CEPH_OSD_FLAG_WRITE
&& pausewr
) ||
2593 (osdmap
->get_epoch() < epoch_barrier
);
2597 * Locking public accessor for _osdmap_full_flag
2599 bool Objecter::osdmap_full_flag() const
2601 shared_lock
rl(rwlock
);
2603 return _osdmap_full_flag();
2606 bool Objecter::osdmap_pool_full(const int64_t pool_id
) const
2608 shared_lock
rl(rwlock
);
2610 if (_osdmap_full_flag()) {
2614 return _osdmap_pool_full(pool_id
);
2617 bool Objecter::_osdmap_pool_full(const int64_t pool_id
) const
2619 const pg_pool_t
*pool
= osdmap
->get_pg_pool(pool_id
);
2621 ldout(cct
, 4) << __func__
<< ": DNE pool " << pool_id
<< dendl
;
2625 return _osdmap_pool_full(*pool
);
2628 bool Objecter::_osdmap_has_pool_full() const
2630 for (map
<int64_t, pg_pool_t
>::const_iterator it
2631 = osdmap
->get_pools().begin();
2632 it
!= osdmap
->get_pools().end(); ++it
) {
2633 if (_osdmap_pool_full(it
->second
))
2639 bool Objecter::_osdmap_pool_full(const pg_pool_t
&p
) const
2641 return p
.has_flag(pg_pool_t::FLAG_FULL
) && honor_osdmap_full
;
2645 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2647 bool Objecter::_osdmap_full_flag() const
2649 // Ignore the FULL flag if the caller has honor_osdmap_full
2650 return osdmap
->test_flag(CEPH_OSDMAP_FULL
) && honor_osdmap_full
;
2653 void Objecter::update_pool_full_map(map
<int64_t, bool>& pool_full_map
)
2655 for (map
<int64_t, pg_pool_t
>::const_iterator it
2656 = osdmap
->get_pools().begin();
2657 it
!= osdmap
->get_pools().end(); ++it
) {
2658 if (pool_full_map
.find(it
->first
) == pool_full_map
.end()) {
2659 pool_full_map
[it
->first
] = _osdmap_pool_full(it
->second
);
2661 pool_full_map
[it
->first
] = _osdmap_pool_full(it
->second
) ||
2662 pool_full_map
[it
->first
];
2667 int64_t Objecter::get_object_hash_position(int64_t pool
, const string
& key
,
2670 shared_lock
rl(rwlock
);
2671 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
2674 return p
->hash_key(key
, ns
);
2677 int64_t Objecter::get_object_pg_hash_position(int64_t pool
, const string
& key
,
2680 shared_lock
rl(rwlock
);
2681 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
2684 return p
->raw_hash_to_pg(p
->hash_key(key
, ns
));
2687 int Objecter::_calc_target(op_target_t
*t
, Connection
*con
, bool any_change
)
2690 bool is_read
= t
->flags
& CEPH_OSD_FLAG_READ
;
2691 bool is_write
= t
->flags
& CEPH_OSD_FLAG_WRITE
;
2692 t
->epoch
= osdmap
->get_epoch();
2693 ldout(cct
,20) << __func__
<< " epoch " << t
->epoch
2694 << " base " << t
->base_oid
<< " " << t
->base_oloc
2695 << " precalc_pgid " << (int)t
->precalc_pgid
2696 << " pgid " << t
->base_pgid
2697 << (is_read
? " is_read" : "")
2698 << (is_write
? " is_write" : "")
2701 const pg_pool_t
*pi
= osdmap
->get_pg_pool(t
->base_oloc
.pool
);
2704 return RECALC_OP_TARGET_POOL_DNE
;
2706 ldout(cct
,30) << __func__
<< " base pi " << pi
2707 << " pg_num " << pi
->get_pg_num() << dendl
;
2709 bool force_resend
= false;
2710 if (osdmap
->get_epoch() == pi
->last_force_op_resend
) {
2711 if (t
->last_force_resend
< pi
->last_force_op_resend
) {
2712 t
->last_force_resend
= pi
->last_force_op_resend
;
2713 force_resend
= true;
2714 } else if (t
->last_force_resend
== 0) {
2715 force_resend
= true;
2720 t
->target_oid
= t
->base_oid
;
2721 t
->target_oloc
= t
->base_oloc
;
2722 if ((t
->flags
& CEPH_OSD_FLAG_IGNORE_OVERLAY
) == 0) {
2723 if (is_read
&& pi
->has_read_tier())
2724 t
->target_oloc
.pool
= pi
->read_tier
;
2725 if (is_write
&& pi
->has_write_tier())
2726 t
->target_oloc
.pool
= pi
->write_tier
;
2727 pi
= osdmap
->get_pg_pool(t
->target_oloc
.pool
);
2730 return RECALC_OP_TARGET_POOL_DNE
;
2735 if (t
->precalc_pgid
) {
2736 assert(t
->flags
& CEPH_OSD_FLAG_IGNORE_OVERLAY
);
2737 assert(t
->base_oid
.name
.empty()); // make sure this is a pg op
2738 assert(t
->base_oloc
.pool
== (int64_t)t
->base_pgid
.pool());
2739 pgid
= t
->base_pgid
;
2741 int ret
= osdmap
->object_locator_to_pg(t
->target_oid
, t
->target_oloc
,
2743 if (ret
== -ENOENT
) {
2745 return RECALC_OP_TARGET_POOL_DNE
;
2748 ldout(cct
,20) << __func__
<< " target " << t
->target_oid
<< " "
2749 << t
->target_oloc
<< " -> pgid " << pgid
<< dendl
;
2750 ldout(cct
,30) << __func__
<< " target pi " << pi
2751 << " pg_num " << pi
->get_pg_num() << dendl
;
2752 t
->pool_ever_existed
= true;
2754 int size
= pi
->size
;
2755 int min_size
= pi
->min_size
;
2756 unsigned pg_num
= pi
->get_pg_num();
2757 int up_primary
, acting_primary
;
2758 vector
<int> up
, acting
;
2759 osdmap
->pg_to_up_acting_osds(pgid
, &up
, &up_primary
,
2760 &acting
, &acting_primary
);
2761 bool sort_bitwise
= osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
);
2762 unsigned prev_seed
= ceph_stable_mod(pgid
.ps(), t
->pg_num
, t
->pg_num_mask
);
2763 pg_t
prev_pgid(prev_seed
, pgid
.pool());
2764 if (any_change
&& PastIntervals::is_new_interval(
2782 force_resend
= true;
2785 bool unpaused
= false;
2786 if (t
->paused
&& !target_should_be_paused(t
)) {
2791 bool legacy_change
=
2794 t
->acting_primary
, t
->acting
, acting_primary
, acting
,
2795 t
->used_replica
|| any_change
);
2798 split
= prev_pgid
.is_split(t
->pg_num
, pg_num
, nullptr);
2801 if (legacy_change
|| split
|| force_resend
) {
2804 t
->acting_primary
= acting_primary
;
2805 t
->up_primary
= up_primary
;
2808 t
->min_size
= min_size
;
2810 t
->pg_num_mask
= pi
->get_pg_num_mask();
2811 osdmap
->get_primary_shard(
2812 pg_t(ceph_stable_mod(pgid
.ps(), t
->pg_num
, t
->pg_num_mask
), pgid
.pool()),
2814 t
->sort_bitwise
= sort_bitwise
;
2815 ldout(cct
, 10) << __func__
<< " "
2816 << " raw pgid " << pgid
<< " -> actual " << t
->actual_pgid
2817 << " acting " << acting
2818 << " primary " << acting_primary
<< dendl
;
2819 t
->used_replica
= false;
2820 if (acting_primary
== -1) {
2824 bool read
= is_read
&& !is_write
;
2825 if (read
&& (t
->flags
& CEPH_OSD_FLAG_BALANCE_READS
)) {
2826 int p
= rand() % acting
.size();
2828 t
->used_replica
= true;
2830 ldout(cct
, 10) << " chose random osd." << osd
<< " of " << acting
2832 } else if (read
&& (t
->flags
& CEPH_OSD_FLAG_LOCALIZE_READS
) &&
2833 acting
.size() > 1) {
2834 // look for a local replica. prefer the primary if the
2835 // distance is the same.
2837 int best_locality
= 0;
2838 for (unsigned i
= 0; i
< acting
.size(); ++i
) {
2839 int locality
= osdmap
->crush
->get_common_ancestor_distance(
2840 cct
, acting
[i
], crush_location
);
2841 ldout(cct
, 20) << __func__
<< " localize: rank " << i
2842 << " osd." << acting
[i
]
2843 << " locality " << locality
<< dendl
;
2845 (locality
>= 0 && best_locality
>= 0 &&
2846 locality
< best_locality
) ||
2847 (best_locality
< 0 && locality
>= 0)) {
2849 best_locality
= locality
;
2851 t
->used_replica
= true;
2857 osd
= acting_primary
;
2862 if (legacy_change
|| unpaused
|| force_resend
) {
2863 return RECALC_OP_TARGET_NEED_RESEND
;
2865 if (split
&& con
&& con
->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT
)) {
2866 return RECALC_OP_TARGET_NEED_RESEND
;
2868 return RECALC_OP_TARGET_NO_ACTION
;
2871 int Objecter::_map_session(op_target_t
*target
, OSDSession
**s
,
2874 _calc_target(target
, nullptr);
2875 return _get_session(target
->osd
, s
, sul
);
2878 void Objecter::_session_op_assign(OSDSession
*to
, Op
*op
)
2880 // to->lock is locked
2881 assert(op
->session
== NULL
);
2886 to
->ops
[op
->tid
] = op
;
2888 if (to
->is_homeless()) {
2889 num_homeless_ops
.inc();
2892 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->tid
<< dendl
;
2895 void Objecter::_session_op_remove(OSDSession
*from
, Op
*op
)
2897 assert(op
->session
== from
);
2898 // from->lock is locked
2900 if (from
->is_homeless()) {
2901 num_homeless_ops
.dec();
2904 from
->ops
.erase(op
->tid
);
2908 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->tid
<< dendl
;
2911 void Objecter::_session_linger_op_assign(OSDSession
*to
, LingerOp
*op
)
2913 // to lock is locked unique
2914 assert(op
->session
== NULL
);
2916 if (to
->is_homeless()) {
2917 num_homeless_ops
.inc();
2922 to
->linger_ops
[op
->linger_id
] = op
;
2924 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->linger_id
2928 void Objecter::_session_linger_op_remove(OSDSession
*from
, LingerOp
*op
)
2930 assert(from
== op
->session
);
2931 // from->lock is locked unique
2933 if (from
->is_homeless()) {
2934 num_homeless_ops
.dec();
2937 from
->linger_ops
.erase(op
->linger_id
);
2941 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->linger_id
2945 void Objecter::_session_command_op_remove(OSDSession
*from
, CommandOp
*op
)
2947 assert(from
== op
->session
);
2948 // from->lock is locked
2950 if (from
->is_homeless()) {
2951 num_homeless_ops
.dec();
2954 from
->command_ops
.erase(op
->tid
);
2958 ldout(cct
, 15) << __func__
<< " " << from
->osd
<< " " << op
->tid
<< dendl
;
2961 void Objecter::_session_command_op_assign(OSDSession
*to
, CommandOp
*op
)
2963 // to->lock is locked
2964 assert(op
->session
== NULL
);
2967 if (to
->is_homeless()) {
2968 num_homeless_ops
.inc();
2973 to
->command_ops
[op
->tid
] = op
;
2975 ldout(cct
, 15) << __func__
<< " " << to
->osd
<< " " << op
->tid
<< dendl
;
2978 int Objecter::_recalc_linger_op_target(LingerOp
*linger_op
,
2981 // rwlock is locked unique
2983 int r
= _calc_target(&linger_op
->target
, nullptr, true);
2984 if (r
== RECALC_OP_TARGET_NEED_RESEND
) {
2985 ldout(cct
, 10) << "recalc_linger_op_target tid " << linger_op
->linger_id
2986 << " pgid " << linger_op
->target
.pgid
2987 << " acting " << linger_op
->target
.acting
<< dendl
;
2989 OSDSession
*s
= NULL
;
2990 r
= _get_session(linger_op
->target
.osd
, &s
, sul
);
2993 if (linger_op
->session
!= s
) {
2994 // NB locking two sessions (s and linger_op->session) at the
2995 // same time here is only safe because we are the only one that
2996 // takes two, and we are holding rwlock for write. Disable
2997 // lockdep because it doesn't know that.
2998 OSDSession::unique_lock
sl(s
->lock
);
2999 _session_linger_op_remove(linger_op
->session
, linger_op
);
3000 _session_linger_op_assign(s
, linger_op
);
3004 return RECALC_OP_TARGET_NEED_RESEND
;
3009 void Objecter::_cancel_linger_op(Op
*op
)
3011 ldout(cct
, 15) << "cancel_op " << op
->tid
<< dendl
;
3013 assert(!op
->should_resend
);
3015 delete op
->onfinish
;
3016 num_in_flight
.dec();
3022 void Objecter::_finish_op(Op
*op
, int r
)
3024 ldout(cct
, 15) << "finish_op " << op
->tid
<< dendl
;
3026 // op->session->lock is locked unique or op->session is null
3028 if (!op
->ctx_budgeted
&& op
->budgeted
)
3031 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
3032 timer
.cancel_event(op
->ontimeout
);
3035 _session_op_remove(op
->session
, op
);
3038 logger
->dec(l_osdc_op_active
);
3040 assert(check_latest_map_ops
.find(op
->tid
) == check_latest_map_ops
.end());
3047 void Objecter::finish_op(OSDSession
*session
, ceph_tid_t tid
)
3049 ldout(cct
, 15) << "finish_op " << tid
<< dendl
;
3050 shared_lock
rl(rwlock
);
3052 OSDSession::unique_lock
wl(session
->lock
);
3054 map
<ceph_tid_t
, Op
*>::iterator iter
= session
->ops
.find(tid
);
3055 if (iter
== session
->ops
.end())
3058 Op
*op
= iter
->second
;
3063 MOSDOp
*Objecter::_prepare_osd_op(Op
*op
)
3067 int flags
= op
->target
.flags
;
3068 flags
|= CEPH_OSD_FLAG_KNOWN_REDIR
;
3070 // Nothing checks this any longer, but needed for compatibility with
3071 // pre-luminous osds
3072 flags
|= CEPH_OSD_FLAG_ONDISK
;
3074 if (!honor_osdmap_full
)
3075 flags
|= CEPH_OSD_FLAG_FULL_FORCE
;
3077 op
->target
.paused
= false;
3078 op
->stamp
= ceph::mono_clock::now();
3080 hobject_t hobj
= op
->target
.get_hobj();
3081 MOSDOp
*m
= new MOSDOp(client_inc
.read(), op
->tid
,
3082 hobj
, op
->target
.actual_pgid
,
3083 osdmap
->get_epoch(),
3084 flags
, op
->features
);
3086 m
->set_snapid(op
->snapid
);
3087 m
->set_snap_seq(op
->snapc
.seq
);
3088 m
->set_snaps(op
->snapc
.snaps
);
3091 m
->set_mtime(op
->mtime
);
3092 m
->set_retry_attempt(op
->attempts
++);
3093 m
->trace
= op
->trace
;
3094 if (!m
->trace
&& cct
->_conf
->osdc_blkin_trace_all
)
3095 m
->trace
.init("objecter op", &trace_endpoint
);
3098 m
->set_priority(op
->priority
);
3100 m
->set_priority(cct
->_conf
->osd_client_op_priority
);
3102 if (op
->reqid
!= osd_reqid_t()) {
3103 m
->set_reqid(op
->reqid
);
3106 logger
->inc(l_osdc_op_send
);
3107 logger
->inc(l_osdc_op_send_bytes
, m
->get_data().length());
3112 void Objecter::_send_op(Op
*op
, MOSDOp
*m
)
3115 // op->session->lock is locked
3118 hobject_t hoid
= op
->target
.get_hobj();
3119 auto p
= op
->session
->backoffs
.find(op
->target
.actual_pgid
);
3120 if (p
!= op
->session
->backoffs
.end()) {
3121 auto q
= p
->second
.lower_bound(hoid
);
3122 if (q
!= p
->second
.begin()) {
3124 if (hoid
>= q
->second
.end
) {
3128 if (q
!= p
->second
.end()) {
3129 ldout(cct
, 20) << __func__
<< " ? " << q
->first
<< " [" << q
->second
.begin
3130 << "," << q
->second
.end
<< ")" << dendl
;
3131 int r
= cmp(hoid
, q
->second
.begin
);
3132 if (r
== 0 || (r
> 0 && hoid
< q
->second
.end
)) {
3133 ldout(cct
, 10) << __func__
<< " backoff " << op
->target
.actual_pgid
3134 << " id " << q
->second
.id
<< " on " << hoid
3135 << ", queuing " << op
<< " tid " << op
->tid
<< dendl
;
3142 assert(op
->tid
> 0);
3143 m
= _prepare_osd_op(op
);
3146 if (op
->target
.actual_pgid
!= m
->get_spg()) {
3147 ldout(cct
, 10) << __func__
<< " " << op
->tid
<< " pgid change from "
3148 << m
->get_spg() << " to " << op
->target
.actual_pgid
3149 << ", updating and reencoding" << dendl
;
3150 m
->set_spg(op
->target
.actual_pgid
);
3151 m
->clear_payload(); // reencode
3154 ldout(cct
, 15) << "_send_op " << op
->tid
<< " to "
3155 << op
->target
.actual_pgid
<< " on osd." << op
->session
->osd
3158 ConnectionRef con
= op
->session
->con
;
3161 // preallocated rx buffer?
3163 ldout(cct
, 20) << " revoking rx buffer for " << op
->tid
<< " on "
3164 << op
->con
<< dendl
;
3165 op
->con
->revoke_rx_buffer(op
->tid
);
3168 op
->ontimeout
== 0 && // only post rx_buffer if no timeout; see #9582
3169 op
->outbl
->length()) {
3170 ldout(cct
, 20) << " posting rx buffer for " << op
->tid
<< " on " << con
3173 op
->con
->post_rx_buffer(op
->tid
, *op
->outbl
);
3176 op
->incarnation
= op
->session
->incarnation
;
3178 m
->set_tid(op
->tid
);
3180 op
->session
->con
->send_message(m
);
3183 int Objecter::calc_op_budget(Op
*op
)
3186 for (vector
<OSDOp
>::iterator i
= op
->ops
.begin();
3189 if (i
->op
.op
& CEPH_OSD_OP_MODE_WR
) {
3190 op_budget
+= i
->indata
.length();
3191 } else if (ceph_osd_op_mode_read(i
->op
.op
)) {
3192 if (ceph_osd_op_type_data(i
->op
.op
)) {
3193 if ((int64_t)i
->op
.extent
.length
> 0)
3194 op_budget
+= (int64_t)i
->op
.extent
.length
;
3195 } else if (ceph_osd_op_type_attr(i
->op
.op
)) {
3196 op_budget
+= i
->op
.xattr
.name_len
+ i
->op
.xattr
.value_len
;
3203 void Objecter::_throttle_op(Op
*op
,
3207 assert(sul
&& sul
.mutex() == &rwlock
);
3208 bool locked_for_write
= sul
.owns_lock();
3211 op_budget
= calc_op_budget(op
);
3212 if (!op_throttle_bytes
.get_or_fail(op_budget
)) { //couldn't take right now
3214 op_throttle_bytes
.get(op_budget
);
3215 if (locked_for_write
)
3220 if (!op_throttle_ops
.get_or_fail(1)) { //couldn't take right now
3222 op_throttle_ops
.get(1);
3223 if (locked_for_write
)
3230 void Objecter::unregister_op(Op
*op
)
3232 OSDSession::unique_lock
sl(op
->session
->lock
);
3233 op
->session
->ops
.erase(op
->tid
);
3235 put_session(op
->session
);
3241 /* This function DOES put the passed message before returning */
3242 void Objecter::handle_osd_op_reply(MOSDOpReply
*m
)
3244 ldout(cct
, 10) << "in handle_osd_op_reply" << dendl
;
3247 ceph_tid_t tid
= m
->get_tid();
3249 shunique_lock
sul(rwlock
, ceph::acquire_shared
);
3250 if (!initialized
.read()) {
3255 ConnectionRef con
= m
->get_connection();
3256 OSDSession
*s
= static_cast<OSDSession
*>(con
->get_priv());
3257 if (!s
|| s
->con
!= con
) {
3258 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
3266 OSDSession::unique_lock
sl(s
->lock
);
3268 map
<ceph_tid_t
, Op
*>::iterator iter
= s
->ops
.find(tid
);
3269 if (iter
== s
->ops
.end()) {
3270 ldout(cct
, 7) << "handle_osd_op_reply " << tid
3271 << (m
->is_ondisk() ? " ondisk" : (m
->is_onnvram() ?
3272 " onnvram" : " ack"))
3273 << " ... stray" << dendl
;
3280 ldout(cct
, 7) << "handle_osd_op_reply " << tid
3281 << (m
->is_ondisk() ? " ondisk" :
3282 (m
->is_onnvram() ? " onnvram" : " ack"))
3283 << " uv " << m
->get_user_version()
3284 << " in " << m
->get_pg()
3285 << " attempt " << m
->get_retry_attempt()
3287 Op
*op
= iter
->second
;
3289 if (retry_writes_after_first_reply
&& op
->attempts
== 1 &&
3290 (op
->target
.flags
& CEPH_OSD_FLAG_WRITE
)) {
3291 ldout(cct
, 7) << "retrying write after first reply: " << tid
<< dendl
;
3293 num_in_flight
.dec();
3295 _session_op_remove(s
, op
);
3299 _op_submit(op
, sul
, NULL
);
3304 if (m
->get_retry_attempt() >= 0) {
3305 if (m
->get_retry_attempt() != (op
->attempts
- 1)) {
3306 ldout(cct
, 7) << " ignoring reply from attempt "
3307 << m
->get_retry_attempt()
3308 << " from " << m
->get_source_inst()
3309 << "; last attempt " << (op
->attempts
- 1) << " sent to "
3310 << op
->session
->con
->get_peer_addr() << dendl
;
3317 // we don't know the request attempt because the server is old, so
3318 // just accept this one. we may do ACK callbacks we shouldn't
3319 // have, but that is better than doing callbacks out of order.
3322 Context
*onfinish
= 0;
3324 int rc
= m
->get_result();
3326 if (m
->is_redirect_reply()) {
3327 ldout(cct
, 5) << " got redirect reply; redirecting" << dendl
;
3329 num_in_flight
.dec();
3330 _session_op_remove(s
, op
);
3334 // FIXME: two redirects could race and reorder
3337 m
->get_redirect().combine_with_locator(op
->target
.target_oloc
,
3338 op
->target
.target_oid
.name
);
3339 op
->target
.flags
|= CEPH_OSD_FLAG_REDIRECTED
;
3340 _op_submit(op
, sul
, NULL
);
3345 if (rc
== -EAGAIN
) {
3346 ldout(cct
, 7) << " got -EAGAIN, resubmitting" << dendl
;
3349 s
->ops
.erase(op
->tid
);
3350 op
->tid
= last_tid
.inc();
3362 *op
->objver
= m
->get_user_version();
3363 if (op
->reply_epoch
)
3364 *op
->reply_epoch
= m
->get_map_epoch();
3365 if (op
->data_offset
)
3366 *op
->data_offset
= m
->get_header().data_off
;
3371 op
->con
->revoke_rx_buffer(op
->tid
);
3372 m
->claim_data(*op
->outbl
);
3376 // per-op result demuxing
3377 vector
<OSDOp
> out_ops
;
3378 m
->claim_ops(out_ops
);
3380 if (out_ops
.size() != op
->ops
.size())
3381 ldout(cct
, 0) << "WARNING: tid " << op
->tid
<< " reply ops " << out_ops
3382 << " != request ops " << op
->ops
3383 << " from " << m
->get_source_inst() << dendl
;
3385 vector
<bufferlist
*>::iterator pb
= op
->out_bl
.begin();
3386 vector
<int*>::iterator pr
= op
->out_rval
.begin();
3387 vector
<Context
*>::iterator ph
= op
->out_handler
.begin();
3388 assert(op
->out_bl
.size() == op
->out_rval
.size());
3389 assert(op
->out_bl
.size() == op
->out_handler
.size());
3390 vector
<OSDOp
>::iterator p
= out_ops
.begin();
3391 for (unsigned i
= 0;
3392 p
!= out_ops
.end() && pb
!= op
->out_bl
.end();
3393 ++i
, ++p
, ++pb
, ++pr
, ++ph
) {
3394 ldout(cct
, 10) << " op " << i
<< " rval " << p
->rval
3395 << " len " << p
->outdata
.length() << dendl
;
3398 // set rval before running handlers so that handlers
3399 // can change it if e.g. decoding fails
3401 **pr
= ceph_to_host_errno(p
->rval
);
3403 ldout(cct
, 10) << " op " << i
<< " handler " << *ph
<< dendl
;
3404 (*ph
)->complete(ceph_to_host_errno(p
->rval
));
3409 // NOTE: we assume that since we only request ONDISK ever we will
3410 // only ever get back one (type of) ack ever.
3413 num_in_flight
.dec();
3414 onfinish
= op
->onfinish
;
3415 op
->onfinish
= NULL
;
3417 logger
->inc(l_osdc_op_reply
);
3419 /* get it before we call _finish_op() */
3420 auto completion_lock
= s
->get_lock(op
->target
.base_oid
);
3422 ldout(cct
, 15) << "handle_osd_op_reply completed tid " << tid
<< dendl
;
3425 ldout(cct
, 5) << num_in_flight
.read() << " in flight" << dendl
;
3427 // serialize completions
3428 if (completion_lock
.mutex()) {
3429 completion_lock
.lock();
3435 onfinish
->complete(rc
);
3437 if (completion_lock
.mutex()) {
3438 completion_lock
.unlock();
3445 void Objecter::handle_osd_backoff(MOSDBackoff
*m
)
3447 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
3448 shunique_lock
sul(rwlock
, ceph::acquire_shared
);
3449 if (!initialized
.read()) {
3454 ConnectionRef con
= m
->get_connection();
3455 OSDSession
*s
= static_cast<OSDSession
*>(con
->get_priv());
3456 if (!s
|| s
->con
!= con
) {
3457 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
3465 s
->put(); // from get_priv() above
3467 OSDSession::unique_lock
sl(s
->lock
);
3470 case CEPH_OSD_BACKOFF_OP_BLOCK
:
3473 OSDBackoff
& b
= s
->backoffs
[m
->pgid
][m
->begin
];
3474 s
->backoffs_by_id
.insert(make_pair(m
->id
, &b
));
3480 // ack with original backoff's epoch so that the osd can discard this if
3481 // there was a pg split.
3482 Message
*r
= new MOSDBackoff(m
->pgid
,
3484 CEPH_OSD_BACKOFF_OP_ACK_BLOCK
,
3485 m
->id
, m
->begin
, m
->end
);
3486 // this priority must match the MOSDOps from _prepare_osd_op
3487 r
->set_priority(cct
->_conf
->osd_client_op_priority
);
3488 con
->send_message(r
);
3492 case CEPH_OSD_BACKOFF_OP_UNBLOCK
:
3494 auto p
= s
->backoffs_by_id
.find(m
->id
);
3495 if (p
!= s
->backoffs_by_id
.end()) {
3496 OSDBackoff
*b
= p
->second
;
3497 if (b
->begin
!= m
->begin
&&
3499 lderr(cct
) << __func__
<< " got " << m
->pgid
<< " id " << m
->id
3501 << m
->begin
<< "," << m
->end
<< ") but backoff is ["
3502 << b
->begin
<< "," << b
->end
<< ")" << dendl
;
3503 // hrmpf, unblock it anyway.
3505 ldout(cct
, 10) << __func__
<< " unblock backoff " << b
->pgid
3507 << " [" << b
->begin
<< "," << b
->end
3509 auto spgp
= s
->backoffs
.find(b
->pgid
);
3510 assert(spgp
!= s
->backoffs
.end());
3511 spgp
->second
.erase(b
->begin
);
3512 if (spgp
->second
.empty()) {
3513 s
->backoffs
.erase(spgp
);
3515 s
->backoffs_by_id
.erase(p
);
3517 // check for any ops to resend
3518 for (auto& q
: s
->ops
) {
3519 if (q
.second
->target
.actual_pgid
== m
->pgid
) {
3520 int r
= q
.second
->target
.contained_by(m
->begin
, m
->end
);
3521 ldout(cct
, 20) << __func__
<< " contained_by " << r
<< " on "
3522 << q
.second
->target
.get_hobj() << dendl
;
3529 lderr(cct
) << __func__
<< " " << m
->pgid
<< " id " << m
->id
3531 << m
->begin
<< "," << m
->end
<< ") but backoff dne" << dendl
;
3537 ldout(cct
, 10) << __func__
<< " unrecognized op " << (int)m
->op
<< dendl
;
3547 uint32_t Objecter::list_nobjects_seek(NListContext
*list_context
,
3550 shared_lock
rl(rwlock
);
3551 list_context
->pos
= hobject_t(object_t(), string(), CEPH_NOSNAP
,
3552 pos
, list_context
->pool_id
, string());
3553 ldout(cct
, 10) << __func__
<< list_context
3554 << " pos " << pos
<< " -> " << list_context
->pos
<< dendl
;
3555 pg_t actual
= osdmap
->raw_pg_to_pg(pg_t(pos
, list_context
->pool_id
));
3556 list_context
->current_pg
= actual
.ps();
3557 list_context
->at_end_of_pool
= false;
3561 uint32_t Objecter::list_nobjects_seek(NListContext
*list_context
,
3562 const hobject_t
& cursor
)
3564 shared_lock
rl(rwlock
);
3565 ldout(cct
, 10) << "list_nobjects_seek " << list_context
<< dendl
;
3566 list_context
->pos
= cursor
;
3567 list_context
->at_end_of_pool
= false;
3568 pg_t actual
= osdmap
->raw_pg_to_pg(pg_t(cursor
.get_hash(), list_context
->pool_id
));
3569 list_context
->current_pg
= actual
.ps();
3570 list_context
->sort_bitwise
= true;
3571 return list_context
->current_pg
;
3574 void Objecter::list_nobjects_get_cursor(NListContext
*list_context
,
3577 shared_lock
rl(rwlock
);
3578 if (list_context
->list
.empty()) {
3579 *cursor
= list_context
->pos
;
3581 const librados::ListObjectImpl
& entry
= list_context
->list
.front();
3582 const string
*key
= (entry
.locator
.empty() ? &entry
.oid
: &entry
.locator
);
3583 uint32_t h
= osdmap
->get_pg_pool(list_context
->pool_id
)->hash_key(*key
, entry
.nspace
);
3584 *cursor
= hobject_t(entry
.oid
, entry
.locator
, list_context
->pool_snap_seq
, h
, list_context
->pool_id
, entry
.nspace
);
3588 void Objecter::list_nobjects(NListContext
*list_context
, Context
*onfinish
)
3590 ldout(cct
, 10) << __func__
<< " pool_id " << list_context
->pool_id
3591 << " pool_snap_seq " << list_context
->pool_snap_seq
3592 << " max_entries " << list_context
->max_entries
3593 << " list_context " << list_context
3594 << " onfinish " << onfinish
3595 << " current_pg " << list_context
->current_pg
3596 << " pos " << list_context
->pos
<< dendl
;
3598 shared_lock
rl(rwlock
);
3599 const pg_pool_t
*pool
= osdmap
->get_pg_pool(list_context
->pool_id
);
3600 if (!pool
) { // pool is gone
3602 put_nlist_context_budget(list_context
);
3603 onfinish
->complete(-ENOENT
);
3606 int pg_num
= pool
->get_pg_num();
3607 bool sort_bitwise
= osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
);
3609 if (list_context
->pos
.is_min()) {
3610 list_context
->starting_pg_num
= 0;
3611 list_context
->sort_bitwise
= sort_bitwise
;
3612 list_context
->starting_pg_num
= pg_num
;
3614 if (list_context
->sort_bitwise
!= sort_bitwise
) {
3615 list_context
->pos
= hobject_t(
3616 object_t(), string(), CEPH_NOSNAP
,
3617 list_context
->current_pg
, list_context
->pool_id
, string());
3618 list_context
->sort_bitwise
= sort_bitwise
;
3619 ldout(cct
, 10) << " hobject sort order changed, restarting this pg at "
3620 << list_context
->pos
<< dendl
;
3622 if (list_context
->starting_pg_num
!= pg_num
) {
3623 if (!sort_bitwise
) {
3624 // start reading from the beginning; the pgs have changed
3625 ldout(cct
, 10) << " pg_num changed; restarting with " << pg_num
<< dendl
;
3626 list_context
->pos
= collection_list_handle_t();
3628 list_context
->starting_pg_num
= pg_num
;
3631 if (list_context
->pos
.is_max()) {
3632 ldout(cct
, 20) << __func__
<< " end of pool, list "
3633 << list_context
->list
<< dendl
;
3634 if (list_context
->list
.empty()) {
3635 list_context
->at_end_of_pool
= true;
3637 // release the listing context's budget once all
3638 // OPs (in the session) are finished
3639 put_nlist_context_budget(list_context
);
3640 onfinish
->complete(0);
3645 op
.pg_nls(list_context
->max_entries
, list_context
->filter
,
3646 list_context
->pos
, osdmap
->get_epoch());
3647 list_context
->bl
.clear();
3648 C_NList
*onack
= new C_NList(list_context
, onfinish
, this);
3649 object_locator_t
oloc(list_context
->pool_id
, list_context
->nspace
);
3651 // note current_pg in case we don't have (or lose) SORTBITWISE
3652 list_context
->current_pg
= pool
->raw_hash_to_pg(list_context
->pos
.get_hash());
3655 pg_read(list_context
->current_pg
, oloc
, op
,
3656 &list_context
->bl
, 0, onack
, &onack
->epoch
,
3657 &list_context
->ctx_budget
);
3660 void Objecter::_nlist_reply(NListContext
*list_context
, int r
,
3661 Context
*final_finish
, epoch_t reply_epoch
)
3663 ldout(cct
, 10) << __func__
<< " " << list_context
<< dendl
;
3665 bufferlist::iterator iter
= list_context
->bl
.begin();
3666 pg_nls_response_t response
;
3667 bufferlist extra_info
;
3668 ::decode(response
, iter
);
3670 ::decode(extra_info
, iter
);
3673 // if the osd returns 1 (newer code), or handle MAX, it means we
3674 // hit the end of the pg.
3675 if ((response
.handle
.is_max() || r
== 1) &&
3676 !list_context
->sort_bitwise
) {
3677 // legacy OSD and !sortbitwise, figure out the next PG on our own
3678 ++list_context
->current_pg
;
3679 if (list_context
->current_pg
== list_context
->starting_pg_num
) {
3681 list_context
->pos
= hobject_t::get_max();
3684 list_context
->pos
= hobject_t(object_t(), string(), CEPH_NOSNAP
,
3685 list_context
->current_pg
,
3686 list_context
->pool_id
, string());
3689 list_context
->pos
= response
.handle
;
3692 int response_size
= response
.entries
.size();
3693 ldout(cct
, 20) << " response.entries.size " << response_size
3694 << ", response.entries " << response
.entries
3695 << ", handle " << response
.handle
3696 << ", tentative new pos " << list_context
->pos
<< dendl
;
3697 list_context
->extra_info
.append(extra_info
);
3698 if (response_size
) {
3699 list_context
->list
.splice(list_context
->list
.end(), response
.entries
);
3702 if (list_context
->list
.size() >= list_context
->max_entries
) {
3703 ldout(cct
, 20) << " hit max, returning results so far, "
3704 << list_context
->list
<< dendl
;
3705 // release the listing context's budget once all
3706 // OPs (in the session) are finished
3707 put_nlist_context_budget(list_context
);
3708 final_finish
->complete(0);
3713 list_nobjects(list_context
, final_finish
);
3716 void Objecter::put_nlist_context_budget(NListContext
*list_context
)
3718 if (list_context
->ctx_budget
>= 0) {
3719 ldout(cct
, 10) << " release listing context's budget " <<
3720 list_context
->ctx_budget
<< dendl
;
3721 put_op_budget_bytes(list_context
->ctx_budget
);
3722 list_context
->ctx_budget
= -1;
3728 int Objecter::create_pool_snap(int64_t pool
, string
& snap_name
,
3731 unique_lock
wl(rwlock
);
3732 ldout(cct
, 10) << "create_pool_snap; pool: " << pool
<< "; snap: "
3733 << snap_name
<< dendl
;
3735 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
3738 if (p
->snap_exists(snap_name
.c_str()))
3741 PoolOp
*op
= new PoolOp
;
3744 op
->tid
= last_tid
.inc();
3746 op
->name
= snap_name
;
3747 op
->onfinish
= onfinish
;
3748 op
->pool_op
= POOL_OP_CREATE_SNAP
;
3749 pool_ops
[op
->tid
] = op
;
3756 struct C_SelfmanagedSnap
: public Context
{
3760 C_SelfmanagedSnap(snapid_t
*ps
, Context
*f
) : psnapid(ps
), fin(f
) {}
3761 void finish(int r
) override
{
3763 bufferlist::iterator p
= bl
.begin();
3764 ::decode(*psnapid
, p
);
3770 int Objecter::allocate_selfmanaged_snap(int64_t pool
, snapid_t
*psnapid
,
3773 unique_lock
wl(rwlock
);
3774 ldout(cct
, 10) << "allocate_selfmanaged_snap; pool: " << pool
<< dendl
;
3775 PoolOp
*op
= new PoolOp
;
3776 if (!op
) return -ENOMEM
;
3777 op
->tid
= last_tid
.inc();
3779 C_SelfmanagedSnap
*fin
= new C_SelfmanagedSnap(psnapid
, onfinish
);
3782 op
->pool_op
= POOL_OP_CREATE_UNMANAGED_SNAP
;
3783 pool_ops
[op
->tid
] = op
;
3789 int Objecter::delete_pool_snap(int64_t pool
, string
& snap_name
,
3792 unique_lock
wl(rwlock
);
3793 ldout(cct
, 10) << "delete_pool_snap; pool: " << pool
<< "; snap: "
3794 << snap_name
<< dendl
;
3796 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool
);
3799 if (!p
->snap_exists(snap_name
.c_str()))
3802 PoolOp
*op
= new PoolOp
;
3805 op
->tid
= last_tid
.inc();
3807 op
->name
= snap_name
;
3808 op
->onfinish
= onfinish
;
3809 op
->pool_op
= POOL_OP_DELETE_SNAP
;
3810 pool_ops
[op
->tid
] = op
;
3817 int Objecter::delete_selfmanaged_snap(int64_t pool
, snapid_t snap
,
3820 unique_lock
wl(rwlock
);
3821 ldout(cct
, 10) << "delete_selfmanaged_snap; pool: " << pool
<< "; snap: "
3823 PoolOp
*op
= new PoolOp
;
3824 if (!op
) return -ENOMEM
;
3825 op
->tid
= last_tid
.inc();
3827 op
->onfinish
= onfinish
;
3828 op
->pool_op
= POOL_OP_DELETE_UNMANAGED_SNAP
;
3830 pool_ops
[op
->tid
] = op
;
3837 int Objecter::create_pool(string
& name
, Context
*onfinish
, uint64_t auid
,
3840 unique_lock
wl(rwlock
);
3841 ldout(cct
, 10) << "create_pool name=" << name
<< dendl
;
3843 if (osdmap
->lookup_pg_pool_name(name
) >= 0)
3846 PoolOp
*op
= new PoolOp
;
3849 op
->tid
= last_tid
.inc();
3852 op
->onfinish
= onfinish
;
3853 op
->pool_op
= POOL_OP_CREATE
;
3854 pool_ops
[op
->tid
] = op
;
3856 op
->crush_rule
= crush_rule
;
3863 int Objecter::delete_pool(int64_t pool
, Context
*onfinish
)
3865 unique_lock
wl(rwlock
);
3866 ldout(cct
, 10) << "delete_pool " << pool
<< dendl
;
3868 if (!osdmap
->have_pg_pool(pool
))
3871 _do_delete_pool(pool
, onfinish
);
3875 int Objecter::delete_pool(const string
&pool_name
, Context
*onfinish
)
3877 unique_lock
wl(rwlock
);
3878 ldout(cct
, 10) << "delete_pool " << pool_name
<< dendl
;
3880 int64_t pool
= osdmap
->lookup_pg_pool_name(pool_name
);
3884 _do_delete_pool(pool
, onfinish
);
3888 void Objecter::_do_delete_pool(int64_t pool
, Context
*onfinish
)
3890 PoolOp
*op
= new PoolOp
;
3891 op
->tid
= last_tid
.inc();
3893 op
->name
= "delete";
3894 op
->onfinish
= onfinish
;
3895 op
->pool_op
= POOL_OP_DELETE
;
3896 pool_ops
[op
->tid
] = op
;
3901 * change the auid owner of a pool by contacting the monitor.
3902 * This requires the current connection to have write permissions
3903 * on both the pool's current auid and the new (parameter) auid.
3904 * Uses the standard Context callback when done.
3906 int Objecter::change_pool_auid(int64_t pool
, Context
*onfinish
, uint64_t auid
)
3908 unique_lock
wl(rwlock
);
3909 ldout(cct
, 10) << "change_pool_auid " << pool
<< " to " << auid
<< dendl
;
3910 PoolOp
*op
= new PoolOp
;
3911 if (!op
) return -ENOMEM
;
3912 op
->tid
= last_tid
.inc();
3914 op
->name
= "change_pool_auid";
3915 op
->onfinish
= onfinish
;
3916 op
->pool_op
= POOL_OP_AUID_CHANGE
;
3918 pool_ops
[op
->tid
] = op
;
3920 logger
->set(l_osdc_poolop_active
, pool_ops
.size());
3926 void Objecter::pool_op_submit(PoolOp
*op
)
3929 if (mon_timeout
> timespan(0)) {
3930 op
->ontimeout
= timer
.add_event(mon_timeout
,
3932 pool_op_cancel(op
->tid
, -ETIMEDOUT
); });
3934 _pool_op_submit(op
);
3937 void Objecter::_pool_op_submit(PoolOp
*op
)
3939 // rwlock is locked unique
3941 ldout(cct
, 10) << "pool_op_submit " << op
->tid
<< dendl
;
3942 MPoolOp
*m
= new MPoolOp(monc
->get_fsid(), op
->tid
, op
->pool
,
3943 op
->name
, op
->pool_op
,
3944 op
->auid
, last_seen_osdmap_version
);
3945 if (op
->snapid
) m
->snapid
= op
->snapid
;
3946 if (op
->crush_rule
) m
->crush_rule
= op
->crush_rule
;
3947 monc
->send_mon_message(m
);
3948 op
->last_submit
= ceph::mono_clock::now();
3950 logger
->inc(l_osdc_poolop_send
);
3954 * Handle a reply to a PoolOp message. Check that we sent the message
3955 * and give the caller responsibility for the returned bufferlist.
3956 * Then either call the finisher or stash the PoolOp, depending on if we
3957 * have a new enough map.
3958 * Lastly, clean up the message and PoolOp.
3960 void Objecter::handle_pool_op_reply(MPoolOpReply
*m
)
3963 shunique_lock
sul(rwlock
, acquire_shared
);
3964 if (!initialized
.read()) {
3970 ldout(cct
, 10) << "handle_pool_op_reply " << *m
<< dendl
;
3971 ceph_tid_t tid
= m
->get_tid();
3972 map
<ceph_tid_t
, PoolOp
*>::iterator iter
= pool_ops
.find(tid
);
3973 if (iter
!= pool_ops
.end()) {
3974 PoolOp
*op
= iter
->second
;
3975 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< " Op: "
3976 << ceph_pool_op_name(op
->pool_op
) << dendl
;
3978 op
->blp
->claim(m
->response_data
);
3979 if (m
->version
> last_seen_osdmap_version
)
3980 last_seen_osdmap_version
= m
->version
;
3981 if (osdmap
->get_epoch() < m
->epoch
) {
3984 // recheck op existence since we have let go of rwlock
3985 // (for promotion) above.
3986 iter
= pool_ops
.find(tid
);
3987 if (iter
== pool_ops
.end())
3988 goto done
; // op is gone.
3989 if (osdmap
->get_epoch() < m
->epoch
) {
3990 ldout(cct
, 20) << "waiting for client to reach epoch " << m
->epoch
3991 << " before calling back" << dendl
;
3992 _wait_for_new_map(op
->onfinish
, m
->epoch
, m
->replyCode
);
3994 // map epoch changed, probably because a MOSDMap message
3995 // sneaked in. Do caller-specified callback now or else
3996 // we lose it forever.
3997 assert(op
->onfinish
);
3998 op
->onfinish
->complete(m
->replyCode
);
4001 assert(op
->onfinish
);
4002 op
->onfinish
->complete(m
->replyCode
);
4004 op
->onfinish
= NULL
;
4005 if (!sul
.owns_lock()) {
4009 iter
= pool_ops
.find(tid
);
4010 if (iter
!= pool_ops
.end()) {
4011 _finish_pool_op(op
, 0);
4014 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4018 // Not strictly necessary, since we'll release it on return.
4021 ldout(cct
, 10) << "done" << dendl
;
4025 int Objecter::pool_op_cancel(ceph_tid_t tid
, int r
)
4027 assert(initialized
.read());
4029 unique_lock
wl(rwlock
);
4031 map
<ceph_tid_t
, PoolOp
*>::iterator it
= pool_ops
.find(tid
);
4032 if (it
== pool_ops
.end()) {
4033 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4037 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4039 PoolOp
*op
= it
->second
;
4041 op
->onfinish
->complete(r
);
4043 _finish_pool_op(op
, r
);
4047 void Objecter::_finish_pool_op(PoolOp
*op
, int r
)
4049 // rwlock is locked unique
4050 pool_ops
.erase(op
->tid
);
4051 logger
->set(l_osdc_poolop_active
, pool_ops
.size());
4053 if (op
->ontimeout
&& r
!= -ETIMEDOUT
) {
4054 timer
.cancel_event(op
->ontimeout
);
4062 void Objecter::get_pool_stats(list
<string
>& pools
,
4063 map
<string
,pool_stat_t
> *result
,
4066 ldout(cct
, 10) << "get_pool_stats " << pools
<< dendl
;
4068 PoolStatOp
*op
= new PoolStatOp
;
4069 op
->tid
= last_tid
.inc();
4071 op
->pool_stats
= result
;
4072 op
->onfinish
= onfinish
;
4073 if (mon_timeout
> timespan(0)) {
4074 op
->ontimeout
= timer
.add_event(mon_timeout
,
4076 pool_stat_op_cancel(op
->tid
,
4082 unique_lock
wl(rwlock
);
4084 poolstat_ops
[op
->tid
] = op
;
4086 logger
->set(l_osdc_poolstat_active
, poolstat_ops
.size());
4088 _poolstat_submit(op
);
4091 void Objecter::_poolstat_submit(PoolStatOp
*op
)
4093 ldout(cct
, 10) << "_poolstat_submit " << op
->tid
<< dendl
;
4094 monc
->send_mon_message(new MGetPoolStats(monc
->get_fsid(), op
->tid
,
4096 last_seen_pgmap_version
));
4097 op
->last_submit
= ceph::mono_clock::now();
4099 logger
->inc(l_osdc_poolstat_send
);
4102 void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply
*m
)
4104 ldout(cct
, 10) << "handle_get_pool_stats_reply " << *m
<< dendl
;
4105 ceph_tid_t tid
= m
->get_tid();
4107 unique_lock
wl(rwlock
);
4108 if (!initialized
.read()) {
4113 map
<ceph_tid_t
, PoolStatOp
*>::iterator iter
= poolstat_ops
.find(tid
);
4114 if (iter
!= poolstat_ops
.end()) {
4115 PoolStatOp
*op
= poolstat_ops
[tid
];
4116 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< dendl
;
4117 *op
->pool_stats
= m
->pool_stats
;
4118 if (m
->version
> last_seen_pgmap_version
) {
4119 last_seen_pgmap_version
= m
->version
;
4121 op
->onfinish
->complete(0);
4122 _finish_pool_stat_op(op
, 0);
4124 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4126 ldout(cct
, 10) << "done" << dendl
;
4130 int Objecter::pool_stat_op_cancel(ceph_tid_t tid
, int r
)
4132 assert(initialized
.read());
4134 unique_lock
wl(rwlock
);
4136 map
<ceph_tid_t
, PoolStatOp
*>::iterator it
= poolstat_ops
.find(tid
);
4137 if (it
== poolstat_ops
.end()) {
4138 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4142 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4144 PoolStatOp
*op
= it
->second
;
4146 op
->onfinish
->complete(r
);
4147 _finish_pool_stat_op(op
, r
);
4151 void Objecter::_finish_pool_stat_op(PoolStatOp
*op
, int r
)
4153 // rwlock is locked unique
4155 poolstat_ops
.erase(op
->tid
);
4156 logger
->set(l_osdc_poolstat_active
, poolstat_ops
.size());
4158 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
4159 timer
.cancel_event(op
->ontimeout
);
4164 void Objecter::get_fs_stats(ceph_statfs
& result
, Context
*onfinish
)
4166 ldout(cct
, 10) << "get_fs_stats" << dendl
;
4167 unique_lock
l(rwlock
);
4169 StatfsOp
*op
= new StatfsOp
;
4170 op
->tid
= last_tid
.inc();
4171 op
->stats
= &result
;
4172 op
->onfinish
= onfinish
;
4173 if (mon_timeout
> timespan(0)) {
4174 op
->ontimeout
= timer
.add_event(mon_timeout
,
4176 statfs_op_cancel(op
->tid
,
4181 statfs_ops
[op
->tid
] = op
;
4183 logger
->set(l_osdc_statfs_active
, statfs_ops
.size());
4185 _fs_stats_submit(op
);
4188 void Objecter::_fs_stats_submit(StatfsOp
*op
)
4190 // rwlock is locked unique
4192 ldout(cct
, 10) << "fs_stats_submit" << op
->tid
<< dendl
;
4193 monc
->send_mon_message(new MStatfs(monc
->get_fsid(), op
->tid
,
4194 last_seen_pgmap_version
));
4195 op
->last_submit
= ceph::mono_clock::now();
4197 logger
->inc(l_osdc_statfs_send
);
4200 void Objecter::handle_fs_stats_reply(MStatfsReply
*m
)
4202 unique_lock
wl(rwlock
);
4203 if (!initialized
.read()) {
4208 ldout(cct
, 10) << "handle_fs_stats_reply " << *m
<< dendl
;
4209 ceph_tid_t tid
= m
->get_tid();
4211 if (statfs_ops
.count(tid
)) {
4212 StatfsOp
*op
= statfs_ops
[tid
];
4213 ldout(cct
, 10) << "have request " << tid
<< " at " << op
<< dendl
;
4214 *(op
->stats
) = m
->h
.st
;
4215 if (m
->h
.version
> last_seen_pgmap_version
)
4216 last_seen_pgmap_version
= m
->h
.version
;
4217 op
->onfinish
->complete(0);
4218 _finish_statfs_op(op
, 0);
4220 ldout(cct
, 10) << "unknown request " << tid
<< dendl
;
4223 ldout(cct
, 10) << "done" << dendl
;
4226 int Objecter::statfs_op_cancel(ceph_tid_t tid
, int r
)
4228 assert(initialized
.read());
4230 unique_lock
wl(rwlock
);
4232 map
<ceph_tid_t
, StatfsOp
*>::iterator it
= statfs_ops
.find(tid
);
4233 if (it
== statfs_ops
.end()) {
4234 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4238 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4240 StatfsOp
*op
= it
->second
;
4242 op
->onfinish
->complete(r
);
4243 _finish_statfs_op(op
, r
);
4247 void Objecter::_finish_statfs_op(StatfsOp
*op
, int r
)
4249 // rwlock is locked unique
4251 statfs_ops
.erase(op
->tid
);
4252 logger
->set(l_osdc_statfs_active
, statfs_ops
.size());
4254 if (op
->ontimeout
&& r
!= -ETIMEDOUT
)
4255 timer
.cancel_event(op
->ontimeout
);
4262 void Objecter::_sg_read_finish(vector
<ObjectExtent
>& extents
,
4263 vector
<bufferlist
>& resultbl
,
4264 bufferlist
*bl
, Context
*onfinish
)
4267 ldout(cct
, 15) << "_sg_read_finish" << dendl
;
4269 if (extents
.size() > 1) {
4270 Striper::StripedReadResult r
;
4271 vector
<bufferlist
>::iterator bit
= resultbl
.begin();
4272 for (vector
<ObjectExtent
>::iterator eit
= extents
.begin();
4273 eit
!= extents
.end();
4275 r
.add_partial_result(cct
, *bit
, eit
->buffer_extents
);
4278 r
.assemble_result(cct
, *bl
, false);
4280 ldout(cct
, 15) << " only one frag" << dendl
;
4281 bl
->claim(resultbl
[0]);
4285 uint64_t bytes_read
= bl
->length();
4286 ldout(cct
, 7) << "_sg_read_finish " << bytes_read
<< " bytes" << dendl
;
4289 onfinish
->complete(bytes_read
);// > 0 ? bytes_read:m->get_result());
4294 void Objecter::ms_handle_connect(Connection
*con
)
4296 ldout(cct
, 10) << "ms_handle_connect " << con
<< dendl
;
4297 if (!initialized
.read())
4300 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
4304 bool Objecter::ms_handle_reset(Connection
*con
)
4306 if (!initialized
.read())
4308 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
4309 OSDSession
*session
= static_cast<OSDSession
*>(con
->get_priv());
4311 ldout(cct
, 1) << "ms_handle_reset " << con
<< " session " << session
4312 << " osd." << session
->osd
<< dendl
;
4313 unique_lock
wl(rwlock
);
4314 if (!initialized
.read()) {
4318 map
<uint64_t, LingerOp
*> lresend
;
4319 OSDSession::unique_lock
sl(session
->lock
);
4320 _reopen_session(session
);
4321 _kick_requests(session
, lresend
);
4323 _linger_ops_resend(lresend
, wl
);
4325 maybe_request_map();
4333 void Objecter::ms_handle_remote_reset(Connection
*con
)
4336 * treat these the same.
4338 ms_handle_reset(con
);
4341 bool Objecter::ms_handle_refused(Connection
*con
)
4344 if (osdmap
&& (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
)) {
4345 int osd
= osdmap
->identify_osd(con
->get_peer_addr());
4347 ldout(cct
, 1) << "ms_handle_refused on osd." << osd
<< dendl
;
4353 bool Objecter::ms_get_authorizer(int dest_type
,
4354 AuthAuthorizer
**authorizer
,
4357 if (!initialized
.read())
4359 if (dest_type
== CEPH_ENTITY_TYPE_MON
)
4361 *authorizer
= monc
->build_authorizer(dest_type
);
4362 return *authorizer
!= NULL
;
4365 void Objecter::op_target_t::dump(Formatter
*f
) const
4367 f
->dump_stream("pg") << pgid
;
4368 f
->dump_int("osd", osd
);
4369 f
->dump_stream("object_id") << base_oid
;
4370 f
->dump_stream("object_locator") << base_oloc
;
4371 f
->dump_stream("target_object_id") << target_oid
;
4372 f
->dump_stream("target_object_locator") << target_oloc
;
4373 f
->dump_int("paused", (int)paused
);
4374 f
->dump_int("used_replica", (int)used_replica
);
4375 f
->dump_int("precalc_pgid", (int)precalc_pgid
);
4378 void Objecter::_dump_active(OSDSession
*s
)
4380 for (map
<ceph_tid_t
,Op
*>::iterator p
= s
->ops
.begin();
4384 ldout(cct
, 20) << op
->tid
<< "\t" << op
->target
.pgid
4385 << "\tosd." << (op
->session
? op
->session
->osd
: -1)
4386 << "\t" << op
->target
.base_oid
4387 << "\t" << op
->ops
<< dendl
;
4391 void Objecter::_dump_active()
4393 ldout(cct
, 20) << "dump_active .. " << num_homeless_ops
.read() << " homeless"
4395 for (map
<int, OSDSession
*>::iterator siter
= osd_sessions
.begin();
4396 siter
!= osd_sessions
.end(); ++siter
) {
4397 OSDSession
*s
= siter
->second
;
4398 OSDSession::shared_lock
sl(s
->lock
);
4402 _dump_active(homeless_session
);
4405 void Objecter::dump_active()
4407 shared_lock
rl(rwlock
);
4412 void Objecter::dump_requests(Formatter
*fmt
)
4414 // Read-lock on Objecter held here
4415 fmt
->open_object_section("requests");
4417 dump_linger_ops(fmt
);
4419 dump_pool_stat_ops(fmt
);
4420 dump_statfs_ops(fmt
);
4421 dump_command_ops(fmt
);
4422 fmt
->close_section(); // requests object
4425 void Objecter::_dump_ops(const OSDSession
*s
, Formatter
*fmt
)
4427 for (map
<ceph_tid_t
,Op
*>::const_iterator p
= s
->ops
.begin();
4431 fmt
->open_object_section("op");
4432 fmt
->dump_unsigned("tid", op
->tid
);
4433 op
->target
.dump(fmt
);
4434 fmt
->dump_stream("last_sent") << op
->stamp
;
4435 fmt
->dump_int("attempts", op
->attempts
);
4436 fmt
->dump_stream("snapid") << op
->snapid
;
4437 fmt
->dump_stream("snap_context") << op
->snapc
;
4438 fmt
->dump_stream("mtime") << op
->mtime
;
4440 fmt
->open_array_section("osd_ops");
4441 for (vector
<OSDOp
>::const_iterator it
= op
->ops
.begin();
4442 it
!= op
->ops
.end();
4444 fmt
->dump_stream("osd_op") << *it
;
4446 fmt
->close_section(); // osd_ops array
4448 fmt
->close_section(); // op object
4452 void Objecter::dump_ops(Formatter
*fmt
)
4454 // Read-lock on Objecter held
4455 fmt
->open_array_section("ops");
4456 for (map
<int, OSDSession
*>::const_iterator siter
= osd_sessions
.begin();
4457 siter
!= osd_sessions
.end(); ++siter
) {
4458 OSDSession
*s
= siter
->second
;
4459 OSDSession::shared_lock
sl(s
->lock
);
4463 _dump_ops(homeless_session
, fmt
);
4464 fmt
->close_section(); // ops array
4467 void Objecter::_dump_linger_ops(const OSDSession
*s
, Formatter
*fmt
)
4469 for (map
<uint64_t, LingerOp
*>::const_iterator p
= s
->linger_ops
.begin();
4470 p
!= s
->linger_ops
.end();
4472 LingerOp
*op
= p
->second
;
4473 fmt
->open_object_section("linger_op");
4474 fmt
->dump_unsigned("linger_id", op
->linger_id
);
4475 op
->target
.dump(fmt
);
4476 fmt
->dump_stream("snapid") << op
->snap
;
4477 fmt
->dump_stream("registered") << op
->registered
;
4478 fmt
->close_section(); // linger_op object
4482 void Objecter::dump_linger_ops(Formatter
*fmt
)
4484 // We have a read-lock on the objecter
4485 fmt
->open_array_section("linger_ops");
4486 for (map
<int, OSDSession
*>::const_iterator siter
= osd_sessions
.begin();
4487 siter
!= osd_sessions
.end(); ++siter
) {
4488 OSDSession
*s
= siter
->second
;
4489 OSDSession::shared_lock
sl(s
->lock
);
4490 _dump_linger_ops(s
, fmt
);
4493 _dump_linger_ops(homeless_session
, fmt
);
4494 fmt
->close_section(); // linger_ops array
4497 void Objecter::_dump_command_ops(const OSDSession
*s
, Formatter
*fmt
)
4499 for (map
<uint64_t, CommandOp
*>::const_iterator p
= s
->command_ops
.begin();
4500 p
!= s
->command_ops
.end();
4502 CommandOp
*op
= p
->second
;
4503 fmt
->open_object_section("command_op");
4504 fmt
->dump_unsigned("command_id", op
->tid
);
4505 fmt
->dump_int("osd", op
->session
? op
->session
->osd
: -1);
4506 fmt
->open_array_section("command");
4507 for (vector
<string
>::const_iterator q
= op
->cmd
.begin();
4508 q
!= op
->cmd
.end(); ++q
)
4509 fmt
->dump_string("word", *q
);
4510 fmt
->close_section();
4511 if (op
->target_osd
>= 0)
4512 fmt
->dump_int("target_osd", op
->target_osd
);
4514 fmt
->dump_stream("target_pg") << op
->target_pg
;
4515 fmt
->close_section(); // command_op object
4519 void Objecter::dump_command_ops(Formatter
*fmt
)
4521 // We have a read-lock on the Objecter here
4522 fmt
->open_array_section("command_ops");
4523 for (map
<int, OSDSession
*>::const_iterator siter
= osd_sessions
.begin();
4524 siter
!= osd_sessions
.end(); ++siter
) {
4525 OSDSession
*s
= siter
->second
;
4526 OSDSession::shared_lock
sl(s
->lock
);
4527 _dump_command_ops(s
, fmt
);
4530 _dump_command_ops(homeless_session
, fmt
);
4531 fmt
->close_section(); // command_ops array
4534 void Objecter::dump_pool_ops(Formatter
*fmt
) const
4536 fmt
->open_array_section("pool_ops");
4537 for (map
<ceph_tid_t
, PoolOp
*>::const_iterator p
= pool_ops
.begin();
4538 p
!= pool_ops
.end();
4540 PoolOp
*op
= p
->second
;
4541 fmt
->open_object_section("pool_op");
4542 fmt
->dump_unsigned("tid", op
->tid
);
4543 fmt
->dump_int("pool", op
->pool
);
4544 fmt
->dump_string("name", op
->name
);
4545 fmt
->dump_int("operation_type", op
->pool_op
);
4546 fmt
->dump_unsigned("auid", op
->auid
);
4547 fmt
->dump_unsigned("crush_rule", op
->crush_rule
);
4548 fmt
->dump_stream("snapid") << op
->snapid
;
4549 fmt
->dump_stream("last_sent") << op
->last_submit
;
4550 fmt
->close_section(); // pool_op object
4552 fmt
->close_section(); // pool_ops array
4555 void Objecter::dump_pool_stat_ops(Formatter
*fmt
) const
4557 fmt
->open_array_section("pool_stat_ops");
4558 for (map
<ceph_tid_t
, PoolStatOp
*>::const_iterator p
= poolstat_ops
.begin();
4559 p
!= poolstat_ops
.end();
4561 PoolStatOp
*op
= p
->second
;
4562 fmt
->open_object_section("pool_stat_op");
4563 fmt
->dump_unsigned("tid", op
->tid
);
4564 fmt
->dump_stream("last_sent") << op
->last_submit
;
4566 fmt
->open_array_section("pools");
4567 for (list
<string
>::const_iterator it
= op
->pools
.begin();
4568 it
!= op
->pools
.end();
4570 fmt
->dump_string("pool", *it
);
4572 fmt
->close_section(); // pools array
4574 fmt
->close_section(); // pool_stat_op object
4576 fmt
->close_section(); // pool_stat_ops array
4579 void Objecter::dump_statfs_ops(Formatter
*fmt
) const
4581 fmt
->open_array_section("statfs_ops");
4582 for (map
<ceph_tid_t
, StatfsOp
*>::const_iterator p
= statfs_ops
.begin();
4583 p
!= statfs_ops
.end();
4585 StatfsOp
*op
= p
->second
;
4586 fmt
->open_object_section("statfs_op");
4587 fmt
->dump_unsigned("tid", op
->tid
);
4588 fmt
->dump_stream("last_sent") << op
->last_submit
;
4589 fmt
->close_section(); // statfs_op object
4591 fmt
->close_section(); // statfs_ops array
4594 Objecter::RequestStateHook::RequestStateHook(Objecter
*objecter
) :
4595 m_objecter(objecter
)
4599 bool Objecter::RequestStateHook::call(std::string command
, cmdmap_t
& cmdmap
,
4600 std::string format
, bufferlist
& out
)
4602 Formatter
*f
= Formatter::create(format
, "json-pretty", "json-pretty");
4603 shared_lock
rl(m_objecter
->rwlock
);
4604 m_objecter
->dump_requests(f
);
4610 void Objecter::blacklist_self(bool set
)
4612 ldout(cct
, 10) << "blacklist_self " << (set
? "add" : "rm") << dendl
;
4615 cmd
.push_back("{\"prefix\":\"osd blacklist\", ");
4617 cmd
.push_back("\"blacklistop\":\"add\",");
4619 cmd
.push_back("\"blacklistop\":\"rm\",");
4621 ss
<< messenger
->get_myaddr();
4622 cmd
.push_back("\"addr\":\"" + ss
.str() + "\"");
4624 MMonCommand
*m
= new MMonCommand(monc
->get_fsid());
4627 monc
->send_mon_message(m
);
4632 void Objecter::handle_command_reply(MCommandReply
*m
)
4634 unique_lock
wl(rwlock
);
4635 if (!initialized
.read()) {
4640 ConnectionRef con
= m
->get_connection();
4641 OSDSession
*s
= static_cast<OSDSession
*>(con
->get_priv());
4642 if (!s
|| s
->con
!= con
) {
4643 ldout(cct
, 7) << __func__
<< " no session on con " << con
<< dendl
;
4650 OSDSession::shared_lock
sl(s
->lock
);
4651 map
<ceph_tid_t
,CommandOp
*>::iterator p
= s
->command_ops
.find(m
->get_tid());
4652 if (p
== s
->command_ops
.end()) {
4653 ldout(cct
, 10) << "handle_command_reply tid " << m
->get_tid()
4654 << " not found" << dendl
;
4662 CommandOp
*c
= p
->second
;
4664 m
->get_connection() != c
->session
->con
) {
4665 ldout(cct
, 10) << "handle_command_reply tid " << m
->get_tid()
4666 << " got reply from wrong connection "
4667 << m
->get_connection() << " " << m
->get_source_inst()
4676 c
->poutbl
->claim(m
->get_data());
4682 _finish_command(c
, m
->r
, m
->rs
);
4688 void Objecter::submit_command(CommandOp
*c
, ceph_tid_t
*ptid
)
4690 shunique_lock
sul(rwlock
, ceph::acquire_unique
);
4692 ceph_tid_t tid
= last_tid
.inc();
4693 ldout(cct
, 10) << "_submit_command " << tid
<< " " << c
->cmd
<< dendl
;
4697 OSDSession::unique_lock
hs_wl(homeless_session
->lock
);
4698 _session_command_op_assign(homeless_session
, c
);
4701 _calc_command_target(c
, sul
);
4702 _assign_command_session(c
, sul
);
4703 if (osd_timeout
> timespan(0)) {
4704 c
->ontimeout
= timer
.add_event(osd_timeout
,
4706 command_op_cancel(c
->session
, tid
,
4710 if (!c
->session
->is_homeless()) {
4713 _maybe_request_map();
4715 if (c
->map_check_error
)
4716 _send_command_map_check(c
);
4719 logger
->inc(l_osdc_command_active
);
4722 int Objecter::_calc_command_target(CommandOp
*c
, shunique_lock
& sul
)
4724 assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
4726 c
->map_check_error
= 0;
4728 // ignore overlays, just like we do with pg ops
4729 c
->target
.flags
|= CEPH_OSD_FLAG_IGNORE_OVERLAY
;
4731 if (c
->target_osd
>= 0) {
4732 if (!osdmap
->exists(c
->target_osd
)) {
4733 c
->map_check_error
= -ENOENT
;
4734 c
->map_check_error_str
= "osd dne";
4736 return RECALC_OP_TARGET_OSD_DNE
;
4738 if (osdmap
->is_down(c
->target_osd
)) {
4739 c
->map_check_error
= -ENXIO
;
4740 c
->map_check_error_str
= "osd down";
4742 return RECALC_OP_TARGET_OSD_DOWN
;
4744 c
->target
.osd
= c
->target_osd
;
4746 int ret
= _calc_target(&(c
->target
), nullptr, true);
4747 if (ret
== RECALC_OP_TARGET_POOL_DNE
) {
4748 c
->map_check_error
= -ENOENT
;
4749 c
->map_check_error_str
= "pool dne";
4752 } else if (ret
== RECALC_OP_TARGET_OSD_DOWN
) {
4753 c
->map_check_error
= -ENXIO
;
4754 c
->map_check_error_str
= "osd down";
4761 int r
= _get_session(c
->target
.osd
, &s
, sul
);
4762 assert(r
!= -EAGAIN
); /* shouldn't happen as we're holding the write lock */
4764 if (c
->session
!= s
) {
4766 return RECALC_OP_TARGET_NEED_RESEND
;
4771 ldout(cct
, 20) << "_recalc_command_target " << c
->tid
<< " no change, "
4772 << c
->session
<< dendl
;
4774 return RECALC_OP_TARGET_NO_ACTION
;
4777 void Objecter::_assign_command_session(CommandOp
*c
,
4780 assert(sul
.owns_lock() && sul
.mutex() == &rwlock
);
4783 int r
= _get_session(c
->target
.osd
, &s
, sul
);
4784 assert(r
!= -EAGAIN
); /* shouldn't happen as we're holding the write lock */
4786 if (c
->session
!= s
) {
4788 OSDSession
*cs
= c
->session
;
4789 OSDSession::unique_lock
csl(cs
->lock
);
4790 _session_command_op_remove(c
->session
, c
);
4793 OSDSession::unique_lock
sl(s
->lock
);
4794 _session_command_op_assign(s
, c
);
4800 void Objecter::_send_command(CommandOp
*c
)
4802 ldout(cct
, 10) << "_send_command " << c
->tid
<< dendl
;
4804 assert(c
->session
->con
);
4805 MCommand
*m
= new MCommand(monc
->monmap
.fsid
);
4807 m
->set_data(c
->inbl
);
4809 c
->session
->con
->send_message(m
);
4810 logger
->inc(l_osdc_command_send
);
4813 int Objecter::command_op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
)
4815 assert(initialized
.read());
4817 unique_lock
wl(rwlock
);
4819 map
<ceph_tid_t
, CommandOp
*>::iterator it
= s
->command_ops
.find(tid
);
4820 if (it
== s
->command_ops
.end()) {
4821 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
4825 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
4827 CommandOp
*op
= it
->second
;
4828 _command_cancel_map_check(op
);
4829 _finish_command(op
, r
, "");
4833 void Objecter::_finish_command(CommandOp
*c
, int r
, string rs
)
4835 // rwlock is locked unique
4837 ldout(cct
, 10) << "_finish_command " << c
->tid
<< " = " << r
<< " "
4842 c
->onfinish
->complete(r
);
4844 if (c
->ontimeout
&& r
!= -ETIMEDOUT
)
4845 timer
.cancel_event(c
->ontimeout
);
4847 OSDSession
*s
= c
->session
;
4848 OSDSession::unique_lock
sl(s
->lock
);
4849 _session_command_op_remove(c
->session
, c
);
4854 logger
->dec(l_osdc_command_active
);
4857 Objecter::OSDSession::~OSDSession()
4859 // Caller is responsible for re-assigning or
4860 // destroying any ops that were assigned to us
4861 assert(ops
.empty());
4862 assert(linger_ops
.empty());
4863 assert(command_ops
.empty());
4866 Objecter::~Objecter()
4870 assert(homeless_session
->get_nref() == 1);
4871 assert(num_homeless_ops
.read() == 0);
4872 homeless_session
->put();
4874 assert(osd_sessions
.empty());
4875 assert(poolstat_ops
.empty());
4876 assert(statfs_ops
.empty());
4877 assert(pool_ops
.empty());
4878 assert(waiting_for_map
.empty());
4879 assert(linger_ops
.empty());
4880 assert(check_latest_map_lingers
.empty());
4881 assert(check_latest_map_ops
.empty());
4882 assert(check_latest_map_commands
.empty());
4884 assert(!m_request_state_hook
);
4889 * Wait until this OSD map epoch is received before
4890 * sending any more operations to OSDs. Use this
4891 * when it is known that the client can't trust
4892 * anything from before this epoch (e.g. due to
4893 * client blacklist at this epoch).
4895 void Objecter::set_epoch_barrier(epoch_t epoch
)
4897 unique_lock
wl(rwlock
);
4899 ldout(cct
, 7) << __func__
<< ": barrier " << epoch
<< " (was "
4900 << epoch_barrier
<< ") current epoch " << osdmap
->get_epoch()
4902 if (epoch
> epoch_barrier
) {
4903 epoch_barrier
= epoch
;
4904 _maybe_request_map();
4910 hobject_t
Objecter::enumerate_objects_begin()
4915 hobject_t
Objecter::enumerate_objects_end()
4917 return hobject_t::get_max();
4920 struct C_EnumerateReply
: public Context
{
4925 std::list
<librados::ListObjectImpl
> *result
;
4926 const hobject_t end
;
4927 const int64_t pool_id
;
4933 C_EnumerateReply(Objecter
*objecter_
, hobject_t
*next_
,
4934 std::list
<librados::ListObjectImpl
> *result_
,
4935 const hobject_t end_
, const int64_t pool_id_
, Context
*on_finish_
) :
4936 objecter(objecter_
), next(next_
), result(result_
),
4937 end(end_
), pool_id(pool_id_
), on_finish(on_finish_
),
4941 void finish(int r
) override
{
4942 objecter
->_enumerate_reply(
4943 bl
, r
, end
, pool_id
, budget
, epoch
, result
, next
, on_finish
);
4947 void Objecter::enumerate_objects(
4949 const std::string
&ns
,
4950 const hobject_t
&start
,
4951 const hobject_t
&end
,
4953 const bufferlist
&filter_bl
,
4954 std::list
<librados::ListObjectImpl
> *result
,
4960 if (!end
.is_max() && start
> end
) {
4961 lderr(cct
) << __func__
<< ": start " << start
<< " > end " << end
<< dendl
;
4962 on_finish
->complete(-EINVAL
);
4967 lderr(cct
) << __func__
<< ": result size may not be zero" << dendl
;
4968 on_finish
->complete(-EINVAL
);
4972 if (start
.is_max()) {
4973 on_finish
->complete(0);
4977 shared_lock
rl(rwlock
);
4978 assert(osdmap
->get_epoch());
4979 if (!osdmap
->test_flag(CEPH_OSDMAP_SORTBITWISE
)) {
4981 lderr(cct
) << __func__
<< ": SORTBITWISE cluster flag not set" << dendl
;
4982 on_finish
->complete(-EOPNOTSUPP
);
4985 const pg_pool_t
*p
= osdmap
->get_pg_pool(pool_id
);
4987 lderr(cct
) << __func__
<< ": pool " << pool_id
<< " DNE in osd epoch "
4988 << osdmap
->get_epoch() << dendl
;
4990 on_finish
->complete(-ENOENT
);
4996 ldout(cct
, 20) << __func__
<< ": start=" << start
<< " end=" << end
<< dendl
;
4998 // Stash completion state
4999 C_EnumerateReply
*on_ack
= new C_EnumerateReply(
5000 this, next
, result
, end
, pool_id
, on_finish
);
5003 op
.pg_nls(max
, filter_bl
, start
, 0);
5005 // Issue. See you later in _enumerate_reply
5006 object_locator_t
oloc(pool_id
, ns
);
5007 pg_read(start
.get_hash(), oloc
, op
,
5008 &on_ack
->bl
, 0, on_ack
, &on_ack
->epoch
, &on_ack
->budget
);
5011 void Objecter::_enumerate_reply(
5014 const hobject_t
&end
,
5015 const int64_t pool_id
,
5017 epoch_t reply_epoch
,
5018 std::list
<librados::ListObjectImpl
> *result
,
5023 put_op_budget_bytes(budget
);
5027 ldout(cct
, 4) << __func__
<< ": remote error " << r
<< dendl
;
5028 on_finish
->complete(r
);
5032 assert(next
!= NULL
);
5034 // Decode the results
5035 bufferlist::iterator iter
= bl
.begin();
5036 pg_nls_response_t response
;
5038 // XXX extra_info doesn't seem used anywhere?
5039 bufferlist extra_info
;
5040 ::decode(response
, iter
);
5042 ::decode(extra_info
, iter
);
5045 ldout(cct
, 10) << __func__
<< ": got " << response
.entries
.size()
5046 << " handle " << response
.handle
5047 << " reply_epoch " << reply_epoch
<< dendl
;
5048 ldout(cct
, 20) << __func__
<< ": response.entries.size "
5049 << response
.entries
.size() << ", response.entries "
5050 << response
.entries
<< dendl
;
5051 if (response
.handle
<= end
) {
5052 *next
= response
.handle
;
5054 ldout(cct
, 10) << __func__
<< ": adjusted next down to end " << end
5058 // drop anything after 'end'
5059 shared_lock
rl(rwlock
);
5060 const pg_pool_t
*pool
= osdmap
->get_pg_pool(pool_id
);
5062 // pool is gone, drop any results which are now meaningless.
5064 on_finish
->complete(-ENOENT
);
5067 while (!response
.entries
.empty()) {
5068 uint32_t hash
= response
.entries
.back().locator
.empty() ?
5069 pool
->hash_key(response
.entries
.back().oid
,
5070 response
.entries
.back().nspace
) :
5071 pool
->hash_key(response
.entries
.back().locator
,
5072 response
.entries
.back().nspace
);
5073 hobject_t
last(response
.entries
.back().oid
,
5074 response
.entries
.back().locator
,
5078 response
.entries
.back().nspace
);
5081 ldout(cct
, 20) << __func__
<< " dropping item " << last
5082 << " >= end " << end
<< dendl
;
5083 response
.entries
.pop_back();
5087 if (!response
.entries
.empty()) {
5088 result
->merge(response
.entries
);
5091 // release the listing context's budget once all
5092 // OPs (in the session) are finished
5094 put_nlist_context_budget(list_context
);
5096 on_finish
->complete(r
);
5101 using namespace librados
;
5103 template <typename T
>
5104 void do_decode(std::vector
<T
>& items
, std::vector
<bufferlist
>& bls
)
5106 for (auto bl
: bls
) {
5107 auto p
= bl
.begin();
5114 struct C_ObjectOperation_scrub_ls
: public Context
{
5117 std::vector
<inconsistent_obj_t
> *objects
= nullptr;
5118 std::vector
<inconsistent_snapset_t
> *snapsets
= nullptr;
5121 C_ObjectOperation_scrub_ls(uint32_t *interval
,
5122 std::vector
<inconsistent_obj_t
> *objects
,
5124 : interval(interval
), objects(objects
), rval(rval
) {}
5125 C_ObjectOperation_scrub_ls(uint32_t *interval
,
5126 std::vector
<inconsistent_snapset_t
> *snapsets
,
5128 : interval(interval
), snapsets(snapsets
), rval(rval
) {}
5129 void finish(int r
) override
{
5130 if (r
< 0 && r
!= -EAGAIN
) {
5141 } catch (buffer::error
&) {
5148 scrub_ls_result_t result
;
5149 auto p
= bl
.begin();
5151 *interval
= result
.interval
;
5153 do_decode(*objects
, result
.vals
);
5155 do_decode(*snapsets
, result
.vals
);
5160 template <typename T
>
5161 void do_scrub_ls(::ObjectOperation
*op
,
5162 const scrub_ls_arg_t
& arg
,
5163 std::vector
<T
> *items
,
5167 OSDOp
& osd_op
= op
->add_op(CEPH_OSD_OP_SCRUBLS
);
5168 op
->flags
|= CEPH_OSD_FLAG_PGOP
;
5170 arg
.encode(osd_op
.indata
);
5171 unsigned p
= op
->ops
.size() - 1;
5172 auto *h
= new C_ObjectOperation_scrub_ls
{interval
, items
, rval
};
5173 op
->out_handler
[p
] = h
;
5174 op
->out_bl
[p
] = &h
->bl
;
5175 op
->out_rval
[p
] = rval
;
5179 void ::ObjectOperation::scrub_ls(const librados::object_id_t
& start_after
,
5180 uint64_t max_to_get
,
5181 std::vector
<librados::inconsistent_obj_t
> *objects
,
5185 scrub_ls_arg_t arg
= {*interval
, 0, start_after
, max_to_get
};
5186 do_scrub_ls(this, arg
, objects
, interval
, rval
);
5189 void ::ObjectOperation::scrub_ls(const librados::object_id_t
& start_after
,
5190 uint64_t max_to_get
,
5191 std::vector
<librados::inconsistent_snapset_t
> *snapsets
,
5195 scrub_ls_arg_t arg
= {*interval
, 1, start_after
, max_to_get
};
5196 do_scrub_ls(this, arg
, snapsets
, interval
, rval
);