1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
19 #include "common/config.h"
20 #include "events/EOpen.h"
21 #include "events/EUpdate.h"
23 #include "MDBalancer.h"
28 #include "messages/MInodeFileCaps.h"
29 #include "messages/MMDSPeerRequest.h"
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
34 #define dout_subsys ceph_subsys_mds
36 #define dout_context g_ceph_context
37 #define dout_prefix _prefix(_dout, mds)
41 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
42 return *_dout
<< "mds." << mds
->get_nodeid() << ".locker ";
46 class LockerContext
: public MDSContext
{
49 MDSRank
*get_mds() override
55 explicit LockerContext(Locker
*locker_
) : locker(locker_
) {
56 ceph_assert(locker
!= NULL
);
60 class LockerLogContext
: public MDSLogContextBase
{
63 MDSRank
*get_mds() override
69 explicit LockerLogContext(Locker
*locker_
) : locker(locker_
) {
70 ceph_assert(locker
!= NULL
);
74 Locker::Locker(MDSRank
*m
, MDCache
*c
) :
75 need_snapflush_inodes(member_offset(CInode
, item_caps
)), mds(m
), mdcache(c
) {}
78 void Locker::dispatch(const cref_t
<Message
> &m
)
81 switch (m
->get_type()) {
84 handle_lock(ref_cast
<MLock
>(m
));
87 case MSG_MDS_INODEFILECAPS
:
88 handle_inode_file_caps(ref_cast
<MInodeFileCaps
>(m
));
91 case CEPH_MSG_CLIENT_CAPS
:
92 handle_client_caps(ref_cast
<MClientCaps
>(m
));
94 case CEPH_MSG_CLIENT_CAPRELEASE
:
95 handle_client_cap_release(ref_cast
<MClientCapRelease
>(m
));
97 case CEPH_MSG_CLIENT_LEASE
:
98 handle_client_lease(ref_cast
<MClientLease
>(m
));
101 derr
<< "locker unknown message " << m
->get_type() << dendl
;
102 ceph_abort_msg("locker unknown message");
119 void Locker::send_lock_message(SimpleLock
*lock
, int msg
)
121 for (const auto &it
: lock
->get_parent()->get_replicas()) {
122 if (mds
->is_cluster_degraded() &&
123 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
125 auto m
= make_message
<MLock
>(lock
, msg
, mds
->get_nodeid());
126 mds
->send_message_mds(m
, it
.first
);
130 void Locker::send_lock_message(SimpleLock
*lock
, int msg
, const bufferlist
&data
)
132 for (const auto &it
: lock
->get_parent()->get_replicas()) {
133 if (mds
->is_cluster_degraded() &&
134 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
136 auto m
= make_message
<MLock
>(lock
, msg
, mds
->get_nodeid());
138 mds
->send_message_mds(m
, it
.first
);
142 bool Locker::try_rdlock_snap_layout(CInode
*in
, MDRequestRef
& mdr
,
143 int n
, bool want_layout
)
145 dout(10) << __func__
<< " " << *mdr
<< " " << *in
<< dendl
;
146 // rdlock ancestor snaps
149 bool found_locked
= false;
150 bool found_layout
= false;
155 client_t client
= mdr
->get_client();
160 if (!found_locked
&& mdr
->is_rdlocked(&t
->snaplock
))
164 if (!t
->snaplock
.can_rdlock(client
)) {
165 t
->snaplock
.add_waiter(SimpleLock::WAIT_RD
, new C_MDS_RetryRequest(mdcache
, mdr
));
168 t
->snaplock
.get_rdlock();
169 mdr
->locks
.emplace(&t
->snaplock
, MutationImpl::LockOp::RDLOCK
);
170 dout(20) << " got rdlock on " << t
->snaplock
<< " " << *t
<< dendl
;
172 if (want_layout
&& !found_layout
) {
173 if (!mdr
->is_rdlocked(&t
->policylock
)) {
174 if (!t
->policylock
.can_rdlock(client
)) {
175 t
->policylock
.add_waiter(SimpleLock::WAIT_RD
, new C_MDS_RetryRequest(mdcache
, mdr
));
178 t
->policylock
.get_rdlock();
179 mdr
->locks
.emplace(&t
->policylock
, MutationImpl::LockOp::RDLOCK
);
180 dout(20) << " got rdlock on " << t
->policylock
<< " " << *t
<< dendl
;
182 if (t
->get_projected_inode()->has_layout()) {
183 mdr
->dir_layout
= t
->get_projected_inode()->layout
;
187 CDentry
* pdn
= t
->get_projected_parent_dn();
192 t
= pdn
->get_dir()->get_inode();
195 mdr
->dir_root
[n
] = root
;
196 mdr
->dir_depth
[n
] = depth
;
200 dout(10) << __func__
<< " failed" << dendl
;
202 drop_locks(mdr
.get(), nullptr);
203 mdr
->drop_local_auth_pins();
207 struct MarkEventOnDestruct
{
209 std::string_view message
;
211 MarkEventOnDestruct(MDRequestRef
& _mdr
, std::string_view _message
) :
215 ~MarkEventOnDestruct() {
217 mdr
->mark_event(message
);
221 /* If this function returns false, the mdr has been placed
222 * on the appropriate wait list */
223 bool Locker::acquire_locks(MDRequestRef
& mdr
,
224 MutationImpl::LockOpVec
& lov
,
225 CInode
*auth_pin_freeze
,
226 bool auth_pin_nonblocking
)
228 dout(10) << "acquire_locks " << *mdr
<< dendl
;
230 MarkEventOnDestruct
marker(mdr
, "failed to acquire_locks");
232 client_t client
= mdr
->get_client();
234 set
<MDSCacheObject
*> mustpin
; // items to authpin
236 mustpin
.insert(auth_pin_freeze
);
239 for (size_t i
= 0; i
< lov
.size(); ++i
) {
241 SimpleLock
*lock
= p
.lock
;
242 MDSCacheObject
*object
= lock
->get_parent();
245 if ((lock
->get_type() == CEPH_LOCK_ISNAP
||
246 lock
->get_type() == CEPH_LOCK_IPOLICY
) &&
247 mds
->is_cluster_degraded() &&
249 !mdr
->is_queued_for_replay()) {
250 // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
251 // get processed in proper order.
253 if (object
->is_auth()) {
254 if (!mdr
->is_xlocked(lock
)) {
256 object
->list_replicas(ls
);
258 if (mds
->mdsmap
->get_state(m
) < MDSMap::STATE_ACTIVE
) {
265 // if the lock is the latest locked one, it's possible that peer mds got the lock
266 // while there are recovering mds.
267 if (!mdr
->is_xlocked(lock
) || mdr
->is_last_locked(lock
))
271 dout(10) << " must xlock " << *lock
<< " " << *object
272 << ", waiting for cluster recovered" << dendl
;
273 mds
->locker
->drop_locks(mdr
.get(), NULL
);
274 mdr
->drop_local_auth_pins();
275 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
280 dout(20) << " must xlock " << *lock
<< " " << *object
<< dendl
;
282 mustpin
.insert(object
);
284 // augment xlock with a versionlock?
285 if (lock
->get_type() == CEPH_LOCK_DN
) {
286 CDentry
*dn
= static_cast<CDentry
*>(object
);
289 if (mdr
->is_leader()) {
290 // leader. wrlock versionlock so we can pipeline dentry updates to journal.
291 lov
.add_wrlock(&dn
->versionlock
, i
+ 1);
293 // peer. exclusively lock the dentry version (i.e. block other journal updates).
294 // this makes rollback safe.
295 lov
.add_xlock(&dn
->versionlock
, i
+ 1);
298 if (lock
->get_type() >= CEPH_LOCK_IFIRST
&& lock
->get_type() != CEPH_LOCK_IVERSION
) {
299 // inode version lock?
300 CInode
*in
= static_cast<CInode
*>(object
);
303 if (mdr
->is_leader()) {
304 // leader. wrlock versionlock so we can pipeline inode updates to journal.
305 lov
.add_wrlock(&in
->versionlock
, i
+ 1);
307 // peer. exclusively lock the inode version (i.e. block other journal updates).
308 // this makes rollback safe.
309 lov
.add_xlock(&in
->versionlock
, i
+ 1);
312 } else if (p
.is_wrlock()) {
313 dout(20) << " must wrlock " << *lock
<< " " << *object
<< dendl
;
314 client_t _client
= p
.is_state_pin() ? lock
->get_excl_client() : client
;
315 if (object
->is_auth()) {
316 mustpin
.insert(object
);
317 } else if (!object
->is_auth() &&
318 !lock
->can_wrlock(_client
) && // we might have to request a scatter
319 !mdr
->is_peer()) { // if we are peer (remote_wrlock), the leader already authpinned
320 dout(15) << " will also auth_pin " << *object
321 << " in case we need to request a scatter" << dendl
;
322 mustpin
.insert(object
);
324 } else if (p
.is_remote_wrlock()) {
325 dout(20) << " must remote_wrlock on mds." << p
.wrlock_target
<< " "
326 << *lock
<< " " << *object
<< dendl
;
327 mustpin
.insert(object
);
328 } else if (p
.is_rdlock()) {
330 dout(20) << " must rdlock " << *lock
<< " " << *object
<< dendl
;
331 if (object
->is_auth()) {
332 mustpin
.insert(object
);
333 } else if (!object
->is_auth() &&
334 !lock
->can_rdlock(client
)) { // we might have to request an rdlock
335 dout(15) << " will also auth_pin " << *object
336 << " in case we need to request a rdlock" << dendl
;
337 mustpin
.insert(object
);
340 ceph_assert(0 == "locker unknown lock operation");
344 lov
.sort_and_merge();
347 map
<mds_rank_t
, set
<MDSCacheObject
*> > mustpin_remote
; // mds -> (object set)
349 // can i auth pin them all now?
350 marker
.message
= "failed to authpin local pins";
351 for (const auto &p
: mustpin
) {
352 MDSCacheObject
*object
= p
;
354 dout(10) << " must authpin " << *object
<< dendl
;
356 if (mdr
->is_auth_pinned(object
)) {
357 if (object
!= (MDSCacheObject
*)auth_pin_freeze
)
359 if (mdr
->more()->is_remote_frozen_authpin
) {
360 if (mdr
->more()->rename_inode
== auth_pin_freeze
)
362 // unfreeze auth pin for the wrong inode
363 mustpin_remote
[mdr
->more()->rename_inode
->authority().first
].size();
367 if (!object
->is_auth()) {
368 if (mdr
->lock_cache
) { // debug
369 ceph_assert(mdr
->lock_cache
->opcode
== CEPH_MDS_OP_UNLINK
);
370 CDentry
*dn
= mdr
->dn
[0].back();
371 ceph_assert(dn
->get_projected_linkage()->is_remote());
374 if (object
->is_ambiguous_auth()) {
376 dout(10) << " ambiguous auth, waiting to authpin " << *object
<< dendl
;
377 mdr
->disable_lock_cache();
378 drop_locks(mdr
.get());
379 mdr
->drop_local_auth_pins();
380 marker
.message
= "waiting for single auth, object is being migrated";
381 object
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_MDS_RetryRequest(mdcache
, mdr
));
384 mustpin_remote
[object
->authority().first
].insert(object
);
388 if (!object
->can_auth_pin(&err
)) {
389 if (mdr
->lock_cache
) {
391 if (CInode
*in
= dynamic_cast<CInode
*>(object
)) {
392 ceph_assert(!in
->is_frozen_inode() && !in
->is_frozen_auth_pin());
393 dir
= in
->get_projected_parent_dir();
394 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(object
)) {
397 ceph_assert(0 == "unknown type of lock parent");
399 if (dir
->get_inode() == mdr
->lock_cache
->get_dir_inode()) {
400 // forcibly auth pin if there is lock cache on parent dir
405 ceph_assert(mdr
->lock_cache
->opcode
== CEPH_MDS_OP_UNLINK
);
406 CDentry
*dn
= mdr
->dn
[0].back();
407 ceph_assert(dn
->get_projected_linkage()->is_remote());
412 mdr
->disable_lock_cache();
413 drop_locks(mdr
.get());
414 mdr
->drop_local_auth_pins();
415 if (auth_pin_nonblocking
) {
416 dout(10) << " can't auth_pin (freezing?) " << *object
<< ", nonblocking" << dendl
;
420 if (err
== MDSCacheObject::ERR_EXPORTING_TREE
) {
421 marker
.message
= "failed to authpin, subtree is being exported";
422 } else if (err
== MDSCacheObject::ERR_FRAGMENTING_DIR
) {
423 marker
.message
= "failed to authpin, dir is being fragmented";
424 } else if (err
== MDSCacheObject::ERR_EXPORTING_INODE
) {
425 marker
.message
= "failed to authpin, inode is being exported";
427 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object
<< dendl
;
428 object
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_MDS_RetryRequest(mdcache
, mdr
));
430 if (mdr
->is_any_remote_auth_pin())
431 notify_freeze_waiter(object
);
437 // ok, grab local auth pins
438 for (const auto& p
: mustpin
) {
439 MDSCacheObject
*object
= p
;
440 if (mdr
->is_auth_pinned(object
)) {
441 dout(10) << " already auth_pinned " << *object
<< dendl
;
442 } else if (object
->is_auth()) {
443 dout(10) << " auth_pinning " << *object
<< dendl
;
444 mdr
->auth_pin(object
);
448 // request remote auth_pins
449 if (!mustpin_remote
.empty()) {
450 marker
.message
= "requesting remote authpins";
451 for (const auto& p
: mdr
->object_states
) {
452 if (p
.second
.remote_auth_pinned
== MDS_RANK_NONE
)
454 ceph_assert(p
.second
.remote_auth_pinned
== p
.first
->authority().first
);
455 auto q
= mustpin_remote
.find(p
.second
.remote_auth_pinned
);
456 if (q
!= mustpin_remote
.end())
457 q
->second
.insert(p
.first
);
460 for (auto& p
: mustpin_remote
) {
461 dout(10) << "requesting remote auth_pins from mds." << p
.first
<< dendl
;
463 // wait for active auth
464 if (mds
->is_cluster_degraded() &&
465 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(p
.first
)) {
466 dout(10) << " mds." << p
.first
<< " is not active" << dendl
;
467 if (mdr
->more()->waiting_on_peer
.empty())
468 mds
->wait_for_active_peer(p
.first
, new C_MDS_RetryRequest(mdcache
, mdr
));
472 auto req
= make_message
<MMDSPeerRequest
>(mdr
->reqid
, mdr
->attempt
,
473 MMDSPeerRequest::OP_AUTHPIN
);
474 for (auto& o
: p
.second
) {
475 dout(10) << " req remote auth_pin of " << *o
<< dendl
;
476 MDSCacheObjectInfo info
;
477 o
->set_object_info(info
);
478 req
->get_authpins().push_back(info
);
479 if (o
== auth_pin_freeze
)
480 o
->set_object_info(req
->get_authpin_freeze());
483 if (auth_pin_nonblocking
)
484 req
->mark_nonblocking();
485 else if (!mdr
->locks
.empty())
486 req
->mark_notify_blocking();
488 mds
->send_message_mds(req
, p
.first
);
490 // put in waiting list
491 auto ret
= mdr
->more()->waiting_on_peer
.insert(p
.first
);
492 ceph_assert(ret
.second
);
497 // caps i'll need to issue
498 set
<CInode
*> issue_set
;
502 // make sure they match currently acquired locks.
503 for (const auto& p
: lov
) {
506 if (mdr
->is_xlocked(lock
)) {
507 dout(10) << " already xlocked " << *lock
<< " " << *lock
->get_parent() << dendl
;
510 if (mdr
->locking
&& lock
!= mdr
->locking
)
511 cancel_locking(mdr
.get(), &issue_set
);
512 if (!xlock_start(lock
, mdr
)) {
513 marker
.message
= "failed to xlock, waiting";
516 dout(10) << " got xlock on " << *lock
<< " " << *lock
->get_parent() << dendl
;
517 } else if (p
.is_wrlock() || p
.is_remote_wrlock()) {
518 auto it
= mdr
->locks
.find(lock
);
519 if (p
.is_remote_wrlock()) {
520 if (it
!= mdr
->locks
.end() && it
->is_remote_wrlock()) {
521 dout(10) << " already remote_wrlocked " << *lock
<< " " << *lock
->get_parent() << dendl
;
523 if (mdr
->locking
&& lock
!= mdr
->locking
)
524 cancel_locking(mdr
.get(), &issue_set
);
525 marker
.message
= "waiting for remote wrlocks";
526 remote_wrlock_start(lock
, p
.wrlock_target
, mdr
);
531 if (it
!= mdr
->locks
.end() && it
->is_wrlock()) {
532 dout(10) << " already wrlocked " << *lock
<< " " << *lock
->get_parent() << dendl
;
535 client_t _client
= p
.is_state_pin() ? lock
->get_excl_client() : client
;
536 if (p
.is_remote_wrlock()) {
537 // nowait if we have already gotten remote wrlock
538 if (!wrlock_try(lock
, mdr
, _client
)) {
539 marker
.message
= "failed to wrlock, dropping remote wrlock and waiting";
540 // can't take the wrlock because the scatter lock is gathering. need to
541 // release the remote wrlock, so that the gathering process can finish.
542 ceph_assert(it
!= mdr
->locks
.end());
543 remote_wrlock_finish(it
, mdr
.get());
544 remote_wrlock_start(lock
, p
.wrlock_target
, mdr
);
548 if (!wrlock_start(p
, mdr
)) {
549 ceph_assert(!p
.is_remote_wrlock());
550 marker
.message
= "failed to wrlock, waiting";
554 dout(10) << " got wrlock on " << *lock
<< " " << *lock
->get_parent() << dendl
;
557 if (mdr
->is_rdlocked(lock
)) {
558 dout(10) << " already rdlocked " << *lock
<< " " << *lock
->get_parent() << dendl
;
562 ceph_assert(mdr
->is_leader());
563 if (lock
->needs_recover()) {
564 if (mds
->is_cluster_degraded()) {
565 if (!mdr
->is_queued_for_replay()) {
566 // see comments in SimpleLock::set_state_rejoin() and
567 // ScatterLock::encode_state_for_rejoin()
568 drop_locks(mdr
.get());
569 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
570 dout(10) << " rejoin recovering " << *lock
<< " " << *lock
->get_parent()
571 << ", waiting for cluster recovered" << dendl
;
572 marker
.message
= "rejoin recovering lock, waiting for cluster recovered";
576 lock
->clear_need_recover();
580 if (!rdlock_start(lock
, mdr
)) {
581 marker
.message
= "failed to rdlock, waiting";
584 dout(10) << " got rdlock on " << *lock
<< " " << *lock
->get_parent() << dendl
;
588 mdr
->set_mds_stamp(ceph_clock_now());
590 marker
.message
= "acquired locks";
593 issue_caps_set(issue_set
);
597 void Locker::notify_freeze_waiter(MDSCacheObject
*o
)
600 if (CInode
*in
= dynamic_cast<CInode
*>(o
)) {
602 dir
= in
->get_parent_dir();
603 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(o
)) {
606 dir
= dynamic_cast<CDir
*>(o
);
610 if (dir
->is_freezing_dir())
611 mdcache
->fragment_freeze_inc_num_waiters(dir
);
612 if (dir
->is_freezing_tree()) {
613 while (!dir
->is_freezing_tree_root())
614 dir
= dir
->get_parent_dir();
615 mdcache
->migrator
->export_freeze_inc_num_waiters(dir
);
620 void Locker::set_xlocks_done(MutationImpl
*mut
, bool skip_dentry
)
622 for (const auto &p
: mut
->locks
) {
625 MDSCacheObject
*obj
= p
.lock
->get_parent();
626 ceph_assert(obj
->is_auth());
628 (p
.lock
->get_type() == CEPH_LOCK_DN
|| p
.lock
->get_type() == CEPH_LOCK_DVERSION
))
630 dout(10) << "set_xlocks_done on " << *p
.lock
<< " " << *obj
<< dendl
;
631 p
.lock
->set_xlock_done();
635 void Locker::_drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
,
638 set
<mds_rank_t
> peers
;
640 for (auto it
= mut
->locks
.begin(); it
!= mut
->locks
.end(); ) {
641 SimpleLock
*lock
= it
->lock
;
642 MDSCacheObject
*obj
= lock
->get_parent();
644 if (it
->is_xlock()) {
645 if (obj
->is_auth()) {
647 xlock_finish(it
++, mut
, &ni
);
649 pneed_issue
->insert(static_cast<CInode
*>(obj
));
651 ceph_assert(lock
->get_sm()->can_remote_xlock
);
652 peers
.insert(obj
->authority().first
);
654 mut
->locks
.erase(it
++);
656 } else if (it
->is_wrlock() || it
->is_remote_wrlock()) {
657 if (it
->is_remote_wrlock()) {
658 peers
.insert(it
->wrlock_target
);
659 it
->clear_remote_wrlock();
661 if (it
->is_wrlock()) {
663 wrlock_finish(it
++, mut
, &ni
);
665 pneed_issue
->insert(static_cast<CInode
*>(obj
));
667 mut
->locks
.erase(it
++);
669 } else if (drop_rdlocks
&& it
->is_rdlock()) {
671 rdlock_finish(it
++, mut
, &ni
);
673 pneed_issue
->insert(static_cast<CInode
*>(obj
));
680 if (mut
->lock_cache
) {
681 put_lock_cache(mut
->lock_cache
);
682 mut
->lock_cache
= nullptr;
686 for (set
<mds_rank_t
>::iterator p
= peers
.begin(); p
!= peers
.end(); ++p
) {
687 if (!mds
->is_cluster_degraded() ||
688 mds
->mdsmap
->get_state(*p
) >= MDSMap::STATE_REJOIN
) {
689 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p
<< dendl
;
690 auto peerreq
= make_message
<MMDSPeerRequest
>(mut
->reqid
, mut
->attempt
,
691 MMDSPeerRequest::OP_DROPLOCKS
);
692 mds
->send_message_mds(peerreq
, *p
);
697 void Locker::cancel_locking(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
699 SimpleLock
*lock
= mut
->locking
;
701 dout(10) << "cancel_locking " << *lock
<< " on " << *mut
<< dendl
;
703 if (lock
->get_parent()->is_auth()) {
704 bool need_issue
= false;
705 if (lock
->get_state() == LOCK_PREXLOCK
) {
706 _finish_xlock(lock
, -1, &need_issue
);
707 } else if (lock
->get_state() == LOCK_LOCK_XLOCK
) {
708 lock
->set_state(LOCK_XLOCKDONE
);
709 eval_gather(lock
, true, &need_issue
);
712 pneed_issue
->insert(static_cast<CInode
*>(lock
->get_parent()));
714 mut
->finish_locking(lock
);
717 void Locker::drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
720 set
<CInode
*> my_need_issue
;
722 pneed_issue
= &my_need_issue
;
725 cancel_locking(mut
, pneed_issue
);
726 _drop_locks(mut
, pneed_issue
, true);
728 if (pneed_issue
== &my_need_issue
)
729 issue_caps_set(*pneed_issue
);
730 mut
->locking_state
= 0;
733 void Locker::drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
735 set
<CInode
*> my_need_issue
;
737 pneed_issue
= &my_need_issue
;
739 _drop_locks(mut
, pneed_issue
, false);
741 if (pneed_issue
== &my_need_issue
)
742 issue_caps_set(*pneed_issue
);
745 void Locker::drop_rdlocks_for_early_reply(MutationImpl
*mut
)
747 set
<CInode
*> need_issue
;
749 for (auto it
= mut
->locks
.begin(); it
!= mut
->locks
.end(); ) {
750 if (!it
->is_rdlock()) {
754 SimpleLock
*lock
= it
->lock
;
755 // make later mksnap/setlayout (at other mds) wait for this unsafe request
756 if (lock
->get_type() == CEPH_LOCK_ISNAP
||
757 lock
->get_type() == CEPH_LOCK_IPOLICY
) {
762 rdlock_finish(it
++, mut
, &ni
);
764 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
767 issue_caps_set(need_issue
);
770 void Locker::drop_locks_for_fragment_unfreeze(MutationImpl
*mut
)
772 set
<CInode
*> need_issue
;
774 for (auto it
= mut
->locks
.begin(); it
!= mut
->locks
.end(); ) {
775 SimpleLock
*lock
= it
->lock
;
776 if (lock
->get_type() == CEPH_LOCK_IDFT
) {
781 wrlock_finish(it
++, mut
, &ni
);
783 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
785 issue_caps_set(need_issue
);
788 class C_MDL_DropCache
: public LockerContext
{
789 MDLockCache
*lock_cache
;
791 C_MDL_DropCache(Locker
*l
, MDLockCache
*lc
) :
792 LockerContext(l
), lock_cache(lc
) { }
793 void finish(int r
) override
{
794 locker
->drop_locks(lock_cache
);
795 lock_cache
->cleanup();
800 void Locker::put_lock_cache(MDLockCache
* lock_cache
)
802 ceph_assert(lock_cache
->ref
> 0);
803 if (--lock_cache
->ref
> 0)
806 ceph_assert(lock_cache
->invalidating
);
808 lock_cache
->detach_locks();
810 CInode
*diri
= lock_cache
->get_dir_inode();
811 for (auto dir
: lock_cache
->auth_pinned_dirfrags
) {
812 if (dir
->get_inode() != diri
)
814 dir
->enable_frozen_inode();
817 mds
->queue_waiter(new C_MDL_DropCache(this, lock_cache
));
820 int Locker::get_cap_bit_for_lock_cache(int op
)
823 case CEPH_MDS_OP_CREATE
:
824 return CEPH_CAP_DIR_CREATE
;
825 case CEPH_MDS_OP_UNLINK
:
826 return CEPH_CAP_DIR_UNLINK
;
828 ceph_assert(0 == "unsupported operation");
833 void Locker::invalidate_lock_cache(MDLockCache
*lock_cache
)
835 ceph_assert(lock_cache
->item_cap_lock_cache
.is_on_list());
836 if (lock_cache
->invalidating
) {
837 ceph_assert(!lock_cache
->client_cap
);
839 lock_cache
->invalidating
= true;
840 lock_cache
->detach_dirfrags();
843 Capability
*cap
= lock_cache
->client_cap
;
845 int cap_bit
= get_cap_bit_for_lock_cache(lock_cache
->opcode
);
846 cap
->clear_lock_cache_allowed(cap_bit
);
847 if (cap
->issued() & cap_bit
)
848 issue_caps(lock_cache
->get_dir_inode(), cap
);
854 lock_cache
->item_cap_lock_cache
.remove_myself();
855 put_lock_cache(lock_cache
);
859 void Locker::eval_lock_caches(Capability
*cap
)
861 for (auto p
= cap
->lock_caches
.begin(); !p
.end(); ) {
862 MDLockCache
*lock_cache
= *p
;
864 if (!lock_cache
->invalidating
)
866 int cap_bit
= get_cap_bit_for_lock_cache(lock_cache
->opcode
);
867 if (!(cap
->issued() & cap_bit
)) {
868 lock_cache
->item_cap_lock_cache
.remove_myself();
869 put_lock_cache(lock_cache
);
874 // ask lock caches to release auth pins
875 void Locker::invalidate_lock_caches(CDir
*dir
)
877 dout(10) << "invalidate_lock_caches on " << *dir
<< dendl
;
878 auto &lock_caches
= dir
->lock_caches_with_auth_pins
;
879 while (!lock_caches
.empty()) {
880 invalidate_lock_cache(lock_caches
.front()->parent
);
884 // ask lock caches to release locks
885 void Locker::invalidate_lock_caches(SimpleLock
*lock
)
887 dout(10) << "invalidate_lock_caches " << *lock
<< " on " << *lock
->get_parent() << dendl
;
888 if (lock
->is_cached()) {
889 auto&& lock_caches
= lock
->get_active_caches();
890 for (auto& lc
: lock_caches
)
891 invalidate_lock_cache(lc
);
895 void Locker::create_lock_cache(MDRequestRef
& mdr
, CInode
*diri
, file_layout_t
*dir_layout
)
900 client_t client
= mdr
->get_client();
901 int opcode
= mdr
->client_request
->get_op();
902 dout(10) << "create_lock_cache for client." << client
<< "/" << ceph_mds_op_name(opcode
)<< " on " << *diri
<< dendl
;
904 if (!diri
->is_auth()) {
905 dout(10) << " dir inode is not auth, noop" << dendl
;
909 if (mdr
->has_more() && !mdr
->more()->peers
.empty()) {
910 dout(10) << " there are peers requests for " << *mdr
<< ", noop" << dendl
;
914 Capability
*cap
= diri
->get_client_cap(client
);
916 dout(10) << " there is no cap for client." << client
<< ", noop" << dendl
;
920 for (auto p
= cap
->lock_caches
.begin(); !p
.end(); ++p
) {
921 if ((*p
)->opcode
== opcode
) {
922 dout(10) << " lock cache already exists for " << ceph_mds_op_name(opcode
) << ", noop" << dendl
;
927 set
<MDSCacheObject
*> ancestors
;
928 for (CInode
*in
= diri
; ; ) {
929 CDentry
*pdn
= in
->get_projected_parent_dn();
932 // ancestors.insert(pdn);
933 in
= pdn
->get_dir()->get_inode();
934 ancestors
.insert(in
);
937 for (auto& p
: mdr
->object_states
) {
938 if (p
.first
!= diri
&& !ancestors
.count(p
.first
))
940 auto& stat
= p
.second
;
941 if (stat
.auth_pinned
) {
942 if (!p
.first
->can_auth_pin()) {
943 dout(10) << " can't auth_pin(freezing?) lock parent " << *p
.first
<< ", noop" << dendl
;
946 if (CInode
*in
= dynamic_cast<CInode
*>(p
.first
); in
->is_parent_projected()) {
947 CDir
*dir
= in
->get_projected_parent_dir();
948 if (!dir
->can_auth_pin()) {
949 dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir
<< ", noop" << dendl
;
956 std::vector
<CDir
*> dfv
;
957 dfv
.reserve(diri
->get_num_dirfrags());
959 diri
->get_dirfrags(dfv
);
960 for (auto dir
: dfv
) {
961 if (!dir
->is_auth() || !dir
->can_auth_pin()) {
962 dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir
<< ", noop" << dendl
;
965 if (dir
->is_any_freezing_or_frozen_inode()) {
966 dout(10) << " there is freezing/frozen inode in " << *dir
<< ", noop" << dendl
;
971 for (auto& p
: mdr
->locks
) {
972 MDSCacheObject
*obj
= p
.lock
->get_parent();
973 if (obj
!= diri
&& !ancestors
.count(obj
))
975 if (!p
.lock
->is_stable()) {
976 dout(10) << " unstable " << *p
.lock
<< " on " << *obj
<< ", noop" << dendl
;
981 auto lock_cache
= new MDLockCache(cap
, opcode
);
983 lock_cache
->set_dir_layout(*dir_layout
);
984 cap
->set_lock_cache_allowed(get_cap_bit_for_lock_cache(opcode
));
986 for (auto dir
: dfv
) {
987 // prevent subtree migration
988 lock_cache
->auth_pin(dir
);
989 // prevent frozen inode
990 dir
->disable_frozen_inode();
993 for (auto& p
: mdr
->object_states
) {
994 if (p
.first
!= diri
&& !ancestors
.count(p
.first
))
996 auto& stat
= p
.second
;
997 if (stat
.auth_pinned
)
998 lock_cache
->auth_pin(p
.first
);
1000 lock_cache
->pin(p
.first
);
1002 if (CInode
*in
= dynamic_cast<CInode
*>(p
.first
)) {
1003 CDentry
*pdn
= in
->get_projected_parent_dn();
1005 dfv
.push_back(pdn
->get_dir());
1006 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(p
.first
)) {
1007 dfv
.push_back(dn
->get_dir());
1009 ceph_assert(0 == "unknown type of lock parent");
1012 lock_cache
->attach_dirfrags(std::move(dfv
));
1014 for (auto it
= mdr
->locks
.begin(); it
!= mdr
->locks
.end(); ) {
1015 MDSCacheObject
*obj
= it
->lock
->get_parent();
1016 if (obj
!= diri
&& !ancestors
.count(obj
)) {
1020 unsigned lock_flag
= 0;
1021 if (it
->is_wrlock()) {
1022 // skip wrlocks that were added by MDCache::predirty_journal_parent()
1024 lock_flag
= MutationImpl::LockOp::WRLOCK
;
1026 ceph_assert(it
->is_rdlock());
1027 lock_flag
= MutationImpl::LockOp::RDLOCK
;
1030 lock_cache
->emplace_lock(it
->lock
, lock_flag
);
1031 mdr
->locks
.erase(it
++);
1036 lock_cache
->attach_locks();
1039 mdr
->lock_cache
= lock_cache
;
1042 bool Locker::find_and_attach_lock_cache(MDRequestRef
& mdr
, CInode
*diri
)
1044 if (mdr
->lock_cache
)
1047 Capability
*cap
= diri
->get_client_cap(mdr
->get_client());
1051 int opcode
= mdr
->client_request
->get_op();
1052 for (auto p
= cap
->lock_caches
.begin(); !p
.end(); ++p
) {
1053 MDLockCache
*lock_cache
= *p
;
1054 if (lock_cache
->opcode
== opcode
) {
1055 dout(10) << "found lock cache for " << ceph_mds_op_name(opcode
) << " on " << *diri
<< dendl
;
1056 mdr
->lock_cache
= lock_cache
;
1057 mdr
->lock_cache
->ref
++;
1066 void Locker::eval_gather(SimpleLock
*lock
, bool first
, bool *pneed_issue
, MDSContext::vec
*pfinishers
)
1068 dout(10) << "eval_gather " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1069 ceph_assert(!lock
->is_stable());
1071 int next
= lock
->get_next_state();
1074 bool caps
= lock
->get_cap_shift();
1075 if (lock
->get_type() != CEPH_LOCK_DN
)
1076 in
= static_cast<CInode
*>(lock
->get_parent());
1078 bool need_issue
= false;
1080 int loner_issued
= 0, other_issued
= 0, xlocker_issued
= 0;
1081 ceph_assert(!caps
|| in
!= NULL
);
1082 if (caps
&& in
->is_head()) {
1083 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
1084 lock
->get_cap_shift(), lock
->get_cap_mask());
1085 dout(10) << " next state is " << lock
->get_state_name(next
)
1086 << " issued/allows loner " << gcap_string(loner_issued
)
1087 << "/" << gcap_string(lock
->gcaps_allowed(CAP_LONER
, next
))
1088 << " xlocker " << gcap_string(xlocker_issued
)
1089 << "/" << gcap_string(lock
->gcaps_allowed(CAP_XLOCKER
, next
))
1090 << " other " << gcap_string(other_issued
)
1091 << "/" << gcap_string(lock
->gcaps_allowed(CAP_ANY
, next
))
1094 if (first
&& ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) ||
1095 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) ||
1096 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
)))
1100 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
1101 bool auth
= lock
->get_parent()->is_auth();
1102 if (!lock
->is_gathering() &&
1103 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_rdlock
, auth
) || !lock
->is_rdlocked()) &&
1104 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_wrlock
, auth
) || !lock
->is_wrlocked()) &&
1105 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_xlock
, auth
) || !lock
->is_xlocked()) &&
1106 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_lease
, auth
) || !lock
->is_leased()) &&
1107 !(lock
->get_parent()->is_auth() && lock
->is_flushing()) && // i.e. wait for scatter_writebehind!
1108 (!caps
|| ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) == 0 &&
1109 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) == 0 &&
1110 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
) == 0)) &&
1111 lock
->get_state() != LOCK_SYNC_MIX2
&& // these states need an explicit trigger from the auth mds
1112 lock
->get_state() != LOCK_MIX_SYNC2
1114 dout(7) << "eval_gather finished gather on " << *lock
1115 << " on " << *lock
->get_parent() << dendl
;
1117 if (lock
->get_sm() == &sm_filelock
) {
1119 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1120 dout(7) << "eval_gather finished gather, but still recovering" << dendl
;
1122 } else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
1123 dout(7) << "eval_gather finished gather, but need to recover" << dendl
;
1124 mds
->mdcache
->queue_file_recover(in
);
1125 mds
->mdcache
->do_file_recover();
1130 if (!lock
->get_parent()->is_auth()) {
1131 // replica: tell auth
1132 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1134 if (lock
->get_parent()->is_rejoining() &&
1135 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
1136 dout(7) << "eval_gather finished gather, but still rejoining "
1137 << *lock
->get_parent() << dendl
;
1141 if (!mds
->is_cluster_degraded() ||
1142 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
1143 switch (lock
->get_state()) {
1144 case LOCK_SYNC_LOCK
:
1145 mds
->send_message_mds(make_message
<MLock
>(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid()), auth
);
1150 auto reply
= make_message
<MLock
>(lock
, LOCK_AC_SYNCACK
, mds
->get_nodeid());
1151 lock
->encode_locked_state(reply
->get_data());
1152 mds
->send_message_mds(reply
, auth
);
1153 next
= LOCK_MIX_SYNC2
;
1154 (static_cast<ScatterLock
*>(lock
))->start_flush();
1158 case LOCK_MIX_SYNC2
:
1159 (static_cast<ScatterLock
*>(lock
))->finish_flush();
1160 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
1162 case LOCK_SYNC_MIX2
:
1163 // do nothing, we already acked
1168 auto reply
= make_message
<MLock
>(lock
, LOCK_AC_MIXACK
, mds
->get_nodeid());
1169 mds
->send_message_mds(reply
, auth
);
1170 next
= LOCK_SYNC_MIX2
;
1177 lock
->encode_locked_state(data
);
1178 mds
->send_message_mds(make_message
<MLock
>(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid(), data
), auth
);
1179 (static_cast<ScatterLock
*>(lock
))->start_flush();
1180 // we'll get an AC_LOCKFLUSHED to complete
1191 // once the first (local) stage of mix->lock gather complete we can
1192 // gather from replicas
1193 if (lock
->get_state() == LOCK_MIX_LOCK
&&
1194 lock
->get_parent()->is_replicated()) {
1195 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl
;
1196 send_lock_message(lock
, LOCK_AC_LOCK
);
1197 lock
->init_gather();
1198 lock
->set_state(LOCK_MIX_LOCK2
);
1202 if (lock
->is_dirty() && !lock
->is_flushed()) {
1203 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
1204 mds
->mdlog
->flush();
1207 lock
->clear_flushed();
1209 switch (lock
->get_state()) {
1215 in
->start_scatter(static_cast<ScatterLock
*>(lock
));
1216 if (lock
->get_parent()->is_replicated()) {
1217 bufferlist softdata
;
1218 lock
->encode_locked_state(softdata
);
1219 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
1221 (static_cast<ScatterLock
*>(lock
))->clear_scatter_wanted();
1225 case LOCK_XLOCKDONE
:
1226 if (next
!= LOCK_SYNC
)
1231 case LOCK_EXCL_SYNC
:
1232 case LOCK_LOCK_SYNC
:
1234 case LOCK_XSYN_SYNC
:
1235 if (lock
->get_parent()->is_replicated()) {
1236 bufferlist softdata
;
1237 lock
->encode_locked_state(softdata
);
1238 send_lock_message(lock
, LOCK_AC_SYNC
, softdata
);
1245 lock
->set_state(next
);
1247 if (lock
->get_parent()->is_auth() &&
1249 lock
->get_parent()->auth_unpin(lock
);
1251 // drop loner before doing waiters
1255 in
->get_wanted_loner() != in
->get_loner()) {
1256 dout(10) << " trying to drop loner" << dendl
;
1257 if (in
->try_drop_loner()) {
1258 dout(10) << " dropped loner" << dendl
;
1264 lock
->take_waiting(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
,
1267 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
);
1269 if (caps
&& in
->is_head())
1272 if (lock
->get_parent()->is_auth() &&
1274 try_eval(lock
, &need_issue
);
1279 *pneed_issue
= true;
1280 else if (in
->is_head())
// Evaluate the inode's locks selected by `mask`, possibly transitioning
// their states and collecting wakeups into `finishers`. Also (re)chooses
// the loner client on auth/head inodes. Returns whether caps need issuing.
// NOTE(review): text is extraction-garbled; some original lines are missing.
1286 bool Locker::eval(CInode
*in
, int mask
, bool caps_imported
)
1288 bool need_issue
= caps_imported
;
1289 MDSContext::vec finishers
;
1291 dout(10) << "eval " << mask
<< " " << *in
<< dendl
;
// On the auth MDS for a head inode, try to pick (or report failure to pick)
// the ideal loner client before evaluating the individual locks.
1294 if (in
->is_auth() && in
->is_head()) {
1295 client_t orig_loner
= in
->get_loner();
1296 if (in
->choose_ideal_loner()) {
1297 dout(10) << "eval set loner: client." << orig_loner
<< " -> client." << in
->get_loner() << dendl
;
1300 } else if (in
->get_wanted_loner() != in
->get_loner()) {
1301 dout(10) << "eval want loner: client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
// Evaluate each lock whose bit is set in `mask`.
1307 if (mask
& CEPH_LOCK_IFILE
)
1308 eval_any(&in
->filelock
, &need_issue
, &finishers
, caps_imported
);
1309 if (mask
& CEPH_LOCK_IAUTH
)
1310 eval_any(&in
->authlock
, &need_issue
, &finishers
, caps_imported
);
1311 if (mask
& CEPH_LOCK_ILINK
)
1312 eval_any(&in
->linklock
, &need_issue
, &finishers
, caps_imported
);
1313 if (mask
& CEPH_LOCK_IXATTR
)
1314 eval_any(&in
->xattrlock
, &need_issue
, &finishers
, caps_imported
);
1315 if (mask
& CEPH_LOCK_INEST
)
1316 eval_any(&in
->nestlock
, &need_issue
, &finishers
, caps_imported
);
1317 if (mask
& CEPH_LOCK_IFLOCK
)
1318 eval_any(&in
->flocklock
, &need_issue
, &finishers
, caps_imported
);
1319 if (mask
& CEPH_LOCK_IPOLICY
)
1320 eval_any(&in
->policylock
, &need_issue
, &finishers
, caps_imported
);
// After lock evaluation, retry dropping/switching the loner if the wanted
// loner still differs from the current one.
1323 if (in
->is_auth() && in
->is_head() && in
->get_wanted_loner() != in
->get_loner()) {
1324 if (in
->try_drop_loner()) {
1326 if (in
->get_wanted_loner() >= 0) {
1327 dout(10) << "eval end set loner to client." << in
->get_wanted_loner() << dendl
;
1328 bool ok
= in
->try_set_loner();
// Run queued waiters outside the lock-evaluation loop.
1336 finish_contexts(g_ceph_context
, finishers
);
1338 if (need_issue
&& in
->is_head())
1341 dout(10) << "eval done" << dendl
;
// Waiter context: re-runs Locker::try_eval(p, mask) when a cache object
// becomes ready (e.g. unfrozen or single-auth). Pins the object with
// PIN_PTRWAITER for the lifetime of the waiter.
1345 class C_Locker_Eval
: public LockerContext
{
1349 C_Locker_Eval(Locker
*l
, MDSCacheObject
*pp
, int m
) : LockerContext(l
), p(pp
), mask(m
) {
1350 // We are used as an MDSCacheObject waiter, so should
1351 // only be invoked by someone already holding the big lock.
1352 ceph_assert(ceph_mutex_is_locked_by_me(locker
->mds
->mds_lock
));
1353 p
->get(MDSCacheObject::PIN_PTRWAITER
);
// Fires when the waited-on condition is satisfied; drops the pin after.
1355 void finish(int r
) override
{
1356 locker
->try_eval(p
, mask
);
1357 p
->put(MDSCacheObject::PIN_PTRWAITER
);
// Try to evaluate the locks of a cache object (dentry or inode) selected by
// `mask`, deferring via a C_Locker_Eval waiter if the object has ambiguous
// auth or is frozen.
1361 void Locker::try_eval(MDSCacheObject
*p
, int mask
)
1363 // unstable and ambiguous auth?
1364 if (p
->is_ambiguous_auth()) {
1365 dout(7) << "try_eval ambiguous auth, waiting on " << *p
<< dendl
;
1366 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, mask
));
// Frozen auth objects cannot change lock state now; wait for unfreeze.
1370 if (p
->is_auth() && p
->is_frozen()) {
1371 dout(7) << "try_eval frozen, waiting on " << *p
<< dendl
;
1372 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, mask
));
// CEPH_LOCK_DN means `p` is a dentry; otherwise it is an inode.
1376 if (mask
& CEPH_LOCK_DN
) {
1377 ceph_assert(mask
== CEPH_LOCK_DN
);
1378 bool need_issue
= false; // ignore this, no caps on dentries
1379 CDentry
*dn
= static_cast<CDentry
*>(p
);
1380 eval_any(&dn
->lock
, &need_issue
);
1382 CInode
*in
= static_cast<CInode
*>(p
);
// Try to evaluate a single lock on its parent object, deferring with a
// C_Locker_Eval waiter when the parent has ambiguous auth or is frozen,
// and honoring pending scatter/unscatter requests on scatterlocks.
1387 void Locker::try_eval(SimpleLock
*lock
, bool *pneed_issue
)
1389 MDSCacheObject
*p
= lock
->get_parent();
1391 // unstable and ambiguous auth?
1392 if (p
->is_ambiguous_auth()) {
1393 dout(7) << "try_eval " << *lock
<< " ambiguousauth, waiting on " << *p
<< dendl
;
1394 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, lock
->get_type()));
// Replicas do not drive lock evaluation.
1398 if (!p
->is_auth()) {
1399 dout(7) << "try_eval " << *lock
<< " not auth for " << *p
<< dendl
;
1403 if (p
->is_frozen()) {
1404 dout(7) << "try_eval " << *lock
<< " frozen, waiting on " << *p
<< dendl
;
1405 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1410 * We could have a situation like:
1412 * - mds A authpins item on mds B
1413 * - mds B starts to freeze tree containing item
1414 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1415 * - mds B lock is unstable, sets scatter_wanted
1416 * - mds B lock stabilizes, calls try_eval.
1418 * We can defer while freezing without causing a deadlock. Honor
1419 * scatter_wanted flag here. This will never get deferred by the
1420 * checks above due to the auth_pin held by the leader.
1422 if (lock
->is_scatterlock()) {
1423 ScatterLock
*slock
= static_cast<ScatterLock
*>(lock
);
1424 if (slock
->get_scatter_wanted() &&
1425 slock
->get_state() != LOCK_MIX
) {
1426 scatter_mix(slock
, pneed_issue
);
1427 if (!lock
->is_stable())
1429 } else if (slock
->get_unscatter_wanted() &&
1430 slock
->get_state() != LOCK_LOCK
) {
1431 simple_lock(slock
, pneed_issue
);
1432 if (!lock
->is_stable()) {
// Lock types not exempted below wait out a freezing tree before eval.
1438 if (lock
->get_type() != CEPH_LOCK_DN
&&
1439 lock
->get_type() != CEPH_LOCK_ISNAP
&&
1440 lock
->get_type() != CEPH_LOCK_IPOLICY
&&
1442 dout(7) << "try_eval " << *lock
<< " freezing, waiting on " << *p
<< dendl
;
1443 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1447 eval(lock
, pneed_issue
);
// Re-run gather evaluation on the cap-related locks (file/auth/link/xattr)
// that are currently unstable. If caps need issuing on a head inode, the
// inode is added to `issue_set` for the caller to issue later.
1450 void Locker::eval_cap_gather(CInode
*in
, set
<CInode
*> *issue_set
)
1452 bool need_issue
= false;
1453 MDSContext::vec finishers
;
1456 if (!in
->filelock
.is_stable())
1457 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1458 if (!in
->authlock
.is_stable())
1459 eval_gather(&in
->authlock
, false, &need_issue
, &finishers
);
1460 if (!in
->linklock
.is_stable())
1461 eval_gather(&in
->linklock
, false, &need_issue
, &finishers
);
1462 if (!in
->xattrlock
.is_stable())
1463 eval_gather(&in
->xattrlock
, false, &need_issue
, &finishers
);
1465 if (need_issue
&& in
->is_head()) {
1467 issue_set
->insert(in
);
// Run waiters queued by the gathers above.
1472 finish_contexts(g_ceph_context
, finishers
);
// Re-run gather evaluation on the scatter-type locks (file/nest/dirfragtree)
// that are currently unstable, then run any queued waiters.
1475 void Locker::eval_scatter_gathers(CInode
*in
)
1477 bool need_issue
= false;
1478 MDSContext::vec finishers
;
1480 dout(10) << "eval_scatter_gathers " << *in
<< dendl
;
1483 if (!in
->filelock
.is_stable())
1484 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1485 if (!in
->nestlock
.is_stable())
1486 eval_gather(&in
->nestlock
, false, &need_issue
, &finishers
);
1487 if (!in
->dirfragtreelock
.is_stable())
1488 eval_gather(&in
->dirfragtreelock
, false, &need_issue
, &finishers
);
1490 if (need_issue
&& in
->is_head())
1493 finish_contexts(g_ceph_context
, finishers
);
1496 void Locker::eval(SimpleLock
*lock
, bool *need_issue
)
1498 switch (lock
->get_type()) {
1499 case CEPH_LOCK_IFILE
:
1500 return file_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1501 case CEPH_LOCK_IDFT
:
1502 case CEPH_LOCK_INEST
:
1503 return scatter_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1505 return simple_eval(lock
, need_issue
);
1510 // ------------------
// Try to push a lock toward a readable state: on the auth, twiddle the
// lock's state machine directly; on a replica, ask the auth with
// LOCK_AC_REQRDLOCK. For file locks on a recovering inode, bump recovery
// priority instead. `as_anon` means the caller wants SYNC, not XSYN.
1513 bool Locker::_rdlock_kick(SimpleLock
*lock
, bool as_anon
)
1516 if (lock
->is_stable()) {
1517 if (lock
->get_parent()->is_auth()) {
1518 if (lock
->get_sm() == &sm_scatterlock
) {
1519 // not until tempsync is fully implemented
1520 //if (lock->get_parent()->is_replicated())
1521 //scatter_tempsync((ScatterLock*)lock);
1524 } else if (lock
->get_sm() == &sm_filelock
) {
1525 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1526 if (lock
->get_state() == LOCK_EXCL
&&
1527 in
->get_target_loner() >= 0 &&
1528 !in
->is_dir() && !as_anon
) // as_anon => caller wants SYNC, not XSYN
1536 // request rdlock state change from auth
1537 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1538 if (!mds
->is_cluster_degraded() ||
1539 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1540 dout(10) << "requesting rdlock from auth on "
1541 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1542 mds
->send_message_mds(make_message
<MLock
>(lock
, LOCK_AC_REQRDLOCK
, mds
->get_nodeid()), auth
);
// A recovering file lock cannot change state; prioritize its recovery.
1547 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1548 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1549 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1550 mds
->mdcache
->recovery_queue
.prioritize(in
);
1557 bool Locker::rdlock_try(SimpleLock
*lock
, client_t client
)
1559 dout(7) << "rdlock_try on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1561 // can read? grab ref.
1562 if (lock
->can_rdlock(client
))
1565 _rdlock_kick(lock
, false);
1567 if (lock
->can_rdlock(client
))
// Acquire a read lock for an MDS request, possibly forwarding the request
// to the auth, kicking the head inode's lock for snapped inodes, or adding
// a retry waiter if the lock cannot be read yet. Returns true on success.
// NOTE(review): text is extraction-garbled; some original lines are missing.
1573 bool Locker::rdlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool as_anon
)
1575 dout(7) << "rdlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1577 // client may be allowed to rdlock the same item it has xlocked.
1578 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1579 if (mut
->snapid
!= CEPH_NOSNAP
)
1581 client_t client
= as_anon
? -1 : mut
->get_client();
1584 if (lock
->get_type() != CEPH_LOCK_DN
)
1585 in
= static_cast<CInode
*>(lock
->get_parent());
// Non-auth replicas forward to the auth when the lock type requires it.
1588 if (!lock->get_parent()->is_auth() &&
1589 lock->fw_rdlock_to_auth()) {
1590 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1596 // can read? grab ref.
1597 if (lock
->can_rdlock(client
)) {
1599 mut
->emplace_lock(lock
, MutationImpl::LockOp::RDLOCK
);
1603 // hmm, wait a second.
// Snapped (non-head) auth inode stuck in SNAP_SYNC: kick the head inode's
// corresponding lock so this one can sync up.
1604 if (in
&& !in
->is_head() && in
->is_auth() &&
1605 lock
->get_state() == LOCK_SNAP_SYNC
) {
1606 // okay, we actually need to kick the head's lock to get ourselves synced up.
1607 CInode
*head
= mdcache
->get_inode(in
->ino());
1609 SimpleLock
*hlock
= head
->get_lock(CEPH_LOCK_IFILE
);
1610 if (hlock
->get_state() == LOCK_SYNC
)
1611 hlock
= head
->get_lock(lock
->get_type());
1613 if (hlock
->get_state() != LOCK_SYNC
) {
1614 dout(10) << "rdlock_start trying head inode " << *head
<< dendl
;
1615 if (!rdlock_start(hlock
, mut
, true)) // ** as_anon, no rdlock on EXCL **
1617 // oh, check our lock again then
1621 if (!_rdlock_kick(lock
, as_anon
))
// Could not read now: wait for readability (auth+stable) or stability.
1627 if (lock
->get_parent()->is_auth() && lock
->is_stable())
1628 wait_on
= SimpleLock::WAIT_RD
;
1630 wait_on
= SimpleLock::WAIT_STABLE
; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1631 dout(7) << "rdlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1632 lock
->add_waiter(wait_on
, new C_MDS_RetryRequest(mdcache
, mut
));
1637 void Locker::nudge_log(SimpleLock
*lock
)
1639 dout(10) << "nudge_log " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1640 if (lock
->get_parent()->is_auth() && lock
->is_unstable_and_locked()) // as with xlockdone, or cap flush
1641 mds
->mdlog
->flush();
// Release a read lock held by a mutation and re-evaluate the lock if this
// was the last reader (gather if unstable, try_eval if auth and stable).
1644 void Locker::rdlock_finish(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
, bool *pneed_issue
)
1646 ceph_assert(it
->is_rdlock());
1647 SimpleLock
*lock
= it
->lock
;
1651 mut
->locks
.erase(it
);
1653 dout(7) << "rdlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
// If no readers remain, let the lock continue any pending transition.
1656 if (!lock
->is_rdlocked()) {
1657 if (!lock
->is_stable())
1658 eval_gather(lock
, false, pneed_issue
);
1659 else if (lock
->get_parent()->is_auth())
1660 try_eval(lock
, pneed_issue
);
// Try to take every rdlock in `lov` for request `mdr`. On the first lock
// that cannot be taken, queue a retry waiter, drop everything acquired so
// far plus local auth pins, and return failure.
1664 bool Locker::rdlock_try_set(MutationImpl::LockOpVec
& lov
, MDRequestRef
& mdr
)
1666 dout(10) << __func__
<< dendl
;
1667 for (const auto& p
: lov
) {
1669 ceph_assert(p
.is_rdlock());
1670 if (!mdr
->is_rdlocked(lock
) && !rdlock_try(lock
, mdr
->get_client())) {
1671 lock
->add_waiter(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_RD
,
1672 new C_MDS_RetryRequest(mdcache
, mdr
));
1676 mdr
->emplace_lock(lock
, MutationImpl::LockOp::RDLOCK
);
1677 dout(20) << " got rdlock on " << *lock
<< " " << *lock
->get_parent() << dendl
;
// Failure path: undo partial progress before retrying later.
1682 dout(10) << __func__
<< " failed" << dendl
;
1683 drop_locks(mdr
.get(), nullptr);
1684 mdr
->drop_local_auth_pins();
// Non-blocking variant for internal mutations: take every rdlock in `lov`
// only if each is immediately readable; no waiters are queued.
1688 bool Locker::rdlock_try_set(MutationImpl::LockOpVec
& lov
, MutationRef
& mut
)
1690 dout(10) << __func__
<< dendl
;
1691 for (const auto& p
: lov
) {
1693 ceph_assert(p
.is_rdlock());
1694 if (!lock
->can_rdlock(mut
->get_client()))
// Readable: take the ref and record the lock on the mutation.
1696 p
.lock
->get_rdlock();
1697 mut
->emplace_lock(p
.lock
, MutationImpl::LockOp::RDLOCK
);
1702 // ------------------
1705 void Locker::wrlock_force(SimpleLock
*lock
, MutationRef
& mut
)
1707 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1708 lock
->get_type() == CEPH_LOCK_DVERSION
)
1709 return local_wrlock_grab(static_cast<LocalLockC
*>(lock
), mut
);
1711 dout(7) << "wrlock_force on " << *lock
1712 << " on " << *lock
->get_parent() << dendl
;
1713 lock
->get_wrlock(true);
1714 mut
->emplace_lock(lock
, MutationImpl::LockOp::WRLOCK
);
// Non-blocking write-lock attempt. Records the wrlock on the mutation when
// the lock is immediately wrlockable; otherwise may try to move the lock
// toward a writable state, subject to the scatter/dirty constraints below.
1717 bool Locker::wrlock_try(SimpleLock
*lock
, const MutationRef
& mut
, client_t client
)
1719 dout(10) << "wrlock_try " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1721 client
= mut
->get_client();
1724 if (lock
->can_wrlock(client
)) {
1726 auto it
= mut
->emplace_lock(lock
, MutationImpl::LockOp::WRLOCK
);
1727 it
->flags
|= MutationImpl::LockOp::WRLOCK
; // may already remote_wrlocked
1730 if (!lock
->is_stable())
1732 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1735 // caller may already has a log entry open. To avoid calling
1736 // scatter_writebehind or start_scatter. don't change nest lock
1737 // state if it has dirty scatterdata.
1738 if (lock
->is_dirty())
1740 // To avoid calling scatter_writebehind or start_scatter. don't
1741 // change nest lock state to MIX.
1742 ScatterLock
*slock
= static_cast<ScatterLock
*>(lock
);
1743 if (slock
->get_scatter_wanted() || in
->has_subtree_or_exporting_dirfrag())
// Acquire a write lock for an MDS request. Version locks take the local
// path; otherwise, if the lock is wrlockable (and in MIX when scatter is
// wanted) record it on the mutation, else drive toward MIX on the auth or
// request a scatter from the auth on a replica, queueing a retry waiter.
1751 bool Locker::wrlock_start(const MutationImpl::LockOp
&op
, MDRequestRef
& mut
)
1753 SimpleLock
*lock
= op
.lock
;
1754 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1755 lock
->get_type() == CEPH_LOCK_DVERSION
)
1756 return local_wrlock_start(static_cast<LocalLockC
*>(lock
), mut
);
1758 dout(10) << "wrlock_start " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1760 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
// A state-pin op acts on behalf of the exclusive-caps client rather than
// the requesting client.
1761 client_t client
= op
.is_state_pin() ? lock
->get_excl_client() : mut
->get_client();
1762 bool want_scatter
= lock
->get_parent()->is_auth() &&
1763 (in
->has_subtree_or_exporting_dirfrag() ||
1764 static_cast<ScatterLock
*>(lock
)->get_scatter_wanted());
1768 if (lock
->can_wrlock(client
) &&
1769 (!want_scatter
|| lock
->get_state() == LOCK_MIX
)) {
1771 auto it
= mut
->emplace_lock(lock
, MutationImpl::LockOp::WRLOCK
);
1772 it
->flags
|= MutationImpl::LockOp::WRLOCK
; // may already remote_wrlocked
// Recovering file locks: raise the inode's recovery priority instead.
1776 if (lock
->get_type() == CEPH_LOCK_IFILE
&&
1777 in
->state_test(CInode::STATE_RECOVERING
)) {
1778 mds
->mdcache
->recovery_queue
.prioritize(in
);
1781 if (!lock
->is_stable())
1784 if (in
->is_auth()) {
1786 scatter_mix(static_cast<ScatterLock
*>(lock
));
1791 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1792 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1793 if (!mds
->is_cluster_degraded() ||
1794 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1795 dout(10) << "requesting scatter from auth on "
1796 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1797 mds
->send_message_mds(make_message
<MLock
>(lock
, LOCK_AC_REQSCATTER
, mds
->get_nodeid()), auth
);
1803 dout(7) << "wrlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1804 lock
->add_waiter(SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
// Release a write lock held by a mutation and re-evaluate the lock when
// appropriate. Version locks take the local path.
1810 void Locker::wrlock_finish(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
, bool *pneed_issue
)
1812 ceph_assert(it
->is_wrlock());
1813 SimpleLock
* lock
= it
->lock
;
1815 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1816 lock
->get_type() == CEPH_LOCK_DVERSION
)
1817 return local_wrlock_finish(it
, mut
);
1819 dout(7) << "wrlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
// A combined remote+local wrlock keeps its entry until the remote half is
// released too.
1822 if (it
->is_remote_wrlock())
1825 mut
->locks
.erase(it
);
1827 if (lock
->is_wrlocked()) {
1828 // Evaluate unstable lock after scatter_writebehind_finish(). Because
1829 // eval_gather() does not change lock's state when lock is flushing.
1830 if (!lock
->is_stable() && lock
->is_flushed() &&
1831 lock
->get_parent()->is_auth())
1832 eval_gather(lock
, false, pneed_issue
);
1834 if (!lock
->is_stable())
1835 eval_gather(lock
, false, pneed_issue
);
1836 else if (lock
->get_parent()->is_auth())
1837 try_eval(lock
, pneed_issue
);
// Ask MDS `target` to wrlock a lock on our behalf (peer request OP_WRLOCK).
// Waits for the target to become active first, then records the pending
// peer on the mutation so the reply can be matched up.
1844 void Locker::remote_wrlock_start(SimpleLock
*lock
, mds_rank_t target
, MDRequestRef
& mut
)
1846 dout(7) << "remote_wrlock_start mds." << target
<< " on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1848 // wait for active target
1849 if (mds
->is_cluster_degraded() &&
1850 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(target
)) {
1851 dout(7) << " mds." << target
<< " is not active" << dendl
;
1852 if (mut
->more()->waiting_on_peer
.empty())
1853 mds
->wait_for_active_peer(target
, new C_MDS_RetryRequest(mdcache
, mut
));
1857 // send lock request
1858 mut
->start_locking(lock
, target
);
1859 mut
->more()->peers
.insert(target
);
1860 auto r
= make_message
<MMDSPeerRequest
>(mut
->reqid
, mut
->attempt
, MMDSPeerRequest::OP_WRLOCK
);
1861 r
->set_lock_type(lock
->get_type());
1862 lock
->get_parent()->set_object_info(r
->get_object_info());
1863 mds
->send_message_mds(r
, target
);
// Track that we are waiting for this peer's ack.
1865 ceph_assert(mut
->more()->waiting_on_peer
.count(target
) == 0);
1866 mut
->more()->waiting_on_peer
.insert(target
);
// Release a wrlock held remotely on our behalf: clear/erase the local
// record and, if the target MDS is usable (>= REJOIN), send OP_UNWRLOCK.
1869 void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
)
1871 ceph_assert(it
->is_remote_wrlock());
1872 SimpleLock
*lock
= it
->lock
;
1873 mds_rank_t target
= it
->wrlock_target
;
// If a local wrlock also exists on this entry, only strip the remote flag.
1875 if (it
->is_wrlock())
1876 it
->clear_remote_wrlock();
1878 mut
->locks
.erase(it
);
1880 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1881 << " " << *lock
->get_parent() << dendl
;
1882 if (!mds
->is_cluster_degraded() ||
1883 mds
->mdsmap
->get_state(target
) >= MDSMap::STATE_REJOIN
) {
1884 auto peerreq
= make_message
<MMDSPeerRequest
>(mut
->reqid
, mut
->attempt
, MMDSPeerRequest::OP_UNWRLOCK
);
1885 peerreq
->set_lock_type(lock
->get_type());
1886 lock
->get_parent()->set_object_info(peerreq
->get_object_info());
1887 mds
->send_message_mds(peerreq
, target
);
1892 // ------------------
// Acquire an exclusive lock for an MDS request. On the auth: take the
// xlock when the state machine allows it, else nudge the state and queue
// a retry waiter. On a replica: require remote-xlockable locks and send
// OP_XLOCK to the auth, waiting for single/active auth as needed.
1895 bool Locker::xlock_start(SimpleLock
*lock
, MDRequestRef
& mut
)
1897 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1898 lock
->get_type() == CEPH_LOCK_DVERSION
)
1899 return local_xlock_start(static_cast<LocalLockC
*>(lock
), mut
);
1901 dout(7) << "xlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1902 client_t client
= mut
->get_client();
// Only cap-bearing locks live on inodes; `in` stays null otherwise.
1904 CInode
*in
= nullptr;
1905 if (lock
->get_cap_shift())
1906 in
= static_cast<CInode
*>(lock
->get_parent());
1909 if (lock
->get_parent()->is_auth()) {
1912 if (mut
->locking
&& // started xlock (not preempt other request)
1913 lock
->can_xlock(client
) &&
1914 !(lock
->get_state() == LOCK_LOCK_XLOCK
&& // client is not xlocker or
1915 in
&& in
->issued_caps_need_gather(lock
))) { // xlocker does not hold shared cap
1916 lock
->set_state(LOCK_XLOCK
);
1917 lock
->get_xlock(mut
, client
);
1918 mut
->emplace_lock(lock
, MutationImpl::LockOp::XLOCK
);
1919 mut
->finish_locking(lock
);
// Recovering file lock: bump the inode's recovery priority.
1923 if (lock
->get_type() == CEPH_LOCK_IFILE
&&
1924 in
->state_test(CInode::STATE_RECOVERING
)) {
1925 mds
->mdcache
->recovery_queue
.prioritize(in
);
1928 if (!lock
->is_stable() && (lock
->get_state() != LOCK_XLOCKDONE
||
1929 lock
->get_xlock_by_client() != client
||
1930 lock
->is_waiter_for(SimpleLock::WAIT_STABLE
)))
1933 if (lock
->get_state() == LOCK_LOCK
|| lock
->get_state() == LOCK_XLOCKDONE
) {
1934 mut
->start_locking(lock
);
// Not xlockable yet: retry when writable or stable again.
1941 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
// Replica path: xlock must be requested from the auth MDS.
1946 ceph_assert(lock
->get_sm()->can_remote_xlock
);
1947 ceph_assert(!mut
->peer_request
);
1949 // wait for single auth
1950 if (lock
->get_parent()->is_ambiguous_auth()) {
1951 lock
->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
1952 new C_MDS_RetryRequest(mdcache
, mut
));
1956 // wait for active auth
1957 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1958 if (mds
->is_cluster_degraded() &&
1959 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1960 dout(7) << " mds." << auth
<< " is not active" << dendl
;
1961 if (mut
->more()->waiting_on_peer
.empty())
1962 mds
->wait_for_active_peer(auth
, new C_MDS_RetryRequest(mdcache
, mut
));
1966 // send lock request
1967 mut
->more()->peers
.insert(auth
);
1968 mut
->start_locking(lock
, auth
);
1969 auto r
= make_message
<MMDSPeerRequest
>(mut
->reqid
, mut
->attempt
, MMDSPeerRequest::OP_XLOCK
);
1970 r
->set_lock_type(lock
->get_type());
1971 lock
->get_parent()->set_object_info(r
->get_object_info());
1972 mds
->send_message_mds(r
, auth
);
1974 ceph_assert(mut
->more()->waiting_on_peer
.count(auth
) == 0);
1975 mut
->more()->waiting_on_peer
.insert(auth
);
// Settle a lock after its last xlock is dropped. If the lock is an
// inode cap-bearing type with no other holders/leases, it may jump
// straight to EXCL for the target loner; otherwise fall back to a
// normal gather evaluation.
1981 void Locker::_finish_xlock(SimpleLock
*lock
, client_t xlocker
, bool *pneed_issue
)
1983 ceph_assert(!lock
->is_stable());
1984 if (lock
->get_type() != CEPH_LOCK_DN
&&
1985 lock
->get_type() != CEPH_LOCK_ISNAP
&&
1986 lock
->get_type() != CEPH_LOCK_IPOLICY
&&
1987 lock
->get_num_rdlocks() == 0 &&
1988 lock
->get_num_wrlocks() == 0 &&
1989 !lock
->is_leased() &&
1990 lock
->get_state() != LOCK_XLOCKSNAP
) {
1991 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1992 client_t loner
= in
->get_target_loner();
// Jump to EXCL only when the old xlocker is (or could be) the loner.
1993 if (loner
>= 0 && (xlocker
< 0 || xlocker
== loner
)) {
1994 lock
->set_state(LOCK_EXCL
);
1995 lock
->get_parent()->auth_unpin(lock
);
1996 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
);
1997 if (lock
->get_cap_shift())
1998 *pneed_issue
= true;
1999 if (lock
->get_parent()->is_auth() &&
2001 try_eval(lock
, pneed_issue
);
2005 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
2006 eval_gather(lock
, lock
->get_state() != LOCK_XLOCKSNAP
, pneed_issue
);
// Release an exclusive lock held by a mutation. Version locks take the
// local path; remote xlocks notify the auth with OP_UNXLOCK; local ones
// wake waiters and settle the lock via _finish_xlock when no xlocker
// remains.
2009 void Locker::xlock_finish(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
, bool *pneed_issue
)
2011 ceph_assert(it
->is_xlock());
2012 SimpleLock
*lock
= it
->lock
;
2014 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
2015 lock
->get_type() == CEPH_LOCK_DVERSION
)
2016 return local_xlock_finish(it
, mut
);
2018 dout(10) << "xlock_finish on " << *lock
<< " " << *lock
->get_parent() << dendl
;
// Remember who held the xlock before dropping the record.
2020 client_t xlocker
= lock
->get_xlock_by_client();
2025 mut
->locks
.erase(it
);
2027 bool do_issue
= false;
2030 if (!lock
->get_parent()->is_auth()) {
2031 ceph_assert(lock
->get_sm()->can_remote_xlock
);
2034 dout(7) << "xlock_finish releasing remote xlock on " << *lock
->get_parent() << dendl
;
2035 mds_rank_t auth
= lock
->get_parent()->authority().first
;
2036 if (!mds
->is_cluster_degraded() ||
2037 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
2038 auto peerreq
= make_message
<MMDSPeerRequest
>(mut
->reqid
, mut
->attempt
, MMDSPeerRequest::OP_UNXLOCK
);
2039 peerreq
->set_lock_type(lock
->get_type());
2040 lock
->get_parent()->set_object_info(peerreq
->get_object_info());
2041 mds
->send_message_mds(peerreq
, auth
);
// Wake anyone blocked on this lock.
2044 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
2045 SimpleLock::WAIT_WR
|
2046 SimpleLock::WAIT_RD
, 0);
2048 if (lock
->get_num_xlocks() == 0 &&
2049 lock
->get_state() != LOCK_LOCK_XLOCK
) { // no one is taking xlock
2050 _finish_xlock(lock
, xlocker
, &do_issue
);
2055 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
2056 if (in
->is_head()) {
2058 *pneed_issue
= true;
// Hand an xlock off during subtree export: drop the local record, release
// the auth pin taken for the unstable lock, and park the lock in LOCK.
2065 void Locker::xlock_export(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
)
2067 ceph_assert(it
->is_xlock());
2068 SimpleLock
*lock
= it
->lock
;
2069 dout(10) << "xlock_export on " << *lock
<< " " << *lock
->get_parent() << dendl
;
2072 mut
->locks
.erase(it
);
2074 MDSCacheObject
*p
= lock
->get_parent();
2075 ceph_assert(p
->state_test(CInode::STATE_AMBIGUOUSAUTH
)); // we are exporting this (inode)
2077 if (!lock
->is_stable())
2078 lock
->get_parent()->auth_unpin(lock
);
2080 lock
->set_state(LOCK_LOCK
);
2083 void Locker::xlock_import(SimpleLock
*lock
)
2085 dout(10) << "xlock_import on " << *lock
<< " " << *lock
->get_parent() << dendl
;
2086 lock
->get_parent()->auth_pin(lock
);
// Downgrade a held xlock to an rdlock on the same mutation: mark the
// xlock done, finish it (which erases `it` from mut->locks), and record
// the rdlock entry.
2089 void Locker::xlock_downgrade(SimpleLock
*lock
, MutationImpl
*mut
)
2091 dout(10) << "xlock_downgrade on " << *lock
<< " " << *lock
->get_parent() << dendl
;
2092 auto it
= mut
->locks
.find(lock
);
// NOTE(review): `it` is dereferenced here before the
// `ceph_assert(it != mut->locks.end())` below — if the mutation does not
// hold this lock, this is undefined behavior. The end() check should
// come first; confirm against upstream before reordering.
2093 if (it
->is_rdlock())
2094 return; // already downgraded
2096 ceph_assert(lock
->get_parent()->is_auth());
2097 ceph_assert(it
!= mut
->locks
.end());
2098 ceph_assert(it
->is_xlock());
2100 lock
->set_xlock_done();
2102 xlock_finish(it
, mut
, nullptr);
2103 mut
->emplace_lock(lock
, MutationImpl::LockOp::RDLOCK
);
2107 // file i/o -----------------------------------------
2109 version_t
Locker::issue_file_data_version(CInode
*in
)
2111 dout(7) << "issue_file_data_version on " << *in
<< dendl
;
2112 return in
->get_inode()->file_data_version
;
// Log-commit context for file updates: once the journal entry is safe,
// calls Locker::file_update_finish() with the saved mutation, flags,
// client, and (optional) cap ack message. Pins the inode while pending.
2115 class C_Locker_FileUpdate_finish
: public LockerLogContext
{
2120 ref_t
<MClientCaps
> ack
;
2122 C_Locker_FileUpdate_finish(Locker
*l
, CInode
*i
, MutationRef
& m
, unsigned f
,
2123 const ref_t
<MClientCaps
> &ack
, client_t c
=-1)
2124 : LockerLogContext(l
), in(i
), mut(m
), flags(f
), client(c
), ack(ack
) {
2125 in
->get(CInode::PIN_PTRWAITER
);
// Runs after the journal entry commits; drops the inode pin afterwards.
2127 void finish(int r
) override
{
2128 locker
->file_update_finish(in
, mut
, flags
, client
, ack
);
2129 in
->put(CInode::PIN_PTRWAITER
);
// Flag bits passed to file_update_finish() (combinable bitmask).
2134 UPDATE_SHAREMAX
= 1,
2135 UPDATE_NEEDSISSUE
= 2,
2136 UPDATE_SNAPFLUSH
= 4,
// Completion of a journaled file update: send the cap ack to the client's
// session (recording the flush tid if the client uses unique TIDs), drop
// the mutation's locks, then per `flags` re-issue caps, share max_size,
// or finalize a snap cap flush; finally credit the balancer.
2139 void Locker::file_update_finish(CInode
*in
, MutationRef
& mut
, unsigned flags
,
2140 client_t client
, const ref_t
<MClientCaps
> &ack
)
2142 dout(10) << "file_update_finish on " << *in
<< dendl
;
2147 Session
*session
= mds
->get_session(client
);
2148 if (session
&& !session
->is_closed()) {
2149 // "oldest flush tid" > 0 means client uses unique TID for each flush
2150 if (ack
->get_oldest_flush_tid() > 0)
2151 session
->add_completed_flush(ack
->get_client_tid());
2152 mds
->send_message_client_counted(ack
, session
);
2154 dout(10) << " no session for client." << client
<< " " << *ack
<< dendl
;
// Drop the mutation's locks; inodes whose caps changed land in need_issue.
2158 set
<CInode
*> need_issue
;
2159 drop_locks(mut
.get(), &need_issue
);
2161 if (in
->is_head()) {
2162 if ((flags
& UPDATE_NEEDSISSUE
) && need_issue
.count(in
) == 0) {
2163 Capability
*cap
= in
->get_client_cap(client
);
2164 if (cap
&& (cap
->wanted() & ~cap
->pending()))
2165 issue_caps(in
, cap
);
2168 if ((flags
& UPDATE_SHAREMAX
) && in
->is_auth() &&
2169 (in
->filelock
.gcaps_allowed(CAP_LONER
) & (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)))
2170 share_inode_max_size(in
);
2172 } else if ((flags
& UPDATE_SNAPFLUSH
) && !in
->client_snap_caps
.empty()) {
2173 dout(10) << " client_snap_caps " << in
->client_snap_caps
<< dendl
;
2174 // check for snap writeback completion
2175 in
->client_snap_caps
.erase(client
);
2176 if (in
->client_snap_caps
.empty()) {
2177 for (int i
= 0; i
< num_cinode_locks
; i
++) {
2178 SimpleLock
*lock
= in
->get_lock(cinode_lock_info
[i
].lock
);
2182 in
->item_open_file
.remove_myself();
2183 in
->item_caps
.remove_myself();
2184 eval_cap_gather(in
, &need_issue
);
2187 issue_caps_set(need_issue
);
2189 mds
->balancer
->hit_inode(in
, META_POP_IWR
);
2191 // auth unpin after issuing caps
// Register (or augment) a capability for the requesting client on `in`
// for open mode `mode`, then evaluate locks so the caps can be issued
// with the request reply. Replayed new-inode requests instead try to
// reconnect the pre-existing cap.
// NOTE(review): garbling dropped the parameter lines of this signature.
2195 Capability
* Locker::issue_new_caps(CInode
*in
,
2200 dout(7) << "issue_new_caps for mode " << mode
<< " on " << *in
<< dendl
;
2201 Session
*session
= mdr
->session
;
2202 bool new_inode
= (mdr
->alloc_ino
|| mdr
->used_prealloc_ino
);
2204 // if replay or async, try to reconnect cap, and otherwise do nothing.
2205 if (new_inode
&& mdr
->client_request
->is_queued_for_replay())
2206 return mds
->mdcache
->try_reconnect_cap(in
, session
);
2209 ceph_assert(session
->info
.inst
.name
.is_client());
2210 client_t my_client
= session
->get_client();
2211 int my_want
= ceph_caps_for_mode(mode
);
2213 // register a capability
2214 Capability
*cap
= in
->get_client_cap(my_client
);
2217 cap
= in
->add_client_cap(my_client
, session
, realm
, new_inode
);
2218 cap
->set_wanted(my_want
);
2221 // make sure it wants sufficient caps
2222 if (my_want
& ~cap
->wanted()) {
2223 // augment wanted caps for this client
2224 cap
->set_wanted(cap
->wanted() | my_want
);
2227 cap
->inc_suppress(); // suppress file cap messages (we'll bundle with the request reply)
2229 if (in
->is_auth()) {
2230 // [auth] twiddle mode?
2231 eval(in
, CEPH_CAP_LOCKS
);
2233 if (_need_flush_mdlog(in
, my_want
))
2234 mds
->mdlog
->flush();
2237 // [replica] tell auth about any new caps wanted
2238 request_inode_file_caps(in
);
2241 // issue caps (pot. incl new one)
2242 //issue_caps(in); // note: _eval above may have done this already...
2244 // re-issue whatever we can
2245 //cap->issue(cap->pending());
2247 cap
->dec_suppress();
2252 void Locker::issue_caps_set(set
<CInode
*>& inset
)
2254 for (set
<CInode
*>::iterator p
= inset
.begin(); p
!= inset
.end(); ++p
)
// Deferred revocation of a stale client cap: calls
// Locker::revoke_stale_cap(in, client) when run. Pins the inode with
// PIN_PTRWAITER for the lifetime of the context.
2258 class C_Locker_RevokeStaleCap
: public LockerContext
{
2262 C_Locker_RevokeStaleCap(Locker
*l
, CInode
*i
, client_t c
) :
2263 LockerContext(l
), in(i
), client(c
) {
2264 in
->get(CInode::PIN_PTRWAITER
);
// Executes the revocation and drops the inode pin.
2266 void finish(int r
) override
{
2267 locker
->revoke_stale_cap(in
, client
);
2268 in
->put(CInode::PIN_PTRWAITER
);
2272 int Locker::issue_caps(CInode
*in
, Capability
*only_cap
)
2274 // allowed caps are determined by the lock mode.
2275 int all_allowed
= in
->get_caps_allowed_by_type(CAP_ANY
);
2276 int loner_allowed
= in
->get_caps_allowed_by_type(CAP_LONER
);
2277 int xlocker_allowed
= in
->get_caps_allowed_by_type(CAP_XLOCKER
);
2279 client_t loner
= in
->get_loner();
2281 dout(7) << "issue_caps loner client." << loner
2282 << " allowed=" << ccap_string(loner_allowed
)
2283 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
2284 << ", others allowed=" << ccap_string(all_allowed
)
2285 << " on " << *in
<< dendl
;
2287 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed
)
2288 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
2289 << " on " << *in
<< dendl
;
2292 ceph_assert(in
->is_head());
2294 // count conflicts with
2298 map
<client_t
, Capability
>::iterator it
;
2300 it
= in
->client_caps
.find(only_cap
->get_client());
2302 it
= in
->client_caps
.begin();
2303 for (; it
!= in
->client_caps
.end(); ++it
) {
2304 Capability
*cap
= &it
->second
;
2306 // do not issue _new_ bits when size|mtime is projected
2308 if (loner
== it
->first
)
2309 allowed
= loner_allowed
;
2311 allowed
= all_allowed
;
2313 // add in any xlocker-only caps (for locks this client is the xlocker for)
2314 allowed
|= xlocker_allowed
& in
->get_xlocker_mask(it
->first
);
2316 allowed
&= ~CEPH_CAP_ANY_DIR_OPS
;
2317 if (allowed
& CEPH_CAP_FILE_EXCL
)
2318 allowed
|= cap
->get_lock_cache_allowed();
2321 if ((in
->get_inode()->inline_data
.version
!= CEPH_INLINE_NONE
&&
2322 cap
->is_noinline()) ||
2323 (!in
->get_inode()->layout
.pool_ns
.empty() &&
2324 cap
->is_nopoolns()))
2325 allowed
&= ~(CEPH_CAP_FILE_RD
| CEPH_CAP_FILE_WR
);
2327 int pending
= cap
->pending();
2328 int wanted
= cap
->wanted();
2330 dout(20) << " client." << it
->first
2331 << " pending " << ccap_string(pending
)
2332 << " allowed " << ccap_string(allowed
)
2333 << " wanted " << ccap_string(wanted
)
2336 if (!(pending
& ~allowed
)) {
2337 // skip if suppress or new, and not revocation
2338 if (cap
->is_new() || cap
->is_suppress() || cap
->is_stale()) {
2339 dout(20) << " !revoke and new|suppressed|stale, skipping client." << it
->first
<< dendl
;
2343 ceph_assert(!cap
->is_new());
2344 if (cap
->is_stale()) {
2345 dout(20) << " revoke stale cap from client." << it
->first
<< dendl
;
2346 ceph_assert(!cap
->is_valid());
2347 cap
->issue(allowed
& pending
, false);
2348 mds
->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in
, it
->first
));
2352 if (!cap
->is_valid() && (pending
& ~CEPH_CAP_PIN
)) {
2353 // After stale->resume circle, client thinks it only has CEPH_CAP_PIN.
2354 // mds needs to re-issue caps, then do revocation.
2355 long seq
= cap
->issue(pending
, true);
2357 dout(7) << " sending MClientCaps to client." << it
->first
2358 << " seq " << seq
<< " re-issue " << ccap_string(pending
) << dendl
;
2360 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_grant
);
2362 auto m
= make_message
<MClientCaps
>(CEPH_CAP_OP_GRANT
, in
->ino(),
2363 in
->find_snaprealm()->inode
->ino(),
2364 cap
->get_cap_id(), cap
->get_last_seq(),
2365 pending
, wanted
, 0, cap
->get_mseq(),
2366 mds
->get_osd_epoch_barrier());
2367 in
->encode_cap_message(m
, cap
);
2369 mds
->send_message_client_counted(m
, cap
->get_session());
2373 // notify clients about deleted inode, to make sure they release caps ASAP.
2374 if (in
->get_inode()->nlink
== 0)
2375 wanted
|= CEPH_CAP_LINK_SHARED
;
2377 // are there caps that the client _wants_ and can have, but aren't pending?
2378 // or do we need to revoke?
2379 if ((pending
& ~allowed
) || // need to revoke ~allowed caps.
2380 ((wanted
& allowed
) & ~pending
) || // missing wanted+allowed caps
2381 !cap
->is_valid()) { // after stale->resume circle
2385 // include caps that clients generally like, while we're at it.
2386 int likes
= in
->get_caps_liked();
2387 int before
= pending
;
2389 if (pending
& ~allowed
)
2390 seq
= cap
->issue((wanted
|likes
) & allowed
& pending
, true); // if revoking, don't issue anything new.
2392 seq
= cap
->issue((wanted
|likes
) & allowed
, true);
2393 int after
= cap
->pending();
2395 dout(7) << " sending MClientCaps to client." << it
->first
2396 << " seq " << seq
<< " new pending " << ccap_string(after
)
2397 << " was " << ccap_string(before
) << dendl
;
2399 int op
= (before
& ~after
) ? CEPH_CAP_OP_REVOKE
: CEPH_CAP_OP_GRANT
;
2400 if (op
== CEPH_CAP_OP_REVOKE
) {
2401 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_revoke
);
2402 revoking_caps
.push_back(&cap
->item_revoking_caps
);
2403 revoking_caps_by_client
[cap
->get_client()].push_back(&cap
->item_client_revoking_caps
);
2404 cap
->set_last_revoke_stamp(ceph_clock_now());
2405 cap
->reset_num_revoke_warnings();
2407 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_grant
);
2410 auto m
= make_message
<MClientCaps
>(op
, in
->ino(),
2411 in
->find_snaprealm()->inode
->ino(),
2412 cap
->get_cap_id(), cap
->get_last_seq(),
2413 after
, wanted
, 0, cap
->get_mseq(),
2414 mds
->get_osd_epoch_barrier());
2415 in
->encode_cap_message(m
, cap
);
2417 mds
->send_message_client_counted(m
, cap
->get_session());
2427 void Locker::issue_truncate(CInode
*in
)
2429 dout(7) << "issue_truncate on " << *in
<< dendl
;
2431 for (auto &p
: in
->client_caps
) {
2432 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_trunc
);
2433 Capability
*cap
= &p
.second
;
2434 auto m
= make_message
<MClientCaps
>(CEPH_CAP_OP_TRUNC
,
2436 in
->find_snaprealm()->inode
->ino(),
2437 cap
->get_cap_id(), cap
->get_last_seq(),
2438 cap
->pending(), cap
->wanted(), 0,
2440 mds
->get_osd_epoch_barrier());
2441 in
->encode_cap_message(m
, cap
);
2442 mds
->send_message_client_counted(m
, p
.first
);
2445 // should we increase max_size?
2446 if (in
->is_auth() && in
->is_file())
2447 check_inode_max_size(in
);
2451 void Locker::revoke_stale_cap(CInode
*in
, client_t client
)
2453 dout(7) << __func__
<< " client." << client
<< " on " << *in
<< dendl
;
2454 Capability
*cap
= in
->get_client_cap(client
);
2458 if (cap
->revoking() & CEPH_CAP_ANY_WR
) {
2459 CachedStackStringStream css
;
2460 mds
->evict_client(client
.v
, false, g_conf()->mds_session_blocklist_on_timeout
, *css
, nullptr);
2466 if (in
->is_auth() && in
->get_inode()->client_ranges
.count(cap
->get_client()))
2467 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2469 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
))
2472 if (!in
->filelock
.is_stable())
2473 eval_gather(&in
->filelock
);
2474 if (!in
->linklock
.is_stable())
2475 eval_gather(&in
->linklock
);
2476 if (!in
->authlock
.is_stable())
2477 eval_gather(&in
->authlock
);
2478 if (!in
->xattrlock
.is_stable())
2479 eval_gather(&in
->xattrlock
);
2482 try_eval(in
, CEPH_CAP_LOCKS
);
2484 request_inode_file_caps(in
);
2487 bool Locker::revoke_stale_caps(Session
*session
)
2489 dout(10) << "revoke_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2491 // invalidate all caps
2492 session
->inc_cap_gen();
2495 std::vector
<CInode
*> to_eval
;
2497 for (auto p
= session
->caps
.begin(); !p
.end(); ) {
2498 Capability
*cap
= *p
;
2500 if (!cap
->is_notable()) {
2501 // the rest ones are not being revoked and don't have writeable range
2502 // and don't want exclusive caps or want file read/write. They don't
2503 // need recover, they don't affect eval_gather()/try_eval()
2507 int revoking
= cap
->revoking();
2511 if (revoking
& CEPH_CAP_ANY_WR
) {
2516 int issued
= cap
->issued();
2517 CInode
*in
= cap
->get_inode();
2518 dout(10) << " revoking " << ccap_string(issued
) << " on " << *in
<< dendl
;
2519 int revoked
= cap
->revoke();
2520 if (revoked
& CEPH_CAP_ANY_DIR_OPS
)
2521 eval_lock_caches(cap
);
2523 if (in
->is_auth() &&
2524 in
->get_inode()->client_ranges
.count(cap
->get_client()))
2525 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2527 // eval lock/inode may finish contexts, which may modify other cap's position
2528 // in the session->caps.
2529 to_eval
.push_back(in
);
2532 for (auto in
: to_eval
) {
2533 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
))
2536 if (!in
->filelock
.is_stable())
2537 eval_gather(&in
->filelock
);
2538 if (!in
->linklock
.is_stable())
2539 eval_gather(&in
->linklock
);
2540 if (!in
->authlock
.is_stable())
2541 eval_gather(&in
->authlock
);
2542 if (!in
->xattrlock
.is_stable())
2543 eval_gather(&in
->xattrlock
);
2546 try_eval(in
, CEPH_CAP_LOCKS
);
2548 request_inode_file_caps(in
);
2554 void Locker::resume_stale_caps(Session
*session
)
2556 dout(10) << "resume_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2558 bool lazy
= session
->info
.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED
);
2559 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ) {
2560 Capability
*cap
= *p
;
2562 if (lazy
&& !cap
->is_notable())
2563 break; // see revoke_stale_caps()
2565 CInode
*in
= cap
->get_inode();
2566 ceph_assert(in
->is_head());
2567 dout(10) << " clearing stale flag on " << *in
<< dendl
;
2569 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2570 // if export succeeds, the cap will be removed. if export fails,
2571 // we need to re-issue the cap if it's not stale.
2572 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2576 if (!in
->is_auth() || !eval(in
, CEPH_CAP_LOCKS
))
2577 issue_caps(in
, cap
);
2581 void Locker::remove_stale_leases(Session
*session
)
2583 dout(10) << "remove_stale_leases for " << session
->info
.inst
.name
<< dendl
;
2584 xlist
<ClientLease
*>::iterator p
= session
->leases
.begin();
2586 ClientLease
*l
= *p
;
2588 CDentry
*parent
= static_cast<CDentry
*>(l
->parent
);
2589 dout(15) << " removing lease on " << *parent
<< dendl
;
2590 parent
->remove_client_lease(l
, this);
2595 class C_MDL_RequestInodeFileCaps
: public LockerContext
{
2598 C_MDL_RequestInodeFileCaps(Locker
*l
, CInode
*i
) : LockerContext(l
), in(i
) {
2599 in
->get(CInode::PIN_PTRWAITER
);
2601 void finish(int r
) override
{
2603 locker
->request_inode_file_caps(in
);
2604 in
->put(CInode::PIN_PTRWAITER
);
2608 void Locker::request_inode_file_caps(CInode
*in
)
2610 ceph_assert(!in
->is_auth());
2612 int wanted
= in
->get_caps_wanted() & in
->get_caps_allowed_ever() & ~CEPH_CAP_PIN
;
2613 if (wanted
!= in
->replica_caps_wanted
) {
2614 // wait for single auth
2615 if (in
->is_ambiguous_auth()) {
2616 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
2617 new C_MDL_RequestInodeFileCaps(this, in
));
2621 mds_rank_t auth
= in
->authority().first
;
2622 if (mds
->is_cluster_degraded() &&
2623 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
2624 mds
->wait_for_active_peer(auth
, new C_MDL_RequestInodeFileCaps(this, in
));
2628 dout(7) << "request_inode_file_caps " << ccap_string(wanted
)
2629 << " was " << ccap_string(in
->replica_caps_wanted
)
2630 << " on " << *in
<< " to mds." << auth
<< dendl
;
2632 in
->replica_caps_wanted
= wanted
;
2634 if (!mds
->is_cluster_degraded() ||
2635 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
2636 mds
->send_message_mds(make_message
<MInodeFileCaps
>(in
->ino(), in
->replica_caps_wanted
), auth
);
2640 void Locker::handle_inode_file_caps(const cref_t
<MInodeFileCaps
> &m
)
2642 // nobody should be talking to us during recovery.
2643 if (mds
->get_state() < MDSMap::STATE_CLIENTREPLAY
) {
2644 if (mds
->get_want_state() >= MDSMap::STATE_CLIENTREPLAY
) {
2645 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
2648 ceph_abort_msg("got unexpected message during recovery");
2652 CInode
*in
= mdcache
->get_inode(m
->get_ino());
2653 mds_rank_t from
= mds_rank_t(m
->get_source().num());
2656 ceph_assert(in
->is_auth());
2658 dout(7) << "handle_inode_file_caps replica mds." << from
<< " wants caps " << ccap_string(m
->get_caps()) << " on " << *in
<< dendl
;
2660 if (mds
->logger
) mds
->logger
->inc(l_mdss_handle_inode_file_caps
);
2662 in
->set_mds_caps_wanted(from
, m
->get_caps());
2664 try_eval(in
, CEPH_CAP_LOCKS
);
2668 class C_MDL_CheckMaxSize
: public LockerContext
{
2670 uint64_t new_max_size
;
2675 C_MDL_CheckMaxSize(Locker
*l
, CInode
*i
, uint64_t _new_max_size
,
2676 uint64_t _newsize
, utime_t _mtime
) :
2677 LockerContext(l
), in(i
),
2678 new_max_size(_new_max_size
), newsize(_newsize
), mtime(_mtime
)
2680 in
->get(CInode::PIN_PTRWAITER
);
2682 void finish(int r
) override
{
2684 locker
->check_inode_max_size(in
, false, new_max_size
, newsize
, mtime
);
2685 in
->put(CInode::PIN_PTRWAITER
);
2689 uint64_t Locker::calc_new_max_size(const CInode::inode_const_ptr
&pi
, uint64_t size
)
2691 uint64_t new_max
= (size
+ 1) << 1;
2692 uint64_t max_inc
= g_conf()->mds_client_writeable_range_max_inc_objs
;
2694 max_inc
*= pi
->layout
.object_size
;
2695 new_max
= std::min(new_max
, size
+ max_inc
);
2697 return round_up_to(new_max
, pi
->get_layout_size_increment());
2700 bool Locker::check_client_ranges(CInode
*in
, uint64_t size
)
2702 const auto& latest
= in
->get_projected_inode();
2704 if (latest
->has_layout()) {
2705 ms
= calc_new_max_size(latest
, size
);
2707 // Layout-less directories like ~mds0/, have zero size
2711 auto it
= latest
->client_ranges
.begin();
2712 for (auto &p
: in
->client_caps
) {
2713 if ((p
.second
.issued() | p
.second
.wanted()) & CEPH_CAP_ANY_FILE_WR
) {
2714 if (it
== latest
->client_ranges
.end())
2716 if (it
->first
!= p
.first
)
2718 if (ms
> it
->second
.range
.last
)
2723 return it
!= latest
->client_ranges
.end();
2726 bool Locker::calc_new_client_ranges(CInode
*in
, uint64_t size
, bool *max_increased
)
2728 const auto& latest
= in
->get_projected_inode();
2730 if (latest
->has_layout()) {
2731 ms
= calc_new_max_size(latest
, size
);
2733 // Layout-less directories like ~mds0/, have zero size
2737 auto pi
= in
->_get_projected_inode();
2738 bool updated
= false;
2740 // increase ranges as appropriate.
2741 // shrink to 0 if no WR|BUFFER caps issued.
2742 auto it
= pi
->client_ranges
.begin();
2743 for (auto &p
: in
->client_caps
) {
2744 if ((p
.second
.issued() | p
.second
.wanted()) & CEPH_CAP_ANY_FILE_WR
) {
2745 while (it
!= pi
->client_ranges
.end() && it
->first
< p
.first
) {
2746 it
= pi
->client_ranges
.erase(it
);
2750 if (it
!= pi
->client_ranges
.end() && it
->first
== p
.first
) {
2751 if (ms
> it
->second
.range
.last
) {
2752 it
->second
.range
.last
= ms
;
2755 *max_increased
= true;
2758 it
= pi
->client_ranges
.emplace_hint(it
, std::piecewise_construct
,
2759 std::forward_as_tuple(p
.first
),
2760 std::forward_as_tuple());
2761 it
->second
.range
.last
= ms
;
2762 it
->second
.follows
= in
->first
- 1;
2765 *max_increased
= true;
2767 p
.second
.mark_clientwriteable();
2770 p
.second
.clear_clientwriteable();
2773 while (it
!= pi
->client_ranges
.end()) {
2774 it
= pi
->client_ranges
.erase(it
);
2778 if (pi
->client_ranges
.empty())
2779 in
->clear_clientwriteable();
2781 in
->mark_clientwriteable();
2786 bool Locker::check_inode_max_size(CInode
*in
, bool force_wrlock
,
2787 uint64_t new_max_size
, uint64_t new_size
,
2790 ceph_assert(in
->is_auth());
2791 ceph_assert(in
->is_file());
2793 const auto& latest
= in
->get_projected_inode();
2794 uint64_t size
= latest
->size
;
2795 bool update_size
= new_size
> 0;
2798 new_size
= size
= std::max(size
, new_size
);
2799 new_mtime
= std::max(new_mtime
, latest
->mtime
);
2800 if (latest
->size
== new_size
&& latest
->mtime
== new_mtime
)
2801 update_size
= false;
2804 bool new_ranges
= check_client_ranges(in
, std::max(new_max_size
, size
));
2805 if (!update_size
&& !new_ranges
) {
2806 dout(20) << "check_inode_max_size no-op on " << *in
<< dendl
;
2810 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2811 << " update_size " << update_size
2812 << " on " << *in
<< dendl
;
2814 if (in
->is_frozen()) {
2815 dout(10) << "check_inode_max_size frozen, waiting on " << *in
<< dendl
;
2816 in
->add_waiter(CInode::WAIT_UNFREEZE
,
2817 new C_MDL_CheckMaxSize(this, in
, new_max_size
, new_size
, new_mtime
));
2819 } else if (!force_wrlock
&& !in
->filelock
.can_wrlock(in
->get_loner())) {
2821 if (in
->filelock
.is_stable()) {
2822 if (in
->get_target_loner() >= 0)
2823 file_excl(&in
->filelock
);
2825 simple_lock(&in
->filelock
);
2827 if (!in
->filelock
.can_wrlock(in
->get_loner())) {
2828 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in
<< dendl
;
2829 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
,
2830 new C_MDL_CheckMaxSize(this, in
, new_max_size
, new_size
, new_mtime
));
2835 MutationRef
mut(new MutationImpl());
2836 mut
->ls
= mds
->mdlog
->get_current_segment();
2838 auto pi
= in
->project_inode(mut
);
2839 pi
.inode
->version
= in
->pre_dirty();
2841 bool max_increased
= false;
2843 calc_new_client_ranges(in
, std::max(new_max_size
, size
), &max_increased
)) {
2844 dout(10) << "check_inode_max_size client_ranges "
2845 << in
->get_previous_projected_inode()->client_ranges
2846 << " -> " << pi
.inode
->client_ranges
<< dendl
;
2850 dout(10) << "check_inode_max_size size " << pi
.inode
->size
<< " -> " << new_size
<< dendl
;
2851 pi
.inode
->size
= new_size
;
2852 pi
.inode
->rstat
.rbytes
= new_size
;
2853 dout(10) << "check_inode_max_size mtime " << pi
.inode
->mtime
<< " -> " << new_mtime
<< dendl
;
2854 pi
.inode
->mtime
= new_mtime
;
2855 if (new_mtime
> pi
.inode
->ctime
) {
2856 pi
.inode
->ctime
= new_mtime
;
2857 if (new_mtime
> pi
.inode
->rstat
.rctime
)
2858 pi
.inode
->rstat
.rctime
= new_mtime
;
2862 // use EOpen if the file is still open; otherwise, use EUpdate.
2863 // this is just an optimization to push open files forward into
2864 // newer log segments.
2866 EMetaBlob
*metablob
;
2867 if (in
->is_any_caps_wanted() && in
->last
== CEPH_NOSNAP
) {
2868 EOpen
*eo
= new EOpen(mds
->mdlog
);
2869 eo
->add_ino(in
->ino());
2870 metablob
= &eo
->metablob
;
2873 EUpdate
*eu
= new EUpdate(mds
->mdlog
, "check_inode_max_size");
2874 metablob
= &eu
->metablob
;
2877 mds
->mdlog
->start_entry(le
);
2879 mdcache
->predirty_journal_parents(mut
, metablob
, in
, 0, PREDIRTY_PRIMARY
);
2881 CDentry
*parent
= in
->get_projected_parent_dn();
2882 metablob
->add_primary_dentry(parent
, in
, true);
2883 mdcache
->journal_dirty_inode(mut
.get(), metablob
, in
);
2885 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
,
2886 UPDATE_SHAREMAX
, ref_t
<MClientCaps
>()));
2887 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
2890 // make max_size _increase_ timely
2892 mds
->mdlog
->flush();
2898 void Locker::share_inode_max_size(CInode
*in
, Capability
*only_cap
)
2901 * only share if currently issued a WR cap. if client doesn't have it,
2902 * file_max doesn't matter, and the client will get it if/when they get
2905 dout(10) << "share_inode_max_size on " << *in
<< dendl
;
2906 map
<client_t
, Capability
>::iterator it
;
2908 it
= in
->client_caps
.find(only_cap
->get_client());
2910 it
= in
->client_caps
.begin();
2911 for (; it
!= in
->client_caps
.end(); ++it
) {
2912 const client_t client
= it
->first
;
2913 Capability
*cap
= &it
->second
;
2914 if (cap
->is_suppress())
2916 if (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2917 dout(10) << "share_inode_max_size with client." << client
<< dendl
;
2918 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_grant
);
2919 cap
->inc_last_seq();
2920 auto m
= make_message
<MClientCaps
>(CEPH_CAP_OP_GRANT
,
2922 in
->find_snaprealm()->inode
->ino(),
2924 cap
->get_last_seq(),
2928 mds
->get_osd_epoch_barrier());
2929 in
->encode_cap_message(m
, cap
);
2930 mds
->send_message_client_counted(m
, client
);
2937 bool Locker::_need_flush_mdlog(CInode
*in
, int wanted
)
2939 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2940 * pending mutations. */
2941 if (((wanted
& (CEPH_CAP_FILE_RD
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_SHARED
|CEPH_CAP_FILE_EXCL
)) &&
2942 in
->filelock
.is_unstable_and_locked()) ||
2943 ((wanted
& (CEPH_CAP_AUTH_SHARED
|CEPH_CAP_AUTH_EXCL
)) &&
2944 in
->authlock
.is_unstable_and_locked()) ||
2945 ((wanted
& (CEPH_CAP_LINK_SHARED
|CEPH_CAP_LINK_EXCL
)) &&
2946 in
->linklock
.is_unstable_and_locked()) ||
2947 ((wanted
& (CEPH_CAP_XATTR_SHARED
|CEPH_CAP_XATTR_EXCL
)) &&
2948 in
->xattrlock
.is_unstable_and_locked()))
2953 void Locker::adjust_cap_wanted(Capability
*cap
, int wanted
, int issue_seq
)
2955 if (ceph_seq_cmp(issue_seq
, cap
->get_last_issue()) == 0) {
2956 dout(10) << " wanted " << ccap_string(cap
->wanted())
2957 << " -> " << ccap_string(wanted
) << dendl
;
2958 cap
->set_wanted(wanted
);
2959 } else if (wanted
& ~cap
->wanted()) {
2960 dout(10) << " wanted " << ccap_string(cap
->wanted())
2961 << " -> " << ccap_string(wanted
)
2962 << " (added caps even though we had seq mismatch!)" << dendl
;
2963 cap
->set_wanted(wanted
| cap
->wanted());
2965 dout(10) << " NOT changing wanted " << ccap_string(cap
->wanted())
2966 << " -> " << ccap_string(wanted
)
2967 << " (issue_seq " << issue_seq
<< " != last_issue "
2968 << cap
->get_last_issue() << ")" << dendl
;
2972 CInode
*cur
= cap
->get_inode();
2973 if (!cur
->is_auth()) {
2974 request_inode_file_caps(cur
);
2978 if (cap
->wanted()) {
2979 if (cur
->state_test(CInode::STATE_RECOVERING
) &&
2980 (cap
->wanted() & (CEPH_CAP_FILE_RD
|
2981 CEPH_CAP_FILE_WR
))) {
2982 mds
->mdcache
->recovery_queue
.prioritize(cur
);
2985 if (mdcache
->open_file_table
.should_log_open(cur
)) {
2986 ceph_assert(cur
->last
== CEPH_NOSNAP
);
2987 EOpen
*le
= new EOpen(mds
->mdlog
);
2988 mds
->mdlog
->start_entry(le
);
2989 le
->add_clean_inode(cur
);
2990 mds
->mdlog
->submit_entry(le
);
2995 void Locker::snapflush_nudge(CInode
*in
)
2997 ceph_assert(in
->last
!= CEPH_NOSNAP
);
2998 if (in
->client_snap_caps
.empty())
3001 CInode
*head
= mdcache
->get_inode(in
->ino());
3002 // head inode gets unpinned when snapflush starts. It might get trimmed
3003 // before snapflush finishes.
3007 ceph_assert(head
->is_auth());
3008 if (head
->client_need_snapflush
.empty())
3011 SimpleLock
*hlock
= head
->get_lock(CEPH_LOCK_IFILE
);
3012 if (hlock
->get_state() == LOCK_SYNC
|| !hlock
->is_stable()) {
3014 for (int i
= 0; i
< num_cinode_locks
; i
++) {
3015 SimpleLock
*lock
= head
->get_lock(cinode_lock_info
[i
].lock
);
3016 if (lock
->get_state() != LOCK_SYNC
&& lock
->is_stable()) {
3023 _rdlock_kick(hlock
, true);
3025 // also, requeue, in case of unstable lock
3026 need_snapflush_inodes
.push_back(&in
->item_caps
);
3030 void Locker::mark_need_snapflush_inode(CInode
*in
)
3032 ceph_assert(in
->last
!= CEPH_NOSNAP
);
3033 if (!in
->item_caps
.is_on_list()) {
3034 need_snapflush_inodes
.push_back(&in
->item_caps
);
3035 utime_t now
= ceph_clock_now();
3036 in
->last_dirstat_prop
= now
;
3037 dout(10) << "mark_need_snapflush_inode " << *in
<< " - added at " << now
<< dendl
;
3041 bool Locker::is_revoking_any_caps_from(client_t client
)
3043 auto it
= revoking_caps_by_client
.find(client
);
3044 if (it
== revoking_caps_by_client
.end())
3046 return !it
->second
.empty();
3049 void Locker::_do_null_snapflush(CInode
*head_in
, client_t client
, snapid_t last
)
3051 dout(10) << "_do_null_snapflush client." << client
<< " on " << *head_in
<< dendl
;
3052 for (auto p
= head_in
->client_need_snapflush
.begin();
3053 p
!= head_in
->client_need_snapflush
.end() && p
->first
< last
; ) {
3054 snapid_t snapid
= p
->first
;
3055 auto &clients
= p
->second
;
3056 ++p
; // be careful, q loop below depends on this
3058 if (clients
.count(client
)) {
3059 dout(10) << " doing async NULL snapflush on " << snapid
<< " from client." << client
<< dendl
;
3060 CInode
*sin
= mdcache
->pick_inode_snap(head_in
, snapid
- 1);
3062 ceph_assert(sin
->first
<= snapid
);
3063 _do_snap_update(sin
, snapid
, 0, sin
->first
- 1, client
, ref_t
<MClientCaps
>(), ref_t
<MClientCaps
>());
3064 head_in
->remove_need_snapflush(sin
, snapid
, client
);
3070 bool Locker::should_defer_client_cap_frozen(CInode
*in
)
3072 if (in
->is_frozen())
3076 * This policy needs to be AT LEAST as permissive as allowing a client
3077 * request to go forward, or else a client request can release something,
3078 * the release gets deferred, but the request gets processed and deadlocks
3079 * because when the caps can't get revoked.
3081 * No auth_pin implies that there is no unstable lock and @in is not auth
3082 * pinnned by client request. If parent dirfrag is auth pinned by a lock
3083 * cache, later request from lock cache owner may forcibly auth pin the @in.
3085 if (in
->is_freezing() && in
->get_num_auth_pins() == 0) {
3086 CDir
* dir
= in
->get_parent_dir();
3087 if (!dir
|| !dir
->is_auth_pinned_by_lock_cache())
3093 void Locker::handle_client_caps(const cref_t
<MClientCaps
> &m
)
3095 client_t client
= m
->get_source().num();
3096 snapid_t follows
= m
->get_snap_follows();
3097 auto op
= m
->get_op();
3098 auto dirty
= m
->get_dirty();
3099 dout(7) << "handle_client_caps "
3100 << " on " << m
->get_ino()
3101 << " tid " << m
->get_client_tid() << " follows " << follows
3102 << " op " << ceph_cap_op_name(op
)
3103 << " flags 0x" << std::hex
<< m
->flags
<< std::dec
<< dendl
;
3105 Session
*session
= mds
->get_session(m
);
3106 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3108 dout(5) << " no session, dropping " << *m
<< dendl
;
3111 if (session
->is_closed() ||
3112 session
->is_closing() ||
3113 session
->is_killing()) {
3114 dout(7) << " session closed|closing|killing, dropping " << *m
<< dendl
;
3117 if ((mds
->is_reconnect() || mds
->get_want_state() == MDSMap::STATE_RECONNECT
) &&
3118 dirty
&& m
->get_client_tid() > 0 &&
3119 !session
->have_completed_flush(m
->get_client_tid())) {
3120 mdcache
->set_reconnected_dirty_caps(client
, m
->get_ino(), dirty
,
3121 op
== CEPH_CAP_OP_FLUSHSNAP
);
3123 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3127 if (mds
->logger
) mds
->logger
->inc(l_mdss_handle_client_caps
);
3129 if (mds
->logger
) mds
->logger
->inc(l_mdss_handle_client_caps_dirty
);
3132 if (m
->get_client_tid() > 0 && session
&&
3133 session
->have_completed_flush(m
->get_client_tid())) {
3134 dout(7) << "handle_client_caps already flushed tid " << m
->get_client_tid()
3135 << " for client." << client
<< dendl
;
3136 ref_t
<MClientCaps
> ack
;
3137 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
3138 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_flushsnap_ack
);
3139 ack
= make_message
<MClientCaps
>(CEPH_CAP_OP_FLUSHSNAP_ACK
, m
->get_ino(), 0, 0, 0, 0, 0, dirty
, 0, mds
->get_osd_epoch_barrier());
3141 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_flush_ack
);
3142 ack
= make_message
<MClientCaps
>(CEPH_CAP_OP_FLUSH_ACK
, m
->get_ino(), 0, m
->get_cap_id(), m
->get_seq(), m
->get_caps(), 0, dirty
, 0, mds
->get_osd_epoch_barrier());
3144 ack
->set_snap_follows(follows
);
3145 ack
->set_client_tid(m
->get_client_tid());
3146 mds
->send_message_client_counted(ack
, m
->get_connection());
3147 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
3150 // fall-thru because the message may release some caps
3152 op
= CEPH_CAP_OP_UPDATE
;
3156 // "oldest flush tid" > 0 means client uses unique TID for each flush
3157 if (m
->get_oldest_flush_tid() > 0 && session
) {
3158 if (session
->trim_completed_flushes(m
->get_oldest_flush_tid())) {
3159 mds
->mdlog
->get_current_segment()->touched_sessions
.insert(session
->info
.inst
.name
);
3161 if (session
->get_num_trim_flushes_warnings() > 0 &&
3162 session
->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes
)
3163 session
->reset_num_trim_flushes_warnings();
3165 if (session
->get_num_completed_flushes() >=
3166 (g_conf()->mds_max_completed_flushes
<< session
->get_num_trim_flushes_warnings())) {
3167 session
->inc_num_trim_flushes_warnings();
3168 CachedStackStringStream css
;
3169 *css
<< "client." << session
->get_client() << " does not advance its oldest_flush_tid ("
3170 << m
->get_oldest_flush_tid() << "), "
3171 << session
->get_num_completed_flushes()
3172 << " completed flushes recorded in session";
3173 mds
->clog
->warn() << css
->strv();
3174 dout(20) << __func__
<< " " << css
->strv() << dendl
;
3179 CInode
*head_in
= mdcache
->get_inode(m
->get_ino());
3181 if (mds
->is_clientreplay()) {
3182 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino()
3183 << ", will try again after replayed client requests" << dendl
;
3184 mdcache
->wait_replay_cap_reconnect(m
->get_ino(), new C_MDS_RetryMessage(mds
, m
));
3189 * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
3190 * Sequence of events that cause this are:
3191 * - client sends caps message to mds.a
3192 * - mds finishes subtree migration, send cap export to client
3193 * - mds trim its cache
3194 * - mds receives cap messages from client
3196 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino() << ", dropping" << dendl
;
3200 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3201 // Pause RADOS operations until we see the required epoch
3202 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3205 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3206 // Record the barrier so that we will retransmit it to clients
3207 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3210 dout(10) << " head inode " << *head_in
<< dendl
;
3212 Capability
*cap
= 0;
3213 cap
= head_in
->get_client_cap(client
);
3215 dout(7) << "handle_client_caps no cap for client." << client
<< " on " << *head_in
<< dendl
;
3221 if (should_defer_client_cap_frozen(head_in
)) {
3222 dout(7) << "handle_client_caps freezing|frozen on " << *head_in
<< dendl
;
3223 head_in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_MDS_RetryMessage(mds
, m
));
3226 if (ceph_seq_cmp(m
->get_mseq(), cap
->get_mseq()) < 0) {
3227 dout(7) << "handle_client_caps mseq " << m
->get_mseq() << " < " << cap
->get_mseq()
3228 << ", dropping" << dendl
;
3232 bool need_unpin
= false;
3235 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
3236 if (!head_in
->is_auth()) {
3237 dout(7) << " not auth, ignoring flushsnap on " << *head_in
<< dendl
;
3241 SnapRealm
*realm
= head_in
->find_snaprealm();
3242 snapid_t snap
= realm
->get_snap_following(follows
);
3243 dout(10) << " flushsnap follows " << follows
<< " -> snap " << snap
<< dendl
;
3245 auto p
= head_in
->client_need_snapflush
.begin();
3246 if (p
!= head_in
->client_need_snapflush
.end() && p
->first
< snap
) {
3247 head_in
->auth_pin(this); // prevent subtree frozen
3249 _do_null_snapflush(head_in
, client
, snap
);
3252 CInode
*in
= head_in
;
3253 if (snap
!= CEPH_NOSNAP
) {
3254 in
= mdcache
->pick_inode_snap(head_in
, snap
- 1);
3256 dout(10) << " snapped inode " << *in
<< dendl
;
3259 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
3260 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
3261 // case we get a dup response, so whatever.)
3262 ref_t
<MClientCaps
> ack
;
3264 ack
= make_message
<MClientCaps
>(CEPH_CAP_OP_FLUSHSNAP_ACK
, in
->ino(), 0, 0, 0, 0, 0, dirty
, 0, mds
->get_osd_epoch_barrier());
3265 ack
->set_snap_follows(follows
);
3266 ack
->set_client_tid(m
->get_client_tid());
3267 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
3270 if (in
== head_in
||
3271 (head_in
->client_need_snapflush
.count(snap
) &&
3272 head_in
->client_need_snapflush
[snap
].count(client
))) {
3273 dout(7) << " flushsnap snap " << snap
3274 << " client." << client
<< " on " << *in
<< dendl
;
3276 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
3278 cap
->client_follows
= snap
< CEPH_NOSNAP
? snap
: realm
->get_newest_seq();
3280 _do_snap_update(in
, snap
, dirty
, follows
, client
, m
, ack
);
3283 head_in
->remove_need_snapflush(in
, snap
, client
);
3285 dout(7) << " not expecting flushsnap " << snap
<< " from client." << client
<< " on " << *in
<< dendl
;
3287 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_flushsnap_ack
);
3288 mds
->send_message_client_counted(ack
, m
->get_connection());
3294 if (cap
->get_cap_id() != m
->get_cap_id()) {
3295 dout(7) << " ignoring client capid " << m
->get_cap_id() << " != my " << cap
->get_cap_id() << dendl
;
3297 CInode
*in
= head_in
;
3299 in
= mdcache
->pick_inode_snap(head_in
, follows
);
3300 // intermediate snap inodes
3301 while (in
!= head_in
) {
3302 ceph_assert(in
->last
!= CEPH_NOSNAP
);
3303 if (in
->is_auth() && dirty
) {
3304 dout(10) << " updating intermediate snapped inode " << *in
<< dendl
;
3305 _do_cap_update(in
, NULL
, dirty
, follows
, m
, ref_t
<MClientCaps
>());
3307 in
= mdcache
->pick_inode_snap(head_in
, in
->last
);
3311 // head inode, and cap
3312 ref_t
<MClientCaps
> ack
;
3314 int caps
= m
->get_caps();
3315 if (caps
& ~cap
->issued()) {
3316 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
3317 caps
&= cap
->issued();
3320 int revoked
= cap
->confirm_receipt(m
->get_seq(), caps
);
3321 dout(10) << " follows " << follows
3322 << " retains " << ccap_string(m
->get_caps())
3323 << " dirty " << ccap_string(dirty
)
3324 << " on " << *in
<< dendl
;
3326 if (revoked
& CEPH_CAP_ANY_DIR_OPS
)
3327 eval_lock_caches(cap
);
3329 // missing/skipped snapflush?
3330 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
3331 // presently only does so when it has actual dirty metadata. But, we
3332 // set up the need_snapflush stuff based on the issued caps.
3333 // We can infer that the client WONT send a FLUSHSNAP once they have
3334 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
3336 if (!head_in
->client_need_snapflush
.empty()) {
3337 if (!(cap
->issued() & CEPH_CAP_ANY_FILE_WR
) &&
3338 !(m
->flags
& MClientCaps::FLAG_PENDING_CAPSNAP
)) {
3339 head_in
->auth_pin(this); // prevent subtree frozen
3341 _do_null_snapflush(head_in
, client
);
3343 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl
;
3346 if (cap
->need_snapflush() && !(m
->flags
& MClientCaps::FLAG_PENDING_CAPSNAP
))
3347 cap
->clear_needsnapflush();
3349 if (dirty
&& in
->is_auth()) {
3350 dout(7) << " flush client." << client
<< " dirty " << ccap_string(dirty
)
3351 << " seq " << m
->get_seq() << " on " << *in
<< dendl
;
3352 ack
= make_message
<MClientCaps
>(CEPH_CAP_OP_FLUSH_ACK
, in
->ino(), 0, cap
->get_cap_id(), m
->get_seq(),
3353 m
->get_caps(), 0, dirty
, 0, mds
->get_osd_epoch_barrier());
3354 ack
->set_client_tid(m
->get_client_tid());
3355 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
3358 // filter wanted based on what we could ever give out (given auth/replica status)
3359 bool need_flush
= m
->flags
& MClientCaps::FLAG_SYNC
;
3360 int new_wanted
= m
->get_wanted();
3361 if (new_wanted
!= cap
->wanted()) {
3362 if (!need_flush
&& in
->is_auth() && (new_wanted
& ~cap
->pending())) {
3363 // exapnding caps. make sure we aren't waiting for a log flush
3364 need_flush
= _need_flush_mdlog(head_in
, new_wanted
& ~cap
->pending());
3367 adjust_cap_wanted(cap
, new_wanted
, m
->get_issue_seq());
3370 if (in
->is_auth() &&
3371 _do_cap_update(in
, cap
, dirty
, follows
, m
, ack
, &need_flush
)) {
3373 eval(in
, CEPH_CAP_LOCKS
);
3375 if (!need_flush
&& (cap
->wanted() & ~cap
->pending()))
3376 need_flush
= _need_flush_mdlog(in
, cap
->wanted() & ~cap
->pending());
3378 // no update, ack now.
3380 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_flush_ack
);
3381 mds
->send_message_client_counted(ack
, m
->get_connection());
3384 bool did_issue
= eval(in
, CEPH_CAP_LOCKS
);
3385 if (!did_issue
&& (cap
->wanted() & ~cap
->pending()))
3386 issue_caps(in
, cap
);
3388 if (cap
->get_last_seq() == 0 &&
3389 (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
))) {
3390 share_inode_max_size(in
, cap
);
3395 mds
->mdlog
->flush();
3400 head_in
->auth_unpin(this);
3404 class C_Locker_RetryRequestCapRelease
: public LockerContext
{
3406 ceph_mds_request_release item
;
3408 C_Locker_RetryRequestCapRelease(Locker
*l
, client_t c
, const ceph_mds_request_release
& it
) :
3409 LockerContext(l
), client(c
), item(it
) { }
3410 void finish(int r
) override
{
3412 MDRequestRef null_ref
;
3413 locker
->process_request_cap_release(null_ref
, client
, item
, dname
);
3417 void Locker::process_request_cap_release(MDRequestRef
& mdr
, client_t client
, const ceph_mds_request_release
& item
,
3418 std::string_view dname
)
3420 inodeno_t ino
= (uint64_t)item
.ino
;
3421 uint64_t cap_id
= item
.cap_id
;
3422 int caps
= item
.caps
;
3423 int wanted
= item
.wanted
;
3425 int issue_seq
= item
.issue_seq
;
3426 int mseq
= item
.mseq
;
3428 CInode
*in
= mdcache
->get_inode(ino
);
3432 if (dname
.length()) {
3433 frag_t fg
= in
->pick_dirfrag(dname
);
3434 CDir
*dir
= in
->get_dirfrag(fg
);
3436 CDentry
*dn
= dir
->lookup(dname
);
3438 ClientLease
*l
= dn
->get_client_lease(client
);
3440 dout(10) << __func__
<< " removing lease on " << *dn
<< dendl
;
3441 dn
->remove_client_lease(l
, this);
3443 dout(7) << __func__
<< " client." << client
3444 << " doesn't have lease on " << *dn
<< dendl
;
3447 dout(7) << __func__
<< " client." << client
<< " released lease on dn "
3448 << dir
->dirfrag() << "/" << dname
<< " which dne" << dendl
;
3453 Capability
*cap
= in
->get_client_cap(client
);
3457 dout(10) << __func__
<< " client." << client
<< " " << ccap_string(caps
) << " on " << *in
3458 << (mdr
? "" : " (DEFERRED, no mdr)")
3461 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3462 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", dropping" << dendl
;
3466 if (cap
->get_cap_id() != cap_id
) {
3467 dout(7) << " cap_id " << cap_id
<< " != " << cap
->get_cap_id() << ", dropping" << dendl
;
3471 if (should_defer_client_cap_frozen(in
)) {
3472 dout(7) << " frozen, deferring" << dendl
;
3473 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_Locker_RetryRequestCapRelease(this, client
, item
));
3477 if (mds
->logger
) mds
->logger
->inc(l_mdss_process_request_cap_release
);
3479 if (caps
& ~cap
->issued()) {
3480 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
3481 caps
&= cap
->issued();
3483 int revoked
= cap
->confirm_receipt(seq
, caps
);
3484 if (revoked
& CEPH_CAP_ANY_DIR_OPS
)
3485 eval_lock_caches(cap
);
3487 if (!in
->client_need_snapflush
.empty() &&
3488 (cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
3489 _do_null_snapflush(in
, client
);
3492 adjust_cap_wanted(cap
, wanted
, issue_seq
);
3495 cap
->inc_suppress();
3496 eval(in
, CEPH_CAP_LOCKS
);
3498 cap
->dec_suppress();
3500 // take note; we may need to reissue on this cap later
3502 mdr
->cap_releases
[in
->vino()] = cap
->get_last_seq();
3505 class C_Locker_RetryKickIssueCaps
: public LockerContext
{
3510 C_Locker_RetryKickIssueCaps(Locker
*l
, CInode
*i
, client_t c
, ceph_seq_t s
) :
3511 LockerContext(l
), in(i
), client(c
), seq(s
) {
3512 in
->get(CInode::PIN_PTRWAITER
);
3514 void finish(int r
) override
{
3515 locker
->kick_issue_caps(in
, client
, seq
);
3516 in
->put(CInode::PIN_PTRWAITER
);
3520 void Locker::kick_issue_caps(CInode
*in
, client_t client
, ceph_seq_t seq
)
3522 Capability
*cap
= in
->get_client_cap(client
);
3523 if (!cap
|| cap
->get_last_seq() != seq
)
3525 if (in
->is_frozen()) {
3526 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in
<< dendl
;
3527 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3528 new C_Locker_RetryKickIssueCaps(this, in
, client
, seq
));
3531 dout(10) << "kick_issue_caps released at current seq " << seq
3532 << ", reissuing" << dendl
;
3533 issue_caps(in
, cap
);
3536 void Locker::kick_cap_releases(MDRequestRef
& mdr
)
3538 client_t client
= mdr
->get_client();
3539 for (map
<vinodeno_t
,ceph_seq_t
>::iterator p
= mdr
->cap_releases
.begin();
3540 p
!= mdr
->cap_releases
.end();
3542 CInode
*in
= mdcache
->get_inode(p
->first
);
3545 kick_issue_caps(in
, client
, p
->second
);
3550 * m and ack might be NULL, so don't dereference them unless dirty != 0
3552 void Locker::_do_snap_update(CInode
*in
, snapid_t snap
, int dirty
, snapid_t follows
, client_t client
, const cref_t
<MClientCaps
> &m
, const ref_t
<MClientCaps
> &ack
)
3554 dout(10) << "_do_snap_update dirty " << ccap_string(dirty
)
3555 << " follows " << follows
<< " snap " << snap
3556 << " on " << *in
<< dendl
;
3558 if (snap
== CEPH_NOSNAP
) {
3559 // hmm, i guess snap was already deleted? just ack!
3560 dout(10) << " wow, the snap following " << follows
3561 << " was already deleted. nothing to record, just ack." << dendl
;
3563 if (ack
->get_op() == CEPH_CAP_OP_FLUSHSNAP_ACK
) {
3564 if (mds
->logger
) mds
->logger
->inc(l_mdss_ceph_cap_op_flushsnap_ack
);
3566 mds
->send_message_client_counted(ack
, m
->get_connection());
3571 EUpdate
*le
= new EUpdate(mds
->mdlog
, "snap flush");
3572 mds
->mdlog
->start_entry(le
);
3573 MutationRef mut
= new MutationImpl();
3574 mut
->ls
= mds
->mdlog
->get_current_segment();
3576 // normal metadata updates that we can apply to the head as well.
3579 CInode::mempool_xattr_map
*px
= nullptr;
3580 bool xattrs
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3581 m
->xattrbl
.length() &&
3582 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3584 CInode::mempool_old_inode
*oi
= nullptr;
3585 CInode::old_inode_map_ptr _old_inodes
;
3586 if (in
->is_any_old_inodes()) {
3587 auto last
= in
->pick_old_inode(snap
);
3589 _old_inodes
= CInode::allocate_old_inode_map(*in
->get_old_inodes());
3590 oi
= &_old_inodes
->at(last
);
3591 if (snap
> oi
->first
) {
3592 (*_old_inodes
)[snap
- 1] = *oi
;;
3598 CInode::mempool_inode
*i
;
3600 dout(10) << " writing into old inode" << dendl
;
3601 auto pi
= in
->project_inode(mut
);
3602 pi
.inode
->version
= in
->pre_dirty();
3607 auto pi
= in
->project_inode(mut
, xattrs
);
3608 pi
.inode
->version
= in
->pre_dirty();
3611 px
= pi
.xattrs
.get();
3614 _update_cap_fields(in
, dirty
, m
, i
);
3618 dout(7) << " xattrs v" << i
->xattr_version
<< " -> " << m
->head
.xattr_version
3619 << " len " << m
->xattrbl
.length() << dendl
;
3620 i
->xattr_version
= m
->head
.xattr_version
;
3621 auto p
= m
->xattrbl
.cbegin();
3626 auto it
= i
->client_ranges
.find(client
);
3627 if (it
!= i
->client_ranges
.end()) {
3628 if (in
->last
== snap
) {
3629 dout(10) << " removing client_range entirely" << dendl
;
3630 i
->client_ranges
.erase(it
);
3632 dout(10) << " client_range now follows " << snap
<< dendl
;
3633 it
->second
.follows
= snap
;
3639 in
->reset_old_inodes(std::move(_old_inodes
));
3642 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3643 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3645 // "oldest flush tid" > 0 means client uses unique TID for each flush
3646 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3647 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3648 ack
->get_oldest_flush_tid());
3650 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, UPDATE_SNAPFLUSH
,
3654 void Locker::_update_cap_fields(CInode
*in
, int dirty
, const cref_t
<MClientCaps
> &m
, CInode::mempool_inode
*pi
)
3659 /* m must be valid if there are dirty caps */
3661 uint64_t features
= m
->get_connection()->get_features();
3663 if (m
->get_ctime() > pi
->ctime
) {
3664 dout(7) << " ctime " << pi
->ctime
<< " -> " << m
->get_ctime()
3665 << " for " << *in
<< dendl
;
3666 pi
->ctime
= m
->get_ctime();
3667 if (m
->get_ctime() > pi
->rstat
.rctime
)
3668 pi
->rstat
.rctime
= m
->get_ctime();
3671 if ((features
& CEPH_FEATURE_FS_CHANGE_ATTR
) &&
3672 m
->get_change_attr() > pi
->change_attr
) {
3673 dout(7) << " change_attr " << pi
->change_attr
<< " -> " << m
->get_change_attr()
3674 << " for " << *in
<< dendl
;
3675 pi
->change_attr
= m
->get_change_attr();
3679 if (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)) {
3680 utime_t atime
= m
->get_atime();
3681 utime_t mtime
= m
->get_mtime();
3682 uint64_t size
= m
->get_size();
3683 version_t inline_version
= m
->inline_version
;
3685 if (((dirty
& CEPH_CAP_FILE_WR
) && mtime
> pi
->mtime
) ||
3686 ((dirty
& CEPH_CAP_FILE_EXCL
) && mtime
!= pi
->mtime
)) {
3687 dout(7) << " mtime " << pi
->mtime
<< " -> " << mtime
3688 << " for " << *in
<< dendl
;
3690 if (mtime
> pi
->rstat
.rctime
)
3691 pi
->rstat
.rctime
= mtime
;
3693 if (in
->is_file() && // ONLY if regular file
3695 dout(7) << " size " << pi
->size
<< " -> " << size
3696 << " for " << *in
<< dendl
;
3698 pi
->rstat
.rbytes
= size
;
3700 if (in
->is_file() &&
3701 (dirty
& CEPH_CAP_FILE_WR
) &&
3702 inline_version
> pi
->inline_data
.version
) {
3703 pi
->inline_data
.version
= inline_version
;
3704 if (inline_version
!= CEPH_INLINE_NONE
&& m
->inline_data
.length() > 0)
3705 pi
->inline_data
.set_data(m
->inline_data
);
3707 pi
->inline_data
.free_data();
3709 if ((dirty
& CEPH_CAP_FILE_EXCL
) && atime
!= pi
->atime
) {
3710 dout(7) << " atime " << pi
->atime
<< " -> " << atime
3711 << " for " << *in
<< dendl
;
3714 if ((dirty
& CEPH_CAP_FILE_EXCL
) &&
3715 ceph_seq_cmp(pi
->time_warp_seq
, m
->get_time_warp_seq()) < 0) {
3716 dout(7) << " time_warp_seq " << pi
->time_warp_seq
<< " -> " << m
->get_time_warp_seq()
3717 << " for " << *in
<< dendl
;
3718 pi
->time_warp_seq
= m
->get_time_warp_seq();
3722 if (dirty
& CEPH_CAP_AUTH_EXCL
) {
3723 if (m
->head
.uid
!= pi
->uid
) {
3724 dout(7) << " uid " << pi
->uid
3725 << " -> " << m
->head
.uid
3726 << " for " << *in
<< dendl
;
3727 pi
->uid
= m
->head
.uid
;
3729 if (m
->head
.gid
!= pi
->gid
) {
3730 dout(7) << " gid " << pi
->gid
3731 << " -> " << m
->head
.gid
3732 << " for " << *in
<< dendl
;
3733 pi
->gid
= m
->head
.gid
;
3735 if (m
->head
.mode
!= pi
->mode
) {
3736 dout(7) << " mode " << oct
<< pi
->mode
3737 << " -> " << m
->head
.mode
<< dec
3738 << " for " << *in
<< dendl
;
3739 pi
->mode
= m
->head
.mode
;
3741 if ((features
& CEPH_FEATURE_FS_BTIME
) && m
->get_btime() != pi
->btime
) {
3742 dout(7) << " btime " << oct
<< pi
->btime
3743 << " -> " << m
->get_btime() << dec
3744 << " for " << *in
<< dendl
;
3745 pi
->btime
= m
->get_btime();
3751 * update inode based on cap flush|flushsnap|wanted.
3752 * adjust max_size, if needed.
3753 * if we update, return true; otherwise, false (no updated needed).
3755 bool Locker::_do_cap_update(CInode
*in
, Capability
*cap
,
3756 int dirty
, snapid_t follows
,
3757 const cref_t
<MClientCaps
> &m
, const ref_t
<MClientCaps
> &ack
,
3760 dout(10) << "_do_cap_update dirty " << ccap_string(dirty
)
3761 << " issued " << ccap_string(cap
? cap
->issued() : 0)
3762 << " wanted " << ccap_string(cap
? cap
->wanted() : 0)
3763 << " on " << *in
<< dendl
;
3764 ceph_assert(in
->is_auth());
3765 client_t client
= m
->get_source().num();
3766 const auto& latest
= in
->get_projected_inode();
3768 // increase or zero max_size?
3769 uint64_t size
= m
->get_size();
3770 bool change_max
= false;
3771 uint64_t old_max
= latest
->get_client_range(client
);
3772 uint64_t new_max
= old_max
;
3774 if (in
->is_file()) {
3775 bool forced_change_max
= false;
3776 dout(20) << "inode is file" << dendl
;
3777 if (cap
&& ((cap
->issued() | cap
->wanted()) & CEPH_CAP_ANY_FILE_WR
)) {
3778 dout(20) << "client has write caps; m->get_max_size="
3779 << m
->get_max_size() << "; old_max=" << old_max
<< dendl
;
3780 if (m
->get_max_size() > new_max
) {
3781 dout(10) << "client requests file_max " << m
->get_max_size()
3782 << " > max " << old_max
<< dendl
;
3784 forced_change_max
= true;
3785 new_max
= calc_new_max_size(latest
, m
->get_max_size());
3787 new_max
= calc_new_max_size(latest
, size
);
3789 if (new_max
> old_max
)
3801 if (in
->last
== CEPH_NOSNAP
&&
3803 !in
->filelock
.can_wrlock(client
) &&
3804 !in
->filelock
.can_force_wrlock(client
)) {
3805 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl
;
3806 if (in
->filelock
.is_stable()) {
3807 bool need_issue
= false;
3809 cap
->inc_suppress();
3810 if (in
->get_mds_caps_wanted().empty() &&
3811 (in
->get_loner() >= 0 || (in
->get_wanted_loner() >= 0 && in
->try_set_loner()))) {
3812 if (in
->filelock
.get_state() != LOCK_EXCL
)
3813 file_excl(&in
->filelock
, &need_issue
);
3815 simple_lock(&in
->filelock
, &need_issue
);
3819 cap
->dec_suppress();
3821 if (!in
->filelock
.can_wrlock(client
) &&
3822 !in
->filelock
.can_force_wrlock(client
)) {
3823 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
3824 forced_change_max
? new_max
: 0,
3827 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
3833 if (m
->flockbl
.length()) {
3835 auto bli
= m
->flockbl
.cbegin();
3836 decode(num_locks
, bli
);
3837 for ( int i
=0; i
< num_locks
; ++i
) {
3838 ceph_filelock decoded_lock
;
3839 decode(decoded_lock
, bli
);
3840 in
->get_fcntl_lock_state()->held_locks
.
3841 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3842 ++in
->get_fcntl_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3844 decode(num_locks
, bli
);
3845 for ( int i
=0; i
< num_locks
; ++i
) {
3846 ceph_filelock decoded_lock
;
3847 decode(decoded_lock
, bli
);
3848 in
->get_flock_lock_state()->held_locks
.
3849 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3850 ++in
->get_flock_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3854 if (!dirty
&& !change_max
)
3857 Session
*session
= mds
->get_session(m
);
3858 if (session
->check_access(in
, MAY_WRITE
,
3859 m
->caller_uid
, m
->caller_gid
, NULL
, 0, 0) < 0) {
3860 dout(10) << "check_access failed, dropping cap update on " << *in
<< dendl
;
3865 EUpdate
*le
= new EUpdate(mds
->mdlog
, "cap update");
3866 mds
->mdlog
->start_entry(le
);
3868 bool xattr
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3869 m
->xattrbl
.length() &&
3870 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3872 MutationRef
mut(new MutationImpl());
3873 mut
->ls
= mds
->mdlog
->get_current_segment();
3875 auto pi
= in
->project_inode(mut
, xattr
);
3876 pi
.inode
->version
= in
->pre_dirty();
3878 _update_cap_fields(in
, dirty
, m
, pi
.inode
.get());
3881 dout(7) << " max_size " << old_max
<< " -> " << new_max
3882 << " for " << *in
<< dendl
;
3884 auto &cr
= pi
.inode
->client_ranges
[client
];
3886 cr
.range
.last
= new_max
;
3887 cr
.follows
= in
->first
- 1;
3888 in
->mark_clientwriteable();
3890 cap
->mark_clientwriteable();
3892 pi
.inode
->client_ranges
.erase(client
);
3893 if (pi
.inode
->client_ranges
.empty())
3894 in
->clear_clientwriteable();
3896 cap
->clear_clientwriteable();
3900 if (change_max
|| (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)))
3901 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
3904 if (dirty
& CEPH_CAP_AUTH_EXCL
)
3905 wrlock_force(&in
->authlock
, mut
);
3909 dout(7) << " xattrs v" << pi
.inode
->xattr_version
<< " -> " << m
->head
.xattr_version
<< dendl
;
3910 pi
.inode
->xattr_version
= m
->head
.xattr_version
;
3911 auto p
= m
->xattrbl
.cbegin();
3912 decode_noshare(*pi
.xattrs
, p
);
3913 wrlock_force(&in
->xattrlock
, mut
);
3917 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3918 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3920 // "oldest flush tid" > 0 means client uses unique TID for each flush
3921 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3922 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3923 ack
->get_oldest_flush_tid());
3925 unsigned update_flags
= 0;
3927 update_flags
|= UPDATE_SHAREMAX
;
3929 update_flags
|= UPDATE_NEEDSISSUE
;
3930 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, update_flags
,
3932 if (need_flush
&& !*need_flush
&&
3933 ((change_max
&& new_max
) || // max INCREASE
3934 _need_flush_mdlog(in
, dirty
)))
3940 void Locker::handle_client_cap_release(const cref_t
<MClientCapRelease
> &m
)
3942 client_t client
= m
->get_source().num();
3943 dout(10) << "handle_client_cap_release " << *m
<< dendl
;
3945 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3946 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3950 if (mds
->logger
) mds
->logger
->inc(l_mdss_handle_client_cap_release
);
3952 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3953 // Pause RADOS operations until we see the required epoch
3954 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3957 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3958 // Record the barrier so that we will retransmit it to clients
3959 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3962 Session
*session
= mds
->get_session(m
);
3964 for (const auto &cap
: m
->caps
) {
3965 _do_cap_release(client
, inodeno_t((uint64_t)cap
.ino
) , cap
.cap_id
, cap
.migrate_seq
, cap
.seq
);
3969 session
->notify_cap_release(m
->caps
.size());
3973 class C_Locker_RetryCapRelease
: public LockerContext
{
3977 ceph_seq_t migrate_seq
;
3978 ceph_seq_t issue_seq
;
3980 C_Locker_RetryCapRelease(Locker
*l
, client_t c
, inodeno_t i
, uint64_t id
,
3981 ceph_seq_t mseq
, ceph_seq_t seq
) :
3982 LockerContext(l
), client(c
), ino(i
), cap_id(id
), migrate_seq(mseq
), issue_seq(seq
) {}
3983 void finish(int r
) override
{
3984 locker
->_do_cap_release(client
, ino
, cap_id
, migrate_seq
, issue_seq
);
3988 void Locker::_do_cap_release(client_t client
, inodeno_t ino
, uint64_t cap_id
,
3989 ceph_seq_t mseq
, ceph_seq_t seq
)
3991 CInode
*in
= mdcache
->get_inode(ino
);
3993 dout(7) << "_do_cap_release missing ino " << ino
<< dendl
;
3996 Capability
*cap
= in
->get_client_cap(client
);
3998 dout(7) << "_do_cap_release no cap for client" << client
<< " on "<< *in
<< dendl
;
4002 dout(7) << "_do_cap_release for client." << client
<< " on "<< *in
<< dendl
;
4003 if (cap
->get_cap_id() != cap_id
) {
4004 dout(7) << " capid " << cap_id
<< " != " << cap
->get_cap_id() << ", ignore" << dendl
;
4007 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
4008 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", ignore" << dendl
;
4011 if (should_defer_client_cap_frozen(in
)) {
4012 dout(7) << " freezing|frozen, deferring" << dendl
;
4013 in
->add_waiter(CInode::WAIT_UNFREEZE
,
4014 new C_Locker_RetryCapRelease(this, client
, ino
, cap_id
, mseq
, seq
));
4017 if (seq
!= cap
->get_last_issue()) {
4018 dout(7) << " issue_seq " << seq
<< " != " << cap
->get_last_issue() << dendl
;
4019 // clean out any old revoke history
4020 cap
->clean_revoke_from(seq
);
4021 eval_cap_gather(in
);
4024 remove_client_cap(in
, cap
);
4027 void Locker::remove_client_cap(CInode
*in
, Capability
*cap
, bool kill
)
4029 client_t client
= cap
->get_client();
4030 // clean out any pending snapflush state
4031 if (!in
->client_need_snapflush
.empty())
4032 _do_null_snapflush(in
, client
);
4034 while (!cap
->lock_caches
.empty()) {
4035 MDLockCache
* lock_cache
= cap
->lock_caches
.front();
4036 lock_cache
->client_cap
= nullptr;
4037 invalidate_lock_cache(lock_cache
);
4040 bool notable
= cap
->is_notable();
4041 in
->remove_client_cap(client
);
4045 if (in
->is_auth()) {
4046 // make sure we clear out the client byte range
4047 if (in
->get_projected_inode()->client_ranges
.count(client
) &&
4048 !(in
->get_inode()->nlink
== 0 && !in
->is_any_caps())) { // unless it's unlink + stray
4050 in
->state_set(CInode::STATE_NEEDSRECOVER
);
4052 check_inode_max_size(in
);
4055 request_inode_file_caps(in
);
4058 try_eval(in
, CEPH_CAP_LOCKS
);
4063 * Return true if any currently revoking caps exceed the
4064 * session_timeout threshold.
4066 bool Locker::any_late_revoking_caps(xlist
<Capability
*> const &revoking
,
4067 double timeout
) const
4069 xlist
<Capability
*>::const_iterator p
= revoking
.begin();
4071 // No revoking caps at the moment
4074 utime_t now
= ceph_clock_now();
4075 utime_t age
= now
- (*p
)->get_last_revoke_stamp();
4076 if (age
<= timeout
) {
4084 std::set
<client_t
> Locker::get_late_revoking_clients(double timeout
) const
4086 std::set
<client_t
> result
;
4088 if (any_late_revoking_caps(revoking_caps
, timeout
)) {
4089 // Slow path: execute in O(N_clients)
4090 for (auto &p
: revoking_caps_by_client
) {
4091 if (any_late_revoking_caps(p
.second
, timeout
)) {
4092 result
.insert(p
.first
);
4096 // Fast path: no misbehaving clients, execute in O(1)
4101 // Hard-code instead of surfacing a config settings because this is
4102 // really a hack that should go away at some point when we have better
4103 // inspection tools for getting at detailed cap state (#7316)
4104 #define MAX_WARN_CAPS 100
4106 void Locker::caps_tick()
4108 utime_t now
= ceph_clock_now();
4110 if (!need_snapflush_inodes
.empty()) {
4111 // snap inodes that needs flush are auth pinned, they affect
4112 // subtree/difrarg freeze.
4113 utime_t cutoff
= now
;
4114 cutoff
-= g_conf()->mds_freeze_tree_timeout
/ 3;
4116 CInode
*last
= need_snapflush_inodes
.back();
4117 while (!need_snapflush_inodes
.empty()) {
4118 CInode
*in
= need_snapflush_inodes
.front();
4119 if (in
->last_dirstat_prop
>= cutoff
)
4121 in
->item_caps
.remove_myself();
4122 snapflush_nudge(in
);
4128 dout(20) << __func__
<< " " << revoking_caps
.size() << " revoking caps" << dendl
;
4130 now
= ceph_clock_now();
4132 for (xlist
<Capability
*>::iterator p
= revoking_caps
.begin(); !p
.end(); ++p
) {
4133 Capability
*cap
= *p
;
4135 utime_t age
= now
- cap
->get_last_revoke_stamp();
4136 dout(20) << __func__
<< " age = " << age
<< " client." << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
4137 if (age
<= mds
->mdsmap
->get_session_timeout()) {
4138 dout(20) << __func__
<< " age below timeout " << mds
->mdsmap
->get_session_timeout() << dendl
;
4142 if (n
> MAX_WARN_CAPS
) {
4143 dout(1) << __func__
<< " more than " << MAX_WARN_CAPS
<< " caps are late"
4144 << "revoking, ignoring subsequent caps" << dendl
;
4148 // exponential backoff of warning intervals
4149 if (age
> mds
->mdsmap
->get_session_timeout() * (1 << cap
->get_num_revoke_warnings())) {
4150 cap
->inc_num_revoke_warnings();
4151 CachedStackStringStream css
;
4152 *css
<< "client." << cap
->get_client() << " isn't responding to mclientcaps(revoke), ino "
4153 << cap
->get_inode()->ino() << " pending " << ccap_string(cap
->pending())
4154 << " issued " << ccap_string(cap
->issued()) << ", sent " << age
<< " seconds ago";
4155 mds
->clog
->warn() << css
->strv();
4156 dout(20) << __func__
<< " " << css
->strv() << dendl
;
4158 dout(20) << __func__
<< " silencing log message (backoff) for " << "client." << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
4164 void Locker::handle_client_lease(const cref_t
<MClientLease
> &m
)
4166 dout(10) << "handle_client_lease " << *m
<< dendl
;
4168 ceph_assert(m
->get_source().is_client());
4169 client_t client
= m
->get_source().num();
4171 CInode
*in
= mdcache
->get_inode(m
->get_ino(), m
->get_last());
4173 dout(7) << "handle_client_lease don't have ino " << m
->get_ino() << "." << m
->get_last() << dendl
;
4178 frag_t fg
= in
->pick_dirfrag(m
->dname
);
4179 CDir
*dir
= in
->get_dirfrag(fg
);
4181 dn
= dir
->lookup(m
->dname
);
4183 dout(7) << "handle_client_lease don't have dn " << m
->get_ino() << " " << m
->dname
<< dendl
;
4186 dout(10) << " on " << *dn
<< dendl
;
4189 ClientLease
*l
= dn
->get_client_lease(client
);
4191 dout(7) << "handle_client_lease didn't have lease for client." << client
<< " of " << *dn
<< dendl
;
4195 switch (m
->get_action()) {
4196 case CEPH_MDS_LEASE_REVOKE_ACK
:
4197 case CEPH_MDS_LEASE_RELEASE
:
4198 if (l
->seq
!= m
->get_seq()) {
4199 dout(7) << "handle_client_lease release - seq " << l
->seq
<< " != provided " << m
->get_seq() << dendl
;
4201 dout(7) << "handle_client_lease client." << client
4202 << " on " << *dn
<< dendl
;
4203 dn
->remove_client_lease(l
, this);
4207 case CEPH_MDS_LEASE_RENEW
:
4209 dout(7) << "handle_client_lease client." << client
<< " renew on " << *dn
4210 << (!dn
->lock
.can_lease(client
)?", revoking lease":"") << dendl
;
4211 if (dn
->lock
.can_lease(client
)) {
4212 auto reply
= make_message
<MClientLease
>(*m
);
4213 int pool
= 1; // fixme.. do something smart!
4214 reply
->h
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
4215 reply
->h
.seq
= ++l
->seq
;
4216 reply
->clear_payload();
4218 utime_t now
= ceph_clock_now();
4219 now
+= mdcache
->client_lease_durations
[pool
];
4220 mdcache
->touch_client_lease(l
, pool
, now
);
4222 mds
->send_message_client_counted(reply
, m
->get_connection());
4228 ceph_abort(); // implement me
4234 void Locker::issue_client_lease(CDentry
*dn
, MDRequestRef
&mdr
, int mask
,
4235 utime_t now
, bufferlist
&bl
)
4237 client_t client
= mdr
->get_client();
4238 Session
*session
= mdr
->session
;
4240 CInode
*diri
= dn
->get_dir()->get_inode();
4241 if (mdr
->snapid
== CEPH_NOSNAP
&&
4242 dn
->lock
.can_lease(client
) &&
4243 !diri
->is_stray() && // do not issue dn leases in stray dir!
4244 !diri
->filelock
.can_lease(client
) &&
4245 !(diri
->get_client_cap_pending(client
) & (CEPH_CAP_FILE_SHARED
| CEPH_CAP_FILE_EXCL
))) {
4246 // issue a dentry lease
4247 ClientLease
*l
= dn
->add_client_lease(client
, session
);
4248 session
->touch_lease(l
);
4250 int pool
= 1; // fixme.. do something smart!
4251 now
+= mdcache
->client_lease_durations
[pool
];
4252 mdcache
->touch_client_lease(l
, pool
, now
);
4255 lstat
.mask
= CEPH_LEASE_VALID
| mask
;
4256 lstat
.duration_ms
= (uint32_t)(1000 * mdcache
->client_lease_durations
[pool
]);
4257 lstat
.seq
= ++l
->seq
;
4258 lstat
.alternate_name
= std::string(dn
->alternate_name
);
4259 encode_lease(bl
, session
->info
, lstat
);
4260 dout(20) << "issue_client_lease seq " << lstat
.seq
<< " dur " << lstat
.duration_ms
<< "ms "
4261 << " on " << *dn
<< dendl
;
4266 lstat
.alternate_name
= std::string(dn
->alternate_name
);
4267 encode_lease(bl
, session
->info
, lstat
);
4268 dout(20) << "issue_client_lease no/null lease on " << *dn
<< dendl
;
4273 void Locker::revoke_client_leases(SimpleLock
*lock
)
4276 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
4277 for (map
<client_t
, ClientLease
*>::iterator p
= dn
->client_lease_map
.begin();
4278 p
!= dn
->client_lease_map
.end();
4280 ClientLease
*l
= p
->second
;
4283 ceph_assert(lock
->get_type() == CEPH_LOCK_DN
);
4285 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
4286 int mask
= 1 | CEPH_LOCK_DN
; // old and new bits
4288 // i should also revoke the dir ICONTENT lease, if they have it!
4289 CInode
*diri
= dn
->get_dir()->get_inode();
4290 auto lease
= make_message
<MClientLease
>(CEPH_MDS_LEASE_REVOKE
, l
->seq
, mask
, diri
->ino(), diri
->first
, CEPH_NOSNAP
, dn
->get_name());
4291 mds
->send_message_client_counted(lease
, l
->client
);
4295 void Locker::encode_lease(bufferlist
& bl
, const session_info_t
& info
,
4296 const LeaseStat
& ls
)
4298 if (info
.has_feature(CEPHFS_FEATURE_REPLY_ENCODING
)) {
4299 ENCODE_START(2, 1, bl
);
4300 encode(ls
.mask
, bl
);
4301 encode(ls
.duration_ms
, bl
);
4303 encode(ls
.alternate_name
, bl
);
4307 encode(ls
.mask
, bl
);
4308 encode(ls
.duration_ms
, bl
);
4313 // locks ----------------------------------------------------------------
4315 SimpleLock
*Locker::get_lock(int lock_type
, const MDSCacheObjectInfo
&info
)
4317 switch (lock_type
) {
4320 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
4321 CInode
*diri
= mdcache
->get_inode(info
.dirfrag
.ino
);
4326 fg
= diri
->pick_dirfrag(info
.dname
);
4327 dir
= diri
->get_dirfrag(fg
);
4329 dn
= dir
->lookup(info
.dname
, info
.snapid
);
4332 dout(7) << "get_lock don't have dn " << info
.dirfrag
.ino
<< " " << info
.dname
<< dendl
;
4338 case CEPH_LOCK_IAUTH
:
4339 case CEPH_LOCK_ILINK
:
4340 case CEPH_LOCK_IDFT
:
4341 case CEPH_LOCK_IFILE
:
4342 case CEPH_LOCK_INEST
:
4343 case CEPH_LOCK_IXATTR
:
4344 case CEPH_LOCK_ISNAP
:
4345 case CEPH_LOCK_IFLOCK
:
4346 case CEPH_LOCK_IPOLICY
:
4348 CInode
*in
= mdcache
->get_inode(info
.ino
, info
.snapid
);
4350 dout(7) << "get_lock don't have ino " << info
.ino
<< dendl
;
4353 switch (lock_type
) {
4354 case CEPH_LOCK_IAUTH
: return &in
->authlock
;
4355 case CEPH_LOCK_ILINK
: return &in
->linklock
;
4356 case CEPH_LOCK_IDFT
: return &in
->dirfragtreelock
;
4357 case CEPH_LOCK_IFILE
: return &in
->filelock
;
4358 case CEPH_LOCK_INEST
: return &in
->nestlock
;
4359 case CEPH_LOCK_IXATTR
: return &in
->xattrlock
;
4360 case CEPH_LOCK_ISNAP
: return &in
->snaplock
;
4361 case CEPH_LOCK_IFLOCK
: return &in
->flocklock
;
4362 case CEPH_LOCK_IPOLICY
: return &in
->policylock
;
4367 dout(7) << "get_lock don't know lock_type " << lock_type
<< dendl
;
4375 void Locker::handle_lock(const cref_t
<MLock
> &m
)
4377 // nobody should be talking to us during recovery.
4378 ceph_assert(mds
->is_rejoin() || mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
4380 SimpleLock
*lock
= get_lock(m
->get_lock_type(), m
->get_object_info());
4382 dout(10) << "don't have object " << m
->get_object_info() << ", must have trimmed, dropping" << dendl
;
4386 switch (lock
->get_type()) {
4388 case CEPH_LOCK_IAUTH
:
4389 case CEPH_LOCK_ILINK
:
4390 case CEPH_LOCK_ISNAP
:
4391 case CEPH_LOCK_IXATTR
:
4392 case CEPH_LOCK_IFLOCK
:
4393 case CEPH_LOCK_IPOLICY
:
4394 handle_simple_lock(lock
, m
);
4397 case CEPH_LOCK_IDFT
:
4398 case CEPH_LOCK_INEST
:
4399 //handle_scatter_lock((ScatterLock*)lock, m);
4402 case CEPH_LOCK_IFILE
:
4403 handle_file_lock(static_cast<ScatterLock
*>(lock
), m
);
4407 dout(7) << "handle_lock got otype " << m
->get_lock_type() << dendl
;
4417 // ==========================================================================
4420 /** This function may take a reference to m if it needs one, but does
4421 * not put references. */
4422 void Locker::handle_reqrdlock(SimpleLock
*lock
, const cref_t
<MLock
> &m
)
4424 MDSCacheObject
*parent
= lock
->get_parent();
4425 if (parent
->is_auth() &&
4426 lock
->get_state() != LOCK_SYNC
&&
4427 !parent
->is_frozen()) {
4428 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
4429 << " on " << *parent
<< dendl
;
4430 ceph_assert(parent
->is_auth()); // replica auth pinned if they're doing this!
4431 if (lock
->is_stable()) {
4434 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl
;
4435 lock
->add_waiter(SimpleLock::WAIT_STABLE
| MDSCacheObject::WAIT_UNFREEZE
,
4436 new C_MDS_RetryMessage(mds
, m
));
4439 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
4440 << " on " << *parent
<< dendl
;
4441 // replica should retry
4445 void Locker::handle_simple_lock(SimpleLock
*lock
, const cref_t
<MLock
> &m
)
4447 int from
= m
->get_asker();
4449 dout(10) << "handle_simple_lock " << *m
4450 << " on " << *lock
<< " " << *lock
->get_parent() << dendl
;
4452 if (mds
->is_rejoin()) {
4453 if (lock
->get_parent()->is_rejoining()) {
4454 dout(7) << "handle_simple_lock still rejoining " << *lock
->get_parent()
4455 << ", dropping " << *m
<< dendl
;
4460 switch (m
->get_action()) {
4463 ceph_assert(lock
->get_state() == LOCK_LOCK
);
4464 lock
->decode_locked_state(m
->get_data());
4465 lock
->set_state(LOCK_SYNC
);
4466 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4470 ceph_assert(lock
->get_state() == LOCK_SYNC
);
4471 lock
->set_state(LOCK_SYNC_LOCK
);
4472 if (lock
->is_leased())
4473 revoke_client_leases(lock
);
4474 eval_gather(lock
, true);
4475 if (lock
->is_unstable_and_locked()) {
4476 if (lock
->is_cached())
4477 invalidate_lock_caches(lock
);
4478 mds
->mdlog
->flush();
4484 case LOCK_AC_LOCKACK
:
4485 ceph_assert(lock
->get_state() == LOCK_SYNC_LOCK
||
4486 lock
->get_state() == LOCK_SYNC_EXCL
);
4487 ceph_assert(lock
->is_gathering(from
));
4488 lock
->remove_gather(from
);
4490 if (lock
->is_gathering()) {
4491 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
4492 << ", still gathering " << lock
->get_gather_set() << dendl
;
4494 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
4495 << ", last one" << dendl
;
4500 case LOCK_AC_REQRDLOCK
:
4501 handle_reqrdlock(lock
, m
);
4507 /* unused, currently.
4509 class C_Locker_SimpleEval : public Context {
4513 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
4514 void finish(int r) {
4515 locker->try_simple_eval(lock);
4519 void Locker::try_simple_eval(SimpleLock *lock)
4521 // unstable and ambiguous auth?
4522 if (!lock->is_stable() &&
4523 lock->get_parent()->is_ambiguous_auth()) {
4524 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
4525 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4526 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
4530 if (!lock->get_parent()->is_auth()) {
4531 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
4535 if (!lock->get_parent()->can_auth_pin()) {
4536 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
4537 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4538 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
4542 if (lock->is_stable())
4548 void Locker::simple_eval(SimpleLock
*lock
, bool *need_issue
)
4550 dout(10) << "simple_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4552 ceph_assert(lock
->get_parent()->is_auth());
4553 ceph_assert(lock
->is_stable());
4555 if (lock
->get_parent()->is_freezing_or_frozen()) {
4556 // dentry/snap lock in unreadable state can block path traverse
4557 if ((lock
->get_type() != CEPH_LOCK_DN
&&
4558 lock
->get_type() != CEPH_LOCK_ISNAP
&&
4559 lock
->get_type() != CEPH_LOCK_IPOLICY
) ||
4560 lock
->get_state() == LOCK_SYNC
||
4561 lock
->get_parent()->is_frozen())
4565 if (mdcache
->is_readonly()) {
4566 if (lock
->get_state() != LOCK_SYNC
) {
4567 dout(10) << "simple_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4568 simple_sync(lock
, need_issue
);
4575 if (lock
->get_cap_shift()) {
4576 in
= static_cast<CInode
*>(lock
->get_parent());
4577 in
->get_caps_wanted(&wanted
, NULL
, lock
->get_cap_shift());
4581 if (lock
->get_state() != LOCK_EXCL
&&
4582 in
&& in
->get_target_loner() >= 0 &&
4583 (wanted
& CEPH_CAP_GEXCL
)) {
4584 dout(7) << "simple_eval stable, going to excl " << *lock
4585 << " on " << *lock
->get_parent() << dendl
;
4586 simple_excl(lock
, need_issue
);
4590 else if (lock
->get_state() != LOCK_SYNC
&&
4591 !lock
->is_wrlocked() &&
4592 ((!(wanted
& CEPH_CAP_GEXCL
) && !lock
->is_waiter_for(SimpleLock::WAIT_WR
)) ||
4593 (lock
->get_state() == LOCK_EXCL
&& in
&& in
->get_target_loner() < 0))) {
4594 dout(7) << "simple_eval stable, syncing " << *lock
4595 << " on " << *lock
->get_parent() << dendl
;
4596 simple_sync(lock
, need_issue
);
4603 bool Locker::simple_sync(SimpleLock
*lock
, bool *need_issue
)
4605 dout(7) << "simple_sync on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4606 ceph_assert(lock
->get_parent()->is_auth());
4607 ceph_assert(lock
->is_stable());
4610 if (lock
->get_cap_shift())
4611 in
= static_cast<CInode
*>(lock
->get_parent());
4613 int old_state
= lock
->get_state();
4615 if (old_state
!= LOCK_TSYN
) {
4617 switch (lock
->get_state()) {
4618 case LOCK_MIX
: lock
->set_state(LOCK_MIX_SYNC
); break;
4619 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_SYNC
); break;
4620 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_SYNC
); break;
4621 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_SYNC
); break;
4622 default: ceph_abort();
4626 if (lock
->is_wrlocked()) {
4628 if (lock
->is_cached())
4629 invalidate_lock_caches(lock
);
4631 // After a client request is early replied the mdlog won't be flushed
4632 // immediately, but before safe replied the request will hold the write
4633 // locks. So if the client sends another request to a different MDS
4634 // daemon, which then needs to request read lock from current MDS daemon,
4635 // then that daemon maybe stuck at most for 5 seconds. Which will lead
4636 // the client stuck at most 5 seconds.
4638 // Let's try to flush the mdlog when the write lock is held, which will
4639 // release the write locks after mdlog is successfully flushed.
4640 mds
->mdlog
->flush();
4643 if (lock
->get_parent()->is_replicated() && old_state
== LOCK_MIX
) {
4644 send_lock_message(lock
, LOCK_AC_SYNC
);
4645 lock
->init_gather();
4649 if (in
&& in
->is_head()) {
4650 if (in
->issued_caps_need_gather(lock
)) {
4659 bool need_recover
= false;
4660 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4662 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4663 mds
->mdcache
->queue_file_recover(in
);
4664 need_recover
= true;
4669 if (!gather
&& lock
->is_dirty()) {
4670 lock
->get_parent()->auth_pin(lock
);
4671 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4672 mds
->mdlog
->flush();
4677 lock
->get_parent()->auth_pin(lock
);
4679 mds
->mdcache
->do_file_recover();
4684 if (lock
->get_parent()->is_replicated()) { // FIXME
4686 lock
->encode_locked_state(data
);
4687 send_lock_message(lock
, LOCK_AC_SYNC
, data
);
4689 lock
->set_state(LOCK_SYNC
);
4690 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4691 if (in
&& in
->is_head()) {
4700 void Locker::simple_excl(SimpleLock
*lock
, bool *need_issue
)
4702 dout(7) << "simple_excl on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4703 ceph_assert(lock
->get_parent()->is_auth());
4704 ceph_assert(lock
->is_stable());
4707 if (lock
->get_cap_shift())
4708 in
= static_cast<CInode
*>(lock
->get_parent());
4710 switch (lock
->get_state()) {
4711 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4712 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4713 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4714 default: ceph_abort();
4718 if (lock
->is_rdlocked())
4720 if (lock
->is_wrlocked())
4722 if (gather
&& lock
->is_cached())
4723 invalidate_lock_caches(lock
);
4725 if (lock
->get_parent()->is_replicated() &&
4726 lock
->get_state() != LOCK_LOCK_EXCL
&&
4727 lock
->get_state() != LOCK_XSYN_EXCL
) {
4728 send_lock_message(lock
, LOCK_AC_LOCK
);
4729 lock
->init_gather();
4733 if (in
&& in
->is_head()) {
4734 if (in
->issued_caps_need_gather(lock
)) {
4744 lock
->get_parent()->auth_pin(lock
);
4746 lock
->set_state(LOCK_EXCL
);
4747 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
4757 void Locker::simple_lock(SimpleLock
*lock
, bool *need_issue
)
4759 dout(7) << "simple_lock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4760 ceph_assert(lock
->get_parent()->is_auth());
4761 ceph_assert(lock
->is_stable());
4762 ceph_assert(lock
->get_state() != LOCK_LOCK
);
4765 if (lock
->get_cap_shift())
4766 in
= static_cast<CInode
*>(lock
->get_parent());
4768 int old_state
= lock
->get_state();
4770 switch (lock
->get_state()) {
4771 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
4772 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_LOCK
); break;
4773 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_LOCK
); break;
4774 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
);
4775 (static_cast<ScatterLock
*>(lock
))->clear_unscatter_wanted();
4777 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_LOCK
); break;
4778 default: ceph_abort();
4782 if (lock
->is_leased()) {
4784 revoke_client_leases(lock
);
4786 if (lock
->is_rdlocked()) {
4787 if (lock
->is_cached())
4788 invalidate_lock_caches(lock
);
4791 if (in
&& in
->is_head()) {
4792 if (in
->issued_caps_need_gather(lock
)) {
4801 bool need_recover
= false;
4802 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4804 if(in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4805 mds
->mdcache
->queue_file_recover(in
);
4806 need_recover
= true;
4811 if (lock
->get_parent()->is_replicated() &&
4812 lock
->get_state() == LOCK_MIX_LOCK
&&
4814 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl
;
4816 // move to second stage of gather now, so we don't send the lock action later.
4817 if (lock
->get_state() == LOCK_MIX_LOCK
)
4818 lock
->set_state(LOCK_MIX_LOCK2
);
4820 if (lock
->get_parent()->is_replicated() &&
4821 lock
->get_sm()->states
[old_state
].replica_state
!= LOCK_LOCK
) { // replica may already be LOCK
4823 send_lock_message(lock
, LOCK_AC_LOCK
);
4824 lock
->init_gather();
4828 if (!gather
&& lock
->is_dirty()) {
4829 lock
->get_parent()->auth_pin(lock
);
4830 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4831 mds
->mdlog
->flush();
4836 lock
->get_parent()->auth_pin(lock
);
4838 mds
->mdcache
->do_file_recover();
4840 lock
->set_state(LOCK_LOCK
);
4841 lock
->finish_waiters(ScatterLock::WAIT_XLOCK
|ScatterLock::WAIT_WR
|ScatterLock::WAIT_STABLE
);
4846 void Locker::simple_xlock(SimpleLock
*lock
)
4848 dout(7) << "simple_xlock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4849 ceph_assert(lock
->get_parent()->is_auth());
4850 //assert(lock->is_stable());
4851 ceph_assert(lock
->get_state() != LOCK_XLOCK
);
4854 if (lock
->get_cap_shift())
4855 in
= static_cast<CInode
*>(lock
->get_parent());
4857 if (lock
->is_stable())
4858 lock
->get_parent()->auth_pin(lock
);
4860 switch (lock
->get_state()) {
4862 case LOCK_XLOCKDONE
: lock
->set_state(LOCK_LOCK_XLOCK
); break;
4863 default: ceph_abort();
4867 if (lock
->is_rdlocked())
4869 if (lock
->is_wrlocked())
4871 if (gather
&& lock
->is_cached())
4872 invalidate_lock_caches(lock
);
4874 if (in
&& in
->is_head()) {
4875 if (in
->issued_caps_need_gather(lock
)) {
4882 lock
->set_state(LOCK_PREXLOCK
);
4883 //assert("shouldn't be called if we are already xlockable" == 0);
4891 // ==========================================================================
4896 Some notes on scatterlocks.
4898 - The scatter/gather is driven by the inode lock. The scatter always
4899 brings in the latest metadata from the fragments.
4901 - When in a scattered/MIX state, fragments are only allowed to
4902 update/be written to if the accounted stat matches the inode's
4905 - That means, on gather, we _only_ assimilate diffs for frag metadata
4906 that match the current version, because those are the only ones
4907 written during this scatter/gather cycle. (Others didn't permit
4908 it.) We increment the version and journal this to disk.
4910 - When possible, we also simultaneously update our local frag
4911 accounted stats to match.
4913 - On scatter, the new inode info is broadcast to frags, both local
4914 and remote. If possible (auth and !frozen), the dirfrag auth
4915 should update the accounted state (if it isn't already up to date).
4916 Note that this may occur on both the local inode auth node and
4917 inode replicas, so there are two potential paths. If it is NOT
4918 possible, they need to mark_stale to prevent any possible writes.
4920 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4921 only). Both are opportunities to update the frag accounted stats,
4922 even though only the MIX case is affected by a stale dirfrag.
4924 - Because many scatter/gather cycles can potentially go by without a
4925 frag being able to update its accounted stats (due to being frozen
4926 by exports/refragments in progress), the frag may have (even very)
4927 old stat versions. That's fine. If when we do want to update it,
4928 we can update accounted_* and the version first.
4932 class C_Locker_ScatterWB
: public LockerLogContext
{
4936 C_Locker_ScatterWB(Locker
*l
, ScatterLock
*sl
, MutationRef
& m
) :
4937 LockerLogContext(l
), lock(sl
), mut(m
) {}
4938 void finish(int r
) override
{
4939 locker
->scatter_writebehind_finish(lock
, mut
);
4943 void Locker::scatter_writebehind(ScatterLock
*lock
)
4945 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4946 dout(10) << "scatter_writebehind " << in
->get_inode()->mtime
<< " on " << *lock
<< " on " << *in
<< dendl
;
4949 MutationRef
mut(new MutationImpl());
4950 mut
->ls
= mds
->mdlog
->get_current_segment();
4952 // forcefully take a wrlock
4953 lock
->get_wrlock(true);
4954 mut
->emplace_lock(lock
, MutationImpl::LockOp::WRLOCK
);
4956 in
->pre_cow_old_inode(); // avoid cow mayhem
4958 auto pi
= in
->project_inode(mut
);
4959 pi
.inode
->version
= in
->pre_dirty();
4961 in
->finish_scatter_gather_update(lock
->get_type(), mut
);
4962 lock
->start_flush();
4964 EUpdate
*le
= new EUpdate(mds
->mdlog
, "scatter_writebehind");
4965 mds
->mdlog
->start_entry(le
);
4967 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
);
4968 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
);
4970 in
->finish_scatter_gather_update_accounted(lock
->get_type(), &le
->metablob
);
4972 mds
->mdlog
->submit_entry(le
, new C_Locker_ScatterWB(this, lock
, mut
));
4975 void Locker::scatter_writebehind_finish(ScatterLock
*lock
, MutationRef
& mut
)
4977 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4978 dout(10) << "scatter_writebehind_finish on " << *lock
<< " on " << *in
<< dendl
;
4982 lock
->finish_flush();
4984 // if replicas may have flushed in a mix->lock state, send another
4985 // message so they can finish_flush().
4986 if (in
->is_replicated()) {
4987 switch (lock
->get_state()) {
4989 case LOCK_MIX_LOCK2
:
4992 send_lock_message(lock
, LOCK_AC_LOCKFLUSHED
);
4996 drop_locks(mut
.get());
4999 if (lock
->is_stable())
5000 lock
->finish_waiters(ScatterLock::WAIT_STABLE
);
5002 //scatter_eval_gather(lock);
5005 void Locker::scatter_eval(ScatterLock
*lock
, bool *need_issue
)
5007 dout(10) << "scatter_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5009 ceph_assert(lock
->get_parent()->is_auth());
5010 ceph_assert(lock
->is_stable());
5012 if (lock
->get_parent()->is_freezing_or_frozen()) {
5013 dout(20) << " freezing|frozen" << dendl
;
5017 if (mdcache
->is_readonly()) {
5018 if (lock
->get_state() != LOCK_SYNC
) {
5019 dout(10) << "scatter_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5020 simple_sync(lock
, need_issue
);
5025 if (!lock
->is_rdlocked() &&
5026 lock
->get_state() != LOCK_MIX
&&
5027 lock
->get_scatter_wanted()) {
5028 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
5029 << " on " << *lock
->get_parent() << dendl
;
5030 scatter_mix(lock
, need_issue
);
5034 if (lock
->get_type() == CEPH_LOCK_INEST
) {
5035 // in general, we want to keep INEST writable at all times.
5036 if (!lock
->is_rdlocked()) {
5037 if (lock
->get_parent()->is_replicated()) {
5038 if (lock
->get_state() != LOCK_MIX
)
5039 scatter_mix(lock
, need_issue
);
5041 if (lock
->get_state() != LOCK_LOCK
)
5042 simple_lock(lock
, need_issue
);
5048 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5049 if (!in
->has_subtree_or_exporting_dirfrag() || in
->is_base()) {
5050 // i _should_ be sync.
5051 if (!lock
->is_wrlocked() &&
5052 lock
->get_state() != LOCK_SYNC
) {
5053 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl
;
5054 simple_sync(lock
, need_issue
);
5061 * mark a scatterlock to indicate that the dir fnode has some dirty data
5063 void Locker::mark_updated_scatterlock(ScatterLock
*lock
)
5066 if (lock
->get_updated_item()->is_on_list()) {
5067 dout(10) << "mark_updated_scatterlock " << *lock
5068 << " - already on list since " << lock
->get_update_stamp() << dendl
;
5070 updated_scatterlocks
.push_back(lock
->get_updated_item());
5071 utime_t now
= ceph_clock_now();
5072 lock
->set_update_stamp(now
);
5073 dout(10) << "mark_updated_scatterlock " << *lock
5074 << " - added at " << now
<< dendl
;
5079 * this is called by scatter_tick and LogSegment::try_to_trim() when
5080 * trying to flush dirty scattered data (i.e. updated fnode) back to
5083 * we need to lock|scatter in order to push fnode changes into the
5086 void Locker::scatter_nudge(ScatterLock
*lock
, MDSContext
*c
, bool forcelockchange
)
5088 CInode
*p
= static_cast<CInode
*>(lock
->get_parent());
5090 if (p
->is_frozen() || p
->is_freezing()) {
5091 dout(10) << "scatter_nudge waiting for unfreeze on " << *p
<< dendl
;
5093 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, c
);
5094 else if (lock
->is_dirty())
5095 // just requeue. not ideal.. starvation prone..
5096 updated_scatterlocks
.push_back(lock
->get_updated_item());
5100 if (p
->is_ambiguous_auth()) {
5101 dout(10) << "scatter_nudge waiting for single auth on " << *p
<< dendl
;
5103 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, c
);
5104 else if (lock
->is_dirty())
5105 // just requeue. not ideal.. starvation prone..
5106 updated_scatterlocks
.push_back(lock
->get_updated_item());
5113 if (lock
->is_stable()) {
5114 // can we do it now?
5115 // (only if we're not replicated.. if we are, we really do need
5116 // to nudge the lock state!)
5118 actually, even if we're not replicated, we can't stay in MIX, because another mds
5119 could discover and replicate us at any time. if that happens while we're flushing,
5120 they end up in MIX but their inode has the old scatterstat version.
5122 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
5123 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
5124 scatter_writebehind(lock);
5126 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
5131 if (mdcache
->is_readonly()) {
5132 if (lock
->get_state() != LOCK_SYNC
) {
5133 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock
<< " on " << *p
<< dendl
;
5134 simple_sync(static_cast<ScatterLock
*>(lock
));
5139 // adjust lock state
5140 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock
<< " on " << *p
<< dendl
;
5141 switch (lock
->get_type()) {
5142 case CEPH_LOCK_IFILE
:
5143 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
5144 scatter_mix(static_cast<ScatterLock
*>(lock
));
5145 else if (lock
->get_state() != LOCK_LOCK
)
5146 simple_lock(static_cast<ScatterLock
*>(lock
));
5148 simple_sync(static_cast<ScatterLock
*>(lock
));
5151 case CEPH_LOCK_IDFT
:
5152 case CEPH_LOCK_INEST
:
5153 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
5155 else if (lock
->get_state() != LOCK_LOCK
)
5164 if (lock
->is_stable() && count
== 2) {
5165 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl
;
5166 // this should only realy happen when called via
5167 // handle_file_lock due to AC_NUDGE, because the rest of the
5168 // time we are replicated or have dirty data and won't get
5169 // called. bailing here avoids an infinite loop.
5174 dout(10) << "scatter_nudge auth, waiting for stable " << *lock
<< " on " << *p
<< dendl
;
5176 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
5181 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
5182 << *lock
<< " on " << *p
<< dendl
;
5183 // request unscatter?
5184 mds_rank_t auth
= lock
->get_parent()->authority().first
;
5185 if (!mds
->is_cluster_degraded() || mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
5186 mds
->send_message_mds(make_message
<MLock
>(lock
, LOCK_AC_NUDGE
, mds
->get_nodeid()), auth
);
5191 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
5193 // also, requeue, in case we had wrong auth or something
5194 if (lock
->is_dirty())
5195 updated_scatterlocks
.push_back(lock
->get_updated_item());
5199 void Locker::scatter_tick()
5201 dout(10) << "scatter_tick" << dendl
;
5204 utime_t now
= ceph_clock_now();
5205 int n
= updated_scatterlocks
.size();
5206 while (!updated_scatterlocks
.empty()) {
5207 ScatterLock
*lock
= updated_scatterlocks
.front();
5209 if (n
-- == 0) break; // scatter_nudge() may requeue; avoid looping
5211 if (!lock
->is_dirty()) {
5212 updated_scatterlocks
.pop_front();
5213 dout(10) << " removing from updated_scatterlocks "
5214 << *lock
<< " " << *lock
->get_parent() << dendl
;
5217 if (now
- lock
->get_update_stamp() < g_conf()->mds_scatter_nudge_interval
)
5219 updated_scatterlocks
.pop_front();
5220 scatter_nudge(lock
, 0);
5222 mds
->mdlog
->flush();
5226 void Locker::scatter_tempsync(ScatterLock
*lock
, bool *need_issue
)
5228 dout(10) << "scatter_tempsync " << *lock
5229 << " on " << *lock
->get_parent() << dendl
;
5230 ceph_assert(lock
->get_parent()->is_auth());
5231 ceph_assert(lock
->is_stable());
5233 ceph_abort_msg("not fully implemented, at least not for filelock");
5235 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5237 switch (lock
->get_state()) {
5238 case LOCK_SYNC
: ceph_abort(); // this shouldn't happen
5239 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_TSYN
); break;
5240 case LOCK_MIX
: lock
->set_state(LOCK_MIX_TSYN
); break;
5241 default: ceph_abort();
5245 if (lock
->is_wrlocked()) {
5246 if (lock
->is_cached())
5247 invalidate_lock_caches(lock
);
5251 if (lock
->get_cap_shift() &&
5253 in
->issued_caps_need_gather(lock
)) {
5261 if (lock
->get_state() == LOCK_MIX_TSYN
&&
5262 in
->is_replicated()) {
5263 lock
->init_gather();
5264 send_lock_message(lock
, LOCK_AC_LOCK
);
5272 lock
->set_state(LOCK_TSYN
);
5273 lock
->finish_waiters(ScatterLock::WAIT_RD
|ScatterLock::WAIT_STABLE
);
5274 if (lock
->get_cap_shift()) {
5285 // ==========================================================================
5288 void Locker::local_wrlock_grab(LocalLockC
*lock
, MutationRef
& mut
)
5290 dout(7) << "local_wrlock_grab on " << *lock
5291 << " on " << *lock
->get_parent() << dendl
;
5293 ceph_assert(lock
->get_parent()->is_auth());
5294 ceph_assert(lock
->can_wrlock());
5295 lock
->get_wrlock(mut
->get_client());
5297 auto it
= mut
->emplace_lock(lock
, MutationImpl::LockOp::WRLOCK
);
5298 ceph_assert(it
->is_wrlock());
5301 bool Locker::local_wrlock_start(LocalLockC
*lock
, MDRequestRef
& mut
)
5303 dout(7) << "local_wrlock_start on " << *lock
5304 << " on " << *lock
->get_parent() << dendl
;
5306 ceph_assert(lock
->get_parent()->is_auth());
5307 if (lock
->can_wrlock()) {
5308 lock
->get_wrlock(mut
->get_client());
5309 auto it
= mut
->emplace_lock(lock
, MutationImpl::LockOp::WRLOCK
);
5310 ceph_assert(it
->is_wrlock());
5313 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
5318 void Locker::local_wrlock_finish(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
)
5320 ceph_assert(it
->is_wrlock());
5321 LocalLockC
*lock
= static_cast<LocalLockC
*>(it
->lock
);
5322 dout(7) << "local_wrlock_finish on " << *lock
5323 << " on " << *lock
->get_parent() << dendl
;
5325 mut
->locks
.erase(it
);
5326 if (lock
->get_num_wrlocks() == 0) {
5327 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
5328 SimpleLock::WAIT_WR
|
5329 SimpleLock::WAIT_RD
);
5333 bool Locker::local_xlock_start(LocalLockC
*lock
, MDRequestRef
& mut
)
5335 dout(7) << "local_xlock_start on " << *lock
5336 << " on " << *lock
->get_parent() << dendl
;
5338 ceph_assert(lock
->get_parent()->is_auth());
5339 if (!lock
->can_xlock_local()) {
5340 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
5344 lock
->get_xlock(mut
, mut
->get_client());
5345 mut
->emplace_lock(lock
, MutationImpl::LockOp::XLOCK
);
5349 void Locker::local_xlock_finish(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
)
5351 ceph_assert(it
->is_xlock());
5352 LocalLockC
*lock
= static_cast<LocalLockC
*>(it
->lock
);
5353 dout(7) << "local_xlock_finish on " << *lock
5354 << " on " << *lock
->get_parent() << dendl
;
5356 mut
->locks
.erase(it
);
5358 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
5359 SimpleLock::WAIT_WR
|
5360 SimpleLock::WAIT_RD
);
5365 // ==========================================================================
5369 void Locker::file_eval(ScatterLock
*lock
, bool *need_issue
)
5371 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5372 int loner_wanted
, other_wanted
;
5373 int wanted
= in
->get_caps_wanted(&loner_wanted
, &other_wanted
, CEPH_CAP_SFILE
);
5374 dout(7) << "file_eval wanted=" << gcap_string(wanted
)
5375 << " loner_wanted=" << gcap_string(loner_wanted
)
5376 << " other_wanted=" << gcap_string(other_wanted
)
5377 << " filelock=" << *lock
<< " on " << *lock
->get_parent()
5380 ceph_assert(lock
->get_parent()->is_auth());
5381 ceph_assert(lock
->is_stable());
5383 if (lock
->get_parent()->is_freezing_or_frozen())
5386 if (mdcache
->is_readonly()) {
5387 if (lock
->get_state() != LOCK_SYNC
) {
5388 dout(10) << "file_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5389 simple_sync(lock
, need_issue
);
5395 if (lock
->get_state() == LOCK_EXCL
) {
5396 dout(20) << " is excl" << dendl
;
5397 int loner_issued
, other_issued
, xlocker_issued
;
5398 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
, CEPH_CAP_SFILE
);
5399 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued
)
5400 << " other_issued=" << gcap_string(other_issued
)
5401 << " xlocker_issued=" << gcap_string(xlocker_issued
)
5403 if (!((loner_wanted
|loner_issued
) & (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
5404 (other_wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GRD
)) ||
5405 (in
->is_dir() && in
->multiple_nonstale_caps())) { // FIXME.. :/
5406 dout(20) << " should lose it" << dendl
;
5407 // we should lose it.
5418 // -> any writer means MIX; RD doesn't matter.
5419 if (((other_wanted
|loner_wanted
) & CEPH_CAP_GWR
) ||
5420 lock
->is_waiter_for(SimpleLock::WAIT_WR
))
5421 scatter_mix(lock
, need_issue
);
5422 else if (!lock
->is_wrlocked()) // let excl wrlocks drain first
5423 simple_sync(lock
, need_issue
);
5425 dout(10) << " waiting for wrlock to drain" << dendl
;
5430 else if (lock
->get_state() != LOCK_EXCL
&&
5431 !lock
->is_rdlocked() &&
5432 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
5433 in
->get_target_loner() >= 0 &&
5435 !in
->has_subtree_or_exporting_dirfrag() :
5436 (wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)))) {
5437 dout(7) << "file_eval stable, bump to loner " << *lock
5438 << " on " << *lock
->get_parent() << dendl
;
5439 file_excl(lock
, need_issue
);
5443 else if (lock
->get_state() != LOCK_MIX
&&
5444 !lock
->is_rdlocked() &&
5445 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
5446 (lock
->get_scatter_wanted() ||
5447 (in
->get_target_loner() < 0 && (wanted
& CEPH_CAP_GWR
)))) {
5448 dout(7) << "file_eval stable, bump to mixed " << *lock
5449 << " on " << *lock
->get_parent() << dendl
;
5450 scatter_mix(lock
, need_issue
);
5454 else if (lock
->get_state() != LOCK_SYNC
&&
5455 !lock
->is_wrlocked() && // drain wrlocks first!
5456 !lock
->is_waiter_for(SimpleLock::WAIT_WR
) &&
5457 !(wanted
& CEPH_CAP_GWR
) &&
5458 !((lock
->get_state() == LOCK_MIX
) &&
5459 in
->is_dir() && in
->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
5460 //((wanted & CEPH_CAP_RD) ||
5461 //in->is_replicated() ||
5462 //lock->is_leased() ||
5463 //(!loner && lock->get_state() == LOCK_EXCL)) &&
5465 dout(7) << "file_eval stable, bump to sync " << *lock
5466 << " on " << *lock
->get_parent() << dendl
;
5467 simple_sync(lock
, need_issue
);
5469 else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5470 mds
->mdcache
->queue_file_recover(in
);
5471 mds
->mdcache
->do_file_recover();
5477 void Locker::scatter_mix(ScatterLock
*lock
, bool *need_issue
)
5479 dout(7) << "scatter_mix " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5481 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5482 ceph_assert(in
->is_auth());
5483 ceph_assert(lock
->is_stable());
5485 if (lock
->get_state() == LOCK_LOCK
) {
5486 in
->start_scatter(lock
);
5487 if (in
->is_replicated()) {
5489 bufferlist softdata
;
5490 lock
->encode_locked_state(softdata
);
5492 // bcast to replicas
5493 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
5497 lock
->set_state(LOCK_MIX
);
5498 lock
->clear_scatter_wanted();
5499 if (lock
->get_cap_shift()) {
5507 switch (lock
->get_state()) {
5508 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_MIX
); break;
5509 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_MIX
); break;
5510 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_MIX
); break;
5511 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_MIX
); break;
5512 default: ceph_abort();
5516 if (lock
->is_rdlocked()) {
5517 if (lock
->is_cached())
5518 invalidate_lock_caches(lock
);
5521 if (in
->is_replicated()) {
5522 if (lock
->get_state() == LOCK_SYNC_MIX
) { // for the rest states, replicas are already LOCK
5523 send_lock_message(lock
, LOCK_AC_MIX
);
5524 lock
->init_gather();
5528 if (lock
->is_leased()) {
5529 revoke_client_leases(lock
);
5532 if (lock
->get_cap_shift() &&
5534 in
->issued_caps_need_gather(lock
)) {
5541 bool need_recover
= false;
5542 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5543 mds
->mdcache
->queue_file_recover(in
);
5544 need_recover
= true;
5549 lock
->get_parent()->auth_pin(lock
);
5551 mds
->mdcache
->do_file_recover();
5553 in
->start_scatter(lock
);
5554 lock
->set_state(LOCK_MIX
);
5555 lock
->clear_scatter_wanted();
5556 if (in
->is_replicated()) {
5557 bufferlist softdata
;
5558 lock
->encode_locked_state(softdata
);
5559 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
5561 if (lock
->get_cap_shift()) {
5572 void Locker::file_excl(ScatterLock
*lock
, bool *need_issue
)
5574 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5575 dout(7) << "file_excl " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5577 ceph_assert(in
->is_auth());
5578 ceph_assert(lock
->is_stable());
5580 ceph_assert((in
->get_loner() >= 0 && in
->get_mds_caps_wanted().empty()) ||
5581 (lock
->get_state() == LOCK_XSYN
)); // must do xsyn -> excl -> <anything else>
5583 switch (lock
->get_state()) {
5584 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
5585 case LOCK_MIX
: lock
->set_state(LOCK_MIX_EXCL
); break;
5586 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
5587 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
5588 default: ceph_abort();
5592 if (lock
->is_rdlocked())
5594 if (lock
->is_wrlocked())
5596 if (gather
&& lock
->is_cached())
5597 invalidate_lock_caches(lock
);
5599 if (in
->is_replicated() &&
5600 lock
->get_state() != LOCK_LOCK_EXCL
&&
5601 lock
->get_state() != LOCK_XSYN_EXCL
) { // if we were lock, replicas are already lock.
5602 send_lock_message(lock
, LOCK_AC_LOCK
);
5603 lock
->init_gather();
5606 if (lock
->is_leased()) {
5607 revoke_client_leases(lock
);
5610 if (in
->is_head() &&
5611 in
->issued_caps_need_gather(lock
)) {
5618 bool need_recover
= false;
5619 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5620 mds
->mdcache
->queue_file_recover(in
);
5621 need_recover
= true;
5626 lock
->get_parent()->auth_pin(lock
);
5628 mds
->mdcache
->do_file_recover();
5630 lock
->set_state(LOCK_EXCL
);
5638 void Locker::file_xsyn(SimpleLock
*lock
, bool *need_issue
)
5640 dout(7) << "file_xsyn on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5641 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5642 ceph_assert(in
->is_auth());
5643 ceph_assert(in
->get_loner() >= 0 && in
->get_mds_caps_wanted().empty());
5645 switch (lock
->get_state()) {
5646 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_XSYN
); break;
5647 default: ceph_abort();
5651 if (lock
->is_wrlocked()) {
5652 if (lock
->is_cached())
5653 invalidate_lock_caches(lock
);
5657 if (in
->is_head() &&
5658 in
->issued_caps_need_gather(lock
)) {
5667 lock
->get_parent()->auth_pin(lock
);
5669 lock
->set_state(LOCK_XSYN
);
5670 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5678 void Locker::file_recover(ScatterLock
*lock
)
5680 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5681 dout(7) << "file_recover " << *lock
<< " on " << *in
<< dendl
;
5683 ceph_assert(in
->is_auth());
5684 //assert(lock->is_stable());
5685 ceph_assert(lock
->get_state() == LOCK_PRE_SCAN
); // only called from MDCache::start_files_to_recover()
5690 if (in->is_replicated()
5691 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5692 send_lock_message(lock, LOCK_AC_LOCK);
5693 lock->init_gather();
5697 if (in
->is_head() &&
5698 in
->issued_caps_need_gather(lock
)) {
5703 lock
->set_state(LOCK_SCAN
);
5705 in
->state_set(CInode::STATE_NEEDSRECOVER
);
5707 mds
->mdcache
->queue_file_recover(in
);
5712 void Locker::handle_file_lock(ScatterLock
*lock
, const cref_t
<MLock
> &m
)
5714 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5715 int from
= m
->get_asker();
5717 if (mds
->is_rejoin()) {
5718 if (in
->is_rejoining()) {
5719 dout(7) << "handle_file_lock still rejoining " << *in
5720 << ", dropping " << *m
<< dendl
;
5725 dout(7) << "handle_file_lock a=" << lock
->get_lock_action_name(m
->get_action())
5727 << " from mds." << from
<< " "
5730 bool caps
= lock
->get_cap_shift();
5732 switch (m
->get_action()) {
5735 ceph_assert(lock
->get_state() == LOCK_LOCK
||
5736 lock
->get_state() == LOCK_MIX
||
5737 lock
->get_state() == LOCK_MIX_SYNC2
);
5739 if (lock
->get_state() == LOCK_MIX
) {
5740 lock
->set_state(LOCK_MIX_SYNC
);
5741 eval_gather(lock
, true);
5742 if (lock
->is_unstable_and_locked()) {
5743 if (lock
->is_cached())
5744 invalidate_lock_caches(lock
);
5745 mds
->mdlog
->flush();
5750 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5751 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5754 lock
->decode_locked_state(m
->get_data());
5755 lock
->set_state(LOCK_SYNC
);
5760 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5765 switch (lock
->get_state()) {
5766 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
5767 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
); break;
5768 default: ceph_abort();
5771 eval_gather(lock
, true);
5772 if (lock
->is_unstable_and_locked()) {
5773 if (lock
->is_cached())
5774 invalidate_lock_caches(lock
);
5775 mds
->mdlog
->flush();
5780 case LOCK_AC_LOCKFLUSHED
:
5781 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5782 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5783 // wake up scatter_nudge waiters
5784 if (lock
->is_stable())
5785 lock
->finish_waiters(SimpleLock::WAIT_STABLE
);
5789 ceph_assert(lock
->get_state() == LOCK_SYNC
||
5790 lock
->get_state() == LOCK_LOCK
||
5791 lock
->get_state() == LOCK_SYNC_MIX2
);
5793 if (lock
->get_state() == LOCK_SYNC
) {
5795 lock
->set_state(LOCK_SYNC_MIX
);
5796 eval_gather(lock
, true);
5797 if (lock
->is_unstable_and_locked()) {
5798 if (lock
->is_cached())
5799 invalidate_lock_caches(lock
);
5800 mds
->mdlog
->flush();
5806 lock
->set_state(LOCK_MIX
);
5807 lock
->decode_locked_state(m
->get_data());
5812 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
5817 case LOCK_AC_LOCKACK
:
5818 ceph_assert(lock
->get_state() == LOCK_SYNC_LOCK
||
5819 lock
->get_state() == LOCK_MIX_LOCK
||
5820 lock
->get_state() == LOCK_MIX_LOCK2
||
5821 lock
->get_state() == LOCK_MIX_EXCL
||
5822 lock
->get_state() == LOCK_SYNC_EXCL
||
5823 lock
->get_state() == LOCK_SYNC_MIX
||
5824 lock
->get_state() == LOCK_MIX_TSYN
);
5825 ceph_assert(lock
->is_gathering(from
));
5826 lock
->remove_gather(from
);
5828 if (lock
->get_state() == LOCK_MIX_LOCK
||
5829 lock
->get_state() == LOCK_MIX_LOCK2
||
5830 lock
->get_state() == LOCK_MIX_EXCL
||
5831 lock
->get_state() == LOCK_MIX_TSYN
) {
5832 lock
->decode_locked_state(m
->get_data());
5833 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5834 // delay calling scatter_writebehind().
5835 lock
->clear_flushed();
5838 if (lock
->is_gathering()) {
5839 dout(7) << "handle_file_lock " << *in
<< " from " << from
5840 << ", still gathering " << lock
->get_gather_set() << dendl
;
5842 dout(7) << "handle_file_lock " << *in
<< " from " << from
5843 << ", last one" << dendl
;
5848 case LOCK_AC_SYNCACK
:
5849 ceph_assert(lock
->get_state() == LOCK_MIX_SYNC
);
5850 ceph_assert(lock
->is_gathering(from
));
5851 lock
->remove_gather(from
);
5853 lock
->decode_locked_state(m
->get_data());
5855 if (lock
->is_gathering()) {
5856 dout(7) << "handle_file_lock " << *in
<< " from " << from
5857 << ", still gathering " << lock
->get_gather_set() << dendl
;
5859 dout(7) << "handle_file_lock " << *in
<< " from " << from
5860 << ", last one" << dendl
;
5865 case LOCK_AC_MIXACK
:
5866 ceph_assert(lock
->get_state() == LOCK_SYNC_MIX
);
5867 ceph_assert(lock
->is_gathering(from
));
5868 lock
->remove_gather(from
);
5870 if (lock
->is_gathering()) {
5871 dout(7) << "handle_file_lock " << *in
<< " from " << from
5872 << ", still gathering " << lock
->get_gather_set() << dendl
;
5874 dout(7) << "handle_file_lock " << *in
<< " from " << from
5875 << ", last one" << dendl
;
5882 case LOCK_AC_REQSCATTER
:
5883 if (lock
->is_stable()) {
5884 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5885 * because the replica should be holding an auth_pin if they're
5886 * doing this (and thus, we are freezing, not frozen, and indefinite
5887 * starvation isn't an issue).
5889 dout(7) << "handle_file_lock got scatter request on " << *lock
5890 << " on " << *lock
->get_parent() << dendl
;
5891 if (lock
->get_state() != LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5894 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5895 << " on " << *lock
->get_parent() << dendl
;
5896 lock
->set_scatter_wanted();
5900 case LOCK_AC_REQUNSCATTER
:
5901 if (lock
->is_stable()) {
5902 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5903 * because the replica should be holding an auth_pin if they're
5904 * doing this (and thus, we are freezing, not frozen, and indefinite
5905 * starvation isn't an issue).
5907 dout(7) << "handle_file_lock got unscatter request on " << *lock
5908 << " on " << *lock
->get_parent() << dendl
;
5909 if (lock
->get_state() == LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5910 simple_lock(lock
); // FIXME tempsync?
5912 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5913 << " on " << *lock
->get_parent() << dendl
;
5914 lock
->set_unscatter_wanted();
5918 case LOCK_AC_REQRDLOCK
:
5919 handle_reqrdlock(lock
, m
);
5923 if (!lock
->get_parent()->is_auth()) {
5924 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5925 << " on " << *lock
->get_parent() << dendl
;
5926 } else if (!lock
->get_parent()->is_replicated()) {
5927 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5928 << " on " << *lock
->get_parent() << dendl
;
5930 dout(7) << "handle_file_lock trying nudge on " << *lock
5931 << " on " << *lock
->get_parent() << dendl
;
5932 scatter_nudge(lock
, 0, true);
5933 mds
->mdlog
->flush();