1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
19 #include "common/config.h"
20 #include "events/EOpen.h"
21 #include "events/EUpdate.h"
23 #include "MDBalancer.h"
28 #include "messages/MInodeFileCaps.h"
29 #include "messages/MMDSPeerRequest.h"
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
34 #define dout_subsys ceph_subsys_mds
36 #define dout_context g_ceph_context
37 #define dout_prefix _prefix(_dout, mds)
// Debug-stream prefix helper used by the dout_prefix macro above:
// tags every debug line from this file with "mds.<rank>.locker ".
41 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
42 return *_dout
<< "mds." << mds
->get_nodeid() << ".locker ";
// Completion context owned by the Locker; resolves get_mds() through
// the owning Locker instance.
// NOTE(review): some member lines (e.g. the `locker` field declaration
// and the get_mds() body) are not visible in this extraction -- confirm
// against the full file.
46 class LockerContext
: public MDSContext
{
49 MDSRank
*get_mds() override
// Ctor asserts a non-null Locker so callbacks may dereference it
// unconditionally later.
55 explicit LockerContext(Locker
*locker_
) : locker(locker_
) {
56 ceph_assert(locker
!= NULL
);
// Journal-log completion context owned by the Locker (MDSLogContextBase
// variant of LockerContext above).
// NOTE(review): member declarations are not visible in this extraction --
// confirm against the full file.
60 class LockerLogContext
: public MDSLogContextBase
{
63 MDSRank
*get_mds() override
// Ctor asserts a non-null Locker so callbacks may dereference it
// unconditionally later.
69 explicit LockerLogContext(Locker
*locker_
) : locker(locker_
) {
70 ceph_assert(locker
!= NULL
);
// Constructor: anchors the need_snapflush_inodes intrusive list at
// CInode::item_caps and stores back-pointers to the owning MDSRank and
// MDCache.
74 Locker::Locker(MDSRank
*m
, MDCache
*c
) :
75 need_snapflush_inodes(member_offset(CInode
, item_caps
)), mds(m
), mdcache(c
) {}
// Message entry point: fan incoming messages out to the matching
// handler based on message type; unknown types are fatal.
78 void Locker::dispatch(const cref_t
<Message
> &m
)
81 switch (m
->get_type()) {
// inter-MDS lock protocol message
84 handle_lock(ref_cast
<MLock
>(m
));
// inter-MDS inode file caps message
87 case MSG_MDS_INODEFILECAPS
:
88 handle_inode_file_caps(ref_cast
<MInodeFileCaps
>(m
));
// client cap update
91 case CEPH_MSG_CLIENT_CAPS
:
92 handle_client_caps(ref_cast
<MClientCaps
>(m
));
// client cap release
94 case CEPH_MSG_CLIENT_CAPRELEASE
:
95 handle_client_cap_release(ref_cast
<MClientCapRelease
>(m
));
// client lease
97 case CEPH_MSG_CLIENT_LEASE
:
98 handle_client_lease(ref_cast
<MClientLease
>(m
));
// unknown message type: log and abort (programming error)
101 derr
<< "locker unknown message " << m
->get_type() << dendl
;
102 ceph_abort_msg("locker unknown message");
// Broadcast a lock protocol message `msg` to every replica of the
// lock's parent object.
119 void Locker::send_lock_message(SimpleLock
*lock
, int msg
)
121 for (const auto &it
: lock
->get_parent()->get_replicas()) {
// while degraded, skip ranks that have not reached REJOIN
122 if (mds
->is_cluster_degraded() &&
123 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
125 auto m
= make_message
<MLock
>(lock
, msg
, mds
->get_nodeid());
126 mds
->send_message_mds(m
, it
.first
);
// Overload that also carries a data payload to each replica.
// NOTE(review): the statement attaching `data` to the message is not
// visible in this extraction -- confirm against the full file.
130 void Locker::send_lock_message(SimpleLock
*lock
, int msg
, const bufferlist
&data
)
132 for (const auto &it
: lock
->get_parent()->get_replicas()) {
// while degraded, skip ranks that have not reached REJOIN
133 if (mds
->is_cluster_degraded() &&
134 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
136 auto m
= make_message
<MLock
>(lock
, msg
, mds
->get_nodeid());
138 mds
->send_message_mds(m
, it
.first
);
142 bool Locker::try_rdlock_snap_layout(CInode
*in
, MDRequestRef
& mdr
,
143 int n
, bool want_layout
)
145 dout(10) << __func__
<< " " << *mdr
<< " " << *in
<< dendl
;
146 // rdlock ancestor snaps
149 bool found_locked
= false;
150 bool found_layout
= false;
155 client_t client
= mdr
->get_client();
160 if (!found_locked
&& mdr
->is_rdlocked(&t
->snaplock
))
164 if (!t
->snaplock
.can_rdlock(client
)) {
165 t
->snaplock
.add_waiter(SimpleLock::WAIT_RD
, new C_MDS_RetryRequest(mdcache
, mdr
));
168 t
->snaplock
.get_rdlock();
169 mdr
->locks
.emplace(&t
->snaplock
, MutationImpl::LockOp::RDLOCK
);
170 dout(20) << " got rdlock on " << t
->snaplock
<< " " << *t
<< dendl
;
172 if (want_layout
&& !found_layout
) {
173 if (!mdr
->is_rdlocked(&t
->policylock
)) {
174 if (!t
->policylock
.can_rdlock(client
)) {
175 t
->policylock
.add_waiter(SimpleLock::WAIT_RD
, new C_MDS_RetryRequest(mdcache
, mdr
));
178 t
->policylock
.get_rdlock();
179 mdr
->locks
.emplace(&t
->policylock
, MutationImpl::LockOp::RDLOCK
);
180 dout(20) << " got rdlock on " << t
->policylock
<< " " << *t
<< dendl
;
182 if (t
->get_projected_inode()->has_layout()) {
183 mdr
->dir_layout
= t
->get_projected_inode()->layout
;
187 CDentry
* pdn
= t
->get_projected_parent_dn();
192 t
= pdn
->get_dir()->get_inode();
195 mdr
->dir_root
[n
] = root
;
196 mdr
->dir_depth
[n
] = depth
;
200 dout(10) << __func__
<< " failed" << dendl
;
202 drop_locks(mdr
.get(), nullptr);
203 mdr
->drop_local_auth_pins();
// Scope guard: on destruction, records `message` as an event on the
// associated MDRequest.  Callers overwrite `message` as progress is
// made so the last value set is what gets recorded.
207 struct MarkEventOnDestruct
{
209 std::string_view message
;
211 MarkEventOnDestruct(MDRequestRef
& _mdr
, std::string_view _message
) :
215 ~MarkEventOnDestruct() {
217 mdr
->mark_event(message
);
221 /* If this function returns false, the mdr has been placed
222 * on the appropriate wait list */
223 bool Locker::acquire_locks(MDRequestRef
& mdr
,
224 MutationImpl::LockOpVec
& lov
,
225 CInode
*auth_pin_freeze
,
226 bool auth_pin_nonblocking
)
228 dout(10) << "acquire_locks " << *mdr
<< dendl
;
230 MarkEventOnDestruct
marker(mdr
, "failed to acquire_locks");
232 client_t client
= mdr
->get_client();
234 set
<MDSCacheObject
*> mustpin
; // items to authpin
236 mustpin
.insert(auth_pin_freeze
);
239 for (size_t i
= 0; i
< lov
.size(); ++i
) {
241 SimpleLock
*lock
= p
.lock
;
242 MDSCacheObject
*object
= lock
->get_parent();
245 if ((lock
->get_type() == CEPH_LOCK_ISNAP
||
246 lock
->get_type() == CEPH_LOCK_IPOLICY
) &&
247 mds
->is_cluster_degraded() &&
249 !mdr
->is_queued_for_replay()) {
250 // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
251 // get processed in proper order.
253 if (object
->is_auth()) {
254 if (!mdr
->is_xlocked(lock
)) {
256 object
->list_replicas(ls
);
258 if (mds
->mdsmap
->get_state(m
) < MDSMap::STATE_ACTIVE
) {
265 // if the lock is the latest locked one, it's possible that peer mds got the lock
266 // while there are recovering mds.
267 if (!mdr
->is_xlocked(lock
) || mdr
->is_last_locked(lock
))
271 dout(10) << " must xlock " << *lock
<< " " << *object
272 << ", waiting for cluster recovered" << dendl
;
273 mds
->locker
->drop_locks(mdr
.get(), NULL
);
274 mdr
->drop_local_auth_pins();
275 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
280 dout(20) << " must xlock " << *lock
<< " " << *object
<< dendl
;
282 mustpin
.insert(object
);
284 // augment xlock with a versionlock?
285 if (lock
->get_type() == CEPH_LOCK_DN
) {
286 CDentry
*dn
= static_cast<CDentry
*>(object
);
289 if (mdr
->is_leader()) {
290 // leader. wrlock versionlock so we can pipeline dentry updates to journal.
291 lov
.add_wrlock(&dn
->versionlock
, i
+ 1);
293 // peer. exclusively lock the dentry version (i.e. block other journal updates).
294 // this makes rollback safe.
295 lov
.add_xlock(&dn
->versionlock
, i
+ 1);
298 if (lock
->get_type() >= CEPH_LOCK_IFIRST
&& lock
->get_type() != CEPH_LOCK_IVERSION
) {
299 // inode version lock?
300 CInode
*in
= static_cast<CInode
*>(object
);
303 if (mdr
->is_leader()) {
304 // leader. wrlock versionlock so we can pipeline inode updates to journal.
305 lov
.add_wrlock(&in
->versionlock
, i
+ 1);
307 // peer. exclusively lock the inode version (i.e. block other journal updates).
308 // this makes rollback safe.
309 lov
.add_xlock(&in
->versionlock
, i
+ 1);
312 } else if (p
.is_wrlock()) {
313 dout(20) << " must wrlock " << *lock
<< " " << *object
<< dendl
;
314 client_t _client
= p
.is_state_pin() ? lock
->get_excl_client() : client
;
315 if (object
->is_auth()) {
316 mustpin
.insert(object
);
317 } else if (!object
->is_auth() &&
318 !lock
->can_wrlock(_client
) && // we might have to request a scatter
319 !mdr
->is_peer()) { // if we are peer (remote_wrlock), the leader already authpinned
320 dout(15) << " will also auth_pin " << *object
321 << " in case we need to request a scatter" << dendl
;
322 mustpin
.insert(object
);
324 } else if (p
.is_remote_wrlock()) {
325 dout(20) << " must remote_wrlock on mds." << p
.wrlock_target
<< " "
326 << *lock
<< " " << *object
<< dendl
;
327 mustpin
.insert(object
);
328 } else if (p
.is_rdlock()) {
330 dout(20) << " must rdlock " << *lock
<< " " << *object
<< dendl
;
331 if (object
->is_auth()) {
332 mustpin
.insert(object
);
333 } else if (!object
->is_auth() &&
334 !lock
->can_rdlock(client
)) { // we might have to request an rdlock
335 dout(15) << " will also auth_pin " << *object
336 << " in case we need to request a rdlock" << dendl
;
337 mustpin
.insert(object
);
340 ceph_assert(0 == "locker unknown lock operation");
344 lov
.sort_and_merge();
347 map
<mds_rank_t
, set
<MDSCacheObject
*> > mustpin_remote
; // mds -> (object set)
349 // can i auth pin them all now?
350 marker
.message
= "failed to authpin local pins";
351 for (const auto &p
: mustpin
) {
352 MDSCacheObject
*object
= p
;
354 dout(10) << " must authpin " << *object
<< dendl
;
356 if (mdr
->is_auth_pinned(object
)) {
357 if (object
!= (MDSCacheObject
*)auth_pin_freeze
)
359 if (mdr
->more()->is_remote_frozen_authpin
) {
360 if (mdr
->more()->rename_inode
== auth_pin_freeze
)
362 // unfreeze auth pin for the wrong inode
363 mustpin_remote
[mdr
->more()->rename_inode
->authority().first
].size();
367 if (!object
->is_auth()) {
368 if (mdr
->lock_cache
) { // debug
369 ceph_assert(mdr
->lock_cache
->opcode
== CEPH_MDS_OP_UNLINK
);
370 CDentry
*dn
= mdr
->dn
[0].back();
371 ceph_assert(dn
->get_projected_linkage()->is_remote());
374 if (object
->is_ambiguous_auth()) {
376 dout(10) << " ambiguous auth, waiting to authpin " << *object
<< dendl
;
377 mdr
->disable_lock_cache();
378 drop_locks(mdr
.get());
379 mdr
->drop_local_auth_pins();
380 marker
.message
= "waiting for single auth, object is being migrated";
381 object
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_MDS_RetryRequest(mdcache
, mdr
));
384 mustpin_remote
[object
->authority().first
].insert(object
);
388 if (!object
->can_auth_pin(&err
)) {
389 if (mdr
->lock_cache
) {
391 if (CInode
*in
= dynamic_cast<CInode
*>(object
)) {
392 ceph_assert(!in
->is_frozen_inode() && !in
->is_frozen_auth_pin());
393 dir
= in
->get_projected_parent_dir();
394 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(object
)) {
397 ceph_assert(0 == "unknown type of lock parent");
399 if (dir
->get_inode() == mdr
->lock_cache
->get_dir_inode()) {
400 // forcibly auth pin if there is lock cache on parent dir
405 ceph_assert(mdr
->lock_cache
->opcode
== CEPH_MDS_OP_UNLINK
);
406 CDentry
*dn
= mdr
->dn
[0].back();
407 ceph_assert(dn
->get_projected_linkage()->is_remote());
412 mdr
->disable_lock_cache();
413 drop_locks(mdr
.get());
414 mdr
->drop_local_auth_pins();
415 if (auth_pin_nonblocking
) {
416 dout(10) << " can't auth_pin (freezing?) " << *object
<< ", nonblocking" << dendl
;
420 if (err
== MDSCacheObject::ERR_EXPORTING_TREE
) {
421 marker
.message
= "failed to authpin, subtree is being exported";
422 } else if (err
== MDSCacheObject::ERR_FRAGMENTING_DIR
) {
423 marker
.message
= "failed to authpin, dir is being fragmented";
424 } else if (err
== MDSCacheObject::ERR_EXPORTING_INODE
) {
425 marker
.message
= "failed to authpin, inode is being exported";
427 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object
<< dendl
;
428 object
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_MDS_RetryRequest(mdcache
, mdr
));
430 if (mdr
->is_any_remote_auth_pin())
431 notify_freeze_waiter(object
);
437 // ok, grab local auth pins
438 for (const auto& p
: mustpin
) {
439 MDSCacheObject
*object
= p
;
440 if (mdr
->is_auth_pinned(object
)) {
441 dout(10) << " already auth_pinned " << *object
<< dendl
;
442 } else if (object
->is_auth()) {
443 dout(10) << " auth_pinning " << *object
<< dendl
;
444 mdr
->auth_pin(object
);
448 // request remote auth_pins
449 if (!mustpin_remote
.empty()) {
450 marker
.message
= "requesting remote authpins";
451 for (const auto& p
: mdr
->object_states
) {
452 if (p
.second
.remote_auth_pinned
== MDS_RANK_NONE
)
454 ceph_assert(p
.second
.remote_auth_pinned
== p
.first
->authority().first
);
455 auto q
= mustpin_remote
.find(p
.second
.remote_auth_pinned
);
456 if (q
!= mustpin_remote
.end())
457 q
->second
.insert(p
.first
);
460 for (auto& p
: mustpin_remote
) {
461 dout(10) << "requesting remote auth_pins from mds." << p
.first
<< dendl
;
463 // wait for active auth
464 if (mds
->is_cluster_degraded() &&
465 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(p
.first
)) {
466 dout(10) << " mds." << p
.first
<< " is not active" << dendl
;
467 if (mdr
->more()->waiting_on_peer
.empty())
468 mds
->wait_for_active_peer(p
.first
, new C_MDS_RetryRequest(mdcache
, mdr
));
472 auto req
= make_message
<MMDSPeerRequest
>(mdr
->reqid
, mdr
->attempt
,
473 MMDSPeerRequest::OP_AUTHPIN
);
474 for (auto& o
: p
.second
) {
475 dout(10) << " req remote auth_pin of " << *o
<< dendl
;
476 MDSCacheObjectInfo info
;
477 o
->set_object_info(info
);
478 req
->get_authpins().push_back(info
);
479 if (o
== auth_pin_freeze
)
480 o
->set_object_info(req
->get_authpin_freeze());
483 if (auth_pin_nonblocking
)
484 req
->mark_nonblocking();
485 else if (!mdr
->locks
.empty())
486 req
->mark_notify_blocking();
488 mds
->send_message_mds(req
, p
.first
);
490 // put in waiting list
491 auto ret
= mdr
->more()->waiting_on_peer
.insert(p
.first
);
492 ceph_assert(ret
.second
);
497 // caps i'll need to issue
498 set
<CInode
*> issue_set
;
502 // make sure they match currently acquired locks.
503 for (const auto& p
: lov
) {
506 if (mdr
->is_xlocked(lock
)) {
507 dout(10) << " already xlocked " << *lock
<< " " << *lock
->get_parent() << dendl
;
510 if (mdr
->locking
&& lock
!= mdr
->locking
)
511 cancel_locking(mdr
.get(), &issue_set
);
512 if (!xlock_start(lock
, mdr
)) {
513 marker
.message
= "failed to xlock, waiting";
516 dout(10) << " got xlock on " << *lock
<< " " << *lock
->get_parent() << dendl
;
517 } else if (p
.is_wrlock() || p
.is_remote_wrlock()) {
518 auto it
= mdr
->locks
.find(lock
);
519 if (p
.is_remote_wrlock()) {
520 if (it
!= mdr
->locks
.end() && it
->is_remote_wrlock()) {
521 dout(10) << " already remote_wrlocked " << *lock
<< " " << *lock
->get_parent() << dendl
;
523 if (mdr
->locking
&& lock
!= mdr
->locking
)
524 cancel_locking(mdr
.get(), &issue_set
);
525 marker
.message
= "waiting for remote wrlocks";
526 remote_wrlock_start(lock
, p
.wrlock_target
, mdr
);
531 if (it
!= mdr
->locks
.end() && it
->is_wrlock()) {
532 dout(10) << " already wrlocked " << *lock
<< " " << *lock
->get_parent() << dendl
;
535 client_t _client
= p
.is_state_pin() ? lock
->get_excl_client() : client
;
536 if (p
.is_remote_wrlock()) {
537 // nowait if we have already gotten remote wrlock
538 if (!wrlock_try(lock
, mdr
, _client
)) {
539 marker
.message
= "failed to wrlock, dropping remote wrlock and waiting";
540 // can't take the wrlock because the scatter lock is gathering. need to
541 // release the remote wrlock, so that the gathering process can finish.
542 ceph_assert(it
!= mdr
->locks
.end());
543 remote_wrlock_finish(it
, mdr
.get());
544 remote_wrlock_start(lock
, p
.wrlock_target
, mdr
);
548 if (!wrlock_start(p
, mdr
)) {
549 ceph_assert(!p
.is_remote_wrlock());
550 marker
.message
= "failed to wrlock, waiting";
554 dout(10) << " got wrlock on " << *lock
<< " " << *lock
->get_parent() << dendl
;
557 if (mdr
->is_rdlocked(lock
)) {
558 dout(10) << " already rdlocked " << *lock
<< " " << *lock
->get_parent() << dendl
;
562 ceph_assert(mdr
->is_leader());
563 if (lock
->needs_recover()) {
564 if (mds
->is_cluster_degraded()) {
565 if (!mdr
->is_queued_for_replay()) {
566 // see comments in SimpleLock::set_state_rejoin() and
567 // ScatterLock::encode_state_for_rejoin()
568 drop_locks(mdr
.get());
569 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
570 dout(10) << " rejoin recovering " << *lock
<< " " << *lock
->get_parent()
571 << ", waiting for cluster recovered" << dendl
;
572 marker
.message
= "rejoin recovering lock, waiting for cluster recovered";
576 lock
->clear_need_recover();
580 if (!rdlock_start(lock
, mdr
)) {
581 marker
.message
= "failed to rdlock, waiting";
584 dout(10) << " got rdlock on " << *lock
<< " " << *lock
->get_parent() << dendl
;
588 mdr
->set_mds_stamp(ceph_clock_now());
590 marker
.message
= "acquired locks";
593 issue_caps_set(issue_set
);
// Tell the freeze machinery that something is waiting on `o`:
// resolve the relevant CDir (inode -> parent dir, dentry -> its dir,
// otherwise cast the object itself) and bump the fragment/export
// freeze waiter counters.
597 void Locker::notify_freeze_waiter(MDSCacheObject
*o
)
600 if (CInode
*in
= dynamic_cast<CInode
*>(o
)) {
602 dir
= in
->get_parent_dir();
603 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(o
)) {
606 dir
= dynamic_cast<CDir
*>(o
);
// freezing for fragmentation: count the waiter on this dirfrag
610 if (dir
->is_freezing_dir())
611 mdcache
->fragment_freeze_inc_num_waiters(dir
);
// freezing for export: walk up to the freezing-tree root first
612 if (dir
->is_freezing_tree()) {
613 while (!dir
->is_freezing_tree_root())
614 dir
= dir
->get_parent_dir();
615 mdcache
->migrator
->export_freeze_inc_num_waiters(dir
);
// Mark the xlocks held by `mut` as done (set_xlock_done) on their
// auth objects; `skip_dentry` lets callers leave dentry/dentry-version
// locks untouched.
620 void Locker::set_xlocks_done(MutationImpl
*mut
, bool skip_dentry
)
622 for (const auto &p
: mut
->locks
) {
625 MDSCacheObject
*obj
= p
.lock
->get_parent();
// only valid on the auth side
626 ceph_assert(obj
->is_auth());
// condition fragment: dentry / dentry-version lock types
628 (p
.lock
->get_type() == CEPH_LOCK_DN
|| p
.lock
->get_type() == CEPH_LOCK_DVERSION
))
630 dout(10) << "set_xlocks_done on " << *p
.lock
<< " " << *obj
<< dendl
;
631 p
.lock
->set_xlock_done();
635 void Locker::_drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
,
638 set
<mds_rank_t
> peers
;
640 for (auto it
= mut
->locks
.begin(); it
!= mut
->locks
.end(); ) {
641 SimpleLock
*lock
= it
->lock
;
642 MDSCacheObject
*obj
= lock
->get_parent();
644 if (it
->is_xlock()) {
645 if (obj
->is_auth()) {
647 xlock_finish(it
++, mut
, &ni
);
649 pneed_issue
->insert(static_cast<CInode
*>(obj
));
651 ceph_assert(lock
->get_sm()->can_remote_xlock
);
652 peers
.insert(obj
->authority().first
);
654 mut
->locks
.erase(it
++);
656 } else if (it
->is_wrlock() || it
->is_remote_wrlock()) {
657 if (it
->is_remote_wrlock()) {
658 peers
.insert(it
->wrlock_target
);
659 it
->clear_remote_wrlock();
661 if (it
->is_wrlock()) {
663 wrlock_finish(it
++, mut
, &ni
);
665 pneed_issue
->insert(static_cast<CInode
*>(obj
));
667 mut
->locks
.erase(it
++);
669 } else if (drop_rdlocks
&& it
->is_rdlock()) {
671 rdlock_finish(it
++, mut
, &ni
);
673 pneed_issue
->insert(static_cast<CInode
*>(obj
));
680 if (mut
->lock_cache
) {
681 put_lock_cache(mut
->lock_cache
);
682 mut
->lock_cache
= nullptr;
686 for (set
<mds_rank_t
>::iterator p
= peers
.begin(); p
!= peers
.end(); ++p
) {
687 if (!mds
->is_cluster_degraded() ||
688 mds
->mdsmap
->get_state(*p
) >= MDSMap::STATE_REJOIN
) {
689 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p
<< dendl
;
690 auto peerreq
= make_message
<MMDSPeerRequest
>(mut
->reqid
, mut
->attempt
,
691 MMDSPeerRequest::OP_DROPLOCKS
);
692 mds
->send_message_mds(peerreq
, *p
);
// Abort the in-flight lock acquisition recorded in mut->locking.  On
// the auth side, unwind the partially-acquired xlock state machine and
// collect inodes that may need cap re-issue into *pneed_issue.
697 void Locker::cancel_locking(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
699 SimpleLock
*lock
= mut
->locking
;
701 dout(10) << "cancel_locking " << *lock
<< " on " << *mut
<< dendl
;
703 if (lock
->get_parent()->is_auth()) {
704 bool need_issue
= false;
// roll back intermediate xlock states
705 if (lock
->get_state() == LOCK_PREXLOCK
) {
706 _finish_xlock(lock
, -1, &need_issue
);
707 } else if (lock
->get_state() == LOCK_LOCK_XLOCK
) {
708 lock
->set_state(LOCK_XLOCKDONE
);
709 eval_gather(lock
, true, &need_issue
);
712 pneed_issue
->insert(static_cast<CInode
*>(lock
->get_parent()));
// clear mut->locking bookkeeping
714 mut
->finish_locking(lock
);
// Drop every lock held by `mut` (rdlocks included), cancelling any
// acquisition in flight first.  If the caller did not supply a
// need-issue set, caps are re-issued here for the affected inodes.
717 void Locker::drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
720 set
<CInode
*> my_need_issue
;
722 pneed_issue
= &my_need_issue
;
725 cancel_locking(mut
, pneed_issue
);
726 _drop_locks(mut
, pneed_issue
, true);
// only issue caps locally when we own the need-issue set
728 if (pneed_issue
== &my_need_issue
)
729 issue_caps_set(*pneed_issue
);
730 mut
->locking_state
= 0;
// Drop the wr/xlocks held by `mut` but keep its rdlocks (passes
// drop_rdlocks=false to _drop_locks).  Caps are re-issued locally when
// the caller supplied no need-issue set.
733 void Locker::drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
735 set
<CInode
*> my_need_issue
;
737 pneed_issue
= &my_need_issue
;
739 _drop_locks(mut
, pneed_issue
, false);
// only issue caps locally when we own the need-issue set
741 if (pneed_issue
== &my_need_issue
)
742 issue_caps_set(*pneed_issue
);
// Release rdlocks before an early reply, but keep snap/policy rdlocks
// so later mksnap/setlayout on other MDSs wait for this still-unsafe
// request (see the in-code comment below).
745 void Locker::drop_rdlocks_for_early_reply(MutationImpl
*mut
)
747 set
<CInode
*> need_issue
;
749 for (auto it
= mut
->locks
.begin(); it
!= mut
->locks
.end(); ) {
// non-rdlocks are left alone
750 if (!it
->is_rdlock()) {
754 SimpleLock
*lock
= it
->lock
;
755 // make later mksnap/setlayout (at other mds) wait for this unsafe request
756 if (lock
->get_type() == CEPH_LOCK_ISNAP
||
757 lock
->get_type() == CEPH_LOCK_IPOLICY
) {
762 rdlock_finish(it
++, mut
, &ni
);
764 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
767 issue_caps_set(need_issue
);
// Drop wrlocks held by `mut` so a pending dirfrag operation can
// unfreeze, then re-issue caps for inodes that need it.
// NOTE(review): the body of the CEPH_LOCK_IDFT branch is not visible
// in this extraction; it appears to special-case dirfragtree locks --
// confirm whether they are skipped or dropped against the full file.
770 void Locker::drop_locks_for_fragment_unfreeze(MutationImpl
*mut
)
772 set
<CInode
*> need_issue
;
774 for (auto it
= mut
->locks
.begin(); it
!= mut
->locks
.end(); ) {
775 SimpleLock
*lock
= it
->lock
;
776 if (lock
->get_type() == CEPH_LOCK_IDFT
) {
781 wrlock_finish(it
++, mut
, &ni
);
783 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
785 issue_caps_set(need_issue
);
// Deferred teardown for an MDLockCache: when run, drops the cache's
// locks and calls its cleanup().  Queued by put_lock_cache() when the
// last reference goes away.
788 class C_MDL_DropCache
: public LockerContext
{
789 MDLockCache
*lock_cache
;
791 C_MDL_DropCache(Locker
*l
, MDLockCache
*lc
) :
792 LockerContext(l
), lock_cache(lc
) { }
793 void finish(int r
) override
{
794 locker
->drop_locks(lock_cache
);
795 lock_cache
->cleanup();
// Drop one reference on a lock cache.  On the last reference the cache
// must already be invalidating: detach its locks, re-enable
// frozen-inode tracking on the dirfrags belonging to the cache's dir
// inode, and queue the final drop/cleanup via C_MDL_DropCache.
800 void Locker::put_lock_cache(MDLockCache
* lock_cache
)
802 ceph_assert(lock_cache
->ref
> 0);
// still referenced elsewhere: nothing more to do
803 if (--lock_cache
->ref
> 0)
806 ceph_assert(lock_cache
->invalidating
);
808 lock_cache
->detach_locks();
810 CInode
*diri
= lock_cache
->get_dir_inode();
// guard selects dirfrags of the cache's own dir inode
811 for (auto dir
: lock_cache
->auth_pinned_dirfrags
) {
812 if (dir
->get_inode() != diri
)
814 dir
->enable_frozen_inode();
// finish asynchronously through the MDS waiter queue
817 mds
->queue_waiter(new C_MDL_DropCache(this, lock_cache
));
// Map a lock-cache opcode to its directory cap bit:
// CREATE -> CEPH_CAP_DIR_CREATE, UNLINK -> CEPH_CAP_DIR_UNLINK.
// Any other opcode is a programming error.
// NOTE(review): the switch opener is not visible in this extraction.
820 int Locker::get_cap_bit_for_lock_cache(int op
)
823 case CEPH_MDS_OP_CREATE
:
824 return CEPH_CAP_DIR_CREATE
;
825 case CEPH_MDS_OP_UNLINK
:
826 return CEPH_CAP_DIR_UNLINK
;
828 ceph_assert(0 == "unsupported operation");
// Begin (or continue) invalidating a lock cache.  The first call flips
// `invalidating` and detaches dirfrags; clearing the cap's
// lock-cache-allowed bit (and re-issuing caps if the client still
// holds the bit) prompts release, after which eval_lock_caches()
// completes the teardown.
// NOTE(review): interior lines (orig. 849-853, likely the no-cap else
// path) are not visible in this extraction -- confirm control flow
// against the full file.
833 void Locker::invalidate_lock_cache(MDLockCache
*lock_cache
)
835 ceph_assert(lock_cache
->item_cap_lock_cache
.is_on_list());
// already invalidating: client cap must be gone by now
836 if (lock_cache
->invalidating
) {
837 ceph_assert(!lock_cache
->client_cap
);
839 lock_cache
->invalidating
= true;
840 lock_cache
->detach_dirfrags();
843 Capability
*cap
= lock_cache
->client_cap
;
845 int cap_bit
= get_cap_bit_for_lock_cache(lock_cache
->opcode
);
846 cap
->clear_lock_cache_allowed(cap_bit
);
// client still holds the bit: re-issue caps to revoke it
847 if (cap
->issued() & cap_bit
)
848 issue_caps(lock_cache
->get_dir_inode(), cap
);
// unlink from the cap's lock-cache list and drop our reference
854 lock_cache
->item_cap_lock_cache
.remove_myself();
855 put_lock_cache(lock_cache
);
// Walk the lock caches attached to `cap`: for caches that are
// invalidating and whose cap bit the client no longer holds, unlink
// them from the cap and drop the reference.
859 void Locker::eval_lock_caches(Capability
*cap
)
861 for (auto p
= cap
->lock_caches
.begin(); !p
.end(); ) {
862 MDLockCache
*lock_cache
= *p
;
// only invalidating caches are eligible for teardown here
864 if (!lock_cache
->invalidating
)
866 int cap_bit
= get_cap_bit_for_lock_cache(lock_cache
->opcode
);
// cap bit returned by the client: finish detaching this cache
867 if (!(cap
->issued() & cap_bit
)) {
868 lock_cache
->item_cap_lock_cache
.remove_myself();
869 put_lock_cache(lock_cache
);
// Invalidate every lock cache holding auth pins on `dir`.
// invalidate_lock_cache() removes entries from the list, so draining
// from the front terminates.
874 // ask lock caches to release auth pins
875 void Locker::invalidate_lock_caches(CDir
*dir
)
877 dout(10) << "invalidate_lock_caches on " << *dir
<< dendl
;
878 auto &lock_caches
= dir
->lock_caches_with_auth_pins
;
879 while (!lock_caches
.empty()) {
880 invalidate_lock_cache(lock_caches
.front()->parent
);
// Invalidate every lock cache that currently holds `lock`.
884 // ask lock caches to release locks
885 void Locker::invalidate_lock_caches(SimpleLock
*lock
)
887 dout(10) << "invalidate_lock_caches " << *lock
<< " on " << *lock
->get_parent() << dendl
;
888 if (lock
->is_cached()) {
889 auto&& lock_caches
= lock
->get_active_caches();
890 for (auto& lc
: lock_caches
)
891 invalidate_lock_cache(lc
);
895 void Locker::create_lock_cache(MDRequestRef
& mdr
, CInode
*diri
, file_layout_t
*dir_layout
)
900 client_t client
= mdr
->get_client();
901 int opcode
= mdr
->client_request
->get_op();
902 dout(10) << "create_lock_cache for client." << client
<< "/" << ceph_mds_op_name(opcode
)<< " on " << *diri
<< dendl
;
904 if (!diri
->is_auth()) {
905 dout(10) << " dir inode is not auth, noop" << dendl
;
909 if (mdr
->has_more() && !mdr
->more()->peers
.empty()) {
910 dout(10) << " there are peers requests for " << *mdr
<< ", noop" << dendl
;
914 Capability
*cap
= diri
->get_client_cap(client
);
916 dout(10) << " there is no cap for client." << client
<< ", noop" << dendl
;
920 for (auto p
= cap
->lock_caches
.begin(); !p
.end(); ++p
) {
921 if ((*p
)->opcode
== opcode
) {
922 dout(10) << " lock cache already exists for " << ceph_mds_op_name(opcode
) << ", noop" << dendl
;
927 set
<MDSCacheObject
*> ancestors
;
928 for (CInode
*in
= diri
; ; ) {
929 CDentry
*pdn
= in
->get_projected_parent_dn();
932 // ancestors.insert(pdn);
933 in
= pdn
->get_dir()->get_inode();
934 ancestors
.insert(in
);
937 for (auto& p
: mdr
->object_states
) {
938 if (p
.first
!= diri
&& !ancestors
.count(p
.first
))
940 auto& stat
= p
.second
;
941 if (stat
.auth_pinned
) {
942 if (!p
.first
->can_auth_pin()) {
943 dout(10) << " can't auth_pin(freezing?) lock parent " << *p
.first
<< ", noop" << dendl
;
946 if (CInode
*in
= dynamic_cast<CInode
*>(p
.first
); in
->is_parent_projected()) {
947 CDir
*dir
= in
->get_projected_parent_dir();
948 if (!dir
->can_auth_pin()) {
949 dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir
<< ", noop" << dendl
;
956 std::vector
<CDir
*> dfv
;
957 dfv
.reserve(diri
->get_num_dirfrags());
959 diri
->get_dirfrags(dfv
);
960 for (auto dir
: dfv
) {
961 if (!dir
->is_auth() || !dir
->can_auth_pin()) {
962 dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir
<< ", noop" << dendl
;
965 if (dir
->is_any_freezing_or_frozen_inode()) {
966 dout(10) << " there is freezing/frozen inode in " << *dir
<< ", noop" << dendl
;
971 for (auto& p
: mdr
->locks
) {
972 MDSCacheObject
*obj
= p
.lock
->get_parent();
973 if (obj
!= diri
&& !ancestors
.count(obj
))
975 if (!p
.lock
->is_stable()) {
976 dout(10) << " unstable " << *p
.lock
<< " on " << *obj
<< ", noop" << dendl
;
981 auto lock_cache
= new MDLockCache(cap
, opcode
);
983 lock_cache
->set_dir_layout(*dir_layout
);
984 cap
->set_lock_cache_allowed(get_cap_bit_for_lock_cache(opcode
));
986 for (auto dir
: dfv
) {
987 // prevent subtree migration
988 lock_cache
->auth_pin(dir
);
989 // prevent frozen inode
990 dir
->disable_frozen_inode();
993 for (auto& p
: mdr
->object_states
) {
994 if (p
.first
!= diri
&& !ancestors
.count(p
.first
))
996 auto& stat
= p
.second
;
997 if (stat
.auth_pinned
)
998 lock_cache
->auth_pin(p
.first
);
1000 lock_cache
->pin(p
.first
);
1002 if (CInode
*in
= dynamic_cast<CInode
*>(p
.first
)) {
1003 CDentry
*pdn
= in
->get_projected_parent_dn();
1005 dfv
.push_back(pdn
->get_dir());
1006 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(p
.first
)) {
1007 dfv
.push_back(dn
->get_dir());
1009 ceph_assert(0 == "unknown type of lock parent");
1012 lock_cache
->attach_dirfrags(std::move(dfv
));
1014 for (auto it
= mdr
->locks
.begin(); it
!= mdr
->locks
.end(); ) {
1015 MDSCacheObject
*obj
= it
->lock
->get_parent();
1016 if (obj
!= diri
&& !ancestors
.count(obj
)) {
1020 unsigned lock_flag
= 0;
1021 if (it
->is_wrlock()) {
1022 // skip wrlocks that were added by MDCache::predirty_journal_parent()
1024 lock_flag
= MutationImpl::LockOp::WRLOCK
;
1026 ceph_assert(it
->is_rdlock());
1027 lock_flag
= MutationImpl::LockOp::RDLOCK
;
1030 lock_cache
->emplace_lock(it
->lock
, lock_flag
);
1031 mdr
->locks
.erase(it
++);
1036 lock_cache
->attach_locks();
1039 mdr
->lock_cache
= lock_cache
;
// Attach an existing lock cache to the request, if one matching the
// request's opcode exists on the client's cap for `diri`.  Returns
// early when the request already carries a lock cache.
1042 bool Locker::find_and_attach_lock_cache(MDRequestRef
& mdr
, CInode
*diri
)
1044 if (mdr
->lock_cache
)
1047 Capability
*cap
= diri
->get_client_cap(mdr
->get_client());
1051 int opcode
= mdr
->client_request
->get_op();
// scan this cap's lock caches for one created for the same opcode
1052 for (auto p
= cap
->lock_caches
.begin(); !p
.end(); ++p
) {
1053 MDLockCache
*lock_cache
= *p
;
1054 if (lock_cache
->opcode
== opcode
) {
1055 dout(10) << "found lock cache for " << ceph_mds_op_name(opcode
) << " on " << *diri
<< dendl
;
1056 mdr
->lock_cache
= lock_cache
;
// take a reference on behalf of the request
1057 mdr
->lock_cache
->ref
++;
1066 void Locker::eval_gather(SimpleLock
*lock
, bool first
, bool *pneed_issue
, MDSContext::vec
*pfinishers
)
1068 dout(10) << "eval_gather " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1069 ceph_assert(!lock
->is_stable());
1071 int next
= lock
->get_next_state();
1074 bool caps
= lock
->get_cap_shift();
1075 if (lock
->get_type() != CEPH_LOCK_DN
)
1076 in
= static_cast<CInode
*>(lock
->get_parent());
1078 bool need_issue
= false;
1080 int loner_issued
= 0, other_issued
= 0, xlocker_issued
= 0;
1081 ceph_assert(!caps
|| in
!= NULL
);
1082 if (caps
&& in
->is_head()) {
1083 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
1084 lock
->get_cap_shift(), lock
->get_cap_mask());
1085 dout(10) << " next state is " << lock
->get_state_name(next
)
1086 << " issued/allows loner " << gcap_string(loner_issued
)
1087 << "/" << gcap_string(lock
->gcaps_allowed(CAP_LONER
, next
))
1088 << " xlocker " << gcap_string(xlocker_issued
)
1089 << "/" << gcap_string(lock
->gcaps_allowed(CAP_XLOCKER
, next
))
1090 << " other " << gcap_string(other_issued
)
1091 << "/" << gcap_string(lock
->gcaps_allowed(CAP_ANY
, next
))
1094 if (first
&& ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) ||
1095 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) ||
1096 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
)))
1100 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
1101 bool auth
= lock
->get_parent()->is_auth();
1102 if (!lock
->is_gathering() &&
1103 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_rdlock
, auth
) || !lock
->is_rdlocked()) &&
1104 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_wrlock
, auth
) || !lock
->is_wrlocked()) &&
1105 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_xlock
, auth
) || !lock
->is_xlocked()) &&
1106 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_lease
, auth
) || !lock
->is_leased()) &&
1107 !(lock
->get_parent()->is_auth() && lock
->is_flushing()) && // i.e. wait for scatter_writebehind!
1108 (!caps
|| ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) == 0 &&
1109 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) == 0 &&
1110 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
) == 0)) &&
1111 lock
->get_state() != LOCK_SYNC_MIX2
&& // these states need an explicit trigger from the auth mds
1112 lock
->get_state() != LOCK_MIX_SYNC2
1114 dout(7) << "eval_gather finished gather on " << *lock
1115 << " on " << *lock
->get_parent() << dendl
;
1117 if (lock
->get_sm() == &sm_filelock
) {
1119 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1120 dout(7) << "eval_gather finished gather, but still recovering" << dendl
;
1122 } else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
1123 dout(7) << "eval_gather finished gather, but need to recover" << dendl
;
1124 mds
->mdcache
->queue_file_recover(in
);
1125 mds
->mdcache
->do_file_recover();
1130 if (!lock
->get_parent()->is_auth()) {
1131 // replica: tell auth
1132 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1134 if (lock
->get_parent()->is_rejoining() &&
1135 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
1136 dout(7) << "eval_gather finished gather, but still rejoining "
1137 << *lock
->get_parent() << dendl
;
1141 if (!mds
->is_cluster_degraded() ||
1142 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
1143 switch (lock
->get_state()) {
1144 case LOCK_SYNC_LOCK
:
1145 mds
->send_message_mds(make_message
<MLock
>(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid()), auth
);
1150 auto reply
= make_message
<MLock
>(lock
, LOCK_AC_SYNCACK
, mds
->get_nodeid());
1151 lock
->encode_locked_state(reply
->get_data());
1152 mds
->send_message_mds(reply
, auth
);
1153 next
= LOCK_MIX_SYNC2
;
1154 (static_cast<ScatterLock
*>(lock
))->start_flush();
1158 case LOCK_MIX_SYNC2
:
1159 (static_cast<ScatterLock
*>(lock
))->finish_flush();
1160 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
1162 case LOCK_SYNC_MIX2
:
1163 // do nothing, we already acked
1168 auto reply
= make_message
<MLock
>(lock
, LOCK_AC_MIXACK
, mds
->get_nodeid());
1169 mds
->send_message_mds(reply
, auth
);
1170 next
= LOCK_SYNC_MIX2
;
1177 lock
->encode_locked_state(data
);
1178 mds
->send_message_mds(make_message
<MLock
>(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid(), data
), auth
);
1179 (static_cast<ScatterLock
*>(lock
))->start_flush();
1180 // we'll get an AC_LOCKFLUSHED to complete
1191 // once the first (local) stage of mix->lock gather complete we can
1192 // gather from replicas
1193 if (lock
->get_state() == LOCK_MIX_LOCK
&&
1194 lock
->get_parent()->is_replicated()) {
1195 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl
;
1196 send_lock_message(lock
, LOCK_AC_LOCK
);
1197 lock
->init_gather();
1198 lock
->set_state(LOCK_MIX_LOCK2
);
1202 if (lock
->is_dirty() && !lock
->is_flushed()) {
1203 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
1206 lock
->clear_flushed();
1208 switch (lock
->get_state()) {
1214 in
->start_scatter(static_cast<ScatterLock
*>(lock
));
1215 if (lock
->get_parent()->is_replicated()) {
1216 bufferlist softdata
;
1217 lock
->encode_locked_state(softdata
);
1218 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
1220 (static_cast<ScatterLock
*>(lock
))->clear_scatter_wanted();
1224 case LOCK_XLOCKDONE
:
1225 if (next
!= LOCK_SYNC
)
1230 case LOCK_EXCL_SYNC
:
1231 case LOCK_LOCK_SYNC
:
1233 case LOCK_XSYN_SYNC
:
1234 if (lock
->get_parent()->is_replicated()) {
1235 bufferlist softdata
;
1236 lock
->encode_locked_state(softdata
);
1237 send_lock_message(lock
, LOCK_AC_SYNC
, softdata
);
1244 lock
->set_state(next
);
1246 if (lock
->get_parent()->is_auth() &&
1248 lock
->get_parent()->auth_unpin(lock
);
1250 // drop loner before doing waiters
1254 in
->get_wanted_loner() != in
->get_loner()) {
1255 dout(10) << " trying to drop loner" << dendl
;
1256 if (in
->try_drop_loner()) {
1257 dout(10) << " dropped loner" << dendl
;
1263 lock
->take_waiting(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
,
1266 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
);
1268 if (caps
&& in
->is_head())
1271 if (lock
->get_parent()->is_auth() &&
1273 try_eval(lock
, &need_issue
);
1278 *pneed_issue
= true;
1279 else if (in
->is_head())
// Evaluate (and possibly transition) the inode locks selected by 'mask',
// after first (re)choosing the loner client on the auth/head inode.
// Returns true if caps need to be (or were) issued to clients.
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  MDSContext::vec finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;  // loner changed: re-evaluate every lock
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;
    }
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
	dout(10) << "eval end set loner to client." << in->get_wanted_loner() << dendl;
	bool ok = in->try_set_loner();
	ceph_assert(ok);
	mask = -1;
	goto retry;  // new loner: run all lock evaluations again
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
// Waiter context that re-runs try_eval() on a cache object once it becomes
// eligible (e.g. after unfreeze or single-auth resolution).  Pins the object
// for the lifetime of the waiter so the pointer stays valid.
class C_Locker_Eval : public LockerContext {
  MDSCacheObject *p;
  int mask;
public:
  C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
    // We are used as an MDSCacheObject waiter, so should
    // only be invoked by someone already holding the big lock.
    ceph_assert(ceph_mutex_is_locked_by_me(locker->mds->mds_lock));
    p->get(MDSCacheObject::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->try_eval(p, mask);
    p->put(MDSCacheObject::PIN_PTRWAITER);
  }
};
// Try to evaluate the locks (selected by 'mask') on a cache object, deferring
// with a C_Locker_Eval waiter if the object is ambiguous-auth or frozen.
void Locker::try_eval(MDSCacheObject *p, int mask)
{
  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
    return;
  }

  if (p->is_auth() && p->is_frozen()) {
    dout(7) << "try_eval frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
    return;
  }

  if (mask & CEPH_LOCK_DN) {
    // dentry: only its single lock to evaluate
    ceph_assert(mask == CEPH_LOCK_DN);
    bool need_issue = false;  // ignore this, no caps on dentries
    CDentry *dn = static_cast<CDentry *>(p);
    eval_any(&dn->lock, &need_issue);
  } else {
    CInode *in = static_cast<CInode *>(p);
    eval(in, mask);
  }
}
// Try to evaluate a single lock, deferring via waiters when the parent object
// is ambiguous-auth, non-auth, or frozen, and honoring pending scatter /
// unscatter requests on scatterlocks before the generic eval.
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the leader.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
	slock->get_state() != LOCK_MIX) {
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
	return;
    } else if (slock->get_unscatter_wanted() &&
	       slock->get_state() != LOCK_LOCK) {
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
	return;
      }
    }
  }

  // defer remaining lock types while the subtree is freezing
  if (lock->get_type() != CEPH_LOCK_DN &&
      lock->get_type() != CEPH_LOCK_ISNAP &&
      lock->get_type() != CEPH_LOCK_IPOLICY &&
      p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
// Kick all unstable cap-related locks on an inode (file/auth/link/xattr).
// If caps need issuing, either record the inode in issue_set (when given) so
// the caller can batch, or issue immediately.
void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
{
  bool need_issue = false;
  MDSContext::vec finishers;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->authlock.is_stable())
    eval_gather(&in->authlock, false, &need_issue, &finishers);
  if (!in->linklock.is_stable())
    eval_gather(&in->linklock, false, &need_issue, &finishers);
  if (!in->xattrlock.is_stable())
    eval_gather(&in->xattrlock, false, &need_issue, &finishers);

  if (need_issue && in->is_head()) {
    if (issue_set)
      issue_set->insert(in);
    else
      issue_caps(in);
  }

  finish_contexts(g_ceph_context, finishers);
}
// Kick the unstable scatter-type locks on an inode (file/nest/dirfragtree)
// and issue caps if any transition completed.
void Locker::eval_scatter_gathers(CInode *in)
{
  bool need_issue = false;
  MDSContext::vec finishers;

  dout(10) << "eval_scatter_gathers " << *in << dendl;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->nestlock.is_stable())
    eval_gather(&in->nestlock, false, &need_issue, &finishers);
  if (!in->dirfragtreelock.is_stable())
    eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  finish_contexts(g_ceph_context, finishers);
}
// Dispatch lock evaluation to the type-appropriate routine: filelock and the
// nest/dft scatterlocks have their own state machines; everything else uses
// the simple lock evaluator.
void Locker::eval(SimpleLock *lock, bool *need_issue)
{
  switch (lock->get_type()) {
  case CEPH_LOCK_IFILE:
    return file_eval(static_cast<ScatterLock*>(lock), need_issue);
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
  default:
    return simple_eval(lock, need_issue);
  }
}
1509 // ------------------
// Nudge a lock toward a readable state.  On the auth, start a transition
// toward SYNC (or XSYN for a loner on a file lock); on a replica, ask the
// auth for a rdlock-able state.  Returns true if a local transition was
// initiated (caller should re-check rdlock-ability).
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
	// not until tempsync is fully implemented
	//if (lock->get_parent()->is_replicated())
	//scatter_tempsync((ScatterLock*)lock);
	//else
	simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	if (lock->get_state() == LOCK_EXCL &&
	    in->get_target_loner() >= 0 &&
	    !in->is_dir() && !as_anon)   // as_anon => caller wants SYNC, not XSYN
	  file_xsyn(lock);
	else
	  simple_sync(lock);
      } else
	simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting rdlock from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      // recovery blocks the lock; bump this inode in the recovery queue
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
// Non-blocking rdlock check: returns true if 'client' can rdlock now,
// kicking the lock once toward a readable state before giving up.
// No reference is taken here; the caller grabs the rdlock itself.
bool Locker::rdlock_try(SimpleLock *lock, client_t client)
{
  dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;

  // can read?  grab ref.
  if (lock->can_rdlock(client))
    return true;

  _rdlock_kick(lock, false);

  if (lock->can_rdlock(client))
    return true;

  return false;
}
// Acquire a rdlock for a request, kicking the lock state as needed.  Returns
// true with the lock recorded on the mutation, or false after queueing a
// retry waiter.  as_anon reads anonymously (ignores the client's own xlock).
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start  on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  //  UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = nullptr;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
      return true;
    }

    // hmm, wait a second.
    if (in && !in->is_head() && in->is_auth() &&
	lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      ceph_assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
	hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
	dout(10) << "rdlock_start trying head inode " << *head << dendl;
	if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
	  return false;
	// oh, check our lock again then
      }
    }

    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
// Flush the MDS log if the lock is stuck unstable-and-locked (e.g. in
// XLOCKDONE or waiting on a cap flush) so the pending journal event that
// will release it gets committed promptly.
void Locker::nudge_log(SimpleLock *lock)
{
  dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
  if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked())    // as with xlockdone, or cap flush
    mds->mdlog->flush();
}
// Drop one rdlock reference recorded on a mutation; when the last rdlock
// goes away, re-evaluate the lock (gathering if unstable, try_eval if auth).
void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_rdlock());
  SimpleLock *lock = it->lock;
  // drop ref
  lock->put_rdlock();
  if (mut)
    mut->locks.erase(it);

  dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;

  // last one?
  if (!lock->is_rdlocked()) {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
// All-or-nothing rdlock acquisition for a request: take every rdlock in the
// vector, or queue a retry waiter, drop everything already held, and return
// false.
bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MDRequestRef& mdr)
{
  dout(10) << __func__  << dendl;
  for (const auto& p : lov) {
    auto lock = p.lock;
    ceph_assert(p.is_rdlock());
    if (!mdr->is_rdlocked(lock) && !rdlock_try(lock, mdr->get_client())) {
      lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD,
		       new C_MDS_RetryRequest(mdcache, mdr));
      goto failed;
    }
    lock->get_rdlock();
    mdr->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
    dout(20) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl;
  }

  return true;

failed:
  // back out: release everything we grabbed and any local auth pins
  dout(10) << __func__ << " failed" << dendl;
  drop_locks(mdr.get(), nullptr);
  mdr->drop_local_auth_pins();
  return false;
}
// Non-blocking variant for internal mutations: succeed only if every lock is
// immediately rdlock-able; no waiters are queued on failure.
bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
{
  dout(10) << __func__  << dendl;
  for (const auto& p : lov) {
    ceph_assert(p.is_rdlock());
    if (!p.lock->can_rdlock(mut->get_client()))
      return false;
    p.lock->get_rdlock();
    mut->emplace_lock(p.lock, MutationImpl::LockOp::RDLOCK);
  }
  return true;
}
1701 // ------------------
// Take a wrlock unconditionally (force=true bypasses the can_wrlock check);
// version locks are redirected to the local-lock fast path.
void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_grab(static_cast<LocalLockC*>(lock), mut);

  dout(7) << "wrlock_force  on " << *lock
	  << " on " << *lock->get_parent() << dendl;
  lock->get_wrlock(true);
  mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
}
// Non-blocking wrlock attempt.  May push a stable auth lock toward LOCK via
// simple_lock(), but deliberately avoids transitions that would require
// scatter_writebehind/start_scatter (dirty scatterdata, scatter wanted,
// subtree boundary).  Returns false without queueing waiters.
bool Locker::wrlock_try(SimpleLock *lock, const MutationRef& mut, client_t client)
{
  dout(10) << "wrlock_try " << *lock << " on " << *lock->get_parent() << dendl;
  if (client == -1)
    client = mut->get_client();

  while (1) {
    if (lock->can_wrlock(client)) {
      lock->get_wrlock();
      auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
      it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
      return true;
    }
    if (!lock->is_stable())
      break;
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (!in->is_auth())
      break;
    // caller may already has a log entry open. To avoid calling
    // scatter_writebehind or start_scatter. don't change nest lock
    // state if it has dirty scatterdata.
    if (lock->is_dirty())
      break;
    // To avoid calling scatter_writebehind or start_scatter. don't
    // change nest lock state to MIX.
    ScatterLock *slock = static_cast<ScatterLock*>(lock);
    if (slock->get_scatter_wanted() || in->has_subtree_or_exporting_dirfrag())
      break;
    simple_lock(lock);
  }
  return false;
}
// Acquire a wrlock for a request.  On the auth this may drive the lock to
// MIX (scatter) or LOCK; on a replica it asks the auth to scatter.  Returns
// true with the lock recorded, or false after queueing a retry waiter.
bool Locker::wrlock_start(const MutationImpl::LockOp &op, MDRequestRef& mut)
{
  SimpleLock *lock = op.lock;
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLockC*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = op.is_state_pin() ? lock->get_excl_client() : mut->get_client();
  bool want_scatter = lock->get_parent()->is_auth() &&
		      (in->has_subtree_or_exporting_dirfrag() ||
		       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
	(!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
      it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
      return true;
    }

    if (lock->get_type() == CEPH_LOCK_IFILE &&
	in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      if (want_scatter)
	scatter_mix(static_cast<ScatterLock*>(lock));
      else
	simple_lock(lock);
    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting scatter from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);

  return false;
}
// Release a wrlock recorded on a mutation.  If other wrlocks remain, only
// re-evaluate after a completed scatter flush; otherwise evaluate normally.
void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_wrlock());
  SimpleLock* lock = it->lock;

  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_finish(it, mut);

  dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;

  lock->put_wrlock();

  // keep the op entry if it still records a remote wrlock
  if (it->is_remote_wrlock())
    it->clear_wrlock();
  else
    mut->locks.erase(it);

  if (lock->is_wrlocked()) {
    // Evaluate unstable lock after scatter_writebehind_finish(). Because
    // eval_gather() does not change lock's state when lock is flushing.
    if (!lock->is_stable() && lock->is_flushed() &&
	lock->get_parent()->is_auth())
      eval_gather(lock, false, pneed_issue);
  } else {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
// Ask another rank to take a wrlock on our behalf (peer request OP_WRLOCK).
// Defers via wait_for_active_peer if the target is not usable yet; otherwise
// records the pending lock and the peer we are waiting on.
void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
{
  dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;

  // wait for active target
  if (mds->is_cluster_degraded() &&
      !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
    dout(7) << " mds." << target << " is not active" << dendl;
    if (mut->more()->waiting_on_peer.empty())
      mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
    return;
  }

  // send lock request
  mut->start_locking(lock, target);
  mut->more()->peers.insert(target);
  auto r = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_WRLOCK);
  r->set_lock_type(lock->get_type());
  lock->get_parent()->set_object_info(r->get_object_info());
  mds->send_message_mds(r, target);

  ceph_assert(mut->more()->waiting_on_peer.count(target) == 0);
  mut->more()->waiting_on_peer.insert(target);
}
// Release a remotely-held wrlock: clear/erase the local record, then tell
// the remote rank (OP_UNWRLOCK) unless it has not reached rejoin yet (in
// which case it will clean up during recovery).
void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
  ceph_assert(it->is_remote_wrlock());
  SimpleLock *lock = it->lock;
  mds_rank_t target = it->wrlock_target;

  // keep the op entry if it also records a local wrlock
  if (it->is_wrlock())
    it->clear_remote_wrlock();
  else
    mut->locks.erase(it);

  dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
	  << " " << *lock->get_parent()  << dendl;
  if (!mds->is_cluster_degraded() ||
      mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
    auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_UNWRLOCK);
    peerreq->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(peerreq->get_object_info());
    mds->send_message_mds(peerreq, target);
  }
}
1891 // ------------------
// Acquire an xlock for a request.  On the auth, loop driving the lock
// through LOCK/XLOCKDONE toward XLOCK; on a replica, forward an OP_XLOCK
// peer request to the auth.  Returns true with the lock recorded, or false
// after queueing a waiter / sending the remote request.
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLockC*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  CInode *in = nullptr;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (mut->locking && // started xlock (not preempt other request)
	  lock->can_xlock(client) &&
	  !(lock->get_state() == LOCK_LOCK_XLOCK &&	// client is not xlocker or
	    in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap
	lock->set_state(LOCK_XLOCK);
	lock->get_xlock(mut, client);
	mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
	mut->finish_locking(lock);
	return true;
      }

      if (lock->get_type() == CEPH_LOCK_IFILE &&
	  in->state_test(CInode::STATE_RECOVERING)) {
	mds->mdcache->recovery_queue.prioritize(in);
      }

      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
				 lock->get_xlock_by_client() != client ||
				 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
	break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
	mut->start_locking(lock);
	simple_xlock(lock);
      } else {
	simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    ceph_assert(lock->get_sm()->can_remote_xlock);
    ceph_assert(!mut->peer_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
				     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_peer.empty())
	mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->peers.insert(auth);
    mut->start_locking(lock, auth);
    auto r = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    ceph_assert(mut->more()->waiting_on_peer.count(auth) == 0);
    mut->more()->waiting_on_peer.insert(auth);

    return false;
  }
}
// After the last xlock ref drops: if the lock can go straight to EXCL for a
// target loner (no other readers/writers/leases), do so cheaply; otherwise
// fall back to the full gather evaluation.
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  ceph_assert(!lock->is_stable());
  if (lock->get_type() != CEPH_LOCK_DN &&
      lock->get_type() != CEPH_LOCK_ISNAP &&
      lock->get_type() != CEPH_LOCK_IPOLICY &&
      lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      !lock->is_leased() &&
      lock->get_state() != LOCK_XLOCKSNAP) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
	*pneed_issue = true;
      if (lock->get_parent()->is_auth() &&
	  lock->is_stable())
	try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
// Release an xlock recorded on a mutation.  Remote xlocks notify the auth
// (OP_UNXLOCK); local ones run _finish_xlock when no xlocker remains.  Caps
// are issued (or flagged via pneed_issue) if the transition allows new bits.
void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_xlock());
  SimpleLock *lock = it->lock;

  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(it, mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  ceph_assert(mut);
  mut->locks.erase(it);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    ceph_assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent()  << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_UNXLOCK);
      peerreq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(peerreq->get_object_info());
      mds->send_message_mds(peerreq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
			 SimpleLock::WAIT_WR |
			 SimpleLock::WAIT_RD, 0);
  } else {
    if (lock->get_num_xlocks() == 0 &&
	lock->get_state() != LOCK_LOCK_XLOCK) { // no one is taking xlock
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
	*pneed_issue = true;
      else
	issue_caps(in);
    }
  }
}
// Hand an xlock off during subtree export: drop the local ref/record and
// park the lock in LOCK state for the importer to pick up.
void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
  ceph_assert(it->is_xlock());
  SimpleLock *lock = it->lock;
  dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;

  lock->put_xlock();
  mut->locks.erase(it);

  MDSCacheObject *p = lock->get_parent();
  ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)

  if (!lock->is_stable())
    lock->get_parent()->auth_unpin(lock);

  lock->set_state(LOCK_LOCK);
}
// Counterpart of xlock_export on the importing side: re-establish the
// auth_pin that an unstable xlocked lock holds on its parent.
void Locker::xlock_import(SimpleLock *lock)
{
  dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
  lock->get_parent()->auth_pin(lock);
}
2088 void Locker::xlock_downgrade(SimpleLock
*lock
, MutationImpl
*mut
)
2090 dout(10) << "xlock_downgrade on " << *lock
<< " " << *lock
->get_parent() << dendl
;
2091 auto it
= mut
->locks
.find(lock
);
2092 if (it
->is_rdlock())
2093 return; // already downgraded
2095 ceph_assert(lock
->get_parent()->is_auth());
2096 ceph_assert(it
!= mut
->locks
.end());
2097 ceph_assert(it
->is_xlock());
2099 lock
->set_xlock_done();
2101 xlock_finish(it
, mut
, nullptr);
2102 mut
->emplace_lock(lock
, MutationImpl::LockOp::RDLOCK
);
2106 // file i/o -----------------------------------------
// Return the inode's current file data version (used for O_LAZY-style
// data-version reporting to clients).
version_t Locker::issue_file_data_version(CInode *in)
{
  dout(7) << "issue_file_data_version on " << *in << dendl;
  return in->get_inode()->file_data_version;
}
// Journal-commit context for file/cap updates: once the log entry is safe,
// run file_update_finish() (which applies the mutation and acks the client).
// The inode is pinned so the pointer survives until the log flush completes.
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;
  MutationRef mut;
  unsigned flags;      // UPDATE_* bits, see enum below
  client_t client;
  ref_t<MClientCaps> ack;
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
			     const ref_t<MClientCaps> &ack, client_t c=-1)
    : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, flags, client, ack);
    in->put(CInode::PIN_PTRWAITER);
  }
};
// Flag bits passed through C_Locker_FileUpdate_finish to file_update_finish().
enum {
  UPDATE_SHAREMAX = 1,    // consider sharing a new max_size with writers
  UPDATE_NEEDSISSUE = 2,  // re-issue caps to the updating client if needed
  UPDATE_SNAPFLUSH = 4,   // this was a snap cap writeback
};
// Completion for a journaled file/cap update: apply the mutation, send the
// cap ack to the client's session, drop the held locks, then handle the
// per-flag follow-ups (re-issue, max_size sharing, snap-flush accounting).
void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
				client_t client, const ref_t<MClientCaps> &ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session && !session->is_closed()) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
	session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      dout(10) << " no session for client." << client << " " << *ack << dendl;
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (in->is_head()) {
    if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);
    }

    if ((flags & UPDATE_SHAREMAX) && in->is_auth() &&
	(in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);

  } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    in->client_snap_caps.erase(client);
    if (in->client_snap_caps.empty()) {
      // last snap flusher: drop the wrlocks held for the snapflush
      for (int i = 0; i < num_cinode_locks; i++) {
	SimpleLock *lock = in->get_lock(cinode_lock_info[i].lock);
	ceph_assert(lock);
	lock->put_wrlock();
      }
      in->item_open_file.remove_myself();
      in->item_caps.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  }
  issue_caps_set(need_issue);

  mds->balancer->hit_inode(in, META_POP_IWR);

  // auth unpin after issuing caps
  mut->cleanup();
}
// Register (or augment) a capability for the requesting client when an inode
// is opened/created with 'mode'.  Cap messages are suppressed here because
// the caps will be bundled with the request reply.
Capability* Locker::issue_new_caps(CInode *in,
				   int mode,
				   MDRequestRef& mdr,
				   SnapRealm *realm)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  Session *session = mdr->session;
  bool new_inode = (mdr->alloc_ino || mdr->used_prealloc_ino);

  // if replay or async, try to reconnect cap, and otherwise do nothing.
  if (new_inode && mdr->client_request->is_queued_for_replay())
    return mds->mdcache->try_reconnect_cap(in, session);

  // my needs
  ceph_assert(session->info.inst.name.is_client());
  client_t my_client = session->get_client();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm, new_inode);
    cap->set_wanted(my_want);
    cap->mark_new();
  } else {
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }
  cap->inc_suppress(); // suppress file cap messages (we'll bundle with the request reply)

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    int all_allowed = -1, loner_allowed = -1, xlocker_allowed = -1;
    int allowed = get_allowed_caps(in, cap, all_allowed, loner_allowed,
				   xlocker_allowed);

    // flush the log if the wanted-but-not-allowed caps are blocked on it
    if (_need_flush_mdlog(in, my_want & ~allowed, true))
      mds->mdlog->flush();
  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  cap->dec_suppress();

  return cap;
}
// Issue caps on every inode collected in 'inset' (batched by callers such as
// file_update_finish / eval_cap_gather).
void Locker::issue_caps_set(set<CInode*>& inset)
{
  for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
    issue_caps(*p);
}
// Deferred revocation of a stale client capability; pins the inode so the
// pointer remains valid until the context runs.
class C_Locker_RevokeStaleCap : public LockerContext {
  CInode *in;
  client_t client;
public:
  C_Locker_RevokeStaleCap(Locker *l, CInode *i, client_t c) :
    LockerContext(l), in(i), client(c) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->revoke_stale_cap(in, client);
    in->put(CInode::PIN_PTRWAITER);
  }
};
// Compute the cap bits 'cap''s client may currently be issued, based on the
// inode's lock states.  The three *_allowed in/out params act as a memo (-1
// means "not yet computed") so issue_caps() can reuse them across clients.
int Locker::get_allowed_caps(CInode *in, Capability *cap,
			     int &all_allowed, int &loner_allowed,
			     int &xlocker_allowed)
{
  client_t client = cap->get_client();

  // allowed caps are determined by the lock mode.
  if (all_allowed == -1)
    all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  if (loner_allowed == -1)
    loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  if (xlocker_allowed == -1)
    xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "get_allowed_caps loner client." << loner
	    << " allowed=" << ccap_string(loner_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << ", others allowed=" << ccap_string(all_allowed)
	    << " on " << *in << dendl;
  } else {
    dout(7) << "get_allowed_caps allowed=" << ccap_string(all_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << " on " << *in << dendl;
  }

  // do not issue _new_ bits when size|mtime is projected
  int allowed;
  if (loner == client)
    allowed = loner_allowed;
  else
    allowed = all_allowed;

  // add in any xlocker-only caps (for locks this client is the xlocker for)
  allowed |= xlocker_allowed & in->get_xlocker_mask(client);

  if (in->is_dir()) {
    allowed &= ~CEPH_CAP_ANY_DIR_OPS;
    if (allowed & CEPH_CAP_FILE_EXCL)
      allowed |= cap->get_lock_cache_allowed();
  }

  // clients that cannot handle inline data or a pool namespace must not do
  // direct file I/O on such inodes
  if ((in->get_inode()->inline_data.version != CEPH_INLINE_NONE &&
       cap->is_noinline()) ||
      (!in->get_inode()->layout.pool_ns.empty() &&
       cap->is_nopoolns()))
    allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

  return allowed;
}
2326 int Locker::issue_caps(CInode
*in
, Capability
*only_cap
)
2328 // count conflicts with
2330 int all_allowed
= -1, loner_allowed
= -1, xlocker_allowed
= -1;
2332 ceph_assert(in
->is_head());
2335 map
<client_t
, Capability
>::iterator it
;
2337 it
= in
->client_caps
.find(only_cap
->get_client());
2339 it
= in
->client_caps
.begin();
2340 for (; it
!= in
->client_caps
.end(); ++it
) {
2341 Capability
*cap
= &it
->second
;
2342 int allowed
= get_allowed_caps(in
, cap
, all_allowed
, loner_allowed
,
2344 int pending
= cap
->pending();
2345 int wanted
= cap
->wanted();
2347 dout(20) << " client." << it
->first
2348 << " pending " << ccap_string(pending
)
2349 << " allowed " << ccap_string(allowed
)
2350 << " wanted " << ccap_string(wanted
)
2353 if (!(pending
& ~allowed
)) {
2354 // skip if suppress or new, and not revocation
2355 if (cap
->is_new() || cap
->is_suppress() || cap
->is_stale()) {
2356 dout(20) << " !revoke and new|suppressed|stale, skipping client." << it
->first
<< dendl
;
2360 ceph_assert(!cap
->is_new());
2361 if (cap
->is_stale()) {
2362 dout(20) << " revoke stale cap from client." << it
->first
<< dendl
;
2363 ceph_assert(!cap
->is_valid());
2364 cap
->issue(allowed
& pending
, false);
2365 mds
->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in
, it
->first
));
2369 if (!cap
->is_valid() && (pending
& ~CEPH_CAP_PIN
)) {
2370 // After stale->resume circle, client thinks it only has CEPH_CAP_PIN.
2371 // mds needs to re-issue caps, then do revocation.
2372 long seq
= cap
->issue(pending
, true);
2374 dout(7) << " sending MClientCaps to client." << it
->first
2375 << " seq " << seq
<< " re-issue " << ccap_string(pending
) << dendl
;
2377 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_grant
);
2379 auto m
= make_message
<MClientCaps
>(CEPH_CAP_OP_GRANT
, in
->ino(),
2380 in
->find_snaprealm()->inode
->ino(),
2381 cap
->get_cap_id(), cap
->get_last_seq(),
2382 pending
, wanted
, 0, cap
->get_mseq(),
2383 mds
->get_osd_epoch_barrier());
2384 in
->encode_cap_message(m
, cap
);
2386 mds
->send_message_client_counted(m
, cap
->get_session());
2390 // notify clients about deleted inode, to make sure they release caps ASAP.
2391 if (in
->get_inode()->nlink
== 0)
2392 wanted
|= CEPH_CAP_LINK_SHARED
;
2394 // are there caps that the client _wants_ and can have, but aren't pending?
2395 // or do we need to revoke?
2396 if ((pending
& ~allowed
) || // need to revoke ~allowed caps.
2397 ((wanted
& allowed
) & ~pending
) || // missing wanted+allowed caps
2398 !cap
->is_valid()) { // after stale->resume circle
2402 // include caps that clients generally like, while we're at it.
2403 int likes
= in
->get_caps_liked();
2404 int before
= pending
;
2406 if (pending
& ~allowed
)
2407 seq
= cap
->issue((wanted
|likes
) & allowed
& pending
, true); // if revoking, don't issue anything new.
2409 seq
= cap
->issue((wanted
|likes
) & allowed
, true);
2410 int after
= cap
->pending();
2412 dout(7) << " sending MClientCaps to client." << it
->first
2413 << " seq " << seq
<< " new pending " << ccap_string(after
)
2414 << " was " << ccap_string(before
) << dendl
;
2416 int op
= (before
& ~after
) ? CEPH_CAP_OP_REVOKE
: CEPH_CAP_OP_GRANT
;
2417 if (op
== CEPH_CAP_OP_REVOKE
) {
2418 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_revoke
);
2419 revoking_caps
.push_back(&cap
->item_revoking_caps
);
2420 revoking_caps_by_client
[cap
->get_client()].push_back(&cap
->item_client_revoking_caps
);
2421 cap
->set_last_revoke_stamp(ceph_clock_now());
2422 cap
->reset_num_revoke_warnings();
2424 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_grant
);
2427 auto m
= make_message
<MClientCaps
>(op
, in
->ino(),
2428 in
->find_snaprealm()->inode
->ino(),
2429 cap
->get_cap_id(), cap
->get_last_seq(),
2430 after
, wanted
, 0, cap
->get_mseq(),
2431 mds
->get_osd_epoch_barrier());
2432 in
->encode_cap_message(m
, cap
);
2434 mds
->send_message_client_counted(m
, cap
->get_session());
2444 void Locker::issue_truncate(CInode
*in
)
2446 dout(7) << "issue_truncate on " << *in
<< dendl
;
2448 for (auto &p
: in
->client_caps
) {
2449 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_trunc
);
2450 Capability
*cap
= &p
.second
;
2451 auto m
= make_message
<MClientCaps
>(CEPH_CAP_OP_TRUNC
,
2453 in
->find_snaprealm()->inode
->ino(),
2454 cap
->get_cap_id(), cap
->get_last_seq(),
2455 cap
->pending(), cap
->wanted(), 0,
2457 mds
->get_osd_epoch_barrier());
2458 in
->encode_cap_message(m
, cap
);
2459 mds
->send_message_client_counted(m
, p
.first
);
2462 // should we increase max_size?
2463 if (in
->is_auth() && in
->is_file())
2464 check_inode_max_size(in
);
2468 void Locker::revoke_stale_cap(CInode
*in
, client_t client
)
2470 dout(7) << __func__
<< " client." << client
<< " on " << *in
<< dendl
;
2471 Capability
*cap
= in
->get_client_cap(client
);
2475 if (cap
->revoking() & CEPH_CAP_ANY_WR
) {
2476 CachedStackStringStream css
;
2477 mds
->evict_client(client
.v
, false, g_conf()->mds_session_blocklist_on_timeout
, *css
, nullptr);
2483 if (in
->is_auth() && in
->get_inode()->client_ranges
.count(cap
->get_client()))
2484 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2486 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
))
2489 if (!in
->filelock
.is_stable())
2490 eval_gather(&in
->filelock
);
2491 if (!in
->linklock
.is_stable())
2492 eval_gather(&in
->linklock
);
2493 if (!in
->authlock
.is_stable())
2494 eval_gather(&in
->authlock
);
2495 if (!in
->xattrlock
.is_stable())
2496 eval_gather(&in
->xattrlock
);
2499 try_eval(in
, CEPH_CAP_LOCKS
);
2501 request_inode_file_caps(in
);
2504 bool Locker::revoke_stale_caps(Session
*session
)
2506 dout(10) << "revoke_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2508 // invalidate all caps
2509 session
->inc_cap_gen();
2512 std::vector
<CInode
*> to_eval
;
2514 for (auto p
= session
->caps
.begin(); !p
.end(); ) {
2515 Capability
*cap
= *p
;
2517 if (!cap
->is_notable()) {
2518 // the rest ones are not being revoked and don't have writeable range
2519 // and don't want exclusive caps or want file read/write. They don't
2520 // need recover, they don't affect eval_gather()/try_eval()
2524 int revoking
= cap
->revoking();
2528 if (revoking
& CEPH_CAP_ANY_WR
) {
2533 int issued
= cap
->issued();
2534 CInode
*in
= cap
->get_inode();
2535 dout(10) << " revoking " << ccap_string(issued
) << " on " << *in
<< dendl
;
2536 int revoked
= cap
->revoke();
2537 if (revoked
& CEPH_CAP_ANY_DIR_OPS
)
2538 eval_lock_caches(cap
);
2540 if (in
->is_auth() &&
2541 in
->get_inode()->client_ranges
.count(cap
->get_client()))
2542 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2544 // eval lock/inode may finish contexts, which may modify other cap's position
2545 // in the session->caps.
2546 to_eval
.push_back(in
);
2549 for (auto in
: to_eval
) {
2550 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
))
2553 if (!in
->filelock
.is_stable())
2554 eval_gather(&in
->filelock
);
2555 if (!in
->linklock
.is_stable())
2556 eval_gather(&in
->linklock
);
2557 if (!in
->authlock
.is_stable())
2558 eval_gather(&in
->authlock
);
2559 if (!in
->xattrlock
.is_stable())
2560 eval_gather(&in
->xattrlock
);
2563 try_eval(in
, CEPH_CAP_LOCKS
);
2565 request_inode_file_caps(in
);
2571 void Locker::resume_stale_caps(Session
*session
)
2573 dout(10) << "resume_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2575 bool lazy
= session
->info
.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED
);
2576 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ) {
2577 Capability
*cap
= *p
;
2579 if (lazy
&& !cap
->is_notable())
2580 break; // see revoke_stale_caps()
2582 CInode
*in
= cap
->get_inode();
2583 ceph_assert(in
->is_head());
2584 dout(10) << " clearing stale flag on " << *in
<< dendl
;
2586 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2587 // if export succeeds, the cap will be removed. if export fails,
2588 // we need to re-issue the cap if it's not stale.
2589 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2593 if (!in
->is_auth() || !eval(in
, CEPH_CAP_LOCKS
))
2594 issue_caps(in
, cap
);
2598 void Locker::remove_stale_leases(Session
*session
)
2600 dout(10) << "remove_stale_leases for " << session
->info
.inst
.name
<< dendl
;
2601 xlist
<ClientLease
*>::iterator p
= session
->leases
.begin();
2603 ClientLease
*l
= *p
;
2605 CDentry
*parent
= static_cast<CDentry
*>(l
->parent
);
2606 dout(15) << " removing lease on " << *parent
<< dendl
;
2607 parent
->remove_client_lease(l
, this);
2612 class C_MDL_RequestInodeFileCaps
: public LockerContext
{
2615 C_MDL_RequestInodeFileCaps(Locker
*l
, CInode
*i
) : LockerContext(l
), in(i
) {
2616 in
->get(CInode::PIN_PTRWAITER
);
2618 void finish(int r
) override
{
2620 locker
->request_inode_file_caps(in
);
2621 in
->put(CInode::PIN_PTRWAITER
);
2625 void Locker::request_inode_file_caps(CInode
*in
)
2627 ceph_assert(!in
->is_auth());
2629 int wanted
= in
->get_caps_wanted() & in
->get_caps_allowed_ever() & ~CEPH_CAP_PIN
;
2630 if (wanted
!= in
->replica_caps_wanted
) {
2631 // wait for single auth
2632 if (in
->is_ambiguous_auth()) {
2633 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
2634 new C_MDL_RequestInodeFileCaps(this, in
));
2638 mds_rank_t auth
= in
->authority().first
;
2639 if (mds
->is_cluster_degraded() &&
2640 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
2641 mds
->wait_for_active_peer(auth
, new C_MDL_RequestInodeFileCaps(this, in
));
2645 dout(7) << "request_inode_file_caps " << ccap_string(wanted
)
2646 << " was " << ccap_string(in
->replica_caps_wanted
)
2647 << " on " << *in
<< " to mds." << auth
<< dendl
;
2649 in
->replica_caps_wanted
= wanted
;
2651 if (!mds
->is_cluster_degraded() ||
2652 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
2653 mds
->send_message_mds(make_message
<MInodeFileCaps
>(in
->ino(), in
->replica_caps_wanted
), auth
);
2657 void Locker::handle_inode_file_caps(const cref_t
<MInodeFileCaps
> &m
)
2659 // nobody should be talking to us during recovery.
2660 if (mds
->get_state() < MDSMap::STATE_CLIENTREPLAY
) {
2661 if (mds
->get_want_state() >= MDSMap::STATE_CLIENTREPLAY
) {
2662 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
2665 ceph_abort_msg("got unexpected message during recovery");
2669 CInode
*in
= mdcache
->get_inode(m
->get_ino());
2670 mds_rank_t from
= mds_rank_t(m
->get_source().num());
2673 ceph_assert(in
->is_auth());
2675 dout(7) << "handle_inode_file_caps replica mds." << from
<< " wants caps " << ccap_string(m
->get_caps()) << " on " << *in
<< dendl
;
2677 if (mds
->logger
) mds
->logger
->inc(l_mdss_handle_inode_file_caps
);
2679 in
->set_mds_caps_wanted(from
, m
->get_caps());
2681 try_eval(in
, CEPH_CAP_LOCKS
);
2685 class C_MDL_CheckMaxSize
: public LockerContext
{
2687 uint64_t new_max_size
;
2692 C_MDL_CheckMaxSize(Locker
*l
, CInode
*i
, uint64_t _new_max_size
,
2693 uint64_t _newsize
, utime_t _mtime
) :
2694 LockerContext(l
), in(i
),
2695 new_max_size(_new_max_size
), newsize(_newsize
), mtime(_mtime
)
2697 in
->get(CInode::PIN_PTRWAITER
);
2699 void finish(int r
) override
{
2701 locker
->check_inode_max_size(in
, false, new_max_size
, newsize
, mtime
);
2702 in
->put(CInode::PIN_PTRWAITER
);
2706 uint64_t Locker::calc_new_max_size(const CInode::inode_const_ptr
&pi
, uint64_t size
)
2708 uint64_t new_max
= (size
+ 1) << 1;
2709 uint64_t max_inc
= g_conf()->mds_client_writeable_range_max_inc_objs
;
2711 max_inc
*= pi
->layout
.object_size
;
2712 new_max
= std::min(new_max
, size
+ max_inc
);
2714 return round_up_to(new_max
, pi
->get_layout_size_increment());
2717 bool Locker::check_client_ranges(CInode
*in
, uint64_t size
)
2719 const auto& latest
= in
->get_projected_inode();
2721 if (latest
->has_layout()) {
2722 ms
= calc_new_max_size(latest
, size
);
2724 // Layout-less directories like ~mds0/, have zero size
2728 auto it
= latest
->client_ranges
.begin();
2729 for (auto &p
: in
->client_caps
) {
2730 if ((p
.second
.issued() | p
.second
.wanted()) & CEPH_CAP_ANY_FILE_WR
) {
2731 if (it
== latest
->client_ranges
.end())
2733 if (it
->first
!= p
.first
)
2735 if (ms
> it
->second
.range
.last
)
2740 return it
!= latest
->client_ranges
.end();
2743 bool Locker::calc_new_client_ranges(CInode
*in
, uint64_t size
, bool *max_increased
)
2745 const auto& latest
= in
->get_projected_inode();
2747 if (latest
->has_layout()) {
2748 ms
= calc_new_max_size(latest
, size
);
2750 // Layout-less directories like ~mds0/, have zero size
2754 auto pi
= in
->_get_projected_inode();
2755 bool updated
= false;
2757 // increase ranges as appropriate.
2758 // shrink to 0 if no WR|BUFFER caps issued.
2759 auto it
= pi
->client_ranges
.begin();
2760 for (auto &p
: in
->client_caps
) {
2761 if ((p
.second
.issued() | p
.second
.wanted()) & CEPH_CAP_ANY_FILE_WR
) {
2762 while (it
!= pi
->client_ranges
.end() && it
->first
< p
.first
) {
2763 it
= pi
->client_ranges
.erase(it
);
2767 if (it
!= pi
->client_ranges
.end() && it
->first
== p
.first
) {
2768 if (ms
> it
->second
.range
.last
) {
2769 it
->second
.range
.last
= ms
;
2772 *max_increased
= true;
2775 it
= pi
->client_ranges
.emplace_hint(it
, std::piecewise_construct
,
2776 std::forward_as_tuple(p
.first
),
2777 std::forward_as_tuple());
2778 it
->second
.range
.last
= ms
;
2779 it
->second
.follows
= in
->first
- 1;
2782 *max_increased
= true;
2784 p
.second
.mark_clientwriteable();
2787 p
.second
.clear_clientwriteable();
2790 while (it
!= pi
->client_ranges
.end()) {
2791 it
= pi
->client_ranges
.erase(it
);
2795 if (pi
->client_ranges
.empty())
2796 in
->clear_clientwriteable();
2798 in
->mark_clientwriteable();
2803 bool Locker::check_inode_max_size(CInode
*in
, bool force_wrlock
,
2804 uint64_t new_max_size
, uint64_t new_size
,
2807 ceph_assert(in
->is_auth());
2808 ceph_assert(in
->is_file());
2810 const auto& latest
= in
->get_projected_inode();
2811 uint64_t size
= latest
->size
;
2812 bool update_size
= new_size
> 0;
2815 new_size
= size
= std::max(size
, new_size
);
2816 new_mtime
= std::max(new_mtime
, latest
->mtime
);
2817 if (latest
->size
== new_size
&& latest
->mtime
== new_mtime
)
2818 update_size
= false;
2821 bool new_ranges
= check_client_ranges(in
, std::max(new_max_size
, size
));
2822 if (!update_size
&& !new_ranges
) {
2823 dout(20) << "check_inode_max_size no-op on " << *in
<< dendl
;
2827 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2828 << " update_size " << update_size
2829 << " on " << *in
<< dendl
;
2831 if (in
->is_frozen()) {
2832 dout(10) << "check_inode_max_size frozen, waiting on " << *in
<< dendl
;
2833 in
->add_waiter(CInode::WAIT_UNFREEZE
,
2834 new C_MDL_CheckMaxSize(this, in
, new_max_size
, new_size
, new_mtime
));
2836 } else if (!force_wrlock
&& !in
->filelock
.can_wrlock(in
->get_loner())) {
2838 if (in
->filelock
.is_stable()) {
2839 if (in
->get_target_loner() >= 0)
2840 file_excl(&in
->filelock
);
2842 simple_lock(&in
->filelock
);
2844 if (!in
->filelock
.can_wrlock(in
->get_loner())) {
2845 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in
<< dendl
;
2846 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
,
2847 new C_MDL_CheckMaxSize(this, in
, new_max_size
, new_size
, new_mtime
));
2852 MutationRef
mut(new MutationImpl());
2853 mut
->ls
= mds
->mdlog
->get_current_segment();
2855 auto pi
= in
->project_inode(mut
);
2856 pi
.inode
->version
= in
->pre_dirty();
2858 bool max_increased
= false;
2860 calc_new_client_ranges(in
, std::max(new_max_size
, size
), &max_increased
)) {
2861 dout(10) << "check_inode_max_size client_ranges "
2862 << in
->get_previous_projected_inode()->client_ranges
2863 << " -> " << pi
.inode
->client_ranges
<< dendl
;
2867 dout(10) << "check_inode_max_size size " << pi
.inode
->size
<< " -> " << new_size
<< dendl
;
2868 pi
.inode
->size
= new_size
;
2869 pi
.inode
->rstat
.rbytes
= new_size
;
2870 dout(10) << "check_inode_max_size mtime " << pi
.inode
->mtime
<< " -> " << new_mtime
<< dendl
;
2871 pi
.inode
->mtime
= new_mtime
;
2872 if (new_mtime
> pi
.inode
->ctime
) {
2873 pi
.inode
->ctime
= new_mtime
;
2874 if (new_mtime
> pi
.inode
->rstat
.rctime
)
2875 pi
.inode
->rstat
.rctime
= new_mtime
;
2879 // use EOpen if the file is still open; otherwise, use EUpdate.
2880 // this is just an optimization to push open files forward into
2881 // newer log segments.
2883 EMetaBlob
*metablob
;
2884 if (in
->is_any_caps_wanted() && in
->last
== CEPH_NOSNAP
) {
2885 EOpen
*eo
= new EOpen(mds
->mdlog
);
2886 eo
->add_ino(in
->ino());
2887 metablob
= &eo
->metablob
;
2890 EUpdate
*eu
= new EUpdate(mds
->mdlog
, "check_inode_max_size");
2891 metablob
= &eu
->metablob
;
2894 mds
->mdlog
->start_entry(le
);
2896 mdcache
->predirty_journal_parents(mut
, metablob
, in
, 0, PREDIRTY_PRIMARY
);
2898 CDentry
*parent
= in
->get_projected_parent_dn();
2899 metablob
->add_primary_dentry(parent
, in
, true);
2900 mdcache
->journal_dirty_inode(mut
.get(), metablob
, in
);
2902 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
,
2903 UPDATE_SHAREMAX
, ref_t
<MClientCaps
>()));
2904 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
2907 // make max_size _increase_ timely
2909 mds
->mdlog
->flush();
2915 void Locker::share_inode_max_size(CInode
*in
, Capability
*only_cap
)
2918 * only share if currently issued a WR cap. if client doesn't have it,
2919 * file_max doesn't matter, and the client will get it if/when they get
2922 dout(10) << "share_inode_max_size on " << *in
<< dendl
;
2923 map
<client_t
, Capability
>::iterator it
;
2925 it
= in
->client_caps
.find(only_cap
->get_client());
2927 it
= in
->client_caps
.begin();
2928 for (; it
!= in
->client_caps
.end(); ++it
) {
2929 const client_t client
= it
->first
;
2930 Capability
*cap
= &it
->second
;
2931 if (cap
->is_suppress())
2933 if (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2934 dout(10) << "share_inode_max_size with client." << client
<< dendl
;
2935 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_grant
);
2936 cap
->inc_last_seq();
2937 auto m
= make_message
<MClientCaps
>(CEPH_CAP_OP_GRANT
,
2939 in
->find_snaprealm()->inode
->ino(),
2941 cap
->get_last_seq(),
2945 mds
->get_osd_epoch_barrier());
2946 in
->encode_cap_message(m
, cap
);
2947 mds
->send_message_client_counted(m
, client
);
2954 bool Locker::_need_flush_mdlog(CInode
*in
, int wanted
, bool lock_state_any
)
2956 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2957 * pending mutations. */
2958 if (((wanted
& (CEPH_CAP_FILE_RD
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_SHARED
|CEPH_CAP_FILE_EXCL
)) &&
2959 (lock_state_any
? in
->filelock
.is_locked() : in
->filelock
.is_unstable_and_locked())) ||
2960 ((wanted
& (CEPH_CAP_AUTH_SHARED
|CEPH_CAP_AUTH_EXCL
)) &&
2961 (lock_state_any
? in
->authlock
.is_locked() : in
->authlock
.is_unstable_and_locked())) ||
2962 ((wanted
& (CEPH_CAP_LINK_SHARED
|CEPH_CAP_LINK_EXCL
)) &&
2963 (lock_state_any
? in
->linklock
.is_locked() : in
->linklock
.is_unstable_and_locked())) ||
2964 ((wanted
& (CEPH_CAP_XATTR_SHARED
|CEPH_CAP_XATTR_EXCL
)) &&
2965 (lock_state_any
? in
->xattrlock
.is_locked() : in
->xattrlock
.is_unstable_and_locked())))
2970 void Locker::adjust_cap_wanted(Capability
*cap
, int wanted
, int issue_seq
)
2972 if (ceph_seq_cmp(issue_seq
, cap
->get_last_issue()) == 0) {
2973 dout(10) << " wanted " << ccap_string(cap
->wanted())
2974 << " -> " << ccap_string(wanted
) << dendl
;
2975 cap
->set_wanted(wanted
);
2976 } else if (wanted
& ~cap
->wanted()) {
2977 dout(10) << " wanted " << ccap_string(cap
->wanted())
2978 << " -> " << ccap_string(wanted
)
2979 << " (added caps even though we had seq mismatch!)" << dendl
;
2980 cap
->set_wanted(wanted
| cap
->wanted());
2982 dout(10) << " NOT changing wanted " << ccap_string(cap
->wanted())
2983 << " -> " << ccap_string(wanted
)
2984 << " (issue_seq " << issue_seq
<< " != last_issue "
2985 << cap
->get_last_issue() << ")" << dendl
;
2989 CInode
*cur
= cap
->get_inode();
2990 if (!cur
->is_auth()) {
2991 request_inode_file_caps(cur
);
2995 if (cap
->wanted()) {
2996 if (cur
->state_test(CInode::STATE_RECOVERING
) &&
2997 (cap
->wanted() & (CEPH_CAP_FILE_RD
|
2998 CEPH_CAP_FILE_WR
))) {
2999 mds
->mdcache
->recovery_queue
.prioritize(cur
);
3002 if (mdcache
->open_file_table
.should_log_open(cur
)) {
3003 ceph_assert(cur
->last
== CEPH_NOSNAP
);
3004 EOpen
*le
= new EOpen(mds
->mdlog
);
3005 mds
->mdlog
->start_entry(le
);
3006 le
->add_clean_inode(cur
);
3007 mds
->mdlog
->submit_entry(le
);
3012 void Locker::snapflush_nudge(CInode
*in
)
3014 ceph_assert(in
->last
!= CEPH_NOSNAP
);
3015 if (in
->client_snap_caps
.empty())
3018 CInode
*head
= mdcache
->get_inode(in
->ino());
3019 // head inode gets unpinned when snapflush starts. It might get trimmed
3020 // before snapflush finishes.
3024 ceph_assert(head
->is_auth());
3025 if (head
->client_need_snapflush
.empty())
3028 SimpleLock
*hlock
= head
->get_lock(CEPH_LOCK_IFILE
);
3029 if (hlock
->get_state() == LOCK_SYNC
|| !hlock
->is_stable()) {
3031 for (int i
= 0; i
< num_cinode_locks
; i
++) {
3032 SimpleLock
*lock
= head
->get_lock(cinode_lock_info
[i
].lock
);
3033 if (lock
->get_state() != LOCK_SYNC
&& lock
->is_stable()) {
3040 _rdlock_kick(hlock
, true);
3042 // also, requeue, in case of unstable lock
3043 need_snapflush_inodes
.push_back(&in
->item_caps
);
3047 void Locker::mark_need_snapflush_inode(CInode
*in
)
3049 ceph_assert(in
->last
!= CEPH_NOSNAP
);
3050 if (!in
->item_caps
.is_on_list()) {
3051 need_snapflush_inodes
.push_back(&in
->item_caps
);
3052 utime_t now
= ceph_clock_now();
3053 in
->last_dirstat_prop
= now
;
3054 dout(10) << "mark_need_snapflush_inode " << *in
<< " - added at " << now
<< dendl
;
3058 bool Locker::is_revoking_any_caps_from(client_t client
)
3060 auto it
= revoking_caps_by_client
.find(client
);
3061 if (it
== revoking_caps_by_client
.end())
3063 return !it
->second
.empty();
3066 void Locker::_do_null_snapflush(CInode
*head_in
, client_t client
, snapid_t last
)
3068 dout(10) << "_do_null_snapflush client." << client
<< " on " << *head_in
<< dendl
;
3069 for (auto p
= head_in
->client_need_snapflush
.begin();
3070 p
!= head_in
->client_need_snapflush
.end() && p
->first
< last
; ) {
3071 snapid_t snapid
= p
->first
;
3072 auto &clients
= p
->second
;
3073 ++p
; // be careful, q loop below depends on this
3075 if (clients
.count(client
)) {
3076 dout(10) << " doing async NULL snapflush on " << snapid
<< " from client." << client
<< dendl
;
3077 CInode
*sin
= mdcache
->pick_inode_snap(head_in
, snapid
- 1);
3079 ceph_assert(sin
->first
<= snapid
);
3080 _do_snap_update(sin
, snapid
, 0, sin
->first
- 1, client
, ref_t
<MClientCaps
>(), ref_t
<MClientCaps
>());
3081 head_in
->remove_need_snapflush(sin
, snapid
, client
);
3087 bool Locker::should_defer_client_cap_frozen(CInode
*in
)
3089 if (in
->is_frozen())
3093 * This policy needs to be AT LEAST as permissive as allowing a client
3094 * request to go forward, or else a client request can release something,
3095 * the release gets deferred, but the request gets processed and deadlocks
3096 * because when the caps can't get revoked.
3098 * No auth_pin implies that there is no unstable lock and @in is not auth
3099 * pinnned by client request. If parent dirfrag is auth pinned by a lock
3100 * cache, later request from lock cache owner may forcibly auth pin the @in.
3102 if (in
->is_freezing() && in
->get_num_auth_pins() == 0) {
3103 CDir
* dir
= in
->get_parent_dir();
3104 if (!dir
|| !dir
->is_auth_pinned_by_lock_cache())
3110 void Locker::handle_client_caps(const cref_t
<MClientCaps
> &m
)
3112 client_t client
= m
->get_source().num();
3113 snapid_t follows
= m
->get_snap_follows();
3114 auto op
= m
->get_op();
3115 auto dirty
= m
->get_dirty();
3116 dout(7) << "handle_client_caps "
3117 << " on " << m
->get_ino()
3118 << " tid " << m
->get_client_tid() << " follows " << follows
3119 << " op " << ceph_cap_op_name(op
)
3120 << " flags 0x" << std::hex
<< m
->flags
<< std::dec
<< dendl
;
3122 Session
*session
= mds
->get_session(m
);
3123 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3125 dout(5) << " no session, dropping " << *m
<< dendl
;
3128 if (session
->is_closed() ||
3129 session
->is_closing() ||
3130 session
->is_killing()) {
3131 dout(7) << " session closed|closing|killing, dropping " << *m
<< dendl
;
3134 if ((mds
->is_reconnect() || mds
->get_want_state() == MDSMap::STATE_RECONNECT
) &&
3135 dirty
&& m
->get_client_tid() > 0 &&
3136 !session
->have_completed_flush(m
->get_client_tid())) {
3137 mdcache
->set_reconnected_dirty_caps(client
, m
->get_ino(), dirty
,
3138 op
== CEPH_CAP_OP_FLUSHSNAP
);
3140 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3144 if (mds
->logger
) mds
->logger
->inc(l_mdss_handle_client_caps
);
3146 if (mds
->logger
) mds
->logger
->inc(l_mdss_handle_client_caps_dirty
);
3149 if (m
->get_client_tid() > 0 && session
&&
3150 session
->have_completed_flush(m
->get_client_tid())) {
3151 dout(7) << "handle_client_caps already flushed tid " << m
->get_client_tid()
3152 << " for client." << client
<< dendl
;
3153 ref_t
<MClientCaps
> ack
;
3154 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
3155 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_flushsnap_ack
);
3156 ack
= make_message
<MClientCaps
>(CEPH_CAP_OP_FLUSHSNAP_ACK
, m
->get_ino(), 0, 0, 0, 0, 0, dirty
, 0, mds
->get_osd_epoch_barrier());
3158 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_flush_ack
);
3159 ack
= make_message
<MClientCaps
>(CEPH_CAP_OP_FLUSH_ACK
, m
->get_ino(), 0, m
->get_cap_id(), m
->get_seq(), m
->get_caps(), 0, dirty
, 0, mds
->get_osd_epoch_barrier());
3161 ack
->set_snap_follows(follows
);
3162 ack
->set_client_tid(m
->get_client_tid());
3163 mds
->send_message_client_counted(ack
, m
->get_connection());
3164 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
3167 // fall-thru because the message may release some caps
3169 op
= CEPH_CAP_OP_UPDATE
;
3173 // "oldest flush tid" > 0 means client uses unique TID for each flush
3174 if (m
->get_oldest_flush_tid() > 0 && session
) {
3175 if (session
->trim_completed_flushes(m
->get_oldest_flush_tid())) {
3176 mds
->mdlog
->get_current_segment()->touched_sessions
.insert(session
->info
.inst
.name
);
3178 if (session
->get_num_trim_flushes_warnings() > 0 &&
3179 session
->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes
)
3180 session
->reset_num_trim_flushes_warnings();
3182 if (session
->get_num_completed_flushes() >=
3183 (g_conf()->mds_max_completed_flushes
<< session
->get_num_trim_flushes_warnings())) {
3184 session
->inc_num_trim_flushes_warnings();
3185 CachedStackStringStream css
;
3186 *css
<< "client." << session
->get_client() << " does not advance its oldest_flush_tid ("
3187 << m
->get_oldest_flush_tid() << "), "
3188 << session
->get_num_completed_flushes()
3189 << " completed flushes recorded in session";
3190 mds
->clog
->warn() << css
->strv();
3191 dout(20) << __func__
<< " " << css
->strv() << dendl
;
3196 CInode
*head_in
= mdcache
->get_inode(m
->get_ino());
3198 if (mds
->is_clientreplay()) {
3199 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino()
3200 << ", will try again after replayed client requests" << dendl
;
3201 mdcache
->wait_replay_cap_reconnect(m
->get_ino(), new C_MDS_RetryMessage(mds
, m
));
3206 * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
3207 * Sequence of events that cause this are:
3208 * - client sends caps message to mds.a
3209 * - mds finishes subtree migration, send cap export to client
3210 * - mds trim its cache
3211 * - mds receives cap messages from client
3213 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino() << ", dropping" << dendl
;
3217 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3218 // Pause RADOS operations until we see the required epoch
3219 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3222 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3223 // Record the barrier so that we will retransmit it to clients
3224 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3227 dout(10) << " head inode " << *head_in
<< dendl
;
3229 Capability
*cap
= 0;
3230 cap
= head_in
->get_client_cap(client
);
3232 dout(7) << "handle_client_caps no cap for client." << client
<< " on " << *head_in
<< dendl
;
3238 if (should_defer_client_cap_frozen(head_in
)) {
3239 dout(7) << "handle_client_caps freezing|frozen on " << *head_in
<< dendl
;
3240 head_in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_MDS_RetryMessage(mds
, m
));
3243 if (ceph_seq_cmp(m
->get_mseq(), cap
->get_mseq()) < 0) {
3244 dout(7) << "handle_client_caps mseq " << m
->get_mseq() << " < " << cap
->get_mseq()
3245 << ", dropping" << dendl
;
3249 bool need_unpin
= false;
3252 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
3253 if (!head_in
->is_auth()) {
3254 dout(7) << " not auth, ignoring flushsnap on " << *head_in
<< dendl
;
3258 SnapRealm
*realm
= head_in
->find_snaprealm();
3259 snapid_t snap
= realm
->get_snap_following(follows
);
3260 dout(10) << " flushsnap follows " << follows
<< " -> snap " << snap
<< dendl
;
3262 auto p
= head_in
->client_need_snapflush
.begin();
3263 if (p
!= head_in
->client_need_snapflush
.end() && p
->first
< snap
) {
3264 head_in
->auth_pin(this); // prevent subtree frozen
3266 _do_null_snapflush(head_in
, client
, snap
);
3269 CInode
*in
= head_in
;
3270 if (snap
!= CEPH_NOSNAP
) {
3271 in
= mdcache
->pick_inode_snap(head_in
, snap
- 1);
3273 dout(10) << " snapped inode " << *in
<< dendl
;
3276 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
3277 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
3278 // case we get a dup response, so whatever.)
3279 ref_t
<MClientCaps
> ack
;
3281 ack
= make_message
<MClientCaps
>(CEPH_CAP_OP_FLUSHSNAP_ACK
, in
->ino(), 0, 0, 0, 0, 0, dirty
, 0, mds
->get_osd_epoch_barrier());
3282 ack
->set_snap_follows(follows
);
3283 ack
->set_client_tid(m
->get_client_tid());
3284 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
3287 if (in
== head_in
||
3288 (head_in
->client_need_snapflush
.count(snap
) &&
3289 head_in
->client_need_snapflush
[snap
].count(client
))) {
3290 dout(7) << " flushsnap snap " << snap
3291 << " client." << client
<< " on " << *in
<< dendl
;
3293 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
3295 cap
->client_follows
= snap
< CEPH_NOSNAP
? snap
: realm
->get_newest_seq();
3297 _do_snap_update(in
, snap
, dirty
, follows
, client
, m
, ack
);
3300 head_in
->remove_need_snapflush(in
, snap
, client
);
3302 dout(7) << " not expecting flushsnap " << snap
<< " from client." << client
<< " on " << *in
<< dendl
;
3304 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_flushsnap_ack
);
3305 mds
->send_message_client_counted(ack
, m
->get_connection());
3311 if (cap
->get_cap_id() != m
->get_cap_id()) {
3312 dout(7) << " ignoring client capid " << m
->get_cap_id() << " != my " << cap
->get_cap_id() << dendl
;
3314 CInode
*in
= head_in
;
3316 in
= mdcache
->pick_inode_snap(head_in
, follows
);
3317 // intermediate snap inodes
3318 while (in
!= head_in
) {
3319 ceph_assert(in
->last
!= CEPH_NOSNAP
);
3320 if (in
->is_auth() && dirty
) {
3321 dout(10) << " updating intermediate snapped inode " << *in
<< dendl
;
3322 _do_cap_update(in
, NULL
, dirty
, follows
, m
, ref_t
<MClientCaps
>());
3324 in
= mdcache
->pick_inode_snap(head_in
, in
->last
);
3328 // head inode, and cap
3329 ref_t
<MClientCaps
> ack
;
3331 int caps
= m
->get_caps();
3332 if (caps
& ~cap
->issued()) {
3333 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
3334 caps
&= cap
->issued();
3337 int revoked
= cap
->confirm_receipt(m
->get_seq(), caps
);
3338 dout(10) << " follows " << follows
3339 << " retains " << ccap_string(m
->get_caps())
3340 << " dirty " << ccap_string(dirty
)
3341 << " on " << *in
<< dendl
;
3343 if (revoked
& CEPH_CAP_ANY_DIR_OPS
)
3344 eval_lock_caches(cap
);
3346 // missing/skipped snapflush?
3347 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
3348 // presently only does so when it has actual dirty metadata. But, we
3349 // set up the need_snapflush stuff based on the issued caps.
3350 // We can infer that the client WONT send a FLUSHSNAP once they have
3351 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
3353 if (!head_in
->client_need_snapflush
.empty()) {
3354 if (!(cap
->issued() & CEPH_CAP_ANY_FILE_WR
) &&
3355 !(m
->flags
& MClientCaps::FLAG_PENDING_CAPSNAP
)) {
3356 head_in
->auth_pin(this); // prevent subtree frozen
3358 _do_null_snapflush(head_in
, client
);
3360 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl
;
3363 if (cap
->need_snapflush() && !(m
->flags
& MClientCaps::FLAG_PENDING_CAPSNAP
))
3364 cap
->clear_needsnapflush();
3366 if (dirty
&& in
->is_auth()) {
3367 dout(7) << " flush client." << client
<< " dirty " << ccap_string(dirty
)
3368 << " seq " << m
->get_seq() << " on " << *in
<< dendl
;
3369 ack
= make_message
<MClientCaps
>(CEPH_CAP_OP_FLUSH_ACK
, in
->ino(), 0, cap
->get_cap_id(), m
->get_seq(),
3370 m
->get_caps(), 0, dirty
, 0, mds
->get_osd_epoch_barrier());
3371 ack
->set_client_tid(m
->get_client_tid());
3372 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
3375 // filter wanted based on what we could ever give out (given auth/replica status)
3376 bool need_flush
= m
->flags
& MClientCaps::FLAG_SYNC
;
3377 int new_wanted
= m
->get_wanted();
3378 if (new_wanted
!= cap
->wanted()) {
3379 if (!need_flush
&& in
->is_auth() && (new_wanted
& ~cap
->pending())) {
3380 // exapnding caps. make sure we aren't waiting for a log flush
3381 need_flush
= _need_flush_mdlog(head_in
, new_wanted
& ~cap
->pending());
3384 adjust_cap_wanted(cap
, new_wanted
, m
->get_issue_seq());
3387 if (in
->is_auth() &&
3388 _do_cap_update(in
, cap
, dirty
, follows
, m
, ack
, &need_flush
)) {
3390 eval(in
, CEPH_CAP_LOCKS
);
3392 if (!need_flush
&& (cap
->wanted() & ~cap
->pending()))
3393 need_flush
= _need_flush_mdlog(in
, cap
->wanted() & ~cap
->pending());
3395 // no update, ack now.
3397 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_flush_ack
);
3398 mds
->send_message_client_counted(ack
, m
->get_connection());
3401 bool did_issue
= eval(in
, CEPH_CAP_LOCKS
);
3402 if (!did_issue
&& (cap
->wanted() & ~cap
->pending()))
3403 issue_caps(in
, cap
);
3405 if (cap
->get_last_seq() == 0 &&
3406 (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
))) {
3407 share_inode_max_size(in
, cap
);
3412 mds
->mdlog
->flush();
3417 head_in
->auth_unpin(this);
3421 class C_Locker_RetryRequestCapRelease
: public LockerContext
{
3423 ceph_mds_request_release item
;
3425 C_Locker_RetryRequestCapRelease(Locker
*l
, client_t c
, const ceph_mds_request_release
& it
) :
3426 LockerContext(l
), client(c
), item(it
) { }
3427 void finish(int r
) override
{
3429 MDRequestRef null_ref
;
3430 locker
->process_request_cap_release(null_ref
, client
, item
, dname
);
3434 void Locker::process_request_cap_release(MDRequestRef
& mdr
, client_t client
, const ceph_mds_request_release
& item
,
3435 std::string_view dname
)
3437 inodeno_t ino
= (uint64_t)item
.ino
;
3438 uint64_t cap_id
= item
.cap_id
;
3439 int caps
= item
.caps
;
3440 int wanted
= item
.wanted
;
3442 int issue_seq
= item
.issue_seq
;
3443 int mseq
= item
.mseq
;
3445 CInode
*in
= mdcache
->get_inode(ino
);
3449 if (dname
.length()) {
3450 frag_t fg
= in
->pick_dirfrag(dname
);
3451 CDir
*dir
= in
->get_dirfrag(fg
);
3453 CDentry
*dn
= dir
->lookup(dname
);
3455 ClientLease
*l
= dn
->get_client_lease(client
);
3457 dout(10) << __func__
<< " removing lease on " << *dn
<< dendl
;
3458 dn
->remove_client_lease(l
, this);
3460 dout(7) << __func__
<< " client." << client
3461 << " doesn't have lease on " << *dn
<< dendl
;
3464 dout(7) << __func__
<< " client." << client
<< " released lease on dn "
3465 << dir
->dirfrag() << "/" << dname
<< " which dne" << dendl
;
3470 Capability
*cap
= in
->get_client_cap(client
);
3474 dout(10) << __func__
<< " client." << client
<< " " << ccap_string(caps
) << " on " << *in
3475 << (mdr
? "" : " (DEFERRED, no mdr)")
3478 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3479 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", dropping" << dendl
;
3483 if (cap
->get_cap_id() != cap_id
) {
3484 dout(7) << " cap_id " << cap_id
<< " != " << cap
->get_cap_id() << ", dropping" << dendl
;
3488 if (should_defer_client_cap_frozen(in
)) {
3489 dout(7) << " frozen, deferring" << dendl
;
3490 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_Locker_RetryRequestCapRelease(this, client
, item
));
3494 if (mds
->logger
) mds
->logger
->inc(l_mdss_process_request_cap_release
);
3496 if (caps
& ~cap
->issued()) {
3497 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
3498 caps
&= cap
->issued();
3500 int revoked
= cap
->confirm_receipt(seq
, caps
);
3501 if (revoked
& CEPH_CAP_ANY_DIR_OPS
)
3502 eval_lock_caches(cap
);
3504 if (!in
->client_need_snapflush
.empty() &&
3505 (cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
3506 _do_null_snapflush(in
, client
);
3509 adjust_cap_wanted(cap
, wanted
, issue_seq
);
3512 cap
->inc_suppress();
3513 eval(in
, CEPH_CAP_LOCKS
);
3515 cap
->dec_suppress();
3517 // take note; we may need to reissue on this cap later
3519 mdr
->cap_releases
[in
->vino()] = cap
->get_last_seq();
3522 class C_Locker_RetryKickIssueCaps
: public LockerContext
{
3527 C_Locker_RetryKickIssueCaps(Locker
*l
, CInode
*i
, client_t c
, ceph_seq_t s
) :
3528 LockerContext(l
), in(i
), client(c
), seq(s
) {
3529 in
->get(CInode::PIN_PTRWAITER
);
3531 void finish(int r
) override
{
3532 locker
->kick_issue_caps(in
, client
, seq
);
3533 in
->put(CInode::PIN_PTRWAITER
);
3537 void Locker::kick_issue_caps(CInode
*in
, client_t client
, ceph_seq_t seq
)
3539 Capability
*cap
= in
->get_client_cap(client
);
3540 if (!cap
|| cap
->get_last_seq() != seq
)
3542 if (in
->is_frozen()) {
3543 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in
<< dendl
;
3544 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3545 new C_Locker_RetryKickIssueCaps(this, in
, client
, seq
));
3548 dout(10) << "kick_issue_caps released at current seq " << seq
3549 << ", reissuing" << dendl
;
3550 issue_caps(in
, cap
);
3553 void Locker::kick_cap_releases(MDRequestRef
& mdr
)
3555 client_t client
= mdr
->get_client();
3556 for (map
<vinodeno_t
,ceph_seq_t
>::iterator p
= mdr
->cap_releases
.begin();
3557 p
!= mdr
->cap_releases
.end();
3559 CInode
*in
= mdcache
->get_inode(p
->first
);
3562 kick_issue_caps(in
, client
, p
->second
);
3567 * m and ack might be NULL, so don't dereference them unless dirty != 0
3569 void Locker::_do_snap_update(CInode
*in
, snapid_t snap
, int dirty
, snapid_t follows
, client_t client
, const cref_t
<MClientCaps
> &m
, const ref_t
<MClientCaps
> &ack
)
3571 dout(10) << "_do_snap_update dirty " << ccap_string(dirty
)
3572 << " follows " << follows
<< " snap " << snap
3573 << " on " << *in
<< dendl
;
3575 if (snap
== CEPH_NOSNAP
) {
3576 // hmm, i guess snap was already deleted? just ack!
3577 dout(10) << " wow, the snap following " << follows
3578 << " was already deleted. nothing to record, just ack." << dendl
;
3580 if (ack
->get_op() == CEPH_CAP_OP_FLUSHSNAP_ACK
) {
3581 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_flushsnap_ack
);
3583 mds
->send_message_client_counted(ack
, m
->get_connection());
3588 EUpdate
*le
= new EUpdate(mds
->mdlog
, "snap flush");
3589 mds
->mdlog
->start_entry(le
);
3590 MutationRef mut
= new MutationImpl();
3591 mut
->ls
= mds
->mdlog
->get_current_segment();
3593 // normal metadata updates that we can apply to the head as well.
3596 CInode::mempool_xattr_map
*px
= nullptr;
3597 bool xattrs
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3598 m
->xattrbl
.length() &&
3599 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3601 CInode::mempool_old_inode
*oi
= nullptr;
3602 CInode::old_inode_map_ptr _old_inodes
;
3603 if (in
->is_any_old_inodes()) {
3604 auto last
= in
->pick_old_inode(snap
);
3606 _old_inodes
= CInode::allocate_old_inode_map(*in
->get_old_inodes());
3607 oi
= &_old_inodes
->at(last
);
3608 if (snap
> oi
->first
) {
3609 (*_old_inodes
)[snap
- 1] = *oi
;;
3615 CInode::mempool_inode
*i
;
3617 dout(10) << " writing into old inode" << dendl
;
3618 auto pi
= in
->project_inode(mut
);
3619 pi
.inode
->version
= in
->pre_dirty();
3624 auto pi
= in
->project_inode(mut
, xattrs
);
3625 pi
.inode
->version
= in
->pre_dirty();
3628 px
= pi
.xattrs
.get();
3631 _update_cap_fields(in
, dirty
, m
, i
);
3635 dout(7) << " xattrs v" << i
->xattr_version
<< " -> " << m
->head
.xattr_version
3636 << " len " << m
->xattrbl
.length() << dendl
;
3637 i
->xattr_version
= m
->head
.xattr_version
;
3638 auto p
= m
->xattrbl
.cbegin();
3643 auto it
= i
->client_ranges
.find(client
);
3644 if (it
!= i
->client_ranges
.end()) {
3645 if (in
->last
== snap
) {
3646 dout(10) << " removing client_range entirely" << dendl
;
3647 i
->client_ranges
.erase(it
);
3649 dout(10) << " client_range now follows " << snap
<< dendl
;
3650 it
->second
.follows
= snap
;
3656 in
->reset_old_inodes(std::move(_old_inodes
));
3659 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3660 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3662 // "oldest flush tid" > 0 means client uses unique TID for each flush
3663 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3664 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3665 ack
->get_oldest_flush_tid());
3667 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, UPDATE_SNAPFLUSH
,
3671 void Locker::_update_cap_fields(CInode
*in
, int dirty
, const cref_t
<MClientCaps
> &m
, CInode::mempool_inode
*pi
)
3676 /* m must be valid if there are dirty caps */
3678 uint64_t features
= m
->get_connection()->get_features();
3680 if (m
->get_ctime() > pi
->ctime
) {
3681 dout(7) << " ctime " << pi
->ctime
<< " -> " << m
->get_ctime()
3682 << " for " << *in
<< dendl
;
3683 pi
->ctime
= m
->get_ctime();
3684 if (m
->get_ctime() > pi
->rstat
.rctime
)
3685 pi
->rstat
.rctime
= m
->get_ctime();
3688 if ((features
& CEPH_FEATURE_FS_CHANGE_ATTR
) &&
3689 m
->get_change_attr() > pi
->change_attr
) {
3690 dout(7) << " change_attr " << pi
->change_attr
<< " -> " << m
->get_change_attr()
3691 << " for " << *in
<< dendl
;
3692 pi
->change_attr
= m
->get_change_attr();
3696 if (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)) {
3697 utime_t atime
= m
->get_atime();
3698 utime_t mtime
= m
->get_mtime();
3699 uint64_t size
= m
->get_size();
3700 version_t inline_version
= m
->inline_version
;
3702 if (((dirty
& CEPH_CAP_FILE_WR
) && mtime
> pi
->mtime
) ||
3703 ((dirty
& CEPH_CAP_FILE_EXCL
) && mtime
!= pi
->mtime
)) {
3704 dout(7) << " mtime " << pi
->mtime
<< " -> " << mtime
3705 << " for " << *in
<< dendl
;
3707 if (mtime
> pi
->rstat
.rctime
)
3708 pi
->rstat
.rctime
= mtime
;
3710 if (in
->is_file() && // ONLY if regular file
3712 dout(7) << " size " << pi
->size
<< " -> " << size
3713 << " for " << *in
<< dendl
;
3715 pi
->rstat
.rbytes
= size
;
3717 if (in
->is_file() &&
3718 (dirty
& CEPH_CAP_FILE_WR
) &&
3719 inline_version
> pi
->inline_data
.version
) {
3720 pi
->inline_data
.version
= inline_version
;
3721 if (inline_version
!= CEPH_INLINE_NONE
&& m
->inline_data
.length() > 0)
3722 pi
->inline_data
.set_data(m
->inline_data
);
3724 pi
->inline_data
.free_data();
3726 if ((dirty
& CEPH_CAP_FILE_EXCL
) && atime
!= pi
->atime
) {
3727 dout(7) << " atime " << pi
->atime
<< " -> " << atime
3728 << " for " << *in
<< dendl
;
3731 if ((dirty
& CEPH_CAP_FILE_EXCL
) &&
3732 ceph_seq_cmp(pi
->time_warp_seq
, m
->get_time_warp_seq()) < 0) {
3733 dout(7) << " time_warp_seq " << pi
->time_warp_seq
<< " -> " << m
->get_time_warp_seq()
3734 << " for " << *in
<< dendl
;
3735 pi
->time_warp_seq
= m
->get_time_warp_seq();
3739 if (dirty
& CEPH_CAP_AUTH_EXCL
) {
3740 if (m
->head
.uid
!= pi
->uid
) {
3741 dout(7) << " uid " << pi
->uid
3742 << " -> " << m
->head
.uid
3743 << " for " << *in
<< dendl
;
3744 pi
->uid
= m
->head
.uid
;
3746 if (m
->head
.gid
!= pi
->gid
) {
3747 dout(7) << " gid " << pi
->gid
3748 << " -> " << m
->head
.gid
3749 << " for " << *in
<< dendl
;
3750 pi
->gid
= m
->head
.gid
;
3752 if (m
->head
.mode
!= pi
->mode
) {
3753 dout(7) << " mode " << oct
<< pi
->mode
3754 << " -> " << m
->head
.mode
<< dec
3755 << " for " << *in
<< dendl
;
3756 pi
->mode
= m
->head
.mode
;
3758 if ((features
& CEPH_FEATURE_FS_BTIME
) && m
->get_btime() != pi
->btime
) {
3759 dout(7) << " btime " << oct
<< pi
->btime
3760 << " -> " << m
->get_btime() << dec
3761 << " for " << *in
<< dendl
;
3762 pi
->btime
= m
->get_btime();
3768 * update inode based on cap flush|flushsnap|wanted.
3769 * adjust max_size, if needed.
3770 * if we update, return true; otherwise, false (no updated needed).
3772 bool Locker::_do_cap_update(CInode
*in
, Capability
*cap
,
3773 int dirty
, snapid_t follows
,
3774 const cref_t
<MClientCaps
> &m
, const ref_t
<MClientCaps
> &ack
,
3777 dout(10) << "_do_cap_update dirty " << ccap_string(dirty
)
3778 << " issued " << ccap_string(cap
? cap
->issued() : 0)
3779 << " wanted " << ccap_string(cap
? cap
->wanted() : 0)
3780 << " on " << *in
<< dendl
;
3781 ceph_assert(in
->is_auth());
3782 client_t client
= m
->get_source().num();
3783 const auto& latest
= in
->get_projected_inode();
3785 // increase or zero max_size?
3786 uint64_t size
= m
->get_size();
3787 bool change_max
= false;
3788 uint64_t old_max
= latest
->get_client_range(client
);
3789 uint64_t new_max
= old_max
;
3791 if (in
->is_file()) {
3792 bool forced_change_max
= false;
3793 dout(20) << "inode is file" << dendl
;
3794 if (cap
&& ((cap
->issued() | cap
->wanted()) & CEPH_CAP_ANY_FILE_WR
)) {
3795 dout(20) << "client has write caps; m->get_max_size="
3796 << m
->get_max_size() << "; old_max=" << old_max
<< dendl
;
3797 if (m
->get_max_size() > new_max
) {
3798 dout(10) << "client requests file_max " << m
->get_max_size()
3799 << " > max " << old_max
<< dendl
;
3801 forced_change_max
= true;
3802 new_max
= calc_new_max_size(latest
, m
->get_max_size());
3804 new_max
= calc_new_max_size(latest
, size
);
3806 if (new_max
> old_max
)
3818 if (in
->last
== CEPH_NOSNAP
&&
3820 !in
->filelock
.can_wrlock(client
) &&
3821 !in
->filelock
.can_force_wrlock(client
)) {
3822 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl
;
3823 if (in
->filelock
.is_stable()) {
3824 bool need_issue
= false;
3826 cap
->inc_suppress();
3827 if (in
->get_mds_caps_wanted().empty() &&
3828 (in
->get_loner() >= 0 || (in
->get_wanted_loner() >= 0 && in
->try_set_loner()))) {
3829 if (in
->filelock
.get_state() != LOCK_EXCL
)
3830 file_excl(&in
->filelock
, &need_issue
);
3832 simple_lock(&in
->filelock
, &need_issue
);
3836 cap
->dec_suppress();
3838 if (!in
->filelock
.can_wrlock(client
) &&
3839 !in
->filelock
.can_force_wrlock(client
)) {
3840 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
3841 forced_change_max
? new_max
: 0,
3844 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
3850 if (m
->flockbl
.length()) {
3852 auto bli
= m
->flockbl
.cbegin();
3853 decode(num_locks
, bli
);
3854 for ( int i
=0; i
< num_locks
; ++i
) {
3855 ceph_filelock decoded_lock
;
3856 decode(decoded_lock
, bli
);
3857 in
->get_fcntl_lock_state()->held_locks
.
3858 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3859 ++in
->get_fcntl_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3861 decode(num_locks
, bli
);
3862 for ( int i
=0; i
< num_locks
; ++i
) {
3863 ceph_filelock decoded_lock
;
3864 decode(decoded_lock
, bli
);
3865 in
->get_flock_lock_state()->held_locks
.
3866 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3867 ++in
->get_flock_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3871 if (!dirty
&& !change_max
)
3874 Session
*session
= mds
->get_session(m
);
3875 if (session
->check_access(in
, MAY_WRITE
,
3876 m
->caller_uid
, m
->caller_gid
, NULL
, 0, 0) < 0) {
3877 dout(10) << "check_access failed, dropping cap update on " << *in
<< dendl
;
3882 EUpdate
*le
= new EUpdate(mds
->mdlog
, "cap update");
3883 mds
->mdlog
->start_entry(le
);
3885 bool xattr
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3886 m
->xattrbl
.length() &&
3887 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3889 MutationRef
mut(new MutationImpl());
3890 mut
->ls
= mds
->mdlog
->get_current_segment();
3892 auto pi
= in
->project_inode(mut
, xattr
);
3893 pi
.inode
->version
= in
->pre_dirty();
3895 _update_cap_fields(in
, dirty
, m
, pi
.inode
.get());
3898 dout(7) << " max_size " << old_max
<< " -> " << new_max
3899 << " for " << *in
<< dendl
;
3901 auto &cr
= pi
.inode
->client_ranges
[client
];
3903 cr
.range
.last
= new_max
;
3904 cr
.follows
= in
->first
- 1;
3905 in
->mark_clientwriteable();
3907 cap
->mark_clientwriteable();
3909 pi
.inode
->client_ranges
.erase(client
);
3910 if (pi
.inode
->client_ranges
.empty())
3911 in
->clear_clientwriteable();
3913 cap
->clear_clientwriteable();
3917 if (change_max
|| (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)))
3918 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
3921 if (dirty
& CEPH_CAP_AUTH_EXCL
)
3922 wrlock_force(&in
->authlock
, mut
);
3926 dout(7) << " xattrs v" << pi
.inode
->xattr_version
<< " -> " << m
->head
.xattr_version
<< dendl
;
3927 pi
.inode
->xattr_version
= m
->head
.xattr_version
;
3928 auto p
= m
->xattrbl
.cbegin();
3929 decode_noshare(*pi
.xattrs
, p
);
3930 wrlock_force(&in
->xattrlock
, mut
);
3934 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3935 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3937 // "oldest flush tid" > 0 means client uses unique TID for each flush
3938 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3939 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3940 ack
->get_oldest_flush_tid());
3942 unsigned update_flags
= 0;
3944 update_flags
|= UPDATE_SHAREMAX
;
3946 update_flags
|= UPDATE_NEEDSISSUE
;
3947 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, update_flags
,
3949 if (need_flush
&& !*need_flush
&&
3950 ((change_max
&& new_max
) || // max INCREASE
3951 _need_flush_mdlog(in
, dirty
)))
3957 void Locker::handle_client_cap_release(const cref_t
<MClientCapRelease
> &m
)
3959 client_t client
= m
->get_source().num();
3960 dout(10) << "handle_client_cap_release " << *m
<< dendl
;
3962 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3963 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3967 if (mds
->logger
) mds
->logger
->inc(l_mdss_handle_client_cap_release
);
3969 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3970 // Pause RADOS operations until we see the required epoch
3971 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3974 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3975 // Record the barrier so that we will retransmit it to clients
3976 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3979 Session
*session
= mds
->get_session(m
);
3981 for (const auto &cap
: m
->caps
) {
3982 _do_cap_release(client
, inodeno_t((uint64_t)cap
.ino
) , cap
.cap_id
, cap
.migrate_seq
, cap
.seq
);
3986 session
->notify_cap_release(m
->caps
.size());
3990 class C_Locker_RetryCapRelease
: public LockerContext
{
3994 ceph_seq_t migrate_seq
;
3995 ceph_seq_t issue_seq
;
3997 C_Locker_RetryCapRelease(Locker
*l
, client_t c
, inodeno_t i
, uint64_t id
,
3998 ceph_seq_t mseq
, ceph_seq_t seq
) :
3999 LockerContext(l
), client(c
), ino(i
), cap_id(id
), migrate_seq(mseq
), issue_seq(seq
) {}
4000 void finish(int r
) override
{
4001 locker
->_do_cap_release(client
, ino
, cap_id
, migrate_seq
, issue_seq
);
4005 void Locker::_do_cap_release(client_t client
, inodeno_t ino
, uint64_t cap_id
,
4006 ceph_seq_t mseq
, ceph_seq_t seq
)
4008 CInode
*in
= mdcache
->get_inode(ino
);
4010 dout(7) << "_do_cap_release missing ino " << ino
<< dendl
;
4013 Capability
*cap
= in
->get_client_cap(client
);
4015 dout(7) << "_do_cap_release no cap for client" << client
<< " on "<< *in
<< dendl
;
4019 dout(7) << "_do_cap_release for client." << client
<< " on "<< *in
<< dendl
;
4020 if (cap
->get_cap_id() != cap_id
) {
4021 dout(7) << " capid " << cap_id
<< " != " << cap
->get_cap_id() << ", ignore" << dendl
;
4024 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
4025 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", ignore" << dendl
;
4028 if (should_defer_client_cap_frozen(in
)) {
4029 dout(7) << " freezing|frozen, deferring" << dendl
;
4030 in
->add_waiter(CInode::WAIT_UNFREEZE
,
4031 new C_Locker_RetryCapRelease(this, client
, ino
, cap_id
, mseq
, seq
));
4034 if (seq
!= cap
->get_last_issue()) {
4035 dout(7) << " issue_seq " << seq
<< " != " << cap
->get_last_issue() << dendl
;
4036 // clean out any old revoke history
4037 cap
->clean_revoke_from(seq
);
4038 eval_cap_gather(in
);
4041 remove_client_cap(in
, cap
);
4044 void Locker::remove_client_cap(CInode
*in
, Capability
*cap
, bool kill
)
4046 client_t client
= cap
->get_client();
4047 // clean out any pending snapflush state
4048 if (!in
->client_need_snapflush
.empty())
4049 _do_null_snapflush(in
, client
);
4051 while (!cap
->lock_caches
.empty()) {
4052 MDLockCache
* lock_cache
= cap
->lock_caches
.front();
4053 lock_cache
->client_cap
= nullptr;
4054 invalidate_lock_cache(lock_cache
);
4057 bool notable
= cap
->is_notable();
4058 in
->remove_client_cap(client
);
4062 if (in
->is_auth()) {
4063 // make sure we clear out the client byte range
4064 if (in
->get_projected_inode()->client_ranges
.count(client
) &&
4065 !(in
->get_inode()->nlink
== 0 && !in
->is_any_caps())) { // unless it's unlink + stray
4067 in
->state_set(CInode::STATE_NEEDSRECOVER
);
4069 check_inode_max_size(in
);
4072 request_inode_file_caps(in
);
4075 try_eval(in
, CEPH_CAP_LOCKS
);
4080 * Return true if any currently revoking caps exceed the
4081 * session_timeout threshold.
4083 bool Locker::any_late_revoking_caps(xlist
<Capability
*> const &revoking
,
4084 double timeout
) const
4086 xlist
<Capability
*>::const_iterator p
= revoking
.begin();
4088 // No revoking caps at the moment
4091 utime_t now
= ceph_clock_now();
4092 utime_t age
= now
- (*p
)->get_last_revoke_stamp();
4093 if (age
<= timeout
) {
4101 std::set
<client_t
> Locker::get_late_revoking_clients(double timeout
) const
4103 std::set
<client_t
> result
;
4105 if (any_late_revoking_caps(revoking_caps
, timeout
)) {
4106 // Slow path: execute in O(N_clients)
4107 for (auto &p
: revoking_caps_by_client
) {
4108 if (any_late_revoking_caps(p
.second
, timeout
)) {
4109 result
.insert(p
.first
);
4113 // Fast path: no misbehaving clients, execute in O(1)
4118 // Hard-code instead of surfacing a config settings because this is
4119 // really a hack that should go away at some point when we have better
4120 // inspection tools for getting at detailed cap state (#7316)
4121 #define MAX_WARN_CAPS 100
4123 void Locker::caps_tick()
4125 utime_t now
= ceph_clock_now();
4127 if (!need_snapflush_inodes
.empty()) {
4128 // snap inodes that needs flush are auth pinned, they affect
4129 // subtree/difrarg freeze.
4130 utime_t cutoff
= now
;
4131 cutoff
-= g_conf()->mds_freeze_tree_timeout
/ 3;
4133 CInode
*last
= need_snapflush_inodes
.back();
4134 while (!need_snapflush_inodes
.empty()) {
4135 CInode
*in
= need_snapflush_inodes
.front();
4136 if (in
->last_dirstat_prop
>= cutoff
)
4138 in
->item_caps
.remove_myself();
4139 snapflush_nudge(in
);
4145 dout(20) << __func__
<< " " << revoking_caps
.size() << " revoking caps" << dendl
;
4147 now
= ceph_clock_now();
4149 for (xlist
<Capability
*>::iterator p
= revoking_caps
.begin(); !p
.end(); ++p
) {
4150 Capability
*cap
= *p
;
4152 utime_t age
= now
- cap
->get_last_revoke_stamp();
4153 dout(20) << __func__
<< " age = " << age
<< " client." << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
4154 if (age
<= mds
->mdsmap
->get_session_timeout()) {
4155 dout(20) << __func__
<< " age below timeout " << mds
->mdsmap
->get_session_timeout() << dendl
;
4159 if (n
> MAX_WARN_CAPS
) {
4160 dout(1) << __func__
<< " more than " << MAX_WARN_CAPS
<< " caps are late"
4161 << "revoking, ignoring subsequent caps" << dendl
;
4165 // exponential backoff of warning intervals
4166 if (age
> mds
->mdsmap
->get_session_timeout() * (1 << cap
->get_num_revoke_warnings())) {
4167 cap
->inc_num_revoke_warnings();
4168 CachedStackStringStream css
;
4169 *css
<< "client." << cap
->get_client() << " isn't responding to mclientcaps(revoke), ino "
4170 << cap
->get_inode()->ino() << " pending " << ccap_string(cap
->pending())
4171 << " issued " << ccap_string(cap
->issued()) << ", sent " << age
<< " seconds ago";
4172 mds
->clog
->warn() << css
->strv();
4173 dout(20) << __func__
<< " " << css
->strv() << dendl
;
4175 dout(20) << __func__
<< " silencing log message (backoff) for " << "client." << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
4181 void Locker::handle_client_lease(const cref_t
<MClientLease
> &m
)
4183 dout(10) << "handle_client_lease " << *m
<< dendl
;
4185 ceph_assert(m
->get_source().is_client());
4186 client_t client
= m
->get_source().num();
4188 CInode
*in
= mdcache
->get_inode(m
->get_ino(), m
->get_last());
4190 dout(7) << "handle_client_lease don't have ino " << m
->get_ino() << "." << m
->get_last() << dendl
;
4195 frag_t fg
= in
->pick_dirfrag(m
->dname
);
4196 CDir
*dir
= in
->get_dirfrag(fg
);
4198 dn
= dir
->lookup(m
->dname
);
4200 dout(7) << "handle_client_lease don't have dn " << m
->get_ino() << " " << m
->dname
<< dendl
;
4203 dout(10) << " on " << *dn
<< dendl
;
4206 ClientLease
*l
= dn
->get_client_lease(client
);
4208 dout(7) << "handle_client_lease didn't have lease for client." << client
<< " of " << *dn
<< dendl
;
4212 switch (m
->get_action()) {
4213 case CEPH_MDS_LEASE_REVOKE_ACK
:
4214 case CEPH_MDS_LEASE_RELEASE
:
4215 if (l
->seq
!= m
->get_seq()) {
4216 dout(7) << "handle_client_lease release - seq " << l
->seq
<< " != provided " << m
->get_seq() << dendl
;
4218 dout(7) << "handle_client_lease client." << client
4219 << " on " << *dn
<< dendl
;
4220 dn
->remove_client_lease(l
, this);
4224 case CEPH_MDS_LEASE_RENEW
:
4226 dout(7) << "handle_client_lease client." << client
<< " renew on " << *dn
4227 << (!dn
->lock
.can_lease(client
)?", revoking lease":"") << dendl
;
4228 if (dn
->lock
.can_lease(client
)) {
4229 auto reply
= make_message
<MClientLease
>(*m
);
4230 int pool
= 1; // fixme.. do something smart!
4231 reply
->h
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
4232 reply
->h
.seq
= ++l
->seq
;
4233 reply
->clear_payload();
4235 utime_t now
= ceph_clock_now();
4236 now
+= mdcache
->client_lease_durations
[pool
];
4237 mdcache
->touch_client_lease(l
, pool
, now
);
4239 mds
->send_message_client_counted(reply
, m
->get_connection());
4245 ceph_abort(); // implement me
4251 void Locker::issue_client_lease(CDentry
*dn
, CInode
*in
, MDRequestRef
&mdr
, utime_t now
,
4254 client_t client
= mdr
->get_client();
4255 Session
*session
= mdr
->session
;
4257 CInode
*diri
= dn
->get_dir()->get_inode();
4258 if (mdr
->snapid
== CEPH_NOSNAP
&&
4259 dn
->lock
.can_lease(client
) &&
4260 !diri
->is_stray() && // do not issue dn leases in stray dir!
4261 !diri
->filelock
.can_lease(client
) &&
4262 !(diri
->get_client_cap_pending(client
) & (CEPH_CAP_FILE_SHARED
| CEPH_CAP_FILE_EXCL
))) {
4264 CDentry::linkage_t
*dnl
= dn
->get_linkage(client
, mdr
);
4265 if (dnl
->is_primary()) {
4266 ceph_assert(dnl
->get_inode() == in
);
4267 mask
= CEPH_LEASE_PRIMARY_LINK
;
4269 if (dnl
->is_remote())
4270 ceph_assert(dnl
->get_remote_ino() == in
->ino());
4274 // issue a dentry lease
4275 ClientLease
*l
= dn
->add_client_lease(client
, session
);
4276 session
->touch_lease(l
);
4278 int pool
= 1; // fixme.. do something smart!
4279 now
+= mdcache
->client_lease_durations
[pool
];
4280 mdcache
->touch_client_lease(l
, pool
, now
);
4283 lstat
.mask
= CEPH_LEASE_VALID
| mask
;
4284 lstat
.duration_ms
= (uint32_t)(1000 * mdcache
->client_lease_durations
[pool
]);
4285 lstat
.seq
= ++l
->seq
;
4286 lstat
.alternate_name
= std::string(dn
->alternate_name
);
4287 encode_lease(bl
, session
->info
, lstat
);
4288 dout(20) << "issue_client_lease seq " << lstat
.seq
<< " dur " << lstat
.duration_ms
<< "ms "
4289 << " on " << *dn
<< dendl
;
4294 lstat
.alternate_name
= std::string(dn
->alternate_name
);
4295 encode_lease(bl
, session
->info
, lstat
);
4296 dout(20) << "issue_client_lease no/null lease on " << *dn
<< dendl
;
4301 void Locker::revoke_client_leases(SimpleLock
*lock
)
4304 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
4305 for (map
<client_t
, ClientLease
*>::iterator p
= dn
->client_lease_map
.begin();
4306 p
!= dn
->client_lease_map
.end();
4308 ClientLease
*l
= p
->second
;
4311 ceph_assert(lock
->get_type() == CEPH_LOCK_DN
);
4313 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
4314 int mask
= 1 | CEPH_LOCK_DN
; // old and new bits
4316 // i should also revoke the dir ICONTENT lease, if they have it!
4317 CInode
*diri
= dn
->get_dir()->get_inode();
4318 auto lease
= make_message
<MClientLease
>(CEPH_MDS_LEASE_REVOKE
, l
->seq
, mask
, diri
->ino(), diri
->first
, CEPH_NOSNAP
, dn
->get_name());
4319 mds
->send_message_client_counted(lease
, l
->client
);
4323 void Locker::encode_lease(bufferlist
& bl
, const session_info_t
& info
,
4324 const LeaseStat
& ls
)
4326 if (info
.has_feature(CEPHFS_FEATURE_REPLY_ENCODING
)) {
4327 ENCODE_START(2, 1, bl
);
4328 encode(ls
.mask
, bl
);
4329 encode(ls
.duration_ms
, bl
);
4331 encode(ls
.alternate_name
, bl
);
4335 encode(ls
.mask
, bl
);
4336 encode(ls
.duration_ms
, bl
);
4341 // locks ----------------------------------------------------------------
4343 SimpleLock
*Locker::get_lock(int lock_type
, const MDSCacheObjectInfo
&info
)
4345 switch (lock_type
) {
4348 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
4349 CInode
*diri
= mdcache
->get_inode(info
.dirfrag
.ino
);
4354 fg
= diri
->pick_dirfrag(info
.dname
);
4355 dir
= diri
->get_dirfrag(fg
);
4357 dn
= dir
->lookup(info
.dname
, info
.snapid
);
4360 dout(7) << "get_lock don't have dn " << info
.dirfrag
.ino
<< " " << info
.dname
<< dendl
;
4366 case CEPH_LOCK_IAUTH
:
4367 case CEPH_LOCK_ILINK
:
4368 case CEPH_LOCK_IDFT
:
4369 case CEPH_LOCK_IFILE
:
4370 case CEPH_LOCK_INEST
:
4371 case CEPH_LOCK_IXATTR
:
4372 case CEPH_LOCK_ISNAP
:
4373 case CEPH_LOCK_IFLOCK
:
4374 case CEPH_LOCK_IPOLICY
:
4376 CInode
*in
= mdcache
->get_inode(info
.ino
, info
.snapid
);
4378 dout(7) << "get_lock don't have ino " << info
.ino
<< dendl
;
4381 switch (lock_type
) {
4382 case CEPH_LOCK_IAUTH
: return &in
->authlock
;
4383 case CEPH_LOCK_ILINK
: return &in
->linklock
;
4384 case CEPH_LOCK_IDFT
: return &in
->dirfragtreelock
;
4385 case CEPH_LOCK_IFILE
: return &in
->filelock
;
4386 case CEPH_LOCK_INEST
: return &in
->nestlock
;
4387 case CEPH_LOCK_IXATTR
: return &in
->xattrlock
;
4388 case CEPH_LOCK_ISNAP
: return &in
->snaplock
;
4389 case CEPH_LOCK_IFLOCK
: return &in
->flocklock
;
4390 case CEPH_LOCK_IPOLICY
: return &in
->policylock
;
4395 dout(7) << "get_lock don't know lock_type " << lock_type
<< dendl
;
4403 void Locker::handle_lock(const cref_t
<MLock
> &m
)
4405 // nobody should be talking to us during recovery.
4406 ceph_assert(mds
->is_rejoin() || mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
4408 SimpleLock
*lock
= get_lock(m
->get_lock_type(), m
->get_object_info());
4410 dout(10) << "don't have object " << m
->get_object_info() << ", must have trimmed, dropping" << dendl
;
4414 switch (lock
->get_type()) {
4416 case CEPH_LOCK_IAUTH
:
4417 case CEPH_LOCK_ILINK
:
4418 case CEPH_LOCK_ISNAP
:
4419 case CEPH_LOCK_IXATTR
:
4420 case CEPH_LOCK_IFLOCK
:
4421 case CEPH_LOCK_IPOLICY
:
4422 handle_simple_lock(lock
, m
);
4425 case CEPH_LOCK_IDFT
:
4426 case CEPH_LOCK_INEST
:
4427 //handle_scatter_lock((ScatterLock*)lock, m);
4430 case CEPH_LOCK_IFILE
:
4431 handle_file_lock(static_cast<ScatterLock
*>(lock
), m
);
4435 dout(7) << "handle_lock got otype " << m
->get_lock_type() << dendl
;
4445 // ==========================================================================
4448 /** This function may take a reference to m if it needs one, but does
4449 * not put references. */
4450 void Locker::handle_reqrdlock(SimpleLock
*lock
, const cref_t
<MLock
> &m
)
4452 MDSCacheObject
*parent
= lock
->get_parent();
4453 if (parent
->is_auth() &&
4454 lock
->get_state() != LOCK_SYNC
&&
4455 !parent
->is_frozen()) {
4456 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
4457 << " on " << *parent
<< dendl
;
4458 ceph_assert(parent
->is_auth()); // replica auth pinned if they're doing this!
4459 if (lock
->is_stable()) {
4462 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl
;
4463 lock
->add_waiter(SimpleLock::WAIT_STABLE
| MDSCacheObject::WAIT_UNFREEZE
,
4464 new C_MDS_RetryMessage(mds
, m
));
4467 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
4468 << " on " << *parent
<< dendl
;
4469 // replica should retry
4473 void Locker::handle_simple_lock(SimpleLock
*lock
, const cref_t
<MLock
> &m
)
4475 int from
= m
->get_asker();
4477 dout(10) << "handle_simple_lock " << *m
4478 << " on " << *lock
<< " " << *lock
->get_parent() << dendl
;
4480 if (mds
->is_rejoin()) {
4481 if (lock
->get_parent()->is_rejoining()) {
4482 dout(7) << "handle_simple_lock still rejoining " << *lock
->get_parent()
4483 << ", dropping " << *m
<< dendl
;
4488 switch (m
->get_action()) {
4491 ceph_assert(lock
->get_state() == LOCK_LOCK
);
4492 lock
->decode_locked_state(m
->get_data());
4493 lock
->set_state(LOCK_SYNC
);
4494 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4498 ceph_assert(lock
->get_state() == LOCK_SYNC
);
4499 lock
->set_state(LOCK_SYNC_LOCK
);
4500 if (lock
->is_leased())
4501 revoke_client_leases(lock
);
4502 eval_gather(lock
, true);
4503 if (lock
->is_unstable_and_locked()) {
4504 if (lock
->is_cached())
4505 invalidate_lock_caches(lock
);
4506 mds
->mdlog
->flush();
4512 case LOCK_AC_LOCKACK
:
4513 ceph_assert(lock
->get_state() == LOCK_SYNC_LOCK
||
4514 lock
->get_state() == LOCK_SYNC_EXCL
);
4515 ceph_assert(lock
->is_gathering(from
));
4516 lock
->remove_gather(from
);
4518 if (lock
->is_gathering()) {
4519 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
4520 << ", still gathering " << lock
->get_gather_set() << dendl
;
4522 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
4523 << ", last one" << dendl
;
4528 case LOCK_AC_REQRDLOCK
:
4529 handle_reqrdlock(lock
, m
);
4535 /* unused, currently.
4537 class C_Locker_SimpleEval : public Context {
4541 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
4542 void finish(int r) {
4543 locker->try_simple_eval(lock);
4547 void Locker::try_simple_eval(SimpleLock *lock)
4549 // unstable and ambiguous auth?
4550 if (!lock->is_stable() &&
4551 lock->get_parent()->is_ambiguous_auth()) {
4552 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
4553 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4554 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
4558 if (!lock->get_parent()->is_auth()) {
4559 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
4563 if (!lock->get_parent()->can_auth_pin()) {
4564 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
4565 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4566 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
4570 if (lock->is_stable())
4576 void Locker::simple_eval(SimpleLock
*lock
, bool *need_issue
)
4578 dout(10) << "simple_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4580 ceph_assert(lock
->get_parent()->is_auth());
4581 ceph_assert(lock
->is_stable());
4583 if (lock
->get_parent()->is_freezing_or_frozen()) {
4584 // dentry/snap lock in unreadable state can block path traverse
4585 if ((lock
->get_type() != CEPH_LOCK_DN
&&
4586 lock
->get_type() != CEPH_LOCK_ISNAP
&&
4587 lock
->get_type() != CEPH_LOCK_IPOLICY
) ||
4588 lock
->get_state() == LOCK_SYNC
||
4589 lock
->get_parent()->is_frozen())
4593 if (mdcache
->is_readonly()) {
4594 if (lock
->get_state() != LOCK_SYNC
) {
4595 dout(10) << "simple_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4596 simple_sync(lock
, need_issue
);
4603 if (lock
->get_cap_shift()) {
4604 in
= static_cast<CInode
*>(lock
->get_parent());
4605 in
->get_caps_wanted(&wanted
, NULL
, lock
->get_cap_shift());
4609 if (lock
->get_state() != LOCK_EXCL
&&
4610 in
&& in
->get_target_loner() >= 0 &&
4611 (wanted
& CEPH_CAP_GEXCL
)) {
4612 dout(7) << "simple_eval stable, going to excl " << *lock
4613 << " on " << *lock
->get_parent() << dendl
;
4614 simple_excl(lock
, need_issue
);
4618 else if (lock
->get_state() != LOCK_SYNC
&&
4619 !lock
->is_wrlocked() &&
4620 ((!(wanted
& CEPH_CAP_GEXCL
) && !lock
->is_waiter_for(SimpleLock::WAIT_WR
)) ||
4621 (lock
->get_state() == LOCK_EXCL
&& in
&& in
->get_target_loner() < 0))) {
4622 dout(7) << "simple_eval stable, syncing " << *lock
4623 << " on " << *lock
->get_parent() << dendl
;
4624 simple_sync(lock
, need_issue
);
4631 bool Locker::simple_sync(SimpleLock
*lock
, bool *need_issue
)
4633 dout(7) << "simple_sync on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4634 ceph_assert(lock
->get_parent()->is_auth());
4635 ceph_assert(lock
->is_stable());
4638 if (lock
->get_cap_shift())
4639 in
= static_cast<CInode
*>(lock
->get_parent());
4641 int old_state
= lock
->get_state();
4643 if (old_state
!= LOCK_TSYN
) {
4645 switch (lock
->get_state()) {
4646 case LOCK_MIX
: lock
->set_state(LOCK_MIX_SYNC
); break;
4647 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_SYNC
); break;
4648 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_SYNC
); break;
4649 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_SYNC
); break;
4650 default: ceph_abort();
4654 if (lock
->is_wrlocked()) {
4656 if (lock
->is_cached())
4657 invalidate_lock_caches(lock
);
4659 // After a client request is early replied the mdlog won't be flushed
4660 // immediately, but before safe replied the request will hold the write
4661 // locks. So if the client sends another request to a different MDS
4662 // daemon, which then needs to request read lock from current MDS daemon,
4663 // then that daemon maybe stuck at most for 5 seconds. Which will lead
4664 // the client stuck at most 5 seconds.
4666 // Let's try to flush the mdlog when the write lock is held, which will
4667 // release the write locks after mdlog is successfully flushed.
4668 mds
->mdlog
->flush();
4671 if (lock
->get_parent()->is_replicated() && old_state
== LOCK_MIX
) {
4672 send_lock_message(lock
, LOCK_AC_SYNC
);
4673 lock
->init_gather();
4677 if (in
&& in
->is_head()) {
4678 if (in
->issued_caps_need_gather(lock
)) {
4687 bool need_recover
= false;
4688 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4690 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4691 mds
->mdcache
->queue_file_recover(in
);
4692 need_recover
= true;
4697 if (!gather
&& lock
->is_dirty()) {
4698 lock
->get_parent()->auth_pin(lock
);
4699 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4704 lock
->get_parent()->auth_pin(lock
);
4706 mds
->mdcache
->do_file_recover();
4711 if (lock
->get_parent()->is_replicated()) { // FIXME
4713 lock
->encode_locked_state(data
);
4714 send_lock_message(lock
, LOCK_AC_SYNC
, data
);
4716 lock
->set_state(LOCK_SYNC
);
4717 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4718 if (in
&& in
->is_head()) {
4727 void Locker::simple_excl(SimpleLock
*lock
, bool *need_issue
)
4729 dout(7) << "simple_excl on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4730 ceph_assert(lock
->get_parent()->is_auth());
4731 ceph_assert(lock
->is_stable());
4734 if (lock
->get_cap_shift())
4735 in
= static_cast<CInode
*>(lock
->get_parent());
4737 switch (lock
->get_state()) {
4738 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4739 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4740 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4741 default: ceph_abort();
4745 if (lock
->is_rdlocked())
4747 if (lock
->is_wrlocked())
4749 if (gather
&& lock
->is_cached())
4750 invalidate_lock_caches(lock
);
4752 if (lock
->get_parent()->is_replicated() &&
4753 lock
->get_state() != LOCK_LOCK_EXCL
&&
4754 lock
->get_state() != LOCK_XSYN_EXCL
) {
4755 send_lock_message(lock
, LOCK_AC_LOCK
);
4756 lock
->init_gather();
4760 if (in
&& in
->is_head()) {
4761 if (in
->issued_caps_need_gather(lock
)) {
4771 lock
->get_parent()->auth_pin(lock
);
4773 lock
->set_state(LOCK_EXCL
);
4774 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
4784 void Locker::simple_lock(SimpleLock
*lock
, bool *need_issue
)
4786 dout(7) << "simple_lock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4787 ceph_assert(lock
->get_parent()->is_auth());
4788 ceph_assert(lock
->is_stable());
4789 ceph_assert(lock
->get_state() != LOCK_LOCK
);
4792 if (lock
->get_cap_shift())
4793 in
= static_cast<CInode
*>(lock
->get_parent());
4795 int old_state
= lock
->get_state();
4797 switch (lock
->get_state()) {
4798 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
4799 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_LOCK
); break;
4800 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_LOCK
); break;
4801 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
);
4802 (static_cast<ScatterLock
*>(lock
))->clear_unscatter_wanted();
4804 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_LOCK
); break;
4805 default: ceph_abort();
4809 if (lock
->is_leased()) {
4811 revoke_client_leases(lock
);
4813 if (lock
->is_rdlocked()) {
4814 if (lock
->is_cached())
4815 invalidate_lock_caches(lock
);
4818 if (in
&& in
->is_head()) {
4819 if (in
->issued_caps_need_gather(lock
)) {
4828 bool need_recover
= false;
4829 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4831 if(in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4832 mds
->mdcache
->queue_file_recover(in
);
4833 need_recover
= true;
4838 if (lock
->get_parent()->is_replicated() &&
4839 lock
->get_state() == LOCK_MIX_LOCK
&&
4841 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl
;
4843 // move to second stage of gather now, so we don't send the lock action later.
4844 if (lock
->get_state() == LOCK_MIX_LOCK
)
4845 lock
->set_state(LOCK_MIX_LOCK2
);
4847 if (lock
->get_parent()->is_replicated() &&
4848 lock
->get_sm()->states
[old_state
].replica_state
!= LOCK_LOCK
) { // replica may already be LOCK
4850 send_lock_message(lock
, LOCK_AC_LOCK
);
4851 lock
->init_gather();
4855 if (!gather
&& lock
->is_dirty()) {
4856 lock
->get_parent()->auth_pin(lock
);
4857 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4862 lock
->get_parent()->auth_pin(lock
);
4864 mds
->mdcache
->do_file_recover();
4866 lock
->set_state(LOCK_LOCK
);
4867 lock
->finish_waiters(ScatterLock::WAIT_XLOCK
|ScatterLock::WAIT_WR
|ScatterLock::WAIT_STABLE
);
4872 void Locker::simple_xlock(SimpleLock
*lock
)
4874 dout(7) << "simple_xlock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4875 ceph_assert(lock
->get_parent()->is_auth());
4876 //assert(lock->is_stable());
4877 ceph_assert(lock
->get_state() != LOCK_XLOCK
);
4880 if (lock
->get_cap_shift())
4881 in
= static_cast<CInode
*>(lock
->get_parent());
4883 if (lock
->is_stable())
4884 lock
->get_parent()->auth_pin(lock
);
4886 switch (lock
->get_state()) {
4888 case LOCK_XLOCKDONE
: lock
->set_state(LOCK_LOCK_XLOCK
); break;
4889 default: ceph_abort();
4893 if (lock
->is_rdlocked())
4895 if (lock
->is_wrlocked())
4897 if (gather
&& lock
->is_cached())
4898 invalidate_lock_caches(lock
);
4900 if (in
&& in
->is_head()) {
4901 if (in
->issued_caps_need_gather(lock
)) {
4908 lock
->set_state(LOCK_PREXLOCK
);
4909 //assert("shouldn't be called if we are already xlockable" == 0);
4917 // ==========================================================================
4922 Some notes on scatterlocks.
4924 - The scatter/gather is driven by the inode lock. The scatter always
4925 brings in the latest metadata from the fragments.
4927 - When in a scattered/MIX state, fragments are only allowed to
4928 update/be written to if the accounted stat matches the inode's
4931 - That means, on gather, we _only_ assimilate diffs for frag metadata
4932 that match the current version, because those are the only ones
4933 written during this scatter/gather cycle. (Others didn't permit
4934 it.) We increment the version and journal this to disk.
4936 - When possible, we also simultaneously update our local frag
4937 accounted stats to match.
4939 - On scatter, the new inode info is broadcast to frags, both local
4940 and remote. If possible (auth and !frozen), the dirfrag auth
4941 should update the accounted state (if it isn't already up to date).
4942 Note that this may occur on both the local inode auth node and
4943 inode replicas, so there are two potential paths. If it is NOT
4944 possible, they need to mark_stale to prevent any possible writes.
4946 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4947 only). Both are opportunities to update the frag accounted stats,
4948 even though only the MIX case is affected by a stale dirfrag.
4950 - Because many scatter/gather cycles can potentially go by without a
4951 frag being able to update its accounted stats (due to being frozen
4952 by exports/refragments in progress), the frag may have (even very)
4953 old stat versions. That's fine. If when we do want to update it,
4954 we can update accounted_* and the version first.
4958 class C_Locker_ScatterWB
: public LockerLogContext
{
4962 C_Locker_ScatterWB(Locker
*l
, ScatterLock
*sl
, MutationRef
& m
) :
4963 LockerLogContext(l
), lock(sl
), mut(m
) {}
4964 void finish(int r
) override
{
4965 locker
->scatter_writebehind_finish(lock
, mut
);
4969 void Locker::scatter_writebehind(ScatterLock
*lock
)
4971 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4972 dout(10) << "scatter_writebehind " << in
->get_inode()->mtime
<< " on " << *lock
<< " on " << *in
<< dendl
;
4975 MutationRef
mut(new MutationImpl());
4976 mut
->ls
= mds
->mdlog
->get_current_segment();
4978 // forcefully take a wrlock
4979 lock
->get_wrlock(true);
4980 mut
->emplace_lock(lock
, MutationImpl::LockOp::WRLOCK
);
4982 in
->pre_cow_old_inode(); // avoid cow mayhem
4984 auto pi
= in
->project_inode(mut
);
4985 pi
.inode
->version
= in
->pre_dirty();
4987 in
->finish_scatter_gather_update(lock
->get_type(), mut
);
4988 lock
->start_flush();
4990 EUpdate
*le
= new EUpdate(mds
->mdlog
, "scatter_writebehind");
4991 mds
->mdlog
->start_entry(le
);
4993 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
);
4994 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
);
4996 in
->finish_scatter_gather_update_accounted(lock
->get_type(), &le
->metablob
);
4998 mds
->mdlog
->submit_entry(le
, new C_Locker_ScatterWB(this, lock
, mut
));
4999 mds
->mdlog
->flush();
5002 void Locker::scatter_writebehind_finish(ScatterLock
*lock
, MutationRef
& mut
)
5004 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5005 dout(10) << "scatter_writebehind_finish on " << *lock
<< " on " << *in
<< dendl
;
5009 lock
->finish_flush();
5011 // if replicas may have flushed in a mix->lock state, send another
5012 // message so they can finish_flush().
5013 if (in
->is_replicated()) {
5014 switch (lock
->get_state()) {
5016 case LOCK_MIX_LOCK2
:
5019 send_lock_message(lock
, LOCK_AC_LOCKFLUSHED
);
5023 drop_locks(mut
.get());
5026 if (lock
->is_stable())
5027 lock
->finish_waiters(ScatterLock::WAIT_STABLE
);
5029 //scatter_eval_gather(lock);
5032 void Locker::scatter_eval(ScatterLock
*lock
, bool *need_issue
)
5034 dout(10) << "scatter_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5036 ceph_assert(lock
->get_parent()->is_auth());
5037 ceph_assert(lock
->is_stable());
5039 if (lock
->get_parent()->is_freezing_or_frozen()) {
5040 dout(20) << " freezing|frozen" << dendl
;
5044 if (mdcache
->is_readonly()) {
5045 if (lock
->get_state() != LOCK_SYNC
) {
5046 dout(10) << "scatter_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5047 simple_sync(lock
, need_issue
);
5052 if (!lock
->is_rdlocked() &&
5053 lock
->get_state() != LOCK_MIX
&&
5054 lock
->get_scatter_wanted()) {
5055 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
5056 << " on " << *lock
->get_parent() << dendl
;
5057 scatter_mix(lock
, need_issue
);
5061 if (lock
->get_type() == CEPH_LOCK_INEST
) {
5062 // in general, we want to keep INEST writable at all times.
5063 if (!lock
->is_rdlocked()) {
5064 if (lock
->get_parent()->is_replicated()) {
5065 if (lock
->get_state() != LOCK_MIX
)
5066 scatter_mix(lock
, need_issue
);
5068 if (lock
->get_state() != LOCK_LOCK
)
5069 simple_lock(lock
, need_issue
);
5075 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5076 if (!in
->has_subtree_or_exporting_dirfrag() || in
->is_base()) {
5077 // i _should_ be sync.
5078 if (!lock
->is_wrlocked() &&
5079 lock
->get_state() != LOCK_SYNC
) {
5080 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl
;
5081 simple_sync(lock
, need_issue
);
5088 * mark a scatterlock to indicate that the dir fnode has some dirty data
5090 void Locker::mark_updated_scatterlock(ScatterLock
*lock
)
5093 if (lock
->get_updated_item()->is_on_list()) {
5094 dout(10) << "mark_updated_scatterlock " << *lock
5095 << " - already on list since " << lock
->get_update_stamp() << dendl
;
5097 updated_scatterlocks
.push_back(lock
->get_updated_item());
5098 utime_t now
= ceph_clock_now();
5099 lock
->set_update_stamp(now
);
5100 dout(10) << "mark_updated_scatterlock " << *lock
5101 << " - added at " << now
<< dendl
;
5106 * this is called by scatter_tick and LogSegment::try_to_trim() when
5107 * trying to flush dirty scattered data (i.e. updated fnode) back to
5110 * we need to lock|scatter in order to push fnode changes into the
5113 void Locker::scatter_nudge(ScatterLock
*lock
, MDSContext
*c
, bool forcelockchange
)
5115 CInode
*p
= static_cast<CInode
*>(lock
->get_parent());
5117 if (p
->is_frozen() || p
->is_freezing()) {
5118 dout(10) << "scatter_nudge waiting for unfreeze on " << *p
<< dendl
;
5120 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, c
);
5121 else if (lock
->is_dirty())
5122 // just requeue. not ideal.. starvation prone..
5123 updated_scatterlocks
.push_back(lock
->get_updated_item());
5127 if (p
->is_ambiguous_auth()) {
5128 dout(10) << "scatter_nudge waiting for single auth on " << *p
<< dendl
;
5130 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, c
);
5131 else if (lock
->is_dirty())
5132 // just requeue. not ideal.. starvation prone..
5133 updated_scatterlocks
.push_back(lock
->get_updated_item());
5140 if (lock
->is_stable()) {
5141 // can we do it now?
5142 // (only if we're not replicated.. if we are, we really do need
5143 // to nudge the lock state!)
5145 actually, even if we're not replicated, we can't stay in MIX, because another mds
5146 could discover and replicate us at any time. if that happens while we're flushing,
5147 they end up in MIX but their inode has the old scatterstat version.
5149 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
5150 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
5151 scatter_writebehind(lock);
5153 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
5158 if (mdcache
->is_readonly()) {
5159 if (lock
->get_state() != LOCK_SYNC
) {
5160 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock
<< " on " << *p
<< dendl
;
5161 simple_sync(static_cast<ScatterLock
*>(lock
));
5166 // adjust lock state
5167 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock
<< " on " << *p
<< dendl
;
5168 switch (lock
->get_type()) {
5169 case CEPH_LOCK_IFILE
:
5170 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
5171 scatter_mix(static_cast<ScatterLock
*>(lock
));
5172 else if (lock
->get_state() != LOCK_LOCK
)
5173 simple_lock(static_cast<ScatterLock
*>(lock
));
5175 simple_sync(static_cast<ScatterLock
*>(lock
));
5178 case CEPH_LOCK_IDFT
:
5179 case CEPH_LOCK_INEST
:
5180 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
5182 else if (lock
->get_state() != LOCK_LOCK
)
5191 if (lock
->is_stable() && count
== 2) {
5192 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl
;
5193 // this should only realy happen when called via
5194 // handle_file_lock due to AC_NUDGE, because the rest of the
5195 // time we are replicated or have dirty data and won't get
5196 // called. bailing here avoids an infinite loop.
5201 dout(10) << "scatter_nudge auth, waiting for stable " << *lock
<< " on " << *p
<< dendl
;
5203 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
5208 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
5209 << *lock
<< " on " << *p
<< dendl
;
5210 // request unscatter?
5211 mds_rank_t auth
= lock
->get_parent()->authority().first
;
5212 if (!mds
->is_cluster_degraded() || mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
5213 mds
->send_message_mds(make_message
<MLock
>(lock
, LOCK_AC_NUDGE
, mds
->get_nodeid()), auth
);
5218 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
5220 // also, requeue, in case we had wrong auth or something
5221 if (lock
->is_dirty())
5222 updated_scatterlocks
.push_back(lock
->get_updated_item());
5226 void Locker::scatter_tick()
5228 dout(10) << "scatter_tick" << dendl
;
5231 utime_t now
= ceph_clock_now();
5232 int n
= updated_scatterlocks
.size();
5233 while (!updated_scatterlocks
.empty()) {
5234 ScatterLock
*lock
= updated_scatterlocks
.front();
5236 if (n
-- == 0) break; // scatter_nudge() may requeue; avoid looping
5238 if (!lock
->is_dirty()) {
5239 updated_scatterlocks
.pop_front();
5240 dout(10) << " removing from updated_scatterlocks "
5241 << *lock
<< " " << *lock
->get_parent() << dendl
;
5244 if (now
- lock
->get_update_stamp() < g_conf()->mds_scatter_nudge_interval
)
5246 updated_scatterlocks
.pop_front();
5247 scatter_nudge(lock
, 0);
5249 mds
->mdlog
->flush();
5253 void Locker::scatter_tempsync(ScatterLock
*lock
, bool *need_issue
)
5255 dout(10) << "scatter_tempsync " << *lock
5256 << " on " << *lock
->get_parent() << dendl
;
5257 ceph_assert(lock
->get_parent()->is_auth());
5258 ceph_assert(lock
->is_stable());
5260 ceph_abort_msg("not fully implemented, at least not for filelock");
5262 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5264 switch (lock
->get_state()) {
5265 case LOCK_SYNC
: ceph_abort(); // this shouldn't happen
5266 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_TSYN
); break;
5267 case LOCK_MIX
: lock
->set_state(LOCK_MIX_TSYN
); break;
5268 default: ceph_abort();
5272 if (lock
->is_wrlocked()) {
5273 if (lock
->is_cached())
5274 invalidate_lock_caches(lock
);
5278 if (lock
->get_cap_shift() &&
5280 in
->issued_caps_need_gather(lock
)) {
5288 if (lock
->get_state() == LOCK_MIX_TSYN
&&
5289 in
->is_replicated()) {
5290 lock
->init_gather();
5291 send_lock_message(lock
, LOCK_AC_LOCK
);
5299 lock
->set_state(LOCK_TSYN
);
5300 lock
->finish_waiters(ScatterLock::WAIT_RD
|ScatterLock::WAIT_STABLE
);
5301 if (lock
->get_cap_shift()) {
5312 // ==========================================================================
5315 void Locker::local_wrlock_grab(LocalLockC
*lock
, MutationRef
& mut
)
5317 dout(7) << "local_wrlock_grab on " << *lock
5318 << " on " << *lock
->get_parent() << dendl
;
5320 ceph_assert(lock
->get_parent()->is_auth());
5321 ceph_assert(lock
->can_wrlock());
5322 lock
->get_wrlock(mut
->get_client());
5324 auto it
= mut
->emplace_lock(lock
, MutationImpl::LockOp::WRLOCK
);
5325 ceph_assert(it
->is_wrlock());
5328 bool Locker::local_wrlock_start(LocalLockC
*lock
, MDRequestRef
& mut
)
5330 dout(7) << "local_wrlock_start on " << *lock
5331 << " on " << *lock
->get_parent() << dendl
;
5333 ceph_assert(lock
->get_parent()->is_auth());
5334 if (lock
->can_wrlock()) {
5335 lock
->get_wrlock(mut
->get_client());
5336 auto it
= mut
->emplace_lock(lock
, MutationImpl::LockOp::WRLOCK
);
5337 ceph_assert(it
->is_wrlock());
5340 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
5345 void Locker::local_wrlock_finish(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
)
5347 ceph_assert(it
->is_wrlock());
5348 LocalLockC
*lock
= static_cast<LocalLockC
*>(it
->lock
);
5349 dout(7) << "local_wrlock_finish on " << *lock
5350 << " on " << *lock
->get_parent() << dendl
;
5352 mut
->locks
.erase(it
);
5353 if (lock
->get_num_wrlocks() == 0) {
5354 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
5355 SimpleLock::WAIT_WR
|
5356 SimpleLock::WAIT_RD
);
5360 bool Locker::local_xlock_start(LocalLockC
*lock
, MDRequestRef
& mut
)
5362 dout(7) << "local_xlock_start on " << *lock
5363 << " on " << *lock
->get_parent() << dendl
;
5365 ceph_assert(lock
->get_parent()->is_auth());
5366 if (!lock
->can_xlock_local()) {
5367 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
5371 lock
->get_xlock(mut
, mut
->get_client());
5372 mut
->emplace_lock(lock
, MutationImpl::LockOp::XLOCK
);
5376 void Locker::local_xlock_finish(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
)
5378 ceph_assert(it
->is_xlock());
5379 LocalLockC
*lock
= static_cast<LocalLockC
*>(it
->lock
);
5380 dout(7) << "local_xlock_finish on " << *lock
5381 << " on " << *lock
->get_parent() << dendl
;
5383 mut
->locks
.erase(it
);
5385 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
5386 SimpleLock::WAIT_WR
|
5387 SimpleLock::WAIT_RD
);
5392 // ==========================================================================
5396 void Locker::file_eval(ScatterLock
*lock
, bool *need_issue
)
5398 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5399 int loner_wanted
, other_wanted
;
5400 int wanted
= in
->get_caps_wanted(&loner_wanted
, &other_wanted
, CEPH_CAP_SFILE
);
5401 dout(7) << "file_eval wanted=" << gcap_string(wanted
)
5402 << " loner_wanted=" << gcap_string(loner_wanted
)
5403 << " other_wanted=" << gcap_string(other_wanted
)
5404 << " filelock=" << *lock
<< " on " << *lock
->get_parent()
5407 ceph_assert(lock
->get_parent()->is_auth());
5408 ceph_assert(lock
->is_stable());
5410 if (lock
->get_parent()->is_freezing_or_frozen())
5413 if (mdcache
->is_readonly()) {
5414 if (lock
->get_state() != LOCK_SYNC
) {
5415 dout(10) << "file_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5416 simple_sync(lock
, need_issue
);
5422 if (lock
->get_state() == LOCK_EXCL
) {
5423 dout(20) << " is excl" << dendl
;
5424 int loner_issued
, other_issued
, xlocker_issued
;
5425 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
, CEPH_CAP_SFILE
);
5426 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued
)
5427 << " other_issued=" << gcap_string(other_issued
)
5428 << " xlocker_issued=" << gcap_string(xlocker_issued
)
5430 if (!((loner_wanted
|loner_issued
) & (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
5431 (other_wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GRD
)) ||
5432 (in
->is_dir() && in
->multiple_nonstale_caps())) { // FIXME.. :/
5433 dout(20) << " should lose it" << dendl
;
5434 // we should lose it.
5445 // -> any writer means MIX; RD doesn't matter.
5446 if (((other_wanted
|loner_wanted
) & CEPH_CAP_GWR
) ||
5447 lock
->is_waiter_for(SimpleLock::WAIT_WR
))
5448 scatter_mix(lock
, need_issue
);
5449 else if (!lock
->is_wrlocked()) // let excl wrlocks drain first
5450 simple_sync(lock
, need_issue
);
5452 dout(10) << " waiting for wrlock to drain" << dendl
;
5457 else if (lock
->get_state() != LOCK_EXCL
&&
5458 !lock
->is_rdlocked() &&
5459 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
5460 in
->get_target_loner() >= 0 &&
5462 !in
->has_subtree_or_exporting_dirfrag() :
5463 (wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)))) {
5464 dout(7) << "file_eval stable, bump to loner " << *lock
5465 << " on " << *lock
->get_parent() << dendl
;
5466 file_excl(lock
, need_issue
);
5470 else if (lock
->get_state() != LOCK_MIX
&&
5471 !lock
->is_rdlocked() &&
5472 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
5473 (lock
->get_scatter_wanted() ||
5474 (in
->get_target_loner() < 0 && (wanted
& CEPH_CAP_GWR
)))) {
5475 dout(7) << "file_eval stable, bump to mixed " << *lock
5476 << " on " << *lock
->get_parent() << dendl
;
5477 scatter_mix(lock
, need_issue
);
5481 else if (lock
->get_state() != LOCK_SYNC
&&
5482 !lock
->is_wrlocked() && // drain wrlocks first!
5483 !lock
->is_waiter_for(SimpleLock::WAIT_WR
) &&
5484 !(wanted
& CEPH_CAP_GWR
) &&
5485 !((lock
->get_state() == LOCK_MIX
) &&
5486 in
->is_dir() && in
->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
5487 //((wanted & CEPH_CAP_RD) ||
5488 //in->is_replicated() ||
5489 //lock->is_leased() ||
5490 //(!loner && lock->get_state() == LOCK_EXCL)) &&
5492 dout(7) << "file_eval stable, bump to sync " << *lock
5493 << " on " << *lock
->get_parent() << dendl
;
5494 simple_sync(lock
, need_issue
);
5496 else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5497 mds
->mdcache
->queue_file_recover(in
);
5498 mds
->mdcache
->do_file_recover();
5504 void Locker::scatter_mix(ScatterLock
*lock
, bool *need_issue
)
5506 dout(7) << "scatter_mix " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5508 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5509 ceph_assert(in
->is_auth());
5510 ceph_assert(lock
->is_stable());
5512 if (lock
->get_state() == LOCK_LOCK
) {
5513 in
->start_scatter(lock
);
5514 if (in
->is_replicated()) {
5516 bufferlist softdata
;
5517 lock
->encode_locked_state(softdata
);
5519 // bcast to replicas
5520 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
5524 lock
->set_state(LOCK_MIX
);
5525 lock
->clear_scatter_wanted();
5526 if (lock
->get_cap_shift()) {
5534 switch (lock
->get_state()) {
5535 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_MIX
); break;
5536 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_MIX
); break;
5537 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_MIX
); break;
5538 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_MIX
); break;
5539 default: ceph_abort();
5543 if (lock
->is_rdlocked()) {
5544 if (lock
->is_cached())
5545 invalidate_lock_caches(lock
);
5548 if (in
->is_replicated()) {
5549 if (lock
->get_state() == LOCK_SYNC_MIX
) { // for the rest states, replicas are already LOCK
5550 send_lock_message(lock
, LOCK_AC_MIX
);
5551 lock
->init_gather();
5555 if (lock
->is_leased()) {
5556 revoke_client_leases(lock
);
5559 if (lock
->get_cap_shift() &&
5561 in
->issued_caps_need_gather(lock
)) {
5568 bool need_recover
= false;
5569 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5570 mds
->mdcache
->queue_file_recover(in
);
5571 need_recover
= true;
5576 lock
->get_parent()->auth_pin(lock
);
5578 mds
->mdcache
->do_file_recover();
5580 in
->start_scatter(lock
);
5581 lock
->set_state(LOCK_MIX
);
5582 lock
->clear_scatter_wanted();
5583 if (in
->is_replicated()) {
5584 bufferlist softdata
;
5585 lock
->encode_locked_state(softdata
);
5586 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
5588 if (lock
->get_cap_shift()) {
5599 void Locker::file_excl(ScatterLock
*lock
, bool *need_issue
)
5601 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5602 dout(7) << "file_excl " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5604 ceph_assert(in
->is_auth());
5605 ceph_assert(lock
->is_stable());
5607 ceph_assert((in
->get_loner() >= 0 && in
->get_mds_caps_wanted().empty()) ||
5608 (lock
->get_state() == LOCK_XSYN
)); // must do xsyn -> excl -> <anything else>
5610 switch (lock
->get_state()) {
5611 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
5612 case LOCK_MIX
: lock
->set_state(LOCK_MIX_EXCL
); break;
5613 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
5614 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
5615 default: ceph_abort();
5619 if (lock
->is_rdlocked())
5621 if (lock
->is_wrlocked())
5623 if (gather
&& lock
->is_cached())
5624 invalidate_lock_caches(lock
);
5626 if (in
->is_replicated() &&
5627 lock
->get_state() != LOCK_LOCK_EXCL
&&
5628 lock
->get_state() != LOCK_XSYN_EXCL
) { // if we were lock, replicas are already lock.
5629 send_lock_message(lock
, LOCK_AC_LOCK
);
5630 lock
->init_gather();
5633 if (lock
->is_leased()) {
5634 revoke_client_leases(lock
);
5637 if (in
->is_head() &&
5638 in
->issued_caps_need_gather(lock
)) {
5645 bool need_recover
= false;
5646 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5647 mds
->mdcache
->queue_file_recover(in
);
5648 need_recover
= true;
5653 lock
->get_parent()->auth_pin(lock
);
5655 mds
->mdcache
->do_file_recover();
5657 lock
->set_state(LOCK_EXCL
);
5665 void Locker::file_xsyn(SimpleLock
*lock
, bool *need_issue
)
5667 dout(7) << "file_xsyn on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5668 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5669 ceph_assert(in
->is_auth());
5670 ceph_assert(in
->get_loner() >= 0 && in
->get_mds_caps_wanted().empty());
5672 switch (lock
->get_state()) {
5673 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_XSYN
); break;
5674 default: ceph_abort();
5678 if (lock
->is_wrlocked()) {
5679 if (lock
->is_cached())
5680 invalidate_lock_caches(lock
);
5684 if (in
->is_head() &&
5685 in
->issued_caps_need_gather(lock
)) {
5694 lock
->get_parent()->auth_pin(lock
);
5696 lock
->set_state(LOCK_XSYN
);
5697 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5705 void Locker::file_recover(ScatterLock
*lock
)
5707 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5708 dout(7) << "file_recover " << *lock
<< " on " << *in
<< dendl
;
5710 ceph_assert(in
->is_auth());
5711 //assert(lock->is_stable());
5712 ceph_assert(lock
->get_state() == LOCK_PRE_SCAN
); // only called from MDCache::start_files_to_recover()
5717 if (in->is_replicated()
5718 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5719 send_lock_message(lock, LOCK_AC_LOCK);
5720 lock->init_gather();
5724 if (in
->is_head() &&
5725 in
->issued_caps_need_gather(lock
)) {
5730 lock
->set_state(LOCK_SCAN
);
5732 in
->state_set(CInode::STATE_NEEDSRECOVER
);
5734 mds
->mdcache
->queue_file_recover(in
);
5739 void Locker::handle_file_lock(ScatterLock
*lock
, const cref_t
<MLock
> &m
)
5741 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5742 int from
= m
->get_asker();
5744 if (mds
->is_rejoin()) {
5745 if (in
->is_rejoining()) {
5746 dout(7) << "handle_file_lock still rejoining " << *in
5747 << ", dropping " << *m
<< dendl
;
5752 dout(7) << "handle_file_lock a=" << lock
->get_lock_action_name(m
->get_action())
5754 << " from mds." << from
<< " "
5757 bool caps
= lock
->get_cap_shift();
5759 switch (m
->get_action()) {
5762 ceph_assert(lock
->get_state() == LOCK_LOCK
||
5763 lock
->get_state() == LOCK_MIX
||
5764 lock
->get_state() == LOCK_MIX_SYNC2
);
5766 if (lock
->get_state() == LOCK_MIX
) {
5767 lock
->set_state(LOCK_MIX_SYNC
);
5768 eval_gather(lock
, true);
5769 if (lock
->is_unstable_and_locked()) {
5770 if (lock
->is_cached())
5771 invalidate_lock_caches(lock
);
5772 mds
->mdlog
->flush();
5777 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5778 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5781 lock
->decode_locked_state(m
->get_data());
5782 lock
->set_state(LOCK_SYNC
);
5787 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5792 switch (lock
->get_state()) {
5793 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
5794 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
); break;
5795 default: ceph_abort();
5798 eval_gather(lock
, true);
5799 if (lock
->is_unstable_and_locked()) {
5800 if (lock
->is_cached())
5801 invalidate_lock_caches(lock
);
5802 mds
->mdlog
->flush();
5807 case LOCK_AC_LOCKFLUSHED
:
5808 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5809 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5810 // wake up scatter_nudge waiters
5811 if (lock
->is_stable())
5812 lock
->finish_waiters(SimpleLock::WAIT_STABLE
);
5816 ceph_assert(lock
->get_state() == LOCK_SYNC
||
5817 lock
->get_state() == LOCK_LOCK
||
5818 lock
->get_state() == LOCK_SYNC_MIX2
);
5820 if (lock
->get_state() == LOCK_SYNC
) {
5822 lock
->set_state(LOCK_SYNC_MIX
);
5823 eval_gather(lock
, true);
5824 if (lock
->is_unstable_and_locked()) {
5825 if (lock
->is_cached())
5826 invalidate_lock_caches(lock
);
5827 mds
->mdlog
->flush();
5833 lock
->set_state(LOCK_MIX
);
5834 lock
->decode_locked_state(m
->get_data());
5839 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
5844 case LOCK_AC_LOCKACK
:
5845 ceph_assert(lock
->get_state() == LOCK_SYNC_LOCK
||
5846 lock
->get_state() == LOCK_MIX_LOCK
||
5847 lock
->get_state() == LOCK_MIX_LOCK2
||
5848 lock
->get_state() == LOCK_MIX_EXCL
||
5849 lock
->get_state() == LOCK_SYNC_EXCL
||
5850 lock
->get_state() == LOCK_SYNC_MIX
||
5851 lock
->get_state() == LOCK_MIX_TSYN
);
5852 ceph_assert(lock
->is_gathering(from
));
5853 lock
->remove_gather(from
);
5855 if (lock
->get_state() == LOCK_MIX_LOCK
||
5856 lock
->get_state() == LOCK_MIX_LOCK2
||
5857 lock
->get_state() == LOCK_MIX_EXCL
||
5858 lock
->get_state() == LOCK_MIX_TSYN
) {
5859 lock
->decode_locked_state(m
->get_data());
5860 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5861 // delay calling scatter_writebehind().
5862 lock
->clear_flushed();
5865 if (lock
->is_gathering()) {
5866 dout(7) << "handle_file_lock " << *in
<< " from " << from
5867 << ", still gathering " << lock
->get_gather_set() << dendl
;
5869 dout(7) << "handle_file_lock " << *in
<< " from " << from
5870 << ", last one" << dendl
;
5875 case LOCK_AC_SYNCACK
:
5876 ceph_assert(lock
->get_state() == LOCK_MIX_SYNC
);
5877 ceph_assert(lock
->is_gathering(from
));
5878 lock
->remove_gather(from
);
5880 lock
->decode_locked_state(m
->get_data());
5882 if (lock
->is_gathering()) {
5883 dout(7) << "handle_file_lock " << *in
<< " from " << from
5884 << ", still gathering " << lock
->get_gather_set() << dendl
;
5886 dout(7) << "handle_file_lock " << *in
<< " from " << from
5887 << ", last one" << dendl
;
5892 case LOCK_AC_MIXACK
:
5893 ceph_assert(lock
->get_state() == LOCK_SYNC_MIX
);
5894 ceph_assert(lock
->is_gathering(from
));
5895 lock
->remove_gather(from
);
5897 if (lock
->is_gathering()) {
5898 dout(7) << "handle_file_lock " << *in
<< " from " << from
5899 << ", still gathering " << lock
->get_gather_set() << dendl
;
5901 dout(7) << "handle_file_lock " << *in
<< " from " << from
5902 << ", last one" << dendl
;
5909 case LOCK_AC_REQSCATTER
:
5910 if (lock
->is_stable()) {
5911 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5912 * because the replica should be holding an auth_pin if they're
5913 * doing this (and thus, we are freezing, not frozen, and indefinite
5914 * starvation isn't an issue).
5916 dout(7) << "handle_file_lock got scatter request on " << *lock
5917 << " on " << *lock
->get_parent() << dendl
;
5918 if (lock
->get_state() != LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5921 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5922 << " on " << *lock
->get_parent() << dendl
;
5923 lock
->set_scatter_wanted();
5927 case LOCK_AC_REQUNSCATTER
:
5928 if (lock
->is_stable()) {
5929 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5930 * because the replica should be holding an auth_pin if they're
5931 * doing this (and thus, we are freezing, not frozen, and indefinite
5932 * starvation isn't an issue).
5934 dout(7) << "handle_file_lock got unscatter request on " << *lock
5935 << " on " << *lock
->get_parent() << dendl
;
5936 if (lock
->get_state() == LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5937 simple_lock(lock
); // FIXME tempsync?
5939 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5940 << " on " << *lock
->get_parent() << dendl
;
5941 lock
->set_unscatter_wanted();
5945 case LOCK_AC_REQRDLOCK
:
5946 handle_reqrdlock(lock
, m
);
5950 if (!lock
->get_parent()->is_auth()) {
5951 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5952 << " on " << *lock
->get_parent() << dendl
;
5953 } else if (!lock
->get_parent()->is_replicated()) {
5954 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5955 << " on " << *lock
->get_parent() << dendl
;
5957 dout(7) << "handle_file_lock trying nudge on " << *lock
5958 << " on " << *lock
->get_parent() << dendl
;
5959 scatter_nudge(lock
, 0, true);
5960 mds
->mdlog
->flush();